diff --git a/sherpa/LSMPersistentParentImpl.cc b/sherpa/LSMPersistentParentImpl.cc deleted file mode 100644 index 091caed..0000000 --- a/sherpa/LSMPersistentParentImpl.cc +++ /dev/null @@ -1,134 +0,0 @@ -#include "PersistentParent.h" -#include "LSMPersistentStoreImpl.h" -#include "TabletMetadata.h" -#include "tcpclient.h" - -#include - -// Initialize the logger -static log4cpp::Category &log = - log4cpp::Category::getInstance("dht.su."__FILE__); - -class LSMPersistentParent : PersistentParent { -public: - - LSMPersistentParent() : ordered(true), hashed(false) { - } // we ignore the ordered flag... - ~LSMPersistentParent() { - DHT_DEBUG_STREAM() << "~LSMPersistentParent called"; - } - SuCode::ResponseCode install(const SectionConfig& config) { - DHT_DEBUG_STREAM() << "LSM install called"; - return SuCode::SuOk; - } - SuCode::ResponseCode init(const SectionConfig& config) { - DHT_DEBUG_STREAM() << "LSM init called"; - ordered.init(config); - hashed.init(config); - return SuCode::SuOk; - } - PersistentStore *getHashedStore() { - DHT_DEBUG_STREAM() << "LSM getHashedStore called"; - return &hashed; - } - PersistentStore *getOrderedStore() { - DHT_DEBUG_STREAM() << "LSM getOrderedStore called"; - return &ordered; - } - bool ping() { - DHT_DEBUG_STREAM() << "LSM ping called"; - return ordered.ping(); - } - std::string getName() { - DHT_DEBUG_STREAM() << "LSM getName called"; - return "logstore"; - } - SuCode::ResponseCode getFreeSpaceBytes(double & freeSpaceBytes) { - DHT_DEBUG_STREAM() << "LSM getFreeSpaceBytes called"; - freeSpaceBytes = 1024.0 *1024.0 * 1024.0; // XXX stub - return SuCode::SuOk; - } - SuCode::ResponseCode getDiskMaxBytes(double & diskMaxBytes) { - DHT_DEBUG_STREAM() << "LSM getDiskMaxBytes called"; - diskMaxBytes = 10.0 * 1024.0 *1024.0 * 1024.0; // XXX stub - return SuCode::SuOk; - } - SuCode::ResponseCode cleanupTablet(uint64_t uniqId, - const std::string & tableName, - const std::string & tabletName) { - DHT_DEBUG_STREAM() << "LSM cleanupTablet called"; - return SuCode::SuOk; // XXX stub - } - SuCode::ResponseCode getTabletMappingList(TabletList & tabletList) { - DHT_DEBUG_STREAM() << "LSM getTabletMappingList called"; - - std::string metadata_table = std::string("ydht_metadata_table"); - std::string metadata_tablet = std::string("0"); - std::string metadata_tabletEnd = std::string("1"); - - size_t startlen; - size_t endlen; - - unsigned char * start_tup = ordered.my_strcat(metadata_table, metadata_tablet, "", &startlen); - unsigned char * end_tup = ordered.my_strcat(metadata_table, metadata_tabletEnd, "", &endlen); - - DHT_DEBUG_STREAM() << "start tup = " << start_tup; - DHT_DEBUG_STREAM() << "end tup = " << end_tup; - - datatuple * starttup = datatuple::create(start_tup, startlen); - datatuple * endtup = datatuple::create(end_tup, endlen); - - free(start_tup); - free(end_tup); - - uint8_t rcode = logstore_client_op_returns_many(ordered.l_, OP_SCAN, starttup, endtup, 0); // 0 = no limit. - - datatuple::freetuple(starttup); - datatuple::freetuple(endtup); - - datatuple * next; - - TabletMetadata m; - if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) { - while((next = logstore_client_next_tuple(ordered.l_))) { - ordered.metadata_buf(m, next->key(), next->keylen()); - - struct ydht_maptable_schema md; - std::string cat = m.table() + m.tablet(); - md.uniq_id = 0; - for(int i = 0; i < cat.length(); i++) { - md.uniq_id += ((unsigned char)cat[i]); // XXX obviously, this is a terrible hack (and a poor hash function) - } - md.tableName = m.table(); - md.tabletName = m.tablet(); - DHT_DEBUG_STREAM() << md.uniq_id << " : " << md.tableName << " : " << md.tabletName; - tabletList.push_back(md); - datatuple::freetuple(next); - } - DHT_DEBUG_STREAM() << "getTabletMappingListreturns"; - } else { - DHT_ERROR_STREAM() << "error " << (int)rcode << " in getTabletMappingList."; - return SuCode::PStoreUnexpectedError; // XXX should be "connection closed error" or something... - } - - return SuCode::SuOk; - } - SuCode::ResponseCode getApproximateTableSize - (const std::string& tableId, - int64_t& tableSize, int64_t & rowCount) { - DHT_DEBUG_STREAM() << "LSM getApproximateTableSize called"; - return ordered.getApproximateTableSize(tableId, tableSize, rowCount); - } -private: - LSMPersistentStoreImpl ordered; - LSMPersistentStoreImpl hashed; -}; - -extern "C" { - void * lsmsherpa_init(); -} - -static LSMPersistentParent pp; -void * lsmsherpa_init() { - return &pp; -} diff --git a/sherpa/LSMPersistentStoreImpl.cc b/sherpa/LSMPersistentStoreImpl.cc deleted file mode 100644 index 1ccdc6f..0000000 --- a/sherpa/LSMPersistentStoreImpl.cc +++ /dev/null @@ -1,664 +0,0 @@ -/** - * \file LSMPersistentStoreImpl.cc - * - * Copyright (c) 2008 Yahoo, Inc. - * All rights reserved. - */ -#include "TabletMetadata.h" -#include "TabletIterator.h" -#include "ScanContinuation.h" -#include "OrderedScanContinuation.h" -#include "TabletRange.h" -#include "SuLimits.h" -#include "dht/UtilityBuffer.h" - -#include - -#include "LSMPersistentStoreImpl.h" - -#include - - -// Initialize the logger -static log4cpp::Category &log = - log4cpp::Category::getInstance("dht.su."__FILE__); - -class LSMIterator : public TabletIterator { - friend class LSMPersistentStoreImpl; - -public: -// StorageRecord * data; // <- defined in parent class. next() updates it. - - LSMIterator(LSMPersistentStoreImpl* lsmImpl, const TabletMetadata& tabletMeta, ScanContinuationAutoPtr continuation, - ScanSelect::Selector ignored, const uint64_t expiryTime, - unsigned int scanLimit, size_t byteLimit /*ignored*/) : lsmImpl(lsmImpl) { - const unsigned char low_eos = (unsigned char) 0xFE; - const unsigned char high_eos = (unsigned char) 0xFF; - const unsigned char zero = (unsigned char) 0x00; - (void)zero; - // open iterator - datatuple * starttup = NULL; - datatuple * endtup = NULL; - - size_t start_key_len, end_key_len; - unsigned char *start_key, *end_key; - - if(continuation->isOrdered()) { - const OrderedScanContinuation& os = static_cast(*continuation); - - if(!os.getStartKey().isMinKey()) { - start_key = lsmImpl->buf_key(tabletMeta, os.getStartKey().getKey(), &start_key_len); - } else { - start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len); - } - - if(!os.getEndKey().isMaxKey()) { - end_key = lsmImpl->buf_key(tabletMeta, os.getEndKey().getKey(), &end_key_len); - } else { - end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len); - if(end_key[end_key_len-2] != low_eos) { - DHT_ERROR_STREAM() << "CORRUPT lsm tablet key = " << (char*)end_key; - } else { - end_key[end_key_len-2] = high_eos; - } - } - } else { - DHT_WARN_STREAM() << "Scanning hash table, but ignoring contiunation range!"; - start_key = lsmImpl->buf_key(tabletMeta, "", &start_key_len); - end_key = lsmImpl->buf_key(tabletMeta, "", &end_key_len); - if(end_key[end_key_len-2] != low_eos) { - DHT_ERROR_STREAM() << "CORRUPT lsm tablet key = " << (char*)end_key; - } else { - end_key[end_key_len-2] = high_eos; - } - } - - starttup = datatuple::create(start_key, start_key_len); - std::string dbg((char*)start_key, start_key_len - 1); - DHT_DEBUG_STREAM() << "start lsm key = " << dbg; - - endtup = datatuple::create(end_key, end_key_len); - std::string dbg2((char*)end_key, end_key_len - 1); - DHT_DEBUG_STREAM() << "end lsm key = " << dbg2; - - uint8_t rc = logstore_client_op_returns_many(lsmImpl->scan_l_, OP_SCAN, starttup, endtup, scanLimit); - - datatuple::freetuple(starttup); - datatuple::freetuple(endtup); - - if(rc != LOGSTORE_RESPONSE_SENDING_TUPLES) { - this->error = rc; - } else { - this->error = 0; - } - - this->data = new StorageRecord(); - } - ~LSMIterator() { - DHT_DEBUG_STREAM() << "close iterator called"; - // close iterator by running to the end of it... TODO devise a better way to close iterators early? - while(this->data) { - next(); - } - DHT_DEBUG_STREAM() << "close iterator done"; - } - SuCode::ResponseCode next() { - datatuple * tup; - DHT_DEBUG_STREAM() << "next called"; - if(error) { // only catches errors during scan setup. - return SuCode::PStoreUnexpectedError; - } else if((tup = logstore_client_next_tuple(lsmImpl->scan_l_))) { - DHT_DEBUG_STREAM() << "found tuple, key = " << tup->key() << " datalen = " << tup->datalen(); - SuCode::ResponseCode rc = lsmImpl->tup_buf(*(this->data), tup); - datatuple::freetuple(tup); - return rc; - } else { - DHT_DEBUG_STREAM() << "no tuple"; - delete this->data; - this->data = NULL; - return SuCode::PStoreScanEnd; // XXX need to differentiate between end of scan and failure - } - } -private: - LSMPersistentStoreImpl * lsmImpl; - uint8_t error; -}; - -void LSMPersistentStoreImpl::buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m) { - std::string ydht_metadata_table = std::string("ydht_metadata_table"); - std::string zero = std::string("0"); - std::string tmp; unsigned char * tmp_p; size_t tmp_len; - tmp_p = my_strcat(m.table(), m.tablet(), "", &tmp_len); - tmp.assign((const char*)tmp_p, tmp_len); - free(tmp_p); - *buf = my_strcat(ydht_metadata_table, zero, tmp, len); -} - -void LSMPersistentStoreImpl::metadata_buf(TabletMetadata &m, const unsigned char * buf, size_t len) { - // Metadata table key format: - // ydht_metadata_table[low_eos]0[low_eos]table[low_eos]tablet[low_eos][low_eos] - - assert(buf); - std::string ydht_metadata_table, zero, tmp, table, tablet, empty; - my_strtok(buf, len, ydht_metadata_table, zero, tmp); - assert(tmp.c_str()); - my_strtok((const unsigned char*)tmp.c_str(), tmp.length(), table, tablet, empty); - - DHT_DEBUG_STREAM() << "Parsed metadata: [" << table << "] [" << tablet << "] [" << empty << "](empty)"; - m.setTable(table); - m.setTablet(tablet); - m.setTabletId(tmp.substr(0, tmp.length() - 1)); -} - -unsigned char* LSMPersistentStoreImpl::my_strcat(const std::string& table, - const std::string& tablet, - const std::string& key, - size_t * len) { - const char * a = table.c_str(); - size_t alen = table.length(); - const char * b = tablet.c_str(); - size_t blen = tablet.length(); - const char * c = key.c_str(); - size_t clen = key.length(); - // The following two bytes cannot occur in valid utf-8 - const unsigned char low_eos = (unsigned char) 0xFE; - const unsigned char high_eos = (unsigned char) 0xFF; - const unsigned char zero = (unsigned char) 0x00; - (void) high_eos; - *len = alen + 1 + blen + 1 + clen + 1; - unsigned char * ret = (unsigned char*)malloc(*len); - unsigned char * buf = ret; - memcpy(buf, a, alen); buf += alen; - memcpy(buf, &low_eos, 1); buf ++; - memcpy(buf, b, blen); buf += blen; - memcpy(buf, &low_eos, 1); buf ++; - memcpy(buf, c, clen); buf += clen; - memcpy(buf, &zero, 1); buf = 0; - return ret; -} -void LSMPersistentStoreImpl::my_strtok(const unsigned char* in, size_t len, std::string& table, std::string& tablet, std::string& key) { - const char * tablep = (const char*) in; - - const unsigned char low_eos = (unsigned char) 0xFE; - const unsigned char high_eos = (unsigned char) 0xFF; - const unsigned char zero = (unsigned char) 0x00; - (void)high_eos; (void)zero; - - const char * tabletp = ((const char*)memchr(tablep, low_eos, len)) + 1; - int tablep_len = (tabletp - tablep)-1; // -1 is due to low_eos terminator - const char * keyp = ((const char*)memchr(tabletp, low_eos, len-tablep_len)) + 1; - int tabletp_len = (keyp - tabletp) - 1; // -1 is due to low_eos terminator - int keyp_len = (len - (tablep_len + 1 + tabletp_len + 1)) - 1; // -1 is due to null terminator. - - table.assign(tablep, tablep_len); - tablet.assign(tabletp, tabletp_len); - key.assign(keyp, keyp_len); -} -unsigned char * -LSMPersistentStoreImpl::buf_key(const TabletMetadata&m, - const RecordKey& k, size_t * len) { - return buf_key(m,k.name(),len); -} - unsigned char * - LSMPersistentStoreImpl::buf_key(const TabletMetadata&m, - const std::string s, size_t * len) { - const unsigned char low_eos = (unsigned char) 0xFE; - const unsigned char high_eos = (unsigned char) 0xFF; - const unsigned char zero = (unsigned char) 0x00; - (void)high_eos; (void)low_eos; - std::string md_name = m.getTabletId(); - *len = md_name.length() /*+ 1*/ + s.length() + 1; // md_name ends in a low_eos... - unsigned char * ret = (unsigned char*)malloc(*len); - unsigned char * buf = ret; - memcpy(buf, md_name.c_str(), md_name.length()); buf += md_name.length(); - //memcpy(buf, &low_eos, 1); buf++; - memcpy(buf, s.c_str(), s.length()); buf += s.length(); - memcpy(buf, &zero, 1); - return ret; -} - -unsigned char * -LSMPersistentStoreImpl::buf_val(const StorageRecord& val, size_t * len) { - uint64_t expiryTime = val.expiryTime(); - uint32_t dataBlob_len, metadata_len; // Below, we assume these are of the same type. - - const UtilityBuffer& dataBlob = val.dataBlob(); - const UtilityBuffer& metadata = val.metadata(); - - dataBlob_len = dataBlob.dataSize(); - metadata_len = metadata.dataSize(); - -// DHT_DEBUG_STREAM() << "write storage record expiryTime " << expiryTime << " metadata len " << metadata_len << " datalen " << dataBlob_len; - - *len = sizeof(expiryTime) + 2 * sizeof(dataBlob_len) + dataBlob_len + metadata_len; - - unsigned char * ret = (unsigned char *) malloc(*len); - unsigned char * buf = ret; - memcpy(buf, &expiryTime, sizeof(expiryTime)); buf += sizeof(expiryTime); - memcpy(buf, &metadata_len, sizeof(metadata_len)); buf += sizeof(metadata_len); - memcpy(buf, &dataBlob_len, sizeof(dataBlob_len)); buf += sizeof(dataBlob_len); - memcpy(buf, const_cast(metadata).buffer(), - metadata_len); buf += metadata_len; - memcpy(buf, const_cast(dataBlob).buffer(), - dataBlob_len); buf += dataBlob_len; - - return ret; -} -SuCode::ResponseCode -LSMPersistentStoreImpl::tup_buf(StorageRecord& ret, datatuple * tup) { - SuCode::ResponseCode rc = SuCode::SuOk; - if(tup->key()) { - rc = key_buf(ret, tup->key(), tup->keylen()); - } - if(rc == SuCode::SuOk && !tup->isDelete() && tup->datalen() > 1) { - return val_buf(ret, tup->data(), tup->datalen()); - } else { - return rc; - } -} -SuCode::ResponseCode -LSMPersistentStoreImpl::key_buf(StorageRecord& ret, - const unsigned char * buf, size_t buf_len) { - std::string table, tablet, key; - my_strtok(buf, buf_len, table, tablet, key); - DHT_DEBUG_STREAM() << "key_buf parsed datatuple key: table = [" << table << "] tablet = [" << tablet << "] key = [" << key << "]"; - ret.recordKey().setName(key); - return SuCode::SuOk; -} - -SuCode::ResponseCode -LSMPersistentStoreImpl::val_buf(StorageRecord& ret, - const unsigned char * buf, size_t buf_len) { - uint64_t expiryTime; - uint32_t dataBlob_len, metadata_len; -// DHT_DEBUG_STREAM() << "read storage record buf_len " << buf_len << std::endl; - assert(buf_len >= sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len)); - - // Copy header onto stack. - - memcpy(&expiryTime, buf, sizeof(expiryTime)); buf += sizeof(expiryTime); - memcpy(&metadata_len, buf, sizeof(metadata_len)); buf += sizeof(metadata_len); - memcpy(&dataBlob_len, buf, sizeof(dataBlob_len)); buf += sizeof(dataBlob_len); - - // Is there room in ret? - - assert(buf_len >= sizeof(expiryTime) + sizeof(dataBlob_len) + sizeof(metadata_len) + metadata_len + dataBlob_len); - - if(ret.metadata().bufSize() < metadata_len || - ret.dataBlob().bufSize() < dataBlob_len) { - return SuCode::PStoreDataTruncated; // RCS: This is what the mysql implementation does. - // it's somewhat misleading, as we don't truncate anything. - } - - // Copy the data into ret. - - // ret->setName(recordName); // somebody else's problem.... - ret.setExpiryTime(expiryTime); - - memcpy(ret.metadata().buffer(), buf, metadata_len); buf += metadata_len; - memcpy(ret.dataBlob().buffer(), buf, dataBlob_len); buf += dataBlob_len; - ret.metadata().setDataSize(metadata_len); - ret.dataBlob().setDataSize(dataBlob_len); - return SuCode::SuOk; -} - -LSMPersistentStoreImpl:: -LSMPersistentStoreImpl(bool isOrdered) : isOrdered_(isOrdered), l_(NULL), scan_l_(NULL) { - // filestr.open(isOrdered? "/tmp/lsm-log" : "/tmp/lsm-log-hashed", std::fstream::out | std::fstream::app); - // It would be unsafe to call the following, since we're statically initialized: DHT_DEBUG_STREAM() << "LSMP constructor called"; -} - -LSMPersistentStoreImpl:: -~LSMPersistentStoreImpl() -{ - DHT_DEBUG_STREAM() << "LSMP destructor called"; - if(l_) logstore_client_close(l_); - if(scan_l_) logstore_client_close(scan_l_); - DHT_DEBUG_STREAM() << "LSMP destructor cleanly closed connections"; -} - -SuCode::ResponseCode LSMPersistentStoreImpl::initMetadataMetadata(TabletMetadata& m) { - DHT_DEBUG_STREAM() << "LSMP initMetadataMetadata called"; - - std::string metadata_table = std::string("ydht_metadata_table"); - std::string metadata_tablet= std::string("0"); - size_t keylen; - char * key =(char*)my_strcat(metadata_table, metadata_tablet, "", &keylen); - std::string s(key, keylen-1); - m.setTabletId(s); - free(key); - return SuCode::SuOk; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -init(const SectionConfig &config) -{ - DHT_DEBUG_STREAM() << "LSMP init called"; - if(!l_) { // workaround bug 2870547 - l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values - scan_l_ = logstore_client_open("localhost", 32432, 60); // XXX hardcode none of these values - } - return l_ ? SuCode::SuOk : FwCode::NotFound; -} - -bool LSMPersistentStoreImpl:: -isOrdered(){ - DHT_DEBUG_STREAM() << "LSMP isOrdered called"; - return isOrdered_; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -addEmptyTablet(TabletMetadata& tabletMeta) -{ - DHT_DEBUG_STREAM() << "LSMP addEmptyTablet called"; - // This is a no-op; we'll simply prepend the tablet string to each record. - { - // Is table name too long? - if(tabletMeta.table().length() > SuLimits::MAX_TABLE_NAME_LENGTH_DB) { - return SuCode::PStoreIOFailed; - } - - const std::string& mySQLTableName = tabletMeta.getTabletId(); - - - if (mySQLTableName!=""){ - DHT_INFO_STREAM() << "Tablet " << mySQLTableName << " already exists!"; - return SuCode::PStoreTabletAlreadyExists; - } else { - size_t keylen; unsigned char * key; - buf_metadata(&key, &keylen, tabletMeta); - metadata_buf(tabletMeta, key, keylen); - free(key); - return SuCode::SuOk; - } - } -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -dropTablet(TabletMetadata& tabletMeta) -{ - DHT_INFO_STREAM() << "dropTablet called. Falling back on clearTabletRange()"; - SuCode::ResponseCode ret = clearTabletRange(tabletMeta, 0); - - size_t keylen; - unsigned char * key; - buf_metadata(&key, &keylen, tabletMeta); - - datatuple * tup = datatuple::create(key, keylen); // two-argument form of datatuple::create creates a tombstone, which we will now insert. - free(key); - void * result = (void*)logstore_client_op(l_, OP_INSERT, tup); - datatuple::freetuple(tup); - - tabletMeta.setTabletId(""); - - if(!result) { - DHT_WARN_STREAM() << "LSMP dropTablet fails"; - ret = SuCode::PStoreTabletCleanupFailed; - } else { - DHT_INFO_STREAM() << "LSMP dropTablet succeeds"; - ret = SuCode::SuOk; - } - return ret; -} - -// alternate to dropTablet() for when a tablet has been split and the underlying -// mysql table is being shared...just wipe out the record range for the tablet -// that is being dropped. -SuCode::ResponseCode LSMPersistentStoreImpl:: -clearTabletRange(TabletMetadata& tabletMeta, uint32_t removalLimit) -{ - DHT_WARN_STREAM() << "clear tablet range is unimplemented. ignoring request"; - return SuCode::SuOk; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -getApproximateTableSize(std::string tabletMeta, - int64_t& tableSize, - int64_t & rowCount) { - DHT_WARN_STREAM() << "get approximate table size is unimplemented. returning dummy values"; - tableSize = 1024 * 1024 * 1024; - rowCount = 1024 * 1024; - return SuCode::SuOk; -} -SuCode::ResponseCode LSMPersistentStoreImpl:: -getApproximateTableSize(TabletMetadata& tabletMeta, - int64_t& tableSize, - int64_t & rowCount) -{ - DHT_DEBUG_STREAM() << "LSMP getApproximateTableSize (2) called"; - return getApproximateTableSize(tabletMeta.getTabletId(), tableSize, rowCount); -} - - -SuCode::ResponseCode LSMPersistentStoreImpl:: -get(const TabletMetadata& tabletMeta, StorageRecord& recordData) -{ - DHT_DEBUG_STREAM() << "LSMP get called" << tabletMeta.getTabletId() << ":" << recordData.recordKey(); - if(recordData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { - return SuCode::PStoreIOFailed; - } - size_t buflen; - unsigned char * buf = buf_key(tabletMeta, recordData.recordKey(), &buflen); - datatuple * key_tup = datatuple::create(buf, buflen); - free(buf); - datatuple * result = logstore_client_op(l_, OP_FIND, key_tup); - datatuple::freetuple(key_tup); - SuCode::ResponseCode ret; - if((!result) || result->isDelete()) { - ret = SuCode::PStoreRecordNotFound; - } else { - //DHT_DEBUG_STREAM() << "call val buf from get, data len = " << result->datalen() << std::endl; - ret = val_buf(recordData, result->data(), result->datalen()); - } - if(result) { - datatuple::freetuple(result); - } - DHT_DEBUG_STREAM() << "LSMP get returns succ = " << (ret == SuCode::SuOk); - return ret; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -update(const TabletMetadata& tabletMeta, const StorageRecord& updateData) -{ - DHT_DEBUG_STREAM() << "LSMP update called"; - if(updateData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { - return SuCode::PStoreIOFailed; - } - - SuCode::ResponseCode ret; - { /// XXX hack. Copy of get() implementation, without the memcpy. - size_t buflen; - unsigned char * buf = buf_key(tabletMeta, updateData.recordKey(), &buflen); - datatuple * key_tup = datatuple::create(buf, buflen); - datatuple * result = logstore_client_op(l_, OP_FIND, key_tup); - datatuple::freetuple(key_tup); - - if((!result) || result->isDelete()) { - RESPONSE_ERROR_STREAM(SuCode::PStoreRecordNotFound) << "EC:PSTORE:No matching " << - " record to update"; - ret = SuCode::PStoreRecordNotFound; - } else { - // skip val_buf ... - ret = SuCode::SuOk; - } - free(buf); - if(result) { datatuple::freetuple(result); } // XXX differentiate between dead connection and missing tuple - } - if(ret == SuCode::PStoreRecordNotFound) { return ret; } - return insert(tabletMeta, updateData); -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: // XXX what to do about update? -insert(const TabletMetadata& tabletMeta, - const StorageRecord& insertData) { - DHT_DEBUG_STREAM()<< "LSMP insert called" << tabletMeta.getTabletId() << ":" << insertData.recordKey(); - size_t keybuflen, valbuflen; - if(insertData.recordKey().name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { - return SuCode::PStoreIOFailed; - } - unsigned char * keybuf = buf_key(tabletMeta, insertData.recordKey(), &keybuflen); - DHT_DEBUG_STREAM() << "keybuf = " << keybuf << " (and perhaps a null)"; - unsigned char * valbuf = buf_val(insertData, &valbuflen); - DHT_DEBUG_STREAM() << "valbuf = " << valbuf << " (and perhaps a null)"; - datatuple * key_ins = datatuple::create(keybuf, keybuflen, valbuf, valbuflen); - DHT_DEBUG_STREAM() << "insert create()"; - void * result = (void*)logstore_client_op(l_, OP_INSERT, key_ins); - DHT_DEBUG_STREAM() << "insert insert()"; - if(result) { - DHT_DEBUG_STREAM() << "LSMP insert will return result = " << result; - } else { - DHT_DEBUG_STREAM() << "LSMP insert will return null "; - } - datatuple::freetuple(key_ins); - free(keybuf); - free(valbuf); - DHT_DEBUG_STREAM() << "LSMP insert returns "; - return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -remove(const TabletMetadata& tabletMeta, const RecordKey& recordName) -{ - DHT_DEBUG_STREAM() << "LSMP remove called"; - if(recordName.name().length() > (isOrdered_ ? SuLimits::MAX_ORDERED_RECORD_NAME_LENGTH : SuLimits::MAX_RECORD_NAME_LENGTH)) { - return SuCode::PStoreIOFailed; - } - - StorageRecord tmp(recordName.name()); - SuCode::ResponseCode rc = get(tabletMeta, tmp); - if(SuCode::SuOk == rc) { - size_t buflen; - unsigned char * buf = buf_key(tabletMeta, recordName, &buflen); - datatuple * key_ins = datatuple::create(buf, buflen); - datatuple * result = logstore_client_op(l_, OP_INSERT, key_ins); - datatuple::freetuple(key_ins); - free(buf); - return result ? SuCode::SuOk : SuCode::PStoreUnexpectedError; - } else { - DHT_DEBUG_STREAM() << "LSMP remove: record not found, or error"; - return rc; - } -} - -bool LSMPersistentStoreImpl::ping() { - DHT_DEBUG_STREAM() << "LSMP ping called"; - datatuple * ret = logstore_client_op(l_, OP_DBG_NOOP); - if(ret == NULL) { - DHT_WARN_STREAM() << "LSMP ping failed"; - return false; - } else { - datatuple::freetuple(ret); - return true; - } -} - -StorageRecordIterator LSMPersistentStoreImpl:: -scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation, - ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit) -{ - DHT_DEBUG_STREAM() << "LSMP scan called. Tablet: " << tabletMeta.getTabletId(); - ScanContinuationAutoPtr newContinuation; - TabletRangeAutoPtr tabletRange; - - if (SuCode::SuOk != tabletMeta.keyRange(tabletRange)){ - BAD_CODE_ABORT("Bad tablet name"); - } - - /* This is necessary once we turn on splits, because multiple tablets - * might reside in the same mysql table. Shouldnt scan beyond the - * upper limit of the tablet because that might be stale data left - * from before this tablet split. - */ - newContinuation = continuation.getContinuationLimitedToTabletRange( - *tabletRange); - - LSMIterator* iter = new LSMIterator(this, tabletMeta, newContinuation, - selector, /*getMetadataOnly,*/ - expiryTime, scanLimit, byteLimit); - - DHT_DEBUG_STREAM() << "LSMP scan returns. Error = " << iter->error; - return StorageRecordIterator(iter); - -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -getSnapshotExporter(const TabletMetadata& tabletMeta, - const std::string& snapshotId, - SnapshotExporterAutoPtr& exporter) -{ - DHT_WARN_STREAM() << "Unimplemented: LSMP getSnapshotExported called"; - /* const std::string& mySQLTableName = tabletMeta.getTabletId(); - - TabletRangeAutoPtr tabletRange; - RETURN_IF_NOT_OK(tabletMeta.keyRange(tabletRange)); - - ScanContinuationAutoPtr cont = tabletRange->getContinuationForTablet(); - - exporter = SnapshotExporterAutoPtr( - new LSMSnapshotExporter(mySQLTableName,cont,snapshotId)); - */ - return SuCode::SuOk; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -getSnapshotImporter(const TabletMetadata& tabletMeta, - const std::string& version, - const std::string& snapshotId, - SnapshotImporterAutoPtr& importer) -{ - DHT_WARN_STREAM() << "Unimplemented: getSnapshotImporter called"; - /* if (version == LSMSnapshotExporter::VERSION){ - const std::string& mySQLTableName = tabletMeta.getTabletId(); - importer=LSMSnapshotExporter::getImporter(tabletMeta.table(), - tabletMeta.tablet(), - snapshotId, - mySQLTableName); - return SuCode::SuOk; - }else{ - RESPONSE_ERROR_STREAM(SuCode::PStoreUnexpectedError) << - "EC:IMPOSSIBLE:Unknown snapshot version " << version <<" while trying to " << - "import to tablet " << tabletMeta.tablet() << "of table " << - tabletMeta.table(); - return SuCode::PStoreUnexpectedError; - }*/ - return SuCode::SuOk; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -getIncomingCopyProgress(const TabletMetadata& metadata, - const std::string& snapshotId, - int64_t& current, - int64_t& estimated) const -{ - DHT_DEBUG_STREAM() << "Unimplemented: LSMP getIncomingCopyProgress called"; - - //This will be a problem when we have more than 1 - //exporter/importer type. We will have to store the - //snapshot version somewhere in tablet metadata - current = 1024*1024*1024; - estimated = 1024*1024*1024; - - /* const std::string& mySQLTableName = metadata.getTabletId(); - return LSMSnapshotExporter:: - getIncomingCopyProgress(mySQLTableName, - snapshotId, - current, - estimated); */ - return SuCode::SuOk; -} - -SuCode::ResponseCode LSMPersistentStoreImpl:: -getOutgoingCopyProgress(const TabletMetadata& metadata, - const std::string& snapshotId, - int64_t& current, - int64_t& estimated) const -{ - DHT_DEBUG_STREAM() << "Unimplemented: LSMP getOutgoingCopyProgress called"; - current = 1024*1024*1024; - estimated = 1024*1024*1024; - return SuCode::SuOk; - // return LSMSnapshotExporter:: - // getOutgoingCopyProgress(snapshotId, - // current, - // estimated); -} diff --git a/sherpa/LSMPersistentStoreImpl.h b/sherpa/LSMPersistentStoreImpl.h deleted file mode 100644 index c713cd3..0000000 --- a/sherpa/LSMPersistentStoreImpl.h +++ /dev/null @@ -1,177 +0,0 @@ -/** - * \file LSMPersistentStoreImpl.h - * \brief This file is a wrapper over the LSM-Tree network protocol. - * - * Copyright (c) 2008 Yahoo, Inc. - * All rights reserved. - */ - -#ifndef LSM_PSTORE_IMPL_H -#define LSM_PSTORE_IMPL_H - -#include "PersistentStore.h" -#include "datatuple.h" -//#include "LSMCoreImpl.h" - -struct logstore_handle_t; - -class LSMIterator; - -class LSMPersistentStoreImpl : public PersistentStore -{ -friend class LSMIterator; -friend class LSMPersistentParent; -protected: - - // LSMCoreImpl& mySQLCoreImpl_; - bool isOrdered_; - unsigned char * my_strcat(const std::string& table, - const std::string& tablet, - const std::string& key, - size_t * len); - void my_strtok(const unsigned char* in, size_t len, std::string& table, std::string& tablet, std::string& key); - unsigned char * buf_key(const TabletMetadata& m, const RecordKey& r, - size_t * len); - unsigned char * buf_key(const TabletMetadata& m, const std::string s, - size_t * len); - unsigned char * buf_val(const StorageRecord &val, - size_t * len); - SuCode::ResponseCode tup_buf(StorageRecord &ret, datatuple * tup); - SuCode::ResponseCode key_buf(StorageRecord &ret, - const unsigned char * buf, size_t buf_len); - SuCode::ResponseCode val_buf(StorageRecord &ret, - const unsigned char * buf, size_t buf_len); - public: - - LSMPersistentStoreImpl(bool ordered); - virtual ~LSMPersistentStoreImpl(); - - SuCode::ResponseCode initMetadataMetadata(TabletMetadata& m); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode init(const SectionConfig& config); - - /** - * See PersistentStore API - */ - bool isOrdered(); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode addEmptyTablet(TabletMetadata& tabletMeta); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode dropTablet(TabletMetadata& tabletMeta); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode clearTabletRange(TabletMetadata& tabletMeta, - uint32_t removalLimit); - - /** - Not part of PersistentStore api. PersistentParent needs to - provide this method as well, but it gets a string instead of a - TabletMetadata... - */ - SuCode::ResponseCode getApproximateTableSize(std::string tabletMeta, - int64_t& tableSize, - int64_t & rowCount); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode getApproximateTableSize(TabletMetadata& tabletMeta, - int64_t& tableSize, - int64_t & rowCount); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode get(const TabletMetadata& tabletMeta, - StorageRecord& recordData); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode update(const TabletMetadata& tabletMeta, - const StorageRecord& updateData); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode insert(const TabletMetadata& tabletMeta, - const StorageRecord& insertData); - /** - * See PersistentStore API - */ - SuCode::ResponseCode remove(const TabletMetadata& tabletMeta, - const RecordKey& recordKey); - /** - * Not part of PersistentStore API. However, PersistentParent needs to implement ping - */ - bool ping(); - - /** - * See PersistentStore API - */ - StorageRecordIterator - scan(const TabletMetadata& tabletMeta, const ScanContinuation& continuation, - ScanSelect::Selector selector, const uint64_t expiryTime, unsigned int scanLimit, size_t byteLimit); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode getSnapshotExporter(const TabletMetadata& tabletMeta, - const std::string& snapshotId, - SnapshotExporterAutoPtr& exporter); - - /** - * See PersistentStore API - */ - SuCode::ResponseCode getSnapshotImporter(const TabletMetadata& tabletMeta, - const std::string& version, - const std::string& snapshotId, - SnapshotImporterAutoPtr& snapshot) ; - - /** - * See PersistentStore API - */ - SuCode::ResponseCode getIncomingCopyProgress(const TabletMetadata& metadata, - const std::string& snapshotId, - int64_t& current, - int64_t& estimated) const; - - /** - * See PersistentStore API - */ - SuCode::ResponseCode getOutgoingCopyProgress(const TabletMetadata& metadata, - const std::string& snapshotId, - int64_t& current, - int64_t& estimated) const; -private: - /** - * connect to the database. Noop if already connected. - * - * @return SuCode::SuOk if successful, error otherwise - */ - // SuCode::ResponseCode connect(); - -private: - LSMPersistentStoreImpl(LSMPersistentStoreImpl &); - LSMPersistentStoreImpl operator=(LSMPersistentStoreImpl &); - - void metadata_buf(TabletMetadata &m, const unsigned char * buf, size_t len); - void buf_metadata(unsigned char ** buf, size_t *len, const TabletMetadata &m); -protected: - logstore_handle_t * l_; - logstore_handle_t * scan_l_; // XXX make sure that one scan handle per process suffices. - -}; - -#endif /*LSM_PSTORE_IMPL_H*/ diff --git a/sherpa/LSMServerHandler.cc b/sherpa/LSMServerHandler.cc index c53bbff..dcfe2ee 100644 --- a/sherpa/LSMServerHandler.cc +++ b/sherpa/LSMServerHandler.cc @@ -34,8 +34,8 @@ LSMServerHandler(int argc, char **argv) c0_size = 1024 * 1024 * 100; printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server? } else if(!strcmp(argv[i], "--benchmark")) { - stasis_buffer_manager_size = (1024LL * 1024LL * 1024LL * 2LL) / PAGE_SIZE; // 4GB total - c0_size = 1024LL * 1024LL * 1024LL * 2LL; + stasis_buffer_manager_size = (1024LL * 1024LL * 1024LL * 1LL) / PAGE_SIZE; // 4GB total + c0_size = 1024LL * 1024LL * 1024LL * 1LL; printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server? } else if(!strcmp(argv[i], "--log-mode")) { i++; @@ -128,14 +128,14 @@ nextDatabaseId() ResponseCode::type LSMServerHandler:: ping() { - return sherpa::ResponseCode::Ok; + return mapkeeper::ResponseCode::Success; } ResponseCode::type LSMServerHandler:: shutdown() { exit(0); // xxx hack - return sherpa::ResponseCode::Ok; + return mapkeeper::ResponseCode::Success; } ResponseCode::type LSMServerHandler:: @@ -143,18 +143,18 @@ insert(datatuple* tuple) { ltable_->insertTuple(tuple); datatuple::freetuple(tuple); - return sherpa::ResponseCode::Ok; + return mapkeeper::ResponseCode::Success; } ResponseCode::type LSMServerHandler:: -addDatabase(const std::string& databaseName) +addMap(const std::string& databaseName) { uint32_t id = nextDatabaseId(); datatuple* tup = buildTuple(0, databaseName, (void*)&id, (uint32_t)(sizeof(id))); datatuple* ret = get(tup); if (ret) { datatuple::freetuple(ret); - return sherpa::ResponseCode::DatabaseExists; + return mapkeeper::ResponseCode::MapExists; } return insert(tup); } @@ -165,23 +165,23 @@ addDatabase(const std::string& databaseName) * all the records! */ ResponseCode::type LSMServerHandler:: -dropDatabase(const std::string& databaseName) +dropMap(const std::string& databaseName) { #if 0 Bdb::ResponseCode rc = databaseIds_.remove(databaseName); if (rc == Bdb::KeyNotFound) { - return sherpa::ResponseCode::DatabaseNotFound; - } else if (rc != Bdb::Ok) { - return sherpa::ResponseCode::Error; + return mapkeeper::ResponseCode::MapNotFound; + } else if (rc != Bdb::Success) { + return mapkeeper::ResponseCode::Error; } else { - return sherpa::ResponseCode::Ok; + return mapkeeper::ResponseCode::Success; } #endif - return sherpa::ResponseCode::Ok; + return mapkeeper::ResponseCode::Success; } void LSMServerHandler:: -listDatabases(StringListResponse& _return) +listMaps(StringListResponse& _return) { } @@ -194,7 +194,7 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd uint32_t id = getDatabaseId(databaseName); if (id == 0) { // database not found - _return.responseCode = sherpa::ResponseCode::DatabaseNotFound; + _return.responseCode = mapkeeper::ResponseCode::MapNotFound; return; } @@ -213,7 +213,7 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd (maxBytes == 0 || resultSize < maxBytes)) { datatuple* current = itr->getnext(); if (current == NULL) { - _return.responseCode = sherpa::ResponseCode::ScanEnded; + _return.responseCode = mapkeeper::ResponseCode::ScanEnded; break; } @@ -228,7 +228,7 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd if ((!endKeyIncluded && cmp >= 0) || (endKeyIncluded && cmp > 0)) { datatuple::freetuple(current); - _return.responseCode = sherpa::ResponseCode::ScanEnded; + _return.responseCode = mapkeeper::ResponseCode::ScanEnded; break; } @@ -260,17 +260,17 @@ get(BinaryResponse& _return, const std::string& databaseName, const std::string& uint32_t id = getDatabaseId(databaseName); if (id == 0) { // database not found - _return.responseCode = sherpa::ResponseCode::DatabaseNotFound; + _return.responseCode = mapkeeper::ResponseCode::MapNotFound; return; } datatuple* recordBody = get(id, recordName); if (recordBody == NULL) { // record not found - _return.responseCode = sherpa::ResponseCode::RecordNotFound; + _return.responseCode = mapkeeper::ResponseCode::RecordNotFound; return; } - _return.responseCode = sherpa::ResponseCode::Ok; + _return.responseCode = mapkeeper::ResponseCode::Success; _return.value.assign((const char*)(recordBody->data()), recordBody->datalen()); datatuple::freetuple(recordBody); } @@ -301,6 +301,14 @@ get(uint32_t databaseId, const std::string& recordName) return ret; } +ResponseCode::type LSMServerHandler:: +put(const std::string& databaseName, + const std::string& recordName, + const std::string& recordBody) +{ + return mapkeeper::ResponseCode::Success; +} + ResponseCode::type LSMServerHandler:: insert(const std::string& databaseName, const std::string& recordName, @@ -309,7 +317,7 @@ insert(const std::string& databaseName, // std::cerr << "inserting " << databaseName << "." << recordName << std::endl; uint32_t id = getDatabaseId(databaseName); if (id == 0) { - return sherpa::ResponseCode::DatabaseNotFound; + return mapkeeper::ResponseCode::MapNotFound; } datatuple* oldRecordBody = get(id, recordName); if (oldRecordBody != NULL) { @@ -317,7 +325,7 @@ insert(const std::string& databaseName, datatuple::freetuple(oldRecordBody); } else { datatuple::freetuple(oldRecordBody); - return sherpa::ResponseCode::RecordExists; + return mapkeeper::ResponseCode::RecordExists; } } @@ -328,7 +336,7 @@ insert(const std::string& databaseName, ResponseCode::type LSMServerHandler:: insertMany(const std::string& databaseName, const std::vector & records) { - return sherpa::ResponseCode::Error; + return mapkeeper::ResponseCode::Error; } ResponseCode::type LSMServerHandler:: @@ -338,11 +346,11 @@ update(const std::string& databaseName, { uint32_t id = getDatabaseId(databaseName); if (id == 0) { - return sherpa::ResponseCode::DatabaseNotFound; + return mapkeeper::ResponseCode::MapNotFound; } datatuple* oldRecordBody = get(id, recordName); if (oldRecordBody == NULL) { - return sherpa::ResponseCode::RecordNotFound; + return mapkeeper::ResponseCode::RecordNotFound; } datatuple::freetuple(oldRecordBody); datatuple* tup = buildTuple(id, recordName, recordBody); @@ -354,11 +362,11 @@ remove(const std::string& databaseName, const std::string& recordName) { uint32_t id = getDatabaseId(databaseName); if (id == 0) { - return sherpa::ResponseCode::DatabaseNotFound; + return mapkeeper::ResponseCode::MapNotFound; } datatuple* oldRecordBody = get(id, recordName); if (oldRecordBody == NULL) { - return sherpa::ResponseCode::RecordNotFound; + return mapkeeper::ResponseCode::RecordNotFound; } datatuple::freetuple(oldRecordBody); datatuple* tup = buildTuple(id, recordName); diff --git a/sherpa/LSMServerHandler.h b/sherpa/LSMServerHandler.h index c77fcdf..40061fc 100644 --- a/sherpa/LSMServerHandler.h +++ b/sherpa/LSMServerHandler.h @@ -1,4 +1,4 @@ -#include +#include "MapKeeper.h" #include #include #include @@ -7,22 +7,23 @@ using namespace ::apache::thrift; using namespace ::apache::thrift::protocol; using namespace ::apache::thrift::transport; -using namespace sherpa; +using namespace mapkeeper; using boost::shared_ptr; -class LSMServerHandler : virtual public PersistentStoreIf { +class LSMServerHandler : virtual public MapKeeperIf { public: LSMServerHandler(int argc, char **argv); ResponseCode::type ping(); ResponseCode::type shutdown(); - ResponseCode::type addDatabase(const std::string& databaseName); - ResponseCode::type dropDatabase(const std::string& databaseName); - void listDatabases(StringListResponse& _return); + ResponseCode::type addMap(const std::string& databaseName); + ResponseCode::type dropMap(const std::string& databaseName); + void listMaps(StringListResponse& _return); void scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrder::type order, const std::string& startKey, const bool startKeyIncluded, const std::string& endKey, const bool endKeyIncluded, const int32_t maxRecords, const int32_t maxBytes); void get(BinaryResponse& _return, const std::string& databaseName, const std::string& recordName); + ResponseCode::type put(const std::string& databaseName, const std::string& recordName, const std::string& recordBody); ResponseCode::type insert(const std::string& databaseName, const std::string& recordName, const std::string& recordBody); ResponseCode::type insertMany(const std::string& databaseName, const std::vector & records); ResponseCode::type update(const std::string& databaseName, const std::string& recordName, const std::string& recordBody); diff --git a/sherpa/Makefile b/sherpa/Makefile index d4ce170..b38f328 100644 --- a/sherpa/Makefile +++ b/sherpa/Makefile @@ -10,7 +10,7 @@ CXXSRC = LSMServerHandler.cc LCLEAN += *~ WARN += -Werror -Wall -LINC += -I. -I /home/y/include/thrift -I /home/y/include64/stasis -I .. +LINC += -I. -I /usr/local/include/thrift -I /home/y/include64/stasis -I .. -I ../../thrift/gen-cpp LDLIBS += -L/usr/local/lib LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../build @@ -19,7 +19,7 @@ LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../build # Poor packaging for yicu LINC += -I/home/y/include/yicu -LDLIBS += -lthrift -ldht_persistent_store -lstasis -llogstore -L ../build +LDLIBS += -lthrift -lmapkeeper -lstasis -llogstore -L ../build -L ../../thrift/gen-cpp # Need to remove potential warnings in yapache. LDEF += -DEAPI diff --git a/sherpa/Makefile.local b/sherpa/Makefile.local index 48d19b3..822b48a 100644 --- a/sherpa/Makefile.local +++ b/sherpa/Makefile.local @@ -1,21 +1,28 @@ -CPPFLAGS =-I../../stasis -I.. -I. -I /usr/local/include/thrift -CXXFLAGS =-g -O0 -LDFLAGS=-lprofiler -ltcmalloc -lpthread -llogstore -lstasis -L ../build -L ../../stasis/build/src/stasis -lthrift -Wl,-rpath,../build,-rpath,../../stasis/build/src/stasis,-rpath,/usr/local/lib +THRIFT_DIR = /usr/local -THRIFT_SRC = dht_persistent_store/persistent_store_constants.cpp dht_persistent_store/persistent_store_types.cpp dht_persistent_store/PersistentStore.cpp +CPPFLAGS =-I../../stasis -I.. -I. -I $(THRIFT_DIR)/include/thrift -I../../mapkeeper/thrift/gen-cpp +CXXFLAGS =-g -O0 + +LDFLAGS=-lprofiler -ltcmalloc -lpthread -llogstore -lstasis -lmapkeeper -lthrift \ + -L $(THRIFT_DIR)/lib -L ../../mapkeeper/thrift/gen-cpp \ + -L ../build -L ../../stasis/build/src/stasis \ + -Wl,-rpath,\$$ORIGIN/../../build \ + -Wl,-rpath,\$$ORIGIN/../../../stasis/build/src/stasis \ + -Wl,-rpath,\$$ORIGIN/../../../mapkeeper/thrift/gen-cpp \ + -Wl,-rpath,/usr/local/lib \ + -Wl,-rpath,$(THRIFT_DIR)/lib all: main/lsm_client main/lsm_server main/lsm_shutdown dht_thrift.jar : cd ../thrift/gen-java; - bash -c 'javac \`find . -name *.java\` -cp /usr/local/lib/libthrift.jar:/usr/share/java/slf4j-api-1.5.11.jar' + bash -c 'javac \`find . -name *.java\` -cp $(THRIFT_DIR)/lib/libthrift.jar:/usr/share/java/slf4j-api-1.5.11.jar' cd ../../sherpa -main/lsm_client : $(THRIFT_SRC) +main/lsm_client : +main/lsm_shutdown : -main/lsm_shutdown : $(THRIFT_SRC) - -main/lsm_server : LSMServerHandler.cc $(THRIFT_SRC) +main/lsm_server : LSMServerHandler.cc clean: rm -f main/lsm_client main/lsm_server diff --git a/sherpa/dht_persistent_store b/sherpa/dht_persistent_store deleted file mode 120000 index cc0eaf7..0000000 --- a/sherpa/dht_persistent_store +++ /dev/null @@ -1 +0,0 @@ -../thrift/gen-cpp/ \ No newline at end of file diff --git a/sherpa/main/Makefile b/sherpa/main/Makefile index 1d0090a..f74727b 100644 --- a/sherpa/main/Makefile +++ b/sherpa/main/Makefile @@ -9,8 +9,8 @@ CXXSRC = $(addsuffix .cc, $(EXETARGET)) LCLEAN += *~ WARN += -Werror -Wall -LINC += -I../ -I /home/y/include/thrift -I /home/y/include/boost/tr1/ -I ../../ -LDLIBS += -L../ -ldht_persistent_store -lLSMServer +LINC += -I../ -I /usr/local/include/thrift -I /home/y/include/boost/tr1/ -I ../../ -I ../../../thrift/gen-cpp/ +LDLIBS += -L../ -lmapkeeper -lLSMServer -L ../../../thrift/gen-cpp LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../../build ifdef UNIT_TEST diff --git a/sherpa/main/lsm_client.cc b/sherpa/main/lsm_client.cc index 3e8f7cb..d269475 100644 --- a/sherpa/main/lsm_client.cc +++ b/sherpa/main/lsm_client.cc @@ -1,4 +1,4 @@ -#include +#include "MapKeeper.h" #include #include #include @@ -14,14 +14,14 @@ int main(int argc, char **argv) { boost::shared_ptr transport(new TFramedTransport(socket)); boost::shared_ptr protocol(new TBinaryProtocol(transport)); - sherpa::PersistentStoreClient client(protocol); + mapkeeper::MapKeeperClient client(protocol); transport->open(); socket->setNoDelay(true); - sherpa::BinaryResponse getResponse; - sherpa::RecordListResponse scanResponse; + mapkeeper::BinaryResponse getResponse; + mapkeeper::RecordListResponse scanResponse; std::string db = "db"; std::string db1 = "db1"; - cout << client.addDatabase(db) << endl;; + cout << client.addMap(db) << endl;; cout << client.insert(db, "kkkkkkkkkkkkk1", "v1") << endl; cout << client.insert(db, "kkkkkkkkkkkkk2", "v2") << endl; cout << client.insert(db, "kkkkkkkkkkkkk3", "v3") << endl; @@ -58,9 +58,9 @@ int main(int argc, char **argv) { */ cout << "adding db one more time" << endl; - cout << client.addDatabase(db) << endl;; + cout << client.addMap(db) << endl;; - cout << client.addDatabase(db1) << endl;; + cout << client.addMap(db1) << endl;; cout << client.insert(db1, "new key", "new value") << endl; client.get(getResponse, db1, "new key"); cout << getResponse.responseCode << endl; @@ -75,33 +75,33 @@ int main(int argc, char **argv) { cout << getResponse.responseCode << endl; cout << client.remove(db1, "new key") << endl; - client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "", true, "", true, 100, 100); - std::vector::iterator itr; + client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "", true, "", true, 100, 100); + std::vector::iterator itr; for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { cout << itr->key << " " << itr->value << endl; } std::cout << std::endl; - client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", false, "kkkkkkkkkkkkk3", false, 100, 100); + client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "kkkkkkkkkkkkk1", false, "kkkkkkkkkkkkk3", false, 100, 100); for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { cout << itr->key << " " << itr->value << endl; } std::cout << std::endl; - client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", true, "kkkkkkkkkkkkk3", true, 100, 100); + client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "kkkkkkkkkkkkk1", true, "kkkkkkkkkkkkk3", true, 100, 100); for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { cout << itr->key << " " << itr->value << endl; } std::cout << std::endl; - client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk2", true, "kkkkkkkkkkkkk2", true, 100, 100); + client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "kkkkkkkkkkkkk2", true, "kkkkkkkkkkkkk2", true, 100, 100); for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { cout << itr->key << " " << itr->value << endl; } std::cout << std::endl; - client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "k", true, "kkkkkkkkkkkkk4", true, 100, 100); + client.scan(scanResponse, db, mapkeeper::ScanOrder::Ascending, "k", true, "kkkkkkkkkkkkk4", true, 100, 100); for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { cout << itr->key << " " << itr->value << endl; } diff --git a/sherpa/main/lsm_server.cc b/sherpa/main/lsm_server.cc index 633d86b..247b69e 100644 --- a/sherpa/main/lsm_server.cc +++ b/sherpa/main/lsm_server.cc @@ -1,11 +1,12 @@ -#include +#include "MapKeeper.h" #include #include #include +#include #include #include -#include -#include +#include +#include #include "logstore.h" #include "datatuple.h" #include "LSMServerHandler.h" @@ -17,19 +18,20 @@ using namespace ::apache::thrift::server; using namespace ::apache::thrift::concurrency; using boost::shared_ptr; -using namespace sherpa; +using namespace mapkeeper; int main(int argc, char **argv) { shared_ptr handler(new LSMServerHandler(argc, argv)); - shared_ptr processor(new PersistentStoreProcessor(handler)); + shared_ptr processor(new MapKeeperProcessor(handler)); shared_ptr serverTransport(new TServerSocket(9090)); shared_ptr transportFactory(new TFramedTransportFactory()); shared_ptr protocolFactory(new TBinaryProtocolFactory()); shared_ptr threadManager = ThreadManager::newSimpleThreadManager(32); shared_ptr threadFactory(new PosixThreadFactory()); - threadManager->threadFactory(threadFactory); - threadManager->start(); - TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager); + //threadManager->threadFactory(threadFactory); + //threadManager->start(); + TThreadedServer server(processor, serverTransport, transportFactory, protocolFactory); + printf("I'm using tthreaded server!"); server.serve(); return 0; } diff --git a/thrift/Makefile b/thrift/Makefile deleted file mode 100644 index f248910..0000000 --- a/thrift/Makefile +++ /dev/null @@ -1,6 +0,0 @@ - -all:: - thrift --gen cpp --gen java persistent_store.thrift - -clean:: - rm -rf gen-cpp gen-java diff --git a/thrift/build.xml b/thrift/build.xml deleted file mode 100644 index f8b68f7..0000000 --- a/thrift/build.xml +++ /dev/null @@ -1,42 +0,0 @@ - - - simple example build file - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/thrift/persistent_store.thrift b/thrift/persistent_store.thrift deleted file mode 100644 index d5f4b39..0000000 --- a/thrift/persistent_store.thrift +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Defines Sherpa persistent store interface. - * - * For a good tutorial on how to write a .thrift file, see: - * - * http://wiki.apache.org/thrift/Tutorial - */ -namespace cpp sherpa -namespace java com.yahoo.sherpa - -enum ResponseCode -{ - Ok = 0, - Error, - DatabaseExists, - DatabaseNotFound, - RecordExists, - RecordNotFound, - ScanEnded, -} - -enum ScanOrder -{ - Ascending, - Descending, -} - -struct Record -{ - 1:binary key, - 2:binary value, -} - -struct RecordListResponse -{ - 1:ResponseCode responseCode, - 2:list records, -} - -struct BinaryResponse -{ - 1:ResponseCode responseCode, - 2:binary value, -} - -struct StringListResponse -{ - 1:ResponseCode responseCode, - 2:list values, -} - -/** - * Note about database name: - * Thrift string type translates to std::string in C++ and String in - * Java. Thrift does not validate whether the string is in utf8 in C++, - * but it does in Java. If you are using this class in C++, you need to - * make sure the database name is in utf8. Otherwise requests will fail. - */ -service PersistentStore -{ - /** - * Pings this persistent store. - * - * @return Ok - if ping was successful. - * Error - if ping failed. - */ - ResponseCode ping(), - - /** - * Cleanly shuts down the persistent store. - * - * @return Ok - if shutdown is in progress. - * Error - otherwise - */ - ResponseCode shutdown(), - - /** - * Add a new database to this persistent store. - * - * A database is a container for a collection of records. - * A record is a binary key / binary value pair. - * A key uniquely identifies a record in a database. - * - * @param databaseName database name - * @return Ok - on success. - * DatabaseExists - database already exists. - * Error - on any other errors. - */ - ResponseCode addDatabase(1:string databaseName), - - /** - * Drops a database from this persistent store. - * - * @param databaseName database name - * @return Ok - on success. - * DatabaseNotFound - database doesn't exist. - * Error - on any other errors. - */ - ResponseCode dropDatabase(1:string databaseName), - - /** - * List databases in this persistent store. - * - * @returns StringListResponse - * responseCode Ok - on success. - * Error - on error. - * values - list of databases. - */ - StringListResponse listDatabases(), - - /** - * Returns records in a database in lexicographical order. - * - * Note that startKey is supposed to be smaller than or equal to the endKey - * regardress of the scan order. For example, to scan all the records from - * "apple" to "banana" in descending order, startKey is "apple" and endKey - * is "banana". If startKey is larger than endKey, scan will succeed and - * result will be empty. - * - * This method will return ScanEnded if the scan was successful and it reached - * the end of the key range. It'll return Ok if it reached maxRecords or - * maxBytes, but it didn't reach the end of the key range. - * - * @param databaseName database name - * @param order Ascending or Decending. - * @param startKey Key to start scan from. If it's empty, scan starts - * from the smallest key in the database. - * @param startKeyIncluded - * Indicates whether the record that matches startKey is - * included in the response. - * @param endKey Key to end scan at. If it's emty scan ends at the largest - * key in the database. - * @param endKeyIncluded - * Indicates whether the record that matches endKey is - * included in the response. - * @param maxRecords - * Scan will return at most $maxRecords records. - * @param maxBytes Advise scan to return at most $maxBytes bytes. This - * method is not required to strictly keep the response - * size less than $maxBytes bytes. - * @return RecordListResponse - * responseCode - Ok if the scan was successful - * - ScanEnded if the scan was successful and - * scan reached the end of the range. - * - DatabaseNotFound database doesn't exist. - * - Error on any other errors - * records - list of records. - */ - RecordListResponse scan(1:string databaseName, - 2:ScanOrder order, - 3:binary startKey, - 4:bool startKeyIncluded, - 5:binary endKey, - 6:bool endKeyIncluded, - 7:i32 maxRecords, - 8:i32 maxBytes), - - /** - * Retrieves a record from a database. - * - * @param databaseName database name - * @param recordKey record to retrive. - * @returns RecordListResponse - * responseCode - Ok - * DatabaseNotFound database doesn't exist. - * RecordNotFound record doesn't exist. - * Error on any other errors. - * records - list of records - */ - BinaryResponse get(1:string databaseName, 2:binary recordKey), - - /** - * Inserts a record into a database. - * - * @param databaseName database name - * @param recordKey record key to insert - * @param recordValue record value to insert - * @returns Ok - * DatabaseNotFound database doesn't exist. - * RecordExists - * Error - */ - ResponseCode insert(1:string databaseName, 2:binary recordKey, 3:binary recordValue), - - /** - * Inserts multiple records into a database. - * - * This operation is atomic: either all the records get inserted into a database - * or none does. - * - * @param databaseName database name - * @param records list of records to insert - * @returns Ok - * DatabaseNotFound - * RecordExists - if a record already exists in a database - * Error - */ - ResponseCode insertMany(1:string databaseName, 2:list records), - - /** - * Updates a record in a database. - * - * @param databaseName database name - * @param recordKey record key to update - * @param recordValue new value for the record - * @returns Ok - * DatabaseNotFound database doesn't exist. - * RecordNotFound - * Error - */ - ResponseCode update(1:string databaseName, 2:binary recordKey, 3:binary recordValue), - - /** - * Removes a record from a database. - * - * @param databaseName database name - * @param recordKey record to remove from the database. - * @returns Ok - * DatabaseNotFound database doesn't exist. - * RecordNotFound - * Error - */ - ResponseCode remove(1:string databaseName, 2:binary recordKey), -}