restore support for concurrent disk reads; logserver was accidentally using a findTuple variant that assumes tuples are stored as diffs. This commit switches header_mut to rwlc, and requires Stasis r1393 or newer

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@814 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
sears 2010-05-27 01:49:27 +00:00
parent 0e375c005b
commit 6eecf1557a
5 changed files with 79 additions and 57 deletions
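
In outline, the change replaces the single pthread mutex that guarded the logtable header with a Stasis rwlc latch (a reader-writer lock that condition variables can wait on), and adds a separate mutex, rb_mut, that protects only the in-memory red-black tree (C0). Lookups take the latch in read mode, so several of them can be blocked on disk at once; merges and flushes take it in write mode only around component swaps. A minimal sketch of that pattern, using the rwlc_* calls that appear in the hunks below — the struct and function names here are stand-ins, not the real logtable API:

    #include <pthread.h>
    // Assumes the rwlc API from Stasis r1393+ (rwlc_initlock, rwlc_readlock,
    // rwlc_writelock, rwlc_unlock, ...); the exact header to include is omitted here.

    struct table_sketch {              // stand-in for logtable
        rwlc            *header_mut;   // guards the C0/C1/C2 component pointers
        pthread_mutex_t  rb_mut;       // guards only the in-memory red-black tree
    };

    void init_sketch(table_sketch *t) {
        t->header_mut = rwlc_initlock();    // was: pthread_mutex_init(&header_mut, 0)
        pthread_mutex_init(&t->rb_mut, 0);
    }

    void lookup_sketch(table_sketch *t) {
        rwlc_readlock(t->header_mut);       // shared: concurrent lookups may all be on disk
        pthread_mutex_lock(&t->rb_mut);     // short critical section: probe C0 only
        /* ... search the red-black tree ... */
        pthread_mutex_unlock(&t->rb_mut);
        /* ... probe C1/C2 on disk while still holding the read latch ... */
        rwlc_unlock(t->header_mut);
    }

    void merge_publish_sketch(table_sketch *t) {
        rwlc_writelock(t->header_mut);      // exclusive only while swapping components
        /* ... install a new tree component, update counters ... */
        rwlc_writeunlock(t->header_mut);
    }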

Changed file 1 of 5

@@ -435,7 +435,7 @@ int op_insert(pthread_data* data, datatuple * tuple) {
 }
 int op_find(pthread_data* data, datatuple * tuple) {
     //find the tuple
-    datatuple *dt = data->ltable->findTuple(-1, tuple->key(), tuple->keylen());
+    datatuple *dt = data->ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
 #ifdef STATS_ENABLED
@@ -514,7 +514,7 @@ int op_stat_space_usage(pthread_data* data) {
     int xid = Tbegin();
-    pthread_mutex_lock(&data->ltable->header_mut);
+    rwlc_readlock(data->ltable->header_mut);
     /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
        pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
@@ -587,7 +587,7 @@ int op_stat_space_usage(pthread_data* data) {
         ;
     } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/));
-    pthread_mutex_unlock(&data->ltable->header_mut);
+    rwlc_unlock(data->ltable->header_mut);
     Tcommit(xid);
@@ -672,7 +672,7 @@ int op_dbg_blockmap(pthread_data* data) {
     // produce a list of stasis regions
     int xid = Tbegin();
-    pthread_mutex_lock(&data->ltable->header_mut);
+    rwlc_readlock(data->ltable->header_mut);
     // produce a list of regions used by current tree components
     /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
@@ -714,7 +714,7 @@ int op_dbg_blockmap(pthread_data* data) {
              &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions,
              &datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions);
-    pthread_mutex_unlock(&data->ltable->header_mut);
+    rwlc_unlock(data->ltable->header_mut);
     Tcommit(xid);
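
The only behavioral change in this file is the lookup call itself: op_find now goes through findTuple_first, which (per the commit message) returns the newest matching tuple rather than merging per-component diffs, and the stat/debug handlers take the header latch in read mode so they no longer serialize against lookups. A hedged sketch of the handler's new shape; the response-marshalling details are not part of this hunk and are only paraphrased in comments:

    // Sketch of op_find after this commit; pthread_data and datatuple are the
    // logserver types, but the elided parts are summarized, not quoted.
    int op_find_sketch(pthread_data *data, datatuple *tuple) {
        // findTuple_first: stop at the first (newest) component containing the key,
        // instead of merging diffs from every component as findTuple does.
        datatuple *dt = data->ltable->findTuple_first(-1, tuple->key(), tuple->keylen());
        /* ... write dt (or a tombstone / "not found" reply) back to the client ... */
        if(dt) datatuple::freetuple(dt);
        return 0; // placeholder status; the real handler's codes are outside this hunk
    }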

Changed file 2 of 5

@@ -35,7 +35,8 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
     //tmerger = new tuplemerger(&append_merger);
     tmerger = new tuplemerger(&replace_merger);
-    pthread_mutex_init(&header_mut, 0);
+    header_mut = rwlc_initlock();
+    pthread_mutex_init(&rb_mut, 0);
     pthread_cond_init(&c0_needed, 0);
     pthread_cond_init(&c0_ready, 0);
     pthread_cond_init(&c1_needed, 0);
@@ -68,7 +69,8 @@ logtable<TUPLE>::~logtable()
         memTreeComponent<datatuple>::tearDownTree(tree_c0);
     }
-    pthread_mutex_destroy(&header_mut);
+    pthread_mutex_destroy(&rb_mut);
+    rwlc_deletelock(header_mut);
     pthread_cond_destroy(&c0_needed);
     pthread_cond_destroy(&c0_ready);
     pthread_cond_destroy(&c1_needed);
@@ -144,7 +146,7 @@ void logtable<TUPLE>::flushTable()
     start = tv_to_double(start_tv);
-    pthread_mutex_lock(&header_mut);
+    rwlc_writelock(header_mut);
     int expmcount = merge_count;
     c0_stats->finished_merge();
@@ -155,10 +157,10 @@ void logtable<TUPLE>::flushTable()
     bool blocked = false;
     while(get_tree_c0_mergeable()) {
-        pthread_cond_wait(&c0_needed, &header_mut);
+        rwlc_cond_wait(&c0_needed, header_mut);
         blocked = true;
         if(expmcount != merge_count) {
-            pthread_mutex_unlock(&header_mut);
+            rwlc_writeunlock(header_mut);
             return;
         }
     }
@@ -181,7 +183,7 @@ void logtable<TUPLE>::flushTable()
     tsize = 0;
     tree_bytes = 0;
-    pthread_mutex_unlock(&header_mut);
+    rwlc_writeunlock(header_mut);
     if(blocked && stop - start > 0.1) {
         if(first)
@@ -207,7 +209,9 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
     //prepare a search tuple
     datatuple *search_tuple = datatuple::create(key, keySize);
-    pthread_mutex_lock(&header_mut);
+    rwlc_readlock(header_mut);
+    pthread_mutex_lock(&rb_mut);
     datatuple *ret_tuple=0;
@@ -219,6 +223,8 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
         ret_tuple = (*rbitr)->create_copy();
     }
+    pthread_mutex_unlock(&rb_mut);
     bool done = false;
     //step: 2 look into first in tree if exists (a first level merge going on)
     if(get_tree_c0_mergeable() != 0)
@@ -245,7 +251,6 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
         }
     }
-    //TODO: Arrange to only hold read latches while hitting disk.
     //step 3: check c1
     if(!done)
@@ -335,7 +340,7 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
         }
     }
-    pthread_mutex_unlock(&header_mut); // XXX release this each time we could block on disk...
+    rwlc_unlock(header_mut);
     datatuple::freetuple(search_tuple);
     return ret_tuple;
@@ -351,21 +356,28 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
     //prepare a search tuple
     datatuple * search_tuple = datatuple::create(key, keySize);
-    pthread_mutex_lock(&header_mut);
+    rwlc_readlock(header_mut);
     datatuple *ret_tuple=0;
     //step 1: look in tree_c0
+    pthread_mutex_lock(&rb_mut);
     memTreeComponent<datatuple>::rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple);
     if(rbitr != get_tree_c0()->end())
     {
         DEBUG("tree_c0 size %d\n", tree_c0->size());
         ret_tuple = (*rbitr)->create_copy();
+        pthread_mutex_unlock(&rb_mut);
     }
     else
     {
         DEBUG("Not in mem tree %d\n", tree_c0->size());
+        pthread_mutex_unlock(&rb_mut);
         //step: 2 look into first in tree if exists (a first level merge going on)
         if(get_tree_c0_mergeable() != NULL)
         {
@@ -407,7 +419,7 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
         }
     }
-    pthread_mutex_unlock(&header_mut);
+    rwlc_unlock(header_mut);
     datatuple::freetuple(search_tuple);
     return ret_tuple;
@@ -417,8 +429,9 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
 template<class TUPLE>
 void logtable<TUPLE>::insertTuple(datatuple *tuple)
 {
+    rwlc_readlock(header_mut);
     //lock the red-black tree
-    pthread_mutex_lock(&header_mut);
+    pthread_mutex_lock(&rb_mut);
     c0_stats->read_tuple_from_small_component(tuple);
     //find the previous tuple with same key in the memtree if exists
     memTreeComponent<datatuple>::rbtree_t::iterator rbitr = tree_c0->find(tuple);
@@ -455,15 +468,17 @@ void logtable<TUPLE>::insertTuple(datatuple *tuple)
     c0_stats->wrote_tuple(t);
+    pthread_mutex_unlock(&rb_mut);
     //flushing logic
     if(tree_bytes >= max_c0_size )
     {
         DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
-        pthread_mutex_unlock(&header_mut);
+        rwlc_unlock(header_mut);
         flushTable();
     } else {
         //unlock
-        pthread_mutex_unlock(&header_mut);
+        rwlc_unlock(header_mut);
     }
     DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);

Changed file 3 of 5

@@ -100,7 +100,8 @@ public:
     };
     logtable_mergedata * mergedata;
-    pthread_mutex_t header_mut;
+    rwlc * header_mut;
+    pthread_mutex_t rb_mut;
     int64_t max_c0_size;
     mergeManager * merge_mgr;
@@ -251,10 +252,12 @@ public:
         last_returned(NULL),
         key(NULL),
         valid(false) {
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_readlock(ltable->header_mut);
+        pthread_mutex_lock(&ltable->rb_mut);
         ltable->registerIterator(this);
+        pthread_mutex_unlock(&ltable->rb_mut);
         validate();
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
     }
     explicit iterator(logtable* ltable,TUPLE *key)
@@ -265,18 +268,22 @@ public:
         key(key),
         valid(false)
     {
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_readlock(ltable->header_mut);
+        pthread_mutex_lock(&ltable->rb_mut);
         ltable->registerIterator(this);
+        pthread_mutex_unlock(&ltable->rb_mut);
         validate();
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
     }
     ~iterator() {
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_readlock(ltable->header_mut);
+        pthread_mutex_lock(&ltable->rb_mut);
         ltable->forgetIterator(this);
         invalidate();
+        pthread_mutex_unlock(&ltable->rb_mut);
         if(last_returned) TUPLE::freetuple(last_returned);
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
     }
 private:
     TUPLE * getnextHelper() {
@@ -290,19 +297,19 @@ public:
     }
 public:
     TUPLE * getnextIncludingTombstones() {
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_readlock(ltable->header_mut);
         revalidate();
         TUPLE * ret = getnextHelper();
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_readunlock(ltable->header_mut);
         return ret ? ret->create_copy() : NULL;
     }
     TUPLE * getnext() {
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_readlock(ltable->header_mut);
         revalidate();
         TUPLE * ret;
         while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory.
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
         return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory.
     }
@@ -364,7 +371,7 @@ public:
             t = NULL;
         }
-        c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), NULL/*need something that is not &ltable->header_mut*/, t);
+        c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), &ltable->rb_mut, t);
         c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator (ltable->get_tree_c0_mergeable(), t);
         disk_it[0] = ltable->get_tree_c1()->open_iterator(t);
         if(ltable->get_tree_c1_mergeable()) {
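
Two details worth noting in the header: getnext() and getnextIncludingTombstones() now take the latch in read mode, so concurrent scans no longer serialize with each other or with point lookups, and the C0 revalidatingIterator is handed &ltable->rb_mut because header_mut is no longer a pthread_mutex_t it could revalidate against. A trimmed sketch of the iterator lifecycle the hunks above establish (not a complete class):

    // Trimmed from the constructor/destructor hunks above.
    iterator(logtable *ltable) : ltable(ltable), valid(false) {
        rwlc_readlock(ltable->header_mut);   // shared with other scans and lookups
        pthread_mutex_lock(&ltable->rb_mut);
        ltable->registerIterator(this);      // C0 mutations must be able to see this iterator
        pthread_mutex_unlock(&ltable->rb_mut);
        validate();                          // may open disk-component iterators under the read latch
        rwlc_unlock(ltable->header_mut);
    }
    ~iterator() {
        rwlc_readlock(ltable->header_mut);
        pthread_mutex_lock(&ltable->rb_mut);
        ltable->forgetIterator(this);
        invalidate();
        pthread_mutex_unlock(&ltable->rb_mut);
        if(last_returned) TUPLE::freetuple(last_returned);
        rwlc_unlock(ltable->header_mut);
    }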

Changed file 4 of 5

@@ -154,7 +154,7 @@ void mergeManager::tick(mergeStats * s, bool block) {
         }
         double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
-        pthread_cond_timedwait(&dummy_throttle_cond, &ltable->header_mut, &sleep_until);
+        rwlc_cond_timedwait(&dummy_throttle_cond, ltable->header_mut, &sleep_until);
         gettimeofday(&now, 0);
       }
     } while((overshoot > 0) && (raw_overshoot > 0));
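
mergeManager::tick throttles callers by sleeping on dummy_throttle_cond while the header latch is held; since header_mut is now an rwlc rather than a pthread mutex, the timed wait has to go through rwlc_cond_timedwait, which presumably releases and re-acquires the latch around the sleep just as pthread_cond_timedwait does for a mutex. A small sketch of that throttle loop with stand-in parameters; double_to_ts and tv_to_double are the helpers used in the hunk above:

    // Throttle sketch only: the caller is assumed to already hold header_mut.
    void throttle_sketch(rwlc *header_mut, pthread_cond_t *throttle_cond,
                         double sleeptime, double *overshoot, double *raw_overshoot) {
        struct timeval  now;
        struct timespec sleep_until;
        do {
            gettimeofday(&now, 0);
            double_to_ts(&sleep_until, sleeptime + tv_to_double(&now));
            rwlc_cond_timedwait(throttle_cond, header_mut, &sleep_until);
            gettimeofday(&now, 0);
            /* ... recompute *overshoot / *raw_overshoot from merge progress ... */
        } while(*overshoot > 0 && *raw_overshoot > 0);
    }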

Changed file 5 of 5

@@ -145,7 +145,7 @@ void* memMergeThread(void*arg)
     while(true) // 1
     {
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_writelock(ltable->header_mut);
         stats->new_merge();
         int done = 0;
         // 2: wait for c0_mergable
@@ -160,7 +160,7 @@ void* memMergeThread(void*arg)
             DEBUG("mmt:\twaiting for block ready cond\n");
-            pthread_cond_wait(&ltable->c0_ready, &ltable->header_mut);
+            rwlc_cond_wait(&ltable->c0_ready, ltable->header_mut);
             DEBUG("mmt:\tblock ready\n");
@@ -169,7 +169,7 @@ void* memMergeThread(void*arg)
         if(done==1)
         {
             pthread_cond_signal(&ltable->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down.
-            pthread_mutex_unlock(&ltable->header_mut);
+            rwlc_unlock(ltable->header_mut);
             break;
         }
@@ -189,7 +189,7 @@ void* memMergeThread(void*arg)
         //create a new tree
         diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats);
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
         //: do the merge
         DEBUG("mmt:\tMerging:\n");
@@ -207,7 +207,7 @@ void* memMergeThread(void*arg)
         merge_count++;
         DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size());
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_writelock(ltable->header_mut);
         // Immediately clean out c0 mergeable so that writers may continue.
@@ -246,7 +246,7 @@ void* memMergeThread(void*arg)
             // XXX need to report backpressure here!
             while(ltable->get_tree_c1_mergeable()) {
-                pthread_cond_wait(&ltable->c1_needed, &ltable->header_mut);
+                rwlc_cond_wait(&ltable->c1_needed, ltable->header_mut);
             }
             xid = Tbegin();
@@ -270,7 +270,7 @@ void* memMergeThread(void*arg)
         // DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page);
         // 13
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
         // stats->pretty_print(stdout);
         //TODO: get the freeing outside of the lock
@@ -298,7 +298,7 @@ void *diskMergeThread(void*arg)
     {
         // 2: wait for input
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_writelock(ltable->header_mut);
         stats->new_merge();
         int done = 0;
         // get a new input for merge
@@ -313,13 +313,13 @@ void *diskMergeThread(void*arg)
             DEBUG("dmt:\twaiting for block ready cond\n");
-            pthread_cond_wait(&ltable->c1_ready, &ltable->header_mut);
+            rwlc_cond_wait(&ltable->c1_ready, ltable->header_mut);
             DEBUG("dmt:\tblock ready\n");
         }
         if(done==1)
         {
-            pthread_mutex_unlock(&ltable->header_mut);
+            rwlc_unlock(ltable->header_mut);
             break;
         }
@@ -336,7 +336,7 @@ void *diskMergeThread(void*arg)
         //create a new tree
         diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats);
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
         //do the merge
         DEBUG("dmt:\tMerging:\n");
@@ -351,7 +351,7 @@ void *diskMergeThread(void*arg)
         // (skip 6, 7, 8, 8.5, 9))
-        pthread_mutex_lock(&ltable->header_mut);
+        rwlc_writelock(ltable->header_mut);
         //12
         ltable->get_tree_c2()->dealloc(xid);
         delete ltable->get_tree_c2();
@@ -383,7 +383,7 @@ void *diskMergeThread(void*arg)
         stats->finished_merge();
-        pthread_mutex_unlock(&ltable->header_mut);
+        rwlc_unlock(ltable->header_mut);
         // stats->pretty_print(stdout);
     }
@@ -403,25 +403,25 @@ void merge_iterators(int xid,
     stasis_log_t * log = (stasis_log_t*)stasis_log();
     datatuple *t1 = itrA->next_callerFrees();
-    pthread_mutex_lock(&ltable->header_mut); // XXX slow
+    rwlc_writelock(ltable->header_mut); // XXX slow
     stats->read_tuple_from_large_component(t1);
-    pthread_mutex_unlock(&ltable->header_mut); // XXX slow
+    rwlc_unlock(ltable->header_mut); // XXX slow
     datatuple *t2 = 0;
     int i = 0;
     while( (t2=itrB->next_callerFrees()) != 0)
     {
-        pthread_mutex_lock(&ltable->header_mut); // XXX slow
+        rwlc_writelock(ltable->header_mut); // XXX slow
         stats->read_tuple_from_small_component(t2);
-        pthread_mutex_unlock(&ltable->header_mut); // XXX slow
+        rwlc_unlock(ltable->header_mut); // XXX slow
         DEBUG("tuple\t%lld: keylen %d datalen %d\n",
               ntuples, *(t2->keylen),*(t2->datalen) );
         while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2
         {
-            pthread_mutex_lock(&ltable->header_mut); // XXX slow
+            rwlc_writelock(ltable->header_mut); // XXX slow
            //insert t1
            scratch_tree->insertTuple(xid, t1);
            i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
@@ -432,13 +432,13 @@ void merge_iterators(int xid,
            if(t1) {
              stats->read_tuple_from_large_component(t1);
            }
-            pthread_mutex_unlock(&ltable->header_mut); // XXX slow
+            rwlc_unlock(ltable->header_mut); // XXX slow
         }
         if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
         {
            datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
-            pthread_mutex_lock(&ltable->header_mut); // XXX slow
+            rwlc_writelock(ltable->header_mut); // XXX slow
            stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right.
            //insert merged tuple, drop deletes
@@ -453,17 +453,17 @@ void merge_iterators(int xid,
              stats->read_tuple_from_large_component(t1);
            }
            datatuple::freetuple(mtuple);
-            pthread_mutex_unlock(&ltable->header_mut); // XXX slow
+            rwlc_unlock(ltable->header_mut); // XXX slow
         }
         else
         {
            //insert t2
           scratch_tree->insertTuple(xid, t2);
           i+=t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
-            pthread_mutex_lock(&ltable->header_mut); // XXX slow
+            rwlc_writelock(ltable->header_mut); // XXX slow
           stats->wrote_tuple(t2);
-            pthread_mutex_unlock(&ltable->header_mut); // XXX slow
+            rwlc_unlock(ltable->header_mut); // XXX slow
           // cannot free any tuples here; they may still be read through a lookup
         }
@@ -472,7 +472,7 @@ void merge_iterators(int xid,
     }
     while(t1 != 0) {// t1 is less than t2
-        pthread_mutex_lock(&ltable->header_mut); // XXX slow
+        rwlc_writelock(ltable->header_mut); // XXX slow
         scratch_tree->insertTuple(xid, t1);
         stats->wrote_tuple(t1);
         i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; }
@@ -481,7 +481,7 @@ void merge_iterators(int xid,
         //advance itrA
         t1 = itrA->next_callerFrees();
         stats->read_tuple_from_large_component(t1);
-        pthread_mutex_unlock(&ltable->header_mut); // XXX slow
+        rwlc_unlock(ltable->header_mut); // XXX slow
     }
     DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);