From 8c35a1265a03143226a9a081c95dc85c4e586fd5 Mon Sep 17 00:00:00 2001 From: sears Date: Fri, 18 Jun 2010 23:00:23 +0000 Subject: [PATCH] fix bug in overshoot_fudge computation; reduce number of knobs git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@860 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- mergeManager.cpp | 29 ++++++++++++++++------------- mergeManager.h | 2 ++ mergeStats.h | 4 ++-- merger.cpp | 4 +--- 4 files changed, 21 insertions(+), 18 deletions(-) diff --git a/mergeManager.cpp b/mergeManager.cpp index 48ed39e..74394b6 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -48,15 +48,13 @@ void mergeManager::set_c0_size(int64_t size) { c0->target_size = size; } -static const int UPDATE_PROGRESS_DELTA = 1 * 1024 * 1024; - void mergeManager::update_progress(mergeStats * s, int delta) { s->delta += delta; - if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) { //512 * 1024) { + if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) { if(delta) { rwlc_writelock(ltable->header_mut); s->delta = 0; - s->need_tick = true; + if(!s->need_tick) { s->need_tick = 1; } } if(s->merge_level != 0) { if(s->active) { @@ -69,7 +67,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) { if(s->mergeable_size) { s->out_progress = ((double)s->current_size) / (double)s->target_size; } else { - s->out_progress = 0; + s->out_progress = 0.0; } } s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; @@ -116,13 +114,12 @@ void mergeManager::update_progress(mergeStats * s, int delta) { * bytes_consumed_by_merger = sum(bytes_in_small_delta) */ void mergeManager::tick(mergeStats * s, bool block, bool force) { -#define PRINT_SKIP 1000 - +#define PRINT_SKIP 100 if(force || s->need_tick) { if(block) { rwlc_writelock(ltable->header_mut); - s->need_tick = false; + while(sleeping[s->merge_level]) { rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut); } @@ -130,8 +127,9 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { int64_t overshoot = 0; int64_t raw_overshoot = 0; // int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0); - double skew = 0.1; // should be between 0 and 0.5. 0 means that there is no 'catch up' after merge commit - int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * (6 / (1.0-skew)) * 1024.0 * 1024.0); // should be some function of R, delta interval, and force interval. +// double skew = 0.5; // should be between 0 and 0.5. 0 means that there is no 'catch up' after merge commit + int64_t overshoot_fudge = UPDATE_PROGRESS_DELTA + (int64_t)(s->out_progress * ((double)FORCE_INTERVAL)); //+ (int64_t)((s->out_progress-skew) * (0.0 / (1.0-skew)) * 1024.0 * 1024.0); // should be some function of R, delta interval, and force interval. // xxx divide FORCE_INTERVAL by R? + overshoot_fudge *= 2; int spin = 0; double total_sleep = 0.0; do{ @@ -161,7 +159,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { } else { s->print_skipped++; } - if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 1)) { + if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 0.01)) { // throttle // it took "elapsed" seconds to process "tick_length_bytes" mb double sleeptime = 2.0 * (double)overshoot / bps; @@ -170,7 +168,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { double max_c0_sleep = 0.1; double min_c0_sleep = 0.05; - double max_c1_sleep = 0.1; + double max_c1_sleep = 0.5; double min_c1_sleep = 0.1; double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep; double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep; @@ -199,7 +197,12 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { if(s->merge_level == 0) { update_progress(c1, 0); } if(s->merge_level == 1) { update_progress(c2, 0); } } else { - if(overshoot > 0) { s->need_tick = true; } + if(overshoot > 0) { + s->need_tick ++; + if(s->need_tick > 100) { printf("need tick %d\n", s->need_tick); } + } else { + s->need_tick = 0; + } break; } } while(1); diff --git a/mergeManager.h b/mergeManager.h index 0ca6290..d9f28c2 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -21,6 +21,8 @@ class mergeStats; class mergeManager { public: + static const int UPDATE_PROGRESS_DELTA = 1024 * 1024; + static const int FORCE_INTERVAL = 5 * 1024 * 1024; static double tv_to_double(struct timeval * tv) { return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; } diff --git a/mergeStats.h b/mergeStats.h index 2025ee7..c43a12f 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -39,7 +39,7 @@ class mergeStats { num_tuples_in_large(0), just_handed_off(false), delta(0), - need_tick(false), + need_tick(0), in_progress(0), out_progress(0), lifetime_elapsed(0), @@ -133,7 +133,7 @@ class mergeStats { bool just_handed_off; int delta; - bool need_tick; + int need_tick; double in_progress; double out_progress; diff --git a/merger.cpp b/merger.cpp index e3c90eb..3f7a9ea 100644 --- a/merger.cpp +++ b/merger.cpp @@ -390,10 +390,8 @@ void *diskMergeThread(void*arg) return 0; } -#define FORCE_INTERVAL (5 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL - static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) { - if(*i > FORCE_INTERVAL) { + if(*i > mergeManager::FORCE_INTERVAL) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); *i = 0;