From 630c16d12ecef54689174ad682ae2b790ee84cea Mon Sep 17 00:00:00 2001 From: michim Date: Wed, 11 May 2011 22:27:48 +0000 Subject: [PATCH] moving to thrift. git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2543 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- CMakeLists.txt | 9 +- sherpa/LSMServerHandler.cc | 313 +++++++++++++++++++++++++++++++++++++ sherpa/LSMServerHandler.h | 33 ++++ sherpa/Makefile | 50 ++++-- sherpa/main/Makefile | 26 +++ sherpa/main/lsm_client.cc | 48 ++++++ sherpa/main/lsm_server.cc | 33 ++++ 7 files changed, 493 insertions(+), 19 deletions(-) create mode 100644 sherpa/LSMServerHandler.cc create mode 100644 sherpa/LSMServerHandler.h create mode 100644 sherpa/main/Makefile create mode 100644 sherpa/main/lsm_client.cc create mode 100644 sherpa/main/lsm_server.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index aae2135..00a21de 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -46,11 +46,16 @@ ENDMACRO(CREATE_CLIENT_EXECUTABLE NAME) INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/../stasis/ ${CMAKE_CURRENT_SOURCE_DIR}/../stasis/src/ ${CMAKE_CURRENT_SOURCE_DIR} + /home/y/include64 /usr/include) # set linker path for this and all subdirs -LINK_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/../stasis/build/src/stasis ${CMAKE_CURRENT_BINARY_DIR}) +LINK_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}/../stasis/build/src/stasis + ${CMAKE_CURRENT_BINARY_DIR} + /home/y/lib64) -FIND_LIBRARY(HAVE_STASIS NAMES stasis PATHS ${CMAKE_CURRENT_SOURCE_DIR}/../stasis/build/src/stasis) +FIND_LIBRARY(HAVE_STASIS NAMES stasis PATHS + ${CMAKE_CURRENT_SOURCE_DIR}/../stasis/build/src/stasis + /home/y/lib64) if(NOT HAVE_STASIS) message(STATUS "stasis not found; only building client library") endif(NOT HAVE_STASIS) diff --git a/sherpa/LSMServerHandler.cc b/sherpa/LSMServerHandler.cc new file mode 100644 index 0000000..4f7e2c7 --- /dev/null +++ b/sherpa/LSMServerHandler.cc @@ -0,0 +1,313 @@ +#include +#include +#undef end +#undef try +#undef begin + +#include 
+#include "merger.h"
+#include "logstore.h"
+#include "LSMServerHandler.h"
+
+// Prints the command-line synopsis to stderr and aborts the process.
+static void
+usageAndAbort(const char *progname)
+{
+  fprintf(stderr, "Usage: %s [--test|--benchmark] [--log-mode <int>] [--expiry-delta <int>]\n", progname);
+  abort();
+}
+
+/**
+ * Parses the server command line, initializes stasis, creates or opens the
+ * logstore rooted at ROOT_RECORD, starts a merge scheduler and replays the
+ * log.
+ *
+ * Recognized arguments:
+ *   --test                 384MB buffer pool and a 100MB c0 (testing only)
+ *   --benchmark            2GB buffer pool and a 2GB c0
+ *   --log-mode <int>       passed to the logtable constructor (default 0)
+ *   --expiry-delta <int>   stored in ltable_->expiry (default 0)
+ *
+ * Any other argument, or a value option with nothing after it, prints the
+ * usage message and aborts.
+ */
+LSMServerHandler::
+LSMServerHandler(int argc, char **argv)
+{
+  // Ignore SIGPIPE so that writing to a closed connection cannot kill the server.
+  signal(SIGPIPE, SIG_IGN);
+
+  // how big the in-memory tree should be (512MB).
+  int64_t c0_size = 1024 * 1024 * 512 * 1;
+
+  // write-ahead log
+  // 1 -> sync on each commit
+  // 2 -> sync on each 2 commits
+  // ...
+  int log_mode = 0; // do not log by default.
+
+  int64_t expiry_delta = 0; // do not gc by default
+
+  // The "total" figures below are buffer pool + c0.
+  stasis_buffer_manager_size = 1 * 1024 * 1024 * 1024 / PAGE_SIZE; // 1.5GB total
+
+  for(int i = 1; i < argc; i++) {
+    if(!strcmp(argv[i], "--test")) {
+      stasis_buffer_manager_size = 3 * 1024 * 1024 * 128 / PAGE_SIZE; // 484MB total
+      c0_size = 1024 * 1024 * 100;
+      printf("warning: running w/ tiny c0 for testing\n"); // XXX build a separate test server and deployment server?
+    } else if(!strcmp(argv[i], "--benchmark")) {
+      stasis_buffer_manager_size = (1024LL * 1024LL * 1024LL * 2LL) / PAGE_SIZE; // 4GB total
+      c0_size = 1024LL * 1024LL * 1024LL * 2LL;
+      printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server?
+    } else if(!strcmp(argv[i], "--log-mode")) {
+      // The option needs a value.  Without this check a trailing
+      // "--log-mode" would hand argv[argc], a null pointer, to atoi().
+      if(++i >= argc) { usageAndAbort(argv[0]); }
+      log_mode = atoi(argv[i]);
+    } else if(!strcmp(argv[i], "--expiry-delta")) {
+      if(++i >= argc) { usageAndAbort(argv[0]); }
+      expiry_delta = atoi(argv[i]);
+    } else {
+      usageAndAbort(argv[0]);
+    }
+  }
+
+  logtable::init_stasis();
+
+  int xid = Tbegin();
+
+
+  recordid table_root = ROOT_RECORD;
+  {
+    ltable_ = new logtable(log_mode, c0_size);
+    ltable_->expiry = expiry_delta;
+
+    if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) {
+      // Nothing stored at the root record yet: allocate a fresh table,
+      // which is expected to land exactly on ROOT_RECORD.
+      printf("Creating empty logstore\n");
+      table_root = ltable_->allocTable(xid);
+      assert(table_root.page == ROOT_RECORD.page &&
+             table_root.slot == ROOT_RECORD.slot);
+    } else {
+      printf("Opened existing logstore\n");
+      table_root.size = TrecordSize(xid, ROOT_RECORD);
+      ltable_->openTable(xid, table_root);
+    }
+
+    Tcommit(xid);
+    // NOTE(review): mscheduler is not stored anywhere visible here, so this
+    // class has no way to shut it down or delete it later (see the disabled
+    // code below) -- confirm that is intended.
+    merge_scheduler * mscheduler = new merge_scheduler(ltable_);
+    mscheduler->start();
+    ltable_->replayLog();
+
+/*
+    printf("Stopping merge threads...\n");
+    mscheduler->shutdown();
+    delete mscheduler;
+
+    printf("Deinitializing stasis...\n");
+    fflush(stdout);
+ */
+  }
+  //logtable::deinit_stasis();
+}
+
+/** Liveness check; always returns Ok. */
+ResponseCode::type LSMServerHandler::
+ping()
+{
+  return sherpa::ResponseCode::Ok;
+}
+
+/**
+ * Stub: the previous implementation is compiled out with #if 0, so this
+ * always returns Ok and records nothing.
+ */
+ResponseCode::type LSMServerHandler::
+addDatabase(const std::string& databaseName)
+{
+#if 0
+  LOG_DEBUG(__FUNCTION__);
+  uint32_t databaseId;
+  seq_.get(databaseId);
+  databaseId = databaseId;
+  std::stringstream out;
+  out << databaseId;
+  LOG_DEBUG("ID for tablet " << databaseName<< ": " << databaseId);
+  Bdb::ResponseCode rc = databaseIds_.insert(databaseName, out.str());
+  if (rc == Bdb::KeyExists) {
+    LOG_DEBUG("Database " << databaseName << " already exists");
+    return sherpa::ResponseCode::DatabaseExists;
+  }
+#endif
+  return sherpa::ResponseCode::Ok;
+}
+
+/**
+ * Stub: the previous implementation is compiled out with #if 0, so this
+ * always returns Ok.
+ *
+ * TODO:
+ * Don't just remove database from databaseIds. You need to delete
+ * all the records!
+ */
+ResponseCode::type LSMServerHandler::
+dropDatabase(const std::string& databaseName)
+{
+#if 0
+  Bdb::ResponseCode rc = databaseIds_.remove(databaseName);
+  if (rc == Bdb::KeyNotFound) {
+    return sherpa::ResponseCode::DatabaseNotFound;
+  } else if (rc != Bdb::Ok) {
+    return sherpa::ResponseCode::Error;
+  } else {
+    return sherpa::ResponseCode::Ok;
+  }
+#endif
+  return sherpa::ResponseCode::Ok;
+}
+
+/** Not implemented: _return is left exactly as the caller passed it. */
+void LSMServerHandler::
+listDatabases(StringListResponse& _return)
+{
+}
+
+/**
+ * Not implemented: the whole body is compiled out with #if 0, so _return is
+ * left exactly as the caller passed it.
+ */
+void LSMServerHandler::
+scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrder::type order,
+     const std::string& startKey, const bool startKeyIncluded,
+     const std::string& endKey, const bool endKeyIncluded,
+     const int32_t maxRecords, const int32_t maxBytes)
+{
+#if 0
+  BdbIterator itr;
+  uint32_t id;
+  _return.responseCode = getDatabaseId(databaseName, id);
+  if (_return.responseCode != sherpa::ResponseCode::Ok) {
+    return;
+  }
+  boost::thread_specific_ptr buffer;
+  if (buffer.get() == NULL) {
+    buffer.reset(new RecordBuffer(keyBufferSizeBytes_, valueBufferSizeBytes_));
+  }
+  if (endKey.empty()) {
+    insertDatabaseId(const_cast(endKey), id + 1);
+  } else {
+    insertDatabaseId(const_cast(endKey), id);
+  }
+  insertDatabaseId(const_cast(startKey), id);
+  itr.init(db_[id % numPartitions_], const_cast(startKey), startKeyIncluded, const_cast(endKey), endKeyIncluded, order, *buffer);
+
+  int32_t resultSize = 0;
+  _return.responseCode = sherpa::ResponseCode::Ok;
+  while ((maxRecords == 0 || (int32_t)(_return.records.size()) < maxRecords) &&
+         (maxBytes == 0 || resultSize < maxBytes)) {
+    BdbIterator::ResponseCode rc = itr.next(*buffer);
+    if (rc == BdbIterator::ScanEnded) {
+      _return.responseCode = sherpa::ResponseCode::ScanEnded;
+      break;
+    } else if (rc != BdbIterator::Ok) {
+      _return.responseCode = sherpa::ResponseCode::Error;
+      break;
+    }
+    Record rec;
+    rec.key.assign(buffer->getKeyBuffer() + sizeof(id), buffer->getKeySize() - sizeof(id));
+    rec.value.assign(buffer->getValueBuffer(), buffer->getValueSize());
+    _return.records.push_back(rec);
+    resultSize += buffer->getKeySize() + buffer->getValueSize();
+  }
+#endif
+}
+
+/**
+ * Not implemented: the lookup is compiled out with #if 0 and the response
+ * code is always set to Error.
+ */
+void LSMServerHandler::
+get(BinaryResponse& _return, const std::string& databaseName, const std::string& recordName)
+{
+#if 0
+  LOG_DEBUG(__FUNCTION__);
+  uint32_t id;
+  _return.responseCode = getDatabaseId(databaseName, id);
+  if (_return.responseCode != sherpa::ResponseCode::Ok) {
+    return;
+  }
+  insertDatabaseId(const_cast(recordName), id);
+
+  boost::thread_specific_ptr buffer;
+  if (buffer.get() == NULL) {
+    buffer.reset(new RecordBuffer(keyBufferSizeBytes_, valueBufferSizeBytes_));
+  }
+  Bdb::ResponseCode dbrc = db_[id % numPartitions_]->get(recordName, _return.value, *buffer);
+  if (dbrc == Bdb::Ok) {
+    _return.responseCode = sherpa::ResponseCode::Ok;
+  } else if (dbrc == Bdb::KeyNotFound) {
+    _return.responseCode = sherpa::ResponseCode::RecordNotFound;
+  } else {
+    _return.responseCode = sherpa::ResponseCode::Error;
+  }
+#endif
+  _return.responseCode = sherpa::ResponseCode::Error;
+}
+
+/*
+ResponseCode::type LSMServerHandler::
+getDatabaseId(const std::string& databaseName, uint32_t& id)
+{
+  std::string idString;
+  boost::thread_specific_ptr buffer;
+  if (buffer.get() == NULL) {
+    buffer.reset(new RecordBuffer(keyBufferSizeBytes_, valueBufferSizeBytes_));
+  }
+  Bdb::ResponseCode rc = databaseIds_.get(databaseName, idString, *buffer);
+  if (rc == Bdb::KeyNotFound) {
+    return sherpa::ResponseCode::DatabaseNotFound;
+  }
+  std::istringstream iss(idString);
+  if ((iss >> id).fail()) {
+    return sherpa::ResponseCode::Error;
+  }
+  LOG_DEBUG("database id for " << databaseName << "=" << id);
+  return sherpa::ResponseCode::Ok;
+}
+*/
+
+/**
+ * Stub: the previous implementation is commented out.
+ * NOTE(review): this returns Ok without storing anything, whereas update()
+ * and remove() report Error while unimplemented.
+ */
+ResponseCode::type LSMServerHandler::
+insert(const std::string& databaseName,
+       const std::string& recordName,
+       const std::string& recordBody)
+{
+/*
+  LOG_DEBUG(__FUNCTION__);
+  uint32_t id;
+  ResponseCode::type rc = getDatabaseId(databaseName, id);
+  if (rc != sherpa::ResponseCode::Ok) {
+    return rc;
+  }
+  insertDatabaseId(const_cast(recordName), id);
+  LOG_DEBUG("id=" << id);
+  LOG_DEBUG("numPartitions=" << numPartitions_);
+  LOG_DEBUG("id % numPartitions=" << id % numPartitions_);
+  Bdb::ResponseCode dbrc = db_[id % numPartitions_]->insert(recordName, recordBody);
+  if (dbrc == Bdb::KeyExists) {
+    return sherpa::ResponseCode::RecordExists;
+  } else if (dbrc != Bdb::Ok) {
+    return sherpa::ResponseCode::Error;
+  }
+ */
+  return sherpa::ResponseCode::Ok;
+}
+
+/** Not implemented: always returns Error. */
+ResponseCode::type LSMServerHandler::
+insertMany(const std::string& databaseName, const std::vector & records)
+{
+  return sherpa::ResponseCode::Error;
+}
+
+/** Not implemented: the previous body is commented out; always returns Error. */
+ResponseCode::type LSMServerHandler::
+update(const std::string& databaseName,
+       const std::string& recordName,
+       const std::string& recordBody)
+{
+/*
+  uint32_t id;
+  ResponseCode::type rc = getDatabaseId(databaseName, id);
+  if (rc != sherpa::ResponseCode::Ok) {
+    return rc;
+  }
+
+  insertDatabaseId(const_cast(recordName), id);
+  Bdb::ResponseCode dbrc = db_[id % numPartitions_]->update(recordName, recordBody);
+  if (dbrc == Bdb::Ok) {
+    return sherpa::ResponseCode::Ok;
+  } else if (dbrc == Bdb::KeyNotFound) {
+    return sherpa::ResponseCode::RecordNotFound;
+  } else {
+    return sherpa::ResponseCode::Error;
+  }
+ */
+  return sherpa::ResponseCode::Error;
+}
+
+/** Not implemented: the previous body is commented out; always returns Error. */
+ResponseCode::type LSMServerHandler::
+remove(const std::string& databaseName, const std::string& recordName)
+{
+/*
+  uint32_t id;
+  ResponseCode::type rc = getDatabaseId(databaseName, id);
+  if (rc != sherpa::ResponseCode::Ok) {
+    return rc;
+  }
+  insertDatabaseId(const_cast(recordName), id);
+  Bdb::ResponseCode dbrc = db_[id % numPartitions_]->remove(recordName);
+  if (dbrc == Bdb::Ok) {
+    return sherpa::ResponseCode::Ok;
+  } else if (dbrc == Bdb::KeyNotFound) {
+    return sherpa::ResponseCode::RecordNotFound;
+  } else {
+  }
+*/
+  return sherpa::ResponseCode::Error;
+}
diff --git a/sherpa/LSMServerHandler.h b/sherpa/LSMServerHandler.h
new
file mode 100644
index 0000000..3b14997
--- /dev/null
+++ b/sherpa/LSMServerHandler.h
@@ -0,0 +1,45 @@
+#ifndef LSM_SERVER_HANDLER_H
+#define LSM_SERVER_HANDLER_H
+
+#include
+#include
+#include
+#include
+
+// NOTE(review): these using-directives are visible to every file that
+// includes this header; LSMServerHandler.cc appears to rely on them for
+// the unqualified ResponseCode name.
+using namespace ::apache::thrift;
+using namespace ::apache::thrift::protocol;
+using namespace ::apache::thrift::transport;
+
+using namespace sherpa;
+using boost::shared_ptr;
+
+/**
+ * Service handler derived from PersistentStoreIf.  It holds a pointer to
+ * the logtable that the constructor creates from the server command line.
+ */
+class LSMServerHandler : virtual public PersistentStoreIf {
+public:
+  LSMServerHandler(int argc, char **argv);
+  ResponseCode::type ping();
+  ResponseCode::type addDatabase(const std::string& databaseName);
+  ResponseCode::type dropDatabase(const std::string& databaseName);
+  void listDatabases(StringListResponse& _return);
+  void scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrder::type order,
+            const std::string& startKey, const bool startKeyIncluded,
+            const std::string& endKey, const bool endKeyIncluded,
+            const int32_t maxRecords, const int32_t maxBytes);
+  void get(BinaryResponse& _return, const std::string& databaseName, const std::string& recordName);
+  ResponseCode::type insert(const std::string& databaseName, const std::string& recordName, const std::string& recordBody);
+  ResponseCode::type insertMany(const std::string& databaseName, const std::vector & records);
+  ResponseCode::type update(const std::string& databaseName, const std::string& recordName, const std::string& recordBody);
+  ResponseCode::type remove(const std::string& databaseName, const std::string& recordName);
+
+private:
+  logtable* ltable_;
+
+};
+
+#endif // LSM_SERVER_HANDLER_H
diff --git a/sherpa/Makefile b/sherpa/Makefile
index 6bb59fb..d4ce170 100644
--- a/sherpa/Makefile
+++ b/sherpa/Makefile
@@ -1,22 +1,38 @@
-# yinst i log4cpp_y_dev-1.4.2
+ROOT=/home/y
+include $(ROOT)/share/yahoo_cfg/Make.defs
 
