incease UPDATE_PROGRESS_DELTA by 10x, and FORCE_INTERVAL by 2.5x; tick no longer grabs a writelock on header_mut, and instead uses its own mutex

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@870 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-06-21 22:59:05 +00:00
parent fded91fec6
commit cf6c6e5955
5 changed files with 29 additions and 18 deletions

View file

@ -36,6 +36,7 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
tmerger = new tuplemerger(&replace_merger);
header_mut = rwlc_initlock();
pthread_mutex_init(&tick_mut, 0);
pthread_mutex_init(&rb_mut, 0);
pthread_cond_init(&c0_needed, 0);
pthread_cond_init(&c0_ready, 0);
@ -66,6 +67,7 @@ logtable<TUPLE>::~logtable()
}
pthread_mutex_destroy(&rb_mut);
pthread_mutex_destroy(&tick_mut);
rwlc_deletelock(header_mut);
pthread_cond_destroy(&c0_needed);
pthread_cond_destroy(&c0_ready);

View file

@ -104,6 +104,7 @@ public:
logtable_mergedata * mergedata;
rwlc * header_mut;
pthread_mutex_t tick_mut;
pthread_mutex_t rb_mut;
int64_t max_c0_size;
mergeManager * merge_mgr;

View file

@ -118,10 +118,13 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
if(force || s->need_tick) {
if(block) {
rwlc_writelock(ltable->header_mut);
pthread_mutex_lock(&ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
while(sleeping[s->merge_level]) {
rwlc_cond_wait(&throttle_wokeup_cond, ltable->header_mut);
rwlc_unlock(ltable->header_mut);
pthread_cond_wait(&throttle_wokeup_cond, &ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
}
int64_t overshoot = 0;
@ -143,7 +146,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
entirely, so it's better to start thottling too early than
too late. */
overshoot_fudge *= 2;
overshoot_fudge2 *= 2;
overshoot_fudge2 *= 4;
int spin = 0;
double total_sleep = 0.0;
do{
@ -187,7 +190,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
struct timespec sleep_until;
double max_c0_sleep = 0.1;
double min_c0_sleep = 0.05;
double min_c0_sleep = 0.01;
double max_c1_sleep = 0.5;
double min_c1_sleep = 0.1;
double max_sleep = s->merge_level == 0 ? max_c0_sleep : max_c1_sleep;
@ -200,9 +203,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
total_sleep += sleeptime;
if((spin > 40) || (total_sleep > (max_sleep * 20.0))) {
// if(spin > 20 || s->merge_level == 0) {
printf("\nMerge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f\n", s->merge_level, raw_overshoot, overshoot_fudge, overshoot, sleeptime, spin, total_sleep);
// }
}
struct timeval now;
@ -210,7 +211,9 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
sleeping[s->merge_level] = true;
rwlc_cond_timedwait(&dummy_throttle_cond, ltable->header_mut, &sleep_until);
rwlc_unlock(ltable->header_mut);
pthread_cond_timedwait(&dummy_throttle_cond, &ltable->tick_mut, &sleep_until);
rwlc_readlock(ltable->header_mut);
sleeping[s->merge_level] = false;
pthread_cond_broadcast(&throttle_wokeup_cond);
gettimeofday(&now, 0);
@ -219,7 +222,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
} else {
if(overshoot > 0 || overshoot2 > 0) {
s->need_tick ++;
if(s->need_tick > 100) { printf("need tick %d\n", s->need_tick); }
if(s->need_tick > 500) { printf("need tick %d\n", s->need_tick); }
} else {
s->need_tick = 0;
}
@ -227,14 +230,19 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
}
} while(1);
rwlc_unlock(ltable->header_mut);
pthread_mutex_unlock(&ltable->tick_mut);
} else {
if(s->print_skipped == PRINT_SKIP) {
if(!force) rwlc_writelock(ltable->header_mut);
pretty_print(stdout);
if(!force) rwlc_unlock(ltable->header_mut);
s->print_skipped = 0;
} else {
s->print_skipped++;
if(!force) {
if(s->print_skipped == PRINT_SKIP) {
pthread_mutex_lock(&ltable->tick_mut);
rwlc_readlock(ltable->header_mut);
pretty_print(stdout);
rwlc_unlock(ltable->header_mut);
pthread_mutex_unlock(&ltable->tick_mut);
s->print_skipped = 0;
} else {
s->print_skipped++;
}
}
}
}

View file

@ -21,8 +21,8 @@ class mergeStats;
class mergeManager {
public:
static const int UPDATE_PROGRESS_DELTA = 1024 * 1024;
static const int FORCE_INTERVAL = 10 * 1024 * 1024;
static const int UPDATE_PROGRESS_DELTA = 10 * 1024 * 1024;
static const int FORCE_INTERVAL = 25 * 1024 * 1024;
static double tv_to_double(struct timeval * tv) {
return (double)tv->tv_sec + ((double)tv->tv_usec)/1000000.0;
}

View file

@ -79,7 +79,7 @@ int main(int argc, char *argv[])
if(argc == 2 && !strcmp(argv[1], "--test")) {
c0_size = 1024 * 1024 * 10;
c0_size = 1024 * 1024 * 100;
printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server?
}