Fix two problems in lsmTree:

- rewrote lookup() to be simpler, and fixed a few bugs in it
 - stopped keeping pointers to node records around after their pages were released
This commit is contained in:
Sears Russell 2008-11-26 07:01:43 +00:00
parent 9012bef5fc
commit 6a5e7f5589
2 changed files with 38 additions and 59 deletions

View file

@ -216,6 +216,7 @@ const lsmTreeNodeRecord* readNodeRecordFixed(int xid, Page *const p, int slot,
static inline static inline
lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p, lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p,
int slot, int keylen) { int slot, int keylen) {
abort(); // untested + ret is never initialized...
lsmTreeNodeRecord *ret; lsmTreeNodeRecord *ret;
recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)}; recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)};
@ -415,6 +416,7 @@ static recordid appendInternalNode(int xid, Page *p,
assert(slot >= FIRST_SLOT); // there should be no empty nodes assert(slot >= FIRST_SLOT); // there should be no empty nodes
const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len); const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len);
pageid_t child_id = nr->ptr; pageid_t child_id = nr->ptr;
nr = 0;
recordid ret; recordid ret;
{ {
Page *child_page = loadPage(xid, child_id); Page *child_page = loadPage(xid, child_id);
@ -636,67 +638,35 @@ void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc,
Tdealloc(xid, *(recordid*)allocator_state); Tdealloc(xid, *(recordid*)allocator_state);
} }
static recordid lsmLookup(int xid, Page *node, int depth, static recordid lsmLookup(int xid, Page *node, int depth, const byte *key,
const byte *key, size_t keySize, lsm_comparator_t cmp) { size_t keySize, lsm_comparator_t cmp) {
if(*recordcount_ptr(node) == FIRST_SLOT) { if(*recordcount_ptr(node) == FIRST_SLOT) {
return NULLRID; return NULLRID;
} }
assert(*recordcount_ptr(node) > FIRST_SLOT); assert(*recordcount_ptr(node) > FIRST_SLOT);
int match = FIRST_SLOT;
const lsmTreeNodeRecord *prev = readNodeRecord(xid,node,FIRST_SLOT,keySize); // don't need to compare w/ first item in tree.
slotid_t prev_slot = FIRST_SLOT; const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,FIRST_SLOT,keySize);
int prev_cmp_key = cmp(prev+1,key);
// @todo binary search within each page
for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) { for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) {
const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,i,keySize); rec = readNodeRecord(xid,node,i,keySize);
int cmpval = cmp(rec+1,key);
int rec_cmp_key = cmp(rec+1,key); if(cmpval > 0) {
break;
if(depth) {
if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
pageid_t child_id = prev->ptr;
Page *child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0);
recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
unlock(child_page->rwlatch);
releasePage(child_page);
return ret;
}
} else {
// XXX Doesn't handle runs of duplicates.
if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
recordid ret = {node->id, prev_slot, keySize};
return ret;
}
} }
prev = rec; match = i;
prev_slot = i;
prev_cmp_key = rec_cmp_key;
if(rec_cmp_key > 0) { break; }
} }
if(depth) { if(depth) {
// this handles the rhs of the tree. pageid_t child_id = readNodeRecord(xid,node,match,keySize)->ptr;
if(prev_cmp_key <= 0) { Page* child_page = loadPage(xid, child_id);
pageid_t child_id = prev->ptr; readlock(child_page->rwlatch,0);
Page *child_page = loadPage(xid, child_id); recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
readlock(child_page->rwlatch,0); unlock(child_page->rwlatch);
recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp); releasePage(child_page);
unlock(child_page->rwlatch); return ret;
releasePage(child_page);
return ret;
}
} else { } else {
if(prev_cmp_key <= 0) { recordid ret = {node->id, match, keySize};
recordid ret = {node->id, prev_slot, keySize}; return ret;
return ret;
}
} }
return NULLRID;
} }
static pageid_t lsmLookupLeafPageFromRid(int xid, recordid rid, size_t keySize) { static pageid_t lsmLookupLeafPageFromRid(int xid, recordid rid, size_t keySize) {
@ -845,12 +815,19 @@ lladdIterator_t* lsmTreeIterator_openAt(int xid, recordid root, const byte* key)
Page *p = loadPage(xid,root.page); Page *p = loadPage(xid,root.page);
readlock(p->rwlatch,0); readlock(p->rwlatch,0);
size_t keySize = getKeySize(xid,p); size_t keySize = getKeySize(xid,p);
assert(keySize);
const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,DEPTH,keySize); const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,DEPTH,keySize);
const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p , COMPARATOR, keySize); const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p , COMPARATOR, keySize);
int depth = nr->ptr; int depth = nr->ptr;
recordid lsm_entry_rid = lsmLookup(xid,p,depth,key,getKeySize(xid,p),comparators[cmp_nr->ptr]); recordid lsm_entry_rid = lsmLookup(xid,p,depth,key,keySize,comparators[cmp_nr->ptr]);
if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
unlock(p->rwlatch);
return 0;
}
assert(lsm_entry_rid.size != INVALID_SLOT);
if(root.page != lsm_entry_rid.page) { if(root.page != lsm_entry_rid.page) {
unlock(p->rwlatch); unlock(p->rwlatch);
@ -907,17 +884,17 @@ int lsmTreeIterator_next(int xid, lladdIterator_t *it) {
size_t keySize = impl->current.size; size_t keySize = impl->current.size;
impl->current = fixedNext(xid, impl->p, impl->current); impl->current = fixedNext(xid, impl->p, impl->current);
if(impl->current.size == INVALID_SLOT) { if(impl->current.size == INVALID_SLOT) {
const lsmTreeNodeRecord *next_rec = readNodeRecord(xid,impl->p,NEXT_LEAF, const lsmTreeNodeRecord next_rec = *readNodeRecord(xid,impl->p,NEXT_LEAF,
keySize); keySize);
unlock(impl->p->rwlatch); unlock(impl->p->rwlatch);
releasePage(impl->p); releasePage(impl->p);
DEBUG("done with page %lld next = %lld\n", impl->p->id, next_rec->ptr); DEBUG("done with page %lld next = %lld\n", impl->p->id, next_rec.ptr);
if(next_rec->ptr != -1 && ! impl->justOnePage) { if(next_rec.ptr != -1 && ! impl->justOnePage) {
impl->p = loadPage(xid, next_rec->ptr); impl->p = loadPage(xid, next_rec.ptr);
readlock(impl->p->rwlatch,0); readlock(impl->p->rwlatch,0);
impl->current.page = next_rec->ptr; impl->current.page = next_rec.ptr;
impl->current.slot = 2; impl->current.slot = 2;
impl->current.size = keySize; impl->current.size = keySize;
} else { } else {
@ -929,9 +906,11 @@ int lsmTreeIterator_next(int xid, lladdIterator_t *it) {
impl->current.size = keySize; impl->current.size = keySize;
} }
if(impl->current.size != INVALID_SLOT) { if(impl->current.size != INVALID_SLOT) {
impl->t = readNodeRecord(xid,impl->p,impl->current.slot,impl->current.size); impl->t = malloc(sizeof(*impl->t));
*impl->t = *readNodeRecord(xid,impl->p,impl->current.slot,impl->current.size);
return 1; return 1;
} else { } else {
free(impl->t);
impl->t = 0; impl->t = 0;
return 0; return 0;
} }

View file

@ -111,7 +111,7 @@ typedef struct lsmTreeNodeRecord {
typedef struct lsmIteratorImpl { typedef struct lsmIteratorImpl {
Page * p; Page * p;
recordid current; recordid current;
const lsmTreeNodeRecord *t; lsmTreeNodeRecord *t;
int justOnePage; int justOnePage;
} lsmIteratorImpl; } lsmIteratorImpl;