diff --git a/logstore.cpp b/logstore.cpp index 5d87cd7..ddd6863 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -109,7 +109,6 @@ recordid logtable::allocTable(int xid) merge_mgr = new mergeManager(this); merge_mgr->set_c0_size(max_c0_size); merge_mgr->new_merge(0); - merge_mgr->get_merge_stats(0)->starting_merge(); tree_c0 = new memTreeComponent::rbtree_t; tbl_header.merge_manager = merge_mgr->talloc(xid); @@ -129,7 +128,6 @@ void logtable::openTable(int xid, recordid rid) { merge_mgr->set_c0_size(max_c0_size); merge_mgr->new_merge(0); - merge_mgr->get_merge_stats(0)->starting_merge(); } template diff --git a/mergeManager.cpp b/mergeManager.cpp index 268066f..056c69e 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -49,6 +49,14 @@ void mergeManager::new_merge(int mergeLevel) { // target_size is infinity... s->new_merge2(); } else { abort(); } +#ifdef EXTENDED_STATS + gettimeofday(&s->stats_start,0); + double elapsed = (tv_to_double(&s->stats_start) - tv_to_double(&s->stats_sleep)); + s->stats_lifetime_elapsed += elapsed; + (s->stats_elapsed) = elapsed; + (s->stats_active) = 0; + +#endif } void mergeManager::set_c0_size(int64_t size) { assert(size); @@ -89,7 +97,10 @@ void mergeManager::update_progress(mergeStats * s, int delta) { gettimeofday(&now, 0); double stats_elapsed_delta = tv_to_double(&now) - ts_to_double(&s->stats_last_tick); if(stats_elapsed_delta < 0.0000001) { stats_elapsed_delta = 0.0000001; } + s->stats_lifetime_active += stats_elapsed_delta; s->stats_lifetime_elapsed += stats_elapsed_delta; + s->stats_active += stats_elapsed_delta; + s->stats_elapsed += stats_elapsed_delta; s->stats_lifetime_consumed += s->stats_bytes_in_small_delta; double stats_tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...) 
double stats_decay = exp((0.0-stats_elapsed_delta)/stats_tau); @@ -213,14 +224,26 @@ void mergeManager::wrote_tuple(int merge_level, datatuple * tup) { } void mergeManager::finished_merge(int merge_level) { - update_progress(get_merge_stats(merge_level), 0); - get_merge_stats(merge_level)->active = false; + mergeStats *s = get_merge_stats(merge_level); + update_progress(s, 0); + s->active = false; if(merge_level != 0) { get_merge_stats(merge_level - 1)->mergeable_size = 0; update_progress(get_merge_stats(merge_level-1), 0); } #if EXTENDED_STATS - gettimeofday(&get_merge_stats(merge_level)->stats_done, 0); + gettimeofday(&s->stats_done, 0); + double elapsed = tv_to_double(&s->stats_done) - ts_to_double(&s->stats_last_tick); + (s->stats_lifetime_active) += elapsed; + (s->stats_lifetime_elapsed) += elapsed; + (s->stats_elapsed) += elapsed; + (s->stats_active) += elapsed; + memcpy(&s->stats_sleep, &s->stats_done, sizeof(s->stats_sleep)); +#define VERBOSE +#ifdef VERBOSE + fprintf(stdout, "\n"); + s->pretty_print(stdout); +#endif #endif update_progress(get_merge_stats(merge_level), 0); } diff --git a/mergeManager.h b/mergeManager.h index 57a6c8a..c5bf077 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -43,6 +43,7 @@ public: recordid talloc(int xid); ~mergeManager(); + void finished_merge(int merge_level); void new_merge(int mergelevel); void set_c0_size(int64_t size); void update_progress(mergeStats *s, int delta); @@ -58,22 +59,45 @@ public: void read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len); void wrote_tuple(int merge_level, datatuple * tup); - void finished_merge(int merge_level); void pretty_print(FILE * out); void *pretty_print_thread(); private: + /** + * How far apart are the c0-c1 and c1-c2 mergers? + * + * This is c1->out_progress - c2->in_progress. 
We want the downstream merger + * to be slightly ahead of the upstream one so that we can mask latency blips + * due to tearing down the downstream merger and starting the new one. + * Therefore, this should always be slightly negative. + * + * TODO remove c1_c2_delta, which is derived, but difficult (from a synchronization perspective) to compute? + */ double c1_c2_delta; + /** Helper method for the constructors */ void init_helper(void); + /** + * Serialization format for Stasis merge statistics header. + * + * The small amount of state maintained by mergeManager consists of derived + * and runtime-only fields. This struct reflects that, and only contains + * pointers to marshaled versions of the per-tree component statistics. + */ struct marshalled_header { - recordid c0; + recordid c0; // Probably redundant, but included for symmetry. recordid c1; recordid c2; }; + /** + * A pointer to the logtable that we manage statistics for. Most usages of + * this are layering violations; the main exception is in pretty_print. + * + * TODO: remove mergeManager->ltable? + */ logtable* ltable; - mergeStats * c0; - mergeStats * c1; - mergeStats * c2; + mergeStats * c0; /// Per-tree component statistics for c0 and c0_mergeable (the latter should always be null...) + mergeStats * c1; /// Per-tree component statistics for c1 and c1_mergeable. + mergeStats * c2; /// Per-tree component statistics for c2. // The following fields are used to shut down the pretty print thread. 
bool still_running; diff --git a/mergeStats.h b/mergeStats.h index ffde8cc..924a0d8 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -26,6 +26,18 @@ #undef end class mergeStats { + private: + void init_helper(void) { +#if EXTENDED_STATS + gettimeofday(&stats_sleep,0); + gettimeofday(&stats_start,0); + gettimeofday(&stats_done,0); + + struct timeval last; + gettimeofday(&last,0); + mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last)); +#endif + } public: mergeStats(int merge_level, int64_t target_size) : merge_level(merge_level), @@ -51,16 +63,14 @@ class mergeStats { stats_num_tuples_in_small(0), stats_num_tuples_in_large(0), stats_lifetime_elapsed(0), + stats_lifetime_active(0), + stats_elapsed(0), + stats_active(0), stats_lifetime_consumed(0), stats_bps(10.0*1024.0*1024.0) #endif // EXTENDED_STATS { -#if EXTENDED_STATS - gettimeofday(&stats_sleep,0); - struct timeval last; - gettimeofday(&last,0); - mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last)); -#endif + init_helper(); } mergeStats(int xid, recordid rid) { marshalled_header h; @@ -78,6 +88,22 @@ class mergeStats { in_progress = 0; out_progress = ((double)base_size) / (double)target_size; active = false; +#if EXTENDED_STATS + stats_merge_count = 0; + stats_bytes_out_with_overhead = 0; + stats_num_tuples_out = 0; + stats_num_datapages_out = 0; + stats_bytes_in_small_delta = 0; + stats_num_tuples_in_small = 0; + stats_num_tuples_in_large = 0; + stats_lifetime_elapsed = 0; + stats_lifetime_active = 0; + stats_elapsed = 0; + stats_active = 0; + stats_lifetime_consumed = 0; + stats_bps = 10.0*1024.0*1024.0; +#endif + init_helper(); } recordid talloc(int xid) { return Talloc(xid, sizeof(marshalled_header)); @@ -110,7 +136,6 @@ class mergeStats { stats_bytes_in_small_delta = 0; stats_num_tuples_in_small = 0; stats_num_tuples_in_large = 0; - gettimeofday(&stats_sleep,0); #endif } void starting_merge() { @@ -162,53 +187,61 @@ class mergeStats { pageid_t 
mergeable_size; pageid_t target_size; // Needed? }; - public: // XXX eliminate protected fields. - int merge_level; // 1 => C0->C1, 2 => C1->C2 - pageid_t base_size; // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size) + public: // XXX eliminate public fields; these are still required because various bits of calculation (bloom filter size, estimated c0 run length, etc...) are managed outside of mergeManager. + int merge_level; /// The tree component / merge level that we're tracking. 1 => C0->C1, 2 => C1->C2 + pageid_t base_size; /// size of existing tree component (c[merge_level]') at beginning of current merge. protected: - pageid_t mergeable_size; // protected by mutex. + pageid_t mergeable_size; /// The size of c[merge_level]_mergeable, assuming it exists. Protected by mutex. public: - pageid_t target_size; + pageid_t target_size; /// How big should the c[merge_level] tree component be? protected: - pageid_t bytes_out; // How many bytes worth of tuples did we write? + pageid_t bytes_out; /// For C0, number of bytes consumed by downstream merger. For merge_level 1 and 2, number of bytes enqueued for the downstream (C1-C2, and nil) mergers. public: - pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? + pageid_t bytes_in_small; /// For C0, number of bytes inserted by application. For C1, C2, number of bytes read from small tree in C(n-1) - Cn merger. protected: - pageid_t bytes_in_large; // Bytes from the large input? + pageid_t bytes_in_large; /// Bytes from the large input? (for C0, bytes deleted due to updates) + // todo: simplify confusing hand off logic, and remove this field? bool just_handed_off; + // These fields are used to amortize mutex acquisitions. int delta; int need_tick; + + // todo in_progress and out_progress are derived fields. eliminate them? 
double in_progress; double out_progress; - bool active; + bool active; /// True if this merger is running, or blocked by rate limiting. False if the upstream input does not exist. #if EXTENDED_STATS - pageid_t stats_merge_count; // This is the stats_merge_count'th merge - struct timeval stats_sleep; // When did we go to sleep waiting for input? - struct timeval stats_start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep) - struct timeval stats_done; // When did we finish merging? + pageid_t stats_merge_count; /// This is the stats_merge_count'th merge + struct timeval stats_sleep; /// When did we go to sleep waiting for input? + struct timeval stats_start; /// When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep) + struct timeval stats_done; /// When did we finish merging? struct timespec stats_last_tick; - pageid_t stats_bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)? - pageid_t stats_num_tuples_out; // How many tuples did we write? - pageid_t stats_num_datapages_out; // How many datapages? - pageid_t stats_bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)? - pageid_t stats_num_tuples_in_small; // Tuples from the small input? - pageid_t stats_num_tuples_in_large; // Tuples from large input? - double stats_lifetime_elapsed; - double stats_lifetime_consumed; - double stats_bps; + pageid_t stats_bytes_out_with_overhead;/// How many bytes did we write (including internal tree nodes)? + pageid_t stats_num_tuples_out; /// How many tuples did we write? + pageid_t stats_num_datapages_out; /// How many datapages? + pageid_t stats_bytes_in_small_delta; /// How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)? + pageid_t stats_num_tuples_in_small; /// Tuples from the small input? 
+ pageid_t stats_num_tuples_in_large; /// Tuples from large input? + double stats_lifetime_elapsed; /// How long has this tree existed, in seconds? + double stats_lifetime_active; /// How long has this tree been running (i.e., active = true), in seconds? + double stats_elapsed; /// How long did this merge take, including idle time (not valid until after merge is complete)? + double stats_active; /// How long did this merge take once it started running? + double stats_lifetime_consumed; /// How many bytes has this tree consumed from upstream mergers? + double stats_bps; /// Effective throughput while active. #endif public: void pretty_print(FILE* fd) { #if EXTENDED_STATS - double sleep_time = float_tv(stats_start) - float_tv(stats_sleep); - double work_time = float_tv(stats_done) - float_tv(stats_start); - double total_time = sleep_time + work_time; - double mb_out = ((double)stats_bytes_out_with_overhead) /(1024.0*1024.0); + double sleep_time = stats_elapsed - stats_active; + double work_time = stats_active; + double total_time = stats_elapsed; + double mb_out = ((double)bytes_out) /(1024.0*1024.0); + double phys_mb_out = ((double)stats_bytes_out_with_overhead) / (1024.0 * 1024.0); double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0); double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0); double kt_out = ((double)stats_num_tuples_out) /(1024.0); @@ -227,7 +260,7 @@ class mergeStats { "Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" "Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" ".....................................................................\n" - "avg tuple len: %6.2fkb\n" + "avg tuple len: %6.2fKB w/ disk overhead: %6.2fKB\n" "effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n" " (%.2f; %.2f) wallclock" "\n" ".....................................................................\n" @@ -239,7 +272,7 @@ class mergeStats { (long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / 
work_time, kt_ins / total_time, (long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time, (long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time, - mb_out / kt_out, + mb_out / kt_out, phys_mb_out / kt_out, mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins ); #endif diff --git a/merger.cpp b/merger.cpp index d779351..9cdc568 100644 --- a/merger.cpp +++ b/merger.cpp @@ -145,6 +145,7 @@ void * merge_scheduler::memMergeThread() { //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger + // XXX move this to mergeManager, and make bytes_in_small be protected. if(stats->bytes_in_small) { // update c0 effective size. double frac = 1.0/(double)merge_count; @@ -156,7 +157,7 @@ void * merge_scheduler::memMergeThread() { //ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_run_length; } - printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size); + printf("\nMerge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size); assert(*ltable_->R() >= MIN_R); bool signal_c2 = (new_c1_size / ltable_->mean_c0_run_length > *ltable_->R());