diff --git a/diskTreeComponent.h b/diskTreeComponent.h index fa8d9bb..471ae1d 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -18,14 +18,15 @@ class diskTreeComponent { class iterator; diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size, - mergeStats* stats) : + mergeManager::mergeStats* stats) : ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)), dp(0), datapage_size(datapage_size), stats(stats) {} - diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state, mergeStats* stats) : + diskTreeComponent(int xid, recordid root, recordid internal_node_state, recordid datapage_state, + mergeManager::mergeStats* stats) : ltree(new diskTreeComponent::internalNodes(xid, root, internal_node_state, datapage_state)), dp(0), datapage_size(-1), @@ -73,7 +74,7 @@ class diskTreeComponent { internalNodes * ltree; DataPage* dp; pageid_t datapage_size; - mergeStats *stats; + mergeManager::mergeStats *stats; public: class internalNodes{ diff --git a/logstore.cpp b/logstore.cpp index 5415ab8..9851cc2 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -28,6 +28,7 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio tree_c1_mergeable = NULL; tree_c2 = NULL; this->still_running_ = true; + this->merge_mgr = new mergeManager(); this->mergedata = 0; //tmerger = new tuplemerger(&append_merger); tmerger = new tuplemerger(&replace_merger); @@ -78,7 +79,7 @@ recordid logtable::allocTable(int xid) { table_rec = Talloc(xid, sizeof(tbl_header)); - mergeStats * stats = 0; + mergeManager::mergeStats * stats = 0; //create the big tree tree_c2 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); diff --git a/logstore.h b/logstore.h index cd4fb93..6d84b8a 100644 --- a/logstore.h +++ b/logstore.h @@ -91,6 +91,8 @@ public: int64_t max_c0_size; + mergeManager * merge_mgr; + inline bool is_still_running() { return still_running_; } inline void stop() { still_running_ = false; diff --git a/mergeStats.h b/mergeStats.h index 36d2090..6e5b6b5 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -10,60 +10,119 @@ #include -class mergeStats { - int merge_level; // 1 => C0->C1, 2 => C1->C2 - pageid_t merge_count; // This is the merge_count'th merge - struct timeval sleep; // When did we go to sleep waiting for input? +class mergeManager { + pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size) + pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size) + pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times) + pageid_t c1_totalWorktime; + pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times) + pageid_t c2_totalWorktime; public: - mergeStats(int merge_level, pageid_t merge_count) : - merge_level(merge_level), - merge_count(merge_count), - bytes_out(0), - num_tuples_out(0), - num_datapages_out(0), - bytes_in_small(0), - num_tuples_in_small(0), - bytes_in_large(0), - num_tuples_in_large(0) { - gettimeofday(&sleep,0); - } - void starting_merge() { - gettimeofday(&start, 0); - } - void finished_merge() { - gettimeofday(&done, 0); - } - void read_tuple_from_large_component(datatuple * tup) { - if(tup) { - num_tuples_in_large++; - bytes_in_large += tup->byte_length(); + mergeManager() : + c1_queueSize(0), + c2_queueSize(0), + c1_totalConsumed(0), + c1_totalWorktime(0), + c2_totalConsumed(0), + c2_totalWorktime(0) { } + + class mergeStats; + + void new_merge(mergeStats * s) { + if(s->merge_count) { + if(s->merge_level == 1) { + c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out; + c1_totalConsumed += s->bytes_in_small; + c1_totalWorktime += (long_tv(s->done) - long_tv(s->start)); + } else if(s->merge_level == 2) { + c2_queueSize = s->bytes_in_small; + c2_totalConsumed += s->bytes_in_small; + c2_totalWorktime += (long_tv(s->done) - long_tv(s->start)); + } else { abort(); } + pretty_print(stdout); } } - void read_tuple_from_small_component(datatuple * tup) { - if(tup) { - num_tuples_in_small++; - bytes_in_small += tup->byte_length(); + + uint64_t long_tv(struct timeval& tv) { + return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec); + } + void pretty_print(FILE * out) { + fprintf(out, + "C1 queue size %lld C2 queue size %lld C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\n", + c1_queueSize, c2_queueSize, + ((double)c1_totalConsumed)/((double)c1_totalWorktime), + ((double)c2_totalConsumed)/((double)c2_totalWorktime) + ); + } + class mergeStats { + public: + mergeStats(mergeManager* merge_mgr, int merge_level) : + merge_mgr(merge_mgr), + merge_level(merge_level), + merge_count(0), + bytes_out(0), + num_tuples_out(0), + num_datapages_out(0), + bytes_in_small(0), + num_tuples_in_small(0), + bytes_in_large(0), + num_tuples_in_large(0) { + gettimeofday(&sleep,0); } - } - void wrote_tuple(datatuple * tup) { - num_tuples_out++; - bytes_out_tuples += tup->byte_length(); - } - void wrote_datapage(DataPage *dp) { - num_datapages_out++; - bytes_out += (PAGE_SIZE * dp->get_page_count()); - } - // TODO: merger.cpp probably shouldn't compute R from this. - pageid_t output_size() { - return bytes_out; - } -private: + void new_merge() { + merge_mgr->new_merge(this); + merge_count++; + bytes_out = 0; + num_tuples_out = 0; + num_datapages_out = 0; + bytes_in_small = 0; + num_tuples_in_small = 0; + bytes_in_large = 0; + num_tuples_in_large = 0; + gettimeofday(&sleep,0); + } + void starting_merge() { + gettimeofday(&start, 0); + } + void finished_merge() { + gettimeofday(&done, 0); + } + void read_tuple_from_large_component(datatuple * tup) { + if(tup) { + num_tuples_in_large++; + bytes_in_large += tup->byte_length(); + } + } + void read_tuple_from_small_component(datatuple * tup) { + if(tup) { + num_tuples_in_small++; + bytes_in_small += tup->byte_length(); + } + } + void wrote_tuple(datatuple * tup) { + num_tuples_out++; + bytes_out_tuples += tup->byte_length(); + } + void wrote_datapage(DataPage *dp) { + num_datapages_out++; + bytes_out += (PAGE_SIZE * dp->get_page_count()); + } + // TODO: merger.cpp probably shouldn't compute R from this. + pageid_t output_size() { + return bytes_out; + } + protected: + mergeManager* merge_mgr; + int merge_level; // 1 => C0->C1, 2 => C1->C2 + pageid_t merge_count; // This is the merge_count'th merge + struct timeval sleep; // When did we go to sleep waiting for input? struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep) struct timeval done; // When did we finish merging? double float_tv(struct timeval& tv) { return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0; } + friend class mergeManager; pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)? pageid_t bytes_out_tuples; // How many bytes worth of tuples did we write? @@ -73,47 +132,52 @@ private: pageid_t num_tuples_in_small; // Tuples from the small input? pageid_t bytes_in_large; // Bytes from the large input? pageid_t num_tuples_in_large; // Tuples from large input? -public: + public: void pretty_print(FILE* fd) { - double sleep_time = float_tv(start) - float_tv(sleep); - double work_time = float_tv(done) - float_tv(start); - double total_time = sleep_time + work_time; - double mb_out = ((double)bytes_out) /(1024.0*1024.0); - double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0); - double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0); - double kt_out = ((double)num_tuples_out) /(1024.0); - double kt_ins= ((double)num_tuples_in_small) /(1024.0); - double kt_inl = ((double)num_tuples_in_large) /(1024.0); - double mb_hdd = mb_out + mb_inl + (merge_level == 1 ? 0.0 : mb_ins); - double kt_hdd = kt_out + kt_inl + (merge_level == 1 ? 0.0 : kt_ins); + double sleep_time = float_tv(start) - float_tv(sleep); + double work_time = float_tv(done) - float_tv(start); + double total_time = sleep_time + work_time; + double mb_out = ((double)bytes_out) /(1024.0*1024.0); + double mb_ins = ((double)bytes_in_small) /(1024.0*1024.0); + double mb_inl = ((double)bytes_in_large) /(1024.0*1024.0); + double kt_out = ((double)num_tuples_out) /(1024.0); + double kt_ins= ((double)num_tuples_in_small) /(1024.0); + double kt_inl = ((double)num_tuples_in_large) /(1024.0); + double mb_hdd = mb_out + mb_inl + (merge_level == 1 ? 0.0 : mb_ins); + double kt_hdd = kt_out + kt_inl + (merge_level == 1 ? 0.0 : kt_ins); - fprintf(fd, - "=====================================================================\n" - "Thread %d merge %lld: sleep %6.2f sec, run %6.2f sec\n" - " megabytes kTuples datapages MB/s (real) kTup/s (real)\n" - "Wrote %7lld %7lld %9lld" " %6.1f %6.1f" " %8.1f %8.1f" "\n" - "Read (small) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" - "Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" - "Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" - ".....................................................................\n" - "avg tuple len: %6.2fkb\n" - "effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n" - " (%.2f; %.2f) wallclock" "\n" - ".....................................................................\n" - , - merge_level, merge_count, - sleep_time, - work_time, - (long long)mb_out, (long long)kt_out, num_datapages_out, mb_out / work_time, mb_out / total_time, kt_out / work_time, kt_out / total_time, - (long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / work_time, kt_ins / total_time, - (long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time, - (long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time, - mb_out / kt_out, - mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins - ); + fprintf(fd, + "=====================================================================\n" + "Thread %d merge %lld: sleep %6.2f sec, run %6.2f sec\n" + " megabytes kTuples datapages MB/s (real) kTup/s (real)\n" + "Wrote %7lld %7lld %9lld" " %6.1f %6.1f" " %8.1f %8.1f" "\n" + "Read (small) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" + "Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" + "Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" + ".....................................................................\n" + "avg tuple len: %6.2fkb\n" + "effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n" + " (%.2f; %.2f) wallclock" "\n" + ".....................................................................\n" + , + merge_level, merge_count, + sleep_time, + work_time, + (long long)mb_out, (long long)kt_out, num_datapages_out, mb_out / work_time, mb_out / total_time, kt_out / work_time, kt_out / total_time, + (long long)mb_ins, (long long)kt_ins, mb_ins / work_time, mb_ins / total_time, kt_ins / work_time, kt_ins / total_time, + (long long)mb_inl, (long long)kt_inl, mb_inl / work_time, mb_inl / total_time, kt_inl / work_time, kt_inl / total_time, + (long long)mb_hdd, (long long)kt_hdd, mb_hdd / work_time, mb_hdd / total_time, kt_hdd / work_time, kt_hdd / total_time, + mb_out / kt_out, + mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins + ); } -}; + }; + + mergeStats* newMergeStats(int mergeLevel) { + return new mergeStats(this, mergeLevel); + } +}; #endif /* MERGESTATS_H_ */ diff --git a/merger.cpp b/merger.cpp index 6f37b3f..e44c3cc 100644 --- a/merger.cpp +++ b/merger.cpp @@ -193,7 +193,7 @@ void merge_iterators(int xid, ITB *itrB, logtable *ltable, diskTreeComponent *scratch_tree, - mergeStats *stats, + mergeManager::mergeStats *stats, bool dropDeletes); @@ -229,11 +229,12 @@ void* memMergeThread(void*arg) assert(ltable->get_tree_c1()); int merge_count =0; + mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(1); while(true) // 1 { - mergeStats stats(1, merge_count); writelock(ltable->header_lock,0); + stats->new_merge(); int done = 0; // 2: wait for c0_mergable while(!ltable->get_tree_c0_mergeable()) @@ -270,7 +271,7 @@ void* memMergeThread(void*arg) break; } - stats.starting_merge(); + stats->starting_merge(); // 3: Begin transaction xid = Tbegin(); @@ -284,7 +285,7 @@ void* memMergeThread(void*arg) //create a new tree - diskTreeComponent * c1_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, &stats); + diskTreeComponent * c1_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats); //pthread_mutex_unlock(a->block_ready_mut); unlock(ltable->header_lock); @@ -292,7 +293,7 @@ void* memMergeThread(void*arg) //: do the merge DEBUG("mmt:\tMerging:\n"); - merge_iterators(xid, itrA, itrB, ltable, c1_prime, &stats, false); + merge_iterators(xid, itrA, itrB, ltable, c1_prime, stats, false); delete itrA; delete itrB; @@ -311,7 +312,7 @@ void* memMergeThread(void*arg) //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger double target_R = *(a->r_i); - double new_c1_size = stats.output_size(); + double new_c1_size = stats->output_size(); assert(target_R >= MIN_R); bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R) || (a->max_size && new_c1_size > a->max_size ); @@ -347,7 +348,7 @@ void* memMergeThread(void*arg) ltable->set_tree_c1_mergeable(c1_prime); // 8: c1 = new empty. - ltable->set_tree_c1(new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, &stats)); + ltable->set_tree_c1(new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats)); pthread_cond_signal(a->out_block_ready_cond); @@ -363,8 +364,8 @@ void* memMergeThread(void*arg) unlock(ltable->header_lock); - stats.finished_merge(); - stats.pretty_print(stdout); + stats->finished_merge(); + stats->pretty_print(stdout); //TODO: get the freeing outside of the lock } @@ -385,13 +386,14 @@ void *diskMergeThread(void*arg) int merge_count =0; + mergeManager::mergeStats * stats = a->ltable->merge_mgr->newMergeStats(2); while(true) { - mergeStats stats(2, merge_count); // 2: wait for input writelock(ltable->header_lock,0); + stats->new_merge(); int done = 0; // get a new input for merge while(!ltable->get_tree_c1_mergeable()) @@ -423,7 +425,7 @@ void *diskMergeThread(void*arg) break; } - stats.starting_merge(); + stats->starting_merge(); // 3: begin xid = Tbegin(); @@ -434,14 +436,14 @@ void *diskMergeThread(void*arg) diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); //create a new tree - diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, &stats); + diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size, stats); unlock(ltable->header_lock); //do the merge DEBUG("dmt:\tMerging:\n"); - merge_iterators(xid, itrA, itrB, ltable, c2_prime, &stats, true); + merge_iterators(xid, itrA, itrB, ltable, c2_prime, stats, true); delete itrA; delete itrB; @@ -467,7 +469,7 @@ void *diskMergeThread(void*arg) merge_count++; //update the current optimal R value - *(a->r_i) = std::max(MIN_R, sqrt( (stats.output_size() * 1.0) / (ltable->max_c0_size) ) ); + *(a->r_i) = std::max(MIN_R, sqrt( (stats->output_size() * 1.0) / (ltable->max_c0_size) ) ); DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i)); // 10: C2 is never to big @@ -480,8 +482,8 @@ void *diskMergeThread(void*arg) unlock(ltable->header_lock); - stats.finished_merge(); - stats.pretty_print(stdout); + stats->finished_merge(); + stats->pretty_print(stdout); } return 0; @@ -492,7 +494,7 @@ void merge_iterators(int xid, ITA *itrA, //iterator on c1 or c2 ITB *itrB, //iterator on c0 or c1, respectively logtable *ltable, - diskTreeComponent *scratch_tree, mergeStats *stats, + diskTreeComponent *scratch_tree, mergeManager::mergeStats *stats, bool dropDeletes // should be true iff this is biggest component ) { diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index b864b8b..ff55c9e 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -34,8 +34,10 @@ void insertProbeIter(size_t NUM_ENTRIES) xid = Tbegin(); - mergeStats stats(1,0); - diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, &stats); + mergeManager merge_mgr; + mergeManager::mergeStats * stats = merge_mgr.newMergeStats(1); + + diskTreeComponent *ltable_c1 = new diskTreeComponent(xid, 1000, 10000, 5, stats); std::vector data_arr; std::vector key_arr;