diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index 48ed148..2a94498 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -1,3 +1,5 @@ +#include + #include #include // XXX including fixed.h breaks page api encapsulation; we need a "last slot" @@ -5,96 +7,26 @@ #include "../page/fixed.h" #include -const int MAX_LSM_COMPARATORS = 256; +static lsm_comparator_t comparators[MAX_LSM_COMPARATORS]; -typedef struct nodeRecord { - pageid_t ptr; - int key; - // char funk[1000]; -} nodeRecord; - -#define HEADER_SIZE (2 * sizeof(nodeRecord)) - -typedef struct lsmTreeState { - // pthread_mutex_t mut; - // pageid_t * dirtyPages; - pageid_t lastLeaf; -} lsmTreeState; - -/** Initialize a page for use as an internal node of the tree. - * lsmTree nodes are based on fixed.h. This function allocates a page - * that can hold fixed length records, and then sets up a tree node - * header in the first two nodeRecords on the page. - */ -static void initializeNodePage(int xid, Page * p) { - fixedPageInitialize(p, sizeof(nodeRecord), 0); - recordid reserved1 = recordPreAlloc(xid, p, sizeof(nodeRecord)); - recordPostAlloc(xid, p, reserved1); - recordid reserved2 = recordPreAlloc(xid, p, sizeof(nodeRecord)); - recordPostAlloc(xid, p, reserved2); +void lsmTreeRegisterComparator(int id, lsm_comparator_t i) { + // XXX need to de-init this somewhere... assert(!comparators[id]); + comparators[id] = i; } -/** - * A macro that hardcodes the page implementation to use fixed.h's page implementation. 
- */ -#define readNodeRecord(xid,p,slot) readNodeRecordFixed(xid,p,slot) -/** - * @see readNodeRecord - */ -#define writeNodeRecord(xid,p,slot,key,ptr) writeNodeRecordFixed(xid,p,slot,key,ptr) -//#define readNodeRecord(xid,p,slot) readNodeRecordVirtualMethods(xid,p,slot) -//#define writeNodeRecord(xid,p,slot,key,ptr) writeNodeRecordVirtualMethods(xid,p,slot,key,ptr) +#define HEADER_SIZE (2 * sizeof(lsmTreeNodeRecord)) -/** - * Read a record from the page node, assuming the nodes are fixed pages. - */ -static inline nodeRecord readNodeRecordFixed(int xid, Page * const p, int slot) { - return *(nodeRecord*)fixed_record_ptr(p, slot); -} -/** - * Read a record from the page node, using stasis' general-purpose page access API. - */ -static inline nodeRecord readNodeRecordVirtualMethods(int xid, Page * const p, int slot) { - nodeRecord ret; +// These two constants only apply to the root page. +#define DEPTH 0 +#define COMPARATOR 1 - recordid rid = {p->id, slot, sizeof(nodeRecord)}; - const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid,p,rid); - ret = *nr; - assert(ret.ptr > 1 || slot < 2); - recordReadDone(xid,p,rid,(const byte*)nr); +// These two apply to all other pages. +#define PREV_LEAF 0 +#define NEXT_LEAF 1 - DEBUG("reading {%lld, %d, %d} = %d, %lld\n", p->id, slot, sizeof(nodeRecord), ret.key, ret.ptr); - - return ret; -} - -/** - @see readNodeFixed - */ -static inline void writeNodeRecordFixed(int xid, Page * const p, int slot, int key, pageid_t ptr) { - nodeRecord * nr = (nodeRecord*)fixed_record_ptr(p,slot); - nr->key = key; - nr->ptr = ptr; - pageWriteLSN(xid, p, 0); // XXX need real LSN? 
-} - -/** - @see readNodeVirtualMethods -*/ -static inline void writeNodeRecordVirtualMethods(int xid, Page * const p, int slot, int key, pageid_t ptr) { - nodeRecord src; - src.key = key; - src.ptr = ptr; - assert(src.ptr > 1 || slot < 2); - - recordid rid = {p->id, slot, sizeof(nodeRecord)}; - nodeRecord * target = (nodeRecord*)recordWriteNew(xid,p,rid); - *target = src; - DEBUG("Writing to record {%d %d %lld}\n", rid.page, rid.slot, rid.size); - recordWriteDone(xid,p,rid,(byte*)target); - pageWriteLSN(xid, p, 0); // XXX need real LSN? -} +// This one applies to all pages. +#define FIRST_SLOT 2 /** @@ -105,7 +37,7 @@ static inline void writeNodeRecordVirtualMethods(int xid, Page * const p, int sl pageLoaded and pageFlushed callbacks. Those callbacks maintain an impl pointer, which tracks dirty pages, a mutex, and other information on behalf of the tree. (Note that the dirtyPage list - must be stored in a global hash tree if the root is evicted with + must be stored somewhere in memory if the root is evicted with outstanding dirty tree pages...) Note that this has a particularly nice, general purpose property @@ -124,116 +56,244 @@ static inline void writeNodeRecordVirtualMethods(int xid, Page * const p, int sl uses fixedPage (for now) - slot 0: depth of tree. - slot 1: slot id of first key in leaf records. [unimplemented] + slot 0: the integer id of the comparator used by this tree. + slot 1: depth of tree. - the remainder of the slots contain nodeRecords + the remainder of the slots contain lsmTreeNodeRecords internal node page layout ------------------------- uses fixedPage (for now) - slot 0: prev page [unimplemented] - slot 1: next page [unimplemented] - the remainder of the slots contain nodeRecords + slot 0: prev page + slot 1: next page + the remainder of the slots contain lsmTreeNodeRecords leaf page layout ---------------- - Defined by client, but calling readRecord() on the slot id must - return the first key stored on the page. + Defined by client. 
*/ -recordid TlsmCreate(int xid, int leafFirstSlot, int keySize) { - // XXX generalize later - assert(keySize == sizeof(int)); - // XXX hardcoded to fixed.h's current page layout, and node records - // that contain the key... - // can the pages hold at least two keys? - assert(HEADER_SIZE + 2 * (sizeof(nodeRecord) /*XXX +keySize*/) < +typedef struct lsmTreeState { + pageid_t lastLeaf; +} lsmTreeState; + +/** Initialize a page for use as an internal node of the tree. + * lsmTree nodes are based on fixed.h. This function allocates a page + * that can hold fixed length records, and then sets up a tree node + * header in the first two lsmTreeNodeRecords on the page. + */ +static void initializeNodePage(int xid, Page *p, size_t keylen) { + fixedPageInitialize(p, sizeof(lsmTreeNodeRecord)+keylen, 0); + recordid reserved1 = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+keylen); + recordPostAlloc(xid, p, reserved1); + recordid reserved2 = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+keylen); + recordPostAlloc(xid, p, reserved2); +} + +/** + * A macro that hardcodes the page implementation to use fixed.h's + * page implementation. 
+ */ + +#define readNodeRecord(xid,p,slot,keylen) readNodeRecordFixed(xid,p,slot,keylen) +/** + * @see readNodeRecord + */ +#define writeNodeRecord(xid,p,slot,key,keylen,ptr) \ + writeNodeRecordFixed(xid,p,slot,key,keylen,ptr) +/** + * @see readNodeRecord + */ +#define getKeySize(xid,p) getKeySizeFixed(xid,p) + +/* +#define getKeySize(xid,p) getKeySizeVirtualMethods(xid,p) +#define readNodeRecord(xid,p,slot,keylen) \ + readNodeRecordVirtualMethods(xid,p,slot,keylen) +#define writeNodeRecord(xid,p,slot,key,keylen,ptr) \ + writeNodeRecordVirtualMethods(xid,p,slot,key,keylen,ptr) +*/ + +static inline size_t getKeySizeFixed(int xid, Page const *p) { + return *recordsize_ptr(p) - sizeof(lsmTreeNodeRecord); +} + +static inline size_t getKeySizeVirtualMethods(int xid, Page *p) { + recordid rid = { p->id, 0, 0 }; + return recordGetLength(xid, p, rid) - sizeof(lsmTreeNodeRecord); +} +/** + * Read a record from the page node, assuming the nodes are fixed pages. + */ +static inline +const lsmTreeNodeRecord* readNodeRecordFixed(int xid, Page *const p, int slot, + int keylen) { + return (const lsmTreeNodeRecord*)fixed_record_ptr(p, slot); +} +/** + * Read a record from the page node, using stasis' general-purpose + * page access API. 
+ */ +static inline +lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p, + int slot, int keylen) { + lsmTreeNodeRecord *ret; + + recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)}; + const lsmTreeNodeRecord *nr + = (const lsmTreeNodeRecord*)recordReadNew(xid,p,rid); + memcpy(ret, nr, sizeof(lsmTreeNodeRecord) + keylen); + recordReadDone(xid,p,rid,(const byte*)nr); + + DEBUG("reading {%lld, %d, %d} = %d, %lld\n", + p->id, slot, sizeof(lsmTreeNodeRecord), ret.key, ret.ptr); + + return ret; +} + +/** + @see readNodeFixed + */ +static inline +void writeNodeRecordFixed(int xid, Page *p, int slot, + const byte *key, size_t keylen, pageid_t ptr) { + lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)fixed_record_ptr(p,slot); + nr->ptr = ptr; + memcpy(nr+1, key, keylen); + pageWriteLSN(xid, p, 0); // XXX need real LSN? +} + +/** + @see readNodeVirtualMethods +*/ +static inline +void writeNodeRecordVirtualMethods(int xid, Page *p, int slot, + const byte *key, size_t keylen, + pageid_t ptr) { + recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)}; + lsmTreeNodeRecord *target = (lsmTreeNodeRecord*)recordWriteNew(xid,p,rid); + target->ptr = ptr; + memcpy(target+1,key,keylen); + + DEBUG("Writing to record {%d %d %lld}\n", rid.page, rid.slot, rid.size); + recordWriteDone(xid,p,rid,(byte*)target); + pageWriteLSN(xid, p, 0); // XXX need real LSN? +} + +recordid TlsmCreate(int xid, int comparator, int keySize) { + + // can the pages hold at least two keys? 
+ assert(HEADER_SIZE + 2 * (sizeof(lsmTreeNodeRecord) +keySize) < USABLE_SIZE_OF_PAGE - 2 * sizeof(short)); pageid_t root = TpageAlloc(xid); - + DEBUG("Root = %lld\n", root); recordid ret = { root, 0, 0 }; - Page * const p = loadPage(xid, ret.page); + Page *p = loadPage(xid, ret.page); writelock(p->rwlatch,0); - fixedPageInitialize(p, sizeof(nodeRecord), 0); + fixedPageInitialize(p, sizeof(lsmTreeNodeRecord) + keySize, 0); *page_type_ptr(p) = LSM_ROOT_PAGE; - lsmTreeState * state = malloc(sizeof(lsmTreeState)); - state->lastLeaf = -1; /// constants.h - // pthread_mutex_init(&(state->mut),0); - // state->dirtyPages = malloc(sizeof(Page*)*2); - // state->dirtyPages[0] = ret.page; - // state->dirtyPages[1] = -1; // XXX this should be defined in constants.h + lsmTreeState *state = malloc(sizeof(lsmTreeState)); + state->lastLeaf = -1; /// XXX define something in constants.h? p->impl = state; - recordid treeDepth = recordPreAlloc(xid, p, sizeof(nodeRecord)); - recordPostAlloc(xid,p,treeDepth); + recordid tmp + = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord) + keySize); + recordPostAlloc(xid,p,tmp); - assert(treeDepth.page == ret.page - && treeDepth.slot == 0 - && treeDepth.size == sizeof(nodeRecord)); + assert(tmp.page == ret.page + && tmp.slot == DEPTH + && tmp.size == sizeof(lsmTreeNodeRecord) + keySize); - recordid slotOff = recordPreAlloc(xid, p, sizeof(nodeRecord)); - recordPostAlloc(xid,p,slotOff); + tmp = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord) + keySize); + recordPostAlloc(xid,p,tmp); - assert(slotOff.page == ret.page - && slotOff.slot == 1 - && slotOff.size == sizeof(nodeRecord)); + assert(tmp.page == ret.page + && tmp.slot == COMPARATOR + && tmp.size == sizeof(lsmTreeNodeRecord) + keySize); - // ptr is zero because tree depth starts out as zero. - writeNodeRecord(xid, p, 0, 0, 0); - // ptr = slotOff (which isn't used, for now...) 
- writeNodeRecord(xid, p, 1, 0, leafFirstSlot); + byte *dummy = calloc(1,keySize); + + writeNodeRecord(xid, p, DEPTH, dummy, keySize, 0); + writeNodeRecord(xid, p, COMPARATOR, dummy, keySize, comparator); unlock(p->rwlatch); releasePage(p); return ret; } -static recordid buildPathToLeaf(int xid, recordid root, Page * const root_p, - int depth, const byte * key, size_t key_len, - pageid_t val_page) { +static recordid buildPathToLeaf(int xid, recordid root, Page *root_p, + int depth, const byte *key, size_t key_len, + pageid_t val_page, pageid_t lastLeaf) { // root is the recordid on the root page that should point to the // new subtree. assert(depth); - DEBUG("buildPathToLeaf(depth=%d) called\n",depth); + DEBUG("buildPathToLeaf(depth=%d) (lastleaf=%lld) called\n",depth, lastLeaf); pageid_t child = TpageAlloc(xid); // XXX Use some other function... + DEBUG("new child = %lld internal? %d\n", child, depth-1); - Page * const child_p = loadPage(xid, child); + Page *child_p = loadPage(xid, child); writelock(child_p->rwlatch,0); - initializeNodePage(xid, child_p); + initializeNodePage(xid, child_p, key_len); recordid ret; if(depth-1) { // recurse: the page we just allocated is not a leaf. - recordid child_rec = recordPreAlloc(xid, child_p, sizeof(nodeRecord)); + recordid child_rec = recordPreAlloc(xid, child_p, sizeof(lsmTreeNodeRecord)+key_len); assert(child_rec.size != INVALID_SLOT); recordPostAlloc(xid, child_p, child_rec); ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len, - val_page); + val_page,lastLeaf); + + unlock(child_p->rwlatch); + releasePage(child_p); + } else { // set leaf - recordid leaf_rec = recordPreAlloc(xid, child_p, sizeof(nodeRecord)); - assert(leaf_rec.slot == 2); // XXX + + byte *dummy = calloc(1, key_len); + + // backward link. 
+ writeNodeRecord(xid,child_p,PREV_LEAF,dummy,key_len,lastLeaf); + // forward link (initialize to -1) + writeNodeRecord(xid,child_p,NEXT_LEAF,dummy,key_len,-1); + + recordid leaf_rec = recordPreAlloc(xid, child_p, + sizeof(lsmTreeNodeRecord)+key_len); + + assert(leaf_rec.slot == FIRST_SLOT); + recordPostAlloc(xid, child_p, leaf_rec); - writeNodeRecord(xid,child_p,leaf_rec.slot,*(int*)key,val_page); + writeNodeRecord(xid,child_p,leaf_rec.slot,key,key_len,val_page); ret = leaf_rec; - } - unlock(child_p->rwlatch); - releasePage(child_p); - writeNodeRecord(xid, root_p, root.slot, *(int*)key, child); + unlock(child_p->rwlatch); + releasePage(child_p); + if(lastLeaf != -1) { + // install forward link in previous page + Page *lastLeafP = loadPage(xid, lastLeaf); + writelock(lastLeafP->rwlatch,0); + writeNodeRecord(xid,lastLeafP,NEXT_LEAF,dummy,key_len,child); + unlock(lastLeafP->rwlatch); + releasePage(lastLeafP); + } + + DEBUG("%lld <-> %lld\n", lastLeaf, child); + free(dummy); + } + + writeNodeRecord(xid, root_p, root.slot, key, key_len, child); return ret; } @@ -252,47 +312,46 @@ static recordid buildPathToLeaf(int xid, recordid root, Page * const root_p, */ -static recordid appendInternalNode(int xid, Page * const p, +static recordid appendInternalNode(int xid, Page *p, int depth, const byte *key, size_t key_len, - pageid_t val_page) { + pageid_t val_page, pageid_t lastLeaf) { if(!depth) { // leaf node. 
- recordid ret = recordPreAlloc(xid, p, sizeof(nodeRecord)); + recordid ret = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+key_len); if(ret.size != INVALID_SLOT) { recordPostAlloc(xid, p, ret); - writeNodeRecord(xid,p,ret.slot,*(int*)key,val_page); - assert(val_page); // XXX + writeNodeRecord(xid,p,ret.slot,key,key_len,val_page); } return ret; } else { // recurse int slot = *recordcount_ptr(p)-1; - assert(slot >= 2); // XXX - nodeRecord nr = readNodeRecord(xid, p, slot); - pageid_t child_id = nr.ptr; + assert(slot >= FIRST_SLOT); // there should be no empty nodes + const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len); + pageid_t child_id = nr->ptr; recordid ret; { - Page * const child_page = loadPage(xid, child_id); + Page *child_page = loadPage(xid, child_id); writelock(child_page->rwlatch,0); - ret = appendInternalNode(xid, child_page, depth-1, - key, key_len, val_page); + ret = appendInternalNode(xid, child_page, depth-1, key, key_len, + val_page, lastLeaf); + unlock(child_page->rwlatch); releasePage(child_page); } if(ret.size == INVALID_SLOT) { // subtree is full; split - if(depth > 1) { - DEBUG("subtree is full at depth %d\n", depth); - } - - ret = recordPreAlloc(xid, p, sizeof(nodeRecord)); + ret = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+key_len); if(ret.size != INVALID_SLOT) { recordPostAlloc(xid, p, ret); - ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page); + ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page, + lastLeaf); - DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n", p->id, ret.page, ret.slot, ret.size); + DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n", + p->id, ret.page, ret.slot, ret.size); } else { - // ret is NULLRID; this is the root of a full tree. Return NULLRID to the caller. + // ret is NULLRID; this is the root of a full tree. Return + // NULLRID to the caller. } } else { // we inserted the value in to a subtree rooted here. 
@@ -305,53 +364,74 @@ static recordid appendInternalNode(int xid, Page * const p, * Traverse from the root of the page to the right most leaf (the one * with the higest base key value). */ -static pageid_t findLastLeaf(int xid, Page * const root, int depth) { +static pageid_t findLastLeaf(int xid, Page *root, int depth) { if(!depth) { DEBUG("Found last leaf = %lld\n", root->id); return root->id; } else { - nodeRecord nr = readNodeRecord(xid, root, (*recordcount_ptr(root))-1); + // passing zero as length is OK, as long as we don't try to access the key. + const lsmTreeNodeRecord *nr = readNodeRecord(xid, root, + (*recordcount_ptr(root))-1,0); pageid_t ret; - { - Page * const p = loadPage(xid, nr.ptr); - writelock(p->rwlatch,0); - ret = findLastLeaf(xid,p,depth-1); - unlock(p->rwlatch); - releasePage(p); - } + + Page *p = loadPage(xid, nr->ptr); + readlock(p->rwlatch,0); + ret = findLastLeaf(xid,p,depth-1); + unlock(p->rwlatch); + releasePage(p); + return ret; } } +/** + * Traverse from the root of the tree to the left most (lowest valued + * key) leaf. 
+ */ +static pageid_t findFirstLeaf(int xid, Page *root, int depth) { + if(!depth) { + return root->id; + } else { + const lsmTreeNodeRecord *nr = readNodeRecord(xid,root,FIRST_SLOT,0); + Page *p = loadPage(xid, nr->ptr); + readlock(p->rwlatch,0); + pageid_t ret = findFirstLeaf(xid,p,depth-1); + unlock(p->rwlatch); + releasePage(p); + return ret; + } +} recordid TlsmAppendPage(int xid, recordid tree, - const byte *key, size_t keySize, + const byte *key, long val_page) { - Page * const p = loadPage(xid, tree.page); + Page *p = loadPage(xid, tree.page); writelock(p->rwlatch, 0); - lsmTreeState * s = p->impl; - // pthread_mutex_lock(&(s->mut)); + lsmTreeState *s = p->impl; + + size_t keySize = getKeySize(xid,p); tree.slot = 0; - tree.size = sizeof(nodeRecord); + tree.size = sizeof(lsmTreeNodeRecord)+keySize; - nodeRecord nr = readNodeRecord(xid,p,0); - int depth = nr.ptr; - // const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid,p,tree); - // int depth = nr->ptr; - // recordReadDone(xid,p,tree,(const byte*)nr); + + const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, DEPTH, keySize); + int depth = nr->ptr; if(s->lastLeaf == -1) { s->lastLeaf = findLastLeaf(xid, p, depth); } - Page * lastLeaf; + + Page *lastLeaf; + if(s->lastLeaf != tree.page) { lastLeaf= loadPage(xid, s->lastLeaf); - writelock(lastLeaf->rwlatch, 0); // tree depth is in slot zero of root + writelock(lastLeaf->rwlatch, 0); } else { lastLeaf = p; } - recordid ret = recordPreAlloc(xid, lastLeaf, sizeof(nodeRecord)); + recordid ret = recordPreAlloc(xid, lastLeaf, + sizeof(lsmTreeNodeRecord)+keySize); if(ret.size == INVALID_SLOT) { if(lastLeaf->id != p->id) { @@ -363,58 +443,72 @@ recordid TlsmAppendPage(int xid, recordid tree, tree.slot = 0; assert(tree.page == p->id); - ret = appendInternalNode(xid, p, depth, key, keySize, - val_page); + ret = appendInternalNode(xid, p, depth, key, keySize, val_page, + s->lastLeaf == tree.page ? 
-1 : s->lastLeaf); if(ret.size == INVALID_SLOT) { DEBUG("Need to split root; depth = %d\n", depth); pageid_t child = TpageAlloc(xid); - - Page * lc = loadPage(xid, child); - + Page *lc = loadPage(xid, child); writelock(lc->rwlatch,0); - initializeNodePage(xid, lc); + initializeNodePage(xid, lc,keySize); - for(int i = 2; i < *recordcount_ptr(p); i++) { + for(int i = FIRST_SLOT; i < *recordcount_ptr(p); i++) { - recordid cnext = recordPreAlloc(xid, lc, sizeof(nodeRecord)); + recordid cnext = recordPreAlloc(xid, lc, + sizeof(lsmTreeNodeRecord)+keySize); - assert(i == cnext.slot); // XXX hardcoded to current node format... + assert(i == cnext.slot); assert(cnext.size != INVALID_SLOT); recordPostAlloc(xid, lc, cnext); - nodeRecord nr = readNodeRecord(xid,p,i); - writeNodeRecord(xid,lc,i,nr.key,nr.ptr); + const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,i,keySize); + writeNodeRecord(xid,lc,i,(byte*)(nr+1),keySize,nr->ptr); } // deallocate old entries, and update pointer on parent node. - // XXX this is a terrible way to do this. - recordid pFirstSlot = {p->id, 2, sizeof(nodeRecord)}; - *recordcount_ptr(p) = 3; - nodeRecord * nr = (nodeRecord*)recordWriteNew(xid, p, pFirstSlot); + recordid pFirstSlot = { p->id, FIRST_SLOT, + sizeof(lsmTreeNodeRecord)+keySize }; + + // @todo should fixed.h support bulk deallocation directly? + *recordcount_ptr(p) = FIRST_SLOT+1; + + lsmTreeNodeRecord *nr + = (lsmTreeNodeRecord*)recordWriteNew(xid, p, pFirstSlot); + // don't overwrite key... nr->ptr = child; - assert(nr->ptr > 1);///XXX recordWriteDone(xid,p,pFirstSlot,(byte*)nr); pageWriteLSN(xid, p, 0); // XXX need real LSN? 
+ byte *dummy = calloc(1,keySize); + if(!depth) { + s->lastLeaf = lc->id; + writeNodeRecord(xid,lc,PREV_LEAF,dummy,keySize,-1); + writeNodeRecord(xid,lc,NEXT_LEAF,dummy,keySize,-1); + } + unlock(lc->rwlatch); releasePage(lc); + depth ++; - writeNodeRecord(xid,p,0,0,depth); + writeNodeRecord(xid,p,DEPTH,dummy,keySize,depth); + free(dummy); assert(tree.page == p->id); - ret = appendInternalNode(xid, p, depth, key, keySize, - val_page); + ret = appendInternalNode(xid, p, depth, key, keySize, val_page, + s->lastLeaf == tree.page ? -1 : s->lastLeaf); + assert(ret.size != INVALID_SLOT); } else { - DEBUG("Appended new internal node tree depth = %d key = %d\n", depth, *(int*)key); + DEBUG("Appended new internal node tree depth = %d key = %d\n", + depth, *(int*)key); } s->lastLeaf = ret.page; DEBUG("lastleaf is %lld\n", s->lastLeaf); @@ -425,7 +519,7 @@ recordid TlsmAppendPage(int xid, recordid tree, recordPostAlloc(xid, lastLeaf, ret); - writeNodeRecord(xid, lastLeaf, ret.slot, *(int*)key, val_page); + writeNodeRecord(xid, lastLeaf, ret.slot, key, keySize, val_page); if(lastLeaf->id != p->id) { unlock(lastLeaf->rwlatch); @@ -433,32 +527,34 @@ recordid TlsmAppendPage(int xid, recordid tree, } } - // XXX do something to make this transactional... - // pthread_mutex_unlock(&(s->mut)); unlock(p->rwlatch); releasePage(p); return ret; } -static pageid_t lsmLookup(int xid, Page * const node, int depth, - const byte *key, size_t keySize) { - // Start at slot 2 to skip reserved slots on page... - if(*recordcount_ptr(node) == 2) { return -1; } - assert(*recordcount_ptr(node) > 2); - nodeRecord prev = readNodeRecord(xid,node,2); +static pageid_t lsmLookup(int xid, Page *node, int depth, + const byte *key, size_t keySize, lsm_comparator_t cmp) { - // should do binary search instead. 
- for(int i = 3; i < *recordcount_ptr(node); i++) { - nodeRecord rec = readNodeRecord(xid,node,i); + if(*recordcount_ptr(node) == FIRST_SLOT) { return -1; } + assert(*recordcount_ptr(node) > FIRST_SLOT); + + const lsmTreeNodeRecord *prev = readNodeRecord(xid,node,FIRST_SLOT,keySize); + int prev_cmp_key = cmp(prev+1,key); + + // @todo binary search within each page + for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) { + const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,i,keySize); + + int rec_cmp_key = cmp(rec+1,key); if(depth) { - if(prev.key <= *(int*)key && rec.key > *(int*)key) { - pageid_t child_id = prev.ptr; - Page * const child_page = loadPage(xid, child_id); + if(prev_cmp_key <= 0 && rec_cmp_key > 0) { + pageid_t child_id = prev->ptr; + Page *child_page = loadPage(xid, child_id); readlock(child_page->rwlatch,0); - long ret = lsmLookup(xid,child_page,depth-1,key,keySize); + long ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp); unlock(child_page->rwlatch); releasePage(child_page); return ret; @@ -466,22 +562,22 @@ static pageid_t lsmLookup(int xid, Page * const node, int depth, } else { - if(prev.key == *(int*)key) { - return prev.ptr; + if(prev_cmp_key == 0) { + return prev->ptr; } } prev = rec; - - if(prev.key > *(int*)key) { break; } + prev_cmp_key = rec_cmp_key; + if(rec_cmp_key > 0) { break; } } if(depth) { - if(prev.key <= *(int*)key) { - pageid_t child_id = prev.ptr; - Page * const child_page = loadPage(xid, child_id); + if(prev_cmp_key <= 0) { + pageid_t child_id = prev->ptr; + Page *child_page = loadPage(xid, child_id); readlock(child_page->rwlatch,0); - long ret = lsmLookup(xid,child_page,depth-1,key,keySize); + long ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp); unlock(child_page->rwlatch); releasePage(child_page); return ret; @@ -489,32 +585,37 @@ static pageid_t lsmLookup(int xid, Page * const node, int depth, } else { - if(prev.key == *(int*)key) { - return prev.ptr; + if(prev_cmp_key == 0) { + return prev->ptr; } 
} return -1; } -pageid_t TlsmFindPage(int xid, recordid tree, const byte * key, size_t keySize) { - Page * const p = loadPage(xid, tree.page); +/** + Look up the value associated with key. + + @return -1 if key isn't in the tree. +*/ +pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) { + Page *p = loadPage(xid, tree.page); readlock(p->rwlatch,0); - //lsmTreeState * s = p->impl; - // pthread_mutex_lock(&(s->mut)); tree.slot = 0; tree.size = *recordsize_ptr(p); - nodeRecord nr = readNodeRecord(xid, p , 0); - // const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid, p, tree); + size_t keySize = getKeySize(xid,p); - int depth = nr.ptr; + const lsmTreeNodeRecord *depth_nr = readNodeRecord(xid, p , 0, keySize); + const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p , 1, keySize); - pageid_t ret = lsmLookup(xid, p, depth, key, keySize); + int depth = depth_nr->ptr; + + lsm_comparator_t cmp = comparators[cmp_nr->ptr]; + + pageid_t ret = lsmLookup(xid, p, depth, key, keySize, cmp); - // recordReadDone(xid, p, tree, (const byte*)nr); - //pthread_mutex_unlock(&(s->mut)); unlock(p->rwlatch); releasePage(p); @@ -528,9 +629,8 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte * key, size_t keySize) associated with the tree. */ static void lsmPageLoaded(Page *p) { - lsmTreeState * state = malloc(sizeof(lsmTreeState)); + lsmTreeState *state = malloc(sizeof(lsmTreeState)); state->lastLeaf = -1; - //pthread_mutex_init(&(state->mut),0); p->impl = state; } /** @@ -538,8 +638,7 @@ static void lsmPageLoaded(Page *p) { This is called by the buffer manager. 
*/ static void lsmPageFlushed(Page *p) { - lsmTreeState * state = p->impl; - //pthread_mutex_destroy(&(state->mut)); + lsmTreeState *state = p->impl; free(state); } /** @@ -552,3 +651,83 @@ page_impl lsmRootImpl() { pi.page_type = LSM_ROOT_PAGE; return pi; } +///--------------------- Iterator implementation + +lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) { + Page *p = loadPage(xid,root.page); + readlock(p->rwlatch,0); + size_t keySize = getKeySize(xid,p); + const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,DEPTH,keySize); + int depth = nr->ptr; + pageid_t leafid = findFirstLeaf(xid, p, depth); + if(leafid != root.page) { + unlock(p->rwlatch); + releasePage(p); + p = loadPage(xid,leafid); + readlock(p->rwlatch,0); + } + lsmIteratorImpl *impl = malloc(sizeof(lsmIteratorImpl)); + impl->p = p; + { + recordid rid = { p->id, 1, keySize }; + impl->current = rid; + } + DEBUG("keysize = %d, slot = %d\n", keySize, impl->current.slot); + impl->t = 0; + impl->justOnePage = (depth == 0); + + lladdIterator_t *it = malloc(sizeof(lladdIterator_t)); + it->type = -1; // XXX LSM_TREE_ITERATOR; + it->impl = impl; + /* itdef = { <-- @todo register lsmTree iterators with stasis someday... 
+ lsmTreeIterator_close; + lsmTreeIterator_next; + lsmTreeIterator_next; + lsmTreeIterator_key; + lsmTreeIterator_value; + lsmTreeIterator_tupleDone; + lsmTreeIterator_releaseLock; + } */ + return it; +} +void lsmTreeIterator_close(int xid, lladdIterator_t *it) { + lsmIteratorImpl *impl = it->impl; + if(impl->p) { + unlock(impl->p->rwlatch); + releasePage(impl->p); + } + free(impl); + free(it); +} + +int lsmTreeIterator_next(int xid, lladdIterator_t *it) { + lsmIteratorImpl *impl = it->impl; + size_t keySize = impl->current.size; + impl->current = fixedNext(xid, impl->p, impl->current); + if(impl->current.size == INVALID_SLOT) { + const lsmTreeNodeRecord *next_rec = readNodeRecord(xid,impl->p,NEXT_LEAF, + impl->current.size); + unlock(impl->p->rwlatch); + releasePage(impl->p); + + DEBUG("done with page %lld next = %lld\n", impl->p->id, next_rec->ptr); + + if(next_rec->ptr != -1 && ! impl->justOnePage) { + impl->p = loadPage(xid, next_rec->ptr); + readlock(impl->p->rwlatch,0); + impl->current.page = next_rec->ptr; + impl->current.slot = 2; + impl->current.size = keySize; + } else { + impl->p = 0; + impl->current.size = -1; + } + } + if(impl->current.size != INVALID_SLOT) { + impl->t = readNodeRecord(xid,impl->p,impl->current.slot,impl->current.size); + return 1; + } else { + impl->t = 0; + return 0; + } +} diff --git a/src/stasis/page/fixed.c b/src/stasis/page/fixed.c index 1e8db52..f14ce85 100644 --- a/src/stasis/page/fixed.c +++ b/src/stasis/page/fixed.c @@ -79,22 +79,6 @@ static int fixedGetLength(int xid, Page *p, recordid rid) { return rid.slot > *recordcount_ptr(p) ? 
INVALID_SLOT : physical_slot_length(*recordsize_ptr(p)); } -static recordid fixedNext(int xid, Page *p, recordid rid) { - short n = *recordcount_ptr(p); - rid.slot++; - rid.size = *recordsize_ptr(p); - if(rid.slot >= n) { - return NULLRID; - } else { - return rid; - } -} -static recordid fixedFirst(int xid, Page *p) { - recordid rid = { p->id, -1, 0 }; - rid.size = *recordsize_ptr(p); - return fixedNext(xid, p, rid); -} - static int notSupported(int xid, Page * p) { return 0; } static int fixedFreespace(int xid, Page * p) { diff --git a/src/stasis/page/fixed.h b/src/stasis/page/fixed.h index 76114ad..00aa8bb 100644 --- a/src/stasis/page/fixed.h +++ b/src/stasis/page/fixed.h @@ -7,6 +7,22 @@ #define recordcount_ptr(page) shorts_from_end((page), 2) #define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n)) +static inline recordid fixedNext(int xid, Page *p, recordid rid) { + short n = *recordcount_ptr(p); + rid.slot++; + rid.size = *recordsize_ptr(p); + if(rid.slot >= n) { + return NULLRID; + } else { + return rid; + } +} +static inline recordid fixedFirst(int xid, Page *p) { + recordid rid = { p->id, -1, 0 }; + rid.size = *recordsize_ptr(p); + return fixedNext(xid, p, rid); +} + void fixedPageInit(); void fixedPageDeinit(); page_impl fixedImpl(); diff --git a/stasis/constants.h b/stasis/constants.h index 1f3e43e..a31064c 100644 --- a/stasis/constants.h +++ b/stasis/constants.h @@ -241,4 +241,7 @@ extern const short SLOT_TYPE_LENGTHS[]; #define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH) #define LOG_MODE (O_CREAT | O_RDWR | O_SYNC) + +#define MAX_LSM_COMPARATORS 256 + #endif diff --git a/stasis/operations/lsmTree.h b/stasis/operations/lsmTree.h index aa3eb10..7c7927b 100644 --- a/stasis/operations/lsmTree.h +++ b/stasis/operations/lsmTree.h @@ -22,13 +22,9 @@ typedef struct { recordid pos; } lladd_lsm_iterator; -typedef struct { - int id; - // fcn pointer... 
-} comparator_impl;
+typedef int(*lsm_comparator_t)(const void* a, const void* b);
 
-void lsmTreeRegisterComparator(comparator_impl i);
-extern const int MAX_LSM_COMPARATORS;
+void lsmTreeRegisterComparator(int id, lsm_comparator_t i);
 
 /**
    Initialize a new LSM tree.
@@ -47,7 +43,7 @@ recordid TlsmDealloc(int xid, recordid tree);
    ascending order; LSM trees do not support update in place.
 */
 recordid TlsmAppendPage(int xid, recordid tree,
-                        const byte *key, size_t keySize,
+                        const byte *key,
                         long pageid);
 /**
    Lookup a leaf page.
@@ -60,25 +56,72 @@ recordid TlsmAppendPage(int xid, recordid tree,
    Currently unused.
 */
 pageid_t TlsmFindPage(int xid, recordid tree,
-                 const byte *key, size_t keySize);
+                 const byte *key);
+
+/// --------------- Iterator implementation
+
+/** Header of a tree node record: ptr is the value exposed by
+    lsmTreeIterator_value(); lsmTreeIterator_key() reads the key after it. */
+typedef struct lsmTreeNodeRecord {
+  pageid_t ptr;
+} lsmTreeNodeRecord;
+
+/** State that the inline iterator accessors in this header read through
+    lladdIterator_t::impl. */
+typedef struct lsmIteratorImpl {
+  Page * p;
+  recordid current;             // current.size feeds the key length
+  const lsmTreeNodeRecord *t;   // record the key/value accessors point into
+  int justOnePage;
+} lsmIteratorImpl;
 
 /**
    Return a forward iterator over the tree's leaf pages (*not* their
-   contents).
-*/
-lladdIterator_t * TlsmIterator(int xid, recordid hash);
+   contents).  The iterator starts before the first leaf page.
 
-/**
+   @see iterator.h for documentation of lsmTree's iterator interface.
+*/
+lladdIterator_t * lsmTreeIterator_open(int xid, recordid tree);
+
+/*
    These are the functions that implement lsmTree's iterator.
    They're public so that performance critical code can call them
    without paying for a virtual method invocation.
-
-   XXX should they be public?
 */
-void lsmTreeIterator_close(int xid, void * it);
-int lsmTreeIterator_next (int xid, void * it);
-int lsmTreeIterator_key (int xid, void * it, byte **key);
-int lsmTreeIterator_value(int xid, void * it, byte **value);
+void lsmTreeIterator_close(int xid, lladdIterator_t * it);
+int lsmTreeIterator_next (int xid, lladdIterator_t * it);
+/**
+   Expose the key of the record the iterator is positioned on.
+
+   @param key  Set to point at the key bytes, which are read from directly
+               after the lsmTreeNodeRecord header; the pointer refers into
+               impl->t, no copy is made.
+   @return The key length in bytes.
+*/
+static inline int lsmTreeIterator_key (int xid, lladdIterator_t *it,
+                                       byte **key) {
+  lsmIteratorImpl * impl = (lsmIteratorImpl*)it->impl;
+  *key = (byte*)(impl->t+1);
+  // The key length is what remains of the record after the node header.
+  // NOTE(review): assumes current.size is the full on-page record size,
+  // as fixedNext() would set it -- confirm against lsmTreeIterator_next().
+  return (int)(impl->current.size - sizeof(lsmTreeNodeRecord));
+}
+/**
+   Expose the value of the record the iterator is positioned on.
+
+   @param value  Set to point at the record's ptr field (a pageid_t).
+   @return sizeof(pageid_t), the size of that field.
+*/
+static inline int lsmTreeIterator_value(int xid, lladdIterator_t *it,
+                                        byte **value) {
+  lsmIteratorImpl * impl = (lsmIteratorImpl*)it->impl;
+  *value = (byte*)&(impl->t->ptr);
+  return sizeof(impl->t->ptr);
+}
+// Empty implementations of the remaining iterator hooks.
+static inline void lsmTreeIterator_tupleDone(int xid, void *it) { }
+static inline void lsmTreeIterator_releaseLock(int xid, void *it) { }
 
 #endif  // _LSMTREE_H__
 
diff --git a/test/stasis/check_lsmTree.c b/test/stasis/check_lsmTree.c
index 3cb3fc1..f30dfba 100644
--- a/test/stasis/check_lsmTree.c
+++ b/test/stasis/check_lsmTree.c
@@ -13,34 +13,74 @@
 #include
 
 #define LOG_NAME "check_lsmTree.log"
-#define NUM_ENTRIES 100000
+#define NUM_ENTRIES_A 100000
+#define NUM_ENTRIES_B 10
+#define NUM_ENTRIES_C 0
+
 #define OFFSET (NUM_ENTRIES * 10)
-#define DEBUG(...)
+typedef int64_t lsmkey_t;
+
+/** Three-way comparator for lsmkey_t keys.
+    @return -1, 0 or 1 as *ap is less than, equal to or greater than *bp. */
+int cmp(const void *ap, const void *bp) {
+  const lsmkey_t a = *(const lsmkey_t*)ap;
+  const lsmkey_t b = *(const lsmkey_t*)bp;
+  return (a > b) - (a < b);
+}
+/** Append NUM_ENTRIES ascending keys to a new LSM tree, probing each with
+    TlsmFindPage(), then check that the leaf iterator returns them in order.
+    The parameter must be named NUM_ENTRIES because OFFSET expands to it. */
+void insertProbeIter(lsmkey_t NUM_ENTRIES) {
+  int intcmp = 0;
+  lsmTreeRegisterComparator(intcmp,cmp);
+
+  Tinit();
+  int xid = Tbegin();
+  recordid tree = TlsmCreate(xid, intcmp, sizeof(lsmkey_t));
+  for(lsmkey_t i = 0; i < NUM_ENTRIES; i++) {
+    pageid_t pagenum = TlsmFindPage(xid, tree, (byte*)&i);
+    assert(pagenum == -1);  // key must be absent before it is appended
+    DEBUG("TlsmAppendPage %lld\n",(long long)i);
+    TlsmAppendPage(xid, tree, (const byte*)&i, i + OFFSET);
+    pagenum = TlsmFindPage(xid, tree, (byte*)&i);
+    assert(pagenum == i + OFFSET);
+  }
+
+  for(lsmkey_t i = 0; i < NUM_ENTRIES; i++) {
+    pageid_t pagenum = TlsmFindPage(xid, tree, (byte*)&i);
+    assert(pagenum == i + OFFSET);
+  }
+
+  int64_t count = 0;
+  lladdIterator_t * it = lsmTreeIterator_open(xid, tree);
+
+  while(lsmTreeIterator_next(xid, it)) {
+    lsmkey_t * key;
+    lsmkey_t **key_ptr = &key;
+    int size = lsmTreeIterator_key(xid, it, (byte**)key_ptr);
+    assert(size == sizeof(lsmkey_t));
+    pageid_t *value;  // same type as the size asserted below
+    pageid_t **value_ptr = &value;
+    size = lsmTreeIterator_value(xid, it, (byte**)value_ptr);
+    assert(size == sizeof(pageid_t));
+    assert(*key + OFFSET == *value);
+    assert(*key == count);
+    count++;
+  }
+  assert(count == NUM_ENTRIES);
+
+  lsmTreeIterator_close(xid, it);
+  Tcommit(xid);
+  Tdeinit();
+}
 
 /**
    @test
 */
 START_TEST(lsmTreeTest) {
-  Tinit();
-  int xid = Tbegin();
-  recordid tree = TlsmCreate(xid, 0, sizeof(int)); // xxx comparator not set.
-  for(int i = 0; i < NUM_ENTRIES; i++) {
-    long pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
-    assert(pagenum == -1);
-    DEBUG("TlsmAppendPage %d\n",i);
-    TlsmAppendPage(xid, tree, (const byte*)&i, sizeof(int), i + OFFSET);
-    //    fflush(NULL);
-    pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
-    assert(pagenum == i + OFFSET);
-  }
-
-  for(int i = 0; i < NUM_ENTRIES; i++) {
-    long pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
-    assert(pagenum == i + OFFSET);
-  }
-
-  Tcommit(xid);
-  Tdeinit();
+  insertProbeIter(NUM_ENTRIES_A);
+  insertProbeIter(NUM_ENTRIES_B);
+  insertProbeIter(NUM_ENTRIES_C);
 } END_TEST
 
 Suite * check_suite(void) {