commit michi patch to target mapkeeper instead of dht_persistent_store; remove cruft; cleanup makefile
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2938 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
f05ccec98f
commit
e944fa63c9
14 changed files with 85 additions and 1315 deletions
|
@ -1,134 +0,0 @@
|
||||||
#include "PersistentParent.h"
|
|
||||||
#include "LSMPersistentStoreImpl.h"
|
|
||||||
#include "TabletMetadata.h"
|
|
||||||
#include "tcpclient.h"
|
|
||||||
|
|
||||||
#include <dht/LogUtils.h>
|
|
||||||
|
|
||||||
// Initialize the logger
|
|
||||||
static log4cpp::Category &log =
|
|
||||||
log4cpp::Category::getInstance("dht.su."__FILE__);
|
|
||||||
|
|
||||||
class LSMPersistentParent : PersistentParent {
|
|
||||||
public:
|
|
||||||
|
|
||||||
LSMPersistentParent() : ordered(true), hashed(false) {
|
|
||||||
} // we ignore the ordered flag...
|
|
||||||
~LSMPersistentParent() {
|
|
||||||
DHT_DEBUG_STREAM() << "~LSMPersistentParent called";
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode install(const SectionConfig& config) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM install called";
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode init(const SectionConfig& config) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM init called";
|
|
||||||
ordered.init(config);
|
|
||||||
hashed.init(config);
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
PersistentStore *getHashedStore() {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM getHashedStore called";
|
|
||||||
return &hashed;
|
|
||||||
}
|
|
||||||
PersistentStore *getOrderedStore() {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM getOrderedStore called";
|
|
||||||
return &ordered;
|
|
||||||
}
|
|
||||||
bool ping() {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM ping called";
|
|
||||||
return ordered.ping();
|
|
||||||
}
|
|
||||||
std::string getName() {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM getName called";
|
|
||||||
return "logstore";
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode getFreeSpaceBytes(double & freeSpaceBytes) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM getFreeSpaceBytes called";
|
|
||||||
freeSpaceBytes = 1024.0 *1024.0 * 1024.0; // XXX stub
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode getDiskMaxBytes(double & diskMaxBytes) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM getDiskMaxBytes called";
|
|
||||||
diskMaxBytes = 10.0 * 1024.0 *1024.0 * 1024.0; // XXX stub
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode cleanupTablet(uint64_t uniqId,
|
|
||||||
const std::string & tableName,
|
|
||||||
const std::string & tabletName) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM cleanupTablet called";
|
|
||||||
return SuCode::SuOk; // XXX stub
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode getTabletMappingList(TabletList & tabletList) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM getTabletMappingList called";
|
|
||||||
|
|
||||||
std::string metadata_table = std::string("ydht_metadata_table");
|
|
||||||
std::string metadata_tablet = std::string("0");
|
|
||||||
std::string metadata_tabletEnd = std::string("1");
|
|
||||||
|
|
||||||
size_t startlen;
|
|
||||||
size_t endlen;
|
|
||||||
|
|
||||||
unsigned char * start_tup = ordered.my_strcat(metadata_table, metadata_tablet, "", &startlen);
|
|
||||||
unsigned char * end_tup = ordered.my_strcat(metadata_table, metadata_tabletEnd, "", &endlen);
|
|
||||||
|
|
||||||
DHT_DEBUG_STREAM() << "start tup = " << start_tup;
|
|
||||||
DHT_DEBUG_STREAM() << "end tup = " << end_tup;
|
|
||||||
|
|
||||||
datatuple * starttup = datatuple::create(start_tup, startlen);
|
|
||||||
datatuple * endtup = datatuple::create(end_tup, endlen);
|
|
||||||
|
|
||||||
free(start_tup);
|
|
||||||
free(end_tup);
|
|
||||||
|
|
||||||
uint8_t rcode = logstore_client_op_returns_many(ordered.l_, OP_SCAN, starttup, endtup, 0); // 0 = no limit.
|
|
||||||
|
|
||||||
datatuple::freetuple(starttup);
|
|
||||||
datatuple::freetuple(endtup);
|
|
||||||
|
|
||||||
datatuple * next;
|
|
||||||
|
|
||||||
TabletMetadata m;
|
|
||||||
if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) {
|
|
||||||
while((next = logstore_client_next_tuple(ordered.l_))) {
|
|
||||||
ordered.metadata_buf(m, next->key(), next->keylen());
|
|
||||||
|
|
||||||
struct ydht_maptable_schema md;
|
|
||||||
std::string cat = m.table() + m.tablet();
|
|
||||||
md.uniq_id = 0;
|
|
||||||
for(int i = 0; i < cat.length(); i++) {
|
|
||||||
md.uniq_id += ((unsigned char)cat[i]); // XXX obviously, this is a terrible hack (and a poor hash function)
|
|
||||||
}
|
|
||||||
md.tableName = m.table();
|
|
||||||
md.tabletName = m.tablet();
|
|
||||||
DHT_DEBUG_STREAM() << md.uniq_id << " : " << md.tableName << " : " << md.tabletName;
|
|
||||||
tabletList.push_back(md);
|
|
||||||
datatuple::freetuple(next);
|
|
||||||
}
|
|
||||||
DHT_DEBUG_STREAM() << "getTabletMappingListreturns";
|
|
||||||
} else {
|
|
||||||
DHT_ERROR_STREAM() << "error " << (int)rcode << " in getTabletMappingList.";
|
|
||||||
return SuCode::PStoreUnexpectedError; // XXX should be "connection closed error" or something...
|
|
||||||
}
|
|
||||||
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode getApproximateTableSize
|
|
||||||
(const std::string& tableId,
|
|
||||||
int64_t& tableSize, int64_t & rowCount) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSM getApproximateTableSize called";
|
|
||||||
return ordered.getApproximateTableSize(tableId, tableSize, rowCount);
|
|
||||||
}
|
|
||||||
private:
|
|
||||||
LSMPersistentStoreImpl ordered;
|
|
||||||
LSMPersistentStoreImpl hashed;
|
|
||||||
};
|
|
||||||
|
|
||||||
extern "C" {
|
|
||||||
void * lsmsherpa_init();
|
|
||||||
}
|
|
||||||
|
|
||||||
static LSMPersistentParent pp;
|
|
||||||
void * lsmsherpa_init() {
|
|
||||||
return &pp;
|
|
||||||
}
|
|
|
@ -1,664 +0,0 @@
|
||||||
/**
|
|
||||||
* \file LSMPersistentStoreImpl.cc
|
|
||||||
*
|
|
||||||
* Copyright (c) 2008 Yahoo, Inc.
|
|
||||||
* All rights reserved.
|
|
||||||
*/
|
|
||||||
#include "TabletMetadata.h"
|
|
||||||
#include "TabletIterator.h"
|
|
||||||
#include "ScanContinuation.h"
|
|
||||||
#include "OrderedScanContinuation.h"
|
|
||||||
#include "TabletRange.h"
|
|
||||||
#include "SuLimits.h"
|
|
||||||
#include "dht/UtilityBuffer.h"
|
|
||||||
|
|
||||||
#include <dht/LogUtils.h>
|
|
||||||
|
|
||||||
#include "LSMPersistentStoreImpl.h"
|
|
||||||
|
|
||||||
#include <tcpclient.h>
|
|
||||||
|
|
||||||
|
|
||||||
// Initialize the logger
|
|
||||||
static log4cpp::Category &log =
|
|
||||||
log4cpp::Category::getInstance("dht.su."__FILE__);
|
|
||||||
|
|
||||||
class LSMIterator : public TabletIterator<StorageRecord> {
|
|
||||||
friend class LSMPersistentStoreImpl;
|
|
||||||
|
|
||||||
public:
|
|
||||||
// StorageRecord * data; // <- defined in parent class. next() updates it.
|
|
||||||
|
|
||||||
LSMIterator(LSMPersistentStoreImpl* lsmImpl, const TabletMetadata& tabletMeta, ScanContinuationAutoPtr continuation,
|
|
||||||
ScanSelect::Selector ignored, const uint64_t expiryTime,
|
|
||||||
unsigned int scanLimit, size_t byteLimit /*ignored*/) : lsmImpl(lsmImpl) {
|
|
||||||
const unsigned char low_eos = (unsigned char) 0xFE;
|
|
||||||
const unsigned char high_eos = (unsigned char) 0xFF;
|
|
||||||
const unsigned char zero = (unsigned char) 0x00;
|
|
||||||
(void)zero;
|
|
||||||
// open iterator
|
|
||||||
datatuple * starttup = NULL;
|
|
||||||
datatuple * endtup = NULL;
|
|
||||||
|
|
||||||
size_t start_key_len, end_key_len;
|
|
||||||
unsigned char *start_key, *end_key;
|
|
||||||
|
|
||||||
if(continuation->isOrdered()) {
|
|
||||||
const OrderedScanContinuation& os = static_cast<const OrderedScanContinuation&>(*continuation);
|
|
||||||
|
|
||||||
if(!os.getStartKey().isMinKey()) {
|
|
||||||
start_key = lsmImpl->buf_key(tabletMeta, os.getStartKey().getKey(), &start_key_len);
|
|
||||||
} else {
|
|
||||||
start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len);
|
|
||||||
}
|
|
||||||
|
|
||||||
if(!os.getEndKey().isMaxKey()) {
|
|
||||||
end_key = lsmImpl->buf_key(tabletMeta, os.getEndKey().getKey(), &end_key_len);
|
|
||||||
} else {
|
|
||||||
end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len);
|
|
||||||
if(end_key[end_key_len-2] != low_eos) {
|
|
||||||
DHT_ERROR_STREAM() << "CORRUPT lsm tablet key = " << (char*)end_key;
|
|
||||||
} else {
|
|
||||||
end_key[end_key_len-2] = high_eos;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
DHT_WARN_STREAM() << "Scanning hash table, but ignoring contiunation range!";
|
|
||||||
start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len);
|
|
||||||
end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len);
|
|
||||||
if(end_key[end_key_len-2] != low_eos) {
|
|
||||||
DHT_ERROR_STREAM() << "CORRUPT lsm tablet key = " << (char*)end_key;
|
|
||||||
} else {
|
|
||||||
end_key[end_key_len-2] = high_eos;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
starttup = datatuple::create(start_key, start_key_len);
|
|
||||||
std::string dbg((char*)start_key, start_key_len - 1);
|
|
||||||
DHT_DEBUG_STREAM() << "start lsm key = " << dbg;
|
|
||||||
|
|
||||||
endtup = datatuple::create(end_key, end_key_len);
|
|
||||||
std::string dbg2((char*)end_key, end_key_len - 1);
|
|
||||||
DHT_DEBUG_STREAM() << "end lsm key = " << dbg2;
|
|
||||||
|
|
||||||
uint8_t rc = logstore_client_op_returns_many(lsmImpl->scan_l_, OP_SCAN, starttup, endtup, scanLimit);
|
|
||||||
|
|
||||||
datatuple::freetuple(starttup);
|
|
||||||
datatuple::freetuple(endtup);
|
|
||||||
|
|
||||||
if(rc != LOGSTORE_RESPONSE_SENDING_TUPLES) {
|
|
||||||
this->error = rc;
|
|
||||||
} else {
|
|
||||||
this->error = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
this->data = new StorageRecord();
|
|
||||||
}
|
|
||||||
~LSMIterator() {
|
|
||||||
DHT_DEBUG_STREAM() << "close iterator called";
|
|
||||||
// close iterator by running to the end of it... TODO devise a better way to close iterators early?
|
|
||||||
while(this->data) {
|
|
||||||
next();
|
|
||||||
}
|
|
||||||
DHT_DEBUG_STREAM() << "close iterator done";
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode next() {
|
|
||||||
datatuple * tup;
|
|
||||||
DHT_DEBUG_STREAM() << "next called";
|
|
||||||
if(error) { // only catches errors during scan setup.
|
|
||||||
return SuCode::PStoreUnexpectedError;
|
|
||||||
} else if((tup = logstore_client_next_tuple(lsmImpl->scan_l_))) {
|
|
||||||
DHT_DEBUG_STREAM() << "found tuple, key = " << tup->key() << " datalen = " << tup->datalen();
|
|
||||||
SuCode::ResponseCode rc = lsmImpl->tup_buf(*(this->data), tup);
|
|
||||||
datatuple::freetuple(tup);
|
|
||||||
return rc;
|
|
||||||
} else {
|
|
||||||
DHT_DEBUG_STREAM() << "no tuple";
|
|
||||||
delete this->data;
|
|
||||||
this->data = NULL;
|
|
||||||
return SuCode::PStoreScanEnd; // XXX need to differentiate between end of scan and failure
|
|
||||||
}
|
|
||||||
}
|
|
||||||
private:
|
|
||||||
LSMPersistentStoreImpl * lsmImpl;
|
|
||||||
uint8_t error;
|
|
||||||
};
|
|
||||||
|
|
||||||
void LSMPersistentStoreImpl::buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m) {
|
|
||||||
std::string ydht_metadata_table = std::string("ydht_metadata_table");
|
|
||||||
std::string zero = std::string("0");
|
|
||||||
std::string tmp; unsigned char * tmp_p; size_t tmp_len;
|
|
||||||
tmp_p = my_strcat(m.table(), m.tablet(), "", &tmp_len);
|
|
||||||
tmp.assign((const char*)tmp_p, tmp_len);
|
|
||||||
free(tmp_p);
|
|
||||||
*buf = my_strcat(ydht_metadata_table, zero, tmp, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
void LSMPersistentStoreImpl::metadata_buf(TabletMetadata &m, const unsigned char * buf, size_t len) {
|
|
||||||
// Metadata table key format:
|
|
||||||
// ydht_metadata_table[low_eos]0[low_eos]table[low_eos]tablet[low_eos][low_eos]
|
|
||||||
|
|
||||||
assert(buf);
|
|
||||||
std::string ydht_metadata_table, zero, tmp, table, tablet, empty;
|
|
||||||
my_strtok(buf, len, ydht_metadata_table, zero, tmp);
|
|
||||||
assert(tmp.c_str());
|
|
||||||
my_strtok((const unsigned char*)tmp.c_str(), tmp.length(), table, tablet, empty);
|
|
||||||
|
|
||||||
DHT_DEBUG_STREAM() << "Parsed metadata: [" << table << "] [" << tablet << "] [" << empty << "](empty)";
|
|
||||||
m.setTable(table);
|
|
||||||
m.setTablet(tablet);
|
|
||||||
m.setTabletId(tmp.substr(0, tmp.length() - 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
unsigned char* LSMPersistentStoreImpl::my_strcat(const std::string& table,
|
|
||||||
const std::string& tablet,
|
|
||||||
const std::string& key,
|
|
||||||
size_t * len) {
|
|
||||||
const char * a = table.c_str();
|
|
||||||
size_t alen = table.length();
|
|
||||||
const char * b = tablet.c_str();
|
|
||||||
size_t blen = tablet.length();
|
|
||||||
const char * c = key.c_str();
|
|
||||||
size_t clen = key.length();
|
|
||||||
// The following two bytes cannot occur in valid utf-8
|
|
||||||
const unsigned char low_eos = (unsigned char) 0xFE;
|
|
||||||
const unsigned char high_eos = (unsigned char) 0xFF;
|
|
||||||
const unsigned char zero = (unsigned char) 0x00;
|
|
||||||
(void) high_eos;
|
|
||||||
*len = alen + 1 + blen + 1 + clen + 1;
|
|
||||||
unsigned char * ret = (unsigned char*)malloc(*len);
|
|
||||||
unsigned char * buf = ret;
|
|
||||||
memcpy(buf, a, alen); buf += alen;
|
|
||||||
memcpy(buf, &low_eos, 1); buf ++;
|
|
||||||
memcpy(buf, b, blen); buf += blen;
|
|
||||||
memcpy(buf, &low_eos, 1); buf ++;
|
|
||||||
memcpy(buf, c, clen); buf += clen;
|
|
||||||
memcpy(buf, &zero, 1); buf = 0;
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
void LSMPersistentStoreImpl::my_strtok(const unsigned char* in, size_t len, std::string& table, std::string& tablet, std::string& key) {
|
|
||||||
const char * tablep = (const char*) in;
|
|
||||||
|
|
||||||
const unsigned char low_eos = (unsigned char) 0xFE;
|
|
||||||
const unsigned char high_eos = (unsigned char) 0xFF;
|
|
||||||
const unsigned char zero = (unsigned char) 0x00;
|
|
||||||
(void)high_eos; (void)zero;
|
|
||||||
|
|
||||||
const char * tabletp = ((const char*)memchr(tablep, low_eos, len)) + 1;
|
|
||||||
int tablep_len = (tabletp - tablep)-1; // -1 is due to low_eos terminator
|
|
||||||
const char * keyp = ((const char*)memchr(tabletp, low_eos, len-tablep_len)) + 1;
|
|
||||||
int tabletp_len = (keyp - tabletp) - 1; // -1 is due to low_eos terminator
|
|
||||||
int keyp_len = (len - (tablep_len + 1 + tabletp_len + 1)) - 1; // -1 is due to null terminator.
|
|
||||||
|
|
||||||
table.assign(tablep, tablep_len);
|
|
||||||
tablet.assign(tabletp, tabletp_len);
|
|
||||||
key.assign(keyp, keyp_len);
|
|
||||||
}
|
|
||||||
unsigned char *
|
|
||||||
LSMPersistentStoreImpl::buf_key(const TabletMetadata&m,
|
|
||||||
const RecordKey& k, size_t * len) {
|
|
||||||
return buf_key(m,k.name(),len);
|
|
||||||
}
|
|
||||||
unsigned char *
|
|
||||||
LSMPersistentStoreImpl::buf_key(const TabletMetadata&m,
|
|
||||||
const std::string s, size_t * len) {
|
|
||||||
const unsigned char low_eos = (unsigned char) 0xFE;
|
|
||||||
const unsigned char high_eos = (unsigned char) 0xFF;
|
|
||||||
const unsigned char zero = (unsigned char) 0x00;
|
|
||||||
(void)high_eos; (void)low_eos;
|
|
||||||
std::string md_name = m.getTabletId();
|
|
||||||
*len = md_name.length() /*+ 1*/ + s.length() + 1; // md_name ends in a low_eos...
|
|
||||||
unsigned char * ret = (unsigned char*)malloc(*len);
|
|
||||||
unsigned char * buf = ret;
|
|
||||||
memcpy(buf, md_name.c_str(), md_name.length()); buf += md_name.length();
|
|
||||||
//memcpy(buf, &low_eos, 1); buf++;
|
|
||||||
memcpy(buf, s.c_str(), s.length()); buf += s.length();
|
|
||||||
memcpy(buf, &zero, 1);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
unsigned char *
|
|
||||||
LSMPersistentStoreImpl::buf_val(const StorageRecord& val, size_t * len) {
|
|
||||||
uint64_t expiryTime = val.expiryTime();
|
|
||||||
uint32_t dataBlob_len, metadata_len; // Below, we assume these are of the same type.
|
|
||||||
|
|
||||||
const UtilityBuffer& dataBlob = val.dataBlob();
|
|
||||||
const UtilityBuffer& metadata = val.metadata();
|
|
||||||
|
|
||||||
dataBlob_len = dataBlob.dataSize();
|
|
||||||
metadata_len = metadata.dataSize();
|
|
||||||
|
|
||||||
// DHT_DEBUG_STREAM() << "write storage record expiryTime " << expiryTime << " metadata len " << metadata_len << " datalen " << dataBlob_len;
|
|
||||||
|
|
||||||
*len = sizeof(expiryTime) + 2 * sizeof(dataBlob_len) + dataBlob_len + metadata_len;
|
|
||||||
|
|
||||||
unsigned char * ret = (unsigned char *) malloc(*len);
|
|
||||||
unsigned char * buf = ret;
|
|
||||||
memcpy(buf, &expiryTime, sizeof(expiryTime)); buf += sizeof(expiryTime);
|
|
||||||
memcpy(buf, &metadata_len, sizeof(metadata_len)); buf += sizeof(metadata_len);
|
|
||||||
memcpy(buf, &dataBlob_len, sizeof(dataBlob_len)); buf += sizeof(dataBlob_len);
|
|
||||||
memcpy(buf, const_cast<UtilityBuffer&>(metadata).buffer(),
|
|
||||||
metadata_len); buf += metadata_len;
|
|
||||||
memcpy(buf, const_cast<UtilityBuffer&>(dataBlob).buffer(),
|
|
||||||
dataBlob_len); buf += dataBlob_len;
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode
|
|
||||||
LSMPersistentStoreImpl::tup_buf(StorageRecord& ret, datatuple * tup) {
|
|
||||||
SuCode::ResponseCode rc = SuCode::SuOk;
|
|
||||||
if(tup->key()) {
|
|
||||||
rc = key_buf(ret, tup->key(), tup->keylen());
|
|
||||||
}
|
|
||||||
if(rc == SuCode::SuOk && !tup->isDelete() && tup->datalen() > 1) {
|
|
||||||
return val_buf(ret, tup->data(), tup->datalen());
|
|
||||||
} else {
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode
|
|
||||||
LSMPersistentStoreImpl::key_buf(StorageRecord& ret,
|
|
||||||
const unsigned char * buf, size_t buf_len) {
|
|
||||||
std::string table, tablet, key;
|
|
||||||
my_strtok(buf, buf_len, table, tablet, key);
|
|
||||||
DHT_DEBUG_STREAM() << "key_buf parsed datatuple key: table = [" << table << "] tablet = [" << tablet << "] key = [" << key << "]";
|
|
||||||
ret.recordKey().setName(key);
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode
|
|
||||||
LSMPersistentStoreImpl::val_buf(StorageRecord& ret,
|
|
||||||
const unsigned char * buf, size_t buf_len) {
|
|
||||||
uint64_t expiryTime;
|
|
||||||
uint32_t dataBlob_len, metadata_len;
|
|
||||||
// DHT_DEBUG_STREAM() << "read storage record buf_len " << buf_len << std::endl;
|
|
||||||
assert(buf_len >= sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len));
|
|
||||||
|
|
||||||
// Copy header onto stack.
|
|
||||||
|
|
||||||
memcpy(&expiryTime, buf, sizeof(expiryTime)); buf += sizeof(expiryTime);
|
|
||||||
memcpy(&metadata_len, buf, sizeof(metadata_len)); buf += sizeof(metadata_len);
|
|
||||||
memcpy(&dataBlob_len, buf, sizeof(dataBlob_len)); buf += sizeof(dataBlob_len);
|
|
||||||
|
|
||||||
// Is there room in ret?
|
|
||||||
|
|
||||||
assert(buf_len >= sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len) + metadata_len + dataBlob_len);
|
|
||||||
|
|
||||||
if(ret.metadata().bufSize() < metadata_len ||
|
|
||||||
ret.dataBlob().bufSize() < dataBlob_len) {
|
|
||||||
return SuCode::PStoreDataTruncated; // RCS: This is what the mysql implementation does.
|
|
||||||
// it's somewhat misleading, as we don't truncate anything.
|
|
||||||
}
|
|
||||||
|
|
||||||
// Copy the data into ret.
|
|
||||||
|
|
||||||
// ret->setName(recordName); // somebody else's problem....
|
|
||||||
ret.setExpiryTime(expiryTime);
|
|
||||||
|
|
||||||
memcpy(ret.metadata().buffer(), buf, metadata_len); buf += metadata_len;
|
|
||||||
memcpy(ret.dataBlob().buffer(), buf, dataBlob_len); buf += dataBlob_len;
|
|
||||||
ret.metadata().setDataSize(metadata_len);
|
|
||||||
ret.dataBlob().setDataSize(dataBlob_len);
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
|
|
||||||
LSMPersistentStoreImpl::
|
|
||||||
LSMPersistentStoreImpl(bool isOrdered) : isOrdered_(isOrdered), l_(NULL), scan_l_(NULL) {
|
|
||||||
// filestr.open(isOrdered? "/tmp/lsm-log" : "/tmp/lsm-log-hashed", std::fstream::out | std::fstream::app);
|
|
||||||
// It would be unsafe to call the following, since we're statically initialized: DHT_DEBUG_STREAM() << "LSMP constructor called";
|
|
||||||
}
|
|
||||||
|
|
||||||
LSMPersistentStoreImpl::
|
|
||||||
~LSMPersistentStoreImpl()
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP destructor called";
|
|
||||||
if(l_) logstore_client_close(l_);
|
|
||||||
if(scan_l_) logstore_client_close(scan_l_);
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP destructor cleanly closed connections";
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::initMetadataMetadata(TabletMetadata& m) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP initMetadataMetadata called";
|
|
||||||
|
|
||||||
std::string metadata_table = std::string("ydht_metadata_table");
|
|
||||||
std::string metadata_tablet= std::string("0");
|
|
||||||
size_t keylen;
|
|
||||||
char * key =(char*)my_strcat(metadata_table, metadata_tablet, "", &keylen);
|
|
||||||
std::string s(key, keylen-1);
|
|
||||||
m.setTabletId(s);
|
|
||||||
free(key);
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
init(const SectionConfig &config)
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP init called";
|
|
||||||
if(!l_) { // workaround bug 2870547
|
|
||||||
l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values
|
|
||||||
scan_l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values
|
|
||||||
}
|
|
||||||
return l_ ? SuCode::SuOk : FwCode::NotFound;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool LSMPersistentStoreImpl::
|
|
||||||
isOrdered(){
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP isOrdered called";
|
|
||||||
return isOrdered_;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
addEmptyTablet(TabletMetadata& tabletMeta)
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP addEmptyTablet called";
|
|
||||||
// This is a no-op; we'll simply prepend the tablet string to each record.
|
|
||||||
{
|
|
||||||
// Is table name too long?
|
|
||||||
if(tabletMeta.table().length() > SuLimits::MAX_TABLE_NAME_LENGTH_DB) {
|
|
||||||
return SuCode::PStoreIOFailed;
|
|
||||||
}
|
|
||||||
|
|
||||||
const std::string& mySQLTableName = tabletMeta.getTabletId();
|
|
||||||
|
|
||||||
|
|
||||||
if (mySQLTableName!=""){
|
|
||||||
DHT_INFO_STREAM() << "Tablet " << mySQLTableName << " already exists!";
|
|
||||||
return SuCode::PStoreTabletAlreadyExists;
|
|
||||||
} else {
|
|
||||||
size_t keylen; unsigned char * key;
|
|
||||||
buf_metadata(&key, &keylen, tabletMeta);
|
|
||||||
metadata_buf(tabletMeta, key, keylen);
|
|
||||||
free(key);
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
dropTablet(TabletMetadata& tabletMeta)
|
|
||||||
{
|
|
||||||
DHT_INFO_STREAM() << "dropTablet called. Falling back on clearTabletRange()";
|
|
||||||
SuCode::ResponseCode ret = clearTabletRange(tabletMeta, 0);
|
|
||||||
|
|
||||||
size_t keylen;
|
|
||||||
unsigned char * key;
|
|
||||||
buf_metadata(&key, &keylen, tabletMeta);
|
|
||||||
|
|
||||||
datatuple * tup = datatuple::create(key, keylen); // two-argument form of datatuple::create creates a tombstone, which we will now insert.
|
|
||||||
free(key);
|
|
||||||
void * result = (void*)logstore_client_op(l_, OP_INSERT, tup);
|
|
||||||
datatuple::freetuple(tup);
|
|
||||||
|
|
||||||
tabletMeta.setTabletId("");
|
|
||||||
|
|
||||||
if(!result) {
|
|
||||||
DHT_WARN_STREAM() << "LSMP dropTablet fails";
|
|
||||||
ret = SuCode::PStoreTabletCleanupFailed;
|
|
||||||
} else {
|
|
||||||
DHT_INFO_STREAM() << "LSMP dropTablet succeeds";
|
|
||||||
ret = SuCode::SuOk;
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
// alternate to dropTablet() for when a tablet has been split and the underlying
|
|
||||||
// mysql table is being shared...just wipe out the record range for the tablet
|
|
||||||
// that is being dropped.
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
clearTabletRange(TabletMetadata& tabletMeta, uint32_t removalLimit)
|
|
||||||
{
|
|
||||||
DHT_WARN_STREAM() << "clear tablet range is unimplemented. ignoring request";
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
getApproximateTableSize(std::string tabletMeta,
|
|
||||||
int64_t& tableSize,
|
|
||||||
int64_t & rowCount) {
|
|
||||||
DHT_WARN_STREAM() << "get approximate table size is unimplemented. returning dummy values";
|
|
||||||
tableSize = 1024 * 1024 * 1024;
|
|
||||||
rowCount = 1024 * 1024;
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
getApproximateTableSize(TabletMetadata& tabletMeta,
|
|
||||||
int64_t& tableSize,
|
|
||||||
int64_t & rowCount)
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP getApproximateTableSize (2) called";
|
|
||||||
return getApproximateTableSize(tabletMeta.getTabletId(), tableSize, rowCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
get(const TabletMetadata& tabletMeta, StorageRecord& recordData)
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP get called" << tabletMeta.getTabletId() << ":" << recordData.recordKey();
|
|
||||||
if(recordData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
|
|
||||||
return SuCode::PStoreIOFailed;
|
|
||||||
}
|
|
||||||
size_t buflen;
|
|
||||||
unsigned char * buf = buf_key(tabletMeta, recordData.recordKey(), &buflen);
|
|
||||||
datatuple * key_tup = datatuple::create(buf, buflen);
|
|
||||||
free(buf);
|
|
||||||
datatuple * result = logstore_client_op(l_, OP_FIND, key_tup);
|
|
||||||
datatuple::freetuple(key_tup);
|
|
||||||
SuCode::ResponseCode ret;
|
|
||||||
if((!result) || result->isDelete()) {
|
|
||||||
ret = SuCode::PStoreRecordNotFound;
|
|
||||||
} else {
|
|
||||||
//DHT_DEBUG_STREAM() << "call val buf from get, data len = " << result->datalen() << std::endl;
|
|
||||||
ret = val_buf(recordData, result->data(), result->datalen());
|
|
||||||
}
|
|
||||||
if(result) {
|
|
||||||
datatuple::freetuple(result);
|
|
||||||
}
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP get returns succ = " << (ret == SuCode::SuOk);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
update(const TabletMetadata& tabletMeta, const StorageRecord& updateData)
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP update called";
|
|
||||||
if(updateData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
|
|
||||||
return SuCode::PStoreIOFailed;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode ret;
|
|
||||||
{ /// XXX hack. Copy of get() implementation, without the memcpy.
|
|
||||||
size_t buflen;
|
|
||||||
unsigned char * buf = buf_key(tabletMeta, updateData.recordKey(), &buflen);
|
|
||||||
datatuple * key_tup = datatuple::create(buf, buflen);
|
|
||||||
datatuple * result = logstore_client_op(l_, OP_FIND, key_tup);
|
|
||||||
datatuple::freetuple(key_tup);
|
|
||||||
|
|
||||||
if((!result) || result->isDelete()) {
|
|
||||||
RESPONSE_ERROR_STREAM(SuCode::PStoreRecordNotFound) << "EC:PSTORE:No matching " <<
|
|
||||||
" record to update";
|
|
||||||
ret = SuCode::PStoreRecordNotFound;
|
|
||||||
} else {
|
|
||||||
// skip val_buf ...
|
|
||||||
ret = SuCode::SuOk;
|
|
||||||
}
|
|
||||||
free(buf);
|
|
||||||
if(result) { datatuple::freetuple(result); } // XXX differentiate between dead connection and missing tuple
|
|
||||||
}
|
|
||||||
if(ret == SuCode::PStoreRecordNotFound) { return ret; }
|
|
||||||
return insert(tabletMeta, updateData);
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl:: // XXX what to do about update?
|
|
||||||
insert(const TabletMetadata& tabletMeta,
|
|
||||||
const StorageRecord& insertData) {
|
|
||||||
DHT_DEBUG_STREAM()<< "LSMP insert called" << tabletMeta.getTabletId() << ":" << insertData.recordKey();
|
|
||||||
size_t keybuflen, valbuflen;
|
|
||||||
if(insertData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
|
|
||||||
return SuCode::PStoreIOFailed;
|
|
||||||
}
|
|
||||||
unsigned char * keybuf = buf_key(tabletMeta, insertData.recordKey(), &keybuflen);
|
|
||||||
DHT_DEBUG_STREAM() << "keybuf = " << keybuf << " (and perhaps a null)";
|
|
||||||
unsigned char * valbuf = buf_val(insertData, &valbuflen);
|
|
||||||
DHT_DEBUG_STREAM() << "valbuf = " << valbuf << " (and perhaps a null)";
|
|
||||||
datatuple * key_ins = datatuple::create(keybuf, keybuflen, valbuf, valbuflen);
|
|
||||||
DHT_DEBUG_STREAM() << "insert create()";
|
|
||||||
void * result = (void*)logstore_client_op(l_, OP_INSERT, key_ins);
|
|
||||||
DHT_DEBUG_STREAM() << "insert insert()";
|
|
||||||
if(result) {
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP insert will return result = " << result;
|
|
||||||
} else {
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP insert will return null ";
|
|
||||||
}
|
|
||||||
datatuple::freetuple(key_ins);
|
|
||||||
free(keybuf);
|
|
||||||
free(valbuf);
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP insert returns ";
|
|
||||||
return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
remove(const TabletMetadata& tabletMeta, const RecordKey& recordName)
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP remove called";
|
|
||||||
if(recordName.name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) {
|
|
||||||
return SuCode::PStoreIOFailed;
|
|
||||||
}
|
|
||||||
|
|
||||||
StorageRecord tmp(recordName.name());
|
|
||||||
SuCode::ResponseCode rc = get(tabletMeta, tmp);
|
|
||||||
if(SuCode::SuOk == rc) {
|
|
||||||
size_t buflen;
|
|
||||||
unsigned char * buf = buf_key(tabletMeta, recordName, &buflen);
|
|
||||||
datatuple * key_ins = datatuple::create(buf, buflen);
|
|
||||||
datatuple * result = logstore_client_op(l_, OP_INSERT, key_ins);
|
|
||||||
datatuple::freetuple(key_ins);
|
|
||||||
free(buf);
|
|
||||||
return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError;
|
|
||||||
} else {
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP remove: record not found, or error";
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bool LSMPersistentStoreImpl::ping() {
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP ping called";
|
|
||||||
datatuple * ret = logstore_client_op(l_, OP_DBG_NOOP);
|
|
||||||
if(ret == NULL) {
|
|
||||||
DHT_WARN_STREAM() << "LSMP ping failed";
|
|
||||||
return false;
|
|
||||||
} else {
|
|
||||||
datatuple::freetuple(ret);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
StorageRecordIterator LSMPersistentStoreImpl::
|
|
||||||
scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation,
|
|
||||||
ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit)
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP scan called. Tablet: " << tabletMeta.getTabletId();
|
|
||||||
ScanContinuationAutoPtr newContinuation;
|
|
||||||
TabletRangeAutoPtr tabletRange;
|
|
||||||
|
|
||||||
if (SuCode::SuOk != tabletMeta.keyRange(tabletRange)){
|
|
||||||
BAD_CODE_ABORT("Bad tablet name");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This is necessary once we turn on splits, because multiple tablets
|
|
||||||
* might reside in the same mysql table. Shouldnt scan beyond the
|
|
||||||
* upper limit of the tablet because that might be stale data left
|
|
||||||
* from before this tablet split.
|
|
||||||
*/
|
|
||||||
newContinuation = continuation.getContinuationLimitedToTabletRange(
|
|
||||||
*tabletRange);
|
|
||||||
|
|
||||||
LSMIterator* iter = new LSMIterator(this, tabletMeta, newContinuation,
|
|
||||||
selector, /*getMetadataOnly,*/
|
|
||||||
expiryTime, scanLimit, byteLimit);
|
|
||||||
|
|
||||||
DHT_DEBUG_STREAM() << "LSMP scan returns. Error = " << iter->error;
|
|
||||||
return StorageRecordIterator(iter);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
getSnapshotExporter(const TabletMetadata& tabletMeta,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
SnapshotExporterAutoPtr& exporter)
|
|
||||||
{
|
|
||||||
DHT_WARN_STREAM() << "Unimplemented: LSMP getSnapshotExported called";
|
|
||||||
/* const std::string& mySQLTableName = tabletMeta.getTabletId();
|
|
||||||
|
|
||||||
TabletRangeAutoPtr tabletRange;
|
|
||||||
RETURN_IF_NOT_OK(tabletMeta.keyRange(tabletRange));
|
|
||||||
|
|
||||||
ScanContinuationAutoPtr cont = tabletRange->getContinuationForTablet();
|
|
||||||
|
|
||||||
exporter = SnapshotExporterAutoPtr(
|
|
||||||
new LSMSnapshotExporter(mySQLTableName,cont,snapshotId));
|
|
||||||
*/
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
getSnapshotImporter(const TabletMetadata& tabletMeta,
|
|
||||||
const std::string& version,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
SnapshotImporterAutoPtr& importer)
|
|
||||||
{
|
|
||||||
DHT_WARN_STREAM() << "Unimplemented: getSnapshotImporter called";
|
|
||||||
/* if (version == LSMSnapshotExporter::VERSION){
|
|
||||||
const std::string& mySQLTableName = tabletMeta.getTabletId();
|
|
||||||
importer=LSMSnapshotExporter::getImporter(tabletMeta.table(),
|
|
||||||
tabletMeta.tablet(),
|
|
||||||
snapshotId,
|
|
||||||
mySQLTableName);
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}else{
|
|
||||||
RESPONSE_ERROR_STREAM(SuCode::PStoreUnexpectedError) <<
|
|
||||||
"EC:IMPOSSIBLE:Unknown snapshot version " << version <<" while trying to " <<
|
|
||||||
"import to tablet " << tabletMeta.tablet() << "of table " <<
|
|
||||||
tabletMeta.table();
|
|
||||||
return SuCode::PStoreUnexpectedError;
|
|
||||||
}*/
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
getIncomingCopyProgress(const TabletMetadata& metadata,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
int64_t& current,
|
|
||||||
int64_t& estimated) const
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "Unimplemented: LSMP getIncomingCopyProgress called";
|
|
||||||
|
|
||||||
//This will be a problem when we have more than 1
|
|
||||||
//exporter/importer type. We will have to store the
|
|
||||||
//snapshot version somewhere in tablet metadata
|
|
||||||
current = 1024*1024*1024;
|
|
||||||
estimated = 1024*1024*1024;
|
|
||||||
|
|
||||||
/* const std::string& mySQLTableName = metadata.getTabletId();
|
|
||||||
return LSMSnapshotExporter::
|
|
||||||
getIncomingCopyProgress(mySQLTableName,
|
|
||||||
snapshotId,
|
|
||||||
current,
|
|
||||||
estimated); */
|
|
||||||
return SuCode::SuOk;
|
|
||||||
}
|
|
||||||
|
|
||||||
SuCode::ResponseCode LSMPersistentStoreImpl::
|
|
||||||
getOutgoingCopyProgress(const TabletMetadata& metadata,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
int64_t& current,
|
|
||||||
int64_t& estimated) const
|
|
||||||
{
|
|
||||||
DHT_DEBUG_STREAM() << "Unimplemented: LSMP getOutgoingCopyProgress called";
|
|
||||||
current = 1024*1024*1024;
|
|
||||||
estimated = 1024*1024*1024;
|
|
||||||
return SuCode::SuOk;
|
|
||||||
// return LSMSnapshotExporter::
|
|
||||||
// getOutgoingCopyProgress(snapshotId,
|
|
||||||
// current,
|
|
||||||
// estimated);
|
|
||||||
}
|
|
|
@ -1,177 +0,0 @@
|
||||||
/**
|
|
||||||
* \file LSMPersistentStoreImpl.h
|
|
||||||
* \brief This file is a wrapper over the LSM-Tree network protocol.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2008 Yahoo, Inc.
|
|
||||||
* All rights reserved.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef LSM_PSTORE_IMPL_H
|
|
||||||
#define LSM_PSTORE_IMPL_H
|
|
||||||
|
|
||||||
#include "PersistentStore.h"
|
|
||||||
#include "datatuple.h"
|
|
||||||
//#include "LSMCoreImpl.h"
|
|
||||||
|
|
||||||
struct logstore_handle_t;
|
|
||||||
|
|
||||||
class LSMIterator;
|
|
||||||
|
|
||||||
class LSMPersistentStoreImpl : public PersistentStore
|
|
||||||
{
|
|
||||||
friend class LSMIterator;
|
|
||||||
friend class LSMPersistentParent;
|
|
||||||
protected:
|
|
||||||
|
|
||||||
// LSMCoreImpl& mySQLCoreImpl_;
|
|
||||||
bool isOrdered_;
|
|
||||||
unsigned char * my_strcat(const std::string& table,
|
|
||||||
const std::string& tablet,
|
|
||||||
const std::string& key,
|
|
||||||
size_t * len);
|
|
||||||
void my_strtok(const unsigned char* in, size_t len, std::string& table, std::string& tablet, std::string& key);
|
|
||||||
unsigned char * buf_key(const TabletMetadata& m, const RecordKey& r,
|
|
||||||
size_t * len);
|
|
||||||
unsigned char * buf_key(const TabletMetadata& m, const std::string s,
|
|
||||||
size_t * len);
|
|
||||||
unsigned char * buf_val(const StorageRecord &val,
|
|
||||||
size_t * len);
|
|
||||||
SuCode::ResponseCode tup_buf(StorageRecord &ret, datatuple * tup);
|
|
||||||
SuCode::ResponseCode key_buf(StorageRecord &ret,
|
|
||||||
const unsigned char * buf, size_t buf_len);
|
|
||||||
SuCode::ResponseCode val_buf(StorageRecord &ret,
|
|
||||||
const unsigned char * buf, size_t buf_len);
|
|
||||||
public:
|
|
||||||
|
|
||||||
LSMPersistentStoreImpl(bool ordered);
|
|
||||||
virtual ~LSMPersistentStoreImpl();
|
|
||||||
|
|
||||||
SuCode::ResponseCode initMetadataMetadata(TabletMetadata& m);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode init(const SectionConfig& config);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
bool isOrdered();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode addEmptyTablet(TabletMetadata& tabletMeta);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode dropTablet(TabletMetadata& tabletMeta);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode clearTabletRange(TabletMetadata& tabletMeta,
|
|
||||||
uint32_t removalLimit);
|
|
||||||
|
|
||||||
/**
|
|
||||||
Not part of PersistentStore api. PersistentParent needs to
|
|
||||||
provide this method as well, but it gets a string instead of a
|
|
||||||
TabletMetadata...
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode getApproximateTableSize(std::string tabletMeta,
|
|
||||||
int64_t& tableSize,
|
|
||||||
int64_t & rowCount);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode getApproximateTableSize(TabletMetadata& tabletMeta,
|
|
||||||
int64_t& tableSize,
|
|
||||||
int64_t & rowCount);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode get(const TabletMetadata& tabletMeta,
|
|
||||||
StorageRecord& recordData);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode update(const TabletMetadata& tabletMeta,
|
|
||||||
const StorageRecord& updateData);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode insert(const TabletMetadata& tabletMeta,
|
|
||||||
const StorageRecord& insertData);
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode remove(const TabletMetadata& tabletMeta,
|
|
||||||
const RecordKey& recordKey);
|
|
||||||
/**
|
|
||||||
* Not part of PersistentStore API. However, PersistentParent needs to implement ping
|
|
||||||
*/
|
|
||||||
bool ping();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
StorageRecordIterator
|
|
||||||
scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation,
|
|
||||||
ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode getSnapshotExporter(const TabletMetadata& tabletMeta,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
SnapshotExporterAutoPtr& exporter);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode getSnapshotImporter(const TabletMetadata& tabletMeta,
|
|
||||||
const std::string& version,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
SnapshotImporterAutoPtr& snapshot) ;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode getIncomingCopyProgress(const TabletMetadata& metadata,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
int64_t& current,
|
|
||||||
int64_t& estimated) const;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See PersistentStore API
|
|
||||||
*/
|
|
||||||
SuCode::ResponseCode getOutgoingCopyProgress(const TabletMetadata& metadata,
|
|
||||||
const std::string& snapshotId,
|
|
||||||
int64_t& current,
|
|
||||||
int64_t& estimated) const;
|
|
||||||
private:
|
|
||||||
/**
|
|
||||||
* connect to the database. Noop if already connected.
|
|
||||||
*
|
|
||||||
* @return SuCode::SuOk if successful, error otherwise
|
|
||||||
*/
|
|
||||||
// SuCode::ResponseCode connect();
|
|
||||||
|
|
||||||
private:
|
|
||||||
LSMPersistentStoreImpl(LSMPersistentStoreImpl &);
|
|
||||||
LSMPersistentStoreImpl operator=(LSMPersistentStoreImpl &);
|
|
||||||
|
|
||||||
void metadata_buf(TabletMetadata &m, const unsigned char * buf, size_t len);
|
|
||||||
void buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m);
|
|
||||||
protected:
|
|
||||||
logstore_handle_t * l_;
|
|
||||||
logstore_handle_t * scan_l_; // XXX make sure that one scan handle per process suffices.
|
|
||||||
|
|
||||||
};
|
|
||||||
|
|
||||||
#endif /*LSM_PSTORE_IMPL_H*/
|
|
|
@ -34,8 +34,8 @@ LSMServerHandler(int argc, char **argv)
|
||||||
c0_size = 1024 * 1024 * 100;
|
c0_size = 1024 * 1024 * 100;
|
||||||
printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
|
printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
|
||||||
} else if(!strcmp(argv[i], "--benchmark")) {
|
} else if(!strcmp(argv[i], "--benchmark")) {
|
||||||
stasis_buffer_manager_size = (1024LL * 1024LL * 1024LL * 2LL) / PAGE_SIZE; // 4GB total
|
stasis_buffer_manager_size = (1024LL * 1024LL * 1024LL * 1LL) / PAGE_SIZE; // 4GB total
|
||||||
c0_size = 1024LL * 1024LL * 1024LL * 2LL;
|
c0_size = 1024LL * 1024LL * 1024LL * 1LL;
|
||||||
printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
|
printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
|
||||||
} else if(!strcmp(argv[i], "--log-mode")) {
|
} else if(!strcmp(argv[i], "--log-mode")) {
|
||||||
i++;
|
i++;
|
||||||
|
@ -128,14 +128,14 @@ nextDatabaseId()
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
ping()
|
ping()
|
||||||
{
|
{
|
||||||
return sherpa::ResponseCode::Ok;
|
return mapkeeper::ResponseCode::Success;
|
||||||
}
|
}
|
||||||
|
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
shutdown()
|
shutdown()
|
||||||
{
|
{
|
||||||
exit(0); // xxx hack
|
exit(0); // xxx hack
|
||||||
return sherpa::ResponseCode::Ok;
|
return mapkeeper::ResponseCode::Success;
|
||||||
}
|
}
|
||||||
|
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
|
@ -143,18 +143,18 @@ insert(datatuple* tuple)
|
||||||
{
|
{
|
||||||
ltable_->insertTuple(tuple);
|
ltable_->insertTuple(tuple);
|
||||||
datatuple::freetuple(tuple);
|
datatuple::freetuple(tuple);
|
||||||
return sherpa::ResponseCode::Ok;
|
return mapkeeper::ResponseCode::Success;
|
||||||
}
|
}
|
||||||
|
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
addDatabase(const std::string& databaseName)
|
addMap(const std::string& databaseName)
|
||||||
{
|
{
|
||||||
uint32_t id = nextDatabaseId();
|
uint32_t id = nextDatabaseId();
|
||||||
datatuple* tup = buildTuple(0, databaseName, (void*)&id, (uint32_t)(sizeof(id)));
|
datatuple* tup = buildTuple(0, databaseName, (void*)&id, (uint32_t)(sizeof(id)));
|
||||||
datatuple* ret = get(tup);
|
datatuple* ret = get(tup);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
datatuple::freetuple(ret);
|
datatuple::freetuple(ret);
|
||||||
return sherpa::ResponseCode::DatabaseExists;
|
return mapkeeper::ResponseCode::MapExists;
|
||||||
}
|
}
|
||||||
return insert(tup);
|
return insert(tup);
|
||||||
}
|
}
|
||||||
|
@ -165,23 +165,23 @@ addDatabase(const std::string& databaseName)
|
||||||
* all the records!
|
* all the records!
|
||||||
*/
|
*/
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
dropDatabase(const std::string& databaseName)
|
dropMap(const std::string& databaseName)
|
||||||
{
|
{
|
||||||
#if 0
|
#if 0
|
||||||
Bdb::ResponseCode rc = databaseIds_.remove(databaseName);
|
Bdb::ResponseCode rc = databaseIds_.remove(databaseName);
|
||||||
if (rc == Bdb::KeyNotFound) {
|
if (rc == Bdb::KeyNotFound) {
|
||||||
return sherpa::ResponseCode::DatabaseNotFound;
|
return mapkeeper::ResponseCode::MapNotFound;
|
||||||
} else if (rc != Bdb::Ok) {
|
} else if (rc != Bdb::Success) {
|
||||||
return sherpa::ResponseCode::Error;
|
return mapkeeper::ResponseCode::Error;
|
||||||
} else {
|
} else {
|
||||||
return sherpa::ResponseCode::Ok;
|
return mapkeeper::ResponseCode::Success;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
return sherpa::ResponseCode::Ok;
|
return mapkeeper::ResponseCode::Success;
|
||||||
}
|
}
|
||||||
|
|
||||||
void LSMServerHandler::
|
void LSMServerHandler::
|
||||||
listDatabases(StringListResponse& _return)
|
listMaps(StringListResponse& _return)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,7 +194,7 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd
|
||||||
uint32_t id = getDatabaseId(databaseName);
|
uint32_t id = getDatabaseId(databaseName);
|
||||||
if (id == 0) {
|
if (id == 0) {
|
||||||
// database not found
|
// database not found
|
||||||
_return.responseCode = sherpa::ResponseCode::DatabaseNotFound;
|
_return.responseCode = mapkeeper::ResponseCode::MapNotFound;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -213,7 +213,7 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd
|
||||||
(maxBytes == 0 || resultSize < maxBytes)) {
|
(maxBytes == 0 || resultSize < maxBytes)) {
|
||||||
datatuple* current = itr->getnext();
|
datatuple* current = itr->getnext();
|
||||||
if (current == NULL) {
|
if (current == NULL) {
|
||||||
_return.responseCode = sherpa::ResponseCode::ScanEnded;
|
_return.responseCode = mapkeeper::ResponseCode::ScanEnded;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,7 +228,7 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd
|
||||||
if ((!endKeyIncluded && cmp >= 0) ||
|
if ((!endKeyIncluded && cmp >= 0) ||
|
||||||
(endKeyIncluded && cmp > 0)) {
|
(endKeyIncluded && cmp > 0)) {
|
||||||
datatuple::freetuple(current);
|
datatuple::freetuple(current);
|
||||||
_return.responseCode = sherpa::ResponseCode::ScanEnded;
|
_return.responseCode = mapkeeper::ResponseCode::ScanEnded;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,17 +260,17 @@ get(BinaryResponse& _return, const std::string& databaseName, const std::string&
|
||||||
uint32_t id = getDatabaseId(databaseName);
|
uint32_t id = getDatabaseId(databaseName);
|
||||||
if (id == 0) {
|
if (id == 0) {
|
||||||
// database not found
|
// database not found
|
||||||
_return.responseCode = sherpa::ResponseCode::DatabaseNotFound;
|
_return.responseCode = mapkeeper::ResponseCode::MapNotFound;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
datatuple* recordBody = get(id, recordName);
|
datatuple* recordBody = get(id, recordName);
|
||||||
if (recordBody == NULL) {
|
if (recordBody == NULL) {
|
||||||
// record not found
|
// record not found
|
||||||
_return.responseCode = sherpa::ResponseCode::RecordNotFound;
|
_return.responseCode = mapkeeper::ResponseCode::RecordNotFound;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
_return.responseCode = sherpa::ResponseCode::Ok;
|
_return.responseCode = mapkeeper::ResponseCode::Success;
|
||||||
_return.value.assign((const char*)(recordBody->data()), recordBody->datalen());
|
_return.value.assign((const char*)(recordBody->data()), recordBody->datalen());
|
||||||
datatuple::freetuple(recordBody);
|
datatuple::freetuple(recordBody);
|
||||||
}
|
}
|
||||||
|
@ -301,6 +301,14 @@ get(uint32_t databaseId, const std::string& recordName)
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ResponseCode::type LSMServerHandler::
|
||||||
|
put(const std::string& databaseName,
|
||||||
|
const std::string& recordName,
|
||||||
|
const std::string& recordBody)
|
||||||
|
{
|
||||||
|
return mapkeeper::ResponseCode::Success;
|
||||||
|
}
|
||||||
|
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
insert(const std::string& databaseName,
|
insert(const std::string& databaseName,
|
||||||
const std::string& recordName,
|
const std::string& recordName,
|
||||||
|
@ -309,7 +317,7 @@ insert(const std::string& databaseName,
|
||||||
// std::cerr << "inserting " << databaseName << "." << recordName << std::endl;
|
// std::cerr << "inserting " << databaseName << "." << recordName << std::endl;
|
||||||
uint32_t id = getDatabaseId(databaseName);
|
uint32_t id = getDatabaseId(databaseName);
|
||||||
if (id == 0) {
|
if (id == 0) {
|
||||||
return sherpa::ResponseCode::DatabaseNotFound;
|
return mapkeeper::ResponseCode::MapNotFound;
|
||||||
}
|
}
|
||||||
datatuple* oldRecordBody = get(id, recordName);
|
datatuple* oldRecordBody = get(id, recordName);
|
||||||
if (oldRecordBody != NULL) {
|
if (oldRecordBody != NULL) {
|
||||||
|
@ -317,7 +325,7 @@ insert(const std::string& databaseName,
|
||||||
datatuple::freetuple(oldRecordBody);
|
datatuple::freetuple(oldRecordBody);
|
||||||
} else {
|
} else {
|
||||||
datatuple::freetuple(oldRecordBody);
|
datatuple::freetuple(oldRecordBody);
|
||||||
return sherpa::ResponseCode::RecordExists;
|
return mapkeeper::ResponseCode::RecordExists;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,7 +336,7 @@ insert(const std::string& databaseName,
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
insertMany(const std::string& databaseName, const std::vector<Record> & records)
|
insertMany(const std::string& databaseName, const std::vector<Record> & records)
|
||||||
{
|
{
|
||||||
return sherpa::ResponseCode::Error;
|
return mapkeeper::ResponseCode::Error;
|
||||||
}
|
}
|
||||||
|
|
||||||
ResponseCode::type LSMServerHandler::
|
ResponseCode::type LSMServerHandler::
|
||||||
|
@ -338,11 +346,11 @@ update(const std::string& databaseName,
|
||||||
{
|
{
|
||||||
uint32_t id = getDatabaseId(databaseName);
|
uint32_t id = getDatabaseId(databaseName);
|
||||||
if (id == 0) {
|
if (id == 0) {
|
||||||
return sherpa::ResponseCode::DatabaseNotFound;
|
return mapkeeper::ResponseCode::MapNotFound;
|
||||||
}
|
}
|
||||||
datatuple* oldRecordBody = get(id, recordName);
|
datatuple* oldRecordBody = get(id, recordName);
|
||||||
if (oldRecordBody == NULL) {
|
if (oldRecordBody == NULL) {
|
||||||
return sherpa::ResponseCode::RecordNotFound;
|
return mapkeeper::ResponseCode::RecordNotFound;
|
||||||
}
|
}
|
||||||
datatuple::freetuple(oldRecordBody);
|
datatuple::freetuple(oldRecordBody);
|
||||||
datatuple* tup = buildTuple(id, recordName, recordBody);
|
datatuple* tup = buildTuple(id, recordName, recordBody);
|
||||||
|
@ -354,11 +362,11 @@ remove(const std::string& databaseName, const std::string& recordName)
|
||||||
{
|
{
|
||||||
uint32_t id = getDatabaseId(databaseName);
|
uint32_t id = getDatabaseId(databaseName);
|
||||||
if (id == 0) {
|
if (id == 0) {
|
||||||
return sherpa::ResponseCode::DatabaseNotFound;
|
return mapkeeper::ResponseCode::MapNotFound;
|
||||||
}
|
}
|
||||||
datatuple* oldRecordBody = get(id, recordName);
|
datatuple* oldRecordBody = get(id, recordName);
|
||||||
if (oldRecordBody == NULL) {
|
if (oldRecordBody == NULL) {
|
||||||
return sherpa::ResponseCode::RecordNotFound;
|
return mapkeeper::ResponseCode::RecordNotFound;
|
||||||
}
|
}
|
||||||
datatuple::freetuple(oldRecordBody);
|
datatuple::freetuple(oldRecordBody);
|
||||||
datatuple* tup = buildTuple(id, recordName);
|
datatuple* tup = buildTuple(id, recordName);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
#include <dht_persistent_store/PersistentStore.h>
|
#include "MapKeeper.h"
|
||||||
#include <protocol/TBinaryProtocol.h>
|
#include <protocol/TBinaryProtocol.h>
|
||||||
#include <transport/TServerSocket.h>
|
#include <transport/TServerSocket.h>
|
||||||
#include <transport/TBufferTransports.h>
|
#include <transport/TBufferTransports.h>
|
||||||
|
@ -7,22 +7,23 @@ using namespace ::apache::thrift;
|
||||||
using namespace ::apache::thrift::protocol;
|
using namespace ::apache::thrift::protocol;
|
||||||
using namespace ::apache::thrift::transport;
|
using namespace ::apache::thrift::transport;
|
||||||
|
|
||||||
using namespace sherpa;
|
using namespace mapkeeper;
|
||||||
using boost::shared_ptr;
|
using boost::shared_ptr;
|
||||||
|
|
||||||
class LSMServerHandler : virtual public PersistentStoreIf {
|
class LSMServerHandler : virtual public MapKeeperIf {
|
||||||
public:
|
public:
|
||||||
LSMServerHandler(int argc, char **argv);
|
LSMServerHandler(int argc, char **argv);
|
||||||
ResponseCode::type ping();
|
ResponseCode::type ping();
|
||||||
ResponseCode::type shutdown();
|
ResponseCode::type shutdown();
|
||||||
ResponseCode::type addDatabase(const std::string& databaseName);
|
ResponseCode::type addMap(const std::string& databaseName);
|
||||||
ResponseCode::type dropDatabase(const std::string& databaseName);
|
ResponseCode::type dropMap(const std::string& databaseName);
|
||||||
void listDatabases(StringListResponse& _return);
|
void listMaps(StringListResponse& _return);
|
||||||
void scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrder::type order,
|
void scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrder::type order,
|
||||||
const std::string& startKey, const bool startKeyIncluded,
|
const std::string& startKey, const bool startKeyIncluded,
|
||||||
const std::string& endKey, const bool endKeyIncluded,
|
const std::string& endKey, const bool endKeyIncluded,
|
||||||
const int32_t maxRecords, const int32_t maxBytes);
|
const int32_t maxRecords, const int32_t maxBytes);
|
||||||
void get(BinaryResponse& _return, const std::string& databaseName, const std::string& recordName);
|
void get(BinaryResponse& _return, const std::string& databaseName, const std::string& recordName);
|
||||||
|
ResponseCode::type put(const std::string& databaseName, const std::string& recordName, const std::string& recordBody);
|
||||||
ResponseCode::type insert(const std::string& databaseName, const std::string& recordName, const std::string& recordBody);
|
ResponseCode::type insert(const std::string& databaseName, const std::string& recordName, const std::string& recordBody);
|
||||||
ResponseCode::type insertMany(const std::string& databaseName, const std::vector<Record> & records);
|
ResponseCode::type insertMany(const std::string& databaseName, const std::vector<Record> & records);
|
||||||
ResponseCode::type update(const std::string& databaseName, const std::string& recordName, const std::string& recordBody);
|
ResponseCode::type update(const std::string& databaseName, const std::string& recordName, const std::string& recordBody);
|
||||||
|
|
|
@ -10,7 +10,7 @@ CXXSRC = LSMServerHandler.cc
|
||||||
|
|
||||||
LCLEAN += *~
|
LCLEAN += *~
|
||||||
WARN += -Werror -Wall
|
WARN += -Werror -Wall
|
||||||
LINC += -I. -I /home/y/include/thrift -I /home/y/include64/stasis -I ..
|
LINC += -I. -I /usr/local/include/thrift -I /home/y/include64/stasis -I .. -I ../../thrift/gen-cpp
|
||||||
|
|
||||||
LDLIBS += -L/usr/local/lib
|
LDLIBS += -L/usr/local/lib
|
||||||
LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../build
|
LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../build
|
||||||
|
@ -19,7 +19,7 @@ LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../build
|
||||||
# Poor packaging for yicu
|
# Poor packaging for yicu
|
||||||
LINC += -I/home/y/include/yicu
|
LINC += -I/home/y/include/yicu
|
||||||
|
|
||||||
LDLIBS += -lthrift -ldht_persistent_store -lstasis -llogstore -L ../build
|
LDLIBS += -lthrift -lmapkeeper -lstasis -llogstore -L ../build -L ../../thrift/gen-cpp
|
||||||
|
|
||||||
# Need to remove potential warnings in yapache.
|
# Need to remove potential warnings in yapache.
|
||||||
LDEF += -DEAPI
|
LDEF += -DEAPI
|
||||||
|
|
|
@ -1,21 +1,28 @@
|
||||||
CPPFLAGS =-I../../stasis -I.. -I. -I /usr/local/include/thrift
|
THRIFT_DIR = /usr/local
|
||||||
CXXFLAGS =-g -O0
|
|
||||||
LDFLAGS=-lprofiler -ltcmalloc -lpthread -llogstore -lstasis -L ../build -L ../../stasis/build/src/stasis -lthrift -Wl,-rpath,../build,-rpath,../../stasis/build/src/stasis,-rpath,/usr/local/lib
|
|
||||||
|
|
||||||
THRIFT_SRC = dht_persistent_store/persistent_store_constants.cpp dht_persistent_store/persistent_store_types.cpp dht_persistent_store/PersistentStore.cpp
|
CPPFLAGS =-I../../stasis -I.. -I. -I $(THRIFT_DIR)/include/thrift -I../../mapkeeper/thrift/gen-cpp
|
||||||
|
CXXFLAGS =-g -O0
|
||||||
|
|
||||||
|
LDFLAGS=-lprofiler -ltcmalloc -lpthread -llogstore -lstasis -lmapkeeper -lthrift \
|
||||||
|
-L $(THRIFT_DIR)/lib -L ../../mapkeeper/thrift/gen-cpp \
|
||||||
|
-L ../build -L ../../stasis/build/src/stasis \
|
||||||
|
-Wl,-rpath,\$$ORIGIN/../../build \
|
||||||
|
-Wl,-rpath,\$$ORIGIN/../../../stasis/build/src/stasis \
|
||||||
|
-Wl,-rpath,\$$ORIGIN/../../../mapkeeper/thrift/gen-cpp \
|
||||||
|
-Wl,-rpath,/usr/local/lib \
|
||||||
|
-Wl,-rpath,$(THRIFT_DIR)/lib
|
||||||
|
|
||||||
all: main/lsm_client main/lsm_server main/lsm_shutdown
|
all: main/lsm_client main/lsm_server main/lsm_shutdown
|
||||||
|
|
||||||
dht_thrift.jar :
|
dht_thrift.jar :
|
||||||
cd ../thrift/gen-java;
|
cd ../thrift/gen-java;
|
||||||
bash -c 'javac \`find . -name *.java\` -cp /usr/local/lib/libthrift.jar:/usr/share/java/slf4j-api-1.5.11.jar'
|
bash -c 'javac \`find . -name *.java\` -cp $(THRIFT_DIR)/lib/libthrift.jar:/usr/share/java/slf4j-api-1.5.11.jar'
|
||||||
cd ../../sherpa
|
cd ../../sherpa
|
||||||
|
|
||||||
main/lsm_client : $(THRIFT_SRC)
|
main/lsm_client :
|
||||||
|
main/lsm_shutdown :
|
||||||
|
|
||||||
main/lsm_shutdown : $(THRIFT_SRC)
|
main/lsm_server : LSMServerHandler.cc
|
||||||
|
|
||||||
main/lsm_server : LSMServerHandler.cc $(THRIFT_SRC)
|
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -f main/lsm_client main/lsm_server
|
rm -f main/lsm_client main/lsm_server
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
../thrift/gen-cpp/
|
|
|
@ -9,8 +9,8 @@ CXXSRC = $(addsuffix .cc, $(EXETARGET))
|
||||||
LCLEAN += *~
|
LCLEAN += *~
|
||||||
WARN += -Werror -Wall
|
WARN += -Werror -Wall
|
||||||
|
|
||||||
LINC += -I../ -I /home/y/include/thrift -I /home/y/include/boost/tr1/ -I ../../
|
LINC += -I../ -I /usr/local/include/thrift -I /home/y/include/boost/tr1/ -I ../../ -I ../../../thrift/gen-cpp/
|
||||||
LDLIBS += -L../ -ldht_persistent_store -lLSMServer
|
LDLIBS += -L../ -lmapkeeper -lLSMServer -L ../../../thrift/gen-cpp
|
||||||
LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../../build
|
LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../../build
|
||||||
|
|
||||||
ifdef UNIT_TEST
|
ifdef UNIT_TEST
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
#include <dht_persistent_store/PersistentStore.h>
|
#include "MapKeeper.h"
|
||||||
#include <transport/TSocket.h>
|
#include <transport/TSocket.h>
|
||||||
#include <transport/TBufferTransports.h>
|
#include <transport/TBufferTransports.h>
|
||||||
#include <protocol/TBinaryProtocol.h>
|
#include <protocol/TBinaryProtocol.h>
|
||||||
|
@ -14,14 +14,14 @@ int main(int argc, char **argv) {
|
||||||
boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));
|
boost::shared_ptr<TTransport> transport(new TFramedTransport(socket));
|
||||||
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
|
boost::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
|
||||||
|
|
||||||
sherpa::PersistentStoreClient client(protocol);
|
mapkeeper::MapKeeperClient client(protocol);
|
||||||
transport->open();
|
transport->open();
|
||||||
socket->setNoDelay(true);
|
socket->setNoDelay(true);
|
||||||
sherpa::BinaryResponse getResponse;
|
mapkeeper::BinaryResponse getResponse;
|
||||||
sherpa::RecordListResponse scanResponse;
|
mapkeeper::RecordListResponse scanResponse;
|
||||||
std::string db = "db";
|
std::string db = "db";
|
||||||
std::string db1 = "db1";
|
std::string db1 = "db1";
|
||||||
cout << client.addDatabase(db) << endl;;
|
cout << client.addMap(db) << endl;;
|
||||||
cout << client.insert(db, "kkkkkkkkkkkkk1", "v1") << endl;
|
cout << client.insert(db, "kkkkkkkkkkkkk1", "v1") << endl;
|
||||||
cout << client.insert(db, "kkkkkkkkkkkkk2", "v2") << endl;
|
cout << client.insert(db, "kkkkkkkkkkkkk2", "v2") << endl;
|
||||||
cout << client.insert(db, "kkkkkkkkkkkkk3", "v3") << endl;
|
cout << client.insert(db, "kkkkkkkkkkkkk3", "v3") << endl;
|
||||||
|
@ -58,9 +58,9 @@ int main(int argc, char **argv) {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
cout << "adding db one more time" << endl;
|
cout << "adding db one more time" << endl;
|
||||||
cout << client.addDatabase(db) << endl;;
|
cout << client.addMap(db) << endl;;
|
||||||
|
|
||||||
cout << client.addDatabase(db1) << endl;;
|
cout << client.addMap(db1) << endl;;
|
||||||
cout << client.insert(db1, "new key", "new value") << endl;
|
cout << client.insert(db1, "new key", "new value") << endl;
|
||||||
client.get(getResponse, db1, "new key");
|
client.get(getResponse, db1, "new key");
|
||||||
cout << getResponse.responseCode << endl;
|
cout << getResponse.responseCode << endl;
|
||||||
|
@ -75,33 +75,33 @@ int main(int argc, char **argv) {
|
||||||
cout << getResponse.responseCode << endl;
|
cout << getResponse.responseCode << endl;
|
||||||
cout << client.remove(db1, "new key") << endl;
|
cout << client.remove(db1, "new key") << endl;
|
||||||
|
|
||||||
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "", true, "", true, 100, 100);
|
client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "", true, "", true, 100, 100);
|
||||||
std::vector<sherpa::Record>::iterator itr;
|
std::vector<mapkeeper::Record>::iterator itr;
|
||||||
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
||||||
cout << itr->key << " " << itr->value << endl;
|
cout << itr->key << " " << itr->value << endl;
|
||||||
}
|
}
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
|
|
||||||
|
|
||||||
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", false, "kkkkkkkkkkkkk3", false, 100, 100);
|
client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "kkkkkkkkkkkkk1", false, "kkkkkkkkkkkkk3", false, 100, 100);
|
||||||
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
||||||
cout << itr->key << " " << itr->value << endl;
|
cout << itr->key << " " << itr->value << endl;
|
||||||
}
|
}
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
|
|
||||||
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", true, "kkkkkkkkkkkkk3", true, 100, 100);
|
client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "kkkkkkkkkkkkk1", true, "kkkkkkkkkkkkk3", true, 100, 100);
|
||||||
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
||||||
cout << itr->key << " " << itr->value << endl;
|
cout << itr->key << " " << itr->value << endl;
|
||||||
}
|
}
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
|
|
||||||
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk2", true, "kkkkkkkkkkkkk2", true, 100, 100);
|
client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "kkkkkkkkkkkkk2", true, "kkkkkkkkkkkkk2", true, 100, 100);
|
||||||
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
||||||
cout << itr->key << " " << itr->value << endl;
|
cout << itr->key << " " << itr->value << endl;
|
||||||
}
|
}
|
||||||
std::cout << std::endl;
|
std::cout << std::endl;
|
||||||
|
|
||||||
client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "k", true, "kkkkkkkkkkkkk4", true, 100, 100);
|
client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "k", true, "kkkkkkkkkkkkk4", true, 100, 100);
|
||||||
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) {
|
||||||
cout << itr->key << " " << itr->value << endl;
|
cout << itr->key << " " << itr->value << endl;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
#include <dht_persistent_store/PersistentStore.h>
|
#include "MapKeeper.h"
|
||||||
#include <protocol/TBinaryProtocol.h>
|
#include <protocol/TBinaryProtocol.h>
|
||||||
#include <server/TSimpleServer.h>
|
#include <server/TSimpleServer.h>
|
||||||
#include <server/TThreadPoolServer.h>
|
#include <server/TThreadPoolServer.h>
|
||||||
|
#include <server/TThreadedServer.h>
|
||||||
#include <transport/TServerSocket.h>
|
#include <transport/TServerSocket.h>
|
||||||
#include <transport/TBufferTransports.h>
|
#include <transport/TBufferTransports.h>
|
||||||
#include <thrift/concurrency/ThreadManager.h>
|
#include <concurrency/ThreadManager.h>
|
||||||
#include <thrift/concurrency/PosixThreadFactory.h>
|
#include <concurrency/PosixThreadFactory.h>
|
||||||
#include "logstore.h"
|
#include "logstore.h"
|
||||||
#include "datatuple.h"
|
#include "datatuple.h"
|
||||||
#include "LSMServerHandler.h"
|
#include "LSMServerHandler.h"
|
||||||
|
@ -17,19 +18,20 @@ using namespace ::apache::thrift::server;
|
||||||
using namespace ::apache::thrift::concurrency;
|
using namespace ::apache::thrift::concurrency;
|
||||||
using boost::shared_ptr;
|
using boost::shared_ptr;
|
||||||
|
|
||||||
using namespace sherpa;
|
using namespace mapkeeper;
|
||||||
|
|
||||||
int main(int argc, char **argv) {
|
int main(int argc, char **argv) {
|
||||||
shared_ptr<LSMServerHandler> handler(new LSMServerHandler(argc, argv));
|
shared_ptr<LSMServerHandler> handler(new LSMServerHandler(argc, argv));
|
||||||
shared_ptr<TProcessor> processor(new PersistentStoreProcessor(handler));
|
shared_ptr<TProcessor> processor(new MapKeeperProcessor(handler));
|
||||||
shared_ptr<TServerTransport> serverTransport(new TServerSocket(9090));
|
shared_ptr<TServerTransport> serverTransport(new TServerSocket(9090));
|
||||||
shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
|
shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
|
||||||
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
|
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
|
||||||
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(32);
|
shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(32);
|
||||||
shared_ptr<ThreadFactory> threadFactory(new PosixThreadFactory());
|
shared_ptr<ThreadFactory> threadFactory(new PosixThreadFactory());
|
||||||
threadManager->threadFactory(threadFactory);
|
//threadManager->threadFactory(threadFactory);
|
||||||
threadManager->start();
|
//threadManager->start();
|
||||||
TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager);
|
TThreadedServer server(processor, serverTransport, transportFactory, protocolFactory);
|
||||||
|
printf("I'm using tthreaded server!");
|
||||||
server.serve();
|
server.serve();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +0,0 @@
|
||||||
|
|
||||||
all::
|
|
||||||
thrift --gen cpp --gen java persistent_store.thrift
|
|
||||||
|
|
||||||
clean::
|
|
||||||
rm -rf gen-cpp gen-java
|
|
|
@ -1,42 +0,0 @@
|
||||||
<project name="ThriftClient" default="dist" basedir=".">
|
|
||||||
<description>
|
|
||||||
simple example build file
|
|
||||||
</description>
|
|
||||||
<!-- set global properties for this build -->
|
|
||||||
<property name="src" location="gen-java"/>
|
|
||||||
<property name="build" location="build"/>
|
|
||||||
<property name="dist" location="dist"/>
|
|
||||||
|
|
||||||
<target name="init">
|
|
||||||
<!-- Create the time stamp -->
|
|
||||||
<tstamp/>
|
|
||||||
<!-- Create the build directory structure used by compile -->
|
|
||||||
<mkdir dir="${build}"/>
|
|
||||||
</target>
|
|
||||||
|
|
||||||
<target name="compile" depends="init"
|
|
||||||
description="compile the source " >
|
|
||||||
<!-- Compile the java code from ${src} into ${build} -->
|
|
||||||
<javac srcdir="${src}" destdir="${build}">
|
|
||||||
<classpath>
|
|
||||||
<pathelement path="/usr/local/lib/libthrift.jar:/usr/share/java/slf4j-api-1.5.11.jar"/>
|
|
||||||
</classpath>
|
|
||||||
</javac>
|
|
||||||
</target>
|
|
||||||
|
|
||||||
<target name="dist" depends="compile"
|
|
||||||
description="generate the distribution" >
|
|
||||||
<!-- Create the distribution directory -->
|
|
||||||
<mkdir dir="${dist}/lib"/>
|
|
||||||
|
|
||||||
<!-- Put everything in ${build} into the MyProject-${DSTAMP}.jar file -->
|
|
||||||
<jar jarfile="${dist}/thriftclient.jar" basedir="${build}"/>
|
|
||||||
</target>
|
|
||||||
|
|
||||||
<target name="clean"
|
|
||||||
description="clean up" >
|
|
||||||
<!-- Delete the ${build} and ${dist} directory trees -->
|
|
||||||
<delete dir="${build}"/>
|
|
||||||
<delete dir="${dist}"/>
|
|
||||||
</target>
|
|
||||||
</project>
|
|
|
@ -1,224 +0,0 @@
|
||||||
/*
|
|
||||||
* Defines Sherpa persistent store interface.
|
|
||||||
*
|
|
||||||
* For a good tutorial on how to write a .thrift file, see:
|
|
||||||
*
|
|
||||||
* http://wiki.apache.org/thrift/Tutorial
|
|
||||||
*/
|
|
||||||
namespace cpp sherpa
|
|
||||||
namespace java com.yahoo.sherpa
|
|
||||||
|
|
||||||
enum ResponseCode
|
|
||||||
{
|
|
||||||
Ok = 0,
|
|
||||||
Error,
|
|
||||||
DatabaseExists,
|
|
||||||
DatabaseNotFound,
|
|
||||||
RecordExists,
|
|
||||||
RecordNotFound,
|
|
||||||
ScanEnded,
|
|
||||||
}
|
|
||||||
|
|
||||||
enum ScanOrder
|
|
||||||
{
|
|
||||||
Ascending,
|
|
||||||
Descending,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Record
|
|
||||||
{
|
|
||||||
1:binary key,
|
|
||||||
2:binary value,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct RecordListResponse
|
|
||||||
{
|
|
||||||
1:ResponseCode responseCode,
|
|
||||||
2:list<Record> records,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct BinaryResponse
|
|
||||||
{
|
|
||||||
1:ResponseCode responseCode,
|
|
||||||
2:binary value,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct StringListResponse
|
|
||||||
{
|
|
||||||
1:ResponseCode responseCode,
|
|
||||||
2:list<string> values,
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Note about database name:
|
|
||||||
* Thrift string type translates to std::string in C++ and String in
|
|
||||||
* Java. Thrift does not validate whether the string is in utf8 in C++,
|
|
||||||
* but it does in Java. If you are using this class in C++, you need to
|
|
||||||
* make sure the database name is in utf8. Otherwise requests will fail.
|
|
||||||
*/
|
|
||||||
service PersistentStore
|
|
||||||
{
|
|
||||||
/**
|
|
||||||
* Pings this persistent store.
|
|
||||||
*
|
|
||||||
* @return Ok - if ping was successful.
|
|
||||||
* Error - if ping failed.
|
|
||||||
*/
|
|
||||||
ResponseCode ping(),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Cleanly shuts down the persistent store.
|
|
||||||
*
|
|
||||||
* @return Ok - if shutdown is in progress.
|
|
||||||
* Error - otherwise
|
|
||||||
*/
|
|
||||||
ResponseCode shutdown(),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add a new database to this persistent store.
|
|
||||||
*
|
|
||||||
* A database is a container for a collection of records.
|
|
||||||
* A record is a binary key / binary value pair.
|
|
||||||
* A key uniquely identifies a record in a database.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @return Ok - on success.
|
|
||||||
* DatabaseExists - database already exists.
|
|
||||||
* Error - on any other errors.
|
|
||||||
*/
|
|
||||||
ResponseCode addDatabase(1:string databaseName),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Drops a database from this persistent store.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @return Ok - on success.
|
|
||||||
* DatabaseNotFound - database doesn't exist.
|
|
||||||
* Error - on any other errors.
|
|
||||||
*/
|
|
||||||
ResponseCode dropDatabase(1:string databaseName),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* List databases in this persistent store.
|
|
||||||
*
|
|
||||||
* @returns StringListResponse
|
|
||||||
* responseCode Ok - on success.
|
|
||||||
* Error - on error.
|
|
||||||
* values - list of databases.
|
|
||||||
*/
|
|
||||||
StringListResponse listDatabases(),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns records in a database in lexicographical order.
|
|
||||||
*
|
|
||||||
* Note that startKey is supposed to be smaller than or equal to the endKey
|
|
||||||
* regardress of the scan order. For example, to scan all the records from
|
|
||||||
* "apple" to "banana" in descending order, startKey is "apple" and endKey
|
|
||||||
* is "banana". If startKey is larger than endKey, scan will succeed and
|
|
||||||
* result will be empty.
|
|
||||||
*
|
|
||||||
* This method will return ScanEnded if the scan was successful and it reached
|
|
||||||
* the end of the key range. It'll return Ok if it reached maxRecords or
|
|
||||||
* maxBytes, but it didn't reach the end of the key range.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @param order Ascending or Decending.
|
|
||||||
* @param startKey Key to start scan from. If it's empty, scan starts
|
|
||||||
* from the smallest key in the database.
|
|
||||||
* @param startKeyIncluded
|
|
||||||
* Indicates whether the record that matches startKey is
|
|
||||||
* included in the response.
|
|
||||||
* @param endKey Key to end scan at. If it's emty scan ends at the largest
|
|
||||||
* key in the database.
|
|
||||||
* @param endKeyIncluded
|
|
||||||
* Indicates whether the record that matches endKey is
|
|
||||||
* included in the response.
|
|
||||||
* @param maxRecords
|
|
||||||
* Scan will return at most $maxRecords records.
|
|
||||||
* @param maxBytes Advise scan to return at most $maxBytes bytes. This
|
|
||||||
* method is not required to strictly keep the response
|
|
||||||
* size less than $maxBytes bytes.
|
|
||||||
* @return RecordListResponse
|
|
||||||
* responseCode - Ok if the scan was successful
|
|
||||||
* - ScanEnded if the scan was successful and
|
|
||||||
* scan reached the end of the range.
|
|
||||||
* - DatabaseNotFound database doesn't exist.
|
|
||||||
* - Error on any other errors
|
|
||||||
* records - list of records.
|
|
||||||
*/
|
|
||||||
RecordListResponse scan(1:string databaseName,
|
|
||||||
2:ScanOrder order,
|
|
||||||
3:binary startKey,
|
|
||||||
4:bool startKeyIncluded,
|
|
||||||
5:binary endKey,
|
|
||||||
6:bool endKeyIncluded,
|
|
||||||
7:i32 maxRecords,
|
|
||||||
8:i32 maxBytes),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Retrieves a record from a database.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @param recordKey record to retrive.
|
|
||||||
* @returns RecordListResponse
|
|
||||||
* responseCode - Ok
|
|
||||||
* DatabaseNotFound database doesn't exist.
|
|
||||||
* RecordNotFound record doesn't exist.
|
|
||||||
* Error on any other errors.
|
|
||||||
* records - list of records
|
|
||||||
*/
|
|
||||||
BinaryResponse get(1:string databaseName, 2:binary recordKey),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Inserts a record into a database.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @param recordKey record key to insert
|
|
||||||
* @param recordValue record value to insert
|
|
||||||
* @returns Ok
|
|
||||||
* DatabaseNotFound database doesn't exist.
|
|
||||||
* RecordExists
|
|
||||||
* Error
|
|
||||||
*/
|
|
||||||
ResponseCode insert(1:string databaseName, 2:binary recordKey, 3:binary recordValue),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Inserts multiple records into a database.
|
|
||||||
*
|
|
||||||
* This operation is atomic: either all the records get inserted into a database
|
|
||||||
* or none does.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @param records list of records to insert
|
|
||||||
* @returns Ok
|
|
||||||
* DatabaseNotFound
|
|
||||||
* RecordExists - if a record already exists in a database
|
|
||||||
* Error
|
|
||||||
*/
|
|
||||||
ResponseCode insertMany(1:string databaseName, 2:list<Record> records),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Updates a record in a database.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @param recordKey record key to update
|
|
||||||
* @param recordValue new value for the record
|
|
||||||
* @returns Ok
|
|
||||||
* DatabaseNotFound database doesn't exist.
|
|
||||||
* RecordNotFound
|
|
||||||
* Error
|
|
||||||
*/
|
|
||||||
ResponseCode update(1:string databaseName, 2:binary recordKey, 3:binary recordValue),
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a record from a database.
|
|
||||||
*
|
|
||||||
* @param databaseName database name
|
|
||||||
* @param recordKey record to remove from the database.
|
|
||||||
* @returns Ok
|
|
||||||
* DatabaseNotFound database doesn't exist.
|
|
||||||
* RecordNotFound
|
|
||||||
* Error
|
|
||||||
*/
|
|
||||||
ResponseCode remove(1:string databaseName, 2:binary recordKey),
|
|
||||||
}
|
|
Loading…
Reference in a new issue