diff --git a/logstore.cpp b/logstore.cpp index f684819..e883b39 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -428,8 +428,8 @@ void logtable::insertTuple(datatuple *tuple) { rwlc_writelock(header_mut); // XXX want this to be a readlock, but tick, and the stats need it to be a writelock for now... //lock the red-black tree + c0_stats->read_tuple_from_small_component(tuple); // has to be before rb_mut, since it calls tick with block = true, and that releases header_mut. pthread_mutex_lock(&rb_mut); - c0_stats->read_tuple_from_small_component(tuple); //find the previous tuple with same key in the memtree if exists memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple); datatuple * t = 0; diff --git a/logstore.h b/logstore.h index 9d01e6b..320bde7 100644 --- a/logstore.h +++ b/logstore.h @@ -157,7 +157,7 @@ public: merge_(merge), dups((int*)malloc(sizeof(*dups)*num_iters_)) { - current_[0] = first_iter_->getnext(); + current_[0] = first_iter_->next_callerFrees(); for(int i = 1; i < num_iters_; i++) { iters_[i-1] = iters[i-1]; current_[i] = iters_[i-1] ? iters_[i-1]->next_callerFrees() : NULL; @@ -178,17 +178,17 @@ public: free(dups); } TUPLE * peek() { - TUPLE * ret = getnext(); + TUPLE * ret = next_callerFrees(); last_iter_ = -1; // don't advance iterator on next peek() or getnext() call. return ret; } - TUPLE * getnext() { + TUPLE * next_callerFrees() { int num_dups = 0; if(last_iter_ != -1) { // get the value after the one we just returned to the user //TUPLE::freetuple(current_[last_iter_]); // should never be null if(last_iter_ == 0) { - current_[last_iter_] = first_iter_->getnext(); + current_[last_iter_] = first_iter_->next_callerFrees(); } else if(last_iter_ != -1){ current_[last_iter_] = iters_[last_iter_-1]->next_callerFrees(); } else { @@ -291,7 +291,7 @@ public: } private: TUPLE * getnextHelper() { - TUPLE * tmp = merge_it_->getnext(); + TUPLE * tmp = merge_it_->next_callerFrees(); if(last_returned && tmp) { assert(TUPLE::compare(last_returned->key(), last_returned->keylen(), tmp->key(), tmp->keylen()) < 0); TUPLE::freetuple(last_returned); @@ -304,8 +304,9 @@ public: rwlc_readlock(ltable->header_mut); revalidate(); TUPLE * ret = getnextHelper(); - rwlc_readunlock(ltable->header_mut); - return ret ? ret->create_copy() : NULL; + ret = ret ? ret->create_copy() : NULL; + rwlc_unlock(ltable->header_mut); + return ret; } TUPLE * getnext() { @@ -313,8 +314,9 @@ public: revalidate(); TUPLE * ret; while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory. + ret = ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory. rwlc_unlock(ltable->header_mut); - return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory. + return ret; } void invalidate() { @@ -392,7 +394,7 @@ public: TUPLE * junk = merge_it_->peek(); if(junk && !TUPLE::compare(junk->key(), junk->keylen(), last_returned->key(), last_returned->keylen())) { // we already returned junk - TUPLE::freetuple(merge_it_->getnext()); + TUPLE::freetuple(merge_it_->next_callerFrees()); } } valid = true; diff --git a/memTreeComponent.h b/memTreeComponent.h index ff63cb2..d4f31f1 100644 --- a/memTreeComponent.h +++ b/memTreeComponent.h @@ -118,7 +118,7 @@ public: if(next_ret_) TUPLE::freetuple(next_ret_); } - TUPLE* getnext() { + TUPLE* next_callerFrees() { if(mut_) pthread_mutex_lock(mut_); TUPLE * ret = next_ret_; if(next_ret_) { diff --git a/merger.cpp b/merger.cpp index 606a623..3086318 100644 --- a/merger.cpp +++ b/merger.cpp @@ -201,14 +201,14 @@ void* memMergeThread(void*arg) // 5: force c1' + rwlc_writelock(ltable->header_mut); + //force write the new tree to disk c1_prime->force(xid); merge_count++; DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size()); - rwlc_writelock(ltable->header_mut); - // Immediately clean out c0 mergeable so that writers may continue. // first, we need to move the c1' into c1. @@ -412,8 +412,8 @@ void merge_iterators(int xid, { stasis_log_t * log = (stasis_log_t*)stasis_log(); - datatuple *t1 = itrA->next_callerFrees(); rwlc_writelock(ltable->header_mut); // XXX slow + datatuple *t1 = itrA->next_callerFrees(); stats->read_tuple_from_large_component(t1); rwlc_unlock(ltable->header_mut); // XXX slow datatuple *t2 = 0; @@ -442,9 +442,8 @@ void merge_iterators(int xid, if(t1) { stats->read_tuple_from_large_component(t1); } - rwlc_unlock(ltable->header_mut); // XXX slow - periodically_force(xid, &i, forceMe, log); + rwlc_unlock(ltable->header_mut); // XXX slow } if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0) @@ -465,6 +464,7 @@ void merge_iterators(int xid, stats->read_tuple_from_large_component(t1); } datatuple::freetuple(mtuple); + periodically_force(xid, &i, forceMe, log); rwlc_unlock(ltable->header_mut); // XXX slow } else @@ -475,11 +475,10 @@ void merge_iterators(int xid, i+=t2->byte_length(); stats->wrote_tuple(t2); + periodically_force(xid, &i, forceMe, log); rwlc_unlock(ltable->header_mut); // XXX slow - // cannot free any tuples here; they may still be read through a lookup } - periodically_force(xid, &i, forceMe, log); datatuple::freetuple(t2); rwlc_writelock(ltable->header_mut); // XXX slow } @@ -493,12 +492,12 @@ void merge_iterators(int xid, //advance itrA t1 = itrA->next_callerFrees(); stats->read_tuple_from_large_component(t1); - rwlc_unlock(ltable->header_mut); // XXX slow periodically_force(xid, &i, forceMe, log); + rwlc_unlock(ltable->header_mut); // XXX slow rwlc_writelock(ltable->header_mut); // XXX slow } DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples); scratch_tree->writes_done(); - rwlc_writeunlock(ltable->header_mut); + rwlc_unlock(ltable->header_mut); }