From 165a17c39bcf71a7de5f6f5b9a7a8dbe74252c2b Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 22 Feb 2012 22:57:02 +0000 Subject: [PATCH] logtable -> blsm git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@3774 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- benchmarks/lsm_microbenchmarks.cpp | 4 +-- logserver.cpp | 2 +- logserver.h | 6 ++--- logstore.cpp | 42 +++++++++++++++--------------- logstore.h | 12 ++++----- main/newserver.cpp | 6 ++--- main/server.cpp | 6 ++--- mergeManager.cpp | 6 ++--- mergeManager.h | 8 +++--- merger.cpp | 10 +++---- merger.h | 4 +-- requestDispatch.cpp | 36 ++++++++++++------------- requestDispatch.h | 32 +++++++++++------------ sherpa/LSMServerHandler.cc | 12 ++++----- sherpa/LSMServerHandler.h | 2 +- simpleServer.cpp | 2 +- simpleServer.h | 4 +-- test/check_datapage.cpp | 8 +++--- test/check_gen.cpp | 6 ++--- test/check_logtable.cpp | 4 +-- test/check_logtree.cpp | 4 +-- test/check_merge.cpp | 6 ++--- test/check_mergelarge.cpp | 6 ++--- test/check_mergetuple.cpp | 6 ++--- test/check_testAndSet.cpp | 8 +++--- 25 files changed, 121 insertions(+), 121 deletions(-) diff --git a/benchmarks/lsm_microbenchmarks.cpp b/benchmarks/lsm_microbenchmarks.cpp index 4a6581c..c8f77ed 100644 --- a/benchmarks/lsm_microbenchmarks.cpp +++ b/benchmarks/lsm_microbenchmarks.cpp @@ -71,7 +71,7 @@ int main (int argc, char * argv[]) { printf("Hard limit=%lld\n", (long long)((stasis_dirty_page_count_hard_limit*PAGE_SIZE)/MB)); printf("Hard limit is %f pct.\n", 100.0 * ((double)stasis_dirty_page_count_hard_limit)/((double)stasis_buffer_manager_size)); - logtable::init_stasis(); + blsm::init_stasis(); RegionAllocator * readableAlloc = NULL; if(!mode) { @@ -441,5 +441,5 @@ int main (int argc, char * argv[]) { } - logtable::deinit_stasis(); + blsm::deinit_stasis(); } diff --git a/logserver.cpp b/logserver.cpp index cd68d17..7ba19ac 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -37,7 +37,7 @@ #include void *serverLoop(void *args); -void logserver::startserver(logtable *ltable) +void logserver::startserver(blsm *ltable) { sys_alive = true; this->ltable = ltable; diff --git a/logserver.h b/logserver.h index 9932193..3caec35 100644 --- a/logserver.h +++ b/logserver.h @@ -53,7 +53,7 @@ struct pthread_data { int *workitem; //id of the socket to work - logtable *ltable; + blsm *ltable; bool *sys_alive; #ifdef STATS_ENABLED @@ -111,7 +111,7 @@ public: delete qlock; } - void startserver(logtable *ltable); + void startserver(blsm *ltable); void stopserver(); @@ -140,7 +140,7 @@ private: int * self_pipe; // write a byte to self_pipe[1] to wake up select(). std::vector th_list; // list of threads - logtable *ltable; + blsm *ltable; #ifdef STATS_ENABLED int num_reqs; diff --git a/logstore.cpp b/logstore.cpp index b7c461e..d0be4a9 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -40,7 +40,7 @@ static inline double tv_to_double(struct timeval tv) // LOG TABLE IMPLEMENTATION ///////////////////////////////////////////////////////////////// -logtable::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) +blsm::blsm(int log_mode, pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) { recovering = true; this->max_c0_size = max_c0_size; @@ -85,7 +85,7 @@ logtable::logtable(int log_mode, pageid_t max_c0_size, pageid_t internal_region_ stasis_log_file_permissions); } -logtable::~logtable() +blsm::~blsm() { delete merge_mgr; // shuts down pretty print thread. @@ -110,7 +110,7 @@ logtable::~logtable() delete tmerger; } -void logtable::init_stasis() { +void blsm::init_stasis() { DataPage::register_stasis_page_impl(); // stasis_buffer_manager_hint_writes_are_sequential = 1; @@ -118,9 +118,9 @@ void logtable::init_stasis() { } -void logtable::deinit_stasis() { Tdeinit(); } +void blsm::deinit_stasis() { Tdeinit(); } -recordid logtable::allocTable(int xid) +recordid blsm::allocTable(int xid) { table_rec = Talloc(xid, sizeof(tbl_header)); mergeStats * stats = 0; @@ -142,7 +142,7 @@ recordid logtable::allocTable(int xid) return table_rec; } -void logtable::openTable(int xid, recordid rid) { +void blsm::openTable(int xid, recordid rid) { table_rec = rid; Tread(xid, table_rec, &tbl_header); tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0); @@ -156,14 +156,14 @@ void logtable::openTable(int xid, recordid rid) { } -void logtable::logUpdate(datatuple * tup) { +void blsm::logUpdate(datatuple * tup) { byte * buf = tup->to_bytes(); LogEntry * e = stasis_log_write_update(log_file, 0, INVALID_PAGE, 0/*Page**/, 0/*op*/, buf, tup->byte_length()); log_file->write_entry_done(log_file,e); free(buf); } -void logtable::replayLog() { +void blsm::replayLog() { lsn_t start = tbl_header.log_trunc; LogHandle * lh = start ? getLSNHandle(log_file, start) : getLogHandle(log_file); const LogEntry * e; @@ -184,12 +184,12 @@ void logtable::replayLog() { } -lsn_t logtable::get_log_offset() { +lsn_t blsm::get_log_offset() { if(recovering || !log_mode) { return INVALID_LSN; } return log_file->next_available_lsn(log_file); } -void logtable::truncate_log() { +void blsm::truncate_log() { if(recovering) { printf("Not truncating log until recovery is complete.\n"); } else { @@ -200,7 +200,7 @@ void logtable::truncate_log() { } } -void logtable::update_persistent_header(int xid, lsn_t trunc_lsn) { +void blsm::update_persistent_header(int xid, lsn_t trunc_lsn) { tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid(); @@ -219,7 +219,7 @@ void logtable::update_persistent_header(int xid, lsn_t trunc_lsn) { Tset(xid, table_rec, &tbl_header); } -void logtable::flushTable() +void blsm::flushTable() { struct timeval start_tv, stop_tv; double start, stop; @@ -277,7 +277,7 @@ void logtable::flushTable() c0_flushing = false; } -datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize) +datatuple * blsm::findTuple(int xid, const datatuple::key_t key, size_t keySize) { // Apply proportional backpressure to reads as well as writes. This prevents // starvation of the merge threads on fast boxes. @@ -463,7 +463,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS * returns the first record found with the matching key * (not to be used together with diffs) **/ -datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keySize) +datatuple * blsm::findTuple_first(int xid, datatuple::key_t key, size_t keySize) { // Apply proportional backpressure to reads as well as writes. This prevents // starvation of the merge threads on fast boxes. @@ -563,7 +563,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS } -datatuple * logtable::insertTupleHelper(datatuple *tuple) +datatuple * blsm::insertTupleHelper(datatuple *tuple) { bool need_free = false; if(!tuple->isDelete() && expiry != 0) { @@ -613,7 +613,7 @@ datatuple * logtable::insertTupleHelper(datatuple *tuple) return pre_t; } -void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) { +void blsm::insertManyTuples(datatuple ** tuples, int tuple_count) { for(int i = 0; i < tuple_count; i++) { merge_mgr->read_tuple_from_small_component(0, tuples[i]); } @@ -642,7 +642,7 @@ void logtable::insertManyTuples(datatuple ** tuples, int tuple_count) { merge_mgr->read_tuple_from_large_component(0, num_old_tups, sum_old_tup_lens); } -void logtable::insertTuple(datatuple *tuple) +void blsm::insertTuple(datatuple *tuple) { if(log_mode && !recovering) { logUpdate(tuple); @@ -669,7 +669,7 @@ void logtable::insertTuple(datatuple *tuple) DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); } -bool logtable::testAndSetTuple(datatuple *tuple, datatuple *tuple2) +bool blsm::testAndSetTuple(datatuple *tuple, datatuple *tuple2) { bool succ = false; static pthread_mutex_t test_and_set_mut = PTHREAD_MUTEX_INITIALIZER; @@ -697,11 +697,11 @@ bool logtable::testAndSetTuple(datatuple *tuple, datatuple *tuple2) return succ; } -void logtable::registerIterator(iterator * it) { +void blsm::registerIterator(iterator * it) { its.push_back(it); } -void logtable::forgetIterator(iterator * it) { +void blsm::forgetIterator(iterator * it) { for(unsigned int i = 0; i < its.size(); i++) { if(its[i] == it) { its.erase(its.begin()+i); @@ -710,7 +710,7 @@ void logtable::forgetIterator(iterator * it) { } } -void logtable::bump_epoch() { +void blsm::bump_epoch() { epoch++; for(unsigned int i = 0; i < its.size(); i++) { its[i]->invalidate(); diff --git a/logstore.h b/logstore.h index 7c10176..31421ea 100644 --- a/logstore.h +++ b/logstore.h @@ -33,7 +33,7 @@ class logtable_mergedata; -class logtable { +class blsm { public: class iterator; @@ -49,9 +49,9 @@ public: // 6GB ~= 100B * 500 GB / (datapage_size * 4KB) // (100B * 500GB) / (6GB * 4KB) = 2.035 // RCS: Set this to 1 so that we do (on average) one seek per b-tree read. - logtable(int log_mode = 0, pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1); + blsm(int log_mode = 0, pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1); - ~logtable(); + ~blsm(); double * R() { return &r_val; } @@ -322,7 +322,7 @@ public: class iterator { public: - explicit iterator(logtable* ltable) + explicit iterator(blsm* ltable) : ltable(ltable), epoch(ltable->get_epoch()), merge_it_(NULL), @@ -338,7 +338,7 @@ public: // rwlc_unlock(ltable->header_mut); } - explicit iterator(logtable* ltable,datatuple *key) + explicit iterator(blsm* ltable,datatuple *key) : ltable(ltable), epoch(ltable->get_epoch()), merge_it_(NULL), @@ -427,7 +427,7 @@ public: static const int C1 = 0; static const int C1_MERGEABLE = 1; static const int C2 = 2; - logtable * ltable; + blsm * ltable; uint64_t epoch; typedef mergeManyIterator< memTreeComponent::batchedRevalidatingIterator, diff --git a/main/newserver.cpp b/main/newserver.cpp index 4730ea7..843b82e 100644 --- a/main/newserver.cpp +++ b/main/newserver.cpp @@ -61,14 +61,14 @@ int main(int argc, char *argv[]) } } - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); recordid table_root = ROOT_RECORD; { - logtable ltable(log_mode, c0_size); + blsm ltable(log_mode, c0_size); ltable.expiry = expiry_delta; if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { @@ -101,7 +101,7 @@ int main(int argc, char *argv[]) printf("Deinitializing stasis...\n"); fflush(stdout); } - logtable::deinit_stasis(); + blsm::deinit_stasis(); printf("Shutdown complete\n"); } diff --git a/main/server.cpp b/main/server.cpp index aca45be..060c196 100644 --- a/main/server.cpp +++ b/main/server.cpp @@ -53,7 +53,7 @@ void terminate (int param) printf("Deinitializing stasis...\n"); fflush(stdout); - logtable::deinit_stasis(); + blsm::deinit_stasis(); exit(0); } @@ -66,7 +66,7 @@ int main(int argc, char *argv[]) prev_fn = signal (SIGINT,terminate); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); @@ -84,7 +84,7 @@ int main(int argc, char *argv[]) printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server? } - logtable ltable(c0_size); + blsm ltable(c0_size); recordid table_root = ROOT_RECORD; if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { diff --git a/mergeManager.cpp b/mergeManager.cpp index 4de676a..4c4dba3 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -543,7 +543,7 @@ void mergeManager::init_helper(void) { pthread_create(&update_progress_pthread, 0, merge_manager_update_progress_thread, (void*)this); } -mergeManager::mergeManager(logtable *ltable): +mergeManager::mergeManager(blsm *ltable): UPDATE_PROGRESS_PERIOD(0.005), ltable(ltable) { c0 = new mergeStats(0, ltable ? ltable->max_c0_size : 10000000); @@ -551,7 +551,7 @@ mergeManager::mergeManager(logtable *ltable): c2 = new mergeStats(2, 0); init_helper(); } -mergeManager::mergeManager(logtable *ltable, int xid, recordid rid): +mergeManager::mergeManager(blsm *ltable, int xid, recordid rid): UPDATE_PROGRESS_PERIOD(0.005), ltable(ltable) { marshalled_header h; @@ -581,7 +581,7 @@ void mergeManager::marshal(int xid, recordid rid) { void mergeManager::pretty_print(FILE * out) { #if EXTENDED_STATS - logtable * lt = ltable; + blsm * lt = ltable; bool have_c0 = false; bool have_c0m = false; bool have_c1 = false; diff --git a/mergeManager.h b/mergeManager.h index b67fee0..da0ebad 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -27,7 +27,7 @@ #include #include -class logtable; +class blsm; class mergeStats; class mergeManager { @@ -48,8 +48,8 @@ public: uint64_t long_tv(struct timeval& tv) { return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec); } - mergeManager(logtable *ltable); - mergeManager(logtable *ltable, int xid, recordid rid); + mergeManager(blsm *ltable); + mergeManager(blsm *ltable, int xid, recordid rid); void marshal(int xid, recordid rid); recordid talloc(int xid); ~mergeManager(); @@ -106,7 +106,7 @@ private: * * TODO: remove mergeManager->ltable? */ - logtable* ltable; + blsm* ltable; mergeStats * c0; /// Per-tree component statistics for c0 and c0_mergeable (the latter should always be null...) mergeStats * c1; /// Per-tree component statistics for c1 and c1_mergeable. mergeStats * c2; /// Per-tree component statistics for c2. diff --git a/merger.cpp b/merger.cpp index 9256ebf..fe48d9f 100644 --- a/merger.cpp +++ b/merger.cpp @@ -28,7 +28,7 @@ static void* diskMerge_thr(void* arg) { return ((merge_scheduler*)arg)->diskMergeThread(); } -merge_scheduler::merge_scheduler(logtable *ltable) : ltable_(ltable), MIN_R(3.0) { } +merge_scheduler::merge_scheduler(blsm *ltable) : ltable_(ltable), MIN_R(3.0) { } merge_scheduler::~merge_scheduler() { } void merge_scheduler::shutdown() { @@ -42,7 +42,7 @@ void merge_scheduler::start() { pthread_create(&disk_merge_thread_, 0, diskMerge_thr, this); } -bool insert_filter(logtable * ltable, datatuple * t, bool dropDeletes) { +bool insert_filter(blsm * ltable, datatuple * t, bool dropDeletes) { if(t->isDelete()) { if(dropDeletes || ! ltable->mightBeAfterMemMerge(t)) { return false; @@ -57,7 +57,7 @@ template void merge_iterators(int xid, diskTreeComponent * forceMe, ITA *itrA, ITB *itrB, - logtable *ltable, + blsm *ltable, diskTreeComponent *scratch_tree, mergeStats * stats, bool dropDeletes); @@ -355,7 +355,7 @@ static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, sta } } -static int garbage_collect(logtable * ltable_, datatuple ** garbage, int garbage_len, int next_garbage, bool force = false) { +static int garbage_collect(blsm * ltable_, datatuple ** garbage, int garbage_len, int next_garbage, bool force = false) { if(next_garbage == garbage_len || force) { pthread_mutex_lock(<able_->rb_mut); for(int i = 0; i < next_garbage; i++) { @@ -391,7 +391,7 @@ void merge_iterators(int xid, diskTreeComponent * forceMe, ITA *itrA, //iterator on c1 or c2 ITB *itrB, //iterator on c0 or c1, respectively - logtable *ltable, + blsm *ltable, diskTreeComponent *scratch_tree, mergeStats * stats, bool dropDeletes // should be true iff this is biggest component ) diff --git a/merger.h b/merger.h index 635683f..ef55c22 100644 --- a/merger.h +++ b/merger.h @@ -25,7 +25,7 @@ class merge_scheduler { public: - merge_scheduler(logtable * ltable); + merge_scheduler(blsm * ltable); ~merge_scheduler(); void start(); @@ -37,7 +37,7 @@ public: private: pthread_t mem_merge_thread_; pthread_t disk_merge_thread_; - logtable * ltable_; + blsm * ltable_; const double MIN_R; }; diff --git a/requestDispatch.cpp b/requestDispatch.cpp index d822e6b..2d2c1aa 100644 --- a/requestDispatch.cpp +++ b/requestDispatch.cpp @@ -22,21 +22,21 @@ #include "regionAllocator.h" template -inline int requestDispatch::op_insert(logtable * ltable, HANDLE fd, datatuple * tuple) { +inline int requestDispatch::op_insert(blsm * ltable, HANDLE fd, datatuple * tuple) { //insert/update/delete ltable->insertTuple(tuple); //step 4: send response return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } template -inline int requestDispatch::op_test_and_set(logtable * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2) { +inline int requestDispatch::op_test_and_set(blsm * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2) { //insert/update/delete bool succ = ltable->testAndSetTuple(tuple, tuple2); //step 4: send response return writeoptosocket(fd, succ ? LOGSTORE_RESPONSE_SUCCESS : LOGSTORE_RESPONSE_FAIL); } template -inline int requestDispatch::op_bulk_insert(logtable *ltable, HANDLE fd) { +inline int requestDispatch::op_bulk_insert(blsm *ltable, HANDLE fd) { int err = writeoptosocket(fd, LOGSTORE_RESPONSE_RECEIVING_TUPLES); datatuple ** tups = (datatuple **) malloc(sizeof(tups[0]) * 100); int tups_size = 100; @@ -60,7 +60,7 @@ inline int requestDispatch::op_bulk_insert(logtable *ltable, HANDLE fd) return err; } template -inline int requestDispatch::op_find(logtable * ltable, HANDLE fd, datatuple * tuple) { +inline int requestDispatch::op_find(blsm * ltable, HANDLE fd, datatuple * tuple) { //find the tuple datatuple *dt = ltable->findTuple_first(-1, tuple->strippedkey(), tuple->strippedkeylen()); @@ -105,12 +105,12 @@ inline int requestDispatch::op_find(logtable * ltable, HANDLE fd, datatu return err; } template -inline int requestDispatch::op_scan(logtable * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit) { +inline int requestDispatch::op_scan(blsm * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit) { size_t count = 0; int err = writeoptosocket(fd, LOGSTORE_RESPONSE_SENDING_TUPLES); if(!err) { - logtable::iterator * itr = new logtable::iterator(ltable, tuple); + blsm::iterator * itr = new blsm::iterator(ltable, tuple); datatuple * t; while(!err && (t = itr->getnext())) { if(tuple2) { // are we at the end of range? @@ -130,17 +130,17 @@ inline int requestDispatch::op_scan(logtable * ltable, HANDLE fd, datatu return err; } template -inline int requestDispatch::op_flush(logtable * ltable, HANDLE fd) { +inline int requestDispatch::op_flush(blsm * ltable, HANDLE fd) { ltable->flushTable(); return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } template -inline int requestDispatch::op_shutdown(logtable * ltable, HANDLE fd) { +inline int requestDispatch::op_shutdown(blsm * ltable, HANDLE fd) { ltable->accepting_new_requests = false; return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } template -inline int requestDispatch::op_stat_space_usage(logtable * ltable, HANDLE fd) { +inline int requestDispatch::op_stat_space_usage(blsm * ltable, HANDLE fd) { int xid = Tbegin(); @@ -238,13 +238,13 @@ inline int requestDispatch::op_stat_space_usage(logtable * ltable, HANDL return err; } template -inline int requestDispatch::op_stat_perf_report(logtable * ltable, HANDLE fd) { +inline int requestDispatch::op_stat_perf_report(blsm * ltable, HANDLE fd) { } template -inline int requestDispatch::op_stat_histogram(logtable * ltable, HANDLE fd, size_t limit) { +inline int requestDispatch::op_stat_histogram(blsm * ltable, HANDLE fd, size_t limit) { if(limit < 3) { return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR); @@ -302,7 +302,7 @@ inline int requestDispatch::op_stat_histogram(logtable * ltable, HANDLE return err; } template -inline int requestDispatch::op_dbg_blockmap(logtable * ltable, HANDLE fd) { +inline int requestDispatch::op_dbg_blockmap(blsm * ltable, HANDLE fd) { // produce a list of stasis regions int xid = Tbegin(); @@ -406,8 +406,8 @@ inline int requestDispatch::op_dbg_blockmap(logtable * ltable, HANDLE fd } template -inline int requestDispatch::op_dbg_drop_database(logtable * ltable, HANDLE fd) { - logtable::iterator * itr = new logtable::iterator(ltable); +inline int requestDispatch::op_dbg_drop_database(blsm * ltable, HANDLE fd) { + blsm::iterator * itr = new blsm::iterator(ltable); datatuple * del; fprintf(stderr, "DROPPING DATABASE...\n"); long long n = 0; @@ -432,11 +432,11 @@ inline int requestDispatch::op_dbg_drop_database(logtable * ltable, HAND return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } template -inline int requestDispatch::op_dbg_noop(logtable * ltable, HANDLE fd) { +inline int requestDispatch::op_dbg_noop(blsm * ltable, HANDLE fd) { return writeoptosocket(fd, LOGSTORE_RESPONSE_SUCCESS); } template -inline int requestDispatch::op_dbg_set_log_mode(logtable * ltable, HANDLE fd, datatuple * tuple) { +inline int requestDispatch::op_dbg_set_log_mode(blsm * ltable, HANDLE fd, datatuple * tuple) { if(tuple->rawkeylen() != sizeof(int)) { abort(); return writeoptosocket(fd, LOGSTORE_PROTOCOL_ERROR); @@ -448,7 +448,7 @@ inline int requestDispatch::op_dbg_set_log_mode(logtable * ltable, HANDL } } template -int requestDispatch::dispatch_request(HANDLE f, logtable *ltable) { +int requestDispatch::dispatch_request(HANDLE f, blsm *ltable) { //step 1: read the opcode network_op_t opcode = readopfromsocket(f, LOGSTORE_CLIENT_REQUEST); if(opcode == LOGSTORE_CONN_CLOSED_ERROR) { @@ -485,7 +485,7 @@ int requestDispatch::dispatch_request(HANDLE f, logtable *ltable) { } template -int requestDispatch::dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * ltable, HANDLE fd) { +int requestDispatch::dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, blsm * ltable, HANDLE fd) { int err = 0; #if 0 if(tuple) { diff --git a/requestDispatch.h b/requestDispatch.h index cdc529a..ae65bb5 100644 --- a/requestDispatch.h +++ b/requestDispatch.h @@ -13,23 +13,23 @@ template class requestDispatch { private: - static inline int op_insert(logtable * ltable, HANDLE fd, datatuple * tuple); - static inline int op_test_and_set(logtable * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2); - static inline int op_find(logtable * ltable, HANDLE fd, datatuple * tuple); - static inline int op_scan(logtable * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit); - static inline int op_bulk_insert(logtable * ltable, HANDLE fd); - static inline int op_flush(logtable * ltable, HANDLE fd); - static inline int op_shutdown(logtable * ltable, HANDLE fd); - static inline int op_stat_space_usage(logtable * ltable, HANDLE fd); - static inline int op_stat_perf_report(logtable * ltable, HANDLE fd); - static inline int op_stat_histogram(logtable * ltable, HANDLE fd, size_t limit); - static inline int op_dbg_blockmap(logtable * ltable, HANDLE fd); - static inline int op_dbg_drop_database(logtable * ltable, HANDLE fd); - static inline int op_dbg_noop(logtable * ltable, HANDLE fd); - static inline int op_dbg_set_log_mode(logtable * ltable, HANDLE fd, datatuple * tuple); + static inline int op_insert(blsm * ltable, HANDLE fd, datatuple * tuple); + static inline int op_test_and_set(blsm * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2); + static inline int op_find(blsm * ltable, HANDLE fd, datatuple * tuple); + static inline int op_scan(blsm * ltable, HANDLE fd, datatuple * tuple, datatuple * tuple2, size_t limit); + static inline int op_bulk_insert(blsm * ltable, HANDLE fd); + static inline int op_flush(blsm * ltable, HANDLE fd); + static inline int op_shutdown(blsm * ltable, HANDLE fd); + static inline int op_stat_space_usage(blsm * ltable, HANDLE fd); + static inline int op_stat_perf_report(blsm * ltable, HANDLE fd); + static inline int op_stat_histogram(blsm * ltable, HANDLE fd, size_t limit); + static inline int op_dbg_blockmap(blsm * ltable, HANDLE fd); + static inline int op_dbg_drop_database(blsm * ltable, HANDLE fd); + static inline int op_dbg_noop(blsm * ltable, HANDLE fd); + static inline int op_dbg_set_log_mode(blsm * ltable, HANDLE fd, datatuple * tuple); public: - static int dispatch_request(HANDLE f, logtable * ltable); - static int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, logtable * ltable, HANDLE fd); + static int dispatch_request(HANDLE f, blsm * ltable); + static int dispatch_request(network_op_t opcode, datatuple * tuple, datatuple * tuple2, blsm * ltable, HANDLE fd); }; #endif /* REQUESTDISPATCH_H_ */ diff --git a/sherpa/LSMServerHandler.cc b/sherpa/LSMServerHandler.cc index d43b669..ba43a11 100644 --- a/sherpa/LSMServerHandler.cc +++ b/sherpa/LSMServerHandler.cc @@ -89,14 +89,14 @@ LSMServerHandler(int argc, char **argv) } pthread_mutex_init(&mutex_, 0); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); recordid table_root = ROOT_RECORD; { - ltable_ = new logtable(log_mode, c0_size); + ltable_ = new blsm(log_mode, c0_size); ltable_->expiry = expiry_delta; if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { @@ -135,7 +135,7 @@ initNextDatabaseId() uint32_t id = 0; datatuple* start = buildTuple(id, ""); datatuple* end = buildTuple(id + 1, ""); - logtable::iterator* itr = new logtable::iterator(ltable_, start); + blsm::iterator* itr = new blsm::iterator(ltable_, start); datatuple* current; while ((current = itr->getnext())) { // are we at the end of range? @@ -220,7 +220,7 @@ dropMap(const std::string& databaseName) datatuple::freetuple(exists); datatuple * startKey = buildTuple(id, ""); - logtable::iterator * itr = new logtable::iterator(ltable_, startKey); + blsm::iterator * itr = new blsm::iterator(ltable_, startKey); datatuple::freetuple(startKey); datatuple * current; @@ -251,7 +251,7 @@ void LSMServerHandler:: listMaps(StringListResponse& _return) { datatuple * startKey = buildTuple(0, ""); - logtable::iterator * itr = new logtable::iterator(ltable_, startKey); + blsm::iterator * itr = new blsm::iterator(ltable_, startKey); datatuple::freetuple(startKey); datatuple * current; while(NULL != (current = itr->getnext())) { @@ -290,7 +290,7 @@ scan(RecordListResponse& _return, const std::string& databaseName, const ScanOrd } else { end = buildTuple(id, endKey); } - logtable::iterator* itr = new logtable::iterator(ltable_, start); + blsm::iterator* itr = new blsm::iterator(ltable_, start); int32_t resultSize = 0; diff --git a/sherpa/LSMServerHandler.h b/sherpa/LSMServerHandler.h index 0ae0b6b..4e3d416 100644 --- a/sherpa/LSMServerHandler.h +++ b/sherpa/LSMServerHandler.h @@ -57,7 +57,7 @@ private: datatuple* buildTuple(uint32_t databaseId, const std::string& recordName, const std::string& recordBody); datatuple* buildTuple(uint32_t databaseId, const std::string& recordName, const void* body, uint32_t bodySize); void initNextDatabaseId(); - logtable* ltable_; + blsm* ltable_; uint32_t nextDatabaseId_; pthread_mutex_t mutex_; }; diff --git a/simpleServer.cpp b/simpleServer.cpp index 1105299..a534e0e 100644 --- a/simpleServer.cpp +++ b/simpleServer.cpp @@ -67,7 +67,7 @@ void * simpleServer::worker(int self) { } } -simpleServer::simpleServer(logtable * ltable, int max_threads, int port): +simpleServer::simpleServer(blsm * ltable, int max_threads, int port): ltable(ltable), port(port), max_threads(max_threads), diff --git a/simpleServer.h b/simpleServer.h index b6b1152..7ba59e6 100644 --- a/simpleServer.h +++ b/simpleServer.h @@ -29,11 +29,11 @@ public: static const int DEFAULT_PORT = 32432; static const int DEFAULT_THREADS = 1000; - simpleServer(logtable * ltable, int max_threads = DEFAULT_THREADS, int port = DEFAULT_PORT); + simpleServer(blsm * ltable, int max_threads = DEFAULT_THREADS, int port = DEFAULT_PORT); bool acceptLoop(); ~simpleServer(); private: - logtable* ltable; + blsm* ltable; int port; int max_threads; int * thread_fd; diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index a424888..596e6da 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -46,7 +46,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) { sync(); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); @@ -140,7 +140,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) { printf("Writes complete.\n"); Tcommit(xid); - logtable::deinit_stasis(); + blsm::deinit_stasis(); } void insertProbeIter(size_t NUM_ENTRIES) @@ -151,7 +151,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); @@ -246,7 +246,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - logtable::deinit_stasis(); + blsm::deinit_stasis(); } diff --git a/test/check_gen.cpp b/test/check_gen.cpp index ea5286b..0b3f3d1 100644 --- a/test/check_gen.cpp +++ b/test/check_gen.cpp @@ -29,11 +29,11 @@ int main(int argc, char **argv) sync(); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); - logtable *ltable = new logtable(1000, 10000, 5); + blsm *ltable = new blsm(1000, 10000, 5); recordid table_root = ltable->allocTable(xid); @@ -48,7 +48,7 @@ int main(int argc, char **argv) delete ro_alloc; Tcommit(xid); delete ltable; - logtable::deinit_stasis(); + blsm::deinit_stasis(); diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index 3a2fa7a..8ce37c1 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -44,7 +44,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); @@ -132,7 +132,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("Random Reads completed.\n"); Tcommit(xid); - logtable::deinit_stasis(); + blsm::deinit_stasis(); } /** @test diff --git a/test/check_logtree.cpp b/test/check_logtree.cpp index 204d2c2..0ab0218 100644 --- a/test/check_logtree.cpp +++ b/test/check_logtree.cpp @@ -54,7 +54,7 @@ void insertProbeIter_str(int NUM_ENTRIES) system("rm -rf stasis_log/"); sync(); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); @@ -156,7 +156,7 @@ void insertProbeIter_str(int NUM_ENTRIES) delete it; delete ro_alloc; Tcommit(xid); - logtable::deinit_stasis(); + blsm::deinit_stasis(); } diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 5ea79fa..cbef2d0 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -45,7 +45,7 @@ void insertProbeIter(size_t NUM_ENTRIES) unlink("logfile.txt"); system("rm -rf stasis_log/"); - logtable::init_stasis(); + blsm::init_stasis(); //data generation std::vector * data_arr = new std::vector; @@ -70,7 +70,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - logtable * ltable = new logtable(10 * 1024 * 1024, 1000, 10000, 5); + blsm * ltable = new blsm(10 * 1024 * 1024, 1000, 10000, 5); merge_scheduler mscheduler(ltable); recordid table_root = ltable->allocTable(xid); @@ -168,7 +168,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); delete ltable; - logtable::deinit_stasis(); + blsm::deinit_stasis(); } diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index 2bae9f6..8d35c96 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -43,7 +43,7 @@ void insertProbeIter(size_t NUM_ENTRIES) unlink("logfile.txt"); system("rm -rf stasis_log/"); - logtable::init_stasis(); + blsm::init_stasis(); //data generation // std::vector * data_arr = new std::vector; @@ -62,7 +62,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - logtable *ltable = new logtable(10*1024*1024, 1000, 10000, 100); + blsm *ltable = new blsm(10*1024*1024, 1000, 10000, 100); merge_scheduler mscheduler(ltable); recordid table_root = ltable->allocTable(xid); @@ -109,7 +109,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("merge threads finished.\n"); gettimeofday(&stop_tv,0); printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); - logtable::deinit_stasis(); + blsm::deinit_stasis(); } diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index dfe4e05..5edced0 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -45,7 +45,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - logtable::init_stasis(); + blsm::init_stasis(); double delete_freq = .05; double update_freq = .15; @@ -120,7 +120,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - logtable *ltable = new logtable(10 * 1024 * 1024, 1000, 1000, 40); + blsm *ltable = new blsm(10 * 1024 * 1024, 1000, 1000, 40); merge_scheduler mscheduler(ltable); recordid table_root = ltable->allocTable(xid); @@ -265,7 +265,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); delete ltable; - logtable::deinit_stasis(); + blsm::deinit_stasis(); } diff --git a/test/check_testAndSet.cpp b/test/check_testAndSet.cpp index e684c34..75603e1 100644 --- a/test/check_testAndSet.cpp +++ b/test/check_testAndSet.cpp @@ -42,7 +42,7 @@ unsigned char vals[NUM_THREADS]; -logtable * ltable; +blsm * ltable; int myucharcmp(const void * ap, const void * bp) { unsigned char a = *(unsigned char*)ap; @@ -73,10 +73,10 @@ void insertProbeIter(size_t NUM_ENTRIES) unlink("logfile.txt"); system("rm -rf stasis_log/"); - logtable::init_stasis(); + blsm::init_stasis(); int xid = Tbegin(); - ltable = new logtable(10 * 1024 * 1024, 1000, 10000, 5); + ltable = new blsm(10 * 1024 * 1024, 1000, 10000, 5); merge_scheduler mscheduler(ltable); @@ -106,7 +106,7 @@ void insertProbeIter(size_t NUM_ENTRIES) mscheduler.shutdown(); delete ltable; - logtable::deinit_stasis(); + blsm::deinit_stasis(); printf("\npass\n"); }