diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am
index 9a70e01..ba6276d 100644
--- a/src/stasis/Makefile.am
+++ b/src/stasis/Makefile.am
@@ -14,7 +14,7 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c st
 	operations/naiveLinearHash.c operations/nestedTopActions.c \
 	operations/linearHashNTA.c operations/linkedListNTA.c \
 	operations/pageOrientedListNTA.c operations/bTree.c \
-	operations/regions.c \
+	operations/regions.c operations/lsmTree.c \
 	io/rangeTracker.c io/memory.c io/file.c io/non_blocking.c io/debug.c \
 	bufferManager/pageArray.c bufferManager/bufferHash.c \
 	replacementPolicy/lru.c replacementPolicy/lruFast.c
diff --git a/src/stasis/operations/linearHashNTA.c b/src/stasis/operations/linearHashNTA.c
index 1a845a9..89ee5c7 100644
--- a/src/stasis/operations/linearHashNTA.c
+++ b/src/stasis/operations/linearHashNTA.c
@@ -3,20 +3,18 @@
 #include "../latches.h"
 #include
 #include
-#include "../page.h"
-#include
 #include
 #include
-#include
+// The next two #includes are for deprecated code.
 #include
 #include
 #include "../logger/logMemory.h"
 /**
-   re-entrant implementation of a linear hash hable, using nensted top actions.
-
+   re-entrant implementation of a linear hash table, using nested top actions.
+
    @file
-
-   @todo Improve concurrency of linearHashNTA and linkedListNTA.
+
+   @todo Improve concurrency of linearHashNTA and linkedListNTA by
+   leveraging Page.impl on the data structure header page?
 */
 static pthread_mutex_t linear_hash_mutex;// = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c
new file mode 100644
index 0000000..48ed148
--- /dev/null
+++ b/src/stasis/operations/lsmTree.c
@@ -0,0 +1,554 @@
+#include
+#include
+// XXX including fixed.h breaks page API encapsulation; we need a "last slot"
+// call.
+#include "../page/fixed.h"
+#include
+
+const int MAX_LSM_COMPARATORS = 256;
+
+typedef struct nodeRecord {
+  pageid_t ptr;
+  int key;
+} nodeRecord;
+
+#define HEADER_SIZE (2 * sizeof(nodeRecord))
+
+typedef struct lsmTreeState {
+  // pthread_mutex_t mut;
+  // pageid_t * dirtyPages;
+  pageid_t lastLeaf;
+} lsmTreeState;
+
+/**
+   Initialize a page for use as an internal node of the tree.
+   lsmTree nodes are based on fixed.h.  This function allocates a page
+   that can hold fixed length records, and then sets up a tree node
+   header in the first two nodeRecords on the page.
+*/
+static void initializeNodePage(int xid, Page * p) {
+  fixedPageInitialize(p, sizeof(nodeRecord), 0);
+  recordid reserved1 = recordPreAlloc(xid, p, sizeof(nodeRecord));
+  recordPostAlloc(xid, p, reserved1);
+  recordid reserved2 = recordPreAlloc(xid, p, sizeof(nodeRecord));
+  recordPostAlloc(xid, p, reserved2);
+}
+
+/**
+   A macro that hardcodes lsmTree to use fixed.h's page implementation.
+*/
+#define readNodeRecord(xid,p,slot) readNodeRecordFixed(xid,p,slot)
+/**
+   @see readNodeRecord
+*/
+#define writeNodeRecord(xid,p,slot,key,ptr) writeNodeRecordFixed(xid,p,slot,key,ptr)
+//#define readNodeRecord(xid,p,slot) readNodeRecordVirtualMethods(xid,p,slot)
+//#define writeNodeRecord(xid,p,slot,key,ptr) writeNodeRecordVirtualMethods(xid,p,slot,key,ptr)
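+
+/**
+   For example, the root split in TlsmAppendPage copies entries from
+   one node page to another with a read/write pair.  Under either
+   binding, the pattern is (an illustrative sketch; src, dst and slot
+   stand in for whatever pages and slot the caller is working with):
+
+   @code
+     nodeRecord nr = readNodeRecord(xid, src, slot); // key + child pointer
+     writeNodeRecord(xid, dst, slot, nr.key, nr.ptr);
+   @endcode
+*/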
+
+/**
+   Read a record from a node page, assuming the node uses fixed.h's
+   page layout.
+*/
+static inline nodeRecord readNodeRecordFixed(int xid, Page * const p, int slot) {
+  return *(nodeRecord*)fixed_record_ptr(p, slot);
+}
+/**
+   Read a record from a node page, using Stasis' general-purpose page
+   access API.
+*/
+static inline nodeRecord readNodeRecordVirtualMethods(int xid, Page * const p, int slot) {
+  nodeRecord ret;
+
+  recordid rid = {p->id, slot, sizeof(nodeRecord)};
+  const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid,p,rid);
+  ret = *nr;
+  assert(ret.ptr > 1 || slot < 2);
+  recordReadDone(xid,p,rid,(const byte*)nr);
+
+  DEBUG("reading {%lld, %d, %d} = %d, %lld\n",
+        p->id, slot, sizeof(nodeRecord), ret.key, ret.ptr);
+
+  return ret;
+}
+
+/**
+   @see readNodeRecordFixed
+*/
+static inline void writeNodeRecordFixed(int xid, Page * const p, int slot,
+                                        int key, pageid_t ptr) {
+  nodeRecord * nr = (nodeRecord*)fixed_record_ptr(p,slot);
+  nr->key = key;
+  nr->ptr = ptr;
+  pageWriteLSN(xid, p, 0); // XXX need real LSN?
+}
+
+/**
+   @see readNodeRecordVirtualMethods
+*/
+static inline void writeNodeRecordVirtualMethods(int xid, Page * const p, int slot,
+                                                 int key, pageid_t ptr) {
+  nodeRecord src;
+  src.key = key;
+  src.ptr = ptr;
+  assert(src.ptr > 1 || slot < 2);
+
+  recordid rid = {p->id, slot, sizeof(nodeRecord)};
+  nodeRecord * target = (nodeRecord*)recordWriteNew(xid,p,rid);
+  *target = src;
+  DEBUG("Writing to record {%d %d %lld}\n", rid.page, rid.slot, rid.size);
+  recordWriteDone(xid,p,rid,(byte*)target);
+  pageWriteLSN(xid, p, 0); // XXX need real LSN?
+}
+
+/**
+   The implementation strategy used here is a bit of an experiment.
+
+   The LSM tree is updated using a FORCE/STEAL strategy.  In order to
+   do this efficiently, its root node overrides fixedPage, adding
+   pageLoaded and pageFlushed callbacks.  Those callbacks maintain an
+   impl pointer, which tracks dirty pages, a mutex, and other
+   information on behalf of the tree.  (Note that the dirtyPage list
+   must be stored in a global hash table if the root is evicted with
+   outstanding dirty tree pages...)
+
+   This approach has a particularly nice, general-purpose property
+   that may be useful for other data structure implementations: by
+   using a mutex associated with the root of the data structure, we
+   can get rid of the static locks used by existing implementations.
+
+   @todo Need an easy way for operations to store things in p->impl,
+   even if the underlying page implementation wants to store something
+   there too (a second pointer?)
+
+   Page layout information for lsm trees:
+
+   root page layout
+   ----------------
+
+   uses fixedPage (for now)
+
+   slot 0: the depth of the tree.
+   slot 1: the slot id of the first key on each leaf page. [unimplemented]
+
+   the remainder of the slots contain nodeRecords
+
+   internal node page layout
+   -------------------------
+   uses fixedPage (for now)
+
+   slot 0: prev page [unimplemented]
+   slot 1: next page [unimplemented]
+   the remainder of the slots contain nodeRecords
+
+   leaf page layout
+   ----------------
+
+   Defined by the client, but calling readRecord() on the slot id from
+   root page slot 1 must return the first key stored on the leaf page.
+*/
+recordid TlsmCreate(int xid, int leafFirstSlot, int keySize) {
+  // XXX generalize later
+  assert(keySize == sizeof(int));
+
+  // XXX hardcoded to fixed.h's current page layout, and node records
+  // that contain the key...
+
+  // Can the pages hold at least two keys?
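+  // (A sketch of the check's intent: HEADER_SIZE covers the two
+  //  reserved header slots, the second term covers two key-bearing
+  //  nodeRecords, and 2 * sizeof(short) presumably accounts for
+  //  fixed.h's per-page record size and record count fields.)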
+  assert(HEADER_SIZE + 2 * (sizeof(nodeRecord) /*XXX +keySize*/) <
+         USABLE_SIZE_OF_PAGE - 2 * sizeof(short));
+
+  pageid_t root = TpageAlloc(xid);
+
+  recordid ret = { root, 0, 0 };
+
+  Page * const p = loadPage(xid, ret.page);
+  writelock(p->rwlatch,0);
+  fixedPageInitialize(p, sizeof(nodeRecord), 0);
+  *page_type_ptr(p) = LSM_ROOT_PAGE;
+
+  lsmTreeState * state = malloc(sizeof(lsmTreeState));
+  state->lastLeaf = -1; // XXX this should be defined in constants.h
+  // pthread_mutex_init(&(state->mut),0);
+  // state->dirtyPages = malloc(sizeof(Page*)*2);
+  // state->dirtyPages[0] = ret.page;
+  // state->dirtyPages[1] = -1; // XXX this should be defined in constants.h
+
+  p->impl = state;
+
+  recordid treeDepth = recordPreAlloc(xid, p, sizeof(nodeRecord));
+  recordPostAlloc(xid,p,treeDepth);
+
+  assert(treeDepth.page == ret.page
+         && treeDepth.slot == 0
+         && treeDepth.size == sizeof(nodeRecord));
+
+  recordid slotOff = recordPreAlloc(xid, p, sizeof(nodeRecord));
+  recordPostAlloc(xid,p,slotOff);
+
+  assert(slotOff.page == ret.page
+         && slotOff.slot == 1
+         && slotOff.size == sizeof(nodeRecord));
+
+  // ptr is zero because the tree depth starts out as zero.
+  writeNodeRecord(xid, p, 0, 0, 0);
+  // ptr = leafFirstSlot (which isn't used, for now...)
+  writeNodeRecord(xid, p, 1, 0, leafFirstSlot);
+
+  unlock(p->rwlatch);
+  releasePage(p);
+  return ret;
+}
+
+static recordid buildPathToLeaf(int xid, recordid root, Page * const root_p,
+                                int depth, const byte * key, size_t key_len,
+                                pageid_t val_page) {
+  // root is the recordid on the root page that should point to the
+  // new subtree.
+  assert(depth);
+  DEBUG("buildPathToLeaf(depth=%d) called\n",depth);
+
+  pageid_t child = TpageAlloc(xid); // XXX Use some other function...
+
+  Page * const child_p = loadPage(xid, child);
+  writelock(child_p->rwlatch,0);
+  initializeNodePage(xid, child_p);
+
+  recordid ret;
+
+  if(depth-1) {
+    // recurse: the page we just allocated is not a leaf.
+    recordid child_rec = recordPreAlloc(xid, child_p, sizeof(nodeRecord));
+    assert(child_rec.size != INVALID_SLOT);
+    recordPostAlloc(xid, child_p, child_rec);
+
+    ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
+                          val_page);
+  } else {
+    // set up the leaf pointer.
+    recordid leaf_rec = recordPreAlloc(xid, child_p, sizeof(nodeRecord));
+    assert(leaf_rec.slot == 2); // XXX
+    recordPostAlloc(xid, child_p, leaf_rec);
+    writeNodeRecord(xid,child_p,leaf_rec.slot,*(int*)key,val_page);
+
+    ret = leaf_rec;
+  }
+  unlock(child_p->rwlatch);
+  releasePage(child_p);
+
+  writeNodeRecord(xid, root_p, root.slot, *(int*)key, child);
+
+  return ret;
+}
+
+/* Adding pages:
+
+   1) Try to append the value to lsmTreeState->lastLeaf.
+
+   2) If that fails, traverse down from the root of the tree, and
+      split pages while traversing back up.
+
+   3) Split is done by adding a new page at the end of the row (there
+      is no key redistribution), except at the root, where the root's
+      contents are pushed into the first page of the next row, and a
+      new path from root to leaf is created, starting with the root's
+      immediate second child.
+*/
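+
+/* A hand-worked example of (3) above (illustrative only; real pages
+   hold far more than three entries): suppose internal pages held two
+   header slots plus three nodeRecords, and leaves A,B,C,D had first
+   keys 0,10,20,30.  After appending A, B and C, the tree has depth 0:
+
+       root: [hdr hdr (0,A) (10,B) (20,C)]
+
+   Appending D finds the root full.  The root's entries are pushed
+   into a new page P1, the root keeps key 0 but points it at P1, the
+   depth becomes 1, and a fresh path P2 is built for the new leaf:
+
+       root: [hdr hdr (0,P1) (30,P2)]
+       P1:   [hdr hdr (0,A) (10,B) (20,C)]
+       P2:   [hdr hdr (30,D)]
+*/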
+
+static recordid appendInternalNode(int xid, Page * const p,
+                                   int depth,
+                                   const byte *key, size_t key_len,
+                                   pageid_t val_page) {
+  if(!depth) {
+    // Leaf node: append the new (key, page) pair here.
+    recordid ret = recordPreAlloc(xid, p, sizeof(nodeRecord));
+    if(ret.size != INVALID_SLOT) {
+      recordPostAlloc(xid, p, ret);
+      writeNodeRecord(xid,p,ret.slot,*(int*)key,val_page);
+      assert(val_page); // XXX
+    }
+    return ret;
+  } else {
+    // Recurse into the rightmost child.
+    int slot = *recordcount_ptr(p)-1;
+    assert(slot >= 2); // XXX
+    nodeRecord nr = readNodeRecord(xid, p, slot);
+    pageid_t child_id = nr.ptr;
+    recordid ret;
+    {
+      Page * const child_page = loadPage(xid, child_id);
+      writelock(child_page->rwlatch,0);
+      ret = appendInternalNode(xid, child_page, depth-1,
+                               key, key_len, val_page);
+      unlock(child_page->rwlatch);
+      releasePage(child_page);
+    }
+    if(ret.size == INVALID_SLOT) { // subtree is full; split
+      if(depth > 1) {
+        DEBUG("subtree is full at depth %d\n", depth);
+      }
+
+      ret = recordPreAlloc(xid, p, sizeof(nodeRecord));
+      if(ret.size != INVALID_SLOT) {
+        recordPostAlloc(xid, p, ret);
+        ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page);
+
+        DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n",
+              p->id, ret.page, ret.slot, ret.size);
+      } else {
+        // ret is NULLRID; this is the root of a full tree.  Return
+        // NULLRID to the caller.
+      }
+    } else {
+      // We inserted the value into a subtree rooted here.
+    }
+    return ret;
+  }
+}
+
+/**
+   Traverse from the root of the tree to the rightmost leaf (the one
+   with the highest base key value).
+*/
+static pageid_t findLastLeaf(int xid, Page * const root, int depth) {
+  if(!depth) {
+    DEBUG("Found last leaf = %lld\n", root->id);
+    return root->id;
+  } else {
+    nodeRecord nr = readNodeRecord(xid, root, (*recordcount_ptr(root))-1);
+    pageid_t ret;
+    {
+      Page * const p = loadPage(xid, nr.ptr);
+      writelock(p->rwlatch,0);
+      ret = findLastLeaf(xid,p,depth-1);
+      unlock(p->rwlatch);
+      releasePage(p);
+    }
+    return ret;
+  }
+}
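+
+/**
+   Append a new leaf page to the tree.  The fast path appends to the
+   cached rightmost leaf node (lsmTreeState.lastLeaf); if that node is
+   full, we re-descend from the root and split as described above.  A
+   typical caller loops in ascending key order (a sketch; fillLeaf is
+   a hypothetical helper that returns a freshly written leaf page):
+
+   @code
+     for(int i = 0; i < n; i++) {
+       pageid_t leaf = fillLeaf(xid, i);
+       TlsmAppendPage(xid, tree, (const byte*)&i, sizeof(i), leaf);
+     }
+   @endcode
+*/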
+recordid TlsmAppendPage(int xid, recordid tree,
+                        const byte *key, size_t keySize,
+                        long val_page) {
+  Page * const p = loadPage(xid, tree.page);
+  writelock(p->rwlatch, 0);
+  lsmTreeState * s = p->impl;
+  // pthread_mutex_lock(&(s->mut));
+
+  tree.slot = 0;
+  tree.size = sizeof(nodeRecord);
+
+  // The tree's depth is stored in slot zero of the root.
+  nodeRecord nr = readNodeRecord(xid,p,0);
+  int depth = nr.ptr;
+  // const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid,p,tree);
+  // int depth = nr->ptr;
+  // recordReadDone(xid,p,tree,(const byte*)nr);
+
+  if(s->lastLeaf == -1) {
+    s->lastLeaf = findLastLeaf(xid, p, depth);
+  }
+  Page * lastLeaf;
+  if(s->lastLeaf != tree.page) {
+    lastLeaf = loadPage(xid, s->lastLeaf);
+    writelock(lastLeaf->rwlatch, 0);
+  } else {
+    lastLeaf = p;
+  }
+
+  recordid ret = recordPreAlloc(xid, lastLeaf, sizeof(nodeRecord));
+
+  if(ret.size == INVALID_SLOT) {
+    if(lastLeaf->id != p->id) {
+      unlock(lastLeaf->rwlatch);
+      releasePage(lastLeaf); // don't need that page anymore...
+    }
+    // Traverse down from the root of the tree.
+
+    tree.slot = 0;
+
+    assert(tree.page == p->id);
+    ret = appendInternalNode(xid, p, depth, key, keySize,
+                             val_page);
+
+    if(ret.size == INVALID_SLOT) {
+      DEBUG("Need to split root; depth = %d\n", depth);
+
+      pageid_t child = TpageAlloc(xid);
+
+      Page * lc = loadPage(xid, child);
+
+      writelock(lc->rwlatch,0);
+
+      initializeNodePage(xid, lc);
+
+      for(int i = 2; i < *recordcount_ptr(p); i++) {
+
+        recordid cnext = recordPreAlloc(xid, lc, sizeof(nodeRecord));
+
+        assert(i == cnext.slot); // XXX hardcoded to current node format...
+        assert(cnext.size != INVALID_SLOT);
+
+        recordPostAlloc(xid, lc, cnext);
+
+        nodeRecord nr = readNodeRecord(xid,p,i);
+        writeNodeRecord(xid,lc,i,nr.key,nr.ptr);
+
+      }
+
+      // Deallocate the old entries, and update the pointer on the
+      // parent node.  XXX this is a terrible way to do this.
+      recordid pFirstSlot = {p->id, 2, sizeof(nodeRecord)};
+      *recordcount_ptr(p) = 3;
+      nodeRecord * nr = (nodeRecord*)recordWriteNew(xid, p, pFirstSlot);
+      // don't overwrite the key...
+      nr->ptr = child;
+      assert(nr->ptr > 1); ///XXX
+      recordWriteDone(xid,p,pFirstSlot,(byte*)nr);
+      pageWriteLSN(xid, p, 0); // XXX need real LSN?
+
+      unlock(lc->rwlatch);
+      releasePage(lc);
+
+      depth++;
+      writeNodeRecord(xid,p,0,0,depth);
+
+      assert(tree.page == p->id);
+      ret = appendInternalNode(xid, p, depth, key, keySize,
+                               val_page);
+      assert(ret.size != INVALID_SLOT);
+
+    } else {
+      DEBUG("Appended new internal node; tree depth = %d, key = %d\n",
+            depth, *(int*)key);
+    }
+    s->lastLeaf = ret.page;
+    DEBUG("lastLeaf is %lld\n", s->lastLeaf);
+  } else {
+    // Write the new value to an existing page.
+    DEBUG("Writing %d to existing page %lld\n", *(int*)key, lastLeaf->id);
+
+    recordPostAlloc(xid, lastLeaf, ret);
+
+    writeNodeRecord(xid, lastLeaf, ret.slot, *(int*)key, val_page);
+
+    if(lastLeaf->id != p->id) {
+      unlock(lastLeaf->rwlatch);
+      releasePage(lastLeaf);
+    }
+  }
+
+  // XXX do something to make this transactional...
+  // pthread_mutex_unlock(&(s->mut));
+  unlock(p->rwlatch);
+  releasePage(p);
+
+  return ret;
+}
+
+static pageid_t lsmLookup(int xid, Page * const node, int depth,
+                          const byte *key, size_t keySize) {
+  // Start at slot 2 to skip the reserved slots on the page...
+  if(*recordcount_ptr(node) == 2) { return -1; }
+  assert(*recordcount_ptr(node) > 2);
+  nodeRecord prev = readNodeRecord(xid,node,2);
+
+  // XXX should do binary search instead (see the sketch below).
+  for(int i = 3; i < *recordcount_ptr(node); i++) {
+    nodeRecord rec = readNodeRecord(xid,node,i);
+
+    if(depth) {
+      if(prev.key <= *(int*)key && rec.key > *(int*)key) {
+        pageid_t child_id = prev.ptr;
+        Page * const child_page = loadPage(xid, child_id);
+        readlock(child_page->rwlatch,0);
+        long ret = lsmLookup(xid,child_page,depth-1,key,keySize);
+        unlock(child_page->rwlatch);
+        releasePage(child_page);
+        return ret;
+      }
+    } else {
+      if(prev.key == *(int*)key) {
+        return prev.ptr;
+      }
+    }
+    prev = rec;
+
+    if(prev.key > *(int*)key) { break; }
+  }
+
+  if(depth) {
+    if(prev.key <= *(int*)key) {
+      pageid_t child_id = prev.ptr;
+      Page * const child_page = loadPage(xid, child_id);
+      readlock(child_page->rwlatch,0);
+      long ret = lsmLookup(xid,child_page,depth-1,key,keySize);
+      unlock(child_page->rwlatch);
+      releasePage(child_page);
+      return ret;
+    }
+  } else {
+    if(prev.key == *(int*)key) {
+      return prev.ptr;
+    }
+  }
+  return -1;
+}
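+
+/* Illustrative only: since keys are appended in ascending order, the
+   linear scans in lsmLookup could become a binary search over slots
+   [2, *recordcount_ptr(node)).  An untested sketch:
+
+     int lo = 2, hi = *recordcount_ptr(node);
+     while(hi - lo > 1) {  // find the last slot with key <= search key
+       int mid = lo + (hi - lo) / 2;
+       if(readNodeRecord(xid, node, mid).key <= *(int*)key) { lo = mid; }
+       else                                                 { hi = mid; }
+     }
+     // Slot lo is the candidate, assuming slot 2's key <= search key.
+*/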
+
+pageid_t TlsmFindPage(int xid, recordid tree, const byte * key, size_t keySize) {
+  Page * const p = loadPage(xid, tree.page);
+  readlock(p->rwlatch,0);
+  // lsmTreeState * s = p->impl;
+  // pthread_mutex_lock(&(s->mut));
+
+  tree.slot = 0;
+  tree.size = *recordsize_ptr(p);
+
+  nodeRecord nr = readNodeRecord(xid, p, 0);
+  // const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid, p, tree);
+
+  int depth = nr.ptr;
+
+  pageid_t ret = lsmLookup(xid, p, depth, key, keySize);
+
+  // recordReadDone(xid, p, tree, (const byte*)nr);
+  // pthread_mutex_unlock(&(s->mut));
+  unlock(p->rwlatch);
+  releasePage(p);
+
+  return ret;
+}
+
+/**
+   The buffer manager calls this when the lsmTree's root page is
+   loaded.  This function allocates some storage associated with the
+   tree.
+*/
+static void lsmPageLoaded(Page *p) {
+  lsmTreeState * state = malloc(sizeof(lsmTreeState));
+  state->lastLeaf = -1;
+  // pthread_mutex_init(&(state->mut),0);
+  p->impl = state;
+}
+/**
+   Free any soft state associated with the tree rooted at page p.
+   This is called by the buffer manager.
+*/
+static void lsmPageFlushed(Page *p) {
+  lsmTreeState * state = p->impl;
+  // pthread_mutex_destroy(&(state->mut));
+  free(state);
+}
+/**
+   A page_impl for the root of an lsmTree.
+*/
+page_impl lsmRootImpl() {
+  page_impl pi = fixedImpl();
+  pi.pageLoaded = lsmPageLoaded;
+  pi.pageFlushed = lsmPageFlushed;
+  pi.page_type = LSM_ROOT_PAGE;
+  return pi;
+}
diff --git a/src/stasis/page.c b/src/stasis/page.c
index 6ba8fac..88db34e 100644
--- a/src/stasis/page.c
+++ b/src/stasis/page.c
@@ -133,6 +133,7 @@ void pageInit() {
   registerPageType(arrayListImpl());
   registerPageType(blobImpl());
   registerPageType(indirectImpl());
+  registerPageType(lsmRootImpl());
 }
 
 void pageDeinit() {
diff --git a/src/stasis/page/fixed.c b/src/stasis/page/fixed.c
index 96fd3d6..6242825 100644
--- a/src/stasis/page/fixed.c
+++ b/src/stasis/page/fixed.c
@@ -32,7 +32,6 @@ static void checkRid(Page * page, recordid rid) {
     fixedPageInitialize(page, rid.size, fixedRecordsPerPage(rid.size));
   }
-  assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
   assert(page->id == rid.page);
   assert(*recordsize_ptr(page) == rid.size);
   assert(fixedRecordsPerPage(rid.size) > rid.slot);
@@ -174,7 +173,10 @@ page_impl fixedImpl() {
   return pi;
 }
 
-page_impl arrayListImpl() {
+/**
+   @todo arrayListImpl belongs in arrayList.c
+*/
+page_impl arrayListImpl() {
   page_impl pi = fixedImpl();
   pi.page_type = ARRAY_LIST_PAGE;
   return pi;
diff --git a/src/stasis/page/fixed.h b/src/stasis/page/fixed.h
index d5b226a..76114ad 100644
--- a/src/stasis/page/fixed.h
+++ b/src/stasis/page/fixed.h
@@ -11,4 +11,6 @@ void fixedPageInit();
 void fixedPageDeinit();
 page_impl fixedImpl();
 page_impl arrayListImpl();
+// @todo move lsmRootImpl() to lsmTree.h (but first, move page.h...)
+page_impl lsmRootImpl();
 #endif
diff --git a/stasis/constants.h b/stasis/constants.h
index ae0e026..40c6a3b 100644
--- a/stasis/constants.h
+++ b/stasis/constants.h
@@ -227,6 +227,7 @@ extern const short SLOT_TYPE_LENGTHS[];
 #define ARRAY_LIST_PAGE 6
 #define BOUNDARY_TAG_PAGE 7
 #define BLOB_PAGE 8
+#define LSM_ROOT_PAGE 9
 
 #define USER_DEFINED_PAGE(n) (100+n) // 0 <= n < 155
 #define MAX_PAGE_TYPE 255
diff --git a/stasis/operations.h b/stasis/operations.h
index 1339cb7..1ea398f 100644
--- a/stasis/operations.h
+++ b/stasis/operations.h
@@ -168,7 +168,7 @@ typedef struct {
 #include "operations/pageOrientedListNTA.h"
 #include "operations/linearHashNTA.h"
 #include "operations/regions.h"
-
+#include "operations/lsmTree.h"
 extern Operation operationsTable[]; /* [MAX_OPERATIONS];  memset somewhere */
 
diff --git a/stasis/operations/linearHashNTA.h b/stasis/operations/linearHashNTA.h
index 241053b..5967058 100644
--- a/stasis/operations/linearHashNTA.h
+++ b/stasis/operations/linearHashNTA.h
@@ -1,10 +1,8 @@
-/**
+/** @file
+
    A reliable hashtable implementation.  The implementation makes
-   use of nested top actions, and is reentrant.  Currently, all keys
-   and values must be of the same length, although this restriction
-   will eventually be removed.
+   use of nested top actions, and is reentrant.
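+
+   For example (a sketch only; see the declarations in this file for
+   the authoritative signatures):
+
+   @code
+     int xid = Tbegin();
+     recordid hash = ThashCreate(xid, sizeof(int), sizeof(long));
+     int  key = 42;
+     long val = 1000;
+     ThashInsert(xid, hash, (byte*)&key, sizeof(key),
+                 (byte*)&val, sizeof(val));
+     Tcommit(xid);
+   @endcode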
 
    The implementation uses a linear hash function, allowing the
    bucket list to be resized dynamically.  Because the bucket list is
diff --git a/stasis/operations/lsmTree.h b/stasis/operations/lsmTree.h
new file mode 100644
index 0000000..aa3eb10
--- /dev/null
+++ b/stasis/operations/lsmTree.h
@@ -0,0 +1,84 @@
+#ifndef _LSMTREE_H__
+#define _LSMTREE_H__
+/**
+   @file
+
+   A log structured merge tree implementation.  This implementation
+   performs atomic bulk append operations to reduce logging overheads,
+   and does not support in-place updates.
+
+   However, once written, the page format of internal nodes is similar
+   to that of a conventional b-tree, while leaf nodes may be provided
+   by any page type that allows records to be appended to a page, and
+   read by slot id.
+
+   For now, LSM trees only support fixed length keys; this restriction
+   will be lifted in the future.
+*/
+#include
+#include
+typedef struct {
+  recordid treeRoot;
+  recordid pos;
+} lladd_lsm_iterator;
+
+typedef struct {
+  int id;
+  // fcn pointer...
+} comparator_impl;
+
+void lsmTreeRegisterComparator(comparator_impl i);
+extern const int MAX_LSM_COMPARATORS;
+
+/**
+   Initialize a new LSM tree.
+
+   @param comparator The id of the comparator this tree should use.
+          (It must have been registered with lsmTreeRegisterComparator
+          before TlsmCreate() is called.)
+*/
+recordid TlsmCreate(int xid, int comparator, int keySize);
+/**
+   Free the space associated with an LSM tree.
+*/
+recordid TlsmDealloc(int xid, recordid tree);
+/**
+   Append a new leaf page to an LSM tree.  Leaves must be appended in
+   ascending key order; LSM trees do not support update in place.
+*/
+recordid TlsmAppendPage(int xid, recordid tree,
+                        const byte *key, size_t keySize,
+                        long pageid);
+/**
+   Look up a leaf page.
+
+   @param key The value you're looking for.  The first page that may
+          contain this value will be returned.  (lsmTree supports
+          duplicate keys...)
+
+   @param keySize Must match the keySize passed to TlsmCreate.
+          Currently unused.
+*/
+pageid_t TlsmFindPage(int xid, recordid tree,
+                      const byte *key, size_t keySize);
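+
+/**
+   Example usage (a sketch mirroring check_lsmTree.c; comparator
+   registration and error handling omitted):
+
+   @code
+     int xid = Tbegin();
+     recordid tree = TlsmCreate(xid, 0, sizeof(int));
+     int key = 42;
+     TlsmAppendPage(xid, tree, (const byte*)&key, sizeof(key), 1000);
+     pageid_t leaf = TlsmFindPage(xid, tree, (const byte*)&key, sizeof(key));
+     assert(leaf == 1000); // keys that were never appended return -1
+     Tcommit(xid);
+   @endcode
+*/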
+
+/**
+   Return a forward iterator over the tree's leaf pages (*not* their
+   contents).
+*/
+lladdIterator_t * TlsmIterator(int xid, recordid tree);
+
+/**
+   These are the functions that implement lsmTree's iterator.
+
+   They're public so that performance critical code can call them
+   without paying for a virtual method invocation.
+
+   XXX should they be public?
+*/
+void lsmTreeIterator_close(int xid, void * it);
+int  lsmTreeIterator_next (int xid, void * it);
+int  lsmTreeIterator_key  (int xid, void * it, byte **key);
+int  lsmTreeIterator_value(int xid, void * it, byte **value);
+
+#endif // _LSMTREE_H__
diff --git a/test/stasis/Makefile.am b/test/stasis/Makefile.am
index 02bd606..c0276ff 100644
--- a/test/stasis/Makefile.am
+++ b/test/stasis/Makefile.am
@@ -2,7 +2,7 @@ if HAVE_LIBCHECK
 ## Had to disable check_lht because lht needs to be rewritten.
-TESTS = check_lhtable check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_pageOperations check_linearHash check_logicalLinearHash check_header check_linkedListNTA check_linearHashNTA check_pageOrientedList check_lockManager check_compensations check_errorHandling check_ringbuffer check_iterator check_multiplexer check_bTree check_regions check_allocationPolicy check_io check_rangeTracker check_replacementPolicy
+TESTS = check_lhtable check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_pageOperations check_linearHash check_logicalLinearHash check_header check_linkedListNTA check_linearHashNTA check_pageOrientedList check_lockManager check_compensations check_errorHandling check_ringbuffer check_iterator check_multiplexer check_bTree check_regions check_allocationPolicy check_io check_rangeTracker check_replacementPolicy check_lsmTree
 #check_lladdhash
 else
 TESTS =
diff --git a/test/stasis/check_lsmTree.c b/test/stasis/check_lsmTree.c
new file mode 100644
index 0000000..3cb3fc1
--- /dev/null
+++ b/test/stasis/check_lsmTree.c
@@ -0,0 +1,63 @@
+#include
+#include
+#include "../check_includes.h"
+
+#include
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+
+#define LOG_NAME   "check_lsmTree.log"
+#define NUM_ENTRIES 100000
+#define OFFSET      (NUM_ENTRIES * 10)
+
+#define DEBUG(...)
+/** @test
+*/
+START_TEST(lsmTreeTest)
+{
+  Tinit();
+  int xid = Tbegin();
+  recordid tree = TlsmCreate(xid, 0, sizeof(int)); // XXX comparator not set.
+  for(int i = 0; i < NUM_ENTRIES; i++) {
+    long pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
+    assert(pagenum == -1);
+    DEBUG("TlsmAppendPage %d\n",i);
+    TlsmAppendPage(xid, tree, (const byte*)&i, sizeof(int), i + OFFSET);
+    // fflush(NULL);
+    pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
+    assert(pagenum == i + OFFSET);
+  }
+
+  for(int i = 0; i < NUM_ENTRIES; i++) {
+    long pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
+    assert(pagenum == i + OFFSET);
+  }
+
+  Tcommit(xid);
+  Tdeinit();
+} END_TEST
+
+Suite * check_suite(void) {
+  Suite *s = suite_create("lsmTree");
+  /* Begin a new test */
+  TCase *tc = tcase_create("simple");
+
+  tcase_set_timeout(tc, 1200); // 20 minute timeout
+  /* Sub tests are added, one per line, here */
+  tcase_add_test(tc, lsmTreeTest);
+
+  /* --------------------------------------------- */
+
+  tcase_add_checked_fixture(tc, setup, teardown);
+
+  suite_add_tcase(s, tc);
+  return s;
+}
+
+#include "../check_setup.h"