From eda514ddaac4e2f9ba9342247b04cccd9ada1f67 Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 2 Jun 2010 21:47:58 +0000 Subject: [PATCH] amortize the computation of statistics (but not mutex acuqisitions) across many tuple insertions. Also, refactor stats stuff with latch contention in mind (but it still grabs too many latches...). This commit causes check_tcpclient to emit warnings on my machine. Not sure if it will happen on production hardware git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@824 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 10 ++++---- mergeManager.cpp | 65 +++++++++++++++++++++++++++++++++++------------- mergeManager.h | 15 +++++------ mergeStats.h | 46 +++++++++------------------------- merger.cpp | 18 +++++++------- 5 files changed, 82 insertions(+), 72 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index e883b39..8db788d 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -52,7 +52,7 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio this->datapage_size = datapage_size; c0_stats = merge_mgr->get_merge_stats(0); - c0_stats->new_merge(); + merge_mgr->new_merge(0); c0_stats->starting_merge(); } @@ -149,7 +149,7 @@ void logtable::flushTable() int expmcount = merge_count; - c0_stats->finished_merge(); + merge_mgr->finished_merge(0); //this is for waiting the previous merger of the mem-tree //hopefullly this wont happen @@ -165,7 +165,7 @@ void logtable::flushTable() } c0_stats->handed_off_tree(); - c0_stats->new_merge(); + merge_mgr->new_merge(0); gettimeofday(&stop_tv,0); stop = tv_to_double(stop_tv); @@ -428,7 +428,7 @@ void logtable::insertTuple(datatuple *tuple) { rwlc_writelock(header_mut); // XXX want this to be a readlock, but tick, and the stats need it to be a writelock for now... //lock the red-black tree - c0_stats->read_tuple_from_small_component(tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut. + merge_mgr->read_tuple_from_small_component(0, tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut. pthread_mutex_lock(&rb_mut); //find the previous tuple with same key in the memtree if exists memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple); @@ -463,7 +463,7 @@ void logtable::insertTuple(datatuple *tuple) } - c0_stats->wrote_tuple(t); + merge_mgr->wrote_tuple(0, t); pthread_mutex_unlock(&rb_mut); diff --git a/mergeManager.cpp b/mergeManager.cpp index 93f4bf0..59bd755 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -32,7 +32,8 @@ mergeManager::~mergeManager() { delete c2; } -void mergeManager::new_merge(mergeStats * s) { +void mergeManager::new_merge(int mergeLevel) { + mergeStats * s = get_merge_stats(mergeLevel); pthread_mutex_lock(&mut); if(s->merge_level == 0) { // target_size was set during startup @@ -44,6 +45,7 @@ void mergeManager::new_merge(mergeStats * s) { // target_size is infinity... } else { abort(); } pthread_mutex_unlock(&mut); + s->new_merge2(); } void mergeManager::set_c0_size(int64_t size) { c0->target_size = size; @@ -71,12 +73,16 @@ void mergeManager::set_c0_size(int64_t size) { * bytes_consumed_by_merger = sum(bytes_in_small_delta) */ void mergeManager::tick(mergeStats * s, bool block) { -#define PRINT_SKIP 0 //10000 - pageid_t tick_length_bytes = 10*1024; +#define PRINT_SKIP 20 + pageid_t tick_length_bytes = 128*1024; // probably lower than it could be for production machines. 256KB leads to whining on my dev box. - if(true || s->bytes_in_small_delta > tick_length_bytes) { +// if(s->bytes_in_small_delta > tick_length_bytes) { + pageid_t new_current_size = s->base_size + s->bytes_out - s->bytes_in_large; - s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; + if(((!block) && (new_current_size - s->current_size > tick_length_bytes)) || + (block && s->bytes_in_small_delta > tick_length_bytes)) { // other than R, these are protected by a mutex, but this is the only thread that can write them + + s->current_size = new_current_size; // s->base_size + s->bytes_out - s->bytes_in_large; if(block) { while(sleeping[s->merge_level]) { @@ -149,11 +155,11 @@ void mergeManager::tick(mergeStats * s, bool block) { #ifdef PP_THREAD_INFO printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep); #endif - if(print_skipped == PRINT_SKIP) { + if(s->print_skipped == PRINT_SKIP) { pretty_print(stdout); - print_skipped = 0; + s->print_skipped = 0; } else { - print_skipped++; + s->print_skipped++; } if(overshoot > 0) { // throttle @@ -184,22 +190,48 @@ void mergeManager::tick(mergeStats * s, bool block) { } } while((overshoot > 0) && (raw_overshoot > 0)); } else { - if(print_skipped == PRINT_SKIP) { + if(s->print_skipped == PRINT_SKIP) { pretty_print(stdout); - print_skipped = 0; + s->print_skipped = 0; } else { - print_skipped++; + s->print_skipped++; } } // pthread_mutex_unlock(&mut); } } +void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * tup) { + if(tup) { + mergeStats * s = get_merge_stats(merge_level); + (s->num_tuples_in_small)++; + (s->bytes_in_small_delta) += tup->byte_length(); + (s->bytes_in_small) += tup->byte_length(); + tick(s, true); + } +} +void mergeManager::wrote_tuple(int merge_level, datatuple * tup) { + mergeStats * s = get_merge_stats(merge_level); + (s->num_tuples_out)++; + (s->bytes_out) += tup->byte_length(); + + // XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex. + tick(s, false); +} + +void mergeManager::finished_merge(int merge_level) { + get_merge_stats(merge_level)->active = false; + if(merge_level != 0) { + get_merge_stats(merge_level - 1)->mergeable_size = 0; + } + gettimeofday(&get_merge_stats(merge_level)->done, 0); +} + mergeManager::mergeManager(logtable *ltable): ltable(ltable), - c0(new mergeStats(this, 0, ltable->max_c0_size)), - c1(new mergeStats(this, 1, (int64_t)(((double)ltable->max_c0_size) * *ltable->R()))), - c2(new mergeStats(this, 2, 0)) { + c0(new mergeStats(0, ltable->max_c0_size)), + c1(new mergeStats(1, (int64_t)(((double)ltable->max_c0_size) * *ltable->R()))), + c2(new mergeStats(2, 0)) { pthread_mutex_init(&mut, 0); pthread_mutex_init(&throttle_mut, 0); pthread_mutex_init(&dummy_throttle_mut, 0); @@ -210,7 +242,6 @@ mergeManager::mergeManager(logtable *ltable): sleeping[0] = false; sleeping[1] = false; sleeping[2] = false; - print_skipped = 0; double_to_ts(&c0->last_tick, tv_to_double(&tv)); double_to_ts(&c1->last_tick, tv_to_double(&tv)); double_to_ts(&c2->last_tick, tv_to_double(&tv)); @@ -251,8 +282,8 @@ void mergeManager::pretty_print(FILE * out) { have_c1m ? "C1'" : "...", c2->active ? "RUN" : "---", c1_c2_progress, c2->window_consumed/(((double)mb)*c2->window_elapsed), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed), have_c2 ? "C2" : ".."); -#define PP_SIZES - #ifdef PP_SIZES +//#define PP_SIZES +#ifdef PP_SIZES fprintf(out, "[size in small/large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld ", c0->target_size/mb, c0->current_size/mb, c0->bytes_in_small/mb, c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb); diff --git a/mergeManager.h b/mergeManager.h index 220f515..3bd35ed 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -17,18 +17,17 @@ template class logtable; - class mergeStats; class mergeManager { public: - double tv_to_double(struct timeval * tv) { + static double tv_to_double(struct timeval * tv) { return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; } - double ts_to_double(struct timespec * ts) { + static double ts_to_double(struct timespec * ts) { return (double)ts->tv_sec + ((double)ts->tv_nsec)/1000000000.0; } - void double_to_ts(struct timespec *ts, double time) { + static void double_to_ts(struct timespec *ts, double time) { ts->tv_sec = (time_t)(time); ts->tv_nsec = (long)((time - (double)ts->tv_sec) * 1000000000.0); } @@ -39,11 +38,14 @@ public: ~mergeManager(); - void new_merge(mergeStats * s); + void new_merge(int mergelevel); void set_c0_size(int64_t size); void tick(mergeStats * s, bool block); - void pretty_print(FILE * out); mergeStats* get_merge_stats(int mergeLevel); + void read_tuple_from_small_component(int merge_level, datatuple * tup); + void wrote_tuple(int merge_level, datatuple * tup); + void finished_merge(int merge_level); + void pretty_print(FILE * out); private: pthread_mutex_t mut; @@ -60,7 +62,6 @@ private: pthread_cond_t dummy_throttle_cond; pthread_cond_t throttle_wokeup_cond; bool sleeping[3]; - int print_skipped; }; #endif /* MERGEMANAGER_H_ */ diff --git a/mergeStats.h b/mergeStats.h index 5a67995..248a56b 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -16,12 +16,12 @@ #include #include "datatuple.h" #include "datapage.h" -#include "mergeManager.h" + +#include // XXX for double_to_ts, etc... create a util class. class mergeStats { public: - mergeStats(mergeManager* merge_mgr, int merge_level, int64_t target_size) : - merge_mgr(merge_mgr), + mergeStats(int merge_level, int64_t target_size) : merge_level(merge_level), merge_count(0), base_size(0), @@ -42,14 +42,13 @@ class mergeStats { lifetime_consumed(0), window_elapsed(0.001), window_consumed(0), + print_skipped(0), active(false) { gettimeofday(&sleep,0); gettimeofday(&last,0); - merge_mgr->double_to_ts(&last_tick, merge_mgr->tv_to_double(&last)); + mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last)); } - void new_merge() { - merge_mgr->new_merge(this); - + void new_merge2() { if(just_handed_off) { bytes_out = 0; just_handed_off = false; @@ -72,7 +71,7 @@ class mergeStats { active = true; gettimeofday(&start, 0); gettimeofday(&last, 0); - merge_mgr->double_to_ts(&last_tick, merge_mgr->tv_to_double(&last)); + mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last)); } void handed_off_tree() { @@ -82,36 +81,14 @@ class mergeStats { just_handed_off = true; } } - void finished_merge() { - active = false; - if(merge_level == 1) { - merge_mgr->get_merge_stats(0)->mergeable_size = 0; - } else if(merge_level == 2) { - merge_mgr->get_merge_stats(1)->mergeable_size = 0; - } - gettimeofday(&done, 0); - } void read_tuple_from_large_component(datatuple * tup) { if(tup) { num_tuples_in_large++; bytes_in_large += tup->byte_length(); } } - void read_tuple_from_small_component(datatuple * tup) { - if(tup) { - num_tuples_in_small++; - bytes_in_small_delta += tup->byte_length(); - bytes_in_small += tup->byte_length(); - merge_mgr->tick(this, true); - } - } void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) { } - void wrote_tuple(datatuple * tup) { - num_tuples_out++; - bytes_out += tup->byte_length(); - merge_mgr->tick(this, false); - } void wrote_datapage(DataPage *dp) { num_datapages_out++; bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count()); @@ -119,9 +96,8 @@ class mergeStats { pageid_t output_size() { return bytes_out; } + const int merge_level; // 1 => C0->C1, 2 => C1->C2 protected: - mergeManager* merge_mgr; - int merge_level; // 1 => C0->C1, 2 => C1->C2 pageid_t merge_count; // This is the merge_count'th merge struct timeval sleep; // When did we go to sleep waiting for input? struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep) @@ -132,13 +108,13 @@ class mergeStats { return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0; } friend class mergeManager; -protected: + struct timespec last_tick; pageid_t base_size; pageid_t target_size; pageid_t current_size; - pageid_t mergeable_size; + pageid_t mergeable_size; // protected by mutex. pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)? pageid_t bytes_out; // How many bytes worth of tuples did we write? @@ -157,6 +133,8 @@ protected: double window_elapsed; double window_consumed; + int print_skipped; // used by pretty print in mergeManager. + bool active; public: diff --git a/merger.cpp b/merger.cpp index a7a2c41..7d76c07 100644 --- a/merger.cpp +++ b/merger.cpp @@ -146,7 +146,7 @@ void* memMergeThread(void*arg) while(true) // 1 { rwlc_writelock(ltable->header_mut); - stats->new_merge(); + ltable->merge_mgr->new_merge(1); int done = 0; // 2: wait for c0_mergable while(!ltable->get_tree_c0_mergeable()) @@ -230,7 +230,7 @@ void* memMergeThread(void*arg) ltable->update_persistent_header(xid); Tcommit(xid); - stats->finished_merge(); + ltable->merge_mgr->finished_merge(1); //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger @@ -299,7 +299,7 @@ void *diskMergeThread(void*arg) // 2: wait for input rwlc_writelock(ltable->header_mut); - stats->new_merge(); + ltable->merge_mgr->new_merge(2); int done = 0; // get a new input for merge while(!ltable->get_tree_c1_mergeable()) @@ -381,7 +381,7 @@ void *diskMergeThread(void*arg) ltable->update_persistent_header(xid); Tcommit(xid); - stats->finished_merge(); + ltable->merge_mgr->finished_merge(2); rwlc_unlock(ltable->header_mut); // stats->pretty_print(stdout); @@ -423,7 +423,7 @@ void merge_iterators(int xid, rwlc_writelock(ltable->header_mut); // XXX slow while( (t2=itrB->next_callerFrees()) != 0) { - stats->read_tuple_from_small_component(t2); + ltable->merge_mgr->read_tuple_from_small_component(stats->merge_level, t2); rwlc_unlock(ltable->header_mut); // XXX slow DEBUG("tuple\t%lld: keylen %d datalen %d\n", @@ -435,7 +435,7 @@ void merge_iterators(int xid, //insert t1 scratch_tree->insertTuple(xid, t1); i+=t1->byte_length(); - stats->wrote_tuple(t1); + ltable->merge_mgr->wrote_tuple(stats->merge_level, t1); datatuple::freetuple(t1); //advance itrA t1 = itrA->next_callerFrees(); @@ -458,7 +458,7 @@ void merge_iterators(int xid, i+=mtuple->byte_length(); } datatuple::freetuple(t1); - stats->wrote_tuple(mtuple); + ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple); t1 = itrA->next_callerFrees(); //advance itrA if(t1) { stats->read_tuple_from_large_component(t1); @@ -474,7 +474,7 @@ void merge_iterators(int xid, scratch_tree->insertTuple(xid, t2); i+=t2->byte_length(); - stats->wrote_tuple(t2); + ltable->merge_mgr->wrote_tuple(stats->merge_level, t2); periodically_force(xid, &i, forceMe, log); rwlc_unlock(ltable->header_mut); // XXX slow // cannot free any tuples here; they may still be read through a lookup @@ -485,7 +485,7 @@ void merge_iterators(int xid, while(t1 != 0) {// t1 is less than t2 scratch_tree->insertTuple(xid, t1); - stats->wrote_tuple(t1); + ltable->merge_mgr->wrote_tuple(stats->merge_level, t1); i += t1->byte_length(); datatuple::freetuple(t1);