From 960ff041e2149b3d2d0298aa6d255c00b5579942 Mon Sep 17 00:00:00 2001 From: sears Date: Tue, 24 Aug 2010 00:40:48 +0000 Subject: [PATCH] remove old comments git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1055 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 4 +- memTreeComponent.h | 36 ----- mergeManager.cpp | 323 ++++++++++++++++++++------------------------- mergeManager.h | 6 +- merger.cpp | 8 +- 5 files changed, 151 insertions(+), 226 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index 8894a2e..2c29279 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -179,7 +179,9 @@ void logtable::flushTable() gettimeofday(&start_tv,0); start = tv_to_double(start_tv); - merge_mgr->finished_merge(0); +#ifdef NO_SNOWSHOVEL + merge_mgr->finished_merge(0); // XXX will deadlock.. +#endif flushing = true; bool blocked = false; diff --git a/memTreeComponent.h b/memTreeComponent.h index fc21f25..7654f69 100644 --- a/memTreeComponent.h +++ b/memTreeComponent.h @@ -189,35 +189,10 @@ public: batchedRevalidatingIterator( rbtree_t *s, int64_t* cur_size, int64_t target_size, bool * flushing, int batch_size, pthread_mutex_t * rb_mut ) : s_(s), cur_size_(cur_size), target_size_(target_size), flushing_(flushing), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_); populate_next_ret(); -/* if(mut_) pthread_mutex_lock(mut_); - if(s_->begin() == s_->end()) { - next_ret_ = NULL; - } else { - next_ret_ = (*s_->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_... - } - if(mut_) pthread_mutex_unlock(mut_); */ } batchedRevalidatingIterator( rbtree_t *s, int batch_size, pthread_mutex_t * rb_mut, TUPLE *&key ) : s_(s), cur_size_(0), target_size_(0), flushing_(0), batch_size_(batch_size), num_batched_(batch_size), cur_off_(batch_size), mut_(rb_mut) { next_ret_ = (TUPLE**)malloc(sizeof(next_ret_[0]) * batch_size_); populate_next_ret(key); -/* if(mut_) pthread_mutex_lock(mut_); - if(key) { - if(s_->find(key) != s_->end()) { - next_ret_ = (*(s_->find(key)))->create_copy(); - } else if(s_->upper_bound(key) != s_->end()) { - next_ret_ = (*(s_->upper_bound(key)))->create_copy(); - } else { - next_ret_ = NULL; - } - } else { - if(s_->begin() == s_->end()) { - next_ret_ = NULL; - } else { - next_ret_ = (*s_->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_... - } - } - // DEBUG("changing mem next ret = %s key = %s\n", next_ret_ ? (const char*)next_ret_->key() : "NONE", key ? (const char*)key->key() : "NULL"); - if(mut_) pthread_mutex_unlock(mut_); */ } ~batchedRevalidatingIterator() { @@ -225,20 +200,9 @@ public: TUPLE::freetuple(next_ret_[i]); } free(next_ret_); -// if(next_ret_) TUPLE::freetuple(next_ret_); } TUPLE* next_callerFrees() { -/* if(mut_) pthread_mutex_lock(mut_); - TUPLE * ret = next_ret_; - if(next_ret_) { - if(s_->upper_bound(next_ret_) == s_->end()) { - next_ret_ = 0; - } else { - next_ret_ = (*s_->upper_bound(next_ret_))->create_copy(); - } - } - if(mut_) pthread_mutex_unlock(mut_); */ if(cur_off_ == num_batched_) { return NULL; } // the last thing we did is call populate_next_ret_(), which only leaves us in this state at the end of the iterator. TUPLE * ret = next_ret_[cur_off_]; cur_off_++; diff --git a/mergeManager.cpp b/mergeManager.cpp index 776f9b2..5374b04 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -23,8 +23,6 @@ mergeStats* mergeManager:: get_merge_stats(int mergeLevel) { } mergeManager::~mergeManager() { - pthread_mutex_destroy(&throttle_mut); - pthread_cond_destroy(&throttle_wokeup_cond); delete c0; delete c1; delete c2; @@ -51,35 +49,45 @@ void mergeManager::set_c0_size(int64_t size) { c0->target_size = size; } -void mergeManager::update_progress(mergeStats * s, int delta) { - s->delta += delta; +void mergeManager::sleep_on_mini_delta(mergeStats *s, int delta) { s->mini_delta += delta; - { - if(/*s->merge_level < 2*/ s->merge_level == 1 && s->mergeable_size && delta) { - int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps); + if(/*s->merge_level < 2*/ s->merge_level == 1 && s->mergeable_size && delta) { + int64_t effective_max_delta = (int64_t)(UPDATE_PROGRESS_PERIOD * s->bps); // if(s->merge_level == 0) { s->base_size = ltable->tree_bytes; } - if(s->mini_delta > effective_max_delta) { - struct timeval now; - gettimeofday(&now, 0); - double now_double = tv_to_double(&now); - double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick); - double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta; - if(slp > 0.001) { -// struct timespec sleeptime; -// double_to_ts(&sleeptime, slp); -// nanosleep(&sleeptime, 0); -// printf("%d Sleep A %f\n", s->merge_level, slp); - } - double_to_ts(&s->last_mini_tick, now_double); - s->mini_delta = 0; + if(s->mini_delta > effective_max_delta) { + struct timeval now; + gettimeofday(&now, 0); + double now_double = tv_to_double(&now); + double elapsed_delta = now_double - ts_to_double(&s->last_mini_tick); + double slp = UPDATE_PROGRESS_PERIOD - elapsed_delta; + if(slp > 0.001) { + struct timespec sleeptime; + double_to_ts(&sleeptime, slp); + nanosleep(&sleeptime, 0); + printf("%d Sleep A %f\n", s->merge_level, slp); } + double_to_ts(&s->last_mini_tick, now_double); + s->mini_delta = 0; } } +} +void mergeManager::update_progress(mergeStats * s, int delta) { + s->delta += delta; +#if 0 +#ifdef NO_SNOWSHOVEL + if(merge_level < 2 && delta) { +#else + if(merge_level == 1 && delta) { +#endif + sleep_on_mini_delta(s, delta); + } +#endif + if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) { + rwlc_writelock(ltable->header_mut); if(delta) { - rwlc_writelock(ltable->header_mut); s->delta = 0; if(!s->need_tick) { s->need_tick = 1; } } @@ -96,33 +104,23 @@ void mergeManager::update_progress(mergeStats * s, int delta) { } else if(s->merge_level == 1) { // C0-C1 merge (c0 is continuously growing...) if(s->active) { s->in_progress = ((double)(s->bytes_in_large+s->bytes_in_small)) / (double)(s->base_size+ltable->mean_c0_effective_size); - if(s->in_progress > 0.95) { s->in_progress = 0.95; } - assert(s->in_progress > -0.01 && s->in_progress < 1.02); +// if(s->in_progress > 0.95) { s->in_progress = 0.95; } +// assert(s->in_progress > -0.01 && s->in_progress < 1.02); } else { s->in_progress = 0; } } - if(s->merge_level != 2) { - if(s->mergeable_size) { - s->out_progress = ((double)s->current_size + (double)s->base_size) / (double)s->target_size; - } else { - s->out_progress = 0.0; - } - } #ifdef NO_SNOWSHOVEL s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; - s->out_progress = ((double)s->current_size) / (double)s->target_size; #else - if(delta) { - if(s->merge_level == 0) { - s->current_size = ltable->tree_bytes; // we need to track the number of bytes consumed by the merger; this data is not present in s, so fall back on ltable's aggregate. - } else { - s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; - } - s->out_progress = ((double)s->current_size) / (double)s->target_size; + if(s->merge_level == 0) { + s->current_size = ltable->tree_bytes; // we need to track the number of bytes consumed by the merger; this data is not present in s, so fall back on ltable's aggregate. + } else { + s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; } #endif + s->out_progress = ((double)s->current_size) / (double)s->target_size; struct timeval now; gettimeofday(&now, 0); double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick); @@ -140,11 +138,99 @@ void mergeManager::update_progress(mergeStats * s, int delta) { s->bytes_in_small_delta = 0; - if(delta) rwlc_unlock(ltable->header_mut); + rwlc_unlock(ltable->header_mut); } } +void mergeManager::tick_based_on_merge_progress(mergeStats *s) { + /* model the effect of linux + stasis' write caches; at the end + of this merge, we need to force up to FORCE_INTERVAL bytes + after we think we're done writing the next component. */ + int64_t overshoot_fudge = (int64_t)((s->out_progress) * ((double)FORCE_INTERVAL)); + /* model the effect of amortizing this computation: we could + become this much more overshot if we don't act now. */ + int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA; + /* multiply by 2 for good measure. These are 'soft' walls, and + still let writes trickle through. Once we've exhausted the + fudge factors, we'll hit a hard wall, and stop writes + entirely, so it's better to start throttling too early than + too late. */ + overshoot_fudge *= 2; + overshoot_fudge2 *= 4; + + const double max_c0_sleep = 0.1; + const double min_c0_sleep = 0.01; + const double max_c1_sleep = 0.5; + const double min_c1_sleep = 0.1; + double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep; + double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep; + + int spin = 0; + double total_sleep = 0.0; + while(1) { + int64_t overshoot = 0; + int64_t overshoot2 = 0; + int64_t raw_overshoot = 0; + double bps; + // This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep. + mergeStats * s1; + if(s->merge_level == 0) { + // dead code + s1 = c1; + } else { + s1 = c2; + } + if(s->mergeable_size) { // only apply backpressure if the next merger is not waiting for us + rwlc_readlock(ltable->header_mut); + if(s1->active && s->mergeable_size) { + raw_overshoot = (int64_t)(((double)s->target_size) * (s->out_progress - s1->in_progress)); + bps = s1->bps; + } + rwlc_unlock(ltable->header_mut); + } +//#define PP_THREAD_INFO +#ifdef PP_THREAD_INFO + printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep); +#endif + bool one_threshold = (overshoot > 0 || overshoot2 > 0) || (raw_overshoot > 0); + bool two_threshold = (overshoot > 0 || overshoot2 > 0) && (raw_overshoot > 0); + + if(one_threshold && (two_threshold || total_sleep < 0.01)) { + + // throttle + // it took "elapsed" seconds to process "tick_length_bytes" mb + double sleeptime = 2.0 * fmax((double)overshoot,(double)overshoot2) / bps; + if(sleeptime < min_sleep) { sleeptime = min_sleep; } + if(sleeptime > max_sleep) { sleeptime = max_sleep; } + + spin ++; + total_sleep += sleeptime; + + if((spin > 40) || (total_sleep > (max_sleep * 20.0))) { + printf("\nMerge thread %d c0->out=%f c1->in=%f c1->out=%f c2->in=%f\n", s->merge_level, c0->out_progress, c1->in_progress, c1->out_progress, c2->in_progress); + printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld eff2=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, (long long)overshoot2, sleeptime, spin, total_sleep); + } + + sleeping[s->merge_level] = true; + + struct timespec ts; + double_to_ts(&ts, sleeptime); + nanosleep(&ts, 0); + // printf("%d Sleep B %f\n", s->merge_level, sleeptime); + + sleeping[s->merge_level] = false; + update_progress(s1, 0); + } else if(overshoot > 0 || overshoot2 > 0) { // this is our second time around the loop, so remember we're in the hole + s->need_tick ++; + if(s->need_tick > 500) { printf("need tick %d\n", s->need_tick); } + } else { // all is well in the world. we can run worry-free for a while. + s->need_tick = 0; + } + break; + } +} + /** * This function is invoked periodically by the merge threads. It updates mergeManager's statistics, and applies * backpressure as necessary. @@ -166,156 +252,38 @@ void mergeManager::update_progress(mergeStats * s, int delta) { * * bytes_consumed_by_merger = sum(bytes_in_small_delta) */ -void mergeManager::tick(mergeStats * s, bool block, bool force) { -#define PRINT_SKIP 10000 - if(force || s->need_tick) { - - if(block) { - if(s->merge_level == 0) { -// pthread_mutex_lock(<able->tick_mut); - rwlc_readlock(ltable->header_mut); - - while(sleeping[s->merge_level]) { - abort(); - rwlc_unlock(ltable->header_mut); -// pthread_cond_wait(&throttle_wokeup_cond, <able->tick_mut); - rwlc_writelock(ltable->header_mut); - } - } else { - rwlc_readlock(ltable->header_mut); - while(sleeping[s->merge_level]) { - abort(); // if we're asleep, didn't this thread make us sleep??? - rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut); - } - } +void mergeManager::tick(mergeStats * s) { + if(s->need_tick) { #ifdef NO_SNOWSHOVEL - bool snowshovel = false; + bool snowshovel = false; #else - bool snowshovel = true; + bool snowshovel = true; #endif - if((!snowshovel) || s->merge_level == 1) { // apply backpressure based on merge progress. - int64_t overshoot = 0; - int64_t overshoot2 = 0; - int64_t raw_overshoot = 0; + if((!snowshovel) || s->merge_level == 1) { // apply backpressure based on merge progress. + tick_based_on_merge_progress(s); + } else if(s->merge_level == 0) { + // Simple backpressure algorithm based on how full C0 is. - /* model the effect of linux + stasis' write caches; at the end - of this merge, we need to force up to FORCE_INTERVAL bytes - after we think we're done writing the next component. */ - double skew = 0.0; - - int64_t overshoot_fudge = (int64_t)((s->out_progress-skew) * ((double)FORCE_INTERVAL)/(1.0-skew)); - /* model the effect of amortizing this computation: we could - become this much more overshot if we don't act now. */ - int64_t overshoot_fudge2 = UPDATE_PROGRESS_DELTA; //(int64_t)(((double)UPDATE_PROGRESS_PERIOD) * s->bps / 1000.0); - /* multiply by 2 for good measure. These are 'soft' walls, and - still let writes trickle through. Once we've exausted the - fudge factors, we'll hit a hard wall, and stop writes - entirely, so it's better to start thottling too early than - too late. */ - overshoot_fudge *= 2; - overshoot_fudge2 *= 4; - int spin = 0; - double total_sleep = 0.0; - do{ - overshoot = 0; - overshoot2 = 0; - raw_overshoot = 0; - double bps; - // This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep. - if(s->merge_level == 0) { - abort(); - if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; overshoot_fudge2 = 0; } - raw_overshoot = (int64_t)(((double)c0->target_size) * (c0->out_progress - c1->in_progress)); - overshoot = raw_overshoot + overshoot_fudge; - overshoot2 = raw_overshoot + overshoot_fudge2; - bps = c1->bps; - } else if (s->merge_level == 1) { - if(!(c2->active && c1->mergeable_size)) { overshoot_fudge = 0; overshoot_fudge2 = 0; } - raw_overshoot = (int64_t)(((double)c1->target_size) * (c1->out_progress - c2->in_progress)); - overshoot = raw_overshoot + overshoot_fudge; - overshoot2 = raw_overshoot + overshoot_fudge2; - bps = c2->bps; - if(!c1->mergeable_size) { overshoot = 0; overshoot2 = 0; } - } - -//#define PP_THREAD_INFO - #ifdef PP_THREAD_INFO - printf("\nMerge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, c0_out_progress, c0_c1_in_progress, raw_overshoot, overshoot_fudge, overshoot, -1.0, spin, total_sleep); - #endif - bool one_threshold = (overshoot > 0 || overshoot2 > 0) || (raw_overshoot > 0); - bool two_threshold = (overshoot > 0 || overshoot2 > 0) && (raw_overshoot > 0); - - if(one_threshold && (two_threshold || total_sleep < 0.01)) { - // throttle - // it took "elapsed" seconds to process "tick_length_bytes" mb - double sleeptime = 2.0 * fmax((double)overshoot,(double)overshoot2) / bps; - double max_c0_sleep = 0.1; - double min_c0_sleep = 0.01; - double max_c1_sleep = 0.5; - double min_c1_sleep = 0.1; - double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep; - double min_sleep = s->merge_level == 0 ? min_c0_sleep : min_c1_sleep; - - if(sleeptime < min_sleep) { sleeptime = min_sleep; } - if(sleeptime > max_sleep) { sleeptime = max_sleep; } - - spin ++; - total_sleep += sleeptime; - - if((spin > 40) || (total_sleep > (max_sleep * 20.0))) { - printf("\nMerge thread %d c0->out=%f c1->in=%f c1->out=%f c2->in=%f\n", s->merge_level, c0->out_progress, c1->in_progress, c1->out_progress, c2->in_progress); - printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld eff2=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, (long long)raw_overshoot, (long long)overshoot_fudge, (long long)overshoot, (long long)overshoot2, sleeptime, spin, total_sleep); - } - - sleeping[s->merge_level] = true; - if(s->merge_level == 0) abort(); - rwlc_unlock(ltable->header_mut); - struct timespec ts; - double_to_ts(&ts, sleeptime); - nanosleep(&ts, 0); - // printf("%d Sleep B %f\n", s->merge_level, sleeptime); - - // rwlc_writelock(ltable->header_mut); - rwlc_readlock(ltable->header_mut); - sleeping[s->merge_level] = false; - pthread_cond_broadcast(&throttle_wokeup_cond); - if(s->merge_level == 0) { update_progress(c1, 0); } - if(s->merge_level == 1) { update_progress(c2, 0); } - } else { - if(overshoot > 0 || overshoot2 > 0) { - s->need_tick ++; - if(s->need_tick > 500) { printf("need tick %d\n", s->need_tick); } - } else { - s->need_tick = 0; - } - break; - } - } while(1); - } else if(s->merge_level == 0) { - while(/*s->current_size*/ltable->tree_bytes > ltable->max_c0_size) { - rwlc_unlock(ltable->header_mut); + // Is C0 bigger than is allowed? + while(ltable->tree_bytes > ltable->max_c0_size) { // can't use s->current_size, since this is the thread that maintains that number... printf("\nMEMORY OVERRUN!!!! SLEEP!!!!\n"); struct timespec ts; double_to_ts(&ts, 0.1); nanosleep(&ts, 0); - rwlc_readlock(ltable->header_mut); } - double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.1111 // - (0.9 * (double)ltable->max_c0_size); + // Linear backpressure model + double delta = ((double)ltable->tree_bytes)/(0.9*(double)ltable->max_c0_size); // 0 <= delta <= 1.111... delta -= 1.0; if(delta > 0.00005) { - double slp = 0.001 + 10.0 * delta; //delta / (double)(ltable->max_c0_size); + double slp = 0.001 + 10.0 * delta; //0.0015 < slp < 1.112111.. + DEBUG("\nsleeping %0.6f tree_megabytes %0.3f\n", slp, ((double)ltable->tree_bytes)/(1024.0*1024.0)); struct timespec sleeptime; double_to_ts(&sleeptime, slp); - rwlc_unlock(ltable->header_mut); DEBUG("%d Sleep C %f\n", s->merge_level, slp); - nanosleep(&sleeptime, 0); - rwlc_readlock(ltable->header_mut); } } - rwlc_unlock(ltable->header_mut); - } } } @@ -326,7 +294,7 @@ void mergeManager::read_tuple_from_small_component(int merge_level, datatuple * (s->bytes_in_small_delta) += tup->byte_length(); (s->bytes_in_small) += tup->byte_length(); update_progress(s, tup->byte_length()); - tick(s, true); + tick(s); } } void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * tup) { @@ -334,9 +302,7 @@ void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * mergeStats * s = get_merge_stats(merge_level); s->num_tuples_in_large++; s->bytes_in_large += tup->byte_length(); -// tick(s, false); // would be no-op; we just reduced current_size. update_progress(s, tup->byte_length()); - tick(s,false); } } @@ -344,15 +310,10 @@ void mergeManager::wrote_tuple(int merge_level, datatuple * tup) { mergeStats * s = get_merge_stats(merge_level); (s->num_tuples_out)++; (s->bytes_out) += tup->byte_length(); - - // XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex. - // update_progress(s, tup->byte_length()); - // tick(s, false); } void mergeManager::finished_merge(int merge_level) { update_progress(get_merge_stats(merge_level), 0); - tick(get_merge_stats(merge_level), false, true); // XXX what does this do??? get_merge_stats(merge_level)->active = false; if(merge_level != 0) { get_merge_stats(merge_level - 1)->mergeable_size = 0; @@ -392,8 +353,6 @@ mergeManager::mergeManager(logtable *ltable): c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)), c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )), c2(new mergeStats(2, 0)) { - pthread_mutex_init(&throttle_mut, 0); - pthread_cond_init(&throttle_wokeup_cond, 0); struct timeval tv; gettimeofday(&tv, 0); sleeping[0] = false; diff --git a/mergeManager.h b/mergeManager.h index 62613a0..35a8557 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -43,8 +43,10 @@ public: void new_merge(int mergelevel); void set_c0_size(int64_t size); + void sleep_on_mini_delta(mergeStats *s, int delta); void update_progress(mergeStats *s, int delta); - void tick(mergeStats * s, bool block, bool force = false); + void tick(mergeStats * s); + void tick_based_on_merge_progress(mergeStats * s); mergeStats* get_merge_stats(int mergeLevel); void read_tuple_from_small_component(int merge_level, datatuple * tup); void read_tuple_from_large_component(int merge_level, datatuple * tup); @@ -62,8 +64,6 @@ private: mergeStats * c0; mergeStats * c1; mergeStats * c2; - pthread_mutex_t throttle_mut; - pthread_cond_t throttle_wokeup_cond; bool sleeping[3]; bool still_running; pthread_cond_t pp_cond; diff --git a/merger.cpp b/merger.cpp index 814174d..27184e6 100644 --- a/merger.cpp +++ b/merger.cpp @@ -227,8 +227,6 @@ void* memMergeThread(void*arg) ltable->update_persistent_header(xid, 1); Tcommit(xid); - ltable->merge_mgr->finished_merge(1); - //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger @@ -282,6 +280,8 @@ void* memMergeThread(void*arg) // 13 rwlc_unlock(ltable->header_mut); + + ltable->merge_mgr->finished_merge(1); // stats->pretty_print(stdout); //TODO: get the freeing outside of the lock @@ -390,10 +390,10 @@ void *diskMergeThread(void*arg) ltable->update_persistent_header(xid, 2); Tcommit(xid); - ltable->merge_mgr->finished_merge(2); - rwlc_unlock(ltable->header_mut); // stats->pretty_print(stdout); + ltable->merge_mgr->finished_merge(2); + } return 0;