-SHERPA_DIR = /homes/sears/svndev/trunk/dht
+USE_OSE = no
 
-CPPFLAGS=-Wall -I${SHERPA_DIR}/common/include \
-	-I${SHERPA_DIR}/storage/handler/src \
-	-I${SHERPA_DIR}/storage/handler/include/ \
-	-I${SHERPA_DIR}/framework/base/include/ \
-	-I${SHERPA_DIR}/externals/fwcore/include \
-	-I /home/y/include -I .. -c -fPIC
-CXXFLAGS=-m32 -g
-CFLAGS=-m32 -g
+LIB_NAME = LSMServer
+SHLIB_VERSION = 1
 
-libsherpalogstore.so : LSMPersistentStoreImpl.o tcpclient.o LSMPersistentParentImpl.o
-	gcc -m32 -g -shared -Wl,-soname,libsherpalogstore.so -o libsherpalogstore.so LSMPersistentStoreImpl.o tcpclient.o LSMPersistentParentImpl.o
-	cp libsherpalogstore.so /homes/sears/sherpa-so/
+CXXSRC = LSMServerHandler.cc
 
-tcpclient.cpp : ../tcpclient.cpp
-	ln -s ../tcpclient.cpp
+LCLEAN += *~
+WARN += -Werror -Wall
+LINC += -I. -I /home/y/include/thrift -I /home/y/include64/stasis -I ..
 
