From 73e72b47b28a0a2cd09bca866e409178cce44956 Mon Sep 17 00:00:00 2001 From: sears Date: Tue, 14 Dec 2010 01:49:23 +0000 Subject: [PATCH] more stats cleanups git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1493 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- diskTreeComponent.cpp | 16 +++++++++------- diskTreeComponent.h | 8 ++++---- logstore.cpp | 4 +--- logstore.h | 2 +- mergeManager.cpp | 33 +++++++++++++++++++++------------ mergeManager.h | 10 +++++----- mergeStats.h | 6 +++--- merger.cpp | 16 ++++++++-------- 8 files changed, 52 insertions(+), 43 deletions(-) diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 27618a7..6c2b442 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -903,10 +903,10 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k } } -diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, double* cur_progress_delta, double target_progress_delta, bool * flushing) : +diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, mergeManager * mgr, double target_progress_delta, bool * flushing) : ro_alloc_(new RegionAllocator()), tree_(tree ? tree->get_root_rec() : NULLRID), - cur_progress_delta_(cur_progress_delta), + mgr_(mgr), target_progress_delta_(target_progress_delta), flushing_(flushing) { @@ -917,7 +917,7 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, do diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) : ro_alloc_(new RegionAllocator()), tree_(tree ? tree->get_root_rec() : NULLRID), - cur_progress_delta_(NULL), + mgr_(NULL), target_progress_delta_(0.0), flushing_(NULL) { @@ -1006,12 +1006,14 @@ datatuple * diskTreeComponent::iterator::next_callerFrees() // else readTuple is null. We're done. } - if(readTuple && cur_progress_delta_) { - // *cur_progress_delta is how far ahead we are, as a fraction of the total merge. 
- while(*cur_progress_delta_ > target_progress_delta_ && ((!flushing_) || (! *flushing_))) { // TODO: how to pick this threshold? + if(readTuple && mgr_) { + // c1_c2_progress_delta() is c1's out progress - c2's in progress. We want to stop processing c2 if we are too far ahead (ie; c2 >> c1; delta << 0). + while(mgr_->c1_c2_progress_delta() < -target_progress_delta_ && ((!flushing_) || (! *flushing_))) { // TODO: how to pick this threshold? + DEBUG("Input is too far behind. Delta is %f\n", mgr_->c1_c2_progress_delta()); struct timespec ts; - mergeManager::double_to_ts(&ts, 0.1); + mergeManager::double_to_ts(&ts, 0.01); nanosleep(&ts, 0); + mgr_->update_progress(mgr_->get_merge_stats(1), 0); } } diff --git a/diskTreeComponent.h b/diskTreeComponent.h index 17bccda..499f5fa 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -64,8 +64,8 @@ class diskTreeComponent { void writes_done(); - iterator * open_iterator(double* cur_size = NULL, double target_size = 0, bool * flushing = NULL) { - return new iterator(ltree, cur_size, target_size, flushing); + iterator * open_iterator(mergeManager * mgr = NULL, double target_size = 0, bool * flushing = NULL) { + return new iterator(ltree, mgr, target_size, flushing); } iterator * open_iterator(datatuple * key) { if(key != NULL) { @@ -201,7 +201,7 @@ class diskTreeComponent { { public: - explicit iterator(diskTreeComponent::internalNodes *tree, double* cur_size = NULL, double target_size = 0, bool * flushing = NULL); + explicit iterator(diskTreeComponent::internalNodes *tree, mergeManager * mgr = NULL, double target_size = 0, bool * flushing = NULL); explicit iterator(diskTreeComponent::internalNodes *tree,datatuple *key); @@ -219,7 +219,7 @@ class diskTreeComponent { RegionAllocator * ro_alloc_; // has a filehandle that we use to optimize sequential scans. 
recordid tree_; //root of the tree - double * cur_progress_delta_; + mergeManager * mgr_; double target_progress_delta_; bool * flushing_; diff --git a/logstore.cpp b/logstore.cpp index 99951f0..2ea4eb1 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -23,7 +23,7 @@ template logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) { this->max_c0_size = max_c0_size; - this->mean_c0_effective_size = max_c0_size; + this->mean_c0_run_length = max_c0_size; this->num_c0_mergers = 0; r_val = 3.0; // MIN_R @@ -42,7 +42,6 @@ logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p tmerger = new tuplemerger(&replace_merger); header_mut = rwlc_initlock(); - pthread_mutex_init(&tick_mut, 0); pthread_mutex_init(&rb_mut, 0); pthread_cond_init(&c0_needed, 0); pthread_cond_init(&c0_ready, 0); @@ -75,7 +74,6 @@ logtable::~logtable() } pthread_mutex_destroy(&rb_mut); - pthread_mutex_destroy(&tick_mut); rwlc_deletelock(header_mut); pthread_cond_destroy(&c0_needed); pthread_cond_destroy(&c0_ready); diff --git a/logstore.h b/logstore.h index 4889f38..88498b8 100644 --- a/logstore.h +++ b/logstore.h @@ -112,7 +112,7 @@ public: pthread_mutex_t rb_mut; int64_t max_c0_size; // these track the effectiveness of snowshoveling - int64_t mean_c0_effective_size; + int64_t mean_c0_run_length; int64_t num_c0_mergers; mergeManager * merge_mgr; diff --git a/mergeManager.cpp b/mergeManager.cpp index 292463f..669fa96 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -42,7 +42,7 @@ void mergeManager::new_merge(int mergeLevel) { // target_size was set during startup } else if(s->merge_level == 1) { assert(c0->target_size); - c1->target_size = (pageid_t)(*ltable->R() * (double)c0->target_size); + c1->target_size = (pageid_t)(*ltable->R() * (double)ltable->mean_c0_run_length); assert(c1->target_size); } else if(s->merge_level == 2) { // target_size is infinity... 
@@ -62,8 +62,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) { s->delta = 0; if(!s->need_tick) { s->need_tick = 1; } } - if(s->merge_level == 2 - ) { + if(s->merge_level == 2) { if(s->active) { s->in_progress = ((double)(s->bytes_in_large + s->bytes_in_small)) / (double)(get_merge_stats(s->merge_level-1)->mergeable_size + s->base_size); } else { @@ -71,7 +70,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) { } } else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...) if(s->active) { - s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_effective_size); + s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_run_length); } else { s->in_progress = 0; } @@ -83,7 +82,12 @@ void mergeManager::update_progress(mergeStats * s, int delta) { s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; } - s->out_progress = ((double)s->current_size) / (double)s->target_size; + s->out_progress = ((double)s->current_size) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size); + if(c2->active && c1->mergeable_size) { + c1_c2_delta = c1->out_progress - c2->in_progress; + } else { + c1_c2_delta = -0.02; // We try to keep this number between -0.05 and -0.01. + } #if EXTENDED_STATS struct timeval now; @@ -136,16 +140,20 @@ void mergeManager::tick(mergeStats * s) { // Only apply back pressure if next thread is not waiting on us. rwlc_readlock(ltable->header_mut); if(c1->mergeable_size && c2->active) { - double delta = c1->out_progress - c2->in_progress; - rwlc_unlock(ltable->header_mut); - if(delta > -0.01) { + if(c1_c2_delta > -0.01) { + DEBUG("Input is too far ahead. 
Delta is %f\n", c1_c2_delta); + double delta = c1_c2_delta; + rwlc_unlock(ltable->header_mut); delta += 0.01; // delta > 0; double slp = 0.001 + delta; struct timespec sleeptime; DEBUG("\ndisk sleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0)); double_to_ts(&sleeptime,slp); nanosleep(&sleeptime, 0); + update_progress(s, 0); s->need_tick = 1; + } else { + rwlc_unlock(ltable->header_mut); } } else { rwlc_unlock(ltable->header_mut); @@ -246,13 +254,14 @@ void * merge_manager_pretty_print_thread(void * arg) { return m->pretty_print_thread(); } +double mergeManager::c1_c2_progress_delta() { + return c1_c2_delta; +} + void mergeManager::init_helper(void) { struct timeval tv; + c1_c2_delta = -0.02; // XXX move this magic number somewhere. It's also in update_progress. gettimeofday(&tv, 0); - sleeping[0] = false; - sleeping[1] = false; - sleeping[2] = false; - cur_c1_c2_progress_delta = c2->in_progress - c1->out_progress; #if EXTENDED_STATS double_to_ts(&c0->stats_last_tick, tv_to_double(&tv)); diff --git a/mergeManager.h b/mergeManager.h index 0cf4f44..57a6c8a 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -46,6 +46,8 @@ public: void new_merge(int mergelevel); void set_c0_size(int64_t size); void update_progress(mergeStats *s, int delta); + double c1_c2_progress_delta(); + void tick(mergeStats * s); mergeStats* get_merge_stats(int mergeLevel); void read_tuple_from_small_component(int merge_level, datatuple * tup); @@ -60,8 +62,8 @@ public: void pretty_print(FILE * out); void *pretty_print_thread(); - double cur_c1_c2_progress_delta; private: + double c1_c2_delta; void init_helper(void); struct marshalled_header { recordid c0; @@ -69,14 +71,12 @@ private: recordid c2; }; logtable* ltable; - double throttle_seconds; - double last_throttle_seconds; mergeStats * c0; mergeStats * c1; mergeStats * c2; - bool sleeping[3]; + + // The following fields are used to shut down the pretty print thread. 
bool still_running; - // Needed so that the pretty print thread can be woken up during shutdown. pthread_cond_t pp_cond; pthread_t pp_thread; }; diff --git a/mergeStats.h b/mergeStats.h index cce9f67..eedf03d 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -143,7 +143,6 @@ class mergeStats { pageid_t output_size() { return bytes_out; } - int merge_level; // 1 => C0->C1, 2 => C1->C2 protected: double float_tv(struct timeval& tv) { @@ -151,14 +150,15 @@ class mergeStats { } friend class mergeManager; - protected: // XXX only accessed during initialization. + protected: struct marshalled_header { int merge_level; pageid_t base_size; pageid_t mergeable_size; pageid_t target_size; // Needed? }; - public: + public: // XXX eliminate protected fields. + int merge_level; // 1 => C0->C1, 2 => C1->C2 pageid_t base_size; // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size) protected: pageid_t mergeable_size; // protected by mutex. diff --git a/merger.cpp b/merger.cpp index 263259c..d6b1b43 100644 --- a/merger.cpp +++ b/merger.cpp @@ -95,7 +95,7 @@ void * merge_scheduler::memMergeThread() { const int64_t min_bloom_target = ltable_->max_c0_size; //create a new tree - diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100); + diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (stats->target_size < min_bloom_target ? min_bloom_target : stats->target_size) / 100); ltable_->set_tree_c1_prime(c1_prime); @@ -149,17 +149,17 @@ void * merge_scheduler::memMergeThread() { // update c0 effective size. 
double frac = 1.0/(double)merge_count; ltable_->num_c0_mergers = merge_count; - ltable_->mean_c0_effective_size = + ltable_->mean_c0_run_length= (int64_t) ( - ((double)ltable_->mean_c0_effective_size)*(1-frac) + + ((double)ltable_->mean_c0_run_length)*(1-frac) + ((double)stats->bytes_in_small*frac)); - ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size; + //ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_run_length; } - printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size); + printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size); assert(*ltable_->R() >= MIN_R); - bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > *ltable_->R()); + bool signal_c2 = (new_c1_size / ltable_->mean_c0_run_length > *ltable_->R()); DEBUG("\nc1 size %f R %f\n", new_c1_size, *ltable_->R()); if( signal_c2 ) { @@ -252,7 +252,7 @@ void * merge_scheduler::diskMergeThread() // 4: do the merge. 
//create the iterators diskTreeComponent::iterator *itrA = ltable_->get_tree_c2()->open_iterator(); - diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(&ltable_->merge_mgr->cur_c1_c2_progress_delta, 0.05, &ltable_->shutting_down_); + diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(ltable_->merge_mgr, 0.05, &ltable_->shutting_down_); //create a new tree diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (uint64_t)(ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000); @@ -289,7 +289,7 @@ void * merge_scheduler::diskMergeThread() merge_count++; //update the current optimal R value - *(ltable_->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable_->mean_c0_effective_size) ) ); + *(ltable_->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / ((double)ltable_->mean_c0_run_length) ) ); DEBUG("\nR = %f\n", *(ltable_->R()));