improved bps estimation; add "minisleeps" to update_progress; they do not belong there long term, but this is better than nothing
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@929 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
cf6c6e5955
commit
9dcb624649
3 changed files with 46 additions and 14 deletions
|
@ -9,6 +9,7 @@
|
|||
#include "mergeStats.h"
|
||||
#include "logstore.h"
|
||||
#include "math.h"
|
||||
#include "time.h"
|
||||
mergeStats* mergeManager:: get_merge_stats(int mergeLevel) {
|
||||
if (mergeLevel == 0) {
|
||||
return c0;
|
||||
|
@ -50,6 +51,27 @@ void mergeManager::set_c0_size(int64_t size) {
|
|||
|
||||
void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||
s->delta += delta;
|
||||
s->mini_delta += delta;
|
||||
{
|
||||
if(s->merge_level < 2 && s->mergeable_size && delta) {
|
||||
int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps);
|
||||
|
||||
if(s->mini_delta > effective_max_delta) {
|
||||
struct timeval now;
|
||||
gettimeofday(&now, 0);
|
||||
double now_double = tv_to_double(&now);
|
||||
double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick);
|
||||
double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta;
|
||||
if(slp > 0.001) {
|
||||
struct timespec sleeptime;
|
||||
double_to_ts(&sleeptime, slp);
|
||||
nanosleep(&sleeptime, 0);
|
||||
}
|
||||
double_to_ts(&s->last_mini_tick, now_double);
|
||||
s->mini_delta = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) {
|
||||
if(delta) {
|
||||
rwlc_writelock(ltable->header_mut);
|
||||
|
@ -74,18 +96,21 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
|
|||
struct timeval now;
|
||||
gettimeofday(&now, 0);
|
||||
double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
|
||||
|
||||
if(elapsed_delta < 0.0000001) { elapsed_delta = 0.0000001; }
|
||||
s->lifetime_elapsed += elapsed_delta;
|
||||
s->lifetime_consumed += s->bytes_in_small_delta;
|
||||
double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...)
|
||||
s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta;
|
||||
s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta;
|
||||
double tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...)
|
||||
double decay = exp((0.0-elapsed_delta)/tau);
|
||||
// s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta;
|
||||
// s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta;
|
||||
|
||||
double_to_ts(&s->last_tick, tv_to_double(&now));
|
||||
|
||||
s->bytes_in_small_delta = 0;
|
||||
double window_bps = ((double)s->bytes_in_small_delta) / (double)elapsed_delta;
|
||||
|
||||
s->bps = s->window_consumed / s->window_elapsed;
|
||||
s->bps = (1.0-decay) * window_bps + decay * s->bps; //s->window_consumed / s->window_elapsed;
|
||||
|
||||
s->bytes_in_small_delta = 0;
|
||||
|
||||
if(delta) rwlc_unlock(ltable->header_mut);
|
||||
|
||||
|
@ -115,6 +140,9 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
|
|||
*/
|
||||
void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||
#define PRINT_SKIP 100
|
||||
if(block) {
|
||||
// sleep(((double)delta)/[s+1]->bps); // XXX We currently sleep based on the past performance of the current tree. In the limit, this is fine, but it would be better to sleep based on the past throughput of the tree component we're waiting for. fill in the parameters
|
||||
}
|
||||
if(force || s->need_tick) {
|
||||
|
||||
if(block) {
|
||||
|
@ -139,7 +167,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
|||
int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * ((double)FORCE_INTERVAL)/(1.0-skew));
|
||||
/* model the effect of amortizing this computation: we could
|
||||
become this much more overshot if we don't act now. */
|
||||
int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA;
|
||||
int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA; //(int64_t)(((double)UPDATE_PROGRESS_PERIOD) * s->bps / 1000.0);
|
||||
/* multiply by 2 for good measure. These are 'soft' walls, and
|
||||
still let writes trickle through. Once we've exausted the
|
||||
fudge factors, we'll hit a hard wall, and stop writes
|
||||
|
@ -292,6 +320,7 @@ void mergeManager::finished_merge(int merge_level) {
|
|||
}
|
||||
|
||||
mergeManager::mergeManager(logtable<datatuple> *ltable):
|
||||
UPDATE_PROGRESS_PERIOD(0.005),
|
||||
ltable(ltable),
|
||||
c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)),
|
||||
c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )),
|
||||
|
@ -335,13 +364,13 @@ void mergeManager::pretty_print(FILE * out) {
|
|||
assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102));
|
||||
|
||||
fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ",
|
||||
c0->active ? "RUN" : "---", (uint64_t)(c0->lifetime_consumed / mb), c0_out_progress, c0->lifetime_elapsed, c0->window_consumed/(((double)mb)*c0->window_elapsed), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed),
|
||||
c0->active ? "RUN" : "---", (uint64_t)(c0->lifetime_consumed / mb), c0_out_progress, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed),
|
||||
have_c0 ? "C0" : "..",
|
||||
have_c0m ? "C0'" : "...",
|
||||
c1->active ? "RUN" : "---", c0_c1_in_progress, c0_c1_out_progress, c1->window_consumed/(((double)mb)*c1->window_elapsed), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed),
|
||||
c1->active ? "RUN" : "---", c0_c1_in_progress, c0_c1_out_progress, c1->bps/((double)mb), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed),
|
||||
have_c1 ? "C1" : "..",
|
||||
have_c1m ? "C1'" : "...",
|
||||
c2->active ? "RUN" : "---", c1_c2_progress, c2->window_consumed/(((double)mb)*c2->window_elapsed), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed),
|
||||
c2->active ? "RUN" : "---", c1_c2_progress, c2->bps/((double)mb), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed),
|
||||
have_c2 ? "C2" : "..");
|
||||
//#define PP_SIZES
|
||||
#ifdef PP_SIZES
|
||||
|
|
|
@ -22,6 +22,7 @@ class mergeStats;
|
|||
class mergeManager {
|
||||
public:
|
||||
static const int UPDATE_PROGRESS_DELTA = 10 * 1024 * 1024;
|
||||
const double UPDATE_PROGRESS_PERIOD; // in seconds, defined in constructor.
|
||||
static const int FORCE_INTERVAL = 25 * 1024 * 1024;
|
||||
static double tv_to_double(struct timeval * tv) {
|
||||
return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0;
|
||||
|
|
10
mergeStats.h
10
mergeStats.h
|
@ -38,19 +38,20 @@ class mergeStats {
|
|||
bytes_in_large(0),
|
||||
num_tuples_in_large(0),
|
||||
just_handed_off(false),
|
||||
mini_delta(0),
|
||||
delta(0),
|
||||
need_tick(0),
|
||||
in_progress(0),
|
||||
out_progress(0),
|
||||
lifetime_elapsed(0),
|
||||
lifetime_consumed(0),
|
||||
window_elapsed(0.001),
|
||||
window_consumed(0),
|
||||
bps(10.0*1024.0*1024.0),
|
||||
print_skipped(0),
|
||||
active(false) {
|
||||
gettimeofday(&sleep,0);
|
||||
gettimeofday(&last,0);
|
||||
mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
|
||||
mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
|
||||
pthread_mutex_init(&mut,0);
|
||||
}
|
||||
~mergeStats() {
|
||||
|
@ -82,6 +83,7 @@ class mergeStats {
|
|||
gettimeofday(&start, 0);
|
||||
gettimeofday(&last, 0);
|
||||
mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
|
||||
mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
|
||||
|
||||
}
|
||||
void handed_off_tree() {
|
||||
|
@ -113,6 +115,7 @@ class mergeStats {
|
|||
}
|
||||
friend class mergeManager;
|
||||
|
||||
struct timespec last_mini_tick;
|
||||
struct timespec last_tick;
|
||||
public: // XXX only accessed during initialization.
|
||||
pageid_t base_size;
|
||||
|
@ -135,6 +138,7 @@ class mergeStats {
|
|||
|
||||
bool just_handed_off;
|
||||
|
||||
int mini_delta;
|
||||
int delta;
|
||||
int need_tick;
|
||||
double in_progress;
|
||||
|
@ -142,8 +146,6 @@ class mergeStats {
|
|||
|
||||
double lifetime_elapsed;
|
||||
double lifetime_consumed;
|
||||
double window_elapsed;
|
||||
double window_consumed;
|
||||
|
||||
double bps;
|
||||
|
||||
|
|
Loading…
Reference in a new issue