-clean:
-	rm -f LSMPersistentStoreImpl.o tcpclient.o libsherpalogstore.so LSMPersistentParentImpl.o
+LDLIBS += -L/usr/local/lib
+LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../build
+
+
+# Poor packaging for yicu
+LINC += -I/home/y/include/yicu
+
+LDLIBS += -lthrift -ldht_persistent_store -lstasis -llogstore -L ../build
+
+# Need to remove potential warnings in yapache.
+LDEF += -DEAPI
+LDEF += -D_FILE_OFFSET_BITS=64
+
+ifdef UNIT_TEST
+CXXFLAGS += -O0 -g -DUNIT_TEST
+endif
+
+ifdef DEBUG
+CXXFLAGS += -O0 -g
+endif
+
+include $(ROOT)/share/yahoo_cfg/Make.rules
+
+$(SOTARGET): $(OBJS)
diff --git a/sherpa/main/Makefile b/sherpa/main/Makefile
new file mode 100644
index 0000000..30fb5d6
--- /dev/null
+++ b/sherpa/main/Makefile
@@ -0,0 +1,30 @@
+include /home/y/share/yahoo_cfg/Make.defs
+
+USE_OSE = no
+
+EXETARGET = lsm_server lsm_client
+
+CXXSRC = $(addsuffix .cc, $(EXETARGET))
+
+LCLEAN += *~
+WARN += -Werror -Wall
+
+LINC += -I../ -I /home/y/include/thrift -I /home/y/include/boost/tr1/
+LDLIBS += -L../ -ldht_persistent_store -lLSMServer
+LDFLAGS += -Wl,-rpath,/home/y/lib64 -Wl,-rpath,../../build
+
+ifdef UNIT_TEST
+CXXFLAGS += -O0 -g -DUNIT_TEST
+endif
+
+ifdef DEBUG
+CXXFLAGS += -O0 -g
+endif
+
+include /home/y/share/yahoo_cfg/Make.rules
+
+# NOTE(review): $@ is only set while a recipe runs, so $(addsuffix .o, $@)
+# expands to nothing when the prerequisites are read and this rule adds no
+# dependency. A static pattern rule, "$(EXETARGET): %: %.o", would express
+# the apparent intent.
+$(EXETARGET): $(addsuffix .o, $@)
diff --git a/sherpa/main/lsm_client.cc b/sherpa/main/lsm_client.cc
new file mode 100644
index 0000000..1b57cbc
--- /dev/null
+++ b/sherpa/main/lsm_client.cc
@@ -0,0 +1,58 @@
+#include
+#include
+#include
+#include
+#include
+using namespace std;
+
+using namespace apache::thrift;
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+
+// Smoke-test client: connects to localhost:9090 and exercises addDatabase,
+// insert, get, scan (ascending and descending) and remove against the
+// "michi" database, printing every response.  Returns 1 if any call throws.
+int main(int argc, char **argv) {
+  boost::shared_ptr socket(new TSocket("localhost", 9090));
+  boost::shared_ptr transport(new TFramedTransport(socket));
+  boost::shared_ptr protocol(new TBinaryProtocol(transport));
+
+  sherpa::PersistentStoreClient client(protocol);
+  try {
+    transport->open();
+    socket->setNoDelay(true);
+    sherpa::BinaryResponse getResponse;
+    sherpa::RecordListResponse scanResponse;
+
+    cout << client.addDatabase("michi") << endl;
+    cout << client.insert("michi", "k1", "v1") << endl;
+    // same key inserted twice (presumably to exercise the duplicate-key response)
+    cout << client.insert("michi", "k1", "v1") << endl;
+    cout << client.insert("michi", "k11", "v11") << endl;
+    client.get(getResponse, "michi", "k1");
+    cout << getResponse.responseCode << endl;
+    cout << getResponse.value << endl;
+
+    client.scan(scanResponse, "michi", sherpa::ScanOrder::Ascending, "", true, "", true, 100, 100);
+    std::vector::iterator itr;
+    for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); ++itr) {
+      cout << itr->key << " " << itr->value << endl;
+    }
+
+    client.scan(scanResponse, "michi", sherpa::ScanOrder::Descending, "", true, "", true, 100, 100);
+    for (itr = scanResponse.records.begin(); itr != scanResponse.records.end(); ++itr) {
+      cout << itr->key << " " << itr->value << endl;
+    }
+
+    cout << client.remove("michi", "k1") << endl;
+    client.get(getResponse, "michi", "k1");
+    cout << getResponse.responseCode << endl;
+    transport->close();
+  } catch (const std::exception& e) {
+    // Thrift reports connection and protocol failures by throwing; say what
+    // went wrong and exit with an error instead of terminating.
+    cerr << "lsm_client: " << e.what() << endl;
+    return 1;
+  }
+  return 0;
+}
diff --git a/sherpa/main/lsm_server.cc b/sherpa/main/lsm_server.cc
new file mode 100644
index 0000000..c5606a4
--- /dev/null
+++ b/sherpa/main/lsm_server.cc
@@ -0,0 +1,36 @@
+#include
+#include "LSMServerHandler.h"
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace ::apache::thrift;
+using namespace ::apache::thrift::protocol;
+using namespace ::apache::thrift::transport;
+using namespace ::apache::thrift::server;
+using namespace ::apache::thrift::concurrency;
+using boost::shared_ptr;
+
+using namespace sherpa;
+
+// Builds an LSMServerHandler from the command line and serves it on port
+// 9090 through a TThreadPoolServer (framed transport, binary protocol)
+// backed by a simple thread manager created with 32 as its argument.
+int main(int argc, char **argv) {
+  shared_ptr handler(new LSMServerHandler(argc, argv));
+  shared_ptr processor(new PersistentStoreProcessor(handler));
+  shared_ptr serverTransport(new TServerSocket(9090));
+  shared_ptr transportFactory(new TFramedTransportFactory());
+  shared_ptr protocolFactory(new TBinaryProtocolFactory());
+  shared_ptr threadManager = ThreadManager::newSimpleThreadManager(32);
+  shared_ptr threadFactory(new PosixThreadFactory());
+  threadManager->threadFactory(threadFactory);
+  threadManager->start();
+  TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager);
+  server.serve();
+  return 0;
+}