enable force interval; play with some knobs
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@854 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
2530009ec0
commit
eda7c0fd12
2 changed files with 13 additions and 9 deletions
|
@ -116,7 +116,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
||||||
*/
|
*/
|
||||||
void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
#define PRINT_SKIP 100
|
#define PRINT_SKIP 1000
|
||||||
|
|
||||||
if(force || s->need_tick) {
|
if(force || s->need_tick) {
|
||||||
|
|
||||||
|
@ -129,7 +129,9 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
|
|
||||||
int64_t overshoot = 0;
|
int64_t overshoot = 0;
|
||||||
int64_t raw_overshoot = 0;
|
int64_t raw_overshoot = 0;
|
||||||
int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0);
|
// int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0);
|
||||||
|
double skew = 0.1; // should be between 0 and 0.5. 0 means that there is no 'catch up' after merge commit
|
||||||
|
int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * (6 / (1.0-skew)) * 1024.0 * 1024.0); // should be some function of R, delta interval, and force interval.
|
||||||
int spin = 0;
|
int spin = 0;
|
||||||
double total_sleep = 0.0;
|
double total_sleep = 0.0;
|
||||||
do{
|
do{
|
||||||
|
@ -159,7 +161,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
} else {
|
} else {
|
||||||
s->print_skipped++;
|
s->print_skipped++;
|
||||||
}
|
}
|
||||||
if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 0.5)) {
|
if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 1)) {
|
||||||
// throttle
|
// throttle
|
||||||
// it took "elapsed" seconds to process "tick_length_bytes" mb
|
// it took "elapsed" seconds to process "tick_length_bytes" mb
|
||||||
double sleeptime = 2.0 * (double)overshoot / bps;
|
double sleeptime = 2.0 * (double)overshoot / bps;
|
||||||
|
@ -168,7 +170,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
|
|
||||||
double max_c0_sleep = 0.1;
|
double max_c0_sleep = 0.1;
|
||||||
double min_c0_sleep = 0.05;
|
double min_c0_sleep = 0.05;
|
||||||
double max_c1_sleep = 1.0;
|
double max_c1_sleep = 0.1;
|
||||||
double min_c1_sleep = 0.1;
|
double min_c1_sleep = 0.1;
|
||||||
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
|
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
|
||||||
double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep;
|
double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep;
|
||||||
|
@ -179,8 +181,10 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
spin ++;
|
spin ++;
|
||||||
total_sleep += sleeptime;
|
total_sleep += sleeptime;
|
||||||
|
|
||||||
if((spin > 40) || (total_sleep > (max_sleep * 20))) {
|
if((spin > 40) || (total_sleep > (max_sleep * 20.0))) {
|
||||||
printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, raw_overshoot, overshoot_fudge, overshoot, sleeptime, spin, total_sleep);
|
// if(spin > 20 || s->merge_level == 0) {
|
||||||
|
printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, raw_overshoot, overshoot_fudge, overshoot, sleeptime, spin, total_sleep);
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
struct timeval now;
|
struct timeval now;
|
||||||
|
@ -195,7 +199,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
if(s->merge_level == 0) { update_progress(c1, 0); }
|
if(s->merge_level == 0) { update_progress(c1, 0); }
|
||||||
if(s->merge_level == 1) { update_progress(c2, 0); }
|
if(s->merge_level == 1) { update_progress(c2, 0); }
|
||||||
} else {
|
} else {
|
||||||
if(overshoot > 0) { s->need_tick = 1; }
|
if(overshoot > 0) { s->need_tick = true; }
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} while(1);
|
} while(1);
|
||||||
|
|
|
@ -390,10 +390,10 @@ void *diskMergeThread(void*arg)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define FORCE_INTERVAL (1 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL
|
#define FORCE_INTERVAL (5 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL
|
||||||
|
|
||||||
static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
|
static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
|
||||||
if(0 && *i > FORCE_INTERVAL) {
|
if(*i > FORCE_INTERVAL) {
|
||||||
if(forceMe) forceMe->force(xid);
|
if(forceMe) forceMe->force(xid);
|
||||||
log->force_tail(log, LOG_FORCE_WAL);
|
log->force_tail(log, LOG_FORCE_WAL);
|
||||||
*i = 0;
|
*i = 0;
|
||||||
|
|
Loading…
Reference in a new issue