From a00531ae6dc1a1510202528a4c47f1b2e42ed4f7 Mon Sep 17 00:00:00 2001 From: sears Date: Sat, 11 Dec 2010 00:51:19 +0000 Subject: [PATCH] simplified merge_scheduler git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1479 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 17 ++- logstore.h | 5 - mergeManager.cpp | 6 +- mergeManager.h | 4 +- mergeStats.h | 9 +- merger.cpp | 250 +++++++++++++++----------------------- merger.h | 35 ++---- newserver.cpp | 9 +- server.cpp | 7 +- test/check_gen.cpp | 8 +- test/check_merge.cpp | 17 ++- test/check_mergelarge.cpp | 16 ++- test/check_mergetuple.cpp | 20 +-- test/check_testAndSet.cpp | 20 +-- 14 files changed, 168 insertions(+), 255 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index 68e21a4..dabe1ad 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -23,7 +23,7 @@ template logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) { - r_val = MIN_R; + r_val = 3.0; // MIN_R tree_c0 = NULL; tree_c0_mergeable = NULL; c0_is_merging = false; @@ -36,8 +36,6 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio this->shutting_down_ = false; flushing = false; this->merge_mgr = new mergeManager(this); - this->mergedata = 0; - //tmerger = new tuplemerger(&append_merger); tmerger = new tuplemerger(&replace_merger); header_mut = rwlc_initlock(); @@ -61,6 +59,8 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio template logtable::~logtable() { + delete merge_mgr; // shuts down pretty print thread. + if(tree_c1 != NULL) delete tree_c1; if(tree_c2 != NULL) @@ -85,7 +85,7 @@ template void logtable::init_stasis() { DataPage::register_stasis_page_impl(); - stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages: + //stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages: // XXX Workaround Stasis' (still broken) default concurrent buffer manager // stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; // stasis_buffer_manager_hint_writes_are_sequential = 0; @@ -112,6 +112,8 @@ recordid logtable::allocTable(int xid) merge_mgr->new_merge(0); c0_stats->starting_merge(); + tree_c0 = new memTreeComponent::rbtree_t; + update_persistent_header(xid, 1); update_persistent_header(xid, 2); @@ -124,6 +126,7 @@ void logtable::openTable(int xid, recordid rid) { Tread(xid, table_rec, &tbl_header); tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0); tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0); + tree_c0 = new memTreeComponent::rbtree_t; merge_mgr->get_merge_stats(1)->bytes_out = tbl_header.c1_base_size; merge_mgr->get_merge_stats(1)->base_size = tbl_header.c1_base_size; @@ -160,12 +163,6 @@ void logtable::update_persistent_header(int xid, int merge_level) { Tset(xid, table_rec, &tbl_header); } -template -void logtable::setMergeData(logtable_mergedata * mdata){ - this->mergedata = mdata; - bump_epoch(); -} - template void logtable::flushTable() { diff --git a/logstore.h b/logstore.h index 2d68ba7..621a58d 100644 --- a/logstore.h +++ b/logstore.h @@ -100,9 +100,6 @@ public: void set_tree_c0_mergeable(memTreeComponent::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } void update_persistent_header(int xid, int merge_level); - void setMergeData(logtable_mergedata * mdata); - logtable_mergedata* getMergeData(){return mergedata;} - inline tuplemerger * gettuplemerger(){return tmerger;} public: @@ -118,8 +115,6 @@ public: pageid_t c1_mergeable_size; pageid_t c1_base_size; }; - - logtable_mergedata * mergedata; rwlc * header_mut; pthread_mutex_t tick_mut; pthread_mutex_t rb_mut; diff --git a/mergeManager.cpp b/mergeManager.cpp index 2dbb12a..47de3d8 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -23,13 +23,13 @@ mergeStats* mergeManager:: get_merge_stats(int mergeLevel) { } mergeManager::~mergeManager() { - delete c0; - delete c1; - delete c2; still_running = false; pthread_cond_signal(&pp_cond); pthread_join(pp_thread, 0); pthread_cond_destroy(&pp_cond); + delete c0; + delete c1; + delete c2; } void mergeManager::new_merge(int mergeLevel) { diff --git a/mergeManager.h b/mergeManager.h index e63c6f9..17a86c5 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -64,16 +64,14 @@ public: private: logtable* ltable; double throttle_seconds; -// double elapsed_seconds; double last_throttle_seconds; -// double last_elapsed_seconds; mergeStats * c0; mergeStats * c1; mergeStats * c2; bool sleeping[3]; bool still_running; + // Needed so that the pretty print thread can be woken up during shutdown. pthread_cond_t pp_cond; pthread_t pp_thread; - }; #endif /* MERGEMANAGER_H_ */ diff --git a/mergeStats.h b/mergeStats.h index 9b0c52a..f1dbaad 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -46,17 +46,13 @@ class mergeStats { lifetime_elapsed(0), lifetime_consumed(0), bps(10.0*1024.0*1024.0), - print_skipped(0), active(false) { gettimeofday(&sleep,0); gettimeofday(&last,0); mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last)); mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last)); - pthread_mutex_init(&mut,0); - } - ~mergeStats() { - pthread_mutex_destroy(&mut); } + ~mergeStats() { } void new_merge2() { if(just_handed_off) { bytes_out = 0; @@ -151,11 +147,8 @@ class mergeStats { double bps; - int print_skipped; // used by pretty print in mergeManager. - bool active; - pthread_mutex_t mut; // protects things touched in tick(), and nothing else. public: void pretty_print(FILE* fd) { diff --git a/merger.cpp b/merger.cpp index 45f43e2..02cec26 100644 --- a/merger.cpp +++ b/merger.cpp @@ -1,78 +1,29 @@ - #include #include "merger.h" - #include #undef try #undef end -int merge_scheduler::addlogtable(logtable *ltable) -{ - - struct logtable_mergedata * mdata = new logtable_mergedata; - - // initialize merge data - ltable->set_tree_c0_mergeable(NULL); - - mergedata.push_back(std::make_pair(ltable, mdata)); - return mergedata.size()-1; - +static void* memMerge_thr(void* arg) { + return ((merge_scheduler*)arg)->memMergeThread(); +} +static void* diskMerge_thr(void* arg) { + return ((merge_scheduler*)arg)->diskMergeThread(); } -merge_scheduler::~merge_scheduler() -{ - mergedata.clear(); +merge_scheduler::merge_scheduler(logtable *ltable) : ltable_(ltable), MIN_R(3.0) { } +merge_scheduler::~merge_scheduler() { } +void merge_scheduler::shutdown() { + ltable_->stop(); + pthread_join(mem_merge_thread_, 0); + pthread_join(disk_merge_thread_, 0); } -void merge_scheduler::shutdown() -{ - //signal shutdown - for(size_t i=0; i *ltable = mergedata[i].first; - - ltable->stop(); - - } - - for(size_t i=0; imemmerge_thread,0); - pthread_join(mdata->diskmerge_thread,0); - } -} - -void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) -{ - - logtable * ltable = mergedata[index].first; - struct logtable_mergedata *mdata = mergedata[index].second; - - //initialize rb-tree - ltable->set_tree_c0(new memTreeComponent::rbtree_t); - - //disk merger args -#ifdef NO_SNOWSHOVEL - ltable->set_max_c0_size(MAX_C0_SIZE); -#else - ltable->set_max_c0_size(MAX_C0_SIZE*2); // XXX blatant hack. -#endif - diskTreeComponent ** block1_scratch = new diskTreeComponent*; - *block1_scratch=0; - - DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page); - DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page); - - void * (*diskmerger)(void*) = diskMergeThread; - void * (*memmerger)(void*) = memMergeThread; - - pthread_create(&mdata->diskmerge_thread, 0, diskmerger, ltable); - pthread_create(&mdata->memmerge_thread, 0, memmerger, ltable); - +void merge_scheduler::start() { + pthread_create(&mem_merge_thread_, 0, memMerge_thr, this); + pthread_create(&disk_merge_thread_, 0, diskMerge_thr, this); } template @@ -106,51 +57,49 @@ void merge_iterators(int xid, diskTreeComponent * forceMe, Merge algorithm: actual order: 1 2 3 4 5 6 12 11.5 11 [7 8 (9) 10] 13 */ -void* memMergeThread(void*arg) -{ +void * merge_scheduler::memMergeThread() { int xid; - logtable * ltable = (logtable*)arg; - assert(ltable->get_tree_c1()); + assert(ltable_->get_tree_c1()); int merge_count =0; - mergeStats * stats = ltable->merge_mgr->get_merge_stats(1); + mergeStats * stats = ltable_->merge_mgr->get_merge_stats(1); while(true) // 1 { - rwlc_writelock(ltable->header_mut); - ltable->merge_mgr->new_merge(1); + rwlc_writelock(ltable_->header_mut); + ltable_->merge_mgr->new_merge(1); int done = 0; // 2: wait for c0_mergable #ifdef NO_SNOWSHOVEL - while(!ltable->get_tree_c0_mergeable()) + while(!ltable_->get_tree_c0_mergeable()) { - pthread_cond_signal(<able->c0_needed); + pthread_cond_signal(<able_->c0_needed); - if(!ltable->is_still_running()){ + if(!ltable_->is_still_running()){ done = 1; break; } DEBUG("mmt:\twaiting for block ready cond\n"); - rwlc_cond_wait(<able->c0_ready, ltable->header_mut); + rwlc_cond_wait(<able_->c0_ready, ltable_->header_mut); DEBUG("mmt:\tblock ready\n"); } #else // the merge iterator will wait until c0 is big enough for us to proceed. - if(!ltable->is_still_running()) { + if(!ltable_->is_still_running()) { done = 1; } #endif if(done==1) { - pthread_cond_signal(<able->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. - rwlc_unlock(ltable->header_mut); + pthread_cond_signal(<able_->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. + rwlc_unlock(ltable_->header_mut); break; } @@ -162,40 +111,40 @@ void* memMergeThread(void*arg) // 4: Merge //create the iterators - diskTreeComponent::iterator *itrA = ltable->get_tree_c1()->open_iterator(); + diskTreeComponent::iterator *itrA = ltable_->get_tree_c1()->open_iterator(); #ifdef NO_SNOWSHOVEL memTreeComponent::iterator *itrB = - new memTreeComponent::iterator(ltable->get_tree_c0_mergeable()); + new memTreeComponent::iterator(ltable_->get_tree_c0_mergeable()); #else // memTreeComponent::revalidatingIterator *itrB = -// new memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), <able->rb_mut); +// new memTreeComponent::revalidatingIterator(ltable_->get_tree_c0(), <able_->rb_mut); // memTreeComponent::batchedRevalidatingIterator *itrB = -// new memTreeComponent::batchedRevalidatingIterator(ltable->get_tree_c0(), <able->tree_bytes, ltable->max_c0_size, <able->flushing, 100, <able->rb_mut); +// new memTreeComponent::batchedRevalidatingIterator(ltable_->get_tree_c0(), <able_->tree_bytes, ltable_->max_c0_size, <able_->flushing, 100, <able_->rb_mut); #endif - const int64_t min_bloom_target = ltable->max_c0_size; + const int64_t min_bloom_target = ltable_->max_c0_size; //create a new tree - diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100); + diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100); - ltable->set_tree_c1_prime(c1_prime); + ltable_->set_tree_c1_prime(c1_prime); - rwlc_unlock(ltable->header_mut); + rwlc_unlock(ltable_->header_mut); #ifndef NO_SNOWSHOVEL // needs to be past the rwlc_unlock... memTreeComponent::batchedRevalidatingIterator *itrB = - new memTreeComponent::batchedRevalidatingIterator(ltable->get_tree_c0(), <able->tree_bytes, ltable->max_c0_size, <able->flushing, 100, <able->rb_mut); + new memTreeComponent::batchedRevalidatingIterator(ltable_->get_tree_c0(), <able_->tree_bytes, ltable_->max_c0_size, <able_->flushing, 100, <able_->rb_mut); #endif //: do the merge DEBUG("mmt:\tMerging:\n"); - merge_iterators(xid, c1_prime, itrA, itrB, ltable, c1_prime, stats, false); + merge_iterators(xid, c1_prime, itrA, itrB, ltable_, c1_prime, stats, false); delete itrA; delete itrB; // 5: force c1' - rwlc_writelock(ltable->header_mut); + rwlc_writelock(ltable_->header_mut); //force write the new tree to disk c1_prime->force(xid); @@ -208,24 +157,24 @@ void* memMergeThread(void*arg) // first, we need to move the c1' into c1. // 12: delete old c1 - ltable->get_tree_c1()->dealloc(xid); - delete ltable->get_tree_c1(); + ltable_->get_tree_c1()->dealloc(xid); + delete ltable_->get_tree_c1(); // 10: c1 = c1' - ltable->set_tree_c1(c1_prime); - ltable->set_tree_c1_prime(0); + ltable_->set_tree_c1(c1_prime); + ltable_->set_tree_c1_prime(0); #ifdef NO_SNOWSHOVEL // 11.5: delete old c0_mergeable - memTreeComponent::tearDownTree(ltable->get_tree_c0_mergeable()); + memTreeComponent::tearDownTree(ltable_->get_tree_c0_mergeable()); // 11: c0_mergeable = NULL - ltable->set_tree_c0_mergeable(NULL); + ltable_->set_tree_c0_mergeable(NULL); #endif - ltable->set_c0_is_merging(false); + ltable_->set_c0_is_merging(false); double new_c1_size = stats->output_size(); - pthread_cond_signal(<able->c0_needed); + pthread_cond_signal(<able_->c0_needed); - ltable->update_persistent_header(xid, 1); + ltable_->update_persistent_header(xid, 1); Tcommit(xid); //TODO: this is simplistic for now @@ -233,28 +182,28 @@ void* memMergeThread(void*arg) // update c0 effective size. double frac = 1.0/(double)merge_count; - ltable->num_c0_mergers = merge_count; - ltable->mean_c0_effective_size = + ltable_->num_c0_mergers = merge_count; + ltable_->mean_c0_effective_size = (int64_t) ( - ((double)ltable->mean_c0_effective_size)*(1-frac) + + ((double)ltable_->mean_c0_effective_size)*(1-frac) + ((double)stats->bytes_in_small*frac)); - ltable->merge_mgr->get_merge_stats(0)->target_size = ltable->mean_c0_effective_size; - double target_R = *ltable->R(); + ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size; + double target_R = *ltable_->R(); - printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable->max_c0_size, (long long int)ltable->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable->max_c0_size, ((double)ltable->mean_c0_effective_size) / (double)ltable->max_c0_size); + printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size); assert(target_R >= MIN_R); - bool signal_c2 = (new_c1_size / ltable->mean_c0_effective_size > target_R); + bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > target_R); DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R); if( signal_c2 ) { DEBUG("mmt:\tsignaling C2 for merge\n"); DEBUG("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size, - ltable->max_c0_size, a->max_size, target_R); + ltable_->max_c0_size, a->max_size, target_R); // XXX need to report backpressure here! - while(ltable->get_tree_c1_mergeable()) { - rwlc_cond_wait(<able->c1_needed, ltable->header_mut); + while(ltable_->get_tree_c1_mergeable()) { + rwlc_cond_wait(<able_->c1_needed, ltable_->header_mut); } xid = Tbegin(); @@ -262,27 +211,27 @@ void* memMergeThread(void*arg) // we just set c1 = c1'. Want to move c1 -> c1 mergeable, clean out c1. // 7: and perhaps c1_mergeable - ltable->set_tree_c1_mergeable(ltable->get_tree_c1()); // c1_prime == c1. + ltable_->set_tree_c1_mergeable(ltable_->get_tree_c1()); // c1_prime == c1. stats->handed_off_tree(); // 8: c1 = new empty. - ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats)); + ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats)); - pthread_cond_signal(<able->c1_ready); + pthread_cond_signal(<able_->c1_ready); pageid_t old_bytes_out = stats->bytes_out; stats->bytes_out = 0; // XXX HACK - ltable->update_persistent_header(xid, 1); + ltable_->update_persistent_header(xid, 1); stats->bytes_out = old_bytes_out; Tcommit(xid); } -// DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page); +// DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable_->get_tree_c1()->get_root_rec().page); // 13 - rwlc_unlock(ltable->header_mut); + rwlc_unlock(ltable_->header_mut); - ltable->merge_mgr->finished_merge(1); + ltable_->merge_mgr->finished_merge(1); // stats->pretty_print(stdout); //TODO: get the freeing outside of the lock @@ -293,43 +242,42 @@ void* memMergeThread(void*arg) } -void *diskMergeThread(void*arg) +void * merge_scheduler::diskMergeThread() { int xid; - logtable * ltable = (logtable*)arg; - assert(ltable->get_tree_c2()); + assert(ltable_->get_tree_c2()); int merge_count =0; - mergeStats * stats = ltable->merge_mgr->get_merge_stats(2); + mergeStats * stats = ltable_->merge_mgr->get_merge_stats(2); while(true) { // 2: wait for input - rwlc_writelock(ltable->header_mut); - ltable->merge_mgr->new_merge(2); + rwlc_writelock(ltable_->header_mut); + ltable_->merge_mgr->new_merge(2); int done = 0; // get a new input for merge - while(!ltable->get_tree_c1_mergeable()) + while(!ltable_->get_tree_c1_mergeable()) { - pthread_cond_signal(<able->c1_needed); + pthread_cond_signal(<able_->c1_needed); - if(!ltable->is_still_running()){ + if(!ltable_->is_still_running()){ done = 1; break; } DEBUG("dmt:\twaiting for block ready cond\n"); - rwlc_cond_wait(<able->c1_ready, ltable->header_mut); + rwlc_cond_wait(<able_->c1_ready, ltable_->header_mut); DEBUG("dmt:\tblock ready\n"); } if(done==1) { - rwlc_unlock(ltable->header_mut); + rwlc_unlock(ltable_->header_mut); break; } @@ -340,23 +288,23 @@ void *diskMergeThread(void*arg) // 4: do the merge. //create the iterators - diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); + diskTreeComponent::iterator *itrA = ltable_->get_tree_c2()->open_iterator(); #ifdef NO_SNOWSHOVEL - diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); + diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(); #else - diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(<able->merge_mgr->cur_c1_c2_progress_delta, 0.05, <able->shutting_down_); + diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(<able_->merge_mgr->cur_c1_c2_progress_delta, 0.05, <able_->shutting_down_); #endif //create a new tree - diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats, (ltable->max_c0_size * *ltable->R() + stats->base_size)/ 1000); -// diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats); + diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000); +// diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats); - rwlc_unlock(ltable->header_mut); + rwlc_unlock(ltable_->header_mut); //do the merge DEBUG("dmt:\tMerging:\n"); - merge_iterators(xid, c2_prime, itrA, itrB, ltable, c2_prime, stats, true); + merge_iterators(xid, c2_prime, itrA, itrB, ltable_, c2_prime, stats, true); delete itrA; delete itrB; @@ -366,15 +314,15 @@ void *diskMergeThread(void*arg) // (skip 6, 7, 8, 8.5, 9)) - rwlc_writelock(ltable->header_mut); + rwlc_writelock(ltable_->header_mut); //12 - ltable->get_tree_c2()->dealloc(xid); - delete ltable->get_tree_c2(); + ltable_->get_tree_c2()->dealloc(xid); + delete ltable_->get_tree_c2(); //11.5 - ltable->get_tree_c1_mergeable()->dealloc(xid); + ltable_->get_tree_c1_mergeable()->dealloc(xid); //11 - delete ltable->get_tree_c1_mergeable(); - ltable->set_tree_c1_mergeable(0); + delete ltable_->get_tree_c1_mergeable(); + ltable_->set_tree_c1_mergeable(0); //writes complete //now atomically replace the old c2 with new c2 @@ -382,23 +330,23 @@ void *diskMergeThread(void*arg) merge_count++; //update the current optimal R value - *(ltable->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable->mean_c0_effective_size) ) ); + *(ltable_->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable_->mean_c0_effective_size) ) ); - DEBUG("\nR = %f\n", *(ltable->R())); + DEBUG("\nR = %f\n", *(ltable_->R())); DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i)); // 10: C2 is never too big - ltable->set_tree_c2(c2_prime); + ltable_->set_tree_c2(c2_prime); stats->handed_off_tree(); DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1); // 13 - ltable->update_persistent_header(xid, 2); + ltable_->update_persistent_header(xid, 2); Tcommit(xid); - rwlc_unlock(ltable->header_mut); + rwlc_unlock(ltable_->header_mut); // stats->pretty_print(stdout); - ltable->merge_mgr->finished_merge(2); + ltable_->merge_mgr->finished_merge(2); } @@ -413,14 +361,14 @@ static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, sta } } -static int garbage_collect(logtable * ltable, datatuple ** garbage, int garbage_len, int next_garbage, bool force = false) { +static int garbage_collect(logtable * ltable_, datatuple ** garbage, int garbage_len, int next_garbage, bool force = false) { if(next_garbage == garbage_len || force) { - pthread_mutex_lock(<able->rb_mut); + pthread_mutex_lock(<able_->rb_mut); for(int i = 0; i < next_garbage; i++) { datatuple * t2tmp = NULL; { - memTreeComponent::rbtree_t::iterator rbitr = ltable->get_tree_c0()->find(garbage[i]); - if(rbitr != ltable->get_tree_c0()->end()) { + memTreeComponent::rbtree_t::iterator rbitr = ltable_->get_tree_c0()->find(garbage[i]); + if(rbitr != ltable_->get_tree_c0()->end()) { t2tmp = *rbitr; if((t2tmp->datalen() == garbage[i]->datalen()) && !memcmp(t2tmp->data(), garbage[i]->data(), garbage[i]->datalen())) { @@ -431,13 +379,13 @@ static int garbage_collect(logtable * ltable, datatuple ** garbage, i } } // close rbitr before touching the tree. if(t2tmp) { - ltable->get_tree_c0()->erase(garbage[i]); - ltable->tree_bytes -= garbage[i]->byte_length(); + ltable_->get_tree_c0()->erase(garbage[i]); + ltable_->tree_bytes -= garbage[i]->byte_length(); datatuple::freetuple(t2tmp); } datatuple::freetuple(garbage[i]); } - pthread_mutex_unlock(<able->rb_mut); + pthread_mutex_unlock(<able_->rb_mut); return 0; } else { return next_garbage; diff --git a/merger.h b/merger.h index 9206455..02b7534 100644 --- a/merger.h +++ b/merger.h @@ -8,33 +8,22 @@ #undef try #undef end -//TODO: 400 bytes overhead per tuple, this is nuts, check if this is true... -static const int RB_TREE_OVERHEAD = 400; -static const double MIN_R = 3.0; - -struct logtable_mergedata -{ - //merge threads - pthread_t diskmerge_thread; - pthread_t memmerge_thread; -}; - -class merge_scheduler -{ - std::vector *, logtable_mergedata*> > mergedata; - +class merge_scheduler { public: - ~merge_scheduler(); + merge_scheduler(logtable * ltable); + ~merge_scheduler(); - int addlogtable(logtable * ltable); - void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024); + void start(); + void shutdown(); - struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;} + void * memMergeThread(); + void * diskMergeThread(); - void shutdown(); +private: + pthread_t mem_merge_thread_; + pthread_t disk_merge_thread_; + logtable * ltable_; + const double MIN_R; }; -void* memMergeThread(void* arg); -void* diskMergeThread(void* arg); - #endif diff --git a/newserver.cpp b/newserver.cpp index 418ab98..5e74709 100644 --- a/newserver.cpp +++ b/newserver.cpp @@ -21,7 +21,6 @@ int main(int argc, char *argv[]) int xid = Tbegin(); - merge_scheduler * mscheduler = new merge_scheduler; logtable ltable; @@ -51,11 +50,9 @@ int main(int argc, char *argv[]) } Tcommit(xid); - - int lindex = mscheduler->addlogtable(<able); - ltable.setMergeData(mscheduler->getMergeData(lindex)); - - mscheduler->startlogtable(lindex, c0_size); + ltable.set_max_c0_size(c0_size); + merge_scheduler * mscheduler = new merge_scheduler(<able); + mscheduler->start(); simpleServer *lserver = new simpleServer(<able); diff --git a/server.cpp b/server.cpp index c7127b1..8e99538 100644 --- a/server.cpp +++ b/server.cpp @@ -54,7 +54,6 @@ int main(int argc, char *argv[]) int xid = Tbegin(); - mscheduler = new merge_scheduler; logtable ltable; @@ -72,8 +71,6 @@ int main(int argc, char *argv[]) Tcommit(xid); - int lindex = mscheduler->addlogtable(<able); - ltable.setMergeData(mscheduler->getMergeData(lindex)); int64_t c0_size = 1024 * 1024 * 512 * 1; @@ -89,7 +86,9 @@ int main(int argc, char *argv[]) printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server? } - mscheduler->startlogtable(lindex, c0_size); + ltable.set_max_c0_size(c0_size); + mscheduler = new merge_scheduler(<able); + mscheduler->start(); lserver = new logserver(100, 32432); diff --git a/test/check_gen.cpp b/test/check_gen.cpp index 574ae61..0ee8191 100644 --- a/test/check_gen.cpp +++ b/test/check_gen.cpp @@ -18,21 +18,21 @@ int main(int argc, char **argv) int xid = Tbegin(); - logtable ltable(1000, 10000, 5); + logtable *ltable = new logtable(1000, 10000, 5); - recordid table_root = ltable.allocTable(xid); + recordid table_root = ltable->allocTable(xid); Tcommit(xid); xid = Tbegin(); RegionAllocator * ro_alloc = new RegionAllocator(); - diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ro_alloc, ltable.get_tree_c2()->get_root_rid() ); + diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ro_alloc, ltable->get_tree_c2()->get_root_rid() ); it->close(); delete it; delete ro_alloc; Tcommit(xid); - + delete ltable; logtable::deinit_stasis(); diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 836e090..5020846 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -50,17 +50,15 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - merge_scheduler mscheduler; - logtable ltable(1000, 10000, 5); + logtable * ltable = new logtable(1000, 10000, 5); + ltable->set_max_c0_size(10 * 1024 * 1024); + merge_scheduler mscheduler(ltable); - recordid table_root = ltable.allocTable(xid); + recordid table_root = ltable->allocTable(xid); Tcommit(xid); - int lindex = mscheduler.addlogtable(<able); - ltable.setMergeData(mscheduler.getMergeData(lindex)); - - mscheduler.startlogtable(lindex, 10 * 1024 * 1024); + mscheduler.start(); printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES); @@ -85,7 +83,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datasize += newtuple->byte_length(); gettimeofday(&ti_st,0); - ltable.insertTuple(newtuple); + ltable->insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); @@ -122,7 +120,7 @@ void insertProbeIter(size_t NUM_ENTRIES) //rkey[keylen-1]='\0'; //find the key with the given tuple - datatuple *dt = ltable.findTuple(xid, rkey, keylen); + datatuple *dt = ltable->findTuple(xid, rkey, keylen); assert(dt!=0); //if(dt!=0) @@ -150,6 +148,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); + delete ltable; logtable::deinit_stasis(); } diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index 75bcd34..e3dc0ea 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -44,17 +44,15 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - merge_scheduler mscheduler; - logtable ltable(1000, 10000, 100); + logtable *ltable = new logtable(1000, 10000, 100); + ltable->set_max_c0_size(10*1024*1024); + merge_scheduler mscheduler(ltable); - recordid table_root = ltable.allocTable(xid); + recordid table_root = ltable->allocTable(xid); Tcommit(xid); - int lindex = mscheduler.addlogtable(<able); - ltable.setMergeData(mscheduler.getMergeData(lindex)); - - mscheduler.startlogtable(lindex, 10 * 1024 * 1024); + mscheduler.start(); printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES); @@ -75,7 +73,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datasize += newtuple->byte_length(); gettimeofday(&ti_st,0); - ltable.insertTuple(newtuple); + ltable->insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); @@ -90,10 +88,10 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("datasize: %llu\n", (unsigned long long)datasize); mscheduler.shutdown(); + delete ltable; printf("merge threads finished.\n"); gettimeofday(&stop_tv,0); printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); - logtable::deinit_stasis(); } diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index 57d5116..d0608c9 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -102,16 +102,15 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - merge_scheduler mscheduler; - logtable ltable(1000, 1000, 40); + logtable *ltable = new logtable(1000, 1000, 40); + ltable->set_max_c0_size(10 * 1024 * 1024); + merge_scheduler mscheduler(ltable); - recordid table_root = ltable.allocTable(xid); + recordid table_root = ltable->allocTable(xid); Tcommit(xid); - int lindex = mscheduler.addlogtable(<able); - ltable.setMergeData(mscheduler.getMergeData(lindex)); - mscheduler.startlogtable(lindex, 10 * 1024 * 1024); + mscheduler.start(); printf("Stage 1: Writing %llu keys\n", (unsigned long long)NUM_ENTRIES); @@ -134,7 +133,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datasize += newtuple->byte_length(); gettimeofday(&ti_st,0); - ltable.insertTuple(newtuple); + ltable->insertTuple(newtuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); @@ -151,7 +150,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *deltuple = datatuple::create((*key_arr)[del_index].c_str(), (*key_arr)[del_index].length()+1); gettimeofday(&ti_st,0); - ltable.insertTuple(deltuple); + ltable->insertTuple(deltuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); @@ -172,7 +171,7 @@ void insertProbeIter(size_t NUM_ENTRIES) datatuple *uptuple = datatuple::create((*key_arr)[up_index].c_str(), (*key_arr)[up_index].length()+1, ditem.c_str(), ditem.length()+1); gettimeofday(&ti_st,0); - ltable.insertTuple(uptuple); + ltable->insertTuple(uptuple); gettimeofday(&ti_end,0); insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); @@ -207,7 +206,7 @@ void insertProbeIter(size_t NUM_ENTRIES) memcpy((byte*)rkey, (*key_arr)[ri].c_str(), keylen); //find the key with the given tuple - datatuple *dt = ltable.findTuple(xid, rkey, keylen); + datatuple *dt = ltable->findTuple(xid, rkey, keylen); if(std::find(del_list.begin(), del_list.end(), i) == del_list.end()) { @@ -248,6 +247,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); + delete ltable; logtable::deinit_stasis(); } diff --git a/test/check_testAndSet.cpp b/test/check_testAndSet.cpp index 1398d47..df909ca 100644 --- a/test/check_testAndSet.cpp +++ b/test/check_testAndSet.cpp @@ -27,7 +27,8 @@ #define NUM_THREADS 128 unsigned char vals[NUM_THREADS]; -logtable* ltbl; + +logtable * ltable; int myucharcmp(const void * ap, const void * bp) { unsigned char a = *(unsigned char*)ap; @@ -43,7 +44,7 @@ void * worker(void * idp) { printf("id = %d key = %d\n", (int)id, (int)key); datatuple * dt = datatuple::create(&key, sizeof(key), &id, sizeof(id)); datatuple * dtdelete = datatuple::create(&key, sizeof(key)); - succ = ltbl->testAndSetTuple(dt, dtdelete); + succ = ltable->testAndSetTuple(dt, dtdelete); datatuple::freetuple(dt); datatuple::freetuple(dtdelete); vals[id] = key; @@ -60,18 +61,16 @@ void insertProbeIter(size_t NUM_ENTRIES) logtable::init_stasis(); int xid = Tbegin(); - merge_scheduler mscheduler; - logtable ltable(1000, 10000, 5); - ltbl = <able; + ltable = new logtable(1000, 10000, 5); + ltable->set_max_c0_size(10*1024*1024); - recordid table_root = ltable.allocTable(xid); + merge_scheduler mscheduler(ltable); + + recordid table_root = ltable->allocTable(xid); Tcommit(xid); - int lindex = mscheduler.addlogtable(<able); - ltable.setMergeData(mscheduler.getMergeData(lindex)); - - mscheduler.startlogtable(lindex, 10 * 1024 * 1024); + mscheduler.start(); pthread_t *threads = (pthread_t*)malloc(NUM_THREADS * sizeof(pthread_t)); @@ -92,6 +91,7 @@ void insertProbeIter(size_t NUM_ENTRIES) } mscheduler.shutdown(); + delete ltable; logtable::deinit_stasis(); printf("\npass\n");