diff --git a/mergeManager.cpp b/mergeManager.cpp index b92e36d..48ed39e 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -116,7 +116,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) { * bytes_consumed_by_merger = sum(bytes_in_small_delta) */ void mergeManager::tick(mergeStats * s, bool block, bool force) { -#define PRINT_SKIP 100 +#define PRINT_SKIP 1000 if(force || s->need_tick) { @@ -129,7 +129,9 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { int64_t overshoot = 0; int64_t raw_overshoot = 0; - int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0); +// int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0); + double skew = 0.1; // should be between 0 and 0.5. 0 means that there is no 'catch up' after merge commit + int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * (6 / (1.0-skew)) * 1024.0 * 1024.0); // should be some function of R, delta interval, and force interval. int spin = 0; double total_sleep = 0.0; do{ @@ -159,7 +161,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { } else { s->print_skipped++; } - if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 0.5)) { + if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 1)) { // throttle // it took "elapsed" seconds to process "tick_length_bytes" mb double sleeptime = 2.0 * (double)overshoot / bps; @@ -168,7 +170,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { double max_c0_sleep = 0.1; double min_c0_sleep = 0.05; - double max_c1_sleep = 1.0; + double max_c1_sleep = 0.1; double min_c1_sleep = 0.1; double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep; double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep; @@ -179,8 +181,10 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { spin ++; total_sleep += sleeptime; - if((spin > 40) || (total_sleep > (max_sleep * 20))) { - printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, raw_overshoot, overshoot_fudge, overshoot, sleeptime, spin, total_sleep); + if((spin > 40) || (total_sleep > (max_sleep * 20.0))) { +// if(spin > 20 || s->merge_level == 0) { + printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, raw_overshoot, overshoot_fudge, overshoot, sleeptime, spin, total_sleep); +// } } struct timeval now; @@ -195,7 +199,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { if(s->merge_level == 0) { update_progress(c1, 0); } if(s->merge_level == 1) { update_progress(c2, 0); } } else { - if(overshoot > 0) { s->need_tick = 1; } + if(overshoot > 0) { s->need_tick = true; } break; } } while(1); diff --git a/merger.cpp b/merger.cpp index 2efe2e8..e3c90eb 100644 --- a/merger.cpp +++ b/merger.cpp @@ -390,10 +390,10 @@ void *diskMergeThread(void*arg) return 0; } -#define FORCE_INTERVAL (1 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL +#define FORCE_INTERVAL (5 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) { - if(0 && *i > FORCE_INTERVAL) { + if(*i > FORCE_INTERVAL) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); *i = 0;