more bugfixes / cleanup. latching is probably overly conservative at this point, but YCSB bulkload finally seems to work

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@819 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears, 2010-05-28 01:29:10 +00:00
parent d981d91dfa
commit 4050475de9
4 changed files with 21 additions and 20 deletions

@@ -428,8 +428,8 @@ void logtable&lt;TUPLE&gt;::insertTuple(datatuple *tuple)
 {
     rwlc_writelock(header_mut); // XXX want this to be a readlock, but tick, and the stats need it to be a writelock for now...
     //lock the red-black tree
+    c0_stats->read_tuple_from_small_component(tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut.
     pthread_mutex_lock(&rb_mut);
-    c0_stats->read_tuple_from_small_component(tuple);
     //find the previous tuple with same key in the memtree if exists
     memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple);
     datatuple * t = 0;
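
The reordering above matters because of the comment it adds: read_tuple_from_small_component() can call tick() with block = true, which releases header_mut while the writer waits. Doing that with rb_mut already held would leave a stalled writer pinning the red-black tree, blocking every other writer behind it. A minimal, compilable model of the two orderings (std::mutex stands in for the project's rwlc latch and pthread mutex; everything here is illustrative, not the project's code):

    #include <mutex>

    std::mutex header_mut; // stands in for the rwlc header latch
    std::mutex rb_mut;     // protects the c0 red-black tree

    // Models read_tuple_from_small_component() calling tick(block = true),
    // which releases header_mut while the writer waits for merge progress.
    void stats_tick_blocking() {
        header_mut.unlock();
        // ... stall until the merger catches up ...
        header_mut.lock();
    }

    void insert_old_order() {   // pre-patch: tick runs with rb_mut held
        header_mut.lock();
        rb_mut.lock();
        stats_tick_blocking();  // a stalled writer pins rb_mut, blocking
        rb_mut.unlock();        // every other writer behind it
        header_mut.unlock();
    }

    void insert_new_order() {   // post-patch: tick runs before rb_mut
        header_mut.lock();
        stats_tick_blocking();  // safe: only header_mut is involved
        rb_mut.lock();
        // ... insert the tuple into tree_c0 ...
        rb_mut.unlock();
        header_mut.unlock();
    }

    int main() { insert_new_order(); }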

@@ -157,7 +157,7 @@ public:
       merge_(merge),
       dups((int*)malloc(sizeof(*dups)*num_iters_))
   {
-    current_[0] = first_iter_->getnext();
+    current_[0] = first_iter_->next_callerFrees();
     for(int i = 1; i < num_iters_; i++) {
       iters_[i-1] = iters[i-1];
       current_[i] = iters_[i-1] ? iters_[i-1]->next_callerFrees() : NULL;
@@ -178,17 +178,17 @@ public:
     free(dups);
   }
   TUPLE * peek() {
-    TUPLE * ret = getnext();
+    TUPLE * ret = next_callerFrees();
     last_iter_ = -1; // don't advance iterator on next peek() or getnext() call.
     return ret;
   }
-  TUPLE * getnext() {
+  TUPLE * next_callerFrees() {
     int num_dups = 0;
     if(last_iter_ != -1) {
       // get the value after the one we just returned to the user
       //TUPLE::freetuple(current_[last_iter_]); // should never be null
       if(last_iter_ == 0) {
-        current_[last_iter_] = first_iter_->getnext();
+        current_[last_iter_] = first_iter_->next_callerFrees();
       } else if(last_iter_ != -1){
         current_[last_iter_] = iters_[last_iter_-1]->next_callerFrees();
       } else {
@@ -291,7 +291,7 @@ public:
   }
 private:
   TUPLE * getnextHelper() {
-    TUPLE * tmp = merge_it_->getnext();
+    TUPLE * tmp = merge_it_->next_callerFrees();
     if(last_returned && tmp) {
       assert(TUPLE::compare(last_returned->key(), last_returned->keylen(), tmp->key(), tmp->keylen()) < 0);
       TUPLE::freetuple(last_returned);
@@ -304,8 +304,9 @@ public:
     rwlc_readlock(ltable->header_mut);
     revalidate();
     TUPLE * ret = getnextHelper();
-    rwlc_readunlock(ltable->header_mut);
-    return ret ? ret->create_copy() : NULL;
+    ret = ret ? ret->create_copy() : NULL;
+    rwlc_unlock(ltable->header_mut);
+    return ret;
   }
   TUPLE * getnext() {
@@ -313,8 +314,9 @@ public:
     revalidate();
     TUPLE * ret;
     while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory.
+    ret = ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory.
     rwlc_unlock(ltable->header_mut);
-    return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory.
+    return ret;
   }
   void invalidate() {
@@ -392,7 +394,7 @@ public:
       TUPLE * junk = merge_it_->peek();
       if(junk && !TUPLE::compare(junk->key(), junk->keylen(), last_returned->key(), last_returned->keylen())) {
         // we already returned junk
-        TUPLE::freetuple(merge_it_->getnext());
+        TUPLE::freetuple(merge_it_->next_callerFrees());
       }
     }
     valid = true;
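
The peek()/getnext() hunks above also fix an ordering bug: create_copy() now runs before the latch is dropped. The tuple returned by getnextHelper() aliases memory owned by the tree components, which a concurrent merge may reclaim once header_mut is released, so the old unlock-then-copy order left a window for reading freed memory. A compilable sketch of the fixed pattern (pthread_rwlock_t stands in for rwlc, and the tuple type is a simplified stand-in):

    #include <pthread.h>
    #include <cstdlib>
    #include <cstring>

    struct tuple {
        char * bytes;
        size_t len;
        tuple * create_copy() const {      // deep copy, owned by the caller
            tuple * t = new tuple;
            t->len = len;
            t->bytes = (char *)malloc(len);
            memcpy(t->bytes, bytes, len);
            return t;
        }
    };

    pthread_rwlock_t header_mut = PTHREAD_RWLOCK_INITIALIZER;
    char backing[8] = "payload";           // memory owned by the components
    tuple shared_tuple = { backing, sizeof(backing) };

    tuple * getnext_fixed() {
        pthread_rwlock_rdlock(&header_mut);
        tuple * ret = &shared_tuple;           // aliases component memory
        ret = ret ? ret->create_copy() : NULL; // copy while mergers are locked out
        pthread_rwlock_unlock(&header_mut);    // after this, the source may be freed
        return ret;                            // caller gets a private copy
    }

    int main() {
        tuple * t = getnext_fixed();
        free(t->bytes);
        delete t;
    }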

@@ -118,7 +118,7 @@ public:
     if(next_ret_) TUPLE::freetuple(next_ret_);
   }
-  TUPLE* getnext() {
+  TUPLE* next_callerFrees() {
     if(mut_) pthread_mutex_lock(mut_);
     TUPLE * ret = next_ret_;
     if(next_ret_) {
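
The getnext() to next_callerFrees() renames in the last two files push the ownership contract into the name: whoever calls it owns the returned tuple and must free it. A tiny illustration of that contract (the tuple type and allocator here are stand-ins, not the project's):

    #include <cstdlib>

    struct tuple { int payload; };

    tuple * next_callerFrees() {   // name announces the transfer of ownership
        return (tuple *)malloc(sizeof(tuple));
    }

    int main() {
        tuple * t = next_callerFrees();
        // ... consume t ...
        free(t);                   // the caller frees, as the name demands
    }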

@@ -201,14 +201,14 @@ void* memMergeThread(void*arg)
         // 5: force c1'
-        rwlc_writelock(ltable->header_mut);
         //force write the new tree to disk
         c1_prime->force(xid);
         merge_count++;
         DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size());
+        rwlc_writelock(ltable->header_mut);
         // Immediately clean out c0 mergeable so that writers may continue.
         // first, we need to move the c1' into c1.
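
The hunk above narrows the critical section in step 5 of the merge: forcing c1' is blocking disk I/O, so it now happens before header_mut is taken rather than inside the latch. A minimal model of the before/after (std::mutex stands in for the rwlc latch; both helpers are stubs, not the project's code):

    #include <mutex>

    std::mutex header_mut;                 // stands in for ltable->header_mut

    void force_to_disk() { /* blocking, fsync-like flush of c1' */ }
    void swap_c1_prime_into_c1() { /* brief in-memory pointer swap */ }

    void merge_step5_old() {               // writers stall for the whole flush
        header_mut.lock();
        force_to_disk();
        swap_c1_prime_into_c1();
        header_mut.unlock();
    }

    void merge_step5_new() {               // flush first, outside the latch
        force_to_disk();
        header_mut.lock();                 // short critical section
        swap_c1_prime_into_c1();
        header_mut.unlock();
    }

    int main() { merge_step5_new(); }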
@@ -412,8 +412,8 @@ void merge_iterators(int xid,
 {
     stasis_log_t * log = (stasis_log_t*)stasis_log();
-    datatuple *t1 = itrA->next_callerFrees();
     rwlc_writelock(ltable->header_mut); // XXX slow
+    datatuple *t1 = itrA->next_callerFrees();
     stats->read_tuple_from_large_component(t1);
     rwlc_unlock(ltable->header_mut); // XXX slow
     datatuple *t2 = 0;
@@ -442,9 +442,8 @@ void merge_iterators(int xid,
             if(t1) {
                 stats->read_tuple_from_large_component(t1);
             }
+            rwlc_unlock(ltable->header_mut); // XXX slow
             periodically_force(xid, &i, forceMe, log);
-            rwlc_unlock(ltable->header_mut); // XXX slow
         }
         if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
@@ -465,6 +464,7 @@ void merge_iterators(int xid,
                 stats->read_tuple_from_large_component(t1);
             }
             datatuple::freetuple(mtuple);
+            periodically_force(xid, &i, forceMe, log);
             rwlc_unlock(ltable->header_mut); // XXX slow
         }
         else
@@ -475,11 +475,10 @@ void merge_iterators(int xid,
             i+=t2->byte_length();
             stats->wrote_tuple(t2);
-            periodically_force(xid, &i, forceMe, log);
             rwlc_unlock(ltable->header_mut); // XXX slow
             // cannot free any tuples here; they may still be read through a lookup
         }
+        periodically_force(xid, &i, forceMe, log);
         datatuple::freetuple(t2);
         rwlc_writelock(ltable->header_mut); // XXX slow
     }
@@ -493,12 +492,12 @@ void merge_iterators(int xid,
         //advance itrA
         t1 = itrA->next_callerFrees();
         stats->read_tuple_from_large_component(t1);
+        rwlc_unlock(ltable->header_mut); // XXX slow
         periodically_force(xid, &i, forceMe, log);
-        rwlc_unlock(ltable->header_mut); // XXX slow
         rwlc_writelock(ltable->header_mut); // XXX slow
     }
     DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);
     scratch_tree->writes_done();
-    rwlc_writeunlock(ltable->header_mut);
+    rwlc_unlock(ltable->header_mut);
 }
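
The merge_iterators() hunks mostly apply one rule: drop header_mut before periodically_force(), which may block while dirty pages are forced to disk, and re-take the latch before touching shared state again. A compilable sketch of that unlock/force/relock loop shape (pthread_rwlock_t and the helper stubs stand in for the project's rwlc latch and functions):

    #include <pthread.h>

    pthread_rwlock_t header_mut = PTHREAD_RWLOCK_INITIALIZER;

    void write_one_tuple() { /* advance itrA/itrB, append to the scratch tree */ }
    void periodically_force_stub() { /* occasionally fsyncs pages; can block */ }

    void merge_loop(int iterations) {
        pthread_rwlock_wrlock(&header_mut);
        for (int n = 0; n < iterations; n++) {
            write_one_tuple();                  // shared state, under the latch
            pthread_rwlock_unlock(&header_mut); // never hold it across I/O
            periodically_force_stub();
            pthread_rwlock_wrlock(&header_mut); // re-take before shared state
        }
        pthread_rwlock_unlock(&header_mut);
    }

    int main() { merge_loop(3); }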