fix deadlock; do not clear need_tick bit until overshoot is less than zero; tweak constants accordingly (fewer mutex acquisitions! less blockage!)
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@852 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
0a2c4796a8
commit
2530009ec0
3 changed files with 17 additions and 12 deletions
16
logstore.cpp
16
logstore.cpp
|
@ -169,7 +169,6 @@ void logtable<TUPLE>::flushTable()
|
||||||
|
|
||||||
gettimeofday(&stop_tv,0);
|
gettimeofday(&stop_tv,0);
|
||||||
stop = tv_to_double(stop_tv);
|
stop = tv_to_double(stop_tv);
|
||||||
pthread_mutex_lock(&rb_mut);
|
|
||||||
set_tree_c0_mergeable(get_tree_c0());
|
set_tree_c0_mergeable(get_tree_c0());
|
||||||
|
|
||||||
pthread_cond_signal(&c0_ready);
|
pthread_cond_signal(&c0_ready);
|
||||||
|
@ -177,7 +176,6 @@ void logtable<TUPLE>::flushTable()
|
||||||
|
|
||||||
merge_count ++;
|
merge_count ++;
|
||||||
set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
|
set_tree_c0(new memTreeComponent<datatuple>::rbtree_t);
|
||||||
pthread_mutex_unlock(&rb_mut);
|
|
||||||
c0_stats->starting_merge();
|
c0_stats->starting_merge();
|
||||||
|
|
||||||
tsize = 0;
|
tsize = 0;
|
||||||
|
@ -207,7 +205,6 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
|
||||||
//prepare a search tuple
|
//prepare a search tuple
|
||||||
datatuple *search_tuple = datatuple::create(key, keySize);
|
datatuple *search_tuple = datatuple::create(key, keySize);
|
||||||
|
|
||||||
rwlc_readlock(header_mut);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&rb_mut);
|
pthread_mutex_lock(&rb_mut);
|
||||||
|
|
||||||
|
@ -221,6 +218,7 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
|
||||||
ret_tuple = (*rbitr)->create_copy();
|
ret_tuple = (*rbitr)->create_copy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rwlc_readlock(header_mut); // has to be before rb_mut, or we could merge the tuple with itself due to an intervening merge
|
||||||
pthread_mutex_unlock(&rb_mut);
|
pthread_mutex_unlock(&rb_mut);
|
||||||
|
|
||||||
bool done = false;
|
bool done = false;
|
||||||
|
@ -354,7 +352,6 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
|
||||||
//prepare a search tuple
|
//prepare a search tuple
|
||||||
datatuple * search_tuple = datatuple::create(key, keySize);
|
datatuple * search_tuple = datatuple::create(key, keySize);
|
||||||
|
|
||||||
rwlc_readlock(header_mut);
|
|
||||||
|
|
||||||
datatuple *ret_tuple=0;
|
datatuple *ret_tuple=0;
|
||||||
//step 1: look in tree_c0
|
//step 1: look in tree_c0
|
||||||
|
@ -374,6 +371,7 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
|
||||||
{
|
{
|
||||||
DEBUG("Not in mem tree %d\n", tree_c0->size());
|
DEBUG("Not in mem tree %d\n", tree_c0->size());
|
||||||
|
|
||||||
|
rwlc_readlock(header_mut);
|
||||||
pthread_mutex_unlock(&rb_mut);
|
pthread_mutex_unlock(&rb_mut);
|
||||||
|
|
||||||
//step: 2 look into first in tree if exists (a first level merge going on)
|
//step: 2 look into first in tree if exists (a first level merge going on)
|
||||||
|
@ -415,9 +413,9 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
|
||||||
//step 5: check c2
|
//step 5: check c2
|
||||||
ret_tuple = get_tree_c2()->findTuple(xid, key, keySize);
|
ret_tuple = get_tree_c2()->findTuple(xid, key, keySize);
|
||||||
}
|
}
|
||||||
|
rwlc_unlock(header_mut);
|
||||||
}
|
}
|
||||||
|
|
||||||
rwlc_unlock(header_mut);
|
|
||||||
datatuple::freetuple(search_tuple);
|
datatuple::freetuple(search_tuple);
|
||||||
|
|
||||||
return ret_tuple;
|
return ret_tuple;
|
||||||
|
@ -465,13 +463,15 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
|
||||||
|
|
||||||
merge_mgr->wrote_tuple(0, t);
|
merge_mgr->wrote_tuple(0, t);
|
||||||
|
|
||||||
pthread_mutex_unlock(&rb_mut);
|
|
||||||
|
|
||||||
//flushing logic
|
//flushing logic
|
||||||
if(tree_bytes >= max_c0_size )
|
if(tree_bytes >= max_c0_size )
|
||||||
{
|
{
|
||||||
DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
|
DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
|
||||||
|
|
||||||
|
// NOTE: we hold rb_mut across the (blocking on merge) flushTable. Therefore:
|
||||||
|
// *** Blocking in flushTable is REALLY BAD ***
|
||||||
|
// Because it blocks readers and writers.
|
||||||
|
// The merge policy does its best to make sure flushTable does not block.
|
||||||
rwlc_writelock(header_mut);
|
rwlc_writelock(header_mut);
|
||||||
// the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock.
|
// the test of tree size needs to be atomic with the flushTable, and flushTable needs a writelock.
|
||||||
if(tree_bytes >= max_c0_size) {
|
if(tree_bytes >= max_c0_size) {
|
||||||
|
@ -480,6 +480,8 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
|
||||||
rwlc_unlock(header_mut);
|
rwlc_unlock(header_mut);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&rb_mut);
|
||||||
|
|
||||||
DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
|
DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,9 +48,11 @@ void mergeManager::set_c0_size(int64_t size) {
|
||||||
c0->target_size = size;
|
c0->target_size = size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const int UPDATE_PROGRESS_DELTA = 1 * 1024 * 1024;
|
||||||
|
|
||||||
void mergeManager::update_progress(mergeStats * s, int delta) {
|
void mergeManager::update_progress(mergeStats * s, int delta) {
|
||||||
s->delta += delta;
|
s->delta += delta;
|
||||||
if((!delta) || s->delta > 64 * 1024) { //512 * 1024) {
|
if((!delta) || s->delta > UPDATE_PROGRESS_DELTA) { //512 * 1024) {
|
||||||
if(delta) {
|
if(delta) {
|
||||||
rwlc_writelock(ltable->header_mut);
|
rwlc_writelock(ltable->header_mut);
|
||||||
s->delta = 0;
|
s->delta = 0;
|
||||||
|
@ -127,7 +129,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
|
|
||||||
int64_t overshoot = 0;
|
int64_t overshoot = 0;
|
||||||
int64_t raw_overshoot = 0;
|
int64_t raw_overshoot = 0;
|
||||||
int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 4.0 * 1024.0 * 1024.0);
|
int64_t overshoot_fudge = (int64_t)((s->out_progress-0.5) * 18.0 * 1024.0 * 1024.0);
|
||||||
int spin = 0;
|
int spin = 0;
|
||||||
double total_sleep = 0.0;
|
double total_sleep = 0.0;
|
||||||
do{
|
do{
|
||||||
|
@ -193,6 +195,7 @@ void mergeManager::tick(mergeStats * s, bool block, bool force) {
|
||||||
if(s->merge_level == 0) { update_progress(c1, 0); }
|
if(s->merge_level == 0) { update_progress(c1, 0); }
|
||||||
if(s->merge_level == 1) { update_progress(c2, 0); }
|
if(s->merge_level == 1) { update_progress(c2, 0); }
|
||||||
} else {
|
} else {
|
||||||
|
if(overshoot > 0) { s->need_tick = 1; }
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} while(1);
|
} while(1);
|
||||||
|
@ -255,8 +258,8 @@ void mergeManager::finished_merge(int merge_level) {
|
||||||
|
|
||||||
mergeManager::mergeManager(logtable<datatuple> *ltable):
|
mergeManager::mergeManager(logtable<datatuple> *ltable):
|
||||||
ltable(ltable),
|
ltable(ltable),
|
||||||
c0(new mergeStats(0, ltable->max_c0_size)),
|
c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)),
|
||||||
c1(new mergeStats(1, (int64_t)(((double)ltable->max_c0_size) * *ltable->R()))),
|
c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )),
|
||||||
c2(new mergeStats(2, 0)) {
|
c2(new mergeStats(2, 0)) {
|
||||||
pthread_mutex_init(&throttle_mut, 0);
|
pthread_mutex_init(&throttle_mut, 0);
|
||||||
pthread_mutex_init(&dummy_throttle_mut, 0);
|
pthread_mutex_init(&dummy_throttle_mut, 0);
|
||||||
|
|
|
@ -390,7 +390,7 @@ void *diskMergeThread(void*arg)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#define FORCE_INTERVAL (2 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL
|
#define FORCE_INTERVAL (1 * 1024 * 1024) // XXX do not hardcode FORCE_INTERVAL
|
||||||
|
|
||||||
static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
|
static void periodically_force(int xid, int *i, diskTreeComponent * forceMe, stasis_log_t * log) {
|
||||||
if(0 && *i > FORCE_INTERVAL) {
|
if(0 && *i > FORCE_INTERVAL) {
|
||||||
|
|
Loading…
Reference in a new issue