diff --git a/logserver.cpp b/logserver.cpp index 732d283..98f63e7 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -435,7 +435,7 @@ int op_insert(pthread_data* data, datatuple * tuple) { } int op_find(pthread_data* data, datatuple * tuple) { //find the tuple - datatuple *dt = data->ltable->findTuple(-1, tuple->key(), tuple->keylen()); + datatuple *dt = data->ltable->findTuple_first(-1, tuple->key(), tuple->keylen()); #ifdef STATS_ENABLED @@ -514,7 +514,7 @@ int op_stat_space_usage(pthread_data* data) { int xid = Tbegin(); - pthread_mutex_lock(&data->ltable->header_mut); + rwlc_readlock(data->ltable->header_mut); /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; @@ -587,7 +587,7 @@ int op_stat_space_usage(pthread_data* data) { ; } while(TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/)); - pthread_mutex_unlock(&data->ltable->header_mut); + rwlc_unlock(data->ltable->header_mut); Tcommit(xid); @@ -672,7 +672,7 @@ int op_dbg_blockmap(pthread_data* data) { // produce a list of stasis regions int xid = Tbegin(); - pthread_mutex_lock(&data->ltable->header_mut); + rwlc_readlock(data->ltable->header_mut); // produce a list of regions used by current tree components /* pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; @@ -714,7 +714,7 @@ int op_dbg_blockmap(pthread_data* data) { &internal_c2_region_length, &internal_c2_region_count, &internal_c2_regions, &datapage_c2_region_length, &datapage_c2_region_count, &datapage_c2_regions); - pthread_mutex_unlock(&data->ltable->header_mut); + rwlc_unlock(data->ltable->header_mut); Tcommit(xid); diff --git a/logstore.cpp b/logstore.cpp index f41cc98..67965ed 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -35,7 +35,8 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio //tmerger = new tuplemerger(&append_merger); tmerger = new tuplemerger(&replace_merger); - pthread_mutex_init(&header_mut, 0); + header_mut = rwlc_initlock(); + pthread_mutex_init(&rb_mut, 0); pthread_cond_init(&c0_needed, 0); pthread_cond_init(&c0_ready, 0); pthread_cond_init(&c1_needed, 0); @@ -68,7 +69,8 @@ logtable::~logtable() memTreeComponent::tearDownTree(tree_c0); } - pthread_mutex_destroy(&header_mut); + pthread_mutex_destroy(&rb_mut); + rwlc_deletelock(header_mut); pthread_cond_destroy(&c0_needed); pthread_cond_destroy(&c0_ready); pthread_cond_destroy(&c1_needed); @@ -144,7 +146,7 @@ void logtable::flushTable() start = tv_to_double(start_tv); - pthread_mutex_lock(&header_mut); + rwlc_writelock(header_mut); int expmcount = merge_count; c0_stats->finished_merge(); @@ -155,10 +157,10 @@ void logtable::flushTable() bool blocked = false; while(get_tree_c0_mergeable()) { - pthread_cond_wait(&c0_needed, &header_mut); + rwlc_cond_wait(&c0_needed, header_mut); blocked = true; if(expmcount != merge_count) { - pthread_mutex_unlock(&header_mut); + rwlc_writeunlock(header_mut); return; } } @@ -181,7 +183,7 @@ void logtable::flushTable() tsize = 0; tree_bytes = 0; - pthread_mutex_unlock(&header_mut); + rwlc_writeunlock(header_mut); if(blocked && stop - start > 0.1) { if(first) @@ -207,7 +209,9 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size //prepare a search tuple datatuple *search_tuple = datatuple::create(key, keySize); - pthread_mutex_lock(&header_mut); + rwlc_readlock(header_mut); + + pthread_mutex_lock(&rb_mut); datatuple *ret_tuple=0; @@ -219,6 +223,8 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size ret_tuple = (*rbitr)->create_copy(); } + pthread_mutex_unlock(&rb_mut); + bool done = false; //step: 2 look into first in tree if exists (a first level merge going on) if(get_tree_c0_mergeable() != 0) @@ -245,7 +251,6 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size } } - //TODO: Arrange to only hold read latches while hitting disk. //step 3: check c1 if(!done) @@ -335,7 +340,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size } } - pthread_mutex_unlock(&header_mut); // XXX release this each time we could block on disk... + rwlc_unlock(header_mut); datatuple::freetuple(search_tuple); return ret_tuple; @@ -351,21 +356,28 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size //prepare a search tuple datatuple * search_tuple = datatuple::create(key, keySize); - pthread_mutex_lock(&header_mut); + rwlc_readlock(header_mut); datatuple *ret_tuple=0; //step 1: look in tree_c0 + pthread_mutex_lock(&rb_mut); + memTreeComponent::rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple); if(rbitr != get_tree_c0()->end()) { DEBUG("tree_c0 size %d\n", tree_c0->size()); ret_tuple = (*rbitr)->create_copy(); + + pthread_mutex_unlock(&rb_mut); } else { DEBUG("Not in mem tree %d\n", tree_c0->size()); + + pthread_mutex_unlock(&rb_mut); + //step: 2 look into first in tree if exists (a first level merge going on) if(get_tree_c0_mergeable() != NULL) { @@ -407,7 +419,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size } } - pthread_mutex_unlock(&header_mut); + rwlc_unlock(header_mut); datatuple::freetuple(search_tuple); return ret_tuple; @@ -417,8 +429,9 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size template void logtable::insertTuple(datatuple *tuple) { + rwlc_readlock(header_mut); //lock the red-black tree - pthread_mutex_lock(&header_mut); + pthread_mutex_lock(&rb_mut); c0_stats->read_tuple_from_small_component(tuple); //find the previous tuple with same key in the memtree if exists memTreeComponent::rbtree_t::iterator rbitr = tree_c0->find(tuple); @@ -455,15 +468,17 @@ void logtable::insertTuple(datatuple *tuple) c0_stats->wrote_tuple(t); + pthread_mutex_unlock(&rb_mut); + //flushing logic if(tree_bytes >= max_c0_size ) { DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); - pthread_mutex_unlock(&header_mut); + rwlc_unlock(header_mut); flushTable(); } else { //unlock - pthread_mutex_unlock(&header_mut); + rwlc_unlock(header_mut); } DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); diff --git a/logstore.h b/logstore.h index 954756f..982c22b 100644 --- a/logstore.h +++ b/logstore.h @@ -100,7 +100,8 @@ public: }; logtable_mergedata * mergedata; - pthread_mutex_t header_mut; + rwlc * header_mut; + pthread_mutex_t rb_mut; int64_t max_c0_size; mergeManager * merge_mgr; @@ -251,10 +252,12 @@ public: last_returned(NULL), key(NULL), valid(false) { - pthread_mutex_lock(<able->header_mut); + rwlc_readlock(ltable->header_mut); + pthread_mutex_lock(<able->rb_mut); ltable->registerIterator(this); + pthread_mutex_unlock(<able->rb_mut); validate(); - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); } explicit iterator(logtable* ltable,TUPLE *key) @@ -265,18 +268,22 @@ public: key(key), valid(false) { - pthread_mutex_lock(<able->header_mut); + rwlc_readlock(ltable->header_mut); + pthread_mutex_lock(<able->rb_mut); ltable->registerIterator(this); + pthread_mutex_unlock(<able->rb_mut); validate(); - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); } ~iterator() { - pthread_mutex_lock(<able->header_mut); + rwlc_readlock(ltable->header_mut); + pthread_mutex_lock(<able->rb_mut); ltable->forgetIterator(this); invalidate(); + pthread_mutex_unlock(<able->rb_mut); if(last_returned) TUPLE::freetuple(last_returned); - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); } private: TUPLE * getnextHelper() { @@ -290,19 +297,19 @@ public: } public: TUPLE * getnextIncludingTombstones() { - pthread_mutex_lock(<able->header_mut); + rwlc_readlock(ltable->header_mut); revalidate(); TUPLE * ret = getnextHelper(); - pthread_mutex_unlock(<able->header_mut); + rwlc_readunlock(ltable->header_mut); return ret ? ret->create_copy() : NULL; } TUPLE * getnext() { - pthread_mutex_lock(<able->header_mut); + rwlc_readlock(ltable->header_mut); revalidate(); TUPLE * ret; while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory. - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory. } @@ -364,7 +371,7 @@ public: t = NULL; } - c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), NULL/*need something that is not <able->header_mut*/, t); + c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), <able->rb_mut, t); c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable(), t); disk_it[0] = ltable->get_tree_c1()->open_iterator(t); if(ltable->get_tree_c1_mergeable()) { diff --git a/mergeManager.cpp b/mergeManager.cpp index dc95ab5..063fb77 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -154,7 +154,7 @@ void mergeManager::tick(mergeStats * s, bool block) { } double_to_ts(&sleep_until, sleeptime + tv_to_double(&now)); - pthread_cond_timedwait(&dummy_throttle_cond, <able->header_mut, &sleep_until); + rwlc_cond_timedwait(&dummy_throttle_cond, ltable->header_mut, &sleep_until); gettimeofday(&now, 0); } } while((overshoot > 0) && (raw_overshoot > 0)); diff --git a/merger.cpp b/merger.cpp index 9b5d02a..95b423c 100644 --- a/merger.cpp +++ b/merger.cpp @@ -145,7 +145,7 @@ void* memMergeThread(void*arg) while(true) // 1 { - pthread_mutex_lock(<able->header_mut); + rwlc_writelock(ltable->header_mut); stats->new_merge(); int done = 0; // 2: wait for c0_mergable @@ -160,7 +160,7 @@ void* memMergeThread(void*arg) DEBUG("mmt:\twaiting for block ready cond\n"); - pthread_cond_wait(<able->c0_ready, <able->header_mut); + rwlc_cond_wait(<able->c0_ready, ltable->header_mut); DEBUG("mmt:\tblock ready\n"); @@ -169,7 +169,7 @@ void* memMergeThread(void*arg) if(done==1) { pthread_cond_signal(<able->c1_ready); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); break; } @@ -189,7 +189,7 @@ void* memMergeThread(void*arg) //create a new tree diskTreeComponent * c1_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats); - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); //: do the merge DEBUG("mmt:\tMerging:\n"); @@ -207,7 +207,7 @@ void* memMergeThread(void*arg) merge_count++; DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.output_size()); - pthread_mutex_lock(<able->header_mut); + rwlc_writelock(ltable->header_mut); // Immediately clean out c0 mergeable so that writers may continue. @@ -246,7 +246,7 @@ void* memMergeThread(void*arg) // XXX need to report backpressure here! while(ltable->get_tree_c1_mergeable()) { - pthread_cond_wait(<able->c1_needed, <able->header_mut); + rwlc_cond_wait(<able->c1_needed, ltable->header_mut); } xid = Tbegin(); @@ -270,7 +270,7 @@ void* memMergeThread(void*arg) // DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page); // 13 - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); // stats->pretty_print(stdout); //TODO: get the freeing outside of the lock @@ -298,7 +298,7 @@ void *diskMergeThread(void*arg) { // 2: wait for input - pthread_mutex_lock(<able->header_mut); + rwlc_writelock(ltable->header_mut); stats->new_merge(); int done = 0; // get a new input for merge @@ -313,13 +313,13 @@ void *diskMergeThread(void*arg) DEBUG("dmt:\twaiting for block ready cond\n"); - pthread_cond_wait(<able->c1_ready, <able->header_mut); + rwlc_cond_wait(<able->c1_ready, ltable->header_mut); DEBUG("dmt:\tblock ready\n"); } if(done==1) { - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); break; } @@ -336,7 +336,7 @@ void *diskMergeThread(void*arg) //create a new tree diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats); - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); //do the merge DEBUG("dmt:\tMerging:\n"); @@ -351,7 +351,7 @@ void *diskMergeThread(void*arg) // (skip 6, 7, 8, 8.5, 9)) - pthread_mutex_lock(<able->header_mut); + rwlc_writelock(ltable->header_mut); //12 ltable->get_tree_c2()->dealloc(xid); delete ltable->get_tree_c2(); @@ -383,7 +383,7 @@ void *diskMergeThread(void*arg) stats->finished_merge(); - pthread_mutex_unlock(<able->header_mut); + rwlc_unlock(ltable->header_mut); // stats->pretty_print(stdout); } @@ -403,25 +403,25 @@ void merge_iterators(int xid, stasis_log_t * log = (stasis_log_t*)stasis_log(); datatuple *t1 = itrA->next_callerFrees(); - pthread_mutex_lock(<able->header_mut); // XXX slow + rwlc_writelock(ltable->header_mut); // XXX slow stats->read_tuple_from_large_component(t1); - pthread_mutex_unlock(<able->header_mut); // XXX slow + rwlc_unlock(ltable->header_mut); // XXX slow datatuple *t2 = 0; int i = 0; while( (t2=itrB->next_callerFrees()) != 0) { - pthread_mutex_lock(<able->header_mut); // XXX slow + rwlc_writelock(ltable->header_mut); // XXX slow stats->read_tuple_from_small_component(t2); - pthread_mutex_unlock(<able->header_mut); // XXX slow + rwlc_unlock(ltable->header_mut); // XXX slow DEBUG("tuple\t%lld: keylen %d datalen %d\n", ntuples, *(t2->keylen),*(t2->datalen) ); while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2 { - pthread_mutex_lock(<able->header_mut); // XXX slow + rwlc_writelock(ltable->header_mut); // XXX slow //insert t1 scratch_tree->insertTuple(xid, t1); i+=t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } @@ -432,13 +432,13 @@ void merge_iterators(int xid, if(t1) { stats->read_tuple_from_large_component(t1); } - pthread_mutex_unlock(<able->header_mut); // XXX slow + rwlc_unlock(ltable->header_mut); // XXX slow } if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0) { datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); - pthread_mutex_lock(<able->header_mut); // XXX slow + rwlc_writelock(ltable->header_mut); // XXX slow stats->merged_tuples(mtuple, t2, t1); // this looks backwards, but is right. //insert merged tuple, drop deletes @@ -453,17 +453,17 @@ void merge_iterators(int xid, stats->read_tuple_from_large_component(t1); } datatuple::freetuple(mtuple); - pthread_mutex_unlock(<able->header_mut); // XXX slow + rwlc_unlock(ltable->header_mut); // XXX slow } else { //insert t2 scratch_tree->insertTuple(xid, t2); i+=t2->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } - pthread_mutex_lock(<able->header_mut); // XXX slow + rwlc_writelock(ltable->header_mut); // XXX slow stats->wrote_tuple(t2); - pthread_mutex_unlock(<able->header_mut); // XXX slow + rwlc_unlock(ltable->header_mut); // XXX slow // cannot free any tuples here; they may still be read through a lookup } @@ -472,7 +472,7 @@ void merge_iterators(int xid, } while(t1 != 0) {// t1 is less than t2 - pthread_mutex_lock(<able->header_mut); // XXX slow + rwlc_writelock(ltable->header_mut); // XXX slow scratch_tree->insertTuple(xid, t1); stats->wrote_tuple(t1); i += t1->byte_length(); if(i > 1000000) { if(forceMe) forceMe->force(xid); log->force_tail(log, LOG_FORCE_WAL); i = 0; } @@ -481,7 +481,7 @@ void merge_iterators(int xid, //advance itrA t1 = itrA->next_callerFrees(); stats->read_tuple_from_large_component(t1); - pthread_mutex_unlock(<able->header_mut); // XXX slow + rwlc_unlock(ltable->header_mut); // XXX slow } DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);