From 5bd2138a8b4a63fbf0ae97f24f52d3fa298b3a73 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 20 Aug 2007 21:58:20 +0000 Subject: [PATCH] Fixes a number of memory leaks; adds pageCleanup() callback, and allows callers to use custom allocators with LSM trees --- benchmarks/Makefile.am | 2 +- clean.sh | 3 +++ src/stasis/Makefile.am | 3 ++- src/stasis/allocationPolicy.c | 4 ++- src/stasis/blobManager.c | 6 +++-- src/stasis/bufferManager/bufferHash.c | 5 +++- src/stasis/operations/alloc.c | 3 +++ src/stasis/operations/lsmTree.c | 33 +++++++++++++++++++----- src/stasis/operations/nestedTopActions.c | 3 +++ src/stasis/page.c | 7 +++++ src/stasis/page.h | 3 ++- src/stasis/page/fixed.c | 3 +++ src/stasis/page/fixed.h | 2 +- src/stasis/page/indirect.c | 3 ++- src/stasis/page/slotted.c | 2 ++ src/stasis/transactional2.c | 2 ++ stasis/operations/alloc.h | 2 +- stasis/operations/lsmTree.h | 8 +++++- stasis/operations/nestedTopActions.h | 1 + 19 files changed, 78 insertions(+), 17 deletions(-) diff --git a/benchmarks/Makefile.am b/benchmarks/Makefile.am index b6f6a47..7a35faf 100644 --- a/benchmarks/Makefile.am +++ b/benchmarks/Makefile.am @@ -4,7 +4,7 @@ LDADD=$(top_builddir)/src/stasis/libstasis.la \ if BUILD_BENCHMARKS noinst_PROGRAMS=lhtableThreaded naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \ arrayListSet logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \ - linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure zeroCopy + linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure zeroCopy sequentialThroughput endif AM_CFLAGS=${GLOBAL_CFLAGS} diff --git a/clean.sh b/clean.sh index ed08e1a..925bfde 100755 --- a/clean.sh +++ b/clean.sh @@ -5,6 +5,9 @@ find . | grep \~$ | xargs rm -f find . -name '*.bb' | xargs rm -f find . -name '*.bbg' | xargs rm -f find . -name '*.da' | xargs rm -f +find . -name '*.o' | xargs rm -f +find . -name '*.lo' | xargs rm -f +find . -name '*.Plo' | xargs rm -f find . | perl -ne 'print if (/\/core(\.\d+)?$/)' | xargs rm -f find . | perl -ne 'print if (/\/Makefile.in$/)' | xargs rm -f find . | perl -ne 'print if (/\/storefile.txt$/)' | xargs rm -f diff --git a/src/stasis/Makefile.am b/src/stasis/Makefile.am index eb0e9f3..72ac460 100644 --- a/src/stasis/Makefile.am +++ b/src/stasis/Makefile.am @@ -15,7 +15,8 @@ libstasis_la_SOURCES=crc32.c redblack.c lhtable.c doubleLinkedList.c common.c fl operations/linearHashNTA.c operations/linkedListNTA.c \ operations/pageOrientedListNTA.c operations/bTree.c \ operations/regions.c operations/lsmTree.c \ - io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c io/debug.c \ + io/rangeTracker.c io/memory.c io/file.c io/pfile.c io/non_blocking.c \ + io/debug.c \ bufferManager/pageArray.c bufferManager/bufferHash.c \ replacementPolicy/lru.c replacementPolicy/lruFast.c AM_CFLAGS=${GLOBAL_CFLAGS} diff --git a/src/stasis/allocationPolicy.c b/src/stasis/allocationPolicy.c index 02a0891..5f9d8bb 100644 --- a/src/stasis/allocationPolicy.c +++ b/src/stasis/allocationPolicy.c @@ -231,10 +231,12 @@ void allocationPolicyDeinit(allocationPolicy * ap) { RB_ENTRY(delete)(next, ap->availablePages); free((void*)next); } - LH_ENTRY(destroy)(ap->xidAlloced); + LH_ENTRY(destroy)(ap->xidDealloced); RB_ENTRY(destroy)(ap->availablePages); LH_ENTRY(destroy)(ap->pageOwners); + LH_ENTRY(destroy)(ap->allPages); + free(ap); } diff --git a/src/stasis/blobManager.c b/src/stasis/blobManager.c index 7dd180d..b44cb02 100644 --- a/src/stasis/blobManager.c +++ b/src/stasis/blobManager.c @@ -63,12 +63,13 @@ void writeBlob(int xid, Page * p2, lsn_t lsn, recordid rid, const byte * buf) { static int notSupported(int xid, Page * p) { return 0; } -void blobLoaded(Page *p) { +static void blobLoaded(Page *p) { p->LSN = *lsn_ptr(p); } -void blobFlushed(Page *p) { +static void blobFlushed(Page *p) { *lsn_ptr(p) = p->LSN; } +static void blobCleanup(Page *p) { } static page_impl pi = { BLOB_PAGE, @@ -93,6 +94,7 @@ static page_impl pi = { 0, //XXX page_impl_dereference_identity, blobLoaded, blobFlushed, + blobCleanup }; page_impl blobImpl() { return pi; diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index 1076a2f..e7006b5 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -109,6 +109,7 @@ inline static Page * writeBackOnePage() { // printf("Write(%ld)\n", (long)victim->id); pageWrite(victim); + pageCleanup(victim); // Make sure that no one mistakenly thinks this is still a live copy. victim->id = -1; @@ -277,6 +278,7 @@ static void bhBufDeinit() { LH_ENTRY(openlist)(cachedPages, &iter); while((next = LH_ENTRY(readlist)(&iter))) { pageWrite((next->value)); + pageCleanup((next->value)); // normally called by writeBackOnePage() } LH_ENTRY(closelist)(&iter); LH_ENTRY(destroy)(cachedPages); @@ -299,6 +301,7 @@ static void bhSimulateBufferManagerCrash() { Page * p = next->value; writelock(p->rwlatch,0); pageFlushed(p); // normally, pageWrite() would call this... + pageCleanup(p); // normally called by writeBackOnePage() unlock(p->rwlatch); } LH_ENTRY(closelist)(&iter); @@ -331,7 +334,7 @@ void bhBufInit() { cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE); - freeListLength = MAX_BUFFER_SIZE / 10; + freeListLength = 9 * MAX_BUFFER_SIZE / 10; freeLowWater = freeListLength - 5; freeCount = 0; pageCount = 0; diff --git a/src/stasis/operations/alloc.c b/src/stasis/operations/alloc.c index 8995509..aba80e3 100644 --- a/src/stasis/operations/alloc.c +++ b/src/stasis/operations/alloc.c @@ -160,6 +160,9 @@ void TallocInit() { allocPolicy = allocationPolicyInit(); // pthread_mutex_init(&talloc_mutex, NULL); } +void TallocDeinit() { + allocationPolicyDeinit(allocPolicy); +} static void reserveNewRegion(int xid) { int firstPage = TregionAlloc(xid, TALLOC_REGION_SIZE, STORAGE_MANAGER_TALLOC); diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index 2a94498..f2054ce 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -76,6 +76,19 @@ void lsmTreeRegisterComparator(int id, lsm_comparator_t i) { */ +static pageid_t defaultAllocator(int xid, void *ignored) { + return TpageAlloc(xid); +} + +static pageid_t (*pageAllocator)(int xid, void *ignored) = defaultAllocator; +static void *pageAllocatorConfig; +void TlsmSetPageAllocator(pageid_t (*allocer)(int xid, void * ignored), + void * config) { + pageAllocator = allocer; + pageAllocatorConfig = config; +} + + typedef struct lsmTreeState { pageid_t lastLeaf; @@ -119,7 +132,7 @@ static void initializeNodePage(int xid, Page *p, size_t keylen) { */ static inline size_t getKeySizeFixed(int xid, Page const *p) { - return *recordsize_ptr(p) - sizeof(lsmTreeNodeRecord); + return (*recordsize_ptr(p)) - sizeof(lsmTreeNodeRecord); } static inline size_t getKeySizeVirtualMethods(int xid, Page *p) { @@ -190,7 +203,7 @@ recordid TlsmCreate(int xid, int comparator, int keySize) { assert(HEADER_SIZE + 2 * (sizeof(lsmTreeNodeRecord) +keySize) < USABLE_SIZE_OF_PAGE - 2 * sizeof(short)); - pageid_t root = TpageAlloc(xid); + pageid_t root = pageAllocator(xid, pageAllocatorConfig); DEBUG("Root = %lld\n", root); recordid ret = { root, 0, 0 }; @@ -237,7 +250,7 @@ static recordid buildPathToLeaf(int xid, recordid root, Page *root_p, assert(depth); DEBUG("buildPathToLeaf(depth=%d) (lastleaf=%lld) called\n",depth, lastLeaf); - pageid_t child = TpageAlloc(xid); // XXX Use some other function... + pageid_t child = pageAllocator(xid, pageAllocatorConfig); // XXX Use some other function... DEBUG("new child = %lld internal? %d\n", child, depth-1); Page *child_p = loadPage(xid, child); @@ -316,6 +329,8 @@ static recordid appendInternalNode(int xid, Page *p, int depth, const byte *key, size_t key_len, pageid_t val_page, pageid_t lastLeaf) { + assert(*page_type_ptr(p) == LSM_ROOT_PAGE || + *page_type_ptr(p) == FIXED_PAGE); if(!depth) { // leaf node. recordid ret = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+key_len); @@ -449,7 +464,7 @@ recordid TlsmAppendPage(int xid, recordid tree, if(ret.size == INVALID_SLOT) { DEBUG("Need to split root; depth = %d\n", depth); - pageid_t child = TpageAlloc(xid); + pageid_t child = pageAllocator(xid, pageAllocatorConfig); Page *lc = loadPage(xid, child); writelock(lc->rwlatch,0); @@ -629,15 +644,17 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) { associated with the tree. */ static void lsmPageLoaded(Page *p) { + /// XXX should call fixedLoaded, or something... lsmTreeState *state = malloc(sizeof(lsmTreeState)); state->lastLeaf = -1; p->impl = state; } +static void lsmPageFlushed(Page *p) { } /** Free any soft state associated with the tree rooted at page p. This is called by the buffer manager. */ -static void lsmPageFlushed(Page *p) { +static void lsmPageCleanup(Page *p) { lsmTreeState *state = p->impl; free(state); } @@ -648,6 +665,7 @@ page_impl lsmRootImpl() { page_impl pi = fixedImpl(); pi.pageLoaded = lsmPageLoaded; pi.pageFlushed = lsmPageFlushed; + pi.pageCleanup = lsmPageCleanup; pi.page_type = LSM_ROOT_PAGE; return pi; } @@ -706,7 +724,7 @@ int lsmTreeIterator_next(int xid, lladdIterator_t *it) { impl->current = fixedNext(xid, impl->p, impl->current); if(impl->current.size == INVALID_SLOT) { const lsmTreeNodeRecord *next_rec = readNodeRecord(xid,impl->p,NEXT_LEAF, - impl->current.size); + keySize); unlock(impl->p->rwlatch); releasePage(impl->p); @@ -722,6 +740,9 @@ int lsmTreeIterator_next(int xid, lladdIterator_t *it) { impl->p = 0; impl->current.size = -1; } + } else { + assert(impl->current.size == keySize + sizeof(lsmTreeNodeRecord)); + impl->current.size = keySize; } if(impl->current.size != INVALID_SLOT) { impl->t = readNodeRecord(xid,impl->p,impl->current.slot,impl->current.size); diff --git a/src/stasis/operations/nestedTopActions.c b/src/stasis/operations/nestedTopActions.c index a0e0ea5..269c8e7 100644 --- a/src/stasis/operations/nestedTopActions.c +++ b/src/stasis/operations/nestedTopActions.c @@ -63,6 +63,9 @@ pblHashTable_t * nestedTopActions = NULL; void initNestedTopActions() { nestedTopActions = pblHtCreate(); } +void deinitNestedTopActions() { + pblHtDelete(nestedTopActions); +} /** @todo TbeginNestedTopAction's API might not be quite right. Are there cases where we need to pass a recordid in? diff --git a/src/stasis/page.c b/src/stasis/page.c index 88db34e..f549564 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -322,6 +322,13 @@ void pageFlushed(Page * p){ *lsn_ptr(p) = p->LSN; } } +void pageCleanup(Page * p) { + short type = *page_type_ptr(p); + if(type) { + assert(page_impls[type].page_type == type); + page_impls[type].pageCleanup(p); + } +} /// Generic block implementations diff --git a/src/stasis/page.h b/src/stasis/page.h index a61aa30..d1f55a7 100644 --- a/src/stasis/page.h +++ b/src/stasis/page.h @@ -272,7 +272,7 @@ int pageFreespace(int xid, Page * p); void pageCompact(Page * p); void pageLoaded(Page * p); void pageFlushed(Page * p); - +void pageCleanup(Page * p); /** @return -1 if the field does not exist, the size of the field otherwise (the rid parameter's size field will be ignored). */ @@ -581,6 +581,7 @@ typedef struct page_impl { This function should record p->LSN somewhere appropriate */ void (*pageFlushed)(Page * p); + void (*pageCleanup)(Page * p); } page_impl; /** diff --git a/src/stasis/page/fixed.c b/src/stasis/page/fixed.c index f14ce85..86963f0 100644 --- a/src/stasis/page/fixed.c +++ b/src/stasis/page/fixed.c @@ -79,6 +79,7 @@ static int fixedGetLength(int xid, Page *p, recordid rid) { return rid.slot > *recordcount_ptr(p) ? INVALID_SLOT : physical_slot_length(*recordsize_ptr(p)); } + static int notSupported(int xid, Page * p) { return 0; } static int fixedFreespace(int xid, Page * p) { @@ -129,6 +130,7 @@ void fixedLoaded(Page *p) { void fixedFlushed(Page *p) { *lsn_ptr(p) = p->LSN; } +void fixedCleanup(Page *p) { } page_impl fixedImpl() { static page_impl pi = { FIXED_PAGE, @@ -153,6 +155,7 @@ page_impl fixedImpl() { 0, // XXX dereference fixedLoaded, // loaded fixedFlushed, // flushed + fixedCleanup }; return pi; } diff --git a/src/stasis/page/fixed.h b/src/stasis/page/fixed.h index 00aa8bb..5447d6d 100644 --- a/src/stasis/page/fixed.h +++ b/src/stasis/page/fixed.h @@ -2,7 +2,7 @@ #ifndef __FIXED_H #define __FIXED_H - +// @todo rename fixed.h macros to something more specific #define recordsize_ptr(page) shorts_from_end((page), 1) #define recordcount_ptr(page) shorts_from_end((page), 2) #define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n)) diff --git a/src/stasis/page/indirect.c b/src/stasis/page/indirect.c index 007ebde..b24444f 100644 --- a/src/stasis/page/indirect.c +++ b/src/stasis/page/indirect.c @@ -242,7 +242,7 @@ void indirectLoaded(Page *p) { void indirectFlushed(Page *p) { *lsn_ptr(p) = p->LSN; } - +void indirectCleanup(Page *p) { } static page_impl pi = { INDIRECT_PAGE, 0, //read, @@ -266,6 +266,7 @@ static page_impl pi = { 0, //XXX page_impl_dereference_identity, indirectLoaded, indirectFlushed, + indirectCleanup }; /** diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c index 0ba586e..c7d908d 100644 --- a/src/stasis/page/slotted.c +++ b/src/stasis/page/slotted.c @@ -507,6 +507,7 @@ void slottedFlushed(Page *p) { *lsn_ptr(p) = p->LSN; slottedFsck(p); } +void slottedCleanup(Page *p) { } page_impl slottedImpl() { static page_impl pi = { @@ -532,6 +533,7 @@ static page_impl pi = { 0, //XXX page_impl_dereference_identity, slottedLoaded, slottedFlushed, + slottedCleanup }; return pi; } diff --git a/src/stasis/transactional2.c b/src/stasis/transactional2.c index 42e9e7d..e2c6409 100644 --- a/src/stasis/transactional2.c +++ b/src/stasis/transactional2.c @@ -408,6 +408,8 @@ int Tdeinit() { assert( numActiveXactions == 0 ); truncationDeinit(); ThashDeinit(); + TallocDeinit(); + deinitNestedTopActions(); bufDeinit(); DEBUG("Closing page file tdeinit\n"); closePageFile(); diff --git a/stasis/operations/alloc.h b/stasis/operations/alloc.h index 89410bd..548a940 100644 --- a/stasis/operations/alloc.h +++ b/stasis/operations/alloc.h @@ -21,7 +21,7 @@ void allocTransactionAbort(int xid); void allocTransactionCommit(int xid); void TallocInit(); - +void TallocDeinit(); /** Allocate a record. diff --git a/stasis/operations/lsmTree.h b/stasis/operations/lsmTree.h index 7c7927b..e3319f7 100644 --- a/stasis/operations/lsmTree.h +++ b/stasis/operations/lsmTree.h @@ -45,6 +45,12 @@ recordid TlsmDealloc(int xid, recordid tree); recordid TlsmAppendPage(int xid, recordid tree, const byte *key, long pageid); + +/** + Override the page allocation algorithm that LSM tree uses by default +*/ +void TlsmSetPageAllocator(pageid_t (*allocer)(int xid, void * ignored), + void * config); /** Lookup a leaf page. @@ -92,7 +98,7 @@ static inline int lsmTreeIterator_key (int xid, lladdIterator_t *it, byte **key) { lsmIteratorImpl * impl = (lsmIteratorImpl*)it->impl; *key = (byte*)(impl->t+1); - return sizeof(impl->current.size); + return impl->current.size; } static inline int lsmTreeIterator_value(int xid, lladdIterator_t *it, diff --git a/stasis/operations/nestedTopActions.h b/stasis/operations/nestedTopActions.h index 416cd66..2f7dfd3 100644 --- a/stasis/operations/nestedTopActions.h +++ b/stasis/operations/nestedTopActions.h @@ -57,6 +57,7 @@ terms specified in this license. #define __NESTED_TOP_ACTIONS_H__ void initNestedTopActions(); +void deinitNestedTopActions(); void * TbeginNestedTopAction(int xid, int op, const byte* log_arguments, int log_arguments_length); lsn_t TendNestedTopAction(int xid, void * handle); #endif