From 549f97d297008213c5c56669d0430d4195b52695 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Wed, 26 Nov 2008 07:14:23 +0000 Subject: [PATCH] rewrote mergeThread, added ability to start iterators mid lsmTable --- stasis/operations/lsmTable.h | 386 ++++++++++++++++------------------- 1 file changed, 171 insertions(+), 215 deletions(-) diff --git a/stasis/operations/lsmTable.h b/stasis/operations/lsmTable.h index 3c07327..3215461 100644 --- a/stasis/operations/lsmTable.h +++ b/stasis/operations/lsmTable.h @@ -71,23 +71,23 @@ namespace rose { pageCount++; Page *p = loadPage(xid, next_page); + writelock(p->rwlatch,0); stasis_page_cleanup(p); typename PAGELAYOUT::FMT * mc = PAGELAYOUT::initPage(p, &**begin); for(ITER i(*begin); i != *end; ++i) { - /* for(int c = 0; c < (*i).column_count(); c++) { - assert(*(byte*)(*i).get(c) || !*(byte*)(*i).get(c)); - } */ + rose::slot_index_t ret = mc->append(xid, *i); if(ret == rose::NOSPACE) { dirtyPages_add(p); - // p->dirty = 1; mc->pack(); + unlock(p->rwlatch); releasePage(p); next_page = pageAlloc(xid,pageAllocState); TlsmAppendPage(xid,tree,(*i).toByteArray(),pageAlloc,pageAllocState,next_page); p = loadPage(xid, next_page); + writelock(p->rwlatch,0); mc = PAGELAYOUT::initPage(p, &*i); pageCount++; ret = mc->append(xid, *i); @@ -96,8 +96,8 @@ namespace rose { (*inserted)++; } dirtyPages_add(p); - // p->dirty = 1; mc->pack(); + unlock(p->rwlatch); releasePage(p); return pageCount; } @@ -107,28 +107,33 @@ namespace rose { // this is just a guessed value... it seems about right based on // experiments, but 450 bytes overhead per tuple is insane! static const int RB_TREE_OVERHEAD = 400; // = 450; - static const pageid_t MEM_SIZE = 1000 * 1000 * 1000; - // static const pageid_t MEM_SIZE = 100 * 1000; + static pageid_t C0_MEM_SIZE = 1000 * 1000 * 1000; + // How many pages should we try to fill with the first C1 merge? static int R = 10; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size)) #ifdef THROTTLED static const pageid_t START_SIZE = 100; //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. #else - static const pageid_t START_SIZE = MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. + Do not run this code + static const pageid_t START_SIZE = C0_MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. #endif // Lower total work by perfomrming one merge at higher level // for every FUDGE^2 merges at the immediately lower level. // (Constrast to R, which controls the ratio of sizes of the trees.) static const int FUDGE = 1; - - /** ITERA is an iterator over the data structure that mergeThread creates (a lsm tree iterator). ITERB is an iterator over the data structures that mergeThread takes as input (lsm tree, or rb tree..) */ template void* mergeThread(void* arg) { + + typedef typename PAGELAYOUT::FMT::TUP TUP; + typedef mergeIterator MERGE_ITER; + typedef gcIterator GC_ITER; + typedef stlSetIterator, TUP> STL_ITER; + // The ITER argument of a is unused (we don't look at it's begin or end fields...) 
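A sketch, not part of the patch: the constants above (C0_MEM_SIZE, R, START_SIZE) encode the sizing policy for the first C1 merge, and the comment hints that R should eventually be set dynamically to sqrt(C2 size / C0 size). The following minimal standalone C++ program shows that arithmetic, assuming a 4096-byte page, the same factor-of-4 compression fudge used above, and an invented C2 size.

// Illustrative only. Mirrors the START_SIZE formula and the "R = sqrt(C2/C0)" comment;
// PAGE_SIZE and C2_PAGES are assumptions, not values taken from Stasis.
#include <cmath>
#include <cstdint>
#include <cstdio>

int main() {
  const int64_t PAGE_SIZE   = 4096;                 // assumed page size
  const int64_t C0_MEM_SIZE = 1000LL * 1000 * 1000; // in-memory (C0) budget, bytes
  const int64_t C2_PAGES    = 10LL * 1000 * 1000;   // hypothetical on-disk C2 size, pages
  int R = 10;                                       // static value used by the patch

  // START_SIZE: how many C1 pages the first merge should try to fill.
  int64_t start_size = C0_MEM_SIZE * R / (PAGE_SIZE * 4); // 4 = compression fudge

  // The comment suggests R should eventually track sqrt(|C2| / |C0|), both in pages.
  double c0_pages  = (double)C0_MEM_SIZE / (double)(PAGE_SIZE * 4);
  double dynamic_R = std::sqrt((double)C2_PAGES / c0_pages);

  std::printf("START_SIZE = %lld pages, dynamic R ~= %.1f\n",
              (long long)start_size, dynamic_R);
  return 0;
}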
merge_args * a = (merge_args*)arg; struct timeval start_tv, start_push_tv, wait_tv, stop_tv; @@ -139,19 +144,19 @@ namespace rose { // XXX hardcodes ITERA's type: // We assume that the caller set pageAllocState for us; oldPageAllocState // shouldn't be set (it should be NULLRID) - typename ITERA::handle tree - = new typename ITERA::treeIteratorHandle( - TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, - a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()) ); + assert(a->my_tree->r_.size != -1); // loop around here to produce multiple batches for merge. gettimeofday(&start_push_tv,0); gettimeofday(&start_tv,0); + pthread_mutex_lock(a->block_ready_mut); + while(1) { - pthread_mutex_lock(a->block_ready_mut); int done = 0; + // get a new input for merge + while(!*(a->in_tree)) { *a->in_block_needed = true; pthread_cond_signal(a->in_block_needed_cond); @@ -164,216 +169,180 @@ namespace rose { *a->in_block_needed = false; if(done) { pthread_cond_signal(a->out_block_ready_cond); - pthread_mutex_unlock(a->block_ready_mut); break; } gettimeofday(&wait_tv,0); + epoch_t current_timestamp = a->last_complete_xact ? *(a->last_complete_xact) : 0; uint64_t insertedTuples; pageid_t mergedPages; - ITERA *taBegin = new ITERA(tree); + + assert(a->my_tree->r_.size != -1); + + ITERA *taBegin = new ITERA(a->my_tree); ITERB *tbBegin = new ITERB(**a->in_tree); ITERA *taEnd = taBegin->end(); ITERB *tbEnd = tbBegin->end(); - { // this { protects us from recalcitrant iterators below (tree iterators hold stasis page latches...) - - pthread_mutex_unlock(a->block_ready_mut); Tcommit(xid); xid = Tbegin(); - // XXX hardcodes allocator type. - if(((recordid*)a->oldAllocState)->size != -1) { - // free the tree that we merged against during the last round. - TlsmFree(xid,tree->r_,TlsmRegionDeallocRid,a->oldAllocState); - } - // we're merging against old alloc state this round. - *(recordid*)(a->oldAllocState) = *(recordid*)(a->pageAllocState); - // we're merging into pagealloc state. 
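A sketch, not part of the patch: the loop above blocks on block_ready_mut until the upstream component hands it a new input tree, signalling in_block_needed_cond while it starves. The stripped-down condition-variable handoff below uses invented names (handoff, slot, take); it is a stand-in for the merge_args fields, not their definition.

// Illustrative only. Consumer side of the block handoff, as a plain pthreads sketch.
#include <pthread.h>

struct handoff {
  pthread_mutex_t mut;
  pthread_cond_t  ready;   // producer signals: a block is in the slot
  pthread_cond_t  needed;  // consumer signals: the slot is empty again
  void *slot;              // the "in tree" being handed downstream
  bool  still_open;        // producer clears this at shutdown
};

// What the merge loop does at the top of each iteration.
void *take(handoff *h) {
  pthread_mutex_lock(&h->mut);
  while (!h->slot && h->still_open) {
    pthread_cond_signal(&h->needed);       // tell the producer we are starving
    pthread_cond_wait(&h->ready, &h->mut); // sleep until a block arrives
  }
  void *in = h->slot;                      // may be null if we are shutting down
  pthread_mutex_unlock(&h->mut);
  return in;
}

In the patch itself the slot (*a->in_tree) stays occupied for the whole merge and is only cleared, with in_block_needed_cond signalled, after the input tree has been freed, so the producer sees backpressure for the duration of the merge.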
- *(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t)); - Tset(xid, *(recordid*)(a->pageAllocState), - &LSM_REGION_ALLOC_STATIC_INITIALIZER); - tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, - a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); - mergeIterator - mBegin(*taBegin, *tbBegin, *taEnd, *tbEnd); + recordid * scratchAllocState = (recordid*)malloc(sizeof(recordid)); + *scratchAllocState = Talloc(xid, sizeof(TlsmRegionAllocConf_t)); + Tset(xid, *scratchAllocState, &LSM_REGION_ALLOC_STATIC_INITIALIZER); - mergeIterator - mEnd(*taBegin, *tbBegin, *taEnd, *tbEnd); + typename ITERA::handle scratch_tree + = new typename ITERA::treeIteratorHandle( + TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, + scratchAllocState,TUP::sizeofBytes()) ); + // XXX + pthread_mutex_unlock(a->block_ready_mut); - mEnd.seekEnd(); + { // this { allows us to explicitly free the iterators mid-function - /* versioningIterator, - typename PAGELAYOUT::FMT::TUP> vBegin(mBegin,mEnd,0); + MERGE_ITER mBegin(*taBegin, *tbBegin, *taEnd, *tbEnd); + MERGE_ITER mEnd(*taBegin, *tbBegin, *taEnd, *tbEnd); - versioningIterator, - typename PAGELAYOUT::FMT::TUP> vEnd(mBegin,mEnd,0); + mEnd.seekEnd(); - vEnd.seekEnd(); + GC_ITER gcBegin(&mBegin, &mEnd, current_timestamp, a->ts_col); + GC_ITER gcEnd; - mergedPages = compressData - , typename PAGELAYOUT::FMT::TUP> > - (xid, &vBegin, &vEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */ - - /*mergedPages = compressData - > - (xid, &mBegin, &mEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */ - - gcIterator > gcBegin(&mBegin, &mEnd, current_timestamp, a->ts_col); - gcIterator > gcEnd; - - mergedPages = compressData - > > - (xid, &gcBegin, &gcEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); - - // these tree iterators keep pages pinned! Don't call force until they've been deleted, or we'll deadlock. + mergedPages = compressData + (xid, &gcBegin, &gcEnd,scratch_tree->r_,a->pageAlloc,scratchAllocState,&insertedTuples); + // tree iterators are pinning pages we want to force, so free them. } // free all the stack allocated iterators... delete taBegin; delete tbBegin; delete taEnd; delete tbEnd; // XXX hardcodes tree type. - TlsmForce(xid,tree->r_,TlsmRegionForceRid,a->pageAllocState); + TlsmForce(xid,scratch_tree->r_,TlsmRegionForceRid,scratchAllocState); + + //XXX + pthread_mutex_lock(a->block_ready_mut); gettimeofday(&stop_tv,0); merge_count++; - - double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv); - double work_elapsed = tv_to_double(stop_tv) - tv_to_double(wait_tv); - double push_elapsed = tv_to_double(start_tv) - tv_to_double(start_push_tv); - double total_elapsed = wait_elapsed + work_elapsed; double ratio = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes())) - / (double)(PAGE_SIZE * mergedPages); - double throughput = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes())) - / (1024.0 * 1024.0 * total_elapsed); - - printf("worker %d merge # %-6d: comp ratio: %-9.3f stalled %6.1f sec backpressure %6.1f " - "worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6lld pages (need %6lld)\n", a->worker_id, merge_count, ratio, - wait_elapsed, push_elapsed, work_elapsed,(unsigned long)insertedTuples, throughput, mergedPages, !a->out_tree_size ? 
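A sketch, not part of the patch: compressData above consumes a gcIterator wrapped around a mergeIterator, so two sorted runs (the existing component and the new input) are merged and stale tuple versions are dropped relative to current_timestamp, without ever materializing the merged run. The plain-STL program below shows one plausible reading of that composition; the tuple layout, the visibility rule, and all values are invented, and the real gcIterator may keep versions this sketch drops.

// Illustrative only. Merge two sorted runs, then keep at most one version of each key
// that is visible at current_timestamp.
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <iterator>
#include <vector>

struct Tup { int key; uint64_t ts; int val; };
static bool by_key_then_ts(const Tup &a, const Tup &b) {
  return a.key != b.key ? a.key < b.key : a.ts < b.ts;
}

int main() {
  std::vector<Tup> c1 = {{1, 5, 10}, {3, 2, 30}};   // older component
  std::vector<Tup> c0 = {{1, 8, 11}, {2, 9, 20}};   // newer component
  uint64_t current_timestamp = 9;

  std::vector<Tup> merged;                          // what mergeIterator would yield
  std::merge(c1.begin(), c1.end(), c0.begin(), c0.end(),
             std::back_inserter(merged), by_key_then_ts);

  std::vector<Tup> out;                             // what gcIterator would yield
  for (size_t i = 0; i < merged.size(); ++i) {
    bool newer_visible_version_follows =
        i + 1 < merged.size() && merged[i + 1].key == merged[i].key &&
        merged[i + 1].ts <= current_timestamp;
    if (!newer_visible_version_follows) out.push_back(merged[i]);
  }
  for (size_t i = 0; i < out.size(); ++i)
    std::printf("%d@%llu=%d\n", out[i].key,
                (unsigned long long)out[i].ts, out[i].val);
  return 0;
}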
-1 : (FUDGE * *a->out_tree_size / a->r_i)); + / (double)(PAGE_SIZE * mergedPages); + { // print timing statistics + double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv); + double work_elapsed = tv_to_double(stop_tv) - tv_to_double(wait_tv); + double push_elapsed = tv_to_double(start_tv) - tv_to_double(start_push_tv); + double total_elapsed = wait_elapsed + work_elapsed; + double throughput = ((double)(insertedTuples + * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes())) + / (1024.0 * 1024.0 * total_elapsed); + printf("worker %d merge # %-6d: comp ratio: %-9.3f stalled %6.1f sec backpressure %6.1f " + "worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6lld pages (need %6lld)\n", + a->worker_id, merge_count, ratio, + wait_elapsed, push_elapsed, work_elapsed,(unsigned long)insertedTuples, + throughput, mergedPages, + !a->out_tree_size ? -1 + : (FUDGE * *a->out_tree_size / a->r_i)); + } gettimeofday(&start_push_tv,0); - pthread_mutex_lock(a->block_ready_mut); + *a->my_tree_size = mergedPages; - // keep actual handle around so that it can be freed below. - typename ITERB::handle old_in_tree = **a->in_tree; - if(a->in_tree_allocer) { - // TlsmFree(xid, ((typename ITERB::handle)old_in_tree)->r_,TlsmRegionDeallocRid,a->in_tree_allocer); - // XXX kludge; assumes C1 and C2 have same type of handle.... - TlsmFree(xid, ((typename ITERA::handle)old_in_tree)->r_,TlsmRegionDeallocRid,a->in_tree_allocer); - delete old_in_tree; - } else { - ((typename stlSetIterator, - typename PAGELAYOUT::FMT::TUP>::handle)old_in_tree)->clear(); - delete - ((typename stlSetIterator, - typename PAGELAYOUT::FMT::TUP>::handle)old_in_tree); + // always free in tree and old my tree + + // free old my_tree here + TlsmFree(xid,a->my_tree->r_,TlsmRegionDeallocRid,a->pageAllocState); + + if(a->out_tree) { + double frac_wasted = + ((double)RB_TREE_OVERHEAD) + / (double)(RB_TREE_OVERHEAD + TUP::sizeofBytes()); + + double target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size)) + / ((C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio))); + + printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n", + ((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size), + ((double)*a->my_tree_size) / ((double)(C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio)), + target_R); + + if(((double)*a->out_tree_size / ((double)*a->my_tree_size) < target_R) + || (a->max_size && mergedPages > a->max_size )) { + + // XXX need to report backpressure here! + while(*a->out_tree) { // we probably don't need the "while..." + pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut); + } + + *a->out_tree = (typeof(*a->out_tree))malloc(sizeof(**a->out_tree)); + **a->out_tree = new typename ITERA::treeIteratorHandle(scratch_tree->r_); + *(recordid*)(a->out_tree_allocer) = *scratchAllocState; + + pthread_cond_signal(a->out_block_ready_cond); + + // This is a bit wasteful; allocate a new empty tree to merge against. + // We don't want to ever look at the one we just handed upstream... + // We could wait for an in tree to be ready, and then pretend + // that we've just finished merging against it (to avoid all + // those merging comparisons, and a table scan...) + + // old alloc state contains the tree that we used as input for this merge... + // we can still free it below + + // create a new allocator. 
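A sketch, not part of the patch: the target_R test above decides whether the merged C1 result gets handed up to the C2 merger, pushing only once the observed C2/C1 page ratio falls below sqrt((|C2|+|C1|) / |C0 effective|). The standalone program below plugs invented sizes into the same formula; RB_TREE_OVERHEAD, C0_MEM_SIZE, and the 4096-byte page mirror this file, while the tuple size and tree sizes are made up.

// Illustrative only. Reproduces the frac_wasted / target_R computation with toy numbers.
#include <cmath>
#include <cstdio>

int main() {
  const double RB_TREE_OVERHEAD = 400;
  const double C0_MEM_SIZE      = 1000.0 * 1000 * 1000;
  const double tuple_bytes      = 100;     // hypothetical TUP::sizeofBytes()
  const double ratio            = 4.0;     // compression ratio from the last merge
  double out_tree_size          = 200000;  // C2 size, pages (invented)
  double my_tree_size           = 15000;   // C1 size, pages (invented)

  double frac_wasted = RB_TREE_OVERHEAD / (RB_TREE_OVERHEAD + tuple_bytes);
  double c0_pages    = (C0_MEM_SIZE * (1 - frac_wasted)) / (4096 * ratio);

  double target_R = std::sqrt((out_tree_size + my_tree_size) / c0_pages);
  double actual_R = out_tree_size / my_tree_size;

  std::printf("target_R=%.1f actual R_C2-C1=%.1f -> %s\n", target_R, actual_R,
              actual_R < target_R ? "push C1 up to C2" : "keep merging into C1");
  return 0;
}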
+ *(recordid*)(a->pageAllocState) + = Talloc(xid, sizeof(TlsmRegionAllocConf_t)); + + Tset(xid, *(recordid*)(a->pageAllocState), + &LSM_REGION_ALLOC_STATIC_INITIALIZER); + + a->my_tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, + a->pageAllocState,TUP::sizeofBytes()); + + } else { + // there is an out tree, but we don't want to push updates to it yet + // replace my_tree with output of merge + *(recordid*)a->pageAllocState = *scratchAllocState; + free(scratchAllocState); + *a->my_tree = *scratch_tree; + } + } else { // ! a->out_tree + *(recordid*)a->pageAllocState = *scratchAllocState; + free(scratchAllocState); + *a->my_tree = *scratch_tree; } - free(*a->in_tree); // free pointer to handle + //// ----------- Free in_tree - // XXX should we delay this to this point? - // otherwise, the contents of in_tree become temporarily unavailable to observers. + if(a->in_tree_allocer) { // is in tree an lsm tree or a rb tree? + // @todo don't assume C1 and C2 have same type of handle.... + TlsmFree(xid, + (**(typename ITERA::handle**)a->in_tree)->r_, + TlsmRegionDeallocRid,a->in_tree_allocer); + } else { + (**(typename STL_ITER::handle**)a->in_tree)->clear(); + } + delete **a->in_tree; + free (*a->in_tree); *a->in_tree = 0; // tell producer that the slot is now open + // todo: do this above the frees by copying state. + // then we wouldn't need to hold the mutex while freeing regions, etc. + // don't set in_block_needed = true; we're not blocked yet. pthread_cond_signal(a->in_block_needed_cond); - - -#ifdef INFINITE_RESOURCES - *a->my_tree_size = mergedPages; - double target_R = 0; - if(a->out_tree) { - double frac_wasted = ((double)RB_TREE_OVERHEAD)/(double)(RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); - - target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size)) / ((MEM_SIZE*(1-frac_wasted))/(4096*ratio))); - printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n", - ((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size), - ((double)*a->my_tree_size) / ((double)(MEM_SIZE*(1-frac_wasted))/(4096*ratio)),target_R); - } -#else - if(a->out_tree_size) { - *a->my_tree_size = *a->out_tree_size / (a->r_i * FUDGE); - } else { - if(*a->my_tree_size < mergedPages) { - *a->my_tree_size = mergedPages; - } - } -#endif - if(a->out_tree && // is there a upstream merger? (note the lack of the * on a->out_tree) - ( - ( -#ifdef INFINITE_RESOURCES -#ifndef THROTTLED - (*a->out_block_needed) -#endif -#ifdef THROTTLED - ((double)*a->out_tree_size / ((double)*a->my_tree_size) < target_R) -#endif -#else - mergedPages > (FUDGE * *a->out_tree_size / a->r_i) // do we have enough data to bother it? -#endif - ) - || - (a->max_size && mergedPages > a->max_size ) - )) { - - // XXX need to report backpressure here! - - while(*a->out_tree) { // we probably don't need the "while..." - pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut); - } -#ifdef INFINITE_RESOURCES - // printf("pushing tree R_eff = %6.1f target = %6.1f\n", ((double)*a->out_tree_size) / ((double)*a->my_tree_size), target_R); -#endif - // XXX C++? Objects? Constructors? Who needs them? - *a->out_tree = (typeof(*a->out_tree))malloc(sizeof(**a->out_tree)); - **a->out_tree = new typename ITERA::treeIteratorHandle(tree->r_); - pthread_cond_signal(a->out_block_ready_cond); - - // This is a bit wasteful; allocate a new empty tree to merge against. - // We don't want to ever look at the one we just handed upstream... 
- // We could wait for an in tree to be ready, and then pretend - // that we've just finished merging against it (to avoid all - // those merging comparisons, and a table scan...) - - // old alloc state contains the tree that we used as input for this merge... we can still free it - - *(recordid*)(a->out_tree_allocer) = *(recordid*)(a->pageAllocState); - *(recordid*)(a->pageAllocState) = NULLRID; - - // create a new allocator. - *(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t)); - Tset(xid, *(recordid*)(a->pageAllocState), - &LSM_REGION_ALLOC_STATIC_INITIALIZER); - - tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, - a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); - - } - - // XXX TlsmFree(xid,*a->tree); - - assert(a->my_tree->r_.page != tree->r_.page); - *a->my_tree = *tree; - - pthread_mutex_unlock(a->block_ready_mut); - gettimeofday(&start_tv,0); } + pthread_mutex_unlock(a->block_ready_mut); + Tcommit(xid); return 0; @@ -407,8 +376,6 @@ namespace rose { h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(), TlsmRegionAllocRid,&h.mediumTreeAllocState, PAGELAYOUT::FMT::TUP::sizeofBytes()); - //XXX epoch_t beginning = 0; - //XXX epoch_t end = 0; Tset(xid, ret, &h); return ret; } @@ -534,8 +501,8 @@ namespace rose { { 1, TlsmRegionAllocRid, - ridp, - oldridp, + ridp, // XXX should be renamed to my_tree_alloc_state + oldridp, // XXX no longer needed?!? block_ready_mut, block1_needed_cond, block1_needed, @@ -552,7 +519,7 @@ namespace rose { allocer_scratch, 0, 0, - new typename LSM_ITER::treeIteratorHandle(NULLRID), + new typename LSM_ITER::treeIteratorHandle(h.bigTree), // my_tree &(ret->last_xact), ts_col }; @@ -569,8 +536,8 @@ namespace rose { { 2, TlsmRegionAllocRid, - ridp, - oldridp, + ridp, + oldridp, block_ready_mut, block0_needed_cond, block0_needed, @@ -581,14 +548,13 @@ namespace rose { ret->still_open, block0_size, block1_size, - (R * MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio + (R * C0_MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio R, - //new typename LSM_ITER::treeIteratorHandle(NULLRID), block0_scratch, 0, block1_scratch, allocer_scratch, - new typename LSM_ITER::treeIteratorHandle(NULLRID), + new typename LSM_ITER::treeIteratorHandle(h.mediumTree), 0, ts_col }; @@ -600,7 +566,8 @@ namespace rose { return ret; } - // XXX this does not force the table to disk... it simply forces everything out of the in-memory tree. + // XXX this does not force the table to disk... + // it simply forces everything out of the in-memory tree. template void TlsmTableFlush(lsmTableHandle *h) { @@ -661,36 +628,31 @@ namespace rose { template void TlsmTableInsert( lsmTableHandle *h, typename PAGELAYOUT::FMT::TUP &t) { - /* for(int i = 0; i < t.column_count(); i++) { // This helps valgrind warn earlier... - assert(*((char*)t.get(i)) || *((char*)t.get(i))+1); - } */ + + pthread_mutex_lock(h->mut); //XXX h->scratch_tree->insert(t); uint64_t handleBytes = h->scratch_tree->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); //XXX 4 = estimated compression ratio. 
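A sketch, not part of the patch: TlsmTableInsert (continuing below) estimates the C0 footprint as tuples * (RB_TREE_OVERHEAD + tuple size) and flushes once that estimate passes half of C0_MEM_SIZE and either the downstream merger has asked for input or the full budget is hit. The standalone program below shows that accounting with an invented tuple size and count.

// Illustrative only. The C0 byte-accounting and flush threshold, with toy values.
#include <cstdint>
#include <cstdio>

int main() {
  const uint64_t RB_TREE_OVERHEAD = 400;              // estimated std::set node overhead
  const uint64_t C0_MEM_SIZE      = 1000ULL * 1000 * 1000;
  const uint64_t tuple_bytes      = 100;              // hypothetical TUP::sizeofBytes()
  uint64_t n = 5ULL * 1000 * 1000;                    // tuples currently in scratch_tree

  uint64_t handleBytes   = n * (RB_TREE_OVERHEAD + tuple_bytes);
  bool     merger_hungry = true;                      // *h->input_needed, sampled occasionally

  // Flush once C0 is at least half full and either the downstream merger is asking
  // for input or C0 has hit its full budget.
  if (handleBytes > C0_MEM_SIZE / 2 &&
      (merger_hungry || handleBytes > C0_MEM_SIZE)) {
    std::printf("flush C0: %llu MB buffered\n",
                (unsigned long long)(handleBytes / (1024 * 1024)));
  }
  return 0;
}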
- uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes()); - uint64_t memSizeThresh = MEM_SIZE; + uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); + uint64_t memSizeThresh = C0_MEM_SIZE; -#ifdef INFINITE_RESOURCES static const int LATCH_INTERVAL = 10000; static int count = LATCH_INTERVAL; /// XXX HACK bool go = false; if(!count) { - pthread_mutex_lock(h->mut); + ///XXX pthread_mutex_lock(h->mut); go = *h->input_needed; - pthread_mutex_unlock(h->mut); + ///XXX pthread_mutex_unlock(h->mut); count = LATCH_INTERVAL; } count --; -#endif - if( (handleBytes > memSizeThresh / 2) && ( -#ifdef INFINITE_RESOURCES - go || -#else - handleBytes > inputSizeThresh || -#endif - handleBytes > memSizeThresh ) ) { // XXX ok? + + pthread_mutex_unlock(h->mut); + + if( (handleBytes > memSizeThresh / 2) + && ( go || handleBytes > memSizeThresh ) ) { // XXX ok? printf("Handle mbytes %lld (%lld) Input size: %lld input size thresh: %lld mbytes mem size thresh: %lld\n", (long long) handleBytes / (1024*1024), (long long) h->scratch_tree->size(), (long long) *h->input_size, (long long) inputSizeThresh / (1024*1024), (long long) memSizeThresh / (1024*1024)); @@ -727,20 +689,9 @@ namespace rose { template int TlsmTableCount(int xid, lsmTableHandle *h) { - /* treeIterator it1(NULLRID); - treeIterator it2(NULLRID; - stlSetIterator, - typename PAGELAYOUT::FMT::TUP::stl_cmp>, - typename PAGELAYOUT::FMT::TUP> it3(???);*/ - - // typename std::set LSM_ITER; - /* taypedef stlSetIterator, - typename PAGELAYOUT::FMT::TUP> RB_ITER; */ + + typedef treeIterator LSM_ITER; typedef std::_Rb_tree_const_iterator RB_ITER; - // typedef mergeIterator INNER_MERGE; typedef mergeIterator LSM_LSM ; typedef mergeIterator RB_RB ; typedef mergeIterator LSM_M_LSM_LSM; @@ -841,8 +792,8 @@ namespace rose { void** TlsmTableFindGTE(int xid, lsmTableHandle *h, typename PAGELAYOUT::FMT::TUP &val) { + pthread_mutex_lock(h->mut); - // typedef stlSetIterator, typename PAGELAYOUT::FMT::TUP> RB_ITER; @@ -880,6 +831,11 @@ namespace rose { return ret; } + template + void + TlsmTableFindGTEDone(lsmTableHandle *h) { + pthread_mutex_unlock(h->mut); + } template const typename PAGELAYOUT::FMT::TUP * TlsmTableFind(int xid, lsmTableHandle *h,
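A sketch, not part of the patch: with this change TlsmTableFindGTE takes h->mut and the new TlsmTableFindGTEDone releases it, so every lookup must be bracketed by the pair while the caller inspects the result. The fragment below shows that calling discipline; the lookup wrapper is hypothetical, it assumes both functions are parameterized over PAGELAYOUT like the surrounding code, and it leaves the inspection of the returned iterators as a placeholder.

// Illustrative only. Usage pattern implied by the new FindGTE / FindGTEDone pairing.
#include "stasis/operations/lsmTable.h"

template <class PAGELAYOUT>
const typename PAGELAYOUT::FMT::TUP *
lookup(int xid, rose::lsmTableHandle<PAGELAYOUT> *h,
       typename PAGELAYOUT::FMT::TUP &key) {
  void **it = rose::TlsmTableFindGTE<PAGELAYOUT>(xid, h, key); // acquires h->mut
  const typename PAGELAYOUT::FMT::TUP *hit = 0;
  // ... inspect *it here while the handle is still locked ...
  rose::TlsmTableFindGTEDone<PAGELAYOUT>(h);                   // releases h->mut
  return hit;
}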