From 9dcb624649322300b9fa54a5c92d537b04e7e505 Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 14 Jul 2010 21:46:27 +0000 Subject: [PATCH] improved bps estimation; add "minisleeps" to update_progress; they do not belong there long term, but this is better than nothing git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@929 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- mergeManager.cpp | 49 ++++++++++++++++++++++++++++++++++++++---------- mergeManager.h | 1 + mergeStats.h | 10 ++++++---- 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/mergeManager.cpp b/mergeManager.cpp index 49e09c6..b14a31b 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -9,6 +9,7 @@ #include "mergeStats.h" #include "logstore.h" #include "math.h" +#include "time.h" mergeStats* mergeManager:: get_merge_stats(int mergeLevel) { if (mergeLevel == 0) { return c0; @@ -50,6 +51,27 @@ void mergeManager::set_c0_size(int64_t size) { void mergeManager::update_progress(mergeStats * s, int delta) { s->delta += delta; + s->mini_delta += delta; + { + if(s->merge_level < 2 && s->mergeable_size && delta) { + int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps); + + if(s->mini_delta > effective_max_delta) { + struct timeval now; + gettimeofday(&now, 0); + double now_double = tv_to_double(&now); + double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick); + double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta; + if(slp > 0.001) { + struct timespec sleeptime; + double_to_ts(&sleeptime, slp); + nanosleep(&sleeptime, 0); + } + double_to_ts(&s->last_mini_tick, now_double); + s->mini_delta = 0; + } + } + } if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) { if(delta) { rwlc_writelock(ltable->header_mut); @@ -74,18 +96,21 @@ void mergeManager::update_progress(mergeStats * s, int delta) { struct timeval now; gettimeofday(&now, 0); double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick); - + if(elapsed_delta < 0.0000001) { elapsed_delta = 0.0000001; } s->lifetime_elapsed += elapsed_delta; s->lifetime_consumed += s->bytes_in_small_delta; - double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...) - s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta; - s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta; + double tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...) + double decay = exp((0.0-elapsed_delta)/tau); + // s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta; + // s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta; double_to_ts(&s->last_tick, tv_to_double(&now)); - s->bytes_in_small_delta = 0; + double window_bps = ((double)s->bytes_in_small_delta) / (double)elapsed_delta; - s->bps = s->window_consumed / s->window_elapsed; + s->bps = (1.0-decay) * window_bps + decay * s->bps; //s->window_consumed / s->window_elapsed; + + s->bytes_in_small_delta = 0; if(delta) rwlc_unlock(ltable->header_mut); @@ -115,6 +140,9 @@ void mergeManager::update_progress(mergeStats * s, int delta) { */ void mergeManager::tick(mergeStats * s, bool block, bool force) { #define PRINT_SKIP 100 + if(block) { + // sleep(((double)delta)/[s+1]->bps); // XXX We currently sleep based on the past performance of the current tree. In the limit, this is fine, but it would be better to sleep based on the past throughput of the tree component we're waiting for. fill in the parameters + } if(force || s->need_tick) { if(block) { @@ -139,7 +167,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * ((double)FORCE_INTERVAL)/(1.0-skew)); /* model the effect of amortizing this computation: we could become this much more overshot if we don't act now. */ - int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA; + int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA; //(int64_t)(((double)UPDATE_PROGRESS_PERIOD) * s->bps / 1000.0); /* multiply by 2 for good measure. These are 'soft' walls, and still let writes trickle through. Once we've exausted the fudge factors, we'll hit a hard wall, and stop writes @@ -292,6 +320,7 @@ void mergeManager::finished_merge(int merge_level) { } mergeManager::mergeManager(logtable *ltable): + UPDATE_PROGRESS_PERIOD(0.005), ltable(ltable), c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)), c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )), @@ -335,13 +364,13 @@ void mergeManager::pretty_print(FILE * out) { assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102)); fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ", - c0->active ? "RUN" : "---", (uint64_t)(c0->lifetime_consumed / mb), c0_out_progress, c0->lifetime_elapsed, c0->window_consumed/(((double)mb)*c0->window_elapsed), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed), + c0->active ? "RUN" : "---", (uint64_t)(c0->lifetime_consumed / mb), c0_out_progress, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed), have_c0 ? "C0" : "..", have_c0m ? "C0'" : "...", - c1->active ? "RUN" : "---", c0_c1_in_progress, c0_c1_out_progress, c1->window_consumed/(((double)mb)*c1->window_elapsed), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed), + c1->active ? "RUN" : "---", c0_c1_in_progress, c0_c1_out_progress, c1->bps/((double)mb), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed), have_c1 ? "C1" : "..", have_c1m ? "C1'" : "...", - c2->active ? "RUN" : "---", c1_c2_progress, c2->window_consumed/(((double)mb)*c2->window_elapsed), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed), + c2->active ? "RUN" : "---", c1_c2_progress, c2->bps/((double)mb), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed), have_c2 ? "C2" : ".."); //#define PP_SIZES #ifdef PP_SIZES diff --git a/mergeManager.h b/mergeManager.h index b45e8a8..112cb13 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -22,6 +22,7 @@ class mergeStats; class mergeManager { public: static const int UPDATE_PROGRESS_DELTA = 10 * 1024 * 1024; + const double UPDATE_PROGRESS_PERIOD; // in seconds, defined in constructor. static const int FORCE_INTERVAL = 25 * 1024 * 1024; static double tv_to_double(struct timeval * tv) { return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; diff --git a/mergeStats.h b/mergeStats.h index 4427001..b1a2def 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -38,19 +38,20 @@ class mergeStats { bytes_in_large(0), num_tuples_in_large(0), just_handed_off(false), + mini_delta(0), delta(0), need_tick(0), in_progress(0), out_progress(0), lifetime_elapsed(0), lifetime_consumed(0), - window_elapsed(0.001), - window_consumed(0), + bps(10.0*1024.0*1024.0), print_skipped(0), active(false) { gettimeofday(&sleep,0); gettimeofday(&last,0); mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last)); + mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last)); pthread_mutex_init(&mut,0); } ~mergeStats() { @@ -82,6 +83,7 @@ class mergeStats { gettimeofday(&start, 0); gettimeofday(&last, 0); mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last)); + mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last)); } void handed_off_tree() { @@ -113,6 +115,7 @@ class mergeStats { } friend class mergeManager; + struct timespec last_mini_tick; struct timespec last_tick; public: // XXX only accessed during initialization. pageid_t base_size; @@ -135,6 +138,7 @@ class mergeStats { bool just_handed_off; + int mini_delta; int delta; int need_tick; double in_progress; @@ -142,8 +146,6 @@ class mergeStats { double lifetime_elapsed; double lifetime_consumed; - double window_elapsed; - double window_consumed; double bps;