fix bug in overshoot_fudge computation; reduce number of knobs
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@860 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
eda7c0fd12
commit
8c35a1265a
4 changed files with 21 additions and 18 deletions
|
@ -48,15 +48,13 @@ void mergeManager::set_c0_size(int64_t size) {
|
||||||
c0->target_size = size;
|
c0->target_size = size;
|
||||||
}
|
}
|
||||||
|
|
||||||
static const int UPDATE_PROGRESS_DELTA = 1 * 1024 * 1024;
|
|
||||||
|
|
||||||
void mergeManager::update_progress(mergeStats * s, int delta) {
|
void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
s->delta += delta;
|
s->delta += delta;
|
||||||
if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) { //512 * 1024) {
|
if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) {
|
||||||
if(delta) {
|
if(delta) {
|
||||||
rwlc_writelock(ltable->header_mut);
|
rwlc_writelock(ltable->header_mut);
|
||||||
s->delta = 0;
|
s->delta = 0;
|
||||||
s->need_tick = true;
|
if(!s->need_tick) { s->need_tick = 1; }
|
||||||
}
|
}
|
||||||
if(s->merge_level != 0) {
|
if(s->merge_level != 0) {
|
||||||
if(s->active) {
|
if(s->active) {
|
||||||
|
@ -69,7 +67,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
if(s->mergeable_size) {
|
if(s->mergeable_size) {
|
||||||
s->out_progress = ((double)s->current_size) / (double)s->target_size;
|
s->out_progress = ((double)s->current_size) / (double)s->target_size;
|
||||||
} else {
|
} else {
|
||||||
s->out_progress = 0;
|
s->out_progress = 0.0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
|
s->current_size = s->base_size + s->bytes_out - s->bytes_in_large;
|
||||||
|
@ -116,13 +114,12 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
||||||
*/
|
*/
|
||||||
void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
#define PRINT_SKIP 1000
|
#define PRINT_SKIP 100
|
||||||
|
|
||||||
if(force || s->need_tick) {
|
if(force || s->need_tick) {
|
||||||
|
|
||||||
if(block) {
|
if(block) {
|
||||||
rwlc_writelock(ltable->header_mut);
|
rwlc_writelock(ltable->header_mut);
|
||||||
s->need_tick = false;
|
|
||||||
while(sleeping[s->merge_level]) {
|
while(sleeping[s->merge_level]) {
|
||||||
rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
|
rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
|
||||||
}
|
}
|
||||||
|
@ -130,8 +127,9 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
int64_t overshoot = 0;
|
int64_t overshoot = 0;
|
||||||
int64_t raw_overshoot = 0;
|
int64_t raw_overshoot = 0;
|
||||||
// int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0);
|
// int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0);
|
||||||
double skew = 0.1; // should be between 0 and 0.5. 0 means that there is no 'catch up' after merge commit
|
// double skew = 0.5; // should be between 0 and 0.5. 0 means that there is no 'catch up' after merge commit
|
||||||
int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * (6 / (1.0-skew)) * 1024.0 * 1024.0); // should be some function of R, delta interval, and force interval.
|
int64_t overshoot_fudge = UPDATE_PROGRESS_DELTA + (int64_t)(s->out_progress * ((double)FORCE_INTERVAL)); //+ (int64_t)((s->out_progress-skew) * (0.0 / (1.0-skew)) * 1024.0 * 1024.0); // should be some function of R, delta interval, and force interval. // xxx divide FORCE_INTERVAL by R?
|
||||||
|
overshoot_fudge *= 2;
|
||||||
int spin = 0;
|
int spin = 0;
|
||||||
double total_sleep = 0.0;
|
double total_sleep = 0.0;
|
||||||
do{
|
do{
|
||||||
|
@ -161,7 +159,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
} else {
|
} else {
|
||||||
s->print_skipped++;
|
s->print_skipped++;
|
||||||
}
|
}
|
||||||
if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 1)) {
|
if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 0.01)) {
|
||||||
// throttle
|
// throttle
|
||||||
// it took "elapsed" seconds to process "tick_length_bytes" mb
|
// it took "elapsed" seconds to process "tick_length_bytes" mb
|
||||||
double sleeptime = 2.0 * (double)overshoot / bps;
|
double sleeptime = 2.0 * (double)overshoot / bps;
|
||||||
|
@ -170,7 +168,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
|
|
||||||
double max_c0_sleep = 0.1;
|
double max_c0_sleep = 0.1;
|
||||||
double min_c0_sleep = 0.05;
|
double min_c0_sleep = 0.05;
|
||||||
double max_c1_sleep = 0.1;
|
double max_c1_sleep = 0.5;
|
||||||
double min_c1_sleep = 0.1;
|
double min_c1_sleep = 0.1;
|
||||||
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
|
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
|
||||||
double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep;
|
double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep;
|
||||||
|
@ -199,7 +197,12 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
if(s->merge_level == 0) { update_progress(c1, 0); }
|
if(s->merge_level == 0) { update_progress(c1, 0); }
|
||||||
if(s->merge_level == 1) { update_progress(c2, 0); }
|
if(s->merge_level == 1) { update_progress(c2, 0); }
|
||||||
} else {
|
} else {
|
||||||
if(overshoot > 0) { s->need_tick = true; }
|
if(overshoot > 0) {
|
||||||
|
s->need_tick ++;
|
||||||
|
if(s->need_tick > 100) { printf("need tick %d\n", s->need_tick); }
|
||||||
|
} else {
|
||||||
|
s->need_tick = 0;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} while(1);
|
} while(1);
|
||||||
|
|
|
@ -21,6 +21,8 @@ class mergeStats;
|
||||||
|
|
||||||
class mergeManager {
|
class mergeManager {
|
||||||
public:
|
public:
|
||||||
|
static const int UPDATE_PROGRESS_DELTA = 1024 * 1024;
|
||||||
|
static const int FORCE_INTERVAL = 5 * 1024 * 1024;
|
||||||
static double tv_to_double(struct timeval * tv) {
|
static double tv_to_double(struct timeval * tv) {
|
||||||
return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0;
|
return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ class mergeStats {
|
||||||
num_tuples_in_large(0),
|
num_tuples_in_large(0),
|
||||||
just_handed_off(false),
|
just_handed_off(false),
|
||||||
delta(0),
|
delta(0),
|
||||||
need_tick(false),
|
need_tick(0),
|
||||||
in_progress(0),
|
in_progress(0),
|
||||||
out_progress(0),
|
out_progress(0),
|
||||||
lifetime_elapsed(0),
|
lifetime_elapsed(0),
|
||||||
|
@ -133,7 +133,7 @@ class mergeStats {
|
||||||
bool just_handed_off;
|
bool just_handed_off;
|
||||||
|
|
||||||
int delta;
|
int delta;
|
||||||
bool need_tick;
|
int need_tick;
|
||||||
double in_progress;
|
double in_progress;
|
||||||
double out_progress;
|
double out_progress;
|
||||||
|
|
||||||
|
|
|
@ -390,10 +390,8 @@ void *diskMergeThread(void*arg)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define FORCE_INTERVAL (5 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL
|
|
||||||
|
|
||||||
static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
|
static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
|
||||||
if(*i > FORCE_INTERVAL) {
|
if(*i > mergeManager::FORCE_INTERVAL) {
|
||||||
if(forceMe) forceMe->force(xid);
|
if(forceMe) forceMe->force(xid);
|
||||||
log->force_tail(log, LOG_FORCE_WAL);
|
log->force_tail(log, LOG_FORCE_WAL);
|
||||||
*i = 0;
|
*i = 0;
|
||||||
|
|
Loading…
Reference in a new issue