diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp
index c763a08..ed63d14 100644
--- a/diskTreeComponent.cpp
+++ b/diskTreeComponent.cpp
@@ -69,7 +69,7 @@ int diskTreeComponent::insertTuple(int xid, datatuple *t)
     dp = insertDataPage(xid, t);
     //    stats->num_datapages_out++;
   } else if(!dp->append(t)) {
-    //    stats->bytes_out += (PAGE_SIZE * dp->get_page_count());
+    //    stats->bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
     ((mergeStats*)stats)->wrote_datapage(dp);
     dp->writes_done();
     delete dp;
diff --git a/logstore.cpp b/logstore.cpp
index d7a4cd0..4e55f02 100644
--- a/logstore.cpp
+++ b/logstore.cpp
@@ -23,6 +23,7 @@ template<class TUPLE>
 logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size)
 {
+    r_val = MIN_R;
     tree_c0 = NULL;
     tree_c0_mergeable = NULL;
     tree_c1 = NULL;
@@ -142,6 +143,7 @@ void logtable<TUPLE>::flushTable()
     gettimeofday(&start_tv,0);
     start = tv_to_double(start_tv);
 
+    c0_stats->handed_off_tree();
     c0_stats->finished_merge();
     c0_stats->new_merge();
 
@@ -191,6 +193,8 @@ void logtable<TUPLE>::flushTable()
                    stop-start, start-last_start);
         }
         last_start = stop;
+    } else {
+        DEBUG("signaled c0-c1 merge\n");
     }
 }
 
@@ -420,8 +424,10 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
     {
         datatuple *pre_t = *rbitr;
         //do the merging
+        c0_stats->read_tuple_from_large_component(pre_t);
         datatuple *new_t = tmerger->merge(pre_t, tuple);
         c0_stats->merged_tuples(new_t, tuple, pre_t);
+        c0_stats->wrote_tuple(new_t);
         tree_c0->erase(pre_t); //remove the previous tuple
         tree_c0->insert(new_t); //insert the new tuple
@@ -438,6 +444,7 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
     //insert tuple into the rbtree
     tree_c0->insert(t);
+    c0_stats->wrote_tuple(t);
     tsize++;
     tree_bytes += t->byte_length();// + RB_TREE_OVERHEAD;
 
diff --git a/logstore.h b/logstore.h
index 1badbbf..c690af3 100644
--- a/logstore.h
+++ b/logstore.h
@@ -37,6 +37,8 @@ public:
 
     ~logtable();
 
+    double * R() { return &r_val; }
+
     //user access functions
     datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize);
@@ -110,6 +112,7 @@ public:
     }
 
 private:
+    double r_val;
     recordid table_rec;
     struct table_header tbl_header;
     uint64_t epoch;
diff --git a/mergeManager.cpp b/mergeManager.cpp
index dfa2980..7a2e3d5 100644
--- a/mergeManager.cpp
+++ b/mergeManager.cpp
@@ -37,107 +37,109 @@ void mergeManager::new_merge(mergeStats * s) {
     if(s->merge_level == 0) {
       // queueSize was set during startup
     } else if(s->merge_level == 1) {
-      c1_queueSize = c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out;
+      c1_queueSize = (pageid_t)(*ltable->R() * (double)c0_queueSize); //c1_queueSize > s->bytes_out ? c1_queueSize : s->bytes_out;
     } else if(s->merge_level == 2) {
-      c2_queueSize = s->bytes_in_small;
     } else { abort(); }
-    pretty_print(stdout);
   }
   pthread_mutex_unlock(&mut);
 }
 
 void mergeManager::set_c0_size(int64_t size) {
   c0_queueSize = size;
 }
 
-void mergeManager::tick(mergeStats * s, bool done) {
-  if(s->merge_level == 0) {
-    pthread_mutex_lock(&throttle_mut);
-  }
-  // throttle?
-  if(s->bytes_in_small_delta > c0_queueSize / 10000) {
-    if(s->merge_level == 0) {
-      struct timeval now;
-      gettimeofday(&now, 0);
-      double elapsed_delta = tv_to_double(&now) - ts_to_double(&last_throttle);
-      pageid_t bytes_written_delta = (s->bytes_in_small_delta - s->bytes_collapsed_delta);
-      double min_throughput = 0.0; // don't throttle below 100 kilobytes / sec
-      double max_throughput = 10.0 * 1024.0 * 1024.0;
-      double c0_badness = (double)((c0_totalConsumed + bytes_written_delta - c1_totalConsumed) - c0_queueSize) / ((double)c0_queueSize);
-      double raw_throughput = ((double)bytes_written_delta)/elapsed_delta;
-      if(raw_throughput > max_throughput || c0_badness > 0) {
-        //double target_throughput = min_throughput / (c0_badness); // * c0_badness * c0_badness);
-        double target_throughput;
-        if(c0_badness > 0) {
-          target_throughput = (max_throughput - min_throughput) * (1.0-sqrt(sqrt(c0_badness))) + min_throughput;
-        } else {
-          target_throughput = max_throughput;
-        }
-        double target_elapsed = ((double)bytes_written_delta)/target_throughput;
-        //printf("Worked %6.1f (target %6.1f)\n", elapsed_delta, target_elapsed);
-        if(target_elapsed > elapsed_delta) {
-          struct timespec sleep_until;
-          double_to_ts(&sleep_until, ts_to_double(&last_throttle) + target_elapsed);
-          //fprintf(stdout, "Throttling for %6.1f seconds\n", target_elapsed - (double)elapsed_delta);
-          last_throttle_seconds = target_elapsed - (double)elapsed_delta;
-          last_elapsed_seconds = target_elapsed;
-          throttle_seconds += last_throttle_seconds;
-          elapsed_seconds += last_elapsed_seconds;
-          pthread_mutex_lock(&dummy_throttle_mut);
-          pthread_cond_timedwait(&dummy_throttle_cond, &dummy_throttle_mut, &sleep_until);
-          pthread_mutex_unlock(&dummy_throttle_mut);
-          memcpy(&last_throttle, &sleep_until, sizeof(sleep_until));
-        } else {
-          double_to_ts(&last_throttle, tv_to_double(&now));
-        }
-      } else {
-        double_to_ts(&last_throttle, tv_to_double(&now));
-      }
-    }
-    struct timeval now;
-    gettimeofday(&now,0);
-    unsigned long long elapsed = long_tv(now) - long_tv(s->last);
+/**
+ * This function is invoked periodically by the merge threads. It updates
+ * mergeManager's statistics, and applies backpressure as necessary.
+ *
+ * Here is the backpressure algorithm.
+ *
+ * We want to maintain these two invariants:
+ *   - for each byte consumed by the app->c0 threads, a byte is consumed by the c0->c1 merge thread.
+ *   - for each byte consumed by the c0->c1 thread, the c1->c2 thread consumes a byte.
+ *
+ * More concretely (and taking into account varying values of R):
+ *
+ *   capacity(C_i) - current_size(C_i) >= size(C_i_mergeable) - bytes_consumed_by_next_merger
+ *
+ * where:
+ *
+ *   capacity(c0) = c0_queue_size
+ *   capacity(c1) = c1_queue_size
+ *
+ *   current_size(c_i) = sum(bytes_out_delta) - sum(bytes_in_large_delta)
+ *
+ *   bytes_consumed_by_next_merger = sum(bytes_in_small_delta)
+ */
+void mergeManager::tick(mergeStats * s, bool done) {
+  pageid_t tick_length_bytes = 1024*1024;
+  if(done || (s->bytes_in_small_delta > tick_length_bytes)) {
     pthread_mutex_lock(&mut);
-    if(s->merge_level == 0) {
-      c0_totalConsumed += s->bytes_in_small_delta;
-      c0_totalWorktime += elapsed;
-      c0_totalCollapsed += s->bytes_collapsed_delta;
-    } else if(s->merge_level == 1) {
-      c1_totalConsumed += s->bytes_in_small_delta;
-      c1_totalWorktime += elapsed;
-      c1_totalCollapsed += s->bytes_collapsed_delta;
-    } else if(s->merge_level == 2) {
-      c2_totalConsumed += s->bytes_in_small_delta;
-      c2_totalWorktime += elapsed;
-      c2_totalCollapsed += s->bytes_collapsed_delta;
-    } else { abort(); }
-    pthread_mutex_unlock(&mut);
+    struct timeval now;
+    gettimeofday(&now, 0);
+    double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick);
+    double bps = (double)s->bytes_in_small_delta / (double)elapsed_delta;
+
+    pageid_t current_size = s->bytes_out - s->bytes_in_large;
+
+    int64_t overshoot;
+    int64_t overshoot_fudge = (int64_t)((double)c0_queueSize * 0.1);
+    do {
+      overshoot = 0;
+      if(s->merge_level == 0) {
+        if(done) {
+          // c0->bytes_in_small = 0;
+        } else if(c0->mergeable_size) {
+          overshoot = (overshoot_fudge + c0->mergeable_size - c1->bytes_in_small) // amount left to process
+                    - (c0_queueSize - current_size);                              // minus room for more insertions
+        }
+      } else if (s->merge_level == 1) {
+        if(done) {
+          c0->mergeable_size = 0;
+          c1->bytes_in_small = 0;
+        } else if(/*c1_queueSize && */c1->mergeable_size) {
+          overshoot = (c1->mergeable_size - c2->bytes_in_small)
+                    - (c1_queueSize - current_size);
+        }
+      } else if (s->merge_level == 2) {
+        if(done) {
+          c1->mergeable_size = 0;
+          c2->bytes_in_small = 0;
+        }
+        // Never throttle this merge.
+      }
+      static int num_skipped = 0;
+      if(num_skipped == 10) {
+        printf("#%d mbps %6.1f overshoot %9lld current_size = %9lld ", s->merge_level, bps / (1024.0*1024.0), overshoot, current_size);
+        pretty_print(stdout);
+        num_skipped = 0;
+      }
+      num_skipped++;
+      if(overshoot > 0) {
+        // Throttle: work off the overshoot at the rate (bps) this thread has
+        // recently been processing bytes, sleeping at most one second per
+        // loop iteration before rechecking.
+        double sleeptime = (double)overshoot / bps;
+
+        struct timespec sleep_until;
+        if(sleeptime > 1) { sleeptime = 1; }
+        double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
+//        printf("\nMerge thread %d Overshoot: %lld Throttle %6f\n", s->merge_level, overshoot, sleeptime);
+//        pthread_mutex_lock(&dummy_throttle_mut);
+        pthread_cond_timedwait(&dummy_throttle_cond, &mut, &sleep_until);
+//        pthread_mutex_unlock(&dummy_throttle_mut);
+        gettimeofday(&now, 0);
+      }
+    } while(overshoot > 0);
+    memcpy(&s->last_tick, &now, sizeof(now));
     s->bytes_in_small_delta = 0;
-    s->bytes_collapsed_delta = 0;
-
-    memcpy(&s->last, &now, sizeof(now));
-
-    pretty_print(stdout);
-  }
-  if(s->merge_level == 0) {
-    pthread_mutex_unlock(&throttle_mut);
+//    pretty_print(stdout);
+    pthread_mutex_unlock(&mut);
   }
 }
 
-mergeManager::mergeManager(void *ltable):
+mergeManager::mergeManager(logtable<datatuple> *ltable):
   ltable(ltable),
   c0_queueSize(0),
   c1_queueSize(0),
-  c2_queueSize(0),
-  c0_totalConsumed(0),
-  c0_totalCollapsed(0),
-  c0_totalWorktime(0),
-  c1_totalConsumed(0),
-  c1_totalCollapsed(0),
-  c1_totalWorktime(0),
-  c2_totalConsumed(0),
-  c2_totalCollapsed(0),
-  c2_totalWorktime(0),
+//  c2_queueSize(0),
   c0(new mergeStats(this, 0)),
   c1(new mergeStats(this, 1)),
   c2(new mergeStats(this, 2)) {
@@ -147,7 +149,9 @@ mergeManager::mergeManager(void *ltable):
   pthread_cond_init(&dummy_throttle_cond, 0);
   struct timeval tv;
   gettimeofday(&tv, 0);
-  double_to_ts(&last_throttle, tv_to_double(&tv));
+  double_to_ts(&c0->last_tick, tv_to_double(&tv));
+  double_to_ts(&c1->last_tick, tv_to_double(&tv));
+  double_to_ts(&c2->last_tick, tv_to_double(&tv));
 }
 
 void mergeManager::pretty_print(FILE * out) {
@@ -176,19 +180,21 @@ void mergeManager::pretty_print(FILE * out) {
       have_c1m ? "C1'" : "...",
       c2->active ? "RUN" : "---",
       have_c2 ? "C2" : "..");
-  fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds)));
-  fprintf(out, "C0 size %4lld collapsed %4lld resident %4lld ",
-      2*c0_queueSize/mb,
-      c0_totalCollapsed/mb,
-      (c0_totalConsumed - (c0_totalCollapsed + c1_totalConsumed))/mb);
-  fprintf(out, "C1 size %4lld collapsed %4lld resident %4lld ",
-      2*c1_queueSize/mb,
-      c1_totalCollapsed/mb,
-      (c1_totalConsumed - (c1_totalCollapsed + c2_totalConsumed))/mb);
-  fprintf(out, "C2 size %4lld collapsed %4lld ",
-      2*c2_queueSize/mb, c2_totalCollapsed/mb);
-  fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\r",
-      ((double)c1_totalConsumed)/((double)c1_totalWorktime),
-      ((double)c2_totalConsumed)/((double)c2_totalWorktime));
+  fprintf(out, "[size, in small, in large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld ", c0_queueSize/mb, c0->bytes_in_small/mb, c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb);
+  fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld ", c1_queueSize/mb, c1->bytes_in_small/mb, c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb);
+  fprintf(out, "C2 .... %4lld %4lld %4lld %4lld ", c2->bytes_in_small/mb, c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb);
+//  fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds)));
+//  fprintf(out, "C0 size %4lld resident %4lld ",
+//      2*c0_queueSize/mb,
+//      (c0->bytes_out - c0->bytes_in_large)/mb);
+//  fprintf(out, "C1 size %4lld resident %4lld\r",
+//      2*c1_queueSize/mb,
+//      (c1->bytes_out - c1->bytes_in_large)/mb);
+//  fprintf(out, "C2 size %4lld\r",
+//      2*c2_queueSize/mb);
+//  fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\r",
+//      ((double)c1_totalConsumed)/((double)c1_totalWorktime),
+//      ((double)c2_totalConsumed)/((double)c2_totalWorktime));
+  fprintf(out, "\r");
   fflush(out);
 }
diff --git a/mergeManager.h b/mergeManager.h
index dc46019..5445c39 100644
--- a/mergeManager.h
+++ b/mergeManager.h
@@ -13,6 +13,10 @@
 #undef end
 #include
 #include
+#include
+
+template<class TUPLE>
+class logtable;
 
 class mergeStats;
@@ -32,7 +36,7 @@ private:
     return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec);
   }
 public:
-  mergeManager(void *ltable);
+  mergeManager(logtable<datatuple> *ltable);
 
   ~mergeManager();
 
@@ -44,19 +48,9 @@ public:
 private:
   pthread_mutex_t mut;
-  void* ltable;
+  logtable<datatuple>* ltable;
   pageid_t c0_queueSize;
   pageid_t c1_queueSize; // How many bytes must c0-c1 consume before trying to swap over to an empty c1? ( = current target c1 size)
-  pageid_t c2_queueSize; // How many bytes must c1-c2 consume before there is room for a new empty c1? ( = previous c1 size)
-  pageid_t c0_totalConsumed;
-  pageid_t c0_totalCollapsed;
-  pageid_t c0_totalWorktime;
-  pageid_t c1_totalConsumed; // What is long-term effective throughput of merger #1? (Excluding blocked times)
-  pageid_t c1_totalCollapsed;
-  pageid_t c1_totalWorktime;
-  pageid_t c2_totalConsumed; // What is long term effective throughput of merger #2? (Excluding blocked times)
-  pageid_t c2_totalCollapsed;
-  pageid_t c2_totalWorktime;
   double throttle_seconds;
   double elapsed_seconds;
   double last_throttle_seconds;
@@ -64,7 +58,6 @@ private:
   mergeStats * c0;
   mergeStats * c1;
   mergeStats * c2;
-  struct timespec last_throttle;
   pthread_mutex_t throttle_mut;
   pthread_mutex_t dummy_throttle_mut;
   pthread_cond_t dummy_throttle_cond;
diff --git a/mergeStats.h b/mergeStats.h
index 5f71143..104fa04 100644
--- a/mergeStats.h
+++ b/mergeStats.h
@@ -24,13 +24,13 @@ class mergeStats {
     merge_mgr(merge_mgr),
     merge_level(merge_level),
     merge_count(0),
+    mergeable_size(0),
+    bytes_out_with_overhead(0),
     bytes_out(0),
     num_tuples_out(0),
     num_datapages_out(0),
     bytes_in_small(0),
     bytes_in_small_delta(0),
-    bytes_collapsed(0),
-    bytes_collapsed_delta(0),
     num_tuples_in_small(0),
     bytes_in_large(0),
     num_tuples_in_large(0),
@@ -41,15 +41,14 @@ class mergeStats {
   void new_merge() {
     merge_mgr->new_merge(this);
     merge_count++;
-    bytes_out = 0;
+    // bytes_out_with_overhead = 0;
+    // bytes_out = 0;
     num_tuples_out = 0;
     num_datapages_out = 0;
     bytes_in_small = 0;
     bytes_in_small_delta = 0;
-    bytes_collapsed = 0;
-    bytes_collapsed_delta = 0;
     num_tuples_in_small = 0;
-    bytes_in_large = 0;
+    // bytes_in_large = 0;
     num_tuples_in_large = 0;
     gettimeofday(&sleep,0);
   }
@@ -58,6 +57,11 @@ class mergeStats {
     gettimeofday(&start, 0);
     gettimeofday(&last, 0);
   }
+  void handed_off_tree() {
+    mergeable_size = bytes_out - bytes_in_large;
+    bytes_out = 0;
+    bytes_in_large = 0;
+  }
   void finished_merge() {
     active = false;
     merge_mgr->tick(this, true);
@@ -78,19 +82,15 @@ class mergeStats {
     }
   }
   void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
-    pageid_t d = (merged->byte_length() - (small->byte_length() + large->byte_length()));
-    bytes_collapsed += d;
-    bytes_collapsed_delta += d;
   }
   void wrote_tuple(datatuple * tup) {
     num_tuples_out++;
-    bytes_out_tuples += tup->byte_length();
+    bytes_out += tup->byte_length();
   }
   void wrote_datapage(DataPage<datatuple> *dp) {
     num_datapages_out++;
-    bytes_out += (PAGE_SIZE * dp->get_page_count());
+    bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
   }
-  // TODO: merger.cpp probably shouldn't compute R from this.
   pageid_t output_size() {
     return bytes_out;
   }
@@ -107,15 +107,17 @@ class mergeStats {
     return ((double)tv.tv_sec) + ((double)tv.tv_usec) / 1000000.0;
   }
   friend class mergeManager;
+protected:
+  struct timespec last_tick;
 
-  pageid_t bytes_out;        // How many bytes did we write (including internal tree nodes)?
-  pageid_t bytes_out_tuples; // How many bytes worth of tuples did we write?
+  pageid_t mergeable_size;
+
+  pageid_t bytes_out_with_overhead; // How many bytes did we write (including internal tree nodes)?
+  pageid_t bytes_out;               // How many bytes worth of tuples did we write?
   pageid_t num_tuples_out;       // How many tuples did we write?
   pageid_t num_datapages_out;    // How many datapages?
   pageid_t bytes_in_small;       // How many bytes from the small input tree (for C0, we ignore tree overheads)?
   pageid_t bytes_in_small_delta; // How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
-  pageid_t bytes_collapsed;      // How many bytes disappeared due to tuple merges?
-  pageid_t bytes_collapsed_delta;
   pageid_t num_tuples_in_small;  // Tuples from the small input?
   pageid_t bytes_in_large;       // Bytes from the large input?
   pageid_t num_tuples_in_large;  // Tuples from large input?
@@ -126,7 +128,7 @@ class mergeStats {
     double sleep_time = float_tv(start) - float_tv(sleep);
     double work_time  = float_tv(done)  - float_tv(start);
     double total_time = sleep_time + work_time;
-    double mb_out = ((double)bytes_out)               /(1024.0*1024.0);
+    double mb_out = ((double)bytes_out_with_overhead) /(1024.0*1024.0);
     double mb_ins = ((double)bytes_in_small)          /(1024.0*1024.0);
     double mb_inl = ((double)bytes_in_large)          /(1024.0*1024.0);
     double kt_out = ((double)num_tuples_out)          /(1024.0);
diff --git a/merger.cpp b/merger.cpp
index 794a4eb..b764013 100644
--- a/merger.cpp
+++ b/merger.cpp
@@ -55,8 +55,6 @@ void merge_scheduler::shutdown()
         pthread_join(mdata->memmerge_thread,0);
         pthread_join(mdata->diskmerge_thread,0);
     }
-
-
 }
 
 void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
@@ -64,8 +62,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
     logtable<datatuple> * ltable = mergedata[index].first;
     struct logtable_mergedata *mdata = mergedata[index].second;
 
-    static double R = MIN_R;
-
     //initialize rb-tree
     ltable->set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
@@ -83,7 +79,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
     struct merger_args diskmerge_args = {
         ltable,
         0, //max_tree_size No max size for biggest component
-        &R, //r_i
     };
 
     *mdata->diskmerge_args = diskmerge_args;
@@ -91,8 +86,7 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
     struct merger_args memmerge_args = {
         ltable,
-        (int64_t)(R * R * MAX_C0_SIZE),
-        &R,
+        (int64_t)(MAX_C0_SIZE), // XXX why did this multiply by R^2 before??
     };
 
     *mdata->memmerge_args = memmerge_args;
@@ -230,6 +224,8 @@ void* memMergeThread(void*arg)
         memTreeComponent<datatuple>::tearDownTree(ltable->get_tree_c0_mergeable());
         // 11: c0_mergeable = NULL
         ltable->set_tree_c0_mergeable(NULL);
+        double new_c1_size = stats->output_size();
+        stats->handed_off_tree();
         pthread_cond_signal(&ltable->c0_needed);
 
         ltable->update_persistent_header(xid);
@@ -237,11 +233,10 @@ void* memMergeThread(void*arg)
 
         //TODO: this is simplistic for now
         //6: if c1' is too big, signal the other merger
-        double target_R = *(a->r_i);
-        double new_c1_size = stats->output_size();
+        double target_R = *ltable->R();
         assert(target_R >= MIN_R);
-        bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R) ||
-            (a->max_size && new_c1_size > a->max_size );
+        bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R);
+        DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R);
 
         if( signal_c2 ) {
             DEBUG("mmt:\tsignaling C2 for merge\n");
@@ -346,7 +341,7 @@ void *diskMergeThread(void*arg)
 
         //do the merge
         DEBUG("dmt:\tMerging:\n");
-        merge_iterators(xid, NULL, itrA, itrB, ltable, c2_prime, stats, true);
+        merge_iterators(xid, c2_prime, itrA, itrB, ltable, c2_prime, stats, true);
 
         delete itrA;
         delete itrB;
@@ -372,11 +367,14 @@ void *diskMergeThread(void*arg)
         merge_count++;
 
         //update the current optimal R value
-        *(a->r_i) = std::max(MIN_R, sqrt( (stats->output_size() * 1.0) / (ltable->max_c0_size) ) );
+        *(ltable->R()) = std::max(MIN_R, sqrt( ((double)stats->output_size()) / (ltable->max_c0_size) ) );
+        DEBUG("\nR = %f\n", *(ltable->R()));
+
         DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.output_size(), *(a->r_i));
         // 10: C2 is never too big
         ltable->set_tree_c2(c2_prime);
+        stats->handed_off_tree();
 
         DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
         // 13
@@ -424,7 +422,9 @@ void merge_iterators(int xid,
             datatuple::freetuple(t1);
             //advance itrA
             t1 = itrA->next_callerFrees();
-            stats->read_tuple_from_large_component(t1);
+            if(t1) {
+                stats->read_tuple_from_large_component(t1);
+            }
         }
 
         if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
diff --git a/merger.h b/merger.h
index 9a85543..46d7373 100644
--- a/merger.h
+++ b/merger.h
@@ -16,7 +16,6 @@
 struct merger_args {
     logtable<datatuple> * ltable;
     int64_t max_size;
-    double * r_i;
 };
 
 struct logtable_mergedata
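
The overshoot computation in the new mergeManager::tick() is easier to follow with concrete numbers. The standalone program below is an illustrative sketch only, not part of the patch: the byte counts, the 8MB/s rate, and main() itself are invented, but the field names, the formula, and the 10% fudge factor mirror the merge_level == 0 branch above.

#include <stdint.h>
#include <stdio.h>

typedef int64_t pageid_t;

int main() {
  // Hypothetical sizes, in bytes.
  pageid_t c0_queueSize      = 100 * 1024 * 1024; // capacity(C0)
  pageid_t c0_bytes_out      = 95 * 1024 * 1024;  // tuple bytes written into C0 so far
  pageid_t c0_bytes_in_large = 10 * 1024 * 1024;  // bytes merged back out of C0
  pageid_t c0_mergeable_size = 60 * 1024 * 1024;  // snapshot handed off to the c0->c1 merger
  pageid_t c1_bytes_in_small = 20 * 1024 * 1024;  // portion of that snapshot c0->c1 has consumed

  // current_size(C0) = sum(bytes_out_delta) - sum(bytes_in_large_delta)
  pageid_t current_size = c0_bytes_out - c0_bytes_in_large;          // 85MB

  // Throttle slightly before C0 is completely full.
  int64_t overshoot_fudge = (int64_t)((double)c0_queueSize * 0.1);   // 10MB

  // Invariant: capacity(C0) - current_size(C0) >= size(C0_mergeable) - bytes_consumed_by_next_merger.
  // overshoot > 0 means the invariant is violated and the insert path must stall.
  int64_t overshoot = (overshoot_fudge + c0_mergeable_size - c1_bytes_in_small) // work left for c0->c1: 50MB
                    - (c0_queueSize - current_size);                            // room left in C0: 15MB

  // tick() sleeps overshoot/bps seconds (capped at one second per loop
  // iteration), where bps is the rate at which bytes have been arriving.
  double bps = 8.0 * 1024.0 * 1024.0;
  double sleeptime = (double)overshoot / bps;
  if(sleeptime > 1) { sleeptime = 1; }

  printf("current_size %lld overshoot %lld sleep %.2fs\n",
         (long long)current_size, (long long)overshoot,
         overshoot > 0 ? sleeptime : 0.0);
  return 0;
}

With these numbers the writer is 35MB over budget, so tick() sleeps in one-second slices, rechecking the invariant after each, until the c0->c1 merge catches up.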
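The R plumbing this patch centralizes (r_val in logtable, c1_queueSize = R * c0_queueSize in new_merge(), and the sqrt() in diskMergeThread) follows the standard LSM-tree balancing argument; the derivation below restates that textbook argument and is not text from the patch. Assume steady state with |C1| held at R * |C0| and |C2| much larger. Each byte inserted into C0 is rewritten about R times by the c0->c1 merge, and about |C2| / (R * |C0|) times by the c1->c2 merge, so total write amplification is:

    cost(R) = R + |C2| / (R * |C0|)
    d cost / dR = 1 - |C2| / (R^2 * |C0|) = 0   =>   R = sqrt(|C2| / |C0|)

which is what diskMergeThread now computes, with stats->output_size() standing in for |C2| and ltable->max_c0_size for |C0|. For example, |C0| = 1GB and |C2| = 100GB give R = 10, so new_merge() targets a 10GB C1. This also suggests one plausible answer to the XXX question above: R * R * MAX_C0_SIZE is approximately |C2|, so the old bound effectively capped C1 at roughly the size of C2.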