diff --git a/logstore.cpp b/logstore.cpp
index 085e575..3524c0b 100644
--- a/logstore.cpp
+++ b/logstore.cpp
@@ -169,7 +169,6 @@ void logtable::flushTable()
     gettimeofday(&stop_tv,0);
     stop = tv_to_double(stop_tv);
 
-    pthread_mutex_lock(&rb_mut);
     set_tree_c0_mergeable(get_tree_c0());
 
     pthread_cond_signal(&c0_ready);
@@ -177,7 +176,6 @@ void logtable::flushTable()
     merge_count ++;
     set_tree_c0(new memTreeComponent::rbtree_t);
 
-    pthread_mutex_unlock(&rb_mut);
     c0_stats->starting_merge();
 
     tsize = 0;
@@ -207,7 +205,6 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size
     //prepare a search tuple
     datatuple *search_tuple = datatuple::create(key, keySize);
 
-    rwlc_readlock(header_mut);
     pthread_mutex_lock(&rb_mut);
 
@@ -221,6 +218,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size
         ret_tuple = (*rbitr)->create_copy();
     }
 
+    rwlc_readlock(header_mut); // has to be before rb_mut, or we could merge the tuple with itself due to an intervening merge
     pthread_mutex_unlock(&rb_mut);
 
     bool done = false;
@@ -354,7 +352,6 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size
     //prepare a search tuple
     datatuple * search_tuple = datatuple::create(key, keySize);
 
-    rwlc_readlock(header_mut);
     datatuple *ret_tuple=0;
 
     //step 1: look in tree_c0
@@ -374,6 +371,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size
     {
         DEBUG("Not in mem tree %d\n", tree_c0->size());
 
+        rwlc_readlock(header_mut);
         pthread_mutex_unlock(&rb_mut);
 
         //step: 2 look into first in tree if exists (a first level merge going on)
@@ -415,9 +413,9 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size
             //step 5: check c2
             ret_tuple = get_tree_c2()->findTuple(xid, key, keySize);
         }
+        rwlc_unlock(header_mut);
     }
 
-    rwlc_unlock(header_mut);
     datatuple::freetuple(search_tuple);
     return ret_tuple;
@@ -465,13 +463,15 @@ void logtable::insertTuple(datatuple *tuple)
     merge_mgr->wrote_tuple(0, t);
 
-    pthread_mutex_unlock(&rb_mut);
-
     //flushing logic
     if(tree_bytes >= max_c0_size )
     {
         DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
+        // NOTE: we hold rb_mut across the (blocking on merge) flushTable. Therefore:
+        // *** Blocking in flushTable is REALLY BAD ***
+        // Because it blocks readers and writers.
+        // The merge policy does its best to make sure flushTable does not block.
         rwlc_writelock(header_mut); // the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock.
         if(tree_bytes >= max_c0_size) {
@@ -480,6 +480,8 @@ void logtable::insertTuple(datatuple *tuple)
         rwlc_unlock(header_mut);
     }
 
+    pthread_mutex_unlock(&rb_mut);
+
     DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
 }
diff --git a/mergeManager.cpp b/mergeManager.cpp
index 8d70e38..b92e36d 100644
--- a/mergeManager.cpp
+++ b/mergeManager.cpp
@@ -48,9 +48,11 @@ void mergeManager::set_c0_size(int64_t size) {
   c0->target_size = size;
 }
 
+static const int UPDATE_PROGRESS_DELTA = 1 * 1024 * 1024;
+
 void mergeManager::update_progress(mergeStats * s, int delta) {
   s->delta += delta;
-  if((!delta) || s->delta > 64 * 1024) { //512 * 1024) {
+  if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) { //512 * 1024) {
     if(delta) {
       rwlc_writelock(ltable->header_mut);
       s->delta = 0;
@@ -127,7 +129,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
   int64_t overshoot = 0;
   int64_t raw_overshoot = 0;
-  int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 4.0 * 1024.0 * 1024.0);
+  int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0);
   int spin = 0;
   double total_sleep = 0.0;
   do{
@@ -193,6 +195,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
       if(s->merge_level == 0) { update_progress(c1, 0); }
       if(s->merge_level == 1) { update_progress(c2, 0); }
     } else {
+      if(overshoot > 0) { s->need_tick = 1; }
       break;
     }
   } while(1);
@@ -255,8 +258,8 @@ void mergeManager::finished_merge(int merge_level) {
 mergeManager::mergeManager(logtable *ltable):
   ltable(ltable),
-  c0(new mergeStats(0, ltable->max_c0_size)),
-  c1(new mergeStats(1, (int64_t)(((double)ltable->max_c0_size) * *ltable->R()))),
+  c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)),
+  c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )),
   c2(new mergeStats(2, 0)) {
   pthread_mutex_init(&throttle_mut, 0);
   pthread_mutex_init(&dummy_throttle_mut, 0);
diff --git a/merger.cpp b/merger.cpp
index c93c6a5..2efe2e8 100644
--- a/merger.cpp
+++ b/merger.cpp
@@ -390,7 +390,7 @@ void *diskMergeThread(void*arg)
   return 0;
 }
 
-#define FORCE_INTERVAL (2 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL
+#define FORCE_INTERVAL (1 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL
 
 static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
   if(0 && *i > FORCE_INTERVAL) {
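
Note on the insertTuple() hunks above: the tree-size test is now evaluated twice, once before and once after taking the header write lock, because another writer may have flushed C0 while this thread waited for header_mut, and rb_mut stays held across the flush decision. Below is a minimal, self-contained sketch of that double-checked pattern. It is illustrative only: the class name MiniTable, the helper flush_locked(), and the use of std::mutex / std::shared_mutex in place of the tree's pthread rb_mut and rwlc_* header lock are assumptions made for the example, not the logstore API.

#include <cstdint>
#include <mutex>
#include <shared_mutex>

class MiniTable {
 public:
  explicit MiniTable(int64_t max_bytes) : max_bytes_(max_bytes) {}

  // Insert path, mirroring the shape of logtable::insertTuple() after the
  // patch: the tree mutex stays held across the flush decision, and the
  // size test is repeated under the header write lock.
  void insert(int64_t tuple_bytes) {
    std::lock_guard<std::mutex> rb(rb_mut_);       // plays the role of rb_mut
    tree_bytes_ += tuple_bytes;                    // (real code also updates the red-black tree here)

    if (tree_bytes_ >= max_bytes_) {               // cheap pre-check without the header lock
      std::unique_lock<std::shared_mutex> hdr(header_mut_);  // plays the role of rwlc_writelock(header_mut)
      if (tree_bytes_ >= max_bytes_) {             // re-check: another writer may have flushed already
        flush_locked();                            // stand-in for flushTable(); must not block for long
      }
    }
  }                                                // tree mutex released here, as in the patched code

 private:
  void flush_locked() { tree_bytes_ = 0; }         // hypothetical helper, not part of the logstore

  std::mutex rb_mut_;
  std::shared_mutex header_mut_;
  int64_t tree_bytes_ = 0;
  const int64_t max_bytes_;
};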