diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index 8cd6528..b7d91d0 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -636,35 +636,67 @@ void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc, Tdealloc(xid, *(recordid*)allocator_state); } -static const recordid lsmLookup(int xid, Page *node, int depth, const byte *key, - size_t keySize, lsm_comparator_t cmp) { +static const recordid lsmLookup(int xid, Page *node, int depth, + const byte *key, size_t keySize, lsm_comparator_t cmp) { + if(*recordcount_ptr(node) == FIRST_SLOT) { return NULLRID; } assert(*recordcount_ptr(node) > FIRST_SLOT); - int match = FIRST_SLOT; - // don't need to compare w/ first item in tree. - const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,FIRST_SLOT,keySize); + + const lsmTreeNodeRecord *prev = readNodeRecord(xid,node,FIRST_SLOT,keySize); + slotid_t prev_slot = FIRST_SLOT; + int prev_cmp_key = cmp(prev+1,key); + + // @todo binary search within each page for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) { - rec = readNodeRecord(xid,node,i,keySize); - int cmpval = cmp(rec+1,key); - if(cmpval > 0) { - break; + const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,i,keySize); + + int rec_cmp_key = cmp(rec+1,key); + + if(depth) { + + if(prev_cmp_key <= 0 && rec_cmp_key > 0) { + pageid_t child_id = prev->ptr; + Page *child_page = loadPage(xid, child_id); + readlock(child_page->rwlatch,0); + recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp); + unlock(child_page->rwlatch); + releasePage(child_page); + return ret; + } + + } else { + // XXX Doesn't handle runs of duplicates. + if(prev_cmp_key <= 0 && rec_cmp_key > 0) { + recordid ret = {node->id, prev_slot, keySize}; + return ret; + } } - match = i; + prev = rec; + prev_slot = i; + prev_cmp_key = rec_cmp_key; + if(rec_cmp_key > 0) { break; } } + if(depth) { - pageid_t child_id = readNodeRecord(xid,node,match,keySize)->ptr; - Page* child_page = loadPage(xid, child_id); - readlock(child_page->rwlatch,0); - recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp); - unlock(child_page->rwlatch); - releasePage(child_page); - return ret; + // this handles the rhs of the tree. + if(prev_cmp_key <= 0) { + pageid_t child_id = prev->ptr; + Page *child_page = loadPage(xid, child_id); + readlock(child_page->rwlatch,0); + recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp); + unlock(child_page->rwlatch); + releasePage(child_page); + return ret; + } } else { - recordid ret = {node->id, match, keySize}; - return ret; + if(prev_cmp_key <= 0) { + recordid ret = {node->id, prev_slot, keySize}; + return ret; + } } + return NULLRID; } static pageid_t lsmLookupLeafPageFromRid(int xid, recordid rid, size_t keySize) { @@ -820,11 +852,6 @@ lladdIterator_t* lsmTreeIterator_openAt(int xid, recordid root, const byte* key) recordid lsm_entry_rid = lsmLookup(xid,p,depth,key,getKeySize(xid,p),comparators[cmp_nr->ptr]); - if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) { - return 0; - } - assert(lsm_entry_rid.size != INVALID_SLOT); - if(root.page != lsm_entry_rid.page) { unlock(p->rwlatch); releasePage(p); diff --git a/stasis/operations/lsmTable.h b/stasis/operations/lsmTable.h index b8ad736..3c07327 100644 --- a/stasis/operations/lsmTable.h +++ b/stasis/operations/lsmTable.h @@ -107,15 +107,14 @@ namespace rose { // this is just a guessed value... it seems about right based on // experiments, but 450 bytes overhead per tuple is insane! static const int RB_TREE_OVERHEAD = 400; // = 450; - static pageid_t C0_MEM_SIZE = 1000 * 1000 * 1000; - // static const pageid_t C0_MEM_SIZE = 100 * 1000; + static const pageid_t MEM_SIZE = 1000 * 1000 * 1000; + // static const pageid_t MEM_SIZE = 100 * 1000; // How many pages should we try to fill with the first C1 merge? static int R = 10; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size)) #ifdef THROTTLED static const pageid_t START_SIZE = 100; //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. #else - Do not run this code - static const pageid_t START_SIZE = C0_MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. + static const pageid_t START_SIZE = MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. #endif // Lower total work by perfomrming one merge at higher level // for every FUDGE^2 merges at the immediately lower level. @@ -148,9 +147,8 @@ namespace rose { // loop around here to produce multiple batches for merge. gettimeofday(&start_push_tv,0); gettimeofday(&start_tv,0); - pthread_mutex_lock(a->block_ready_mut); - while(1) { + pthread_mutex_lock(a->block_ready_mut); int done = 0; @@ -166,6 +164,7 @@ namespace rose { *a->in_block_needed = false; if(done) { pthread_cond_signal(a->out_block_ready_cond); + pthread_mutex_unlock(a->block_ready_mut); break; } @@ -182,7 +181,7 @@ namespace rose { ITERB *tbEnd = tbBegin->end(); { // this { protects us from recalcitrant iterators below (tree iterators hold stasis page latches...) - ///XXX pthread_mutex_unlock(a->block_ready_mut); + pthread_mutex_unlock(a->block_ready_mut); Tcommit(xid); xid = Tbegin(); @@ -264,7 +263,7 @@ namespace rose { gettimeofday(&start_push_tv,0); - //XXX pthread_mutex_lock(a->block_ready_mut); + pthread_mutex_lock(a->block_ready_mut); // keep actual handle around so that it can be freed below. typename ITERB::handle old_in_tree = **a->in_tree; @@ -299,10 +298,10 @@ namespace rose { if(a->out_tree) { double frac_wasted = ((double)RB_TREE_OVERHEAD)/(double)(RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); - target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size)) / ((C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio))); + target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size)) / ((MEM_SIZE*(1-frac_wasted))/(4096*ratio))); printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n", ((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size), - ((double)*a->my_tree_size) / ((double)(C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio)),target_R); + ((double)*a->my_tree_size) / ((double)(MEM_SIZE*(1-frac_wasted))/(4096*ratio)),target_R); } #else if(a->out_tree_size) { @@ -370,11 +369,11 @@ namespace rose { assert(a->my_tree->r_.page != tree->r_.page); *a->my_tree = *tree; + pthread_mutex_unlock(a->block_ready_mut); + gettimeofday(&start_tv,0); } - pthread_mutex_unlock(a->block_ready_mut); - Tcommit(xid); return 0; @@ -582,7 +581,7 @@ namespace rose { ret->still_open, block0_size, block1_size, - (R * C0_MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio + (R * MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio R, //new typename LSM_ITER::treeIteratorHandle(NULLRID), block0_scratch, @@ -666,30 +665,25 @@ namespace rose { assert(*((char*)t.get(i)) || *((char*)t.get(i))+1); } */ - pthread_mutex_lock(h->mut); //XXX - h->scratch_tree->insert(t); uint64_t handleBytes = h->scratch_tree->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); //XXX 4 = estimated compression ratio. uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes()); - uint64_t memSizeThresh = C0_MEM_SIZE; + uint64_t memSizeThresh = MEM_SIZE; #ifdef INFINITE_RESOURCES static const int LATCH_INTERVAL = 10000; static int count = LATCH_INTERVAL; /// XXX HACK bool go = false; if(!count) { - ///XXX pthread_mutex_lock(h->mut); + pthread_mutex_lock(h->mut); go = *h->input_needed; - ///XXX pthread_mutex_unlock(h->mut); + pthread_mutex_unlock(h->mut); count = LATCH_INTERVAL; } count --; #endif - - pthread_mutex_unlock(h->mut); - if( (handleBytes > memSizeThresh / 2) && ( #ifdef INFINITE_RESOURCES go || @@ -847,7 +841,6 @@ namespace rose { void** TlsmTableFindGTE(int xid, lsmTableHandle *h, typename PAGELAYOUT::FMT::TUP &val) { - pthread_mutex_lock(h->mut); // typedef stlSetIterator - void - TlsmTableFindGTEDone(lsmTableHandle *h) { - pthread_mutex_unlock(h->mut); - } template const typename PAGELAYOUT::FMT::TUP * TlsmTableFind(int xid, lsmTableHandle *h,