fix minor stats bugs; pull most stats computation into update_progress. The idea is that update_progress and tick, which are easily amortized, are the only stats things that need to grab latches
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@827 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
fe7ce3a785
commit
0ecd7491c7
5 changed files with 77 additions and 49 deletions
|
@ -437,7 +437,7 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
|
||||||
{
|
{
|
||||||
datatuple *pre_t = *rbitr;
|
datatuple *pre_t = *rbitr;
|
||||||
//do the merging
|
//do the merging
|
||||||
c0_stats->read_tuple_from_large_component(pre_t);
|
merge_mgr->read_tuple_from_large_component(0, pre_t);
|
||||||
datatuple *new_t = tmerger->merge(pre_t, tuple);
|
datatuple *new_t = tmerger->merge(pre_t, tuple);
|
||||||
c0_stats->merged_tuples(new_t, tuple, pre_t);
|
c0_stats->merged_tuples(new_t, tuple, pre_t);
|
||||||
t = new_t;
|
t = new_t;
|
||||||
|
|
|
@ -51,6 +51,28 @@ void mergeManager::set_c0_size(int64_t size) {
|
||||||
c0->target_size = size;
|
c0->target_size = size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
|
s->delta += delta;
|
||||||
|
if((!delta) || s->delta > 512 * 1024) {
|
||||||
|
s->delta = 0;
|
||||||
|
if(delta) s->need_tick = true;
|
||||||
|
if(s->merge_level != 0) {
|
||||||
|
if(s->active) {
|
||||||
|
s->in_progress = ((double)(s->bytes_in_large + s->bytes_in_small)) / (double)(get_merge_stats(s->merge_level-1)->mergeable_size + s->base_size);
|
||||||
|
} else {
|
||||||
|
s->in_progress = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(s->merge_level != 2) {
|
||||||
|
if(s->mergeable_size) {
|
||||||
|
s->out_progress = ((double)s->current_size) / (double)s->target_size;
|
||||||
|
} else {
|
||||||
|
s->out_progress = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function is invoked periodically by the merge threads. It updates mergeManager's statistics, and applies
|
* This function is invoked periodically by the merge threads. It updates mergeManager's statistics, and applies
|
||||||
* backpressure as necessary.
|
* backpressure as necessary.
|
||||||
|
@ -72,17 +94,20 @@ void mergeManager::set_c0_size(int64_t size) {
|
||||||
*
|
*
|
||||||
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
|
||||||
*/
|
*/
|
||||||
void mergeManager::tick(mergeStats * s, bool block) {
|
void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
#define PRINT_SKIP 20
|
#define PRINT_SKIP 20
|
||||||
pageid_t tick_length_bytes = 64*1024; // probably lower than it could be for production machines. 256KB leads to whining on my dev box.
|
pageid_t tick_length_bytes = 256*1024; // probably lower than it could be for production machines. 256KB leads to whining on my dev box.
|
||||||
|
|
||||||
// if(s->bytes_in_small_delta > tick_length_bytes) {
|
// if(s->bytes_in_small_delta > tick_length_bytes) {
|
||||||
pageid_t new_current_size = s->base_size + s->bytes_out - s->bytes_in_large;
|
pageid_t new_current_size = s->base_size + s->bytes_out - s->bytes_in_large;
|
||||||
|
|
||||||
if(((!block) && (new_current_size - s->current_size > tick_length_bytes)) ||
|
// if(force || ((!block) && (new_current_size - s->current_size > tick_length_bytes)) ||
|
||||||
(block && s->bytes_in_small_delta > tick_length_bytes)) { // other than R, these are protected by a mutex, but this is the only thread that can write them
|
// (block && s->bytes_in_small_delta > tick_length_bytes)) { // other than R, these are protected by a mutex, but this is the only thread that can write them
|
||||||
|
|
||||||
|
if(force || s->need_tick) {
|
||||||
|
if(block) { s->need_tick = false; }
|
||||||
|
s->current_size = new_current_size; // xxx move to update_progress
|
||||||
|
|
||||||
s->current_size = new_current_size; // s->base_size + s->bytes_out - s->bytes_in_large;
|
|
||||||
|
|
||||||
if(block) {
|
if(block) {
|
||||||
while(sleeping[s->merge_level]) {
|
while(sleeping[s->merge_level]) {
|
||||||
|
@ -93,6 +118,8 @@ void mergeManager::tick(mergeStats * s, bool block) {
|
||||||
double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
|
double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
|
||||||
double bps = 0; // = (double)s->bytes_in_small_delta / (double)elapsed_delta;
|
double bps = 0; // = (double)s->bytes_in_small_delta / (double)elapsed_delta;
|
||||||
|
|
||||||
|
// xxx move bps stuff to update_progress
|
||||||
|
|
||||||
s->lifetime_elapsed += elapsed_delta;
|
s->lifetime_elapsed += elapsed_delta;
|
||||||
s->lifetime_consumed += s->bytes_in_small_delta;
|
s->lifetime_consumed += s->bytes_in_small_delta;
|
||||||
double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...)
|
double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...)
|
||||||
|
@ -105,48 +132,24 @@ void mergeManager::tick(mergeStats * s, bool block) {
|
||||||
|
|
||||||
int64_t overshoot = 0;
|
int64_t overshoot = 0;
|
||||||
int64_t raw_overshoot = 0;
|
int64_t raw_overshoot = 0;
|
||||||
int64_t overshoot_fudge = (int64_t)(((((double)s->current_size) / (double)(s->target_size)) - 0.5) * 2.0*1024.0*1024.0); // XXX set based on avg / max tuple size?
|
int64_t overshoot_fudge = (int64_t)(((((double)s->current_size) / (double)(s->target_size)) - 0.5) * 4.0*1024.0*1024.0); // XXX set based on avg / max tuple size?
|
||||||
int spin = 0;
|
int spin = 0;
|
||||||
double total_sleep = 0.0;
|
double total_sleep = 0.0;
|
||||||
do{
|
do{
|
||||||
overshoot = 0;
|
overshoot = 0;
|
||||||
raw_overshoot = 0;
|
raw_overshoot = 0;
|
||||||
|
// more bps stuff for update_progress
|
||||||
double c0_out_progress, c0_c1_out_progress;
|
|
||||||
if(c0->mergeable_size) {
|
|
||||||
c0_out_progress = ((double)c0->current_size) / (double)c0->target_size;
|
|
||||||
} else {
|
|
||||||
c0_out_progress = 0; // don't throttle if our consumer is blocked on us.
|
|
||||||
}
|
|
||||||
if(c1->mergeable_size) {
|
|
||||||
c0_c1_out_progress = ((double)c1->current_size) / (double)c1->target_size;
|
|
||||||
} else {
|
|
||||||
c0_c1_out_progress = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
double c0_c1_in_progress, c1_c2_in_progress;
|
|
||||||
if(c1->active) {
|
|
||||||
c0_c1_in_progress = ((double)(c1->bytes_in_large + c1->bytes_in_small)) / (double)(c0->mergeable_size + c1->base_size);
|
|
||||||
} else {
|
|
||||||
c0_c1_in_progress = 0; // if our consumer is not active, it hasn't made any progress on our most recent output
|
|
||||||
}
|
|
||||||
if(c2->active) {
|
|
||||||
c1_c2_in_progress = ((double)(c2->bytes_in_large + c2->bytes_in_small)) / (double)(c1->mergeable_size + c2->base_size);
|
|
||||||
} else {
|
|
||||||
c1_c2_in_progress = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
double c0_c1_bps = c1->window_consumed / c1->window_elapsed;
|
double c0_c1_bps = c1->window_consumed / c1->window_elapsed;
|
||||||
double c1_c2_bps = c2->window_consumed / c2->window_elapsed;
|
double c1_c2_bps = c2->window_consumed / c2->window_elapsed;
|
||||||
|
|
||||||
if(s->merge_level == 0) {
|
if(s->merge_level == 0) {
|
||||||
if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; }
|
if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; }
|
||||||
raw_overshoot = (int64_t)(((double)c0->target_size) * (c0_out_progress - c0_c1_in_progress));
|
raw_overshoot = (int64_t)(((double)c0->target_size) * (c0->out_progress - c1->in_progress));
|
||||||
overshoot = raw_overshoot + overshoot_fudge;
|
overshoot = raw_overshoot + overshoot_fudge;
|
||||||
bps = c0_c1_bps;
|
bps = c0_c1_bps;
|
||||||
} else if (s->merge_level == 1) {
|
} else if (s->merge_level == 1) {
|
||||||
if(!(c2->active && c1->mergeable_size)) { overshoot_fudge = 0; }
|
if(!(c2->active && c1->mergeable_size)) { overshoot_fudge = 0; }
|
||||||
raw_overshoot = (int64_t)(((double)c1->target_size) * (c0_c1_out_progress - c1_c2_in_progress));
|
raw_overshoot = (int64_t)(((double)c1->target_size) * (c1->out_progress - c2->in_progress));
|
||||||
overshoot = raw_overshoot + overshoot_fudge;
|
overshoot = raw_overshoot + overshoot_fudge;
|
||||||
bps = c1_c2_bps;
|
bps = c1_c2_bps;
|
||||||
}
|
}
|
||||||
|
@ -161,7 +164,7 @@ void mergeManager::tick(mergeStats * s, bool block) {
|
||||||
} else {
|
} else {
|
||||||
s->print_skipped++;
|
s->print_skipped++;
|
||||||
}
|
}
|
||||||
if(overshoot > 0) {
|
if(overshoot > 0 && (raw_overshoot > 0 || total_sleep < 0.5)) {
|
||||||
// throttle
|
// throttle
|
||||||
// it took "elapsed" seconds to process "tick_length_bytes" mb
|
// it took "elapsed" seconds to process "tick_length_bytes" mb
|
||||||
double sleeptime = 2.0 * (double)overshoot / bps;
|
double sleeptime = 2.0 * (double)overshoot / bps;
|
||||||
|
@ -187,8 +190,10 @@ void mergeManager::tick(mergeStats * s, bool block) {
|
||||||
sleeping[s->merge_level] = false;
|
sleeping[s->merge_level] = false;
|
||||||
pthread_cond_broadcast(&throttle_wokeup_cond);
|
pthread_cond_broadcast(&throttle_wokeup_cond);
|
||||||
gettimeofday(&now, 0);
|
gettimeofday(&now, 0);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} while((overshoot > 0) && (raw_overshoot > 0));
|
} while(1);
|
||||||
} else {
|
} else {
|
||||||
if(s->print_skipped == PRINT_SKIP) {
|
if(s->print_skipped == PRINT_SKIP) {
|
||||||
pretty_print(stdout);
|
pretty_print(stdout);
|
||||||
|
@ -207,24 +212,40 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple *
|
||||||
(s->num_tuples_in_small)++;
|
(s->num_tuples_in_small)++;
|
||||||
(s->bytes_in_small_delta) += tup->byte_length();
|
(s->bytes_in_small_delta) += tup->byte_length();
|
||||||
(s->bytes_in_small) += tup->byte_length();
|
(s->bytes_in_small) += tup->byte_length();
|
||||||
|
update_progress(s, tup->byte_length());
|
||||||
tick(s, true);
|
tick(s, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * tup) {
|
||||||
|
if(tup) {
|
||||||
|
mergeStats * s = get_merge_stats(merge_level);
|
||||||
|
s->num_tuples_in_large++;
|
||||||
|
s->bytes_in_large += tup->byte_length();
|
||||||
|
// tick(s, false); // would be no-op; we just reduced current_size.
|
||||||
|
update_progress(s, tup->byte_length());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
|
void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
|
||||||
mergeStats * s = get_merge_stats(merge_level);
|
mergeStats * s = get_merge_stats(merge_level);
|
||||||
(s->num_tuples_out)++;
|
(s->num_tuples_out)++;
|
||||||
(s->bytes_out) += tup->byte_length();
|
(s->bytes_out) += tup->byte_length();
|
||||||
|
|
||||||
// XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex.
|
// XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex.
|
||||||
|
update_progress(s, tup->byte_length());
|
||||||
tick(s, false);
|
tick(s, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mergeManager::finished_merge(int merge_level) {
|
void mergeManager::finished_merge(int merge_level) {
|
||||||
|
update_progress(get_merge_stats(merge_level), 0);
|
||||||
|
tick(get_merge_stats(merge_level), false, true); // XXX what does this do???
|
||||||
get_merge_stats(merge_level)->active = false;
|
get_merge_stats(merge_level)->active = false;
|
||||||
if(merge_level != 0) {
|
if(merge_level != 0) {
|
||||||
get_merge_stats(merge_level - 1)->mergeable_size = 0;
|
get_merge_stats(merge_level - 1)->mergeable_size = 0;
|
||||||
|
update_progress(get_merge_stats(merge_level-1), 0);
|
||||||
}
|
}
|
||||||
gettimeofday(&get_merge_stats(merge_level)->done, 0);
|
gettimeofday(&get_merge_stats(merge_level)->done, 0);
|
||||||
|
update_progress(get_merge_stats(merge_level), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
mergeManager::mergeManager(logtable<datatuple> *ltable):
|
mergeManager::mergeManager(logtable<datatuple> *ltable):
|
||||||
|
|
|
@ -40,9 +40,11 @@ public:
|
||||||
|
|
||||||
void new_merge(int mergelevel);
|
void new_merge(int mergelevel);
|
||||||
void set_c0_size(int64_t size);
|
void set_c0_size(int64_t size);
|
||||||
void tick(mergeStats * s, bool block);
|
void update_progress(mergeStats *s, int delta);
|
||||||
|
void tick(mergeStats * s, bool block, bool force = false);
|
||||||
mergeStats* get_merge_stats(int mergeLevel);
|
mergeStats* get_merge_stats(int mergeLevel);
|
||||||
void read_tuple_from_small_component(int merge_level, datatuple * tup);
|
void read_tuple_from_small_component(int merge_level, datatuple * tup);
|
||||||
|
void read_tuple_from_large_component(int merge_level, datatuple * tup);
|
||||||
void wrote_tuple(int merge_level, datatuple * tup);
|
void wrote_tuple(int merge_level, datatuple * tup);
|
||||||
void finished_merge(int merge_level);
|
void finished_merge(int merge_level);
|
||||||
void pretty_print(FILE * out);
|
void pretty_print(FILE * out);
|
||||||
|
|
17
mergeStats.h
17
mergeStats.h
|
@ -38,6 +38,10 @@ class mergeStats {
|
||||||
bytes_in_large(0),
|
bytes_in_large(0),
|
||||||
num_tuples_in_large(0),
|
num_tuples_in_large(0),
|
||||||
just_handed_off(false),
|
just_handed_off(false),
|
||||||
|
delta(0),
|
||||||
|
need_tick(false),
|
||||||
|
in_progress(0),
|
||||||
|
out_progress(0),
|
||||||
lifetime_elapsed(0),
|
lifetime_elapsed(0),
|
||||||
lifetime_consumed(0),
|
lifetime_consumed(0),
|
||||||
window_elapsed(0.001),
|
window_elapsed(0.001),
|
||||||
|
@ -51,6 +55,7 @@ class mergeStats {
|
||||||
void new_merge2() {
|
void new_merge2() {
|
||||||
if(just_handed_off) {
|
if(just_handed_off) {
|
||||||
bytes_out = 0;
|
bytes_out = 0;
|
||||||
|
out_progress = 0;
|
||||||
just_handed_off = false;
|
just_handed_off = false;
|
||||||
}
|
}
|
||||||
base_size = bytes_out;
|
base_size = bytes_out;
|
||||||
|
@ -65,6 +70,7 @@ class mergeStats {
|
||||||
num_tuples_in_small = 0;
|
num_tuples_in_small = 0;
|
||||||
bytes_in_large = 0;
|
bytes_in_large = 0;
|
||||||
num_tuples_in_large = 0;
|
num_tuples_in_large = 0;
|
||||||
|
in_progress = 0;
|
||||||
gettimeofday(&sleep,0);
|
gettimeofday(&sleep,0);
|
||||||
}
|
}
|
||||||
void starting_merge() {
|
void starting_merge() {
|
||||||
|
@ -81,12 +87,6 @@ class mergeStats {
|
||||||
just_handed_off = true;
|
just_handed_off = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void read_tuple_from_large_component(datatuple * tup) {
|
|
||||||
if(tup) {
|
|
||||||
num_tuples_in_large++;
|
|
||||||
bytes_in_large += tup->byte_length();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
|
void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
|
||||||
}
|
}
|
||||||
void wrote_datapage(DataPage<datatuple> *dp) {
|
void wrote_datapage(DataPage<datatuple> *dp) {
|
||||||
|
@ -128,6 +128,11 @@ class mergeStats {
|
||||||
|
|
||||||
bool just_handed_off;
|
bool just_handed_off;
|
||||||
|
|
||||||
|
int delta;
|
||||||
|
bool need_tick;
|
||||||
|
double in_progress;
|
||||||
|
double out_progress;
|
||||||
|
|
||||||
double lifetime_elapsed;
|
double lifetime_elapsed;
|
||||||
double lifetime_consumed;
|
double lifetime_consumed;
|
||||||
double window_elapsed;
|
double window_elapsed;
|
||||||
|
|
|
@ -414,7 +414,7 @@ void merge_iterators(int xid,
|
||||||
|
|
||||||
rwlc_writelock(ltable->header_mut); // XXX slow
|
rwlc_writelock(ltable->header_mut); // XXX slow
|
||||||
datatuple *t1 = itrA->next_callerFrees();
|
datatuple *t1 = itrA->next_callerFrees();
|
||||||
stats->read_tuple_from_large_component(t1);
|
ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1);
|
||||||
rwlc_unlock(ltable->header_mut); // XXX slow
|
rwlc_unlock(ltable->header_mut); // XXX slow
|
||||||
datatuple *t2 = 0;
|
datatuple *t2 = 0;
|
||||||
|
|
||||||
|
@ -440,7 +440,7 @@ void merge_iterators(int xid,
|
||||||
//advance itrA
|
//advance itrA
|
||||||
t1 = itrA->next_callerFrees();
|
t1 = itrA->next_callerFrees();
|
||||||
if(t1) {
|
if(t1) {
|
||||||
stats->read_tuple_from_large_component(t1);
|
ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1);
|
||||||
}
|
}
|
||||||
periodically_force(xid, &i, forceMe, log);
|
periodically_force(xid, &i, forceMe, log);
|
||||||
rwlc_unlock(ltable->header_mut); // XXX slow
|
rwlc_unlock(ltable->header_mut); // XXX slow
|
||||||
|
@ -461,7 +461,7 @@ void merge_iterators(int xid,
|
||||||
ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple);
|
ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple);
|
||||||
t1 = itrA->next_callerFrees(); //advance itrA
|
t1 = itrA->next_callerFrees(); //advance itrA
|
||||||
if(t1) {
|
if(t1) {
|
||||||
stats->read_tuple_from_large_component(t1);
|
ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1);
|
||||||
}
|
}
|
||||||
datatuple::freetuple(mtuple);
|
datatuple::freetuple(mtuple);
|
||||||
periodically_force(xid, &i, forceMe, log);
|
periodically_force(xid, &i, forceMe, log);
|
||||||
|
@ -491,7 +491,7 @@ void merge_iterators(int xid,
|
||||||
|
|
||||||
//advance itrA
|
//advance itrA
|
||||||
t1 = itrA->next_callerFrees();
|
t1 = itrA->next_callerFrees();
|
||||||
stats->read_tuple_from_large_component(t1);
|
ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1);
|
||||||
periodically_force(xid, &i, forceMe, log);
|
periodically_force(xid, &i, forceMe, log);
|
||||||
rwlc_unlock(ltable->header_mut); // XXX slow
|
rwlc_unlock(ltable->header_mut); // XXX slow
|
||||||
rwlc_writelock(ltable->header_mut); // XXX slow
|
rwlc_writelock(ltable->header_mut); // XXX slow
|
||||||
|
|
Loading…
Reference in a new issue