diff --git a/datatuple.h b/datatuple.h index ee1b8ec..644e5bd 100644 --- a/datatuple.h +++ b/datatuple.h @@ -106,7 +106,7 @@ public: static datatuple* create(const void* key, len_t keylen) { - return create(key, keylen, 0, DELETE)->sanity_check(); + return create(key, keylen, 0, DELETE); } static datatuple* create(const void* key, len_t keylen, const void* data, len_t datalen) { datatuple *ret = (datatuple*)malloc(sizeof(datatuple)); diff --git a/sherpa/LSMPersistentParentImpl.cc b/sherpa/LSMPersistentParentImpl.cc new file mode 100644 index 0000000..ac2c2b7 --- /dev/null +++ b/sherpa/LSMPersistentParentImpl.cc @@ -0,0 +1,135 @@ +#include "PersistentParent.h" +#include "LSMPersistentStoreImpl.h" +#include "TabletMetadata.h" +#include "tcpclient.h" + +// XXX for getpid... +#include +#include + +#include +#include + +class LSMPersistentParent : PersistentParent { +public: + + LSMPersistentParent() : ordered(true), hashed(false) { + } // we ignore the ordered flag... + SuCode::ResponseCode install(const SectionConfig& config) { + ordered.filestr << "LSM install called" << std::endl; + return SuCode::SuOk; + } + SuCode::ResponseCode init(const SectionConfig& config) { + ordered.filestr << "LSM init called" << std::endl; + ordered.init(config); + hashed.init(config); + return SuCode::SuOk; + } + PersistentStore *getHashedStore() { + ordered.filestr << "LSM getHashedStore called" << std::endl; + return &hashed; + } + PersistentStore *getOrderedStore() { + ordered.filestr << "LSM getOrderedStore called" << std::endl; + return &ordered; + } + bool ping() { + ordered.filestr << "LSM ping called" << std::endl; + return true; + } // XXX call OP_DBG_NOOP + std::string getName() { + ordered.filestr << "LSM getName called" << std::endl; + return "logstore"; + } + SuCode::ResponseCode getFreeSpaceBytes(double & freeSpaceBytes) { + ordered.filestr << "LSM getFreeSpaceBytes called" << std::endl; + freeSpaceBytes = 1024.0 *1024.0 * 1024.0; // XXX stub + return SuCode::SuOk; + } + SuCode::ResponseCode getDiskMaxBytes(double & diskMaxBytes) { + ordered.filestr << "LSM getDiskMaxBytes called" << std::endl; + diskMaxBytes = 10.0 * 1024.0 *1024.0 * 1024.0; // XXX stub + return SuCode::SuOk; + } + SuCode::ResponseCode cleanupTablet(uint64_t uniqId, + const std::string & tableName, + const std::string & tabletName) { + ordered.filestr << "LSM cleanupTablet called" << std::endl; + return SuCode::SuOk; // XXX stub + } + SuCode::ResponseCode getTabletMappingList(TabletList & tabletList) { + ordered.filestr << "LSM getTabletMappingList called" << std::endl; + + std::string metadata_table = std::string("ydht_metadata_table"); + std::string metadata_tablet = std::string("0"); + std::string metadata_tabletEnd = std::string("1"); + + size_t startlen; + size_t endlen; + + ordered.filestr << "getTabletMappingList C" << std::endl; + + unsigned char * start_tup = ordered.my_strcat(metadata_table, metadata_tablet, "", &startlen); + unsigned char * end_tup = ordered.my_strcat(metadata_table, metadata_tabletEnd, "", &endlen); + + ordered.filestr << "start tup = " << start_tup << std::endl; + ordered.filestr << "end tup = " << end_tup << std::endl; + + datatuple * starttup = datatuple::create(start_tup, startlen); + datatuple * endtup = datatuple::create(end_tup, endlen); + + free(start_tup); + free(end_tup); + ordered.filestr << "getTabletMappingList B conn = " << ordered.l_ << std::endl; + pid_t pid = getpid(); + + uint8_t rcode = logstore_client_op_returns_many(ordered.l_, OP_SCAN, starttup, endtup, 0); // 0 = no limit. + ordered.filestr << "getTabletMappingList A'" << std::endl; + + datatuple::freetuple(starttup); + datatuple::freetuple(endtup); + + datatuple * next; + ordered.filestr << "getTabletMappingList A" << std::endl; + ordered.filestr.flush(); + + TabletMetadata m; + if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) { + while((next = logstore_client_next_tuple(ordered.l_))) { + ordered.metadata_buf(m, next->key(), next->keylen()); + + struct ydht_maptable_schema md; + md.uniq_id = 0; + md.tableName = m.table(); + md.tabletName = m.tablet(); + ordered.filestr << md.tableName << " : " << md.tabletName << std::endl; + tabletList.push_back(md); + datatuple::freetuple(next); + } + ordered.filestr << "getTabletMappingListreturns" << std::endl; + } else { + ordered.filestr << "error " << (int)rcode << " in getTabletMappingList." << std::endl; + return SuCode::PStoreUnexpectedError; // XXX should be "connection closed error" or something... + } + + return SuCode::SuOk; + } + SuCode::ResponseCode getApproximateTableSize + (const std::string& tableId, + int64_t& tableSize, int64_t & rowCount) { + ordered.filestr << "LSM getApproximateTableSize called" << std::endl; + return ordered.getApproximateTableSize(tableId, tableSize, rowCount); + } +private: + LSMPersistentStoreImpl ordered; + LSMPersistentStoreImpl hashed; +}; + +extern "C" { + void * lsmsherpa_init(); +} + +static LSMPersistentParent pp; +void * lsmsherpa_init() { + return &pp; +} diff --git a/sherpa/LSMPersistentStoreImpl.cc b/sherpa/LSMPersistentStoreImpl.cc index de96264..b0dd426 100644 --- a/sherpa/LSMPersistentStoreImpl.cc +++ b/sherpa/LSMPersistentStoreImpl.cc @@ -4,16 +4,160 @@ * Copyright (c) 2008 Yahoo, Inc. * All rights reserved. */ -#include "LSMPersistentStoreImpl.h" +#include "TabletMetadata.h" +#include "TabletIterator.h" +#include "ScanContinuation.h" +#include "OrderedScanContinuation.h" +#include "TabletRange.h" +#include "dht/UtilityBuffer.h" -#include -#include +//#include +#define DHT_DEBUG_STREAM() std::cerr +#define RESPONSE_ERROR_STREAM(x) std::cerr +#include "LSMPersistentStoreImpl.h" #include +class LSMIterator : public TabletIterator { + friend class LSMPersistentStoreImpl; +public: +// StorageRecord * data; // <- defined in parent class. next() updates it. + + LSMIterator(LSMPersistentStoreImpl* lsmImpl, const TabletMetadata& tabletMeta, ScanContinuationAutoPtr continuation, + ScanSelect::Selector ignored, const uint64_t expiryTime, + unsigned int scanLimit, size_t byteLimit /*ignored*/) : lsmImpl(lsmImpl) { + const unsigned char low_eos = (unsigned char) 0xFE; + const unsigned char high_eos = (unsigned char) 0xFF; + const unsigned char zero = (unsigned char) 0x00; + (void)zero; + // open iterator + datatuple * starttup = NULL; + datatuple * endtup = NULL; + + size_t start_key_len, end_key_len; + unsigned char *start_key, *end_key; + + if(continuation->isOrdered()) { + const OrderedScanContinuation& os = static_cast(*continuation); + + if(!os.getStartKey().isMinKey()) { + start_key = lsmImpl->buf_key(tabletMeta, os.getStartKey().getKey(), &start_key_len); + } else { + start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len); + } + + if(!os.getEndKey().isMaxKey()) { + end_key = lsmImpl->buf_key(tabletMeta, os.getEndKey().getKey(), &end_key_len); + } else { + end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len); + if(end_key[end_key_len-2] != low_eos) { + lsmImpl->filestr << "ERROR CORRUPT lsm tablet key = " << (char*)end_key << std::endl; + } else { + end_key[end_key_len-2] = high_eos; + } + } + } else { + lsmImpl->filestr << "WARNING: Scanning hash table, but ignoring contiunation range!" << std::endl; + start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len); + end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len); + if(end_key[end_key_len-2] != low_eos) { + lsmImpl->filestr << "ERROR CORRUPT lsm tablet key = " << (char*)end_key << std::endl; + } else { + end_key[end_key_len-2] = high_eos; + } + } + + starttup = datatuple::create(start_key, start_key_len); + std::string dbg((char*)start_key, start_key_len - 1); + lsmImpl->filestr << "start lsm key = " << dbg << std::endl; + + endtup = datatuple::create(end_key, end_key_len); + std::string dbg2((char*)end_key, end_key_len - 1); + lsmImpl->filestr << "end lsm key = " << dbg2 << std::endl; + + logstore_client_op_returns_many(lsmImpl->l_, OP_SCAN, starttup, endtup, scanLimit); + + datatuple::freetuple(starttup); + datatuple::freetuple(endtup); + + this->data = new StorageRecord(); + } + ~LSMIterator() { + lsmImpl->filestr << "close iterator called" << std::endl; + // close iterator by running to the end of it... TODO devise a better way to close iterators early? + while(this->data) { + next(); + } + lsmImpl->filestr << "close iterator done" << std::endl; + } + SuCode::ResponseCode next() { + datatuple * tup; + lsmImpl->filestr << "next called" << std::endl; + if((tup = logstore_client_next_tuple(lsmImpl->l_))) { + lsmImpl->filestr << "found tuple, key = " << tup->key() << " datalen = " << tup->datalen() << std::endl; + SuCode::ResponseCode rc = lsmImpl->tup_buf(*(this->data), tup); + datatuple::freetuple(tup); + return rc; + } else { + lsmImpl->filestr << "no tuple" << std::endl; + delete this->data; + this->data = NULL; + return SuCode::PStoreScanEnd; // XXX need to differentiate between end of scan and failure + } + } +private: + LSMPersistentStoreImpl * lsmImpl; +}; + // Initialize the logger -static log4cpp::Category &log = - log4cpp::Category::getInstance("dht.su."__FILE__); +//static log4cpp::Category &log = +// log4cpp::Category::getInstance("dht.su."__FILE__); + +//unsigned char* LSMPersistentStoreImpl::TabletMetadataToString(const TabletMetadata&m, size_t* len) { +// return my_strcat(m.table(), m.tablet(), "", len); +//} +//unsigned char* LSMPersistentStoreImpl::TabletMetadataToKey(const TabletMetadata&m, size_t* keylen) { +// +// std::string metadata_table = std::string("ydht_metadata_table"); +// std::string metadata_tablet= std::string("0"); +// +// size_t len; +// // note: we could call getTabletId instead, but might have to adjust the callers... +// const char * tablet_name = (const char*)TabletMetadataToString(m, &len); +// // XXX memory leak? +// std::string keystr = std::string(tablet_name, len); +// +// unsigned char * ret = my_strcat(metadata_table, metadata_tablet, tablet_name, keylen); +// +// filestr << "TabletMetadataToKey returns " << (char *) ret << std::endl; +// +// return ret; +// +//} +void LSMPersistentStoreImpl::buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m) { + std::string ydht_metadata_table = std::string("ydht_metadata_table"); + std::string zero = std::string("0"); + std::string tmp; unsigned char * tmp_p; size_t tmp_len; + tmp_p = my_strcat(m.table(), m.tablet(), "", &tmp_len); + tmp.assign((const char*)tmp_p, tmp_len); + free(tmp_p); + *buf = my_strcat(ydht_metadata_table, zero, tmp, len); +} + +void LSMPersistentStoreImpl::metadata_buf(TabletMetadata &m, const unsigned char * buf, size_t len) { + // Metadata table key format: + + // ydht_metadata_table[low_eos]0[low_eos]table[low_eos]tablet[low_eos][low_eos] + + std::string ydht_metadata_table, zero, tmp, table, tablet, empty; + my_strtok(buf, len, ydht_metadata_table, zero, tmp); + my_strtok((const unsigned char*)tmp.c_str(), tmp.length(), table, tablet, empty); + + filestr << "Parsed metadata: [" << table << "] [" << tablet << "] [" << empty << "](empty)" << std::endl; + m.setTable(table); + m.setTablet(tablet); + m.setTabletId(tmp.substr(0, tmp.length() - 1)); +} unsigned char* LSMPersistentStoreImpl::my_strcat(const std::string& table, const std::string& tablet, @@ -41,11 +185,45 @@ unsigned char* LSMPersistentStoreImpl::my_strcat(const std::string& table, memcpy(buf, &zero, 1); buf = 0; return ret; } +void LSMPersistentStoreImpl::my_strtok(const unsigned char* in, size_t len, std::string& table, std::string& tablet, std::string& key) { + const char * tablep = (const char*) in; + const unsigned char low_eos = (unsigned char) 0xFE; + const unsigned char high_eos = (unsigned char) 0xFF; + const unsigned char zero = (unsigned char) 0x00; + (void)high_eos; (void)zero; + + const char * tabletp = ((const char*)memchr(tablep, low_eos, len)) + 1; + int tablep_len = (tabletp - tablep)-1; // -1 is due to low_eos terminator + const char * keyp = ((const char*)memchr(tabletp, low_eos, len-tablep_len)) + 1; + int tabletp_len = (keyp - tabletp) - 1; // -1 is due to low_eos terminator + int keyp_len = (len - (tablep_len + 1 + tabletp_len + 1)) - 1; // -1 is due to null terminator. + + table.assign(tablep, tablep_len); + tablet.assign(tabletp, tabletp_len); + key.assign(keyp, keyp_len); +} unsigned char * LSMPersistentStoreImpl::buf_key(const TabletMetadata&m, const RecordKey& k, size_t * len) { - return my_strcat(m.table(), m.tablet(), k.name(), len); + return buf_key(m,k.name(),len); +} + unsigned char * + LSMPersistentStoreImpl::buf_key(const TabletMetadata&m, + const std::string s, size_t * len) { + const unsigned char low_eos = (unsigned char) 0xFE; + const unsigned char high_eos = (unsigned char) 0xFF; + const unsigned char zero = (unsigned char) 0x00; + (void)high_eos; (void)low_eos; + std::string md_name = m.getTabletId(); + *len = md_name.length() /*+ 1*/ + s.length() + 1; // md_name ends in a low_eos... + unsigned char * ret = (unsigned char*)malloc(*len); + unsigned char * buf = ret; + memcpy(buf, md_name.c_str(), md_name.length()); buf += md_name.length(); + //memcpy(buf, &low_eos, 1); buf++; + memcpy(buf, s.c_str(), s.length()); buf += s.length(); + memcpy(buf, &zero, 1); + return ret; } unsigned char * @@ -59,8 +237,7 @@ LSMPersistentStoreImpl::buf_val(const StorageRecord& val, size_t * len) { dataBlob_len = dataBlob.dataSize(); metadata_len = metadata.dataSize(); - DHT_DEBUG_STREAM() << "write storage record expiryTime " << expiryTime << " metadata len " << metadata_len << " datalen " << dataBlob_len; - +// DHT_DEBUG_STREAM() << "write storage record expiryTime " << expiryTime << " metadata len " << metadata_len << " datalen " << dataBlob_len; *len = sizeof(expiryTime) + 2 * sizeof(dataBlob_len) + dataBlob_len + metadata_len; @@ -76,12 +253,34 @@ LSMPersistentStoreImpl::buf_val(const StorageRecord& val, size_t * len) { return ret; } +SuCode::ResponseCode +LSMPersistentStoreImpl::tup_buf(StorageRecord& ret, datatuple * tup) { + SuCode::ResponseCode rc = SuCode::SuOk; + if(tup->key()) { + rc = key_buf(ret, tup->key(), tup->keylen()); + } + if(rc == SuCode::SuOk && !tup->isDelete() && tup->datalen() > 1) { + return val_buf(ret, tup->data(), tup->datalen()); + } else { + return rc; + } +} +SuCode::ResponseCode +LSMPersistentStoreImpl::key_buf(StorageRecord& ret, + const unsigned char * buf, size_t buf_len) { + std::string table, tablet, key; + my_strtok(buf, buf_len, table, tablet, key); + filestr << "key_buf parsed datatuple key: table = [" << table << "] tablet = [" << tablet << "] key = [" << key << "]" << std::endl; + ret.recordKey().setName(key); + return SuCode::SuOk; +} SuCode::ResponseCode LSMPersistentStoreImpl::val_buf(StorageRecord& ret, const unsigned char * buf, size_t buf_len) { uint64_t expiryTime; uint32_t dataBlob_len, metadata_len; +// DHT_DEBUG_STREAM() << "read storage record buf_len " << buf_len << std::endl; assert(buf_len >= sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len)); // Copy header onto stack. @@ -90,12 +289,13 @@ LSMPersistentStoreImpl::val_buf(StorageRecord& ret, memcpy(&metadata_len, buf, sizeof(metadata_len)); buf += sizeof(metadata_len); memcpy(&dataBlob_len, buf, sizeof(dataBlob_len)); buf += sizeof(dataBlob_len); - DHT_DEBUG_STREAM() << "read storage record expiryTime " << expiryTime << " metadata len " << metadata_len << " datalen " << dataBlob_len; +// DHT_DEBUG_STREAM() << "read storage record buf_len " << buf_len << " expiryTime " << expiryTime << " metadata len " << metadata_len << " datalen " << dataBlob_len << " sum " << (sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len) + metadata_len + dataBlob_len) << std::endl; +// +// DHT_DEBUG_STREAM() << " buffer contents " << buf << std::endl; // Is there room in ret? - assert(buf_len >= sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len) - + metadata_len + dataBlob_len); + assert(buf_len >= sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len) + metadata_len + dataBlob_len); if(ret.metadata().bufSize() < metadata_len || ret.dataBlob().bufSize() < dataBlob_len) { @@ -115,36 +315,90 @@ LSMPersistentStoreImpl::val_buf(StorageRecord& ret, return SuCode::SuOk; } LSMPersistentStoreImpl:: -LSMPersistentStoreImpl(bool isOrdered) : l_(NULL) { } +LSMPersistentStoreImpl(bool isOrdered) : isOrdered_(isOrdered), l_(NULL) { + filestr.open(isOrdered? "/tmp/lsm-log" : "/tmp/lsm-log-hashed", std::fstream::out | std::fstream::app); + l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values + filestr << "LSMP constructor called" << std::endl; +} LSMPersistentStoreImpl:: ~LSMPersistentStoreImpl() { + filestr << "LSMP destructor called" << std::endl; logstore_client_close(l_); } +SuCode::ResponseCode LSMPersistentStoreImpl::initMetadataMetadata(TabletMetadata& m) { + filestr << "LSMP initMetadataMetadata called [" << m << "] " << std::endl; + + std::string metadata_table = std::string("ydht_metadata_table"); + std::string metadata_tablet= std::string("0"); + size_t keylen; + char * key =(char*)my_strcat(metadata_table, metadata_tablet, "", &keylen); + std::string s(key, keylen-1); + m.setTabletId(s); + free(key); + return SuCode::SuOk; +} + SuCode::ResponseCode LSMPersistentStoreImpl:: init(const SectionConfig &config) { - if(!l_) // workaround bug 2870547 - l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values + filestr << "LSMP init called" << std::endl; +// if(!l_) // workaround bug 2870547 return l_ ? SuCode::SuOk : FwCode::NotFound; } bool LSMPersistentStoreImpl:: isOrdered(){ - return true; + filestr << "LSMP isOrdered called" << std::endl; + return isOrdered_; } SuCode::ResponseCode LSMPersistentStoreImpl:: addEmptyTablet(TabletMetadata& tabletMeta) { + filestr << "LSMP addEmptyTablet called" << std::endl; // This is a no-op; we'll simply prepend the tablet string to each record. - const std::string& mySQLTableName = tabletMeta.getTabletId(); + { + const std::string& mySQLTableName = tabletMeta.getTabletId(); - if (mySQLTableName!=""){ - return SuCode::PStoreTabletAlreadyExists; + if (mySQLTableName!=""){ + filestr << "Tablet " << mySQLTableName << " already exists!" << std::endl; + return SuCode::PStoreTabletAlreadyExists; + } else { + size_t keylen; unsigned char * key; + buf_metadata(&key, &keylen, tabletMeta); + metadata_buf(tabletMeta, key, keylen); + free(key); + return SuCode::SuOk; + } } + +// size_t keylen; +// unsigned char * key; +// buf_metadata(&key, &keylen, tabletMeta); +// //unsigned char * key = TabletMetadataToKey(tabletMeta, &keylen); +// std::string tmp((char*)key, keylen-1); +// filestr << "add tablet [" << tmp << "]" << std::endl; +// datatuple * tup = datatuple::create(key, keylen, "", 1); +// // XXX lookup tuple, if it exists, return SuCode::PStoreTabletAlreadyExists +// datatuple * result = logstore_client_op(l_, OP_INSERT, tup); +// datatuple::freetuple(tup); +// +// if(!result) { +// datatuple::freetuple(result); +// filestr << "ERROR inserting tuple in addEmptyTablet"; +// abort(); +// free(key); +// return SuCode::PStoreUnexpectedError; +// } else { +// metadata_buf(tabletMeta, key, keylen); +// std::string s((char*)key, keylen - 1); +// free(key); +// filestr << "Tablet " << s << " added" << std::endl; +// return SuCode::SuOk; +// } /* std::string newLSMTableName; @@ -160,14 +414,34 @@ addEmptyTablet(TabletMetadata& tabletMeta) //Save the mysql table name back in TabletMetadata tabletMeta.setTabletId(newLSMTableName); */ - return SuCode::SuOk; } SuCode::ResponseCode LSMPersistentStoreImpl:: dropTablet(TabletMetadata& tabletMeta) { + filestr << "LSMP dropTablet called" << std::endl; DHT_DEBUG_STREAM() << "dropTablet called. Falling back on clearTabletRange()"; - return clearTabletRange(tabletMeta, 0); + SuCode::ResponseCode ret = clearTabletRange(tabletMeta, 0); + + size_t keylen; + unsigned char * key; + buf_metadata(&key, &keylen, tabletMeta); + + datatuple * tup = datatuple::create(key, keylen); // two-argument form of datatuple::create creates a tombstone, which we will now insert. + free(key); + void * result = (void*)logstore_client_op(l_, OP_INSERT, tup); + datatuple::freetuple(tup); + + tabletMeta.setTabletId(""); + + if(!result) { + filestr << "LSMP dropTablet fails" << std::endl; + ret = SuCode::PStoreTabletCleanupFailed; + } else { + filestr << "LSMP dropTablet succeeds" << std::endl; + ret = SuCode::SuOk; + } + return ret; /* SuCode::ResponseCode rc; const std::string& mySQLTableName = tabletMeta.getTabletId(); @@ -186,6 +460,7 @@ dropTablet(TabletMetadata& tabletMeta) SuCode::ResponseCode LSMPersistentStoreImpl:: clearTabletRange(TabletMetadata& tabletMeta, uint32_t removalLimit) { + filestr << "LSMP clearTabletRange called" << std::endl; DHT_DEBUG_STREAM() << "clear tablet range is unimplemented. ignoring request"; // const std::string& mySQLTableName = tabletMeta.getTabletId(); // return mySQLCoreImpl_.deleteRange(mySQLTableName, tabletMeta.tablet(), removalLimit); @@ -194,10 +469,10 @@ clearTabletRange(TabletMetadata& tabletMeta, uint32_t removalLimit) SuCode::ResponseCode LSMPersistentStoreImpl:: -getApproximateTableSize(TabletMetadata& tabletMeta, +getApproximateTableSize(std::string tabletMeta, int64_t& tableSize, - int64_t & rowCount) -{ + int64_t & rowCount) { + filestr << "LSMP getApproximateTableSize called" << std::endl; DHT_DEBUG_STREAM() << "get approximate table size is unimplemented. returning dummy values"; tableSize = 1024 * 1024 * 1024; rowCount = 1024 * 1024; @@ -205,30 +480,44 @@ getApproximateTableSize(TabletMetadata& tabletMeta, // return mySQLCoreImpl_.getApproximateTableSize(mySQLTableName, tableSize, rowCount); return SuCode::SuOk; } +SuCode::ResponseCode LSMPersistentStoreImpl:: +getApproximateTableSize(TabletMetadata& tabletMeta, + int64_t& tableSize, + int64_t & rowCount) +{ + filestr << "LSMP getApproximateTableSize (2) called" << std::endl; + return getApproximateTableSize(tabletMeta.getTabletId(), tableSize, rowCount); +} SuCode::ResponseCode LSMPersistentStoreImpl:: get(const TabletMetadata& tabletMeta, StorageRecord& recordData) { + filestr << "LSMP get called" << tabletMeta.getTabletId() << ":" << recordData.recordKey()<< std::endl; size_t buflen; unsigned char * buf = buf_key(tabletMeta, recordData.recordKey(), &buflen); datatuple * key_tup = datatuple::create(buf, buflen); + free(buf); datatuple * result = logstore_client_op(l_, OP_FIND, key_tup); datatuple::freetuple(key_tup); SuCode::ResponseCode ret; if((!result) || result->isDelete()) { ret = SuCode::PStoreRecordNotFound; } else { + DHT_DEBUG_STREAM() << "call val buf from get, data len = " << result->datalen() << std::endl; ret = val_buf(recordData, result->data(), result->datalen()); } - free(buf); - datatuple::freetuple(result); + if(result) { + datatuple::freetuple(result); + } + filestr << "LSMP get returns succ = " << (ret == SuCode::SuOk) << std::endl; return ret; } SuCode::ResponseCode LSMPersistentStoreImpl:: update(const TabletMetadata& tabletMeta, const StorageRecord& updateData) { + filestr << "LSMP update called" << std::endl; SuCode::ResponseCode ret; { /// XXX hack. Copy of get() implementation, without the memcpy. size_t buflen; @@ -255,56 +544,85 @@ update(const TabletMetadata& tabletMeta, const StorageRecord& updateData) SuCode::ResponseCode LSMPersistentStoreImpl:: // XXX what to do about update? insert(const TabletMetadata& tabletMeta, const StorageRecord& insertData) { + filestr << "LSMP insert called" << tabletMeta.getTabletId() << ":" << insertData.recordKey()<< std::endl; size_t keybuflen, valbuflen; unsigned char * keybuf = buf_key(tabletMeta, insertData.recordKey(), &keybuflen); + filestr << "keybuf = " << keybuf << " (and perhaps a null)" << std::endl; unsigned char * valbuf = buf_val(insertData, &valbuflen); + filestr << "valbuf = " << valbuf << " (and perhaps a null)" << std::endl; datatuple * key_ins = datatuple::create(keybuf, keybuflen, valbuf, valbuflen); - datatuple * result = logstore_client_op(l_, OP_INSERT, key_ins); - datatuple::freetuple(result); + filestr << "insert create()" << std::endl; + void * result = (void*)logstore_client_op(l_, OP_INSERT, key_ins); + filestr << "insert insert()" << std::endl; + if(result) { + filestr << "LSMP insert will return result = " << result << std::endl; + } else { + filestr << "LSMP insert will return null "<< std::endl; + } + datatuple::freetuple(key_ins); + filestr << "insert free(key_ins)" << std::endl; free(keybuf); + filestr << "insert free(keybuf)" << std::endl; free(valbuf); + filestr << "insert free(valbuf)" << std::endl; + filestr << "LSMP insert returns "<< std::endl; return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; } SuCode::ResponseCode LSMPersistentStoreImpl:: remove(const TabletMetadata& tabletMeta, const RecordKey& recordName) { + filestr << "LSMP remove called" << std::endl; size_t buflen; unsigned char * buf = buf_key(tabletMeta, recordName, &buflen); datatuple * key_ins = datatuple::create(buf, buflen); datatuple * result = logstore_client_op(l_, OP_INSERT, key_ins); - datatuple::freetuple(result); + datatuple::freetuple(key_ins); free(buf); return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; } +bool LSMPersistentStoreImpl::ping() { + filestr << "LSMP ping called" << std::endl; + datatuple * ret = logstore_client_op(l_, OP_DBG_NOOP); + if(ret == NULL) { + return false; + } else { + datatuple::freetuple(ret); + return true; + } +} + StorageRecordIterator LSMPersistentStoreImpl:: scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation, - bool getMetadataOnly, const uint64_t expiryTime, unsigned int scanLimit) + ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit) { + filestr << "LSMP scan called. " << std::endl; - /* const std::string& mySQLTableName = tabletMeta.getTabletId(); + filestr << "LSMP scan called. Tablet: " << tabletMeta.getTabletId() << std::endl; + ScanContinuationAutoPtr newContinuation; + TabletRangeAutoPtr tabletRange; - ScanContinuationAutoPtr newContinuation; - TabletRangeAutoPtr tabletRange; - - if (SuCode::SuOk != tabletMeta.keyRange(tabletRange)){ - BAD_CODE_ABORT("Bad tablet name"); - } */ + if (SuCode::SuOk != tabletMeta.keyRange(tabletRange)){ +// BAD_CODE_ABORT("Bad tablet name"); + filestr << "LSMP bad tablet name" << std::endl; + abort(); + } /* This is necessary once we turn on splits, because multiple tablets * might reside in the same mysql table. Shouldnt scan beyond the * upper limit of the tablet because that might be stale data left * from before this tablet split. */ - /* newContinuation = continuation.getContinuationLimitedToTabletRange( + newContinuation = continuation.getContinuationLimitedToTabletRange( *tabletRange); - LSMIterator* iter = new LSMIterator(mySQLTableName, newContinuation, - getMetadataOnly, - expiryTime, scanLimit); - return StorageRecordIterator(iter); */ - return StorageRecordIterator(NULL); + LSMIterator* iter = new LSMIterator(this, tabletMeta, newContinuation, + selector, /*getMetadataOnly,*/ + expiryTime, scanLimit, byteLimit); + + filestr << "LSMP scan returns" << std::endl; + return StorageRecordIterator(iter); } @@ -313,6 +631,7 @@ getSnapshotExporter(const TabletMetadata& tabletMeta, const std::string& snapshotId, SnapshotExporterAutoPtr& exporter) { + filestr << "LSMP getSnapshotExported called" << std::endl; /* const std::string& mySQLTableName = tabletMeta.getTabletId(); TabletRangeAutoPtr tabletRange; @@ -332,6 +651,7 @@ getSnapshotImporter(const TabletMetadata& tabletMeta, const std::string& snapshotId, SnapshotImporterAutoPtr& importer) { + filestr << "LSMP getSnapshotImporter called" << std::endl; /* if (version == LSMSnapshotExporter::VERSION){ const std::string& mySQLTableName = tabletMeta.getTabletId(); importer=LSMSnapshotExporter::getImporter(tabletMeta.table(), @@ -355,6 +675,7 @@ getIncomingCopyProgress(const TabletMetadata& metadata, int64_t& current, int64_t& estimated) const { + *const_cast(&filestr) << "LSMP getIncomingCopyProgress called" << std::endl; fprintf(stderr, "unsupported method getIncomingCopyProgrees called\n"); //This will be a problem when we have more than 1 @@ -376,6 +697,7 @@ getOutgoingCopyProgress(const TabletMetadata& metadata, int64_t& current, int64_t& estimated) const { + *const_cast(&filestr) << "LSMP getOutgoingCopyProgress called" << std::endl; fprintf(stderr, "unsupported method getOutgoingCopyProgrees called\n"); return 1; // return LSMSnapshotExporter:: diff --git a/sherpa/LSMPersistentStoreImpl.h b/sherpa/LSMPersistentStoreImpl.h index 81a9257..598152b 100644 --- a/sherpa/LSMPersistentStoreImpl.h +++ b/sherpa/LSMPersistentStoreImpl.h @@ -9,31 +9,48 @@ #ifndef LSM_PSTORE_IMPL_H #define LSM_PSTORE_IMPL_H +#include +#include + #include "PersistentStore.h" +#include "datatuple.h" //#include "LSMCoreImpl.h" struct logstore_handle_t; +class LSMIterator; + class LSMPersistentStoreImpl : public PersistentStore { - -private: +friend class LSMIterator; +friend class LSMPersistentParent; +protected: // LSMCoreImpl& mySQLCoreImpl_; - // bool isOrdered_; + bool isOrdered_; unsigned char * my_strcat(const std::string& table, const std::string& tablet, const std::string& key, size_t * len); + void my_strtok(const unsigned char* in, size_t len, std::string& table, std::string& tablet, std::string& key); unsigned char * buf_key(const TabletMetadata& m, const RecordKey& r, size_t * len); + unsigned char * buf_key(const TabletMetadata& m, const std::string s, + size_t * len); unsigned char * buf_val(const StorageRecord &val, size_t * len); + SuCode::ResponseCode tup_buf(StorageRecord &ret, datatuple * tup); + SuCode::ResponseCode key_buf(StorageRecord &ret, + const unsigned char * buf, size_t buf_len); SuCode::ResponseCode val_buf(StorageRecord &ret, const unsigned char * buf, size_t buf_len); public: - LSMPersistentStoreImpl(bool ordered); + std::fstream filestr; + + LSMPersistentStoreImpl(bool ordered); virtual ~LSMPersistentStoreImpl(); + SuCode::ResponseCode initMetadataMetadata(TabletMetadata& m); + /** * See PersistentStore API */ @@ -60,6 +77,15 @@ private: SuCode::ResponseCode clearTabletRange(TabletMetadata& tabletMeta, uint32_t removalLimit); + /** + Not part of PersistentStore api. PersistentParent needs to + provide this method as well, but it gets a string instead of a + TabletMetadata... + */ + SuCode::ResponseCode getApproximateTableSize(std::string tabletMeta, + int64_t& tableSize, + int64_t & rowCount); + /** * See PersistentStore API */ @@ -89,13 +115,17 @@ private: */ SuCode::ResponseCode remove(const TabletMetadata& tabletMeta, const RecordKey& recordKey); + /** + * Not part of PersistentStore API. However, PersistentParent needs to implement ping + */ + bool ping(); /** * See PersistentStore API */ - StorageRecordIterator - scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation, - bool getMetadataOnly, const uint64_t expiryTime, unsigned int scanLimit); + StorageRecordIterator + scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation, + ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit); /** * See PersistentStore API @@ -139,6 +169,12 @@ private: LSMPersistentStoreImpl(LSMPersistentStoreImpl &); LSMPersistentStoreImpl operator=(LSMPersistentStoreImpl &); +// unsigned char* TabletMetadataToString(const TabletMetadata&m, size_t* keylen); +// unsigned char* TabletMetadataToKey(const TabletMetadata&m, size_t *key_len); + + void metadata_buf(TabletMetadata &m, const unsigned char * buf, size_t len); + void buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m); +protected: logstore_handle_t * l_; }; diff --git a/sherpa/Makefile b/sherpa/Makefile new file mode 100644 index 0000000..00c846f --- /dev/null +++ b/sherpa/Makefile @@ -0,0 +1,17 @@ +# yinst i log4cpp_y_dev-1.4.2 + +CPPFLAGS=-Wall -I/homes/sears/svndev/trunk/dht/common/include -I/homes/sears/svndev/trunk/dht/storage/handler/src -I/homes/sears/svndev/trunk/dht/storage/handler/include/ -I/homes/sears/svndev/trunk/dht/framework/base/include/ -I /homes/sears/svndev/trunk/dht/externals/fwcore/include -I /home/y/include -I .. -c -fPIC +CXXFLAGS=-m32 +CFLAGS=-m32 + +libsherpalogstore.so : LSMPersistentStoreImpl.o tcpclient.o LSMPersistentParentImpl.o + gcc -m32 -shared -Wl,-soname,libsherpalogstore.so -o libsherpalogstore.so LSMPersistentStoreImpl.o tcpclient.o LSMPersistentParentImpl.o + cp libsherpalogstore.so /homes/sears/sherpa-so/ + +#LSMPersistentStoreImpl.o : LSMPersistentStoreImpl.cc + +tcpclient.cpp : ../tcpclient.cpp + ln -s ../tcpclient.cpp + +clean: + rm -f LSMPersistentStoreImpl.o tcpclient.o libsherpalogstore.so