From 0fb644640a3786d267135724e04b4d7626510afd Mon Sep 17 00:00:00 2001 From: sears Date: Thu, 17 Jun 2010 04:49:19 +0000 Subject: [PATCH] improved concurrency for merger. not happy with worst case write latency on my dev machine anymore (seems to be less than 2.5 seconds, up from less than 1). git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@842 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 10 ++-- mergeManager.cpp | 120 ++++++++++++++++++++++++----------------------- mergeManager.h | 1 - mergeStats.h | 8 ++++ merger.cpp | 14 ------ 5 files changed, 75 insertions(+), 78 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index 0e65111..085e575 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -169,7 +169,7 @@ void logtable::flushTable() gettimeofday(&stop_tv,0); stop = tv_to_double(stop_tv); - + pthread_mutex_lock(&rb_mut); set_tree_c0_mergeable(get_tree_c0()); pthread_cond_signal(&c0_ready); @@ -177,6 +177,7 @@ void logtable::flushTable() merge_count ++; set_tree_c0(new memTreeComponent::rbtree_t); + pthread_mutex_unlock(&rb_mut); c0_stats->starting_merge(); tsize = 0; @@ -426,7 +427,6 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size template void logtable::insertTuple(datatuple *tuple) { - rwlc_writelock(header_mut); // XXX want this to be a readlock, but tick, and the stats need it to be a writelock for now... //lock the red-black tree merge_mgr->read_tuple_from_small_component(0, tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut. pthread_mutex_lock(&rb_mut); @@ -471,14 +471,14 @@ void logtable::insertTuple(datatuple *tuple) if(tree_bytes >= max_c0_size ) { DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); -// rwlc_unlock(header_mut); -// rwlc_writelock(header_mut); + + rwlc_writelock(header_mut); // the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock. 
if(tree_bytes >= max_c0_size) { flushTable(); } + rwlc_unlock(header_mut); } - rwlc_unlock(header_mut); DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); } diff --git a/mergeManager.cpp b/mergeManager.cpp index 1372382..c94c001 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -22,7 +22,6 @@ mergeStats* mergeManager:: get_merge_stats(int mergeLevel) { } mergeManager::~mergeManager() { - pthread_mutex_destroy(&mut); pthread_mutex_destroy(&throttle_mut); pthread_mutex_destroy(&dummy_throttle_mut); pthread_cond_destroy(&dummy_throttle_cond); @@ -34,17 +33,15 @@ mergeManager::~mergeManager() { void mergeManager::new_merge(int mergeLevel) { mergeStats * s = get_merge_stats(mergeLevel); - pthread_mutex_lock(&mut); - if(s->merge_level == 0) { - // target_size was set during startup - } else if(s->merge_level == 1) { - assert(c0->target_size); - c1->target_size = (pageid_t)(*ltable->R() * (double)c0->target_size); - assert(c1->target_size); - } else if(s->merge_level == 2) { - // target_size is infinity... - } else { abort(); } - pthread_mutex_unlock(&mut); + if(s->merge_level == 0) { + // target_size was set during startup + } else if(s->merge_level == 1) { + assert(c0->target_size); + c1->target_size = (pageid_t)(*ltable->R() * (double)c0->target_size); + assert(c1->target_size); + } else if(s->merge_level == 2) { + // target_size is infinity... 
+ } else { abort(); } s->new_merge2(); } void mergeManager::set_c0_size(int64_t size) { @@ -53,9 +50,12 @@ void mergeManager::set_c0_size(int64_t size) { void mergeManager::update_progress(mergeStats * s, int delta) { s->delta += delta; - if((!delta) || s->delta > 512 * 1024) { - s->delta = 0; - if(delta) s->need_tick = true; + if((!delta) || s->delta > 64 * 1024) { //512 * 1024) { + if(delta) { + rwlc_writelock(ltable->header_mut); + s->delta = 0; + s->need_tick = true; + } if(s->merge_level != 0) { if(s->active) { s->in_progress = ((double)(s->bytes_in_large + s->bytes_in_small)) / (double)(get_merge_stats(s->merge_level-1)->mergeable_size + s->base_size); @@ -70,6 +70,25 @@ void mergeManager::update_progress(mergeStats * s, int delta) { s->out_progress = 0; } } + s->current_size = s->base_size + s->bytes_out - s->bytes_in_large; + struct timeval now; + gettimeofday(&now, 0); + double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick); + + s->lifetime_elapsed += elapsed_delta; + s->lifetime_consumed += s->bytes_in_small_delta; + double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...) + s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta; + s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta; + + double_to_ts(&s->last_tick, tv_to_double(&now)); + + s->bytes_in_small_delta = 0; + + s->bps = s->window_consumed / s->window_elapsed; + + if(delta) rwlc_unlock(ltable->header_mut); + } } @@ -95,63 +114,39 @@ void mergeManager::update_progress(mergeStats * s, int delta) { * bytes_consumed_by_merger = sum(bytes_in_small_delta) */ void mergeManager::tick(mergeStats * s, bool block, bool force) { -#define PRINT_SKIP 20 - pageid_t tick_length_bytes = 256*1024; // probably lower than it could be for production machines. 256KB leads to whining on my dev box. 
- -// if(s->bytes_in_small_delta > tick_length_bytes) { - pageid_t new_current_size = s->base_size + s->bytes_out - s->bytes_in_large; - -// if(force || ((!block) && (new_current_size - s->current_size > tick_length_bytes)) || -// (block && s->bytes_in_small_delta > tick_length_bytes)) { // other than R, these are protected by a mutex, but this is the only thread that can write them +#define PRINT_SKIP 100 if(force || s->need_tick) { - if(block) { s->need_tick = false; } - s->current_size = new_current_size; // xxx move to update_progress - if(block) { + rwlc_writelock(ltable->header_mut); + s->need_tick = false; while(sleeping[s->merge_level]) { + assert(!sleeping[s->merge_level]); + rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut); } - struct timeval now; - gettimeofday(&now, 0); - double elapsed_delta = tv_to_double(&now) - ts_to_double(&s->last_tick); - double bps = 0; // = (double)s->bytes_in_small_delta / (double)elapsed_delta; - - // xxx move bps stuff to update_progress - - s->lifetime_elapsed += elapsed_delta; - s->lifetime_consumed += s->bytes_in_small_delta; - double decay = 0.9999; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...) - s->window_elapsed = (decay * s->window_elapsed) + elapsed_delta; - s->window_consumed = (decay * s->window_consumed) + s->bytes_in_small_delta; - - double_to_ts(&s->last_tick, tv_to_double(&now)); - - s->bytes_in_small_delta = 0; int64_t overshoot = 0; int64_t raw_overshoot = 0; - int64_t overshoot_fudge = (int64_t)(((((double)s->current_size) / (double)(s->target_size)) - 0.5) * 4.0*1024.0*1024.0); // XXX set based on avg / max tuple size? 
+ int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 4.0 * 1024.0 * 1024.0); int spin = 0; double total_sleep = 0.0; do{ overshoot = 0; raw_overshoot = 0; - // more bps stuff for update_progress - double c0_c1_bps = c1->window_consumed / c1->window_elapsed; - double c1_c2_bps = c2->window_consumed / c2->window_elapsed; - + double bps; + // This needs to be here (and not in update_progress), since the other guy's in_progress changes while we sleep. if(s->merge_level == 0) { if(!(c1->active && c0->mergeable_size)) { overshoot_fudge = 0; } raw_overshoot = (int64_t)(((double)c0->target_size) * (c0->out_progress - c1->in_progress)); overshoot = raw_overshoot + overshoot_fudge; - bps = c0_c1_bps; + bps = c1->bps; } else if (s->merge_level == 1) { if(!(c2->active && c1->mergeable_size)) { overshoot_fudge = 0; } raw_overshoot = (int64_t)(((double)c1->target_size) * (c1->out_progress - c2->in_progress)); overshoot = raw_overshoot + overshoot_fudge; - bps = c1_c2_bps; + bps = c2->bps; } //#define PP_THREAD_INFO @@ -172,37 +167,48 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { struct timespec sleep_until; double max_c0_sleep = 0.1; - double max_c1_sleep = 0.1; + double min_c0_sleep = 0.05; + double max_c1_sleep = 1.0; + double min_c1_sleep = 0.1; double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep; - if(sleeptime < 0.1) { sleeptime = 0.1; } + double min_sleep = s->merge_level == 0 ? 
min_c0_sleep : min_c1_sleep; + + if(sleeptime < min_sleep) { sleeptime = min_sleep; } if(sleeptime > max_sleep) { sleeptime = max_sleep; } spin ++; total_sleep += sleeptime; - if((spin > 20) || (total_sleep > (max_sleep * 10))) { + if((spin > 40) || (total_sleep > (max_sleep * 20))) { printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, raw_overshoot, overshoot_fudge, overshoot, sleeptime, spin, total_sleep); } + struct timeval now; + gettimeofday(&now, 0); + double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); sleeping[s->merge_level] = true; rwlc_cond_timedwait(&dummy_throttle_cond, ltable->header_mut, &sleep_until); sleeping[s->merge_level] = false; pthread_cond_broadcast(&throttle_wokeup_cond); gettimeofday(&now, 0); + if(s->merge_level == 0) { update_progress(c1, 0); } + if(s->merge_level == 1) { update_progress(c2, 0); } } else { break; } } while(1); + rwlc_unlock(ltable->header_mut); } else { if(s->print_skipped == PRINT_SKIP) { + if(!force) rwlc_writelock(ltable->header_mut); pretty_print(stdout); + if(!force) rwlc_unlock(ltable->header_mut); s->print_skipped = 0; } else { s->print_skipped++; } } -// pthread_mutex_unlock(&mut); } } @@ -223,6 +229,7 @@ void mergeManager::read_tuple_from_large_component(int merge_level, datatuple * s->bytes_in_large += tup->byte_length(); // tick(s, false); // would be no-op; we just reduced current_size. update_progress(s, tup->byte_length()); + tick(s,false); } } @@ -232,8 +239,8 @@ void mergeManager::wrote_tuple(int merge_level, datatuple * tup) { (s->bytes_out) += tup->byte_length(); // XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex. 
- update_progress(s, tup->byte_length()); - tick(s, false); + // update_progress(s, tup->byte_length()); + // tick(s, false); } void mergeManager::finished_merge(int merge_level) { @@ -253,7 +260,6 @@ mergeManager::mergeManager(logtable *ltable): c0(new mergeStats(0, ltable->max_c0_size)), c1(new mergeStats(1, (int64_t)(((double)ltable->max_c0_size) * *ltable->R()))), c2(new mergeStats(2, 0)) { - pthread_mutex_init(&mut, 0); pthread_mutex_init(&throttle_mut, 0); pthread_mutex_init(&dummy_throttle_mut, 0); pthread_cond_init(&dummy_throttle_cond, 0); @@ -277,13 +283,11 @@ void mergeManager::pretty_print(FILE * out) { bool have_c1m = false; bool have_c2 = false; if(lt) { -// pthread_mutex_lock(&lt->header_mut); have_c0 = NULL != lt->get_tree_c0(); have_c0m = NULL != lt->get_tree_c0_mergeable(); have_c1 = NULL != lt->get_tree_c1(); have_c1m = NULL != lt->get_tree_c1_mergeable() ; have_c2 = NULL != lt->get_tree_c2(); -// pthread_mutex_unlock(&lt->header_mut); } double c0_out_progress = 100.0 * c0->current_size / c0->target_size; diff --git a/mergeManager.h b/mergeManager.h index 7c38ee0..0ca6290 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -50,7 +50,6 @@ public: void pretty_print(FILE * out); private: - pthread_mutex_t mut; logtable* ltable; double throttle_seconds; // double elapsed_seconds; diff --git a/mergeStats.h b/mergeStats.h index 89d0da0..2025ee7 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -51,6 +51,10 @@ class mergeStats { gettimeofday(&sleep,0); gettimeofday(&last,0); mergeManager::double_to_ts(&last_tick, mergeManager::tv_to_double(&last)); + pthread_mutex_init(&mut,0); + } + ~mergeStats() { + pthread_mutex_destroy(&mut); } void new_merge2() { if(just_handed_off) { @@ -138,9 +142,13 @@ class mergeStats { double window_elapsed; double window_consumed; + double bps; + int print_skipped; // used by pretty print in mergeManager. bool active; + + pthread_mutex_t mut; // protects things touched in tick(), and nothing else. 
public: void pretty_print(FILE* fd) { diff --git a/merger.cpp b/merger.cpp index 1e9f44c..c93c6a5 100644 --- a/merger.cpp +++ b/merger.cpp @@ -412,26 +412,21 @@ void merge_iterators(int xid, { stasis_log_t * log = (stasis_log_t*)stasis_log(); - rwlc_writelock(ltable->header_mut); // XXX slow datatuple *t1 = itrA->next_callerFrees(); ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); - rwlc_unlock(ltable->header_mut); // XXX slow datatuple *t2 = 0; int i = 0; - rwlc_writelock(ltable->header_mut); // XXX slow while( (t2=itrB->next_callerFrees()) != 0) { ltable->merge_mgr->read_tuple_from_small_component(stats->merge_level, t2); - rwlc_unlock(ltable->header_mut); // XXX slow DEBUG("tuple\t%lld: keylen %d datalen %d\n", ntuples, *(t2->keylen),*(t2->datalen) ); while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2 { - rwlc_writelock(ltable->header_mut); // XXX slow //insert t1 scratch_tree->insertTuple(xid, t1); i+=t1->byte_length(); @@ -443,13 +438,11 @@ void merge_iterators(int xid, ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); } periodically_force(xid, &i, forceMe, log); - rwlc_unlock(ltable->header_mut); // XXX slow } if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0) { datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); - rwlc_writelock(ltable->header_mut); // XXX slow stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right. 
//insert merged tuple, drop deletes @@ -465,22 +458,18 @@ void merge_iterators(int xid, } datatuple::freetuple(mtuple); periodically_force(xid, &i, forceMe, log); - rwlc_unlock(ltable->header_mut); // XXX slow } else { - rwlc_writelock(ltable->header_mut); // XXX slow //insert t2 scratch_tree->insertTuple(xid, t2); i+=t2->byte_length(); ltable->merge_mgr->wrote_tuple(stats->merge_level, t2); periodically_force(xid, &i, forceMe, log); - rwlc_unlock(ltable->header_mut); // XXX slow // cannot free any tuples here; they may still be read through a lookup } datatuple::freetuple(t2); - rwlc_writelock(ltable->header_mut); // XXX slow } while(t1 != 0) {// t1 is less than t2 @@ -493,11 +482,8 @@ void merge_iterators(int xid, t1 = itrA->next_callerFrees(); ltable->merge_mgr->read_tuple_from_large_component(stats->merge_level, t1); periodically_force(xid, &i, forceMe, log); - rwlc_unlock(ltable->header_mut); // XXX slow - rwlc_writelock(ltable->header_mut); // XXX slow } DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples); scratch_tree->writes_done(); - rwlc_unlock(ltable->header_mut); }