move c1-c2 backpressure to linear backpressure model; removes a few hundred lines of fussy code, and should enable further simplifications
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1488 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
347a4126f3
commit
7e44641799
2 changed files with 42 additions and 157 deletions
197
mergeManager.cpp
197
mergeManager.cpp
|
@ -48,31 +48,6 @@ void mergeManager::new_merge(int mergeLevel) {
|
||||||
void mergeManager::set_c0_size(int64_t size) {
|
void mergeManager::set_c0_size(int64_t size) {
|
||||||
c0->target_size = size;
|
c0->target_size = size;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mergeManager::sleep_on_mini_delta(mergeStats *s, int delta) {
|
|
||||||
s->mini_delta += delta;
|
|
||||||
if(/*s->merge_level < 2*/ s->merge_level == 1 && s->mergeable_size && delta) {
|
|
||||||
int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps);
|
|
||||||
|
|
||||||
// if(s->merge_level == 0) { s->base_size = ltable->tree_bytes; }
|
|
||||||
|
|
||||||
if(s->mini_delta > effective_max_delta) {
|
|
||||||
struct timeval now;
|
|
||||||
gettimeofday(&now, 0);
|
|
||||||
double now_double = tv_to_double(&now);
|
|
||||||
double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick);
|
|
||||||
double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta;
|
|
||||||
if(slp > 0.001) {
|
|
||||||
struct timespec sleeptime;
|
|
||||||
double_to_ts(&sleeptime, slp);
|
|
||||||
nanosleep(&sleeptime, 0);
|
|
||||||
printf("%d Sleep A %f\n", s->merge_level, slp);
|
|
||||||
}
|
|
||||||
double_to_ts(&s->last_mini_tick, now_double);
|
|
||||||
s->mini_delta = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
void mergeManager::update_progress(mergeStats * s, int delta) {
|
void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
s->delta += delta;
|
s->delta += delta;
|
||||||
|
|
||||||
|
@ -125,111 +100,6 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void mergeManager::tick_based_on_merge_progress(mergeStats *s) {
|
|
||||||
/* model the effect of linux + stasis' write caches; at the end
|
|
||||||
of this merge, we need to force up to FORCE_INTERVAL bytes
|
|
||||||
after we think we're done writing the next component. */
|
|
||||||
int64_t overshoot_fudge = (int64_t)((s->out_progress) * ((double)FORCE_INTERVAL));
|
|
||||||
/* model the effect of amortizing this computation: we could
|
|
||||||
become this much more overshot if we don't act now. */
|
|
||||||
int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA;
|
|
||||||
/* multiply by 2 for good measure. These are 'soft' walls, and
|
|
||||||
still let writes trickle through. Once we've exhausted the
|
|
||||||
fudge factors, we'll hit a hard wall, and stop writes
|
|
||||||
entirely, so it's better to start throttling too early than
|
|
||||||
too late. */
|
|
||||||
overshoot_fudge *= 2;
|
|
||||||
overshoot_fudge2 *= 4;
|
|
||||||
|
|
||||||
if(overshoot_fudge > 0.01 * s->target_size) { overshoot_fudge = (int64_t)(0.01 * (double)s->target_size); }
|
|
||||||
if(overshoot_fudge2 > 0.01 * s->target_size) { overshoot_fudge2 = (int64_t)(0.01 * (double)s->target_size); }
|
|
||||||
|
|
||||||
const double max_c0_sleep = 0.1;
|
|
||||||
const double min_c0_sleep = 0.01;
|
|
||||||
const double max_c1_sleep = 0.5;
|
|
||||||
const double min_c1_sleep = 0.1;
|
|
||||||
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
|
|
||||||
double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep;
|
|
||||||
|
|
||||||
int spin = 0;
|
|
||||||
double total_sleep = 0.0;
|
|
||||||
while(1) {
|
|
||||||
int64_t overshoot = 0;
|
|
||||||
int64_t overshoot2 = 0;
|
|
||||||
int64_t raw_overshoot = 0;
|
|
||||||
double bps;
|
|
||||||
// This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep.
|
|
||||||
mergeStats * s1;
|
|
||||||
if(s->merge_level == 0) {
|
|
||||||
// dead code
|
|
||||||
s1 = c1;
|
|
||||||
} else {
|
|
||||||
s1 = c2;
|
|
||||||
}
|
|
||||||
if(s->mergeable_size) { // only apply backpressure if the next merger is not waiting for us
|
|
||||||
rwlc_readlock(ltable->header_mut);
|
|
||||||
if(s1->active && s->mergeable_size) {
|
|
||||||
raw_overshoot = (int64_t)(((double)s->target_size) * (s->out_progress - s1->in_progress));
|
|
||||||
overshoot = raw_overshoot + overshoot_fudge;
|
|
||||||
overshoot2 = raw_overshoot + overshoot_fudge2;
|
|
||||||
|
|
||||||
bps = s1->bps;
|
|
||||||
}
|
|
||||||
rwlc_unlock(ltable->header_mut);
|
|
||||||
}
|
|
||||||
if(s->merge_level == 1) {
|
|
||||||
if(s1->active && s->mergeable_size) {
|
|
||||||
cur_c1_c2_progress_delta = s1->in_progress - s->out_progress;
|
|
||||||
} else if(!s->mergeable_size) {
|
|
||||||
cur_c1_c2_progress_delta = 1;
|
|
||||||
} else {
|
|
||||||
// s1 is not active.
|
|
||||||
cur_c1_c2_progress_delta = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
//#define PP_THREAD_INFO
|
|
||||||
#ifdef PP_THREAD_INFO
|
|
||||||
printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep);
|
|
||||||
#endif
|
|
||||||
bool one_threshold = (overshoot > 0 || overshoot2 > 0) || (raw_overshoot > 0);
|
|
||||||
bool two_threshold = (overshoot > 0 || overshoot2 > 0) && (raw_overshoot > 0);
|
|
||||||
|
|
||||||
if(one_threshold && (two_threshold || total_sleep < 0.01)) {
|
|
||||||
|
|
||||||
// throttle
|
|
||||||
// it took "elapsed" seconds to process "tick_length_bytes" mb
|
|
||||||
double sleeptime = 2.0 * fmax((double)overshoot,(double)overshoot2) / bps;
|
|
||||||
if(sleeptime < min_sleep) { sleeptime = min_sleep; }
|
|
||||||
if(sleeptime > max_sleep) { sleeptime = max_sleep; }
|
|
||||||
|
|
||||||
spin ++;
|
|
||||||
total_sleep += sleeptime;
|
|
||||||
|
|
||||||
if((spin > 40) || (total_sleep > (max_sleep * 20.0))) {
|
|
||||||
printf("\nMerge thread %d c0->out=%f c1->in=%f c1->out=%f c2->in=%f\n", s->merge_level, c0->out_progress, c1->in_progress, c1->out_progress, c2->in_progress);
|
|
||||||
printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld eff2=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, (long long)overshoot2, sleeptime, spin, total_sleep);
|
|
||||||
}
|
|
||||||
|
|
||||||
sleeping[s->merge_level] = true;
|
|
||||||
|
|
||||||
struct timespec ts;
|
|
||||||
double_to_ts(&ts, sleeptime);
|
|
||||||
nanosleep(&ts, 0);
|
|
||||||
// printf("%d Sleep B %f\n", s->merge_level, sleeptime);
|
|
||||||
|
|
||||||
sleeping[s->merge_level] = false;
|
|
||||||
update_progress(s1, 0);
|
|
||||||
} else if(overshoot > 0 || overshoot2 > 0) { // this is our second time around the loop, so remember we're in the hole
|
|
||||||
s->need_tick ++;
|
|
||||||
if(s->need_tick > 500) { printf("need tick %d\n", s->need_tick); }
|
|
||||||
} else { // all is well in the world. we can run worry-free for a while.
|
|
||||||
s->need_tick = 0;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function is invoked periodically by the merge threads. It updates mergeManager's statistics, and applies
|
* This function is invoked periodically by the merge threads. It updates mergeManager's statistics, and applies
|
||||||
* backpressure as necessary.
|
* backpressure as necessary.
|
||||||
|
@ -252,33 +122,50 @@ void mergeManager::tick_based_on_merge_progress(mergeStats *s) {
|
||||||
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
||||||
*/
|
*/
|
||||||
void mergeManager::tick(mergeStats * s) {
|
void mergeManager::tick(mergeStats * s) {
|
||||||
if(s->need_tick) {
|
if(s->merge_level == 1) { // apply backpressure based on merge progress.
|
||||||
if(s->merge_level == 1) { // apply backpressure based on merge progress.
|
if(s->need_tick) {
|
||||||
tick_based_on_merge_progress(s);
|
s->need_tick = 0;
|
||||||
} else if(s->merge_level == 0) {
|
// Only apply back pressure if next thread is not waiting on us.
|
||||||
// Simple backpressure algorithm based on how full C0 is.
|
rwlc_readlock(ltable->header_mut);
|
||||||
|
if(c1->mergeable_size && c2->active) {
|
||||||
// Is C0 bigger than is allowed?
|
double delta = c1->out_progress - c2->in_progress;
|
||||||
while(ltable->tree_bytes > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number...
|
rwlc_unlock(ltable->header_mut);
|
||||||
printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
|
if(delta > -0.01) {
|
||||||
struct timespec ts;
|
delta += 0.01; // delta > 0;
|
||||||
double_to_ts(&ts, 0.1);
|
double slp = 0.001 + delta;
|
||||||
nanosleep(&ts, 0);
|
struct timespec sleeptime;
|
||||||
|
DEBUG("\ndisk sleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
|
||||||
|
double_to_ts(&sleeptime,slp);
|
||||||
|
nanosleep(&sleeptime, 0);
|
||||||
|
s->need_tick = 1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rwlc_unlock(ltable->header_mut);
|
||||||
}
|
}
|
||||||
// Linear backpressure model
|
}
|
||||||
s->current_size = ltable->tree_bytes;
|
} else if(s->merge_level == 0) {
|
||||||
s->out_progress = ((double)ltable->tree_bytes)/((double)ltable->max_c0_size);
|
// Simple backpressure algorithm based on how full C0 is.
|
||||||
double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111...
|
|
||||||
delta -= 1.0;
|
|
||||||
if(delta > 0.00005) {
|
|
||||||
double slp = 0.001 + 10.0 * delta; //0.0015 < slp < 1.112111..
|
|
||||||
|
|
||||||
DEBUG("\nsleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
|
// Is C0 bigger than is allowed?
|
||||||
struct timespec sleeptime;
|
while(ltable->tree_bytes > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number...
|
||||||
double_to_ts(&sleeptime, slp);
|
printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n");
|
||||||
DEBUG("%d Sleep C %f\n", s->merge_level, slp);
|
struct timespec ts;
|
||||||
nanosleep(&sleeptime, 0);
|
double_to_ts(&ts, 0.1);
|
||||||
}
|
nanosleep(&ts, 0);
|
||||||
|
}
|
||||||
|
// Linear backpressure model
|
||||||
|
s->current_size = ltable->tree_bytes;
|
||||||
|
s->out_progress = ((double)ltable->tree_bytes)/((double)ltable->max_c0_size);
|
||||||
|
double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111...
|
||||||
|
delta -= 1.0;
|
||||||
|
if(delta > 0.00005) {
|
||||||
|
double slp = 0.001 + 5.0 * delta; //0.0015 < slp < 1.112111..
|
||||||
|
|
||||||
|
DEBUG("\nmem sleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0));
|
||||||
|
struct timespec sleeptime;
|
||||||
|
double_to_ts(&sleeptime, slp);
|
||||||
|
DEBUG("%d Sleep C %f\n", s->merge_level, slp);
|
||||||
|
nanosleep(&sleeptime, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,10 +43,8 @@ public:
|
||||||
|
|
||||||
void new_merge(int mergelevel);
|
void new_merge(int mergelevel);
|
||||||
void set_c0_size(int64_t size);
|
void set_c0_size(int64_t size);
|
||||||
void sleep_on_mini_delta(mergeStats *s, int delta);
|
|
||||||
void update_progress(mergeStats *s, int delta);
|
void update_progress(mergeStats *s, int delta);
|
||||||
void tick(mergeStats * s);
|
void tick(mergeStats * s);
|
||||||
void tick_based_on_merge_progress(mergeStats * s);
|
|
||||||
mergeStats* get_merge_stats(int mergeLevel);
|
mergeStats* get_merge_stats(int mergeLevel);
|
||||||
void read_tuple_from_small_component(int merge_level, datatuple * tup);
|
void read_tuple_from_small_component(int merge_level, datatuple * tup);
|
||||||
void read_tuple_from_large_component(int merge_level, datatuple * tup) {
|
void read_tuple_from_large_component(int merge_level, datatuple * tup) {
|
||||||
|
|
Loading…
Reference in a new issue