extended stats cleanup, verbose mode, documentation

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1518 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-12-17 18:48:26 +00:00
parent 052508fb76
commit a6a0ce3f54
5 changed files with 125 additions and 46 deletions

View file

@ -109,7 +109,6 @@ recordid logtable<TUPLE>::allocTable(int xid)
merge_mgr = new mergeManager(this); merge_mgr = new mergeManager(this);
merge_mgr->set_c0_size(max_c0_size); merge_mgr->set_c0_size(max_c0_size);
merge_mgr->new_merge(0); merge_mgr->new_merge(0);
merge_mgr->get_merge_stats(0)->starting_merge();
tree_c0 = new memTreeComponent<datatuple>::rbtree_t; tree_c0 = new memTreeComponent<datatuple>::rbtree_t;
tbl_header.merge_manager = merge_mgr->talloc(xid); tbl_header.merge_manager = merge_mgr->talloc(xid);
@ -129,7 +128,6 @@ void logtable<TUPLE>::openTable(int xid, recordid rid) {
merge_mgr->set_c0_size(max_c0_size); merge_mgr->set_c0_size(max_c0_size);
merge_mgr->new_merge(0); merge_mgr->new_merge(0);
merge_mgr->get_merge_stats(0)->starting_merge();
} }
template<class TUPLE> template<class TUPLE>

View file

