From 5291a25dceb9270d405930b5862bee3c400b9ab5 Mon Sep 17 00:00:00 2001 From: michim Date: Thu, 12 May 2011 20:41:18 +0000 Subject: [PATCH] added packages and conf directories. added update and scan operations. git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2546 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- conf/dht_lsm_server.ini | 10 +++ packages/Makefile | 12 +++ packages/dht_lsm_server.run | 9 +++ packages/dht_lsm_server.yicf | 42 ++++++++++ packages/dht_lsm_server_log.run | 2 + sherpa/LSMServerHandler.cc | 139 ++++++++++++++++++++++---------- sherpa/LSMServerHandler.h | 3 + sherpa/main/lsm_client.cc | 79 +++++++++++++++--- 8 files changed, 241 insertions(+), 55 deletions(-) create mode 100644 conf/dht_lsm_server.ini create mode 100644 packages/Makefile create mode 100755 packages/dht_lsm_server.run create mode 100644 packages/dht_lsm_server.yicf create mode 100755 packages/dht_lsm_server_log.run diff --git a/conf/dht_lsm_server.ini b/conf/dht_lsm_server.ini new file mode 100644 index 0000000..f3f2b9c --- /dev/null +++ b/conf/dht_lsm_server.ini @@ -0,0 +1,10 @@ +env_dir=/home/y/var/dht_bdb_server/data +port=9090 +num_threads=$(num_threads) +num_partitions=$(num_partitions) +num_retries=100 +key_buffer_size_bytes=2048 +value_buffer_size_bytes=1150976 + +checkpoint_frequency_msec=1000 +checkpoint_min_change_kb=10 diff --git a/packages/Makefile b/packages/Makefile new file mode 100644 index 0000000..593000b --- /dev/null +++ b/packages/Makefile @@ -0,0 +1,12 @@ +ROOT=/home/y +include $(ROOT)/share/yahoo_cfg/Make.defs + +# Just to pick up "clean" and "package" rules + +all:: package-release + +test:: package-test + +clean:: package-clean + +include $(ROOT)/share/yahoo_cfg/Make.rules diff --git a/packages/dht_lsm_server.run b/packages/dht_lsm_server.run new file mode 100755 index 0000000..fe1239f --- /dev/null +++ b/packages/dht_lsm_server.run @@ -0,0 +1,9 @@ +#!/bin/sh +# +# This is the 'run' script used by daemontools to start dht_lsm_server. +# See the dist page for more details about daemontools. +# +# http://dist.corp.yahoo.com/by-package/daemontools_y/ +# +cd /home/y/var/dht_lsm_server/data +exec /home/y/bin/setuidgid nobody /home/y/bin64/lsm_server 2>&1 diff --git a/packages/dht_lsm_server.yicf b/packages/dht_lsm_server.yicf new file mode 100644 index 0000000..6bdab36 --- /dev/null +++ b/packages/dht_lsm_server.yicf @@ -0,0 +1,42 @@ +PRODUCT_NAME = dht_lsm_server +VERSION=1.0.0 +SHORT_DESC = LSM-Tree +LONG_DESC = LSM-Tree persistent store for sherpa +CUSTODIAN = ydht-devel@yahoo-inc.com http://twiki.corp.yahoo.com/view/YDHT + +OWNER = root +GROUP = wheel +PERM = 0444 + +YINST bug-product dht +YINST bug-component General + +YINST requires pkg daemontools_y ROOT ROOT_MAX +YINST requires pkg dht_persistent_store64 ROOT ROOT_MAX + +dir 0777 - - logs/dht_lsm_server +dir 0755 - - var/daemontools/dht_lsm_server +dir 0755 - - var/daemontools/dht_lsm_server/log +dir 0777 - - conf/dht_lsm_server +dir 0777 - - var/dht_lsm_server/data + +file 0555 - - bin64/ ../sherpa/main/lsm_server + +file 0444 - - lib64/liblogstore.so.1 ../build/liblogstore.so +symlink 0444 - - lib64/liblogstore.so liblogstore.so.1 + +file 0444 - - lib64/ ../sherpa/libLSMServer.so.1 +symlink 0444 - - lib64/libLSMServer.so libLSMServer.so.1 + +file 0555 - - var/daemontools/dht_lsm_server/run dht_lsm_server.run +file 0555 - - var/daemontools/dht_lsm_server/log/run dht_lsm_server_log.run + +configfile 0644 - - conf/dht_lsm_server/dht_lsm_server.ini ../conf/dht_lsm_server.ini expand overwrite + +YINST set log_level INFO +YINST set cache_size_gb 4 +YINST set num_partitions 32 +YINST set num_threads 32 +YINST start 30 $ROOT/bin/svcstart dht_lsm_server +YINST stop 30 $ROOT/bin/svcstop dht_lsm_server +YINST pre-deactivate $ROOT/bin/svcstop dht_lsm_server diff --git a/packages/dht_lsm_server_log.run b/packages/dht_lsm_server_log.run new file mode 100755 index 0000000..c19233d --- /dev/null +++ b/packages/dht_lsm_server_log.run @@ -0,0 +1,2 @@ +#!/bin/sh +exec /home/y/bin/setuidgid nobody /usr/sbin/cronolog -l /home/y/logs/dht_lsm_server/stdout.log "/home/y/logs/dht_lsm_server/stdout.log.%Y%m%d" diff --git a/sherpa/LSMServerHandler.cc b/sherpa/LSMServerHandler.cc index f286ce9..7d9835e 100644 --- a/sherpa/LSMServerHandler.cc +++ b/sherpa/LSMServerHandler.cc @@ -49,6 +49,7 @@ LSMServerHandler(int argc, char **argv) } } + pthread_mutex_init(&mutex_, 0); logtable::init_stasis(); int xid = Tbegin(); @@ -85,7 +86,43 @@ LSMServerHandler(int argc, char **argv) */ } //logtable::deinit_stasis(); + initNextDatabaseId(); +} + +void LSMServerHandler:: +initNextDatabaseId() +{ nextDatabaseId_ = 1; + uint32_t id = 0; + datatuple* start = buildTuple(id, ""); + datatuple* end = buildTuple(id + 1, ""); + logtable::iterator* itr = new logtable::iterator(ltable_, start); + datatuple* current; + while ((current = itr->getnext())) { + // are we at the end of range? + if (datatuple::compare_obj(current, end) >= 0) { + datatuple::freetuple(current); + break; + } + uint32_t currentId = *((uint32_t*)(current->data())); + if (currentId > nextDatabaseId_) { + nextDatabaseId_ = currentId; + } + datatuple::freetuple(current); + } + nextDatabaseId_++; + delete itr; +} + +uint32_t LSMServerHandler:: +nextDatabaseId() +{ + uint32_t id; + pthread_mutex_lock(&mutex_); + nextDatabaseId_++; + id = nextDatabaseId_; + pthread_mutex_unlock(&mutex_); + return id; } ResponseCode::type LSMServerHandler:: @@ -104,8 +141,13 @@ insert(datatuple* tuple) ResponseCode::type LSMServerHandler:: addDatabase(const std::string& databaseName) { - - datatuple* tup = buildTuple(0, databaseName, (void*)&nextDatabaseId_, (uint32_t)(sizeof(nextDatabaseId_))); + uint32_t id = nextDatabaseId(); + datatuple* tup = buildTuple(0, databaseName, (void*)&id, (uint32_t)(sizeof(id))); + datatuple* ret = get(tup); + if (ret) { + datatuple::freetuple(ret); + return sherpa::ResponseCode::DatabaseExists; + } return insert(tup); } @@ -141,44 +183,62 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd const std::string& endKey, const bool endKeyIncluded, const int32_t maxRecords, const int32_t maxBytes) { -#if 0 - BdbIterator itr; - uint32_t id; - _return.responseCode = getDatabaseId(databaseName, id); - if (_return.responseCode != sherpa::ResponseCode::Ok) { + uint32_t id = getDatabaseId(databaseName); + if (id == 0) { + // database not found + _return.responseCode = sherpa::ResponseCode::DatabaseNotFound; return; } - boost::thread_specific_ptr buffer; - if (buffer.get() == NULL) { - buffer.reset(new RecordBuffer(keyBufferSizeBytes_, valueBufferSizeBytes_)); - } + + datatuple* start = buildTuple(id, startKey); + datatuple* end; if (endKey.empty()) { - insertDatabaseId(const_cast(endKey), id + 1); + end = buildTuple(id + 1, endKey); } else { - insertDatabaseId(const_cast(endKey), id); + end = buildTuple(id, endKey); } - insertDatabaseId(const_cast(startKey), id); - itr.init(db_[id % numPartitions_], const_cast(startKey), startKeyIncluded, const_cast(endKey), endKeyIncluded, order, *buffer); + logtable::iterator* itr = new logtable::iterator(ltable_, start); int32_t resultSize = 0; - _return.responseCode = sherpa::ResponseCode::Ok; + while ((maxRecords == 0 || (int32_t)(_return.records.size()) < maxRecords) && (maxBytes == 0 || resultSize < maxBytes)) { - BdbIterator::ResponseCode rc = itr.next(*buffer); - if (rc == BdbIterator::ScanEnded) { + datatuple* current = itr->getnext(); + if (current == NULL) { _return.responseCode = sherpa::ResponseCode::ScanEnded; break; - } else if (rc != BdbIterator::Ok) { - _return.responseCode = sherpa::ResponseCode::Error; - break; } + + int cmp = datatuple::compare_obj(current, start); + std::cout << "start key: (" << std::string((const char*)(start->strippedkey()), start->strippedkeylen()) << ")" << std::endl; + std::cout << "current key: (" << std::string((const char*)(current->strippedkey()), current->strippedkeylen()) << ")" << std::endl; + std::cout << "start key: " << startKey << " cmp: " << cmp << std::endl; + if ((!startKeyIncluded) && cmp == 0) { + datatuple::freetuple(current); + continue; + } + + // are we at the end of range? + cmp = datatuple::compare_obj(current, end); + std::cout << "end key: " << endKey << " cmp: " << cmp << std::endl; + if ((!endKeyIncluded && cmp >= 0) || + (endKeyIncluded && cmp > 0)) { + datatuple::freetuple(current); + _return.responseCode = sherpa::ResponseCode::ScanEnded; + break; + } + Record rec; - rec.key.assign(buffer->getKeyBuffer() + sizeof(id), buffer->getKeySize() - sizeof(id)); - rec.value.assign(buffer->getValueBuffer(), buffer->getValueSize()); + int32_t keySize = current->strippedkeylen() - sizeof(id); + int32_t dataSize = current->datalen(); + + rec.key.assign((char*)(current->strippedkey()) + sizeof(id), keySize); + rec.value.assign((char*)(current->data()), dataSize); _return.records.push_back(rec); - resultSize += buffer->getKeySize() + buffer->getValueSize(); - } -#endif + resultSize += keySize + dataSize; + datatuple::freetuple(current); + } + delete itr; } datatuple* LSMServerHandler:: @@ -288,24 +348,19 @@ update(const std::string& databaseName, const std::string& recordName, const std::string& recordBody) { -/* - uint32_t id; - ResponseCode::type rc = getDatabaseId(databaseName, id); - if (rc != sherpa::ResponseCode::Ok) { - return rc; + uint32_t id = getDatabaseId(databaseName); + std::cout << "michim: enter update" << std::endl; + if (id == 0) { + return sherpa::ResponseCode::DatabaseNotFound; } - - insertDatabaseId(const_cast(recordName), id); - Bdb::ResponseCode dbrc = db_[id % numPartitions_]->update(recordName, recordBody); - if (dbrc == Bdb::Ok) { - return sherpa::ResponseCode::Ok; - } else if (dbrc == Bdb::KeyNotFound) { + datatuple* oldRecordBody = get(id, recordName); + if (oldRecordBody == NULL) { return sherpa::ResponseCode::RecordNotFound; - } else { - return sherpa::ResponseCode::Error; } - */ - return sherpa::ResponseCode::Error; + std::cout << "michim: updating record" << std::endl; + datatuple::freetuple(oldRecordBody); + datatuple* tup = buildTuple(id, recordName, recordBody); + return insert(tup); } ResponseCode::type LSMServerHandler:: @@ -365,6 +420,6 @@ buildTuple(uint32_t databaseId, const std::string& recordName, const void* body, *(uint32_t*)key = databaseId; memcpy(((uint32_t*)key) + 1, recordName.c_str(), recordName.size()); datatuple *tup = datatuple::create(key, keySize, body, bodySize); + std::cout << "built tuple: key: (" << std::string((const char*)(tup->strippedkey()), tup->strippedkeylen()) << ")" << std::endl; return tup; - } diff --git a/sherpa/LSMServerHandler.h b/sherpa/LSMServerHandler.h index fda81fc..772d91d 100644 --- a/sherpa/LSMServerHandler.h +++ b/sherpa/LSMServerHandler.h @@ -30,11 +30,14 @@ public: private: ResponseCode::type insert(datatuple* tuple); uint32_t getDatabaseId(const std::string& databaseName); + uint32_t nextDatabaseId(); datatuple* get(uint32_t databaseId, const std::string& recordName); datatuple* get(datatuple* tuple); datatuple* buildTuple(uint32_t databaseId, const std::string& recordName); datatuple* buildTuple(uint32_t databaseId, const std::string& recordName, const std::string& recordBody); datatuple* buildTuple(uint32_t databaseId, const std::string& recordName, const void* body, uint32_t bodySize); + void initNextDatabaseId(); logtable* ltable_; uint32_t nextDatabaseId_; + pthread_mutex_t mutex_; }; diff --git a/sherpa/main/lsm_client.cc b/sherpa/main/lsm_client.cc index 5d19342..3f1ae38 100644 --- a/sherpa/main/lsm_client.cc +++ b/sherpa/main/lsm_client.cc @@ -19,44 +19,97 @@ int main(int argc, char **argv) { socket->setNoDelay(true); sherpa::BinaryResponse getResponse; sherpa::RecordListResponse scanResponse; - - cout << client.addDatabase("michi0") << endl;; - cout << client.insert("michi0", "k1", "v1") << endl; - cout << client.insert("michi0", "k2", "v1") << endl; - cout << client.insert("michi0", "k3", "v1") << endl; - client.get(getResponse, "michi0", "k1"); + std::string db = "db"; + std::string db1 = "db1"; + cout << client.addDatabase(db) << endl;; + cout << client.insert(db, "kkkkkkkkkkkkk1", "v1") << endl; + cout << client.insert(db, "kkkkkkkkkkkkk2", "v2") << endl; + cout << client.insert(db, "kkkkkkkkkkkkk3", "v3") << endl; + client.get(getResponse, db, "kkkkkkkkkkkkk1"); cout << getResponse.responseCode << endl; cout << getResponse.value << endl; - client.get(getResponse, "michi0", "k2"); + client.get(getResponse, db, "kkkkkkkkkkkkk2"); cout << getResponse.responseCode << endl; cout << getResponse.value << endl; - client.get(getResponse, "michi0", "k3"); + client.get(getResponse, db, "kkkkkkkkkkkkk3"); cout << getResponse.responseCode << endl; cout << getResponse.value << endl; - client.get(getResponse, "fdsafdasfdasfdsaf", "k3"); + cout << client.update(db, "k0", "v11") << endl; + cout << client.update(db, "kkkkkkkkkkkkk1", "v11") << endl; + cout << client.update(db, "kkkkkkkkkkkkk2", "v12") << endl; + cout << client.update(db, "kkkkkkkkkkkkk3", "v13") << endl; + client.get(getResponse, db, "kkkkkkkkkkkkk1"); + cout << getResponse.responseCode << endl; + cout << getResponse.value << endl; + client.get(getResponse, db, "kkkkkkkkkkkkk2"); + cout << getResponse.responseCode << endl; + cout << getResponse.value << endl; + client.get(getResponse, db, "kkkkkkkkkkkkk3"); + cout << getResponse.responseCode << endl; + cout << getResponse.value << endl; + +/* + client.get(getResponse, "fdsafdasfdasfdsaf", "kkkkkkkkkkkkk3"); cout << getResponse.responseCode << endl; - client.get(getResponse, "michi0", "k4"); + client.get(getResponse, db, "k4"); cout << getResponse.responseCode << endl; - return 0; + */ - client.scan(scanResponse, "michi", sherpa::ScanOrder::Ascending, "", true, "", true, 100, 100); + client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "", true, "", true, 100, 100); std::vector::iterator itr; for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { cout << itr->key << " " << itr->value << endl; } + std::cout << std::endl; - client.scan(scanResponse, "michi", sherpa::ScanOrder::Descending, "", true, "", true, 100, 100); + + client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", false, "kkkkkkkkkkkkk3", false, 100, 100); for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { cout << itr->key << " " << itr->value << endl; } + std::cout << std::endl; + + client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk1", true, "kkkkkkkkkkkkk3", true, 100, 100); + for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { + cout << itr->key << " " << itr->value << endl; + } + std::cout << std::endl; + + client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "kkkkkkkkkkkkk2", true, "kkkkkkkkkkkkk2", true, 100, 100); + for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { + cout << itr->key << " " << itr->value << endl; + } + std::cout << std::endl; + + client.scan(scanResponse, db, sherpa::ScanOrder::Ascending, "k", true, "kkkkkkkkkkkkk4", true, 100, 100); + for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); itr++) { + cout << itr->key << " " << itr->value << endl; + } + std::cout << std::endl; + cout << "adding db one more time" << endl; + cout << client.addDatabase(db) << endl;; + + cout << client.addDatabase(db1) << endl;; + cout << client.insert(db1, "new key", "new value") << endl; + client.get(getResponse, db1, "new key"); + cout << getResponse.responseCode << endl; + cout << getResponse.value << endl; + + client.get(getResponse, db, "new key"); + cout << getResponse.responseCode << endl; + cout << getResponse.value << endl; + return 0; + + /* cout << client.remove("michi", "k1") << endl; client.get(getResponse, "michi", "k1"); cout << getResponse.responseCode << endl; transport->close(); + */ return 0; }