From 0ecd7491c745f8b0ff0f4a86aa17f821e32222c9 Mon Sep 17 00:00:00 2001 From: sears Date: Sat, 5 Jun 2010 00:41:52 +0000 Subject: [PATCH] fix minor stats bugs; pull most stats computation into update_progress. The idea is that update_progress and tick, which are easily amortized, are the only stats things that need to grab latches git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@827 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 2 +- mergeManager.cpp | 91 +++++++++++++++++++++++++++++------------------- mergeManager.h | 4 ++- mergeStats.h | 17 +++++---- merger.cpp | 12 +++---- 5 files changed, 77 insertions(+), 49 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index aecc3e7..0e65111 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -437,7 +437,7 @@ void logtable::insertTuple(datatuple *tuple) { datatuple *pre_t = *rbitr; //do the merging - c0_stats->read_tuple_from_large_component(pre_t); + merge_mgr->read_tuple_from_large_component(0, pre_t); datatuple *new_t = tmerger->merge(pre_t, tuple); c0_stats->merged_tuples(new_t, tuple, pre_t); t = new_t; diff --git a/mergeManager.cpp b/mergeManager.cpp index 20d4d3b..1372382 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -51,6 +51,28 @@ void mergeManager::set_c0_size(int64_t size) { c0->target_size = size; } +void mergeManager::update_progress(mergeStats * s, int delta) { + s->delta += delta; + if((!delta) || s->delta > 512 * 1024) { + s->delta = 0; + if(delta) s->need_tick = true; + if(s->merge_level != 0) { + if(s->active) { + s->in_progress = ((double)(s->bytes_in_large + s->bytes_in_small)) / (double)(get_merge_stats(s->merge_level-1)->mergeable_size + s->base_size); + } else { + s->in_progress = 0; + } + } + if(s->merge_level != 2) { + if(s->mergeable_size) { + s->out_progress = ((double)s->current_size) / (double)s->target_size; + } else { + s->out_progress = 0; + } + } + } +} + /** * This function is invoked periodically by the merge threads. It updates mergeManager's statistics, and applies * backpressure as necessary. @@ -72,17 +94,20 @@ void mergeManager::set_c0_size(int64_t size) { * * bytes_consumed_by_merger = sum(bytes_in_small_delta) */ -void mergeManager::tick(mergeStats * s, bool block) { +void mergeManager::tick(mergeStats * s, bool block, bool force) { #define PRINT_SKIP 20 - pageid_t tick_length_bytes = 64*1024; // probably lower than it could be for production machines. 256KB leads to whining on my dev box. + pageid_t tick_length_bytes = 256*1024; // probably lower than it could be for production machines. 256KB leads to whining on my dev box. // if(s->bytes_in_small_delta > tick_length_bytes) { pageid_t new_current_size = s->base_size + s->bytes_out - s->bytes_in_large; - if(((!block) && (new_current_size - s->current_size > tick_length_bytes)) || - (block && s->bytes_in_small_delta > tick_length_bytes)) { // other than R, these are protected by a mutex, but this is the only thread that can write them +// if(force || ((!block) && (new_current_size - s->current_size > tick_length_bytes)) || +// (block && s->bytes_in_small_delta > tick_length_bytes)) { // other than R, these are protected by a mutex, but this is the only thread that can write them + + if(force || s->need_tick) { + if(block) { s->need_tick = false; } + s->current_size = new_current_size; // xxx move to update_progress - s->current_size = new_current_size; // s->base_size + s->bytes_out - s->bytes_in_large; if(block) { while(sleeping[s->merge_level]) { @@ -93,6 +118,8 @@ void mergeManager::tick(mergeStats * s, bool block) { double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick); double bps = 0; // = (double)s->bytes_in_small_delta / (double)elapsed_delta; + // xxx move bps stuff to update_progress + s->lifetime_elapsed += elapsed_delta; s->lifetime_consumed += s->bytes_in_small_delta; double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...) @@ -105,48 +132,24 @@ void mergeManager::tick(mergeStats * s, bool block) { int64_t overshoot = 0; int64_t raw_overshoot = 0; - int64_t overshoot_fudge = (int64_t)(((((double)s->current_size) / (double)(s->target_size)) - 0.5) * 2.0*1024.0*1024.0); // XXX set based on avg / max tuple size? + int64_t overshoot_fudge = (int64_t)(((((double)s->current_size) / (double)(s->target_size)) - 0.5) * 4.0*1024.0*1024.0); // XXX set based on avg / max tuple size? int spin = 0; double total_sleep = 0.0; do{ overshoot = 0; raw_overshoot = 0; - - double c0_out_progress, c0_c1_out_progress; - if(c0->mergeable_size) { - c0_out_progress = ((double)c0->current_size) / (double)c0->target_size; - } else { - c0_out_progress = 0; // don't throttle if our consumer is blocked on us. - } - if(c1->mergeable_size) { - c0_c1_out_progress = ((double)c1->current_size) / (double)c1->target_size; - } else { - c0_c1_out_progress = 0; - } - - double c0_c1_in_progress, c1_c2_in_progress; - if(c1->active) { - c0_c1_in_progress = ((double)(c1->bytes_in_large + c1->bytes_in_small)) / (double)(c0->mergeable_size + c1->base_size); - } else { - c0_c1_in_progress = 0; // if our consumer is not active, it hasn't made any progress on our most recent output - } - if(c2->active) { - c1_c2_in_progress = ((double)(c2->bytes_in_large + c2->bytes_in_small)) / (double)(c1->mergeable_size + c2->base_size); - } else { - c1_c2_in_progress = 0; - } - + // more bps stuff for update_progress double c0_c1_bps = c1->window_consumed / c1->window_elapsed; double c1_c2_bps = c2->window_consumed / c2->window_elapsed; if(s->merge_level == 0) { if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; } - raw_overshoot = (int64_t)(((double)c0->target_size) * (c0_out_progress - c0_c1_in_progress)); + raw_overshoot = (int64_t)(((double)c0->target_size) * (c0->out_progress - c1->in_progress)); overshoot = raw_overshoot + overshoot_fudge; bps = c0_c1_bps; } else if (s->merge_level == 1) { if(!(c2->active && c1->mergeable_size)) { overshoot_fudge = 0; } - raw_overshoot = (int64_t)(((double)c1->target_size) * (c0_c1_out_progress - c1_c2_in_progress)); + raw_overshoot = (int64_t)(((double)c1->target_size) * (c1->out_progress - c2->in_progress)); overshoot = raw_overshoot + overshoot_fudge; bps = c1_c2_bps; } @@ -161,7 +164,7 @@ void mergeManager::tick(mergeStats * s, bool block) { } else { s->print_skipped++; } - if(overshoot > 0) { + if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 0.5)) { // throttle // it took "elapsed" seconds to process "tick_length_bytes" mb double sleeptime = 2.0 * (double)overshoot / bps; @@ -187,8 +190,10 @@ void mergeManager::tick(mergeStats * s, bool block) { sleeping[s->merge_level] = false; pthread_cond_broadcast(&throttle_wokeup_cond); gettimeofday(&now, 0); + } else { + break; } - } while((overshoot > 0) && (raw_overshoot > 0)); + } while(1); } else { if(s->print_skipped == PRINT_SKIP) { pretty_print(stdout); @@ -207,24 +212,40 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * (s->num_tuples_in_small)++; (s->bytes_in_small_delta) += tup->byte_length(); (s->bytes_in_small) += tup->byte_length(); + update_progress(s, tup->byte_length()); tick(s, true); } } +void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * tup) { + if(tup) { + mergeStats * s = get_merge_stats(merge_level); + s->num_tuples_in_large++; + s->bytes_in_large += tup->byte_length(); +// tick(s, false); // would be no-op; we just reduced current_size. + update_progress(s, tup->byte_length()); + } +} + void mergeManager::wrote_tuple(int merge_level, datatuple * tup) { mergeStats * s = get_merge_stats(merge_level); (s->num_tuples_out)++; (s->bytes_out) += tup->byte_length(); // XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex. + update_progress(s, tup->byte_length()); tick(s, false); } void mergeManager::finished_merge(int merge_level) { + update_progress(get_merge_stats(merge_level), 0); + tick(get_merge_stats(merge_level), false, true); // XXX what does this do??? get_merge_stats(merge_level)->active = false; if(merge_level != 0) { get_merge_stats(merge_level - 1)->mergeable_size = 0; + update_progress(get_merge_stats(merge_level-1), 0); } gettimeofday(&get_merge_stats(merge_level)->done, 0); + update_progress(get_merge_stats(merge_level), 0); } mergeManager::mergeManager(logtable *ltable): diff --git a/mergeManager.h b/mergeManager.h index 3bd35ed..7c38ee0 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -40,9 +40,11 @@ public: void new_merge(int mergelevel); void set_c0_size(int64_t size); - void tick(mergeStats * s, bool block); + void update_progress(mergeStats *s, int delta); + void tick(mergeStats * s, bool block, bool force = false); mergeStats* get_merge_stats(int mergeLevel); void read_tuple_from_small_component(int merge_level, datatuple * tup); + void read_tuple_from_large_component(int merge_level, datatuple * tup); void wrote_tuple(int merge_level, datatuple * tup); void finished_merge(int merge_level); void pretty_print(FILE * out); diff --git a/mergeStats.h b/mergeStats.h index 248a56b..89d0da0 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -38,6 +38,10 @@ class mergeStats { bytes_in_large(0), num_tuples_in_large(0), just_handed_off(false), + delta(0), + need_tick(false), + in_progress(0), + out_progress(0), lifetime_elapsed(0), lifetime_consumed(0), window_elapsed(0.001), @@ -51,6 +55,7 @@ class mergeStats { void new_merge2() { if(just_handed_off) { bytes_out = 0; + out_progress = 0; just_handed_off = false; } base_size = bytes_out; @@ -65,6 +70,7 @@ class mergeStats { num_tuples_in_small = 0; bytes_in_large = 0; num_tuples_in_large = 0; + in_progress = 0; gettimeofday(&sleep,0); } void starting_merge() { @@ -81,12 +87,6 @@ class mergeStats { just_handed_off = true; } } - void read_tuple_from_large_component(datatuple * tup) { - if(tup) { - num_tuples_in_large++; - bytes_in_large += tup->byte_length(); - } - } void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) { } void wrote_datapage(DataPage *dp) { @@ -128,6 +128,11 @@ class mergeStats { bool just_handed_off; + int delta; + bool need_tick; + double in_progress; + double out_progress; + double lifetime_elapsed; double lifetime_consumed; double window_elapsed; diff --git a/merger.cpp b/merger.cpp index 7d76c07..1e9f44c 100644 --- a/merger.cpp +++ b/merger.cpp @@ -105,7 +105,7 @@ void merge_iterators(int xid, diskTreeComponent * forceMe, ITB *itrB, logtable *ltable, diskTreeComponent *scratch_tree, - mergeStats *stats, + mergeStats * stats, bool dropDeletes); @@ -406,7 +406,7 @@ void merge_iterators(int xid, ITA *itrA, //iterator on c1 or c2 ITB *itrB, //iterator on c0 or c1, respectively logtable *ltable, - diskTreeComponent *scratch_tree, mergeStats *stats, + diskTreeComponent *scratch_tree, mergeStats * stats, bool dropDeletes // should be true iff this is biggest component ) { @@ -414,7 +414,7 @@ void merge_iterators(int xid, rwlc_writelock(ltable->header_mut); // XXX slow datatuple *t1 = itrA->next_callerFrees(); - stats->read_tuple_from_large_component(t1); + ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); rwlc_unlock(ltable->header_mut); // XXX slow datatuple *t2 = 0; @@ -440,7 +440,7 @@ void merge_iterators(int xid, //advance itrA t1 = itrA->next_callerFrees(); if(t1) { - stats->read_tuple_from_large_component(t1); + ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); } periodically_force(xid, &i, forceMe, log); rwlc_unlock(ltable->header_mut); // XXX slow @@ -461,7 +461,7 @@ void merge_iterators(int xid, ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple); t1 = itrA->next_callerFrees(); //advance itrA if(t1) { - stats->read_tuple_from_large_component(t1); + ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); } datatuple::freetuple(mtuple); periodically_force(xid, &i, forceMe, log); @@ -491,7 +491,7 @@ void merge_iterators(int xid, //advance itrA t1 = itrA->next_callerFrees(); - stats->read_tuple_from_large_component(t1); + ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); periodically_force(xid, &i, forceMe, log); rwlc_unlock(ltable->header_mut); // XXX slow rwlc_writelock(ltable->header_mut); // XXX slow