@ -49,6 +49,14 @@ void mergeManager::new_merge(int mergeLevel) {
// target_size is infinity... // target_size is infinity...
s->new_merge2(); s->new_merge2();
} else { abort(); } } else { abort(); }
#ifdef EXTENDED_STATS
gettimeofday(&s->stats_start,0);
double elapsed = (tv_to_double(&s->stats_start) - tv_to_double(&s->stats_sleep));
s->stats_lifetime_elapsed += elapsed;
(s->stats_elapsed) = elapsed;
(s->stats_active) = 0;
#endif
} }
void mergeManager::set_c0_size(int64_t size) { void mergeManager::set_c0_size(int64_t size) {
assert(size); assert(size);
@ -89,7 +97,10 @@ void mergeManager::update_progress(mergeStats * s, int delta) {
gettimeofday(&now, 0); gettimeofday(&now, 0);
double stats_elapsed_delta = tv_to_double(&now) - ts_to_double(&s->stats_last_tick); double stats_elapsed_delta = tv_to_double(&now) - ts_to_double(&s->stats_last_tick);
if(stats_elapsed_delta < 0.0000001) { stats_elapsed_delta = 0.0000001; } if(stats_elapsed_delta < 0.0000001) { stats_elapsed_delta = 0.0000001; }
s->stats_lifetime_active += stats_elapsed_delta;
s->stats_lifetime_elapsed += stats_elapsed_delta; s->stats_lifetime_elapsed += stats_elapsed_delta;
s->stats_active += stats_elapsed_delta;
s->stats_elapsed += stats_elapsed_delta;
s->stats_lifetime_consumed += s->stats_bytes_in_small_delta; s->stats_lifetime_consumed += s->stats_bytes_in_small_delta;
double stats_tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...) double stats_tau = 60.0; // number of seconds to look back for window computation. (this is the expected mean residence time in an exponential decay model, so the units are not so intuitive...)
double stats_decay = exp((0.0-stats_elapsed_delta)/stats_tau); double stats_decay = exp((0.0-stats_elapsed_delta)/stats_tau);
@ -213,14 +224,26 @@ void mergeManager::wrote_tuple(int merge_level, datatuple * tup) {
} }
void mergeManager::finished_merge(int merge_level) { void mergeManager::finished_merge(int merge_level) {
update_progress(get_merge_stats(merge_level), 0); mergeStats *s = get_merge_stats(merge_level);
get_merge_stats(merge_level)->active = false; update_progress(s, 0);
s->active = false;
if(merge_level != 0) { if(merge_level != 0) {
get_merge_stats(merge_level - 1)->mergeable_size = 0; get_merge_stats(merge_level - 1)->mergeable_size = 0;
update_progress(get_merge_stats(merge_level-1), 0); update_progress(get_merge_stats(merge_level-1), 0);
} }
#if EXTENDED_STATS #if EXTENDED_STATS
gettimeofday(&get_merge_stats(merge_level)->stats_done, 0); gettimeofday(&s->stats_done, 0);
double elapsed = tv_to_double(&s->stats_done) - ts_to_double(&s->stats_last_tick);
(s->stats_lifetime_active) += elapsed;
(s->stats_lifetime_elapsed) += elapsed;
(s->stats_elapsed) += elapsed;
(s->stats_active) += elapsed;
memcpy(&s->stats_sleep, &s->stats_done, sizeof(s->stats_sleep));
#define VERBOSE
#ifdef VERBOSE
fprintf(stdout, "\n");
s->pretty_print(stdout);
#endif
#endif #endif
update_progress(get_merge_stats(merge_level), 0); update_progress(get_merge_stats(merge_level), 0);
} }

View file

@ -43,6 +43,7 @@ public:
recordid talloc(int xid); recordid talloc(int xid);
~mergeManager(); ~mergeManager();
void finished_merge(int merge_level);
void new_merge(int mergelevel); void new_merge(int mergelevel);
void set_c0_size(int64_t size); void set_c0_size(int64_t size);
void update_progress(mergeStats *s, int delta); void update_progress(mergeStats *s, int delta);
@ -58,22 +59,45 @@ public:
void read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len); void read_tuple_from_large_component(int merge_level, int tuple_count, pageid_t byte_len);
void wrote_tuple(int merge_level, datatuple * tup); void wrote_tuple(int merge_level, datatuple * tup);
void finished_merge(int merge_level);
void pretty_print(FILE * out); void pretty_print(FILE * out);
void *pretty_print_thread(); void *pretty_print_thread();
private: private:
/**
* How far apart are the c0-c1 and c1-c2 mergers?
*
* This is c1->out_progress - c2->in_progress. We want the downstream merger
* to be slightly ahead of the upstream one so that we can mask latency blips
* due to tearing down the downstream merger and starting the new one.
* Therefore, this should always be slightly negative.
*
* TODO remove c1_c2_delta, which is derived, but difficult (from a synchronization perspective) to compute?
*/
double c1_c2_delta; double c1_c2_delta;
/** Helper method for the constructors */
void init_helper(void); void init_helper(void);
/**
* Serialization format for Stasis merge statistics header.
*
* The small amount of state maintained by mergeManager consists of derived
* and runtime-only fields. This struct reflects that, and only contains
* pointers to marshaled versions of the per-tree component statistics.
*/
struct marshalled_header { struct marshalled_header {
recordid c0; recordid c0; // Probably redundant, but included for symmetry.
recordid c1; recordid c1;
recordid c2; recordid c2;
}; };
/**
* A pointer to the logtable that we manage statistics for. Most usages of
* this are layering violations; the main exception is in pretty_print.
*
* TODO: remove mergeManager->ltable?
*/
logtable<datatuple>* ltable; logtable<datatuple>* ltable;
mergeStats * c0; mergeStats * c0; /// Per-tree component statistics for c0 and c0_mergeable (the latter should always be null...)
mergeStats * c1; mergeStats * c1; /// Per-tree component statistics for c1 and c1_mergeable.
mergeStats * c2; mergeStats * c2; /// Per-tree component statistics for c2.
// The following fields are used to shut down the pretty print thread. // The following fields are used to shut down the pretty print thread.
bool still_running; bool still_running;

View file

