diff --git a/memTreeComponent.h b/memTreeComponent.h index 9ca7f70..fc21f25 100644 --- a/memTreeComponent.h +++ b/memTreeComponent.h @@ -168,7 +168,7 @@ public: if(cur_off_ == num_batched_) { if(mut_) pthread_mutex_lock(mut_); if(cur_size_) { - while(*cur_size_ < (0.7 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram. + while(*cur_size_ < (0.8 * (double)target_size_) && ! *flushing_) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram. pthread_mutex_unlock(mut_); struct timespec ts; mergeManager::double_to_ts(&ts, 0.1); diff --git a/mergeManager.cpp b/mergeManager.cpp index 3f39a60..776f9b2 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -28,6 +28,10 @@ mergeManager::~mergeManager() { delete c0; delete c1; delete c2; + still_running = false; + pthread_cond_signal(&pp_cond); + pthread_join(pp_thread, 0); + pthread_cond_destroy(&pp_cond); } void mergeManager::new_merge(int mergeLevel) { @@ -105,17 +109,18 @@ void mergeManager::update_progress(mergeStats * s, int delta) { s->out_progress = 0.0; } } + #ifdef NO_SNOWSHOVEL s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; + s->out_progress = ((double)s->current_size) / (double)s->target_size; #else - if(s->merge_level == 0 && delta) { - s->current_size = s->bytes_out - s->bytes_in_large; - s->out_progress = ((double)s->current_size) / (double)s->target_size; - } else { - if(s->merge_level == 1 && s->mergeable_size) { - s->out_progress = ((double)s->current_size) / (double)s->target_size; + if(delta) { + if(s->merge_level == 0) { + s->current_size = ltable->tree_bytes; // we need to track the number of bytes consumed by the merger; this data is not present in s, so fall back on ltable's aggregate. + } else { + s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; } - s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; + s->out_progress = ((double)s->current_size) / (double)s->target_size; } #endif struct timeval now; @@ -163,9 +168,6 @@ void mergeManager::update_progress(mergeStats * s, int delta) { */ void mergeManager::tick(mergeStats * s, bool block, bool force) { #define PRINT_SKIP 10000 - if(block) { - // sleep(((double)delta)/[s+1]->bps); // XXX We currently sleep based on the past performance of the current tree. In the limit, this is fine, but it would be better to sleep based on the past throughput of the tree component we're waiting for. fill in the parameters - } if(force || s->need_tick) { if(block) { @@ -233,18 +235,13 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { overshoot = raw_overshoot + overshoot_fudge; overshoot2 = raw_overshoot + overshoot_fudge2; bps = c2->bps; + if(!c1->mergeable_size) { overshoot = 0; overshoot2 = 0; } } //#define PP_THREAD_INFO #ifdef PP_THREAD_INFO printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep); #endif - if(s->print_skipped == PRINT_SKIP) { - pretty_print(stdout); - s->print_skipped = 0; - } else { - s->print_skipped++; - } bool one_threshold = (overshoot > 0 || overshoot2 > 0) || (raw_overshoot > 0); bool two_threshold = (overshoot > 0 || overshoot2 > 0) && (raw_overshoot > 0); @@ -265,10 +262,10 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { spin ++; total_sleep += sleeptime; - // if((spin > 40) || (total_sleep > (max_sleep * 20.0))) { - printf("\nMerge thread %d c0->out=%f c1->in=%f c1->out=%f c2->in=%f\n", s->merge_level, c0->out_progress, c1->in_progress, c1->out_progress, c2->in_progress); - printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld eff2=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, (long long)overshoot2, sleeptime, spin, total_sleep); - //} + if((spin > 40) || (total_sleep > (max_sleep * 20.0))) { + printf("\nMerge thread %d c0->out=%f c1->in=%f c1->out=%f c2->in=%f\n", s->merge_level, c0->out_progress, c1->in_progress, c1->out_progress, c2->in_progress); + printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld eff2=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, (long long)overshoot2, sleeptime, spin, total_sleep); + } sleeping[s->merge_level] = true; if(s->merge_level == 0) abort(); @@ -305,8 +302,8 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { } double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.1111 // - (0.9 * (double)ltable->max_c0_size); delta -= 1.0; - if(delta > 0.0005) { - double slp = 0.001 + delta; //delta / (double)(ltable->max_c0_size); + if(delta > 0.00005) { + double slp = 0.001 + 10.0 * delta; //delta / (double)(ltable->max_c0_size); DEBUG("\nsleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0)); struct timespec sleeptime; double_to_ts(&sleeptime, slp); @@ -318,20 +315,6 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { } } rwlc_unlock(ltable->header_mut); - //if(s->merge_level == 0) pthread_mutex_unlock(<able->tick_mut); // XXX can this even be the case? - } else { - if(!force) { - if(s->print_skipped == PRINT_SKIP) { - //if(s->merge_level == 0) pthread_mutex_lock(<able->tick_mut); - rwlc_writelock(ltable->header_mut); - pretty_print(stdout); - rwlc_unlock(ltable->header_mut); - //if(s->merge_level == 0) pthread_mutex_unlock(<able->tick_mut); - s->print_skipped = 0; - } else { - s->print_skipped++; - } - } } } } @@ -379,6 +362,30 @@ void mergeManager::finished_merge(int merge_level) { update_progress(get_merge_stats(merge_level), 0); } +void * mergeManager::pretty_print_thread() { + pthread_mutex_t dummy_mut; + pthread_mutex_init(&dummy_mut, 0); + + while(still_running) { + struct timeval tv; + gettimeofday(&tv, 0); + struct timespec ts; + double_to_ts(&ts, tv_to_double(&tv)+1); + pthread_cond_timedwait(&pp_cond, &dummy_mut, &ts); + if(ltable) { + rwlc_readlock(ltable->header_mut); + pretty_print(stdout); + rwlc_unlock(ltable->header_mut); + } + } + printf("\n"); + return 0; +} +void * merge_manager_pretty_print_thread(void * arg) { + mergeManager * m = (mergeManager*)arg; + return m->pretty_print_thread(); +} + mergeManager::mergeManager(logtable *ltable): UPDATE_PROGRESS_PERIOD(0.005), ltable(ltable), @@ -395,6 +402,9 @@ mergeManager::mergeManager(logtable *ltable): double_to_ts(&c0->last_tick, tv_to_double(&tv)); double_to_ts(&c1->last_tick, tv_to_double(&tv)); double_to_ts(&c2->last_tick, tv_to_double(&tv)); + still_running = true; + pthread_cond_init(&pp_cond, 0); + pthread_create(&pp_thread, 0, merge_manager_pretty_print_thread, (void*)this); } void mergeManager::pretty_print(FILE * out) { @@ -413,37 +423,27 @@ void mergeManager::pretty_print(FILE * out) { have_c2 = NULL != lt->get_tree_c2(); } - double c0_out_progress = 100.0 * c0->current_size / c0->target_size; - double c0_c1_in_progress = 100.0 * (c1->bytes_in_large + c1->bytes_in_small) / (c0->mergeable_size + c1->base_size); - double c0_c1_out_progress = 100.0 * c1->current_size / c1->target_size; - double c1_c2_progress = 100.0 * (c2->bytes_in_large + c2->bytes_in_small) / (c1->mergeable_size + c2->base_size); - -#ifdef NO_SNOWSHOVEL - assert((!c1->active) || (c0_c1_in_progress >= -1 && c0_c1_in_progress < 102)); -#endif - assert((!c2->active) || (c1_c2_progress >= -1 && c1_c2_progress < 102)); - fprintf(out,"[merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s ", - c0->active ? "RUN" : "---", (long long)(c0->lifetime_consumed / mb), c0_out_progress, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed), + c0->active ? "RUN" : "---", (long long)(c0->lifetime_consumed / mb), 100.0 * c0->out_progress, c0->lifetime_elapsed, c0->bps/((double)mb), c0->lifetime_consumed/(((double)mb)*c0->lifetime_elapsed), have_c0 ? "C0" : "..", have_c0m ? "C0'" : "...", - c1->active ? "RUN" : "---", c0_c1_in_progress, c0_c1_out_progress, c1->bps/((double)mb), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed), + c1->active ? "RUN" : "---", 100.0 * c1->in_progress, 100.0 * c1->out_progress, c1->bps/((double)mb), c1->lifetime_consumed/(((double)mb)*c1->lifetime_elapsed), have_c1 ? "C1" : "..", have_c1m ? "C1'" : "...", - c2->active ? "RUN" : "---", c1_c2_progress, c2->bps/((double)mb), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed), + c2->active ? "RUN" : "---", 100.0 * c2->in_progress, c2->bps/((double)mb), c2->lifetime_consumed/(((double)mb)*c2->lifetime_elapsed), have_c2 ? "C2" : ".."); -//#define PP_SIZES +#define PP_SIZES #ifdef PP_SIZES - fprintf(out, "[size in small/large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld ", - c0->target_size/mb, c0->current_size/mb, c0->bytes_in_small/mb, + fprintf(out, "[target cur base in_small in_large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld %4lld ", + c0->target_size/mb, c0->current_size/mb, c0->base_size/mb, c0->bytes_in_small/mb, c0->bytes_in_large/mb, c0->bytes_out/mb, c0->mergeable_size/mb); - fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld %4lld ", - c1->target_size/mb, c1->current_size/mb, c1->bytes_in_small/mb, + fprintf(out, "C1 %4lld %4lld %4lld %4lld %4lld %4lld %4lld ", + c1->target_size/mb, c1->current_size/mb, c1->base_size/mb, c1->bytes_in_small/mb, c1->bytes_in_large/mb, c1->bytes_out/mb, c1->mergeable_size/mb); - fprintf(out, "C2 ---- %4lld %4lld %4lld %4lld %4lld ", - /*----*/ c2->current_size/mb, c2->bytes_in_small/mb, + fprintf(out, "C2 ---- %4lld %4lld %4lld %4lld %4lld %4lld ", + /*----*/ c2->current_size/mb, c2->base_size/mb, c2->bytes_in_small/mb, c2->bytes_in_large/mb, c2->bytes_out/mb, c2->mergeable_size/mb); #endif // fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds))); @@ -459,5 +459,10 @@ void mergeManager::pretty_print(FILE * out) { // ((double)c1_totalConsumed)/((double)c1_totalWorktime), // ((double)c2_totalConsumed)/((double)c2_totalWorktime)); fflush(out); +#ifdef NO_SNOWSHOVEL + assert((!c1->active) || (c1->in_progress >= -0.01 && c1->in_progress < 1.02)); +#endif + assert((!c2->active) || (c2->in_progress >= -0.01 && c2->in_progress < 1.02)); + fprintf(out, "\r"); } diff --git a/mergeManager.h b/mergeManager.h index f5ff26f..62613a0 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -51,6 +51,7 @@ public: void wrote_tuple(int merge_level, datatuple * tup); void finished_merge(int merge_level); void pretty_print(FILE * out); + void *pretty_print_thread(); private: logtable* ltable; @@ -64,6 +65,9 @@ private: pthread_mutex_t throttle_mut; pthread_cond_t throttle_wokeup_cond; bool sleeping[3]; + bool still_running; + pthread_cond_t pp_cond; + pthread_t pp_thread; }; #endif /* MERGEMANAGER_H_ */ diff --git a/merger.cpp b/merger.cpp index 6c9c386..814174d 100644 --- a/merger.cpp +++ b/merger.cpp @@ -474,11 +474,11 @@ void merge_iterators(int xid, i+=t1->byte_length(); ltable->merge_mgr->wrote_tuple(stats->merge_level, t1); datatuple::freetuple(t1); + //advance itrA t1 = itrA->next_callerFrees(); - if(t1) { - ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); - } + ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); + periodically_force(xid, &i, forceMe, log); } @@ -495,9 +495,7 @@ void merge_iterators(int xid, datatuple::freetuple(t1); ltable->merge_mgr->wrote_tuple(stats->merge_level, mtuple); t1 = itrA->next_callerFrees(); //advance itrA - if(t1) { - ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); - } + ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); datatuple::freetuple(mtuple); periodically_force(xid, &i, forceMe, log); } diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 5f694e6..185006a 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -36,6 +36,9 @@ void insertProbeIter(size_t NUM_ENTRIES) std::sort(key_arr->begin(), key_arr->end(), &mycmp); removeduplicates(key_arr); + scramble(key_arr); + + if(key_arr->size() > NUM_ENTRIES) key_arr->erase(key_arr->begin()+NUM_ENTRIES, key_arr->end()); diff --git a/test/check_util.h b/test/check_util.h index 4dcbec8..a559a24 100644 --- a/test/check_util.h +++ b/test/check_util.h @@ -34,6 +34,16 @@ void removeduplicates(std::vector *arr) } } +void scramble(std::vector *arr) { + for(int i = 0; i < arr->size(); i++) { + int other = rand() % arr->size(); + if(other != i) { + std::string s = (*arr)[i]; + (*arr)[i] = (*arr)[other]; + (*arr)[other] = s; + } + } +} //must be given a sorted array // XXX probably don't need two copies of this function.