readability fix: fields in mergeStats that are used only to compute extended stats for pretty-printing are now prefixed with "stats_", and can be disabled at compile time

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1489 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears 2010-12-13 22:27:13 +00:00
parent 7e44641799
commit eb4446f409
4 changed files with 126 additions and 102 deletions
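
The essence of the change, reduced to a standalone sketch (hypothetical type and field names, not the actual mergeStats class): counters that exist only to feed pretty-printing get a "stats_" prefix and an #if EXTENDED_STATS guard, so building with the flag off removes both the fields and the code that maintains them.

#include <cstdint>
#include <cstdio>

#define EXTENDED_STATS 1  // flip to 0 to compile the extra bookkeeping out

struct counter_sketch {
  int64_t bytes_out;             // always maintained: merge control depends on it
#if EXTENDED_STATS
  int64_t stats_num_tuples_out;  // only consumed by pretty_print()-style reporting
#endif

  counter_sketch() : bytes_out(0)
#if EXTENDED_STATS
    , stats_num_tuples_out(0)    // guarded initializer clause, as in the real constructor
#endif
  {}

  void wrote_tuple(int64_t len) {
    bytes_out += len;
#if EXTENDED_STATS
    stats_num_tuples_out++;      // disappears entirely when EXTENDED_STATS is 0
#endif
  }
};

int main() {
  counter_sketch c;
  c.wrote_tuple(128);
  printf("bytes out: %lld\n", (long long)c.bytes_out);
  return 0;
}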

View file

@@ -70,14 +70,14 @@ int diskTreeComponent::insertTuple(int xid, datatuple *t)
int ret = 0; // no error.
if(dp==0) {
dp = insertDataPage(xid, t);
// stats->num_datapages_out++;
// stats->stats_num_datapages_out++;
} else if(!dp->append(t)) {
// stats->bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
// stats->stats_bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
((mergeStats*)stats)->wrote_datapage(dp);
dp->writes_done();
delete dp;
dp = insertDataPage(xid, t);
// stats->num_datapages_out++;
// stats->stats_num_datapages_out++;
}
return ret;
}

View file

@@ -79,22 +79,25 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
}
s->out_progress = ((double)s->current_size) / (double)s->target_size;
#if EXTENDED_STATS
struct timeval now;
gettimeofday(&now, 0);
double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
if(elapsed_delta < 0.0000001) { elapsed_delta = 0.0000001; }
s->lifetime_elapsed += elapsed_delta;
s->lifetime_consumed += s->bytes_in_small_delta;
double tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...)
double decay = exp((0.0-elapsed_delta)/tau);
double stats_elapsed_delta = tv_to_double(&now) - ts_to_double(&s->stats_last_tick);
if(stats_elapsed_delta < 0.0000001) { stats_elapsed_delta = 0.0000001; }
s->stats_lifetime_elapsed += stats_elapsed_delta;
s->stats_lifetime_consumed += s->stats_bytes_in_small_delta;
double stats_tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...)
double stats_decay = exp((0.0-stats_elapsed_delta)/stats_tau);
double_to_ts(&s->last_tick, tv_to_double(&now));
double_to_ts(&s->stats_last_tick, tv_to_double(&now));
double window_bps = ((double)s->bytes_in_small_delta) / (double)elapsed_delta;
double stats_window_bps = ((double)s->stats_bytes_in_small_delta) / (double)stats_elapsed_delta;
s->bps = (1.0-decay) * window_bps + decay * s->bps;
s->stats_bps = (1.0-stats_decay) * stats_window_bps + stats_decay * s->stats_bps;
s->bytes_in_small_delta = 0;
s->stats_bytes_in_small_delta = 0;
#endif
rwlc_unlock(ltable->header_mut);
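
The update above maintains an exponentially decayed bandwidth estimate: each tick's observed rate is blended with the previous estimate using decay = exp(-elapsed/tau), so tau = 60 makes the estimate behave roughly like a one-minute lookback (as the comment notes, tau is a mean residence time, not a hard window). The same arithmetic as a standalone sketch (function name and test values are illustrative):

#include <cmath>
#include <cstdio>

// The update rule from mergeManager::update_progress, minus locking and
// struct plumbing. tau is the mean residence time of the decay model.
double update_bps(double bps, double bytes_delta, double elapsed, double tau = 60.0) {
  if (elapsed < 0.0000001) { elapsed = 0.0000001; }  // same guard as the real code
  double window_bps = bytes_delta / elapsed;         // rate observed during this tick
  double decay = exp((0.0 - elapsed) / tau);         // weight retained by the old estimate
  return (1.0 - decay) * window_bps + decay * bps;
}

int main() {
  double bps = 10.0 * 1024.0 * 1024.0;  // mergeStats seeds its estimate at 10MB/s
  // Feed 1MB every 50ms (a true rate of 20MB/s) for two simulated minutes.
  for (int i = 0; i < 2400; i++) { bps = update_bps(bps, 1024.0 * 1024.0, 0.05); }
  // exp(-120/60) ~= 0.135, so ~86% of the gap to 20MB/s has closed: prints ~18.6.
  printf("estimated MB/s: %.1f\n", bps / (1024.0 * 1024.0));
  return 0;
}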
@@ -119,7 +122,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
*
* current_size(c_i) = sum(bytes_out_delta) - sum(bytes_in_large_delta)
*
* bytes_consumed_by_merger = sum(bytes_in_small_delta)
* bytes_consumed_by_merger = sum(stats_bytes_in_small_delta)
*/
void mergeManager::tick(mergeStats * s) {
if(s->merge_level == 1) { // apply backpressure based on merge progress.
@@ -173,8 +176,10 @@ void mergeManager::tick(mergeStats * s) {
void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * tup) {
if(tup) {
mergeStats * s = get_merge_stats(merge_level);
(s->num_tuples_in_small)++;
(s->bytes_in_small_delta) += tup->byte_length();
#if EXTENDED_STATS
(s->stats_num_tuples_in_small)++;
(s->stats_bytes_in_small_delta) += tup->byte_length();
#endif
(s->bytes_in_small) += tup->byte_length();
update_progress(s, tup->byte_length());
tick(s);
@@ -183,7 +188,9 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple *
void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len) {
if(tuple_count) {
mergeStats * s = get_merge_stats(merge_level);
s->num_tuples_in_large += tuple_count;
#if EXTENDED_STATS
s->stats_num_tuples_in_large += tuple_count;
#endif
s->bytes_in_large += byte_len;
update_progress(s, byte_len);
}
@@ -191,7 +198,9 @@ void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_co
void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
mergeStats * s = get_merge_stats(merge_level);
(s->num_tuples_out)++;
#if EXTENDED_STATS
(s->stats_num_tuples_out)++;
#endif
(s->bytes_out) += tup->byte_length();
}
@@ -202,7 +211,9 @@ void mergeManager::finished_merge(int merge_level) {
get_merge_stats(merge_level - 1)->mergeable_size = 0;
update_progress(get_merge_stats(merge_level-1), 0);
}
gettimeofday(&get_merge_stats(merge_level)->done, 0);
#if EXTENDED_STATS
gettimeofday(&get_merge_stats(merge_level)->stats_done, 0);
#endif
update_progress(get_merge_stats(merge_level), 0);
}
@@ -242,16 +253,19 @@ mergeManager::mergeManager(logtable<datatuple> *ltable):
sleeping[0] = false;
sleeping[1] = false;
sleeping[2] = false;
double_to_ts(&c0->last_tick, tv_to_double(&tv));
double_to_ts(&c1->last_tick, tv_to_double(&tv));
double_to_ts(&c2->last_tick, tv_to_double(&tv));
#if EXTENDED_STATS
double_to_ts(&c0->stats_last_tick, tv_to_double(&tv));
double_to_ts(&c1->stats_last_tick, tv_to_double(&tv));
double_to_ts(&c2->stats_last_tick, tv_to_double(&tv));
#endif
still_running = true;
pthread_cond_init(&pp_cond, 0);
pthread_create(&pp_thread, 0, merge_manager_pretty_print_thread, (void*)this);
}
void mergeManager::pretty_print(FILE * out) {
pageid_t mb = 1024 * 1024;
#if EXTENDED_STATS
logtable<datatuple> * lt = (logtable<datatuple>*)ltable;
bool have_c0 = false;
bool have_c0m = false;
@@ -265,18 +279,21 @@ void mergeManager::pretty_print(FILE * out) {
have_c1m = NULL != lt->get_tree_c1_mergeable() ;
have_c2 = NULL != lt->get_tree_c2();
}
pageid_t mb = 1024 * 1024;
fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%%/%3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ",
c0->active ? "RUN" : "---", (long long)(c0->lifetime_consumed / mb), 100.0 * c0->out_progress, 100.0 * ((double)ltable->tree_bytes)/(double)ltable->max_c0_size, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed),
c0->active ? "RUN" : "---", (long long)(c0->stats_lifetime_consumed / mb), 100.0 * c0->out_progress, 100.0 * ((double)ltable->tree_bytes)/(double)ltable->max_c0_size, c0->stats_lifetime_elapsed, c0->stats_bps/((double)mb), c0->stats_lifetime_consumed/(((double)mb)*c0->stats_lifetime_elapsed),
have_c0 ? "C0" : "..",
have_c0m ? "C0'" : "...",
c1->active ? "RUN" : "---", 100.0 * c1->in_progress, 100.0 * c1->out_progress, c1->bps/((double)mb), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed),
c1->active ? "RUN" : "---", 100.0 * c1->in_progress, 100.0 * c1->out_progress, c1->stats_bps/((double)mb), c1->stats_lifetime_consumed/(((double)mb)*c1->stats_lifetime_elapsed),
have_c1 ? "C1" : "..",
have_c1m ? "C1'" : "...",
c2->active ? "RUN" : "---", 100.0 * c2->in_progress, c2->bps/((double)mb), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed),
c2->active ? "RUN" : "---", 100.0 * c2->in_progress, c2->stats_bps/((double)mb), c2->stats_lifetime_consumed/(((double)mb)*c2->stats_lifetime_elapsed),
have_c2 ? "C2" : "..");
#endif
//#define PP_SIZES
#ifdef PP_SIZES
{
pageid_t mb = 1024 * 1024;
fprintf(out, "[target cur base in_small in_large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld %4lld ",
c0->target_size/mb, c0->current_size/mb, c0->base_size/mb, c0->bytes_in_small/mb,
c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb);
@@ -288,6 +305,7 @@ void mergeManager::pretty_print(FILE * out) {
fprintf(out, "C2 ---- %4lld %4lld %4lld %4lld %4lld %4lld ",
/*----*/ c2->current_size/mb, c2->base_size/mb, c2->bytes_in_small/mb,
c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb);
}
#endif
// fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds)));
// fprintf(out, "C0 size %4lld resident %4lld ",
@@ -304,8 +322,8 @@ void mergeManager::pretty_print(FILE * out) {
fflush(out);
#if 0 // XXX would like to bring this back somehow...
assert((!c1->active) || (c1->in_progress >= -0.01 && c1->in_progress < 1.02));
#endif
assert((!c2->active) || (c2->in_progress >= -0.01 && c2->in_progress < 1.10));
#endif
fprintf(out, "\r");
}
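
These counters are drained by a dedicated reporter: the constructor above spawns merge_manager_pretty_print_thread with pp_cond and still_running for shutdown. The thread body is not part of this diff; here is a hypothetical sketch of such a periodic reporter loop, assuming a timed condition-variable wait (the real loop and handshake may differ):

#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <ctime>

// Hypothetical harness; mergeManager's actual loop and shutdown handshake may differ.
struct reporter {
  pthread_mutex_t mut;
  pthread_cond_t  cond;
  bool            still_running;
};

static void * pretty_print_thread(void * arg) {
  reporter * r = (reporter *)arg;
  pthread_mutex_lock(&r->mut);
  while (r->still_running) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += 1;                                  // report about once a second
    pthread_cond_timedwait(&r->cond, &r->mut, &deadline);  // returns early if signaled
    fprintf(stdout, "[merge progress ...]\r");             // stand-in for pretty_print(out)
    fflush(stdout);
  }
  pthread_mutex_unlock(&r->mut);
  return 0;
}

int main() {
  reporter r;
  pthread_mutex_init(&r.mut, 0);
  pthread_cond_init(&r.cond, 0);
  r.still_running = true;
  pthread_t tid;
  pthread_create(&tid, 0, pretty_print_thread, &r);
  sleep(3);                         // let it report a few times
  pthread_mutex_lock(&r.mut);
  r.still_running = false;          // request shutdown...
  pthread_cond_broadcast(&r.cond);  // ...and wake the sleeper immediately
  pthread_mutex_unlock(&r.mut);
  pthread_join(tid, 0);
  return 0;
}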

View file

@@ -12,6 +12,8 @@
#undef try
#undef end
#define EXTENDED_STATS 1
#include <sys/time.h>
#include <stdio.h>
#include "datatuple.h"
@@ -23,34 +25,39 @@ class mergeStats {
public:
mergeStats(int merge_level, int64_t target_size) :
merge_level(merge_level),
merge_count(0),
base_size(0),
mergeable_size(0),
target_size(target_size),
current_size(0),
bytes_out_with_overhead(0),
bytes_out(0),
num_tuples_out(0),
num_datapages_out(0),
bytes_in_small(0),
bytes_in_small_delta(0),
num_tuples_in_small(0),
bytes_in_large(0),
num_tuples_in_large(0),
just_handed_off(false),
mini_delta(0),
delta(0),
need_tick(0),
in_progress(0),
out_progress(0),
lifetime_elapsed(0),
lifetime_consumed(0),
bps(10.0*1024.0*1024.0),
active(false) {
gettimeofday(&sleep,0);
active(false)
#if EXTENDED_STATS
,
stats_merge_count(0),
stats_bytes_out_with_overhead(0),
stats_num_tuples_out(0),
stats_num_datapages_out(0),
stats_bytes_in_small_delta(0),
stats_num_tuples_in_small(0),
stats_num_tuples_in_large(0),
stats_lifetime_elapsed(0),
stats_lifetime_consumed(0),
stats_bps(10.0*1024.0*1024.0)
#endif // EXTENDED_STATS
{
#if EXTENDED_STATS
gettimeofday(&stats_sleep,0);
struct timeval last;
gettimeofday(&last,0);
mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
}
~mergeStats() { }
void new_merge2() {
@@ -61,26 +68,29 @@ class mergeStats {
}
base_size = bytes_out;
current_size = base_size;
merge_count++;
bytes_out_with_overhead = 0;
bytes_out = 0;
num_tuples_out = 0;
num_datapages_out = 0;
bytes_in_small = 0;
bytes_in_small_delta = 0;
num_tuples_in_small = 0;
bytes_in_large = 0;
num_tuples_in_large = 0;
in_progress = 0;
gettimeofday(&sleep,0);
#if EXTENDED_STATS
stats_merge_count++;
stats_bytes_out_with_overhead = 0;
stats_num_tuples_out = 0;
stats_num_datapages_out = 0;
stats_bytes_in_small_delta = 0;
stats_num_tuples_in_small = 0;
stats_num_tuples_in_large = 0;
gettimeofday(&stats_sleep,0);
#endif
}
void starting_merge() {
active = true;
gettimeofday(&start, 0);
#if EXTENDED_STATS
gettimeofday(&stats_start, 0);
struct timeval last;
gettimeofday(&last, 0);
mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
}
void handed_off_tree() {
if(merge_level == 2) {
@@ -92,75 +102,70 @@ class mergeStats {
void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
}
void wrote_datapage(DataPage<datatuple> *dp) {
num_datapages_out++;
bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
#if EXTENDED_STATS
stats_num_datapages_out++;
stats_bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
#endif
}
pageid_t output_size() {
return bytes_out;
}
const int merge_level; // 1 => C0->C1, 2 => C1->C2
protected:
pageid_t merge_count; // This is the merge_count'th merge
struct timeval sleep; // When did we go to sleep waiting for input?
struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
struct timeval done; // When did we finish merging?
struct timeval last;
double float_tv(struct timeval& tv) {
return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0;
}
friend class mergeManager;
struct timespec last_mini_tick;
struct timespec last_tick;
public: // XXX only accessed during initialization.
pageid_t base_size; // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size)
pageid_t mergeable_size; // protected by mutex.
pageid_t target_size;
pageid_t current_size;
protected:
pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)?
public:
pageid_t bytes_out; // How many bytes worth of tuples did we write?
protected:
pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages?
public:
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?
protected:
pageid_t bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
pageid_t num_tuples_in_small; // Tuples from the small input?
pageid_t bytes_in_large; // Bytes from the large input?
pageid_t num_tuples_in_large; // Tuples from large input?
bool just_handed_off;
int mini_delta;
int delta;
int need_tick;
double in_progress;
double out_progress;
double lifetime_elapsed;
double lifetime_consumed;
double bps;
bool active;
#if EXTENDED_STATS
pageid_t stats_merge_count; // This is the stats_merge_count'th merge
struct timeval stats_sleep; // When did we go to sleep waiting for input?
struct timeval stats_start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep)
struct timeval stats_done; // When did we finish merging?
struct timespec stats_last_tick;
pageid_t stats_bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)?
pageid_t stats_num_tuples_out; // How many tuples did we write?
pageid_t stats_num_datapages_out; // How many datapages?
pageid_t stats_bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
pageid_t stats_num_tuples_in_small; // Tuples from the small input?
pageid_t stats_num_tuples_in_large; // Tuples from large input?
double stats_lifetime_elapsed;
double stats_lifetime_consumed;
double stats_bps;
#endif
public:
void pretty_print(FILE* fd) {
double sleep_time = float_tv(start) - float_tv(sleep);
double work_time = float_tv(done) - float_tv(start);
#if EXTENDED_STATS
double sleep_time = float_tv(stats_start) - float_tv(stats_sleep);
double work_time = float_tv(stats_done) - float_tv(stats_start);
double total_time = sleep_time + work_time;
double mb_out = ((double)bytes_out_with_overhead) /(1024.0*1024.0);
double mb_out = ((double)stats_bytes_out_with_overhead) /(1024.0*1024.0);
double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0);
double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0);
double kt_out = ((double)num_tuples_out) /(1024.0);
double kt_ins= ((double)num_tuples_in_small) /(1024.0);
double kt_inl = ((double)num_tuples_in_large) /(1024.0);
double kt_out = ((double)stats_num_tuples_out) /(1024.0);
double kt_ins= ((double)stats_num_tuples_in_small) /(1024.0);
double kt_inl = ((double)stats_num_tuples_in_large) /(1024.0);
double mb_hdd = mb_out + mb_inl + (merge_level == 1 ? 0.0 : mb_ins);
double kt_hdd = kt_out + kt_inl + (merge_level == 1 ? 0.0 : kt_ins);
@@ -179,16 +184,17 @@ class mergeStats {
" (%.2f; %.2f) wallclock" "\n"
".....................................................................\n"
,
merge_level, merge_count,
merge_level, stats_merge_count,
sleep_time,
work_time,
(long long)mb_out, (long long)kt_out, num_datapages_out, mb_out / work_time, mb_out / total_time, kt_out / work_time, kt_out / total_time,
(long long)mb_out, (long long)kt_out, stats_num_datapages_out, mb_out / work_time, mb_out / total_time, kt_out / work_time, kt_out / total_time,
(long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / work_time, kt_ins / total_time,
(long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time,
(long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time,
mb_out / kt_out,
mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins
);
#endif
}
};
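
One caveat on "can be disabled at compile time": the header hunk above hard-codes #define EXTENDED_STATS 1, so turning the stats off currently means editing the header (and -DEXTENDED_STATS=0 on the compile line would collide with the unconditional define). A guarded default, offered as a suggestion rather than as part of this commit, would let the build system decide:

#ifndef EXTENDED_STATS
#define EXTENDED_STATS 1  // default on; override with -DEXTENDED_STATS=0 to strip the stats
#endif

With the guard in place, building with EXTENDED_STATS=0 compiles out the stats_ fields, their initializer clauses, and the body of pretty_print shown above.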

View file

@@ -121,7 +121,7 @@ void * merge_scheduler::memMergeThread() {
c1_prime->force(xid);
merge_count++;
DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size());
DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.stats_merge_count, stats.output_size());
// Immediately clean out c0 mergeable so that writers may continue.
@@ -257,7 +257,7 @@ void * merge_scheduler::diskMergeThread()
diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(&ltable_->merge_mgr->cur_c1_c2_progress_delta, 0.05, &ltable_->shutting_down_);
//create a new tree
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (uint64_t)(ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
// diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats);
rwlc_unlock(ltable_->header_mut);
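
Aside from the renames, the one substantive change in this hunk is the (uint64_t) cast above. Presumably *ltable_->R() is a double, so the whole size expression is computed in floating point; the cast makes the truncation back to an integer byte count explicit at the call site rather than leaving it to an implicit (and warning-prone) conversion, and turns the / 1000 into integer division. A standalone illustration with made-up values and guessed types:

#include <cstdint>
#include <cstdio>

int main() {
  int64_t max_c0_size = 100 * 1024 * 1024;  // illustrative values, not from the code
  double  R           = 3.7;                // assumed: *ltable_->R() yields a double
  int64_t base_size   = 42 * 1024 * 1024;

  double   as_double = (max_c0_size * R + base_size) / 1000;            // stays floating point
  uint64_t as_int    = (uint64_t)(max_c0_size * R + base_size) / 1000;  // explicit, as in the fix

  printf("double: %f  integer: %llu\n", as_double, (unsigned long long)as_int);
  return 0;
}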
@@ -295,7 +295,7 @@ void * merge_scheduler::diskMergeThread()
DEBUG("\nR = %f\n", *(ltable_->R()));
DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i));
DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.stats_merge_count, stats.output_size(), *(a->r_i));
// 10: C2 is never too big
ltable_->set_tree_c2(c2_prime);
stats->handed_off_tree();