From cf6c6e5955fcbfc9ac2d5d61172359ac89224dc4 Mon Sep 17 00:00:00 2001 From: sears Date: Mon, 21 Jun 2010 22:59:05 +0000 Subject: [PATCH] incease UPDATE_PROGRESS_DELTA by 10x, and FORCE_INTERVAL by 2.5x; tick no longer grabs a writelock on header_mut, and instead uses its own mutex git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@870 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 2 ++ logstore.h | 1 + mergeManager.cpp | 38 +++++++++++++++++++++++--------------- mergeManager.h | 4 ++-- server.cpp | 2 +- 5 files changed, 29 insertions(+), 18 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index a08b270..aff7609 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -36,6 +36,7 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio tmerger = new tuplemerger(&replace_merger); header_mut = rwlc_initlock(); + pthread_mutex_init(&tick_mut, 0); pthread_mutex_init(&rb_mut, 0); pthread_cond_init(&c0_needed, 0); pthread_cond_init(&c0_ready, 0); @@ -66,6 +67,7 @@ logtable::~logtable() } pthread_mutex_destroy(&rb_mut); + pthread_mutex_destroy(&tick_mut); rwlc_deletelock(header_mut); pthread_cond_destroy(&c0_needed); pthread_cond_destroy(&c0_ready); diff --git a/logstore.h b/logstore.h index c91683d..e91660d 100644 --- a/logstore.h +++ b/logstore.h @@ -104,6 +104,7 @@ public: logtable_mergedata * mergedata; rwlc * header_mut; + pthread_mutex_t tick_mut; pthread_mutex_t rb_mut; int64_t max_c0_size; mergeManager * merge_mgr; diff --git a/mergeManager.cpp b/mergeManager.cpp index baf9c65..49e09c6 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -118,10 +118,13 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { if(force || s->need_tick) { if(block) { - rwlc_writelock(ltable->header_mut); + pthread_mutex_lock(<able->tick_mut); + rwlc_readlock(ltable->header_mut); while(sleeping[s->merge_level]) { - rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut); + rwlc_unlock(ltable->header_mut); + pthread_cond_wait(&throttle_wokeup_cond, <able->tick_mut); + rwlc_readlock(ltable->header_mut); } int64_t overshoot = 0; @@ -143,7 +146,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { entirely, so it's better to start thottling too early than too late. */ overshoot_fudge *= 2; - overshoot_fudge2 *= 2; + overshoot_fudge2 *= 4; int spin = 0; double total_sleep = 0.0; do{ @@ -187,7 +190,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { struct timespec sleep_until; double max_c0_sleep = 0.1; - double min_c0_sleep = 0.05; + double min_c0_sleep = 0.01; double max_c1_sleep = 0.5; double min_c1_sleep = 0.1; double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep; @@ -200,9 +203,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { total_sleep += sleeptime; if((spin > 40) || (total_sleep > (max_sleep * 20.0))) { -// if(spin > 20 || s->merge_level == 0) { printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, raw_overshoot, overshoot_fudge, overshoot, sleeptime, spin, total_sleep); -// } } struct timeval now; @@ -210,7 +211,9 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); sleeping[s->merge_level] = true; - rwlc_cond_timedwait(&dummy_throttle_cond, ltable->header_mut, &sleep_until); + rwlc_unlock(ltable->header_mut); + pthread_cond_timedwait(&dummy_throttle_cond, <able->tick_mut, &sleep_until); + rwlc_readlock(ltable->header_mut); sleeping[s->merge_level] = false; pthread_cond_broadcast(&throttle_wokeup_cond); gettimeofday(&now, 0); @@ -219,7 +222,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { } else { if(overshoot > 0 || overshoot2 > 0) { s->need_tick ++; - if(s->need_tick > 100) { printf("need tick %d\n", s->need_tick); } + if(s->need_tick > 500) { printf("need tick %d\n", s->need_tick); } } else { s->need_tick = 0; } @@ -227,14 +230,19 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) { } } while(1); rwlc_unlock(ltable->header_mut); + pthread_mutex_unlock(<able->tick_mut); } else { - if(s->print_skipped == PRINT_SKIP) { - if(!force) rwlc_writelock(ltable->header_mut); - pretty_print(stdout); - if(!force) rwlc_unlock(ltable->header_mut); - s->print_skipped = 0; - } else { - s->print_skipped++; + if(!force) { + if(s->print_skipped == PRINT_SKIP) { + pthread_mutex_lock(<able->tick_mut); + rwlc_readlock(ltable->header_mut); + pretty_print(stdout); + rwlc_unlock(ltable->header_mut); + pthread_mutex_unlock(<able->tick_mut); + s->print_skipped = 0; + } else { + s->print_skipped++; + } } } } diff --git a/mergeManager.h b/mergeManager.h index 02f21ff..b45e8a8 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -21,8 +21,8 @@ class mergeStats; class mergeManager { public: - static const int UPDATE_PROGRESS_DELTA = 1024 * 1024; - static const int FORCE_INTERVAL = 10 * 1024 * 1024; + static const int UPDATE_PROGRESS_DELTA = 10 * 1024 * 1024; + static const int FORCE_INTERVAL = 25 * 1024 * 1024; static double tv_to_double(struct timeval * tv) { return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0; } diff --git a/server.cpp b/server.cpp index 13e75b3..c7127b1 100644 --- a/server.cpp +++ b/server.cpp @@ -79,7 +79,7 @@ int main(int argc, char *argv[]) if(argc == 2 && !strcmp(argv[1], "--test")) { - c0_size = 1024 * 1024 * 10; + c0_size = 1024 * 1024 * 100; printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server? }