From 052508fb761b2c491657fb7d15f8ee56badaa8dc Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 15 Dec 2010 00:15:59 +0000 Subject: [PATCH] remove mergeStats->current_size field, which is derivable from other fields in mergeStats git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1508 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 9 --------- memTreeComponent.h | 12 ++++++------ mergeManager.cpp | 20 ++++++++------------ mergeStats.h | 14 +++++++++----- merger.cpp | 7 +++++-- 5 files changed, 28 insertions(+), 34 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index 3eb04a1..5d87cd7 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -188,8 +188,6 @@ void logtable::flushTable() merge_count ++; merge_mgr->get_merge_stats(0)->starting_merge(); - merge_mgr->get_merge_stats(0)->current_size = 0; - if(blocked && stop - start > 1.0) { if(first) { @@ -492,9 +490,6 @@ datatuple * logtable::insertTupleHelper(datatuple *tuple) tree_c0->insert(new_t); //insert the new tuple - //update the tree size (+ new_t size - pre_t size) - merge_mgr->get_merge_stats(0)->current_size += ((int64_t)new_t->byte_length() - (int64_t)pre_t->byte_length()); - } else //no tuple with same key exists in mem-tree { @@ -503,11 +498,7 @@ datatuple * logtable::insertTupleHelper(datatuple *tuple) //insert tuple into the rbtree tree_c0->insert(t); - - merge_mgr->get_merge_stats(0)->current_size += t->byte_length();// + RB_TREE_OVERHEAD; - } - merge_mgr->wrote_tuple(0, t); // needs to be here; doesn't grab a mutex. return pre_t; } diff --git a/memTreeComponent.h b/memTreeComponent.h index f32e027..953a0d6 100644 --- a/memTreeComponent.h +++ b/memTreeComponent.h @@ -3,7 +3,7 @@ #include #include #include // XXX for double_to_ts. 
- +#include template class memTreeComponent { public: @@ -167,8 +167,8 @@ public: void populate_next_ret(TUPLE *key=NULL) { if(cur_off_ == num_batched_) { if(mut_) pthread_mutex_lock(mut_); - if(cur_size_) { - while(*cur_size_ < (0.8 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram. + if(mgr_) { + while(mgr_->get_merge_stats(0)->get_current_size() < (0.8 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram. pthread_mutex_unlock(mut_); struct timespec ts; mergeManager::double_to_ts(&ts, 0.1); @@ -186,11 +186,11 @@ public: } public: - batchedRevalidatingIterator( rbtree_t *s, pageid_t* cur_size, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), cur_size_(cur_size), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { + batchedRevalidatingIterator( rbtree_t *s, mergeManager * mgr, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), mgr_(mgr), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_); populate_next_ret(); } - batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), cur_size_(0), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { + batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), mgr_(NULL), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_); populate_next_ret(key); } @@ -217,7 
+217,7 @@ public: rbtree_t *s_; TUPLE ** next_ret_; - pageid_t* cur_size_; // a pointer to the current size of the red-black tree, in bytes. + mergeManager * mgr_; int64_t target_size_; // the low-water size for the tree. If cur_size_ is not null, and *cur_size_ < C * target_size_, we sleep. bool* flushing_; // never block if *flushing is true. int batch_size_; diff --git a/mergeManager.cpp b/mergeManager.cpp index 8321947..268066f 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -44,10 +44,11 @@ void mergeManager::new_merge(int mergeLevel) { assert(c0->target_size); c1->target_size = (pageid_t)(*ltable->R() * (double)ltable->mean_c0_run_length); assert(c1->target_size); + s->new_merge2(); } else if(s->merge_level == 2) { // target_size is infinity... + s->new_merge2(); } else { abort(); } - s->new_merge2(); } void mergeManager::set_c0_size(int64_t size) { assert(size); @@ -76,13 +77,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) { } } - if(s->merge_level == 0) { - // ltable manages c0's current size directly. - } else { - s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; - } - - s->out_progress = ((double)s->current_size) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size); + s->out_progress = ((double)s->get_current_size()) / ((s->merge_level == 0 ) ? (double)ltable->mean_c0_run_length : (double)s->target_size); if(c2->active && c1->mergeable_size) { c1_c2_delta = c1->out_progress - c2->in_progress; } else { @@ -162,16 +157,17 @@ void mergeManager::tick(mergeStats * s) { } else if(s->merge_level == 0) { // Simple backpressure algorithm based on how full C0 is. + pageid_t cur_c0_sz; // Is C0 bigger than is allowed? - while(c0->current_size > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number... 
+ while((cur_c0_sz = s->get_current_size()) > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number... printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n"); struct timespec ts; double_to_ts(&ts, 0.1); nanosleep(&ts, 0); } // Linear backpressure model - s->out_progress = ((double)c0->current_size)/((double)ltable->max_c0_size); - double delta = ((double)c0->current_size)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111... + s->out_progress = ((double)cur_c0_sz)/((double)ltable->max_c0_size); + double delta = ((double)cur_c0_sz)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111... delta -= 1.0; if(delta > 0.00005) { double slp = 0.001 + 5.0 * delta; //0.0015 < slp < 1.112111.. @@ -325,7 +321,7 @@ void mergeManager::pretty_print(FILE * out) { } pageid_t mb = 1024 * 1024; fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB tot %6lldMB cur ~ %3.0f%%/%3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ", - c0->active ? "RUN" : "---", (long long)(c0->stats_lifetime_consumed / mb), (long long)(c0->current_size / mb), 100.0 * c0->out_progress, 100.0 * ((double)c0->current_size)/(double)ltable->max_c0_size, c0->stats_lifetime_elapsed, c0->stats_bps/((double)mb), c0->stats_lifetime_consumed/(((double)mb)*c0->stats_lifetime_elapsed), + c0->active ? "RUN" : "---", (long long)(c0->stats_lifetime_consumed / mb), (long long)(c0->get_current_size() / mb), 100.0 * c0->out_progress, 100.0 * ((double)c0->get_current_size())/(double)ltable->max_c0_size, c0->stats_lifetime_elapsed, c0->stats_bps/((double)mb), c0->stats_lifetime_consumed/(((double)mb)*c0->stats_lifetime_elapsed), have_c0 ? "C0" : "..", have_c0m ? "C0'" : "...", c1->active ? 
"RUN" : "---", 100.0 * c1->in_progress, 100.0 * c1->out_progress, c1->stats_bps/((double)mb), c1->stats_lifetime_consumed/(((double)mb)*c1->stats_lifetime_elapsed), diff --git a/mergeStats.h b/mergeStats.h index c64d4b1..ffde8cc 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -32,7 +32,6 @@ class mergeStats { base_size(0), mergeable_size(0), target_size(target_size), - current_size(0), bytes_out(0), bytes_in_small(0), bytes_in_large(0), @@ -70,7 +69,6 @@ class mergeStats { base_size = h.base_size; mergeable_size = h.mergeable_size; target_size = h.target_size; - current_size = 0; bytes_out = base_size; bytes_in_small = 0; bytes_in_large = 0; @@ -100,7 +98,6 @@ class mergeStats { just_handed_off = false; } base_size = bytes_out; - current_size = base_size; bytes_out = 0; bytes_in_small = 0; bytes_in_large = 0; @@ -125,10 +122,18 @@ class mergeStats { mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last)); #endif } + pageid_t get_current_size() { + if(merge_level == 0) { + return base_size + bytes_in_small - bytes_in_large - bytes_out; + } else { + // s->bytes_out has strange semantics. It's how many bytes our input has written into this tree. + return base_size + bytes_out - bytes_in_large; + } + } void handed_off_tree() { if(merge_level == 2) { } else { - mergeable_size = current_size; + mergeable_size = get_current_size(); just_handed_off = true; } } @@ -164,7 +169,6 @@ class mergeStats { pageid_t mergeable_size; // protected by mutex. public: pageid_t target_size; - pageid_t current_size; protected: pageid_t bytes_out; // How many bytes worth of tuples did we write? public: diff --git a/merger.cpp b/merger.cpp index d14ba31..d779351 100644 --- a/merger.cpp +++ b/merger.cpp @@ -103,7 +103,7 @@ void * merge_scheduler::memMergeThread() { // needs to be past the rwlc_unlock... 
memTreeComponent::batchedRevalidatingIterator *itrB = - new memTreeComponent::batchedRevalidatingIterator(ltable_->get_tree_c0(), &ltable_->merge_mgr->get_merge_stats(0)->current_size, ltable_->max_c0_size, &ltable_->flushing, 100, &ltable_->rb_mut); + new memTreeComponent::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, &ltable_->flushing, 100, &ltable_->rb_mut); //: do the merge DEBUG("mmt:\tMerging:\n"); @@ -339,7 +339,7 @@ static int garbage_collect(logtable * ltable_, datatuple ** garbage, } // close rbitr before touching the tree. if(t2tmp) { ltable_->get_tree_c0()->erase(garbage[i]); - ltable_->merge_mgr->get_merge_stats(0)->current_size -= garbage[i]->byte_length(); + //ltable_->merge_mgr->get_merge_stats(0)->current_size -= garbage[i]->byte_length(); datatuple::freetuple(t2tmp); } datatuple::freetuple(garbage[i]); @@ -423,6 +423,9 @@ void merge_iterators(int xid, // cannot free any tuples here; they may still be read through a lookup } if(stats->merge_level == 1) { + // We consume tuples from c0 as we read them, so update its stats here. + ltable->merge_mgr->wrote_tuple(0, t2); + next_garbage = garbage_collect(ltable, garbage, garbage_len, next_garbage); garbage[next_garbage] = t2; next_garbage++;