diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp
index c560413..27618a7 100644
--- a/diskTreeComponent.cpp
+++ b/diskTreeComponent.cpp
@@ -70,14 +70,14 @@ int diskTreeComponent::insertTuple(int xid, datatuple *t)
   int ret = 0; // no error.
   if(dp==0) {
     dp = insertDataPage(xid, t);
-    // stats->num_datapages_out++;
+    // stats->stats_num_datapages_out++;
   } else if(!dp->append(t)) {
-    // stats->bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
+    // stats->stats_bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
     ((mergeStats*)stats)->wrote_datapage(dp);
     dp->writes_done();
     delete dp;
     dp = insertDataPage(xid, t);
-    // stats->num_datapages_out++;
+    // stats->stats_num_datapages_out++;
   }
   return ret;
 }
diff --git a/mergeManager.cpp b/mergeManager.cpp
index 9b5c2d3..9bc6617 100644
--- a/mergeManager.cpp
+++ b/mergeManager.cpp
@@ -79,22 +79,25 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
   }
   s->out_progress = ((double)s->current_size) / (double)s->target_size;
+
+#if EXTENDED_STATS
   struct timeval now;
   gettimeofday(&now, 0);
-  double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
-  if(elapsed_delta < 0.0000001) { elapsed_delta = 0.0000001; }
-  s->lifetime_elapsed += elapsed_delta;
-  s->lifetime_consumed += s->bytes_in_small_delta;
-  double tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...)
-  double decay = exp((0.0-elapsed_delta)/tau);
+  double stats_elapsed_delta = tv_to_double(&now) - ts_to_double(&s->stats_last_tick);
+  if(stats_elapsed_delta < 0.0000001) { stats_elapsed_delta = 0.0000001; }
+  s->stats_lifetime_elapsed += stats_elapsed_delta;
+  s->stats_lifetime_consumed += s->stats_bytes_in_small_delta;
+  double stats_tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...)
+  double stats_decay = exp((0.0-stats_elapsed_delta)/stats_tau);

-  double_to_ts(&s->last_tick, tv_to_double(&now));
+  double_to_ts(&s->stats_last_tick, tv_to_double(&now));

-  double window_bps = ((double)s->bytes_in_small_delta) / (double)elapsed_delta;
+  double stats_window_bps = ((double)s->stats_bytes_in_small_delta) / (double)stats_elapsed_delta;

-  s->bps = (1.0-decay) * window_bps + decay * s->bps;
+  s->stats_bps = (1.0-stats_decay) * stats_window_bps + stats_decay * s->stats_bps;

-  s->bytes_in_small_delta = 0;
+  s->stats_bytes_in_small_delta = 0;
+#endif

   rwlc_unlock(ltable->header_mut);
@@ -119,7 +122,7 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
  *
  *   current_size(c_i) = sum(bytes_out_delta) - sum(bytes_in_large_delta)
  *
- *   bytes_consumed_by_merger = sum(bytes_in_small_delta)
+ *   bytes_consumed_by_merger = sum(stats_bytes_in_small_delta)
  */
 void mergeManager::tick(mergeStats * s) {
   if(s->merge_level == 1) { // apply backpressure based on merge progress.
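Note on the update_progress() hunk above: the rename leaves the estimator itself untouched. Each tick folds the tick's observed rate into a running bytes-per-second figure, recomputing the decay factor from elapsed wall-clock time so that irregular tick lengths are weighted correctly; tau is the mean age of the data retained in the window. A minimal self-contained sketch of the same computation (the struct and function names here are hypothetical, not part of the patch):

    #include <math.h>
    #include <stdio.h>

    // Exponentially-decayed throughput estimate over irregularly spaced ticks.
    struct bps_estimator {
      double bps; // smoothed bytes/sec; the patch seeds this with 10MB/sec
      double tau; // look-back window, seconds (mean residence time of old samples)

      void tick(double elapsed_sec, double bytes_this_tick) {
        if(elapsed_sec < 0.0000001) { elapsed_sec = 0.0000001; } // no divide-by-zero
        double window_bps = bytes_this_tick / elapsed_sec; // this tick's raw rate
        double decay = exp(-elapsed_sec / tau);            // short gaps forget less
        bps = (1.0-decay) * window_bps + decay * bps;
      }
    };

    int main() {
      bps_estimator est = { 10.0*1024.0*1024.0, 60.0 };
      est.tick(0.5, 4.0*1024.0*1024.0); // observed 8MB/sec for half a second
      printf("%.2f MB/sec\n", est.bps / (1024.0*1024.0));
      return 0;
    }

With tau = 60, a tick arriving after 0.5 seconds gets weight 1 - exp(-0.5/60), about 0.8%, so one noisy measurement barely moves the estimate while a sustained change dominates after a minute or so.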
@@ -173,8 +176,10 @@ void mergeManager::tick(mergeStats * s) {
 void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * tup) {
   if(tup) {
     mergeStats * s = get_merge_stats(merge_level);
-    (s->num_tuples_in_small)++;
-    (s->bytes_in_small_delta) += tup->byte_length();
+#if EXTENDED_STATS
+    (s->stats_num_tuples_in_small)++;
+    (s->stats_bytes_in_small_delta) += tup->byte_length();
+#endif
     (s->bytes_in_small) += tup->byte_length();
     update_progress(s, tup->byte_length());
     tick(s);
@@ -183,7 +188,9 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple *
 void mergeManager::read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len) {
   if(tuple_count) {
     mergeStats * s = get_merge_stats(merge_level);
-    s->num_tuples_in_large += tuple_count;
+#if EXTENDED_STATS
+    s->stats_num_tuples_in_large += tuple_count;
+#endif
     s->bytes_in_large += byte_len;
     update_progress(s, byte_len);
   }
@@ -191,7 +198,9 @@ void mergeManager::read_tuple_from_large_co
 void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
   mergeStats * s = get_merge_stats(merge_level);
-  (s->num_tuples_out)++;
+#if EXTENDED_STATS
+  (s->stats_num_tuples_out)++;
+#endif
   (s->bytes_out) += tup->byte_length();
 }
@@ -202,7 +211,9 @@ void mergeManager::finished_merge(int merge_level) {
     get_merge_stats(merge_level - 1)->mergeable_size = 0;
     update_progress(get_merge_stats(merge_level-1), 0);
   }
-  gettimeofday(&get_merge_stats(merge_level)->done, 0);
+#if EXTENDED_STATS
+  gettimeofday(&get_merge_stats(merge_level)->stats_done, 0);
+#endif
   update_progress(get_merge_stats(merge_level), 0);
 }
@@ -242,16 +253,19 @@ mergeManager::mergeManager(logtable *ltable):
   sleeping[0] = false;
   sleeping[1] = false;
   sleeping[2] = false;
-  double_to_ts(&c0->last_tick, tv_to_double(&tv));
-  double_to_ts(&c1->last_tick, tv_to_double(&tv));
-  double_to_ts(&c2->last_tick, tv_to_double(&tv));
+#if EXTENDED_STATS
+  double_to_ts(&c0->stats_last_tick, tv_to_double(&tv));
+  double_to_ts(&c1->stats_last_tick, tv_to_double(&tv));
+  double_to_ts(&c2->stats_last_tick, tv_to_double(&tv));
+#endif
   still_running = true;
   pthread_cond_init(&pp_cond, 0);
   pthread_create(&pp_thread, 0, merge_manager_pretty_print_thread, (void*)this);
 }

 void mergeManager::pretty_print(FILE * out) {
-  pageid_t mb = 1024 * 1024;
+
+#if EXTENDED_STATS
   logtable * lt = (logtable*)ltable;
   bool have_c0 = false;
   bool have_c0m = false;
@@ -265,29 +279,33 @@ void mergeManager::pretty_print(FILE * out) {
     have_c1m = NULL != lt->get_tree_c1_mergeable() ;
     have_c2 = NULL != lt->get_tree_c2();
   }
-
+  pageid_t mb = 1024 * 1024;
   fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%%/%3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ",
-          c0->active ? "RUN" : "---", (long long)(c0->lifetime_consumed / mb), 100.0 * c0->out_progress, 100.0 * ((double)ltable->tree_bytes)/(double)ltable->max_c0_size, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed),
+          c0->active ? "RUN" : "---", (long long)(c0->stats_lifetime_consumed / mb), 100.0 * c0->out_progress, 100.0 * ((double)ltable->tree_bytes)/(double)ltable->max_c0_size, c0->stats_lifetime_elapsed, c0->stats_bps/((double)mb), c0->stats_lifetime_consumed/(((double)mb)*c0->stats_lifetime_elapsed),
           have_c0 ? "C0" : "..",
           have_c0m ? "C0'" : "...",
-          c1->active ? "RUN" : "---", 100.0 * c1->in_progress, 100.0 * c1->out_progress, c1->bps/((double)mb), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed),
+          c1->active ? "RUN" : "---", 100.0 * c1->in_progress, 100.0 * c1->out_progress, c1->stats_bps/((double)mb), c1->stats_lifetime_consumed/(((double)mb)*c1->stats_lifetime_elapsed),
           have_c1 ? "C1" : "..",
           have_c1m ? "C1'" : "...",
-          c2->active ? "RUN" : "---", 100.0 * c2->in_progress, c2->bps/((double)mb), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed),
+          c2->active ? "RUN" : "---", 100.0 * c2->in_progress, c2->stats_bps/((double)mb), c2->stats_lifetime_consumed/(((double)mb)*c2->stats_lifetime_elapsed),
           have_c2 ? "C2" : "..");
-// #define PP_SIZES
+#endif
+//#define PP_SIZES
 #ifdef PP_SIZES
-  fprintf(out, "[target cur base in_small in_large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld %4lld ",
-      c0->target_size/mb, c0->current_size/mb, c0->base_size/mb, c0->bytes_in_small/mb,
-      c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb);
+  {
+    pageid_t mb = 1024 * 1024;
+    fprintf(out, "[target cur base in_small in_large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld %4lld ",
+        c0->target_size/mb, c0->current_size/mb, c0->base_size/mb, c0->bytes_in_small/mb,
+        c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb);

-  fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld %4lld %4lld ",
-      c1->target_size/mb, c1->current_size/mb, c1->base_size/mb, c1->bytes_in_small/mb,
-      c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb);
+    fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld %4lld %4lld ",
+        c1->target_size/mb, c1->current_size/mb, c1->base_size/mb, c1->bytes_in_small/mb,
+        c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb);

-  fprintf(out, "C2 ---- %4lld %4lld %4lld %4lld %4lld %4lld ",
-      /*----*/ c2->current_size/mb, c2->base_size/mb, c2->bytes_in_small/mb,
-      c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb);
+    fprintf(out, "C2 ---- %4lld %4lld %4lld %4lld %4lld %4lld ",
+        /*----*/ c2->current_size/mb, c2->base_size/mb, c2->bytes_in_small/mb,
+        c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb);
+  }
 #endif
 //  fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds)));
 //  fprintf(out, "C0 size %4lld resident %4lld ",
@@ -304,8 +322,8 @@ void mergeManager::pretty_print(FILE * out) {
   fflush(out);
 #if 0 // XXX would like to bring this back somehow...
   assert((!c1->active) || (c1->in_progress >= -0.01 && c1->in_progress < 1.02));
-#endif
   assert((!c2->active) || (c2->in_progress >= -0.01 && c2->in_progress < 1.10));
+#endif
   fprintf(out, "\r");
 }
diff --git a/mergeStats.h b/mergeStats.h
index f1dbaad..15ef37b 100644
--- a/mergeStats.h
+++ b/mergeStats.h
@@ -12,6 +12,8 @@
 #undef try
 #undef end

+#define EXTENDED_STATS 1
+
 #include <sys/time.h>
 #include <stdio.h>
 #include "datatuple.h"
@@ -23,34 +25,39 @@
 class mergeStats {
   public:
     mergeStats(int merge_level, int64_t target_size) :
       merge_level(merge_level),
-      merge_count(0),
       base_size(0),
       mergeable_size(0),
       target_size(target_size),
       current_size(0),
-      bytes_out_with_overhead(0),
       bytes_out(0),
-      num_tuples_out(0),
-      num_datapages_out(0),
       bytes_in_small(0),
-      bytes_in_small_delta(0),
-      num_tuples_in_small(0),
       bytes_in_large(0),
-      num_tuples_in_large(0),
       just_handed_off(false),
-      mini_delta(0),
       delta(0),
       need_tick(0),
       in_progress(0),
       out_progress(0),
-      lifetime_elapsed(0),
-      lifetime_consumed(0),
-      bps(10.0*1024.0*1024.0),
-      active(false) {
-      gettimeofday(&sleep,0);
+      active(false)
+#if EXTENDED_STATS
+      ,
+      stats_merge_count(0),
+      stats_bytes_out_with_overhead(0),
+      stats_num_tuples_out(0),
+      stats_num_datapages_out(0),
+      stats_bytes_in_small_delta(0),
+      stats_num_tuples_in_small(0),
+      stats_num_tuples_in_large(0),
+      stats_lifetime_elapsed(0),
+      stats_lifetime_consumed(0),
+      stats_bps(10.0*1024.0*1024.0)
+#endif // EXTENDED_STATS
+    {
+#if EXTENDED_STATS
+      gettimeofday(&stats_sleep,0);
+
       struct timeval last;
       gettimeofday(&last,0);
-      mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
-      mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
+      mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
+#endif
     }
     ~mergeStats() { }
     void new_merge2() {
@@ -61,26 +68,29 @@ class mergeStats {
       }
       base_size = bytes_out;
       current_size = base_size;
-      merge_count++;
-      bytes_out_with_overhead = 0;
       bytes_out = 0;
-      num_tuples_out = 0;
-      num_datapages_out = 0;
       bytes_in_small = 0;
-      bytes_in_small_delta = 0;
-      num_tuples_in_small = 0;
       bytes_in_large = 0;
-      num_tuples_in_large = 0;
       in_progress = 0;
-      gettimeofday(&sleep,0);
+#if EXTENDED_STATS
+      stats_merge_count++;
+      stats_bytes_out_with_overhead = 0;
+      stats_num_tuples_out = 0;
+      stats_num_datapages_out = 0;
+      stats_bytes_in_small_delta = 0;
+      stats_num_tuples_in_small = 0;
+      stats_num_tuples_in_large = 0;
+      gettimeofday(&stats_sleep,0);
+#endif
     }
     void starting_merge() {
       active = true;
-      gettimeofday(&start, 0);
+#if EXTENDED_STATS
+      gettimeofday(&stats_start, 0);
+
       struct timeval last;
       gettimeofday(&last, 0);
-      mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last));
-      mergeManager::double_to_ts(&last_mini_tick, mergeManager::tv_to_double(&last));
-
+      mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
+#endif
     }
     void handed_off_tree() {
       if(merge_level == 2) {
@@ -92,75 +102,70 @@ class mergeStats {
     void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
     }
     void wrote_datapage(DataPage *dp) {
-      num_datapages_out++;
-      bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
+#if EXTENDED_STATS
+      stats_num_datapages_out++;
+      stats_bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
+#endif
     }
     pageid_t output_size() { return bytes_out; }
     const int merge_level;               // 1 => C0->C1, 2 => C1->C2
   protected:
-    pageid_t merge_count;                // This is the merge_count'th merge
-    struct timeval sleep;                // When did we go to sleep waiting for input?
-    struct timeval start;                // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep)
-    struct timeval done;                 // When did we finish merging?
-    struct timeval last;

     double float_tv(struct timeval& tv) {
       return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0;
     }
     friend class mergeManager;
-    struct timespec last_mini_tick;
-    struct timespec last_tick;

   public: // XXX only accessed during initialization.
     pageid_t base_size;                  // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size)
     pageid_t mergeable_size;             // protected by mutex.
     pageid_t target_size;
     pageid_t current_size;
-  protected:
-
-    pageid_t bytes_out_with_overhead;    // How many bytes did we write (including internal tree nodes)?
-  public:
     pageid_t bytes_out;                  // How many bytes worth of tuples did we write?
-  protected:
-    pageid_t num_tuples_out;             // How many tuples did we write?
-    pageid_t num_datapages_out;          // How many datapages?
-  public:
     pageid_t bytes_in_small;             // How many bytes from the small input tree (for C0, we ignore tree overheads)?
   protected:
-    pageid_t bytes_in_small_delta;       // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
-    pageid_t num_tuples_in_small;        // Tuples from the small input?
     pageid_t bytes_in_large;             // Bytes from the large input?
-    pageid_t num_tuples_in_large;        // Tuples from large input?
     bool just_handed_off;
-    int mini_delta;
     int delta;
     int need_tick;

     double in_progress;
     double out_progress;

-    double lifetime_elapsed;
-    double lifetime_consumed;
-
-    double bps;
-
     bool active;

+#if EXTENDED_STATS
+    pageid_t stats_merge_count;          // This is the stats_merge_count'th merge
+    struct timeval stats_sleep;          // When did we go to sleep waiting for input?
+    struct timeval stats_start;          // When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep)
+    struct timeval stats_done;           // When did we finish merging?
+    struct timespec stats_last_tick;
+    pageid_t stats_bytes_out_with_overhead; // How many bytes did we write (including internal tree nodes)?
+    pageid_t stats_num_tuples_out;       // How many tuples did we write?
+    pageid_t stats_num_datapages_out;    // How many datapages?
+    pageid_t stats_bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
+    pageid_t stats_num_tuples_in_small;  // Tuples from the small input?
+    pageid_t stats_num_tuples_in_large;  // Tuples from large input?
+    double stats_lifetime_elapsed;
+    double stats_lifetime_consumed;
+    double stats_bps;
+#endif
   public:
     void pretty_print(FILE* fd) {
-      double sleep_time = float_tv(start) - float_tv(sleep);
-      double work_time  = float_tv(done)  - float_tv(start);
+#if EXTENDED_STATS
+      double sleep_time = float_tv(stats_start) - float_tv(stats_sleep);
+      double work_time  = float_tv(stats_done)  - float_tv(stats_start);
       double total_time = sleep_time + work_time;
-      double mb_out = ((double)bytes_out_with_overhead)     /(1024.0*1024.0);
+      double mb_out = ((double)stats_bytes_out_with_overhead) /(1024.0*1024.0);
       double mb_ins = ((double)bytes_in_small)          /(1024.0*1024.0);
       double mb_inl = ((double)bytes_in_large)          /(1024.0*1024.0);
-      double kt_out = ((double)num_tuples_out)          /(1024.0);
-      double kt_ins= ((double)num_tuples_in_small)      /(1024.0);
-      double kt_inl = ((double)num_tuples_in_large)     /(1024.0);
+      double kt_out = ((double)stats_num_tuples_out)        /(1024.0);
+      double kt_ins= ((double)stats_num_tuples_in_small)    /(1024.0);
+      double kt_inl = ((double)stats_num_tuples_in_large)   /(1024.0);
       double mb_hdd = mb_out + mb_inl + (merge_level == 1 ? 0.0 : mb_ins);
       double kt_hdd = kt_out + kt_inl + (merge_level == 1 ? 0.0 : kt_ins);
@@ -179,16 +184,17 @@ class mergeStats {
              " (%.2f; %.2f) wallclock" "\n"
              ".....................................................................\n"
              ,
-             merge_level, merge_count,
+             merge_level, stats_merge_count,
              sleep_time, work_time,
-             (long long)mb_out, (long long)kt_out, num_datapages_out, mb_out / work_time, mb_out / total_time, kt_out / work_time, kt_out / total_time,
+             (long long)mb_out, (long long)kt_out, stats_num_datapages_out, mb_out / work_time, mb_out / total_time, kt_out / work_time, kt_out / total_time,
              (long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / work_time, kt_ins / total_time,
              (long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time,
              (long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time,
              mb_out / kt_out,
              mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins
              );
+#endif
     }
 };
diff --git a/merger.cpp b/merger.cpp
index 30f4255..4cc9b2a 100644
--- a/merger.cpp
+++ b/merger.cpp
@@ -121,7 +121,7 @@ void * merge_scheduler::memMergeThread()
       c1_prime->force(xid);

       merge_count++;
-      DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size());
+      DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.stats_merge_count, stats.output_size());

       // Immediately clean out c0 mergeable so that writers may continue.
@@ -257,7 +257,7 @@ void * merge_scheduler::diskMergeThread()
     diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(&ltable_->merge_mgr->cur_c1_c2_progress_delta, 0.05, &ltable_->shutting_down_);

     //create a new tree
-    diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
+    diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (uint64_t)(ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
 //   diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats);

     rwlc_unlock(ltable_->header_mut);
@@ -295,7 +295,7 @@ void * merge_scheduler::diskMergeThread()

     DEBUG("\nR = %f\n", *(ltable_->R()));

-    DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i));
+    DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.stats_merge_count, stats.output_size(), *(a->r_i));
     // 10: C2 is never too big
     ltable_->set_tree_c2(c2_prime);
     stats->handed_off_tree();
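Taken together, the mergeStats.h hunks move every diagnostic counter behind a single #define EXTENDED_STATS 1, so flipping it to 0 strips both the fields and every per-tuple update out of the merge hot path, while the scheduling-critical state (bytes_out, bytes_in_small, in_progress, ...) stays unconditional. A minimal sketch of that gating pattern, using hypothetical names rather than the patch's own types:

    #include <stdio.h>

    #define EXTENDED_STATS 1   // flip to 0 to compile the diagnostics out entirely

    struct counters {
      long bytes_out;              // needed for scheduling; never gated
    #if EXTENDED_STATS
      long stats_num_tuples_out;   // diagnostic only; exists only when gated in
    #endif
    };

    static void wrote_tuple(counters * c, long len) {
      c->bytes_out += len;         // hot-path bookkeeping that must always run
    #if EXTENDED_STATS
      c->stats_num_tuples_out++;   // vanishes from the binary when gated out
    #endif
    }

    int main() {
      counters c = { 0 };
      wrote_tuple(&c, 100);
      printf("bytes_out=%ld\n", c.bytes_out);
      return 0;
    }

The stats_ prefix on every gated field then makes any unguarded use a compile error when EXTENDED_STATS is 0, rather than a silent read of a stale counter. The unrelated (uint64_t) cast in the diskMergeThread hunk appears to make the double-to-integer narrowing of ltable_->max_c0_size * *ltable_->R() explicit before the division by 1000, presumably to silence an implicit-conversion warning.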