@ -26,6 +26,18 @@
#undef end #undef end
class mergeStats { class mergeStats {
private:
void init_helper(void) {
#if EXTENDED_STATS
gettimeofday(&stats_sleep,0);
gettimeofday(&stats_start,0);
gettimeofday(&stats_done,0);
struct timeval last;
gettimeofday(&last,0);
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
}
public: public:
mergeStats(int merge_level, int64_t target_size) : mergeStats(int merge_level, int64_t target_size) :
merge_level(merge_level), merge_level(merge_level),
@ -51,16 +63,14 @@ class mergeStats {
stats_num_tuples_in_small(0), stats_num_tuples_in_small(0),
stats_num_tuples_in_large(0), stats_num_tuples_in_large(0),
stats_lifetime_elapsed(0), stats_lifetime_elapsed(0),
stats_lifetime_active(0),
stats_elapsed(0),
stats_active(0),
stats_lifetime_consumed(0), stats_lifetime_consumed(0),
stats_bps(10.0*1024.0*1024.0) stats_bps(10.0*1024.0*1024.0)
#endif // EXTENDED_STATS #endif // EXTENDED_STATS
{ {
#if EXTENDED_STATS init_helper();
gettimeofday(&stats_sleep,0);
struct timeval last;
gettimeofday(&last,0);
mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last));
#endif
} }
mergeStats(int xid, recordid rid) { mergeStats(int xid, recordid rid) {
marshalled_header h; marshalled_header h;
@ -78,6 +88,22 @@ class mergeStats {
in_progress = 0; in_progress = 0;
out_progress = ((double)base_size) / (double)target_size; out_progress = ((double)base_size) / (double)target_size;
active = false; active = false;
#if EXTENDED_STATS
stats_merge_count = 0;
stats_bytes_out_with_overhead = 0;
stats_num_tuples_out = 0;
stats_num_datapages_out = 0;
stats_bytes_in_small_delta = 0;
stats_num_tuples_in_small = 0;
stats_num_tuples_in_large = 0;
stats_lifetime_elapsed = 0;
stats_lifetime_active = 0;
stats_elapsed = 0;
stats_active = 0;
stats_lifetime_consumed = 0;
stats_bps = 10.0*1024.0*1024.0;
#endif
init_helper();
} }
recordid talloc(int xid) { recordid talloc(int xid) {
return Talloc(xid, sizeof(marshalled_header)); return Talloc(xid, sizeof(marshalled_header));
@ -110,7 +136,6 @@ class mergeStats {
stats_bytes_in_small_delta = 0; stats_bytes_in_small_delta = 0;
stats_num_tuples_in_small = 0; stats_num_tuples_in_small = 0;
stats_num_tuples_in_large = 0; stats_num_tuples_in_large = 0;
gettimeofday(&stats_sleep,0);
#endif #endif
} }
void starting_merge() { void starting_merge() {
@ -162,53 +187,61 @@ class mergeStats {
pageid_t mergeable_size; pageid_t mergeable_size;
pageid_t target_size; // Needed? pageid_t target_size; // Needed?
}; };
public: // XXX eliminate protected fields. public: // XXX eliminate public fields; these are still required because various bits of calculation (bloom filter size, estimated c0 run length, etc...) are managed outside of mergeManager.
int merge_level; // 1 => C0->C1, 2 => C1->C2 int merge_level; /// The tree component / merge level that we're tracking. 1 => C0->C1, 2 => C1->C2
pageid_t base_size; // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size) pageid_t base_size; /// size of existing tree component (c[merge_level]') at beginning of current merge.
protected: protected:
pageid_t mergeable_size; // protected by mutex. pageid_t mergeable_size; /// The size of c[merge_level]_mergeable, assuming it exists. Protected by mutex.
public: public:
pageid_t target_size; pageid_t target_size; /// How big should the c[merge_level] tree component be?
protected: protected:
pageid_t bytes_out; // How many bytes worth of tuples did we write? pageid_t bytes_out; /// For C0, number of bytes consumed by downstream merger. For merge_level 1 and 2, number of bytes enqueued for the downstream (C1-C2, and nil) mergers.
public: public:
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? pageid_t bytes_in_small; /// For C0, number of bytes inserted by application. For C1, C2, number of bytes read from small tree in C(n-1) - Cn merger.
protected: protected:
pageid_t bytes_in_large; // Bytes from the large input? pageid_t bytes_in_large; /// Bytes from the large input? (for C0, bytes deleted due to updates)
// todo: simplify confusing hand off logic, and remove this field?
bool just_handed_off; bool just_handed_off;
// These fields are used to amortize mutex acquisitions.
int delta; int delta;
int need_tick; int need_tick;
// todo in_progress and out_progress are derived fields. eliminate them?
double in_progress; double in_progress;
double out_progress; double out_progress;
bool active; bool active; /// True if this merger is running, or blocked by rate limiting. False if the upstream input does not exist.
#if EXTENDED_STATS #if EXTENDED_STATS
pageid_t stats_merge_count; // This is the stats_merge_count'th merge pageid_t stats_merge_count; /// This is the stats_merge_count'th merge
struct timeval stats_sleep; // When did we go to sleep waiting for input? struct timeval stats_sleep; /// When did we go to sleep waiting for input?
struct timeval stats_start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep) struct timeval stats_start; /// When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep)
struct timeval stats_done; // When did we finish merging? struct timeval stats_done; /// When did we finish merging?
struct timespec stats_last_tick; struct timespec stats_last_tick;
pageid_t stats_bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)? pageid_t stats_bytes_out_with_overhead;/// How many bytes did we write (including internal tree nodes)?
pageid_t stats_num_tuples_out; // How many tuples did we write? pageid_t stats_num_tuples_out; /// How many tuples did we write?
pageid_t stats_num_datapages_out; // How many datapages? pageid_t stats_num_datapages_out; /// How many datapages?
pageid_t stats_bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)? pageid_t stats_bytes_in_small_delta; /// How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
pageid_t stats_num_tuples_in_small; // Tuples from the small input? pageid_t stats_num_tuples_in_small; /// Tuples from the small input?
pageid_t stats_num_tuples_in_large; // Tuples from large input? pageid_t stats_num_tuples_in_large; /// Tuples from large input?
double stats_lifetime_elapsed; double stats_lifetime_elapsed; /// How long has this tree existed, in seconds?
double stats_lifetime_consumed; double stats_lifetime_active; /// How long has this tree been running (i.e.; active = true), in seconds?
double stats_bps; double stats_elapsed; /// How long did this merge take, including idle time (not valid until after merge is complete)?
double stats_active; /// How long did this merge take once it started running?
double stats_lifetime_consumed; /// How many bytes has this tree consumed from upstream mergers?
double stats_bps; /// Effective throughput while active.
#endif #endif
public: public:
void pretty_print(FILE* fd) { void pretty_print(FILE* fd) {
#if EXTENDED_STATS #if EXTENDED_STATS
double sleep_time = float_tv(stats_start) - float_tv(stats_sleep); double sleep_time = stats_elapsed - stats_active;
double work_time = float_tv(stats_done) - float_tv(stats_start); double work_time = stats_active;
double total_time = sleep_time + work_time; double total_time = stats_elapsed;
double mb_out = ((double)stats_bytes_out_with_overhead) /(1024.0*1024.0); double mb_out = ((double)bytes_out) /(1024.0*1024.0);
double phys_mb_out = ((double)stats_bytes_out_with_overhead) / (1024.0 * 1024.0);
double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0); double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0);
double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0); double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0);
double kt_out = ((double)stats_num_tuples_out) /(1024.0); double kt_out = ((double)stats_num_tuples_out) /(1024.0);
@ -227,7 +260,7 @@ class mergeStats {
"Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" "Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
"Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" "Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n"
".....................................................................\n" ".....................................................................\n"
"avg tuple len: %6.2fkb\n" "avg tuple len: %6.2fKB w/ disk ovehead: %6.2fKB\n"
"effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n" "effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n"
" (%.2f; %.2f) wallclock" "\n" " (%.2f; %.2f) wallclock" "\n"
".....................................................................\n" ".....................................................................\n"
@ -239,7 +272,7 @@ class mergeStats {
(long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / work_time, kt_ins / total_time, (long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / work_time, kt_ins / total_time,
(long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time, (long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time,
(long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time, (long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time,
mb_out / kt_out, mb_out / kt_out, phys_mb_out / kt_out,
mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins
); );
#endif #endif

View file

@ -145,6 +145,7 @@ void * merge_scheduler::memMergeThread() {
//TODO: this is simplistic for now //TODO: this is simplistic for now
//6: if c1' is too big, signal the other merger //6: if c1' is too big, signal the other merger
// XXX move this to mergeManager, and make bytes_in_small be protected.
if(stats->bytes_in_small) { if(stats->bytes_in_small) {
// update c0 effective size. // update c0 effective size.
double frac = 1.0/(double)merge_count; double frac = 1.0/(double)merge_count;
@ -156,7 +157,7 @@ void * merge_scheduler::memMergeThread() {
//ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_run_length; //ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_run_length;
} }
printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size); printf("\nMerge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_run_length, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_run_length) / (double)ltable_->max_c0_size);
assert(*ltable_->R() >= MIN_R); assert(*ltable_->R() >= MIN_R);
bool signal_c2 = (new_c1_size / ltable_->mean_c0_run_length > *ltable_->R()); bool signal_c2 = (new_c1_size / ltable_->mean_c0_run_length > *ltable_->R());