From e46dcce461b054230da45a671bae0bc0a36d9269 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Sun, 4 Nov 2007 01:51:37 +0000 Subject: [PATCH] No more leaks; "fixed" RB tree memory usage estimates. --- benchmarks/roseTable.cpp | 14 ++- src/stasis/operations/lsmIterators.h | 28 ++++- src/stasis/operations/lsmTable.h | 131 +++++++++++++++++------ src/stasis/operations/lsmTree.c | 2 + src/stasis/page/compression/pageLayout.h | 3 +- src/stasis/truncation.c | 4 + 6 files changed, 143 insertions(+), 39 deletions(-) diff --git a/benchmarks/roseTable.cpp b/benchmarks/roseTable.cpp index e1b3074..d97be3e 100644 --- a/benchmarks/roseTable.cpp +++ b/benchmarks/roseTable.cpp @@ -38,8 +38,11 @@ namespace rose { typename PAGELAYOUT::FMT::TUP t; - static const long INSERTS = 1000000; - long int count = INSERTS / 20; + static const long INSERTS = 10000000; + // static const long INSERTS = 10000000; +// static const long INSERTS = 100000; + static const long COUNT = INSERTS / 100; + long int count = COUNT; struct timeval start_tv, now_tv; double start, now, last_start; @@ -48,12 +51,17 @@ namespace rose { start = rose::tv_to_double(start_tv); last_start = start; + printf("tuple 'size'%d\n", PAGELAYOUT::FMT::TUP::sizeofBytes()); + for(long int i = 0; i < INSERTS; i++) { t.set0(&i); + t.set1(&i); + t.set2(&i); + t.set3(&i); TlsmTableInsert(h,t); count --; if(!count) { - count = INSERTS / 20; + count = COUNT; gettimeofday(&now_tv,0); now = tv_to_double(now_tv); printf("%3d%% complete " diff --git a/src/stasis/operations/lsmIterators.h b/src/stasis/operations/lsmIterators.h index bf0252b..b52009c 100644 --- a/src/stasis/operations/lsmIterators.h +++ b/src/stasis/operations/lsmIterators.h @@ -38,7 +38,6 @@ template class stlSetIterator; template inline const byte * toByteArray(stlSetIterator * const t); - /** Scans through an LSM tree's leaf pages, each tuple in the tree, in order. This iterator is designed for maximum forward scan @@ -70,7 +69,23 @@ class treeIterator { { init_helper(); } - typedef recordid handle; + // typedef recordid handle; + class treeIteratorHandle { + public: + treeIteratorHandle() : r_(NULLRID) {} + treeIteratorHandle(const recordid r) : r_(r) {} + /* const treeIteratorHandle & operator=(const recordid *r) { + r_ = *r; + return this; + } */ + treeIteratorHandle * operator=(const recordid &r) { + r_ = r; + return this; + } + + recordid r_; + }; + typedef treeIteratorHandle* handle; explicit treeIterator(recordid tree) : tree_(tree), scratch_(), @@ -80,6 +95,15 @@ class treeIterator { { init_helper(); } + explicit treeIterator(treeIteratorHandle* tree) : + tree_(tree->r_), + scratch_(), + keylen_(ROW::sizeofBytes()), + lsmIterator_(lsmTreeIterator_open(-1,tree->r_)), + slot_(0) + { + init_helper(); + } explicit treeIterator(treeIterator& t) : tree_(t.tree_), scratch_(t.scratch_), diff --git a/src/stasis/operations/lsmTable.h b/src/stasis/operations/lsmTable.h index e61bd84..a1e279a 100644 --- a/src/stasis/operations/lsmTable.h +++ b/src/stasis/operations/lsmTable.h @@ -19,9 +19,14 @@ namespace rose { dispatched), interface to the underlying primititves */ + // Lower total work by perfomrming one merge at higher level + // for every FUDGE^2 merges at the immediately lower level. + // (Constrast to R, which controls the ratio of sizes of the trees.) + static const int FUDGE = 1; template struct merge_args { + int worker_id; pageid_t(*pageAlloc)(int,void*); void *pageAllocState; pthread_mutex_t * block_ready_mut; @@ -30,6 +35,10 @@ namespace rose { pthread_cond_t * in_block_ready_cond; pthread_cond_t * out_block_ready_cond; bool * still_open; + pageid_t * my_tree_size; + pageid_t * out_tree_size; + pageid_t max_size; + pageid_t r_i; typename ITERA::handle in_process_tree; typename ITERB::handle ** in_tree; typename ITERA::handle ** out_tree; @@ -110,8 +119,10 @@ namespace rose { // Initialize tree with an empty tree. // XXX hardcodes ITERA's type: - typename ITERA::handle oldtree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, - a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); + typename ITERA::handle oldtree + = new typename ITERA::treeIteratorHandle( + TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, + a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()) ); Tcommit(xid); // loop around here to produce multiple batches for merge. @@ -142,20 +153,12 @@ namespace rose { gettimeofday(&wait_tv,0); - // XXX keep in_tree handle around so that it can be freed below. - - typename ITERB::handle old_in_tree = **a->in_tree; - ITERA taBegin(oldtree); ITERB tbBegin(**a->in_tree); ITERA *taEnd = taBegin.end(); ITERB *tbEnd = tbBegin.end(); - free(*a->in_tree); // free's copy of handle; not tree - *a->in_tree = 0; // free slot for producer - - pthread_cond_signal(a->in_block_needed_cond); pthread_mutex_unlock(a->block_ready_mut); @@ -180,6 +183,7 @@ namespace rose { delete taEnd; delete tbEnd; + gettimeofday(&stop_tv,0); // TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation! @@ -195,26 +199,47 @@ namespace rose { double throughput = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes())) / (1024.0 * 1024.0 * total_elapsed); - printf("merge # %-6d: comp ratio: %-9.3f waited %6.1f sec " - "worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6d pages\n", merge_count, ratio, - wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput, mergedPages); + printf("worker %d merge # %-6d: comp ratio: %-9.3f waited %6.1f sec " + "worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6d pages (need %6d)\n", a->worker_id, merge_count, ratio, + wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput, mergedPages, !a->out_tree_size ? -1 : (FUDGE * *a->out_tree_size / a->r_i)); gettimeofday(&start_tv,0); pthread_mutex_lock(a->block_ready_mut); - static int threshold_calc = 1000; // XXX REALLY NEED TO FIX THIS! - if(a->out_tree && // is there a upstream merger (note the lack of the * on a->out_tree)? - mergedPages > threshold_calc // do we have enough data to bother it? + // keep actual handle around so that it can be freed below. + typename ITERB::handle old_in_tree = **a->in_tree; + delete old_in_tree; + free(*a->in_tree); // free pointer to handle + + // XXX should we delay this to this point? + // otherwise, the contents of in_tree become temporarily unavailable to observers. + *a->in_tree = 0; // tell producer that the slot is now open + + pthread_cond_signal(a->in_block_needed_cond); + + + if(a->out_tree_size) { + *a->my_tree_size = *a->out_tree_size / (a->r_i * FUDGE); + } else { + if(*a->my_tree_size < mergedPages) { + *a->my_tree_size = mergedPages; + } + } + + if(a->out_tree && // is there a upstream merger? (note the lack of the * on a->out_tree) + ((a->max_size && mergedPages > a->max_size ) + || + mergedPages > (FUDGE * *a->out_tree_size / a->r_i)) // do we have enough data to bother it? ) { while(*a->out_tree) { // we probably don't need the "while..." pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut); } // XXX C++? Objects? Constructors? Who needs them? - *a->out_tree = (recordid*)malloc(sizeof(tree)); - **a->out_tree = tree; + *a->out_tree = (typeof(*a->out_tree))malloc(sizeof(**a->out_tree)); + **a->out_tree = new typename ITERA::treeIteratorHandle(tree); pthread_cond_signal(a->out_block_ready_cond); // This is a bit wasteful; allocate a new empty tree to merge against. @@ -228,7 +253,7 @@ namespace rose { // XXX TlsmFree(xid,oldtree); - oldtree = tree; + *oldtree = tree; pthread_mutex_unlock(a->block_ready_mut); @@ -287,6 +312,7 @@ namespace rose { pthread_mutex_t * mut; pthread_cond_t * input_ready_cond; pthread_cond_t * input_needed_cond; + pageid_t * input_size; merge_args, treeIterator > * args1; @@ -296,6 +322,15 @@ namespace rose { typename PAGELAYOUT::FMT::TUP> > * args2; }; + // How many bytes of tuples can we afford to keep in RAM? + // this is just a guessed value... it seems about right based on + // experiments, but 450 bytes overhead per tuple is insane! + static const int RB_TREE_OVERHEAD = 450; + static const pageid_t MEM_SIZE = 800 * 1000 * 1000; + // How many pages should we try to fill with the first C1 merge? + static const pageid_t START_SIZE = 10 * 1000; + static const int R = 40; + template lsmTableHandle * TlsmTableStart(recordid& tree) { /// XXX xid for daemon processes? @@ -335,6 +370,13 @@ namespace rose { pthread_cond_init(block1_ready_cond,0); pthread_cond_init(block2_ready_cond,0); + pageid_t * block0_size = (pageid_t*)malloc(sizeof(pageid_t)); + // don't merge until we have enough data to be worthwhile... + *block0_size = START_SIZE; + pageid_t * block1_size = (pageid_t*)malloc(sizeof(pageid_t)); + // similarly, wait to merge the next block until we have merged block FUDGE times. + *block1_size = FUDGE * R * *block0_size; + LSM_HANDLE ** block1_scratch = (LSM_HANDLE**) malloc(sizeof(LSM_HANDLE*)); *block1_scratch = 0; @@ -357,7 +399,7 @@ namespace rose { ret->input_ready_cond = block0_ready_cond; ret->input_needed_cond = block0_needed_cond; - + ret->input_size = block0_size; recordid * ridp = (recordid*)malloc(sizeof(recordid)); *ridp = h.bigTreeAllocState; @@ -365,6 +407,7 @@ namespace rose { ret->args1 = (merge_args*)malloc(sizeof(merge_args)); merge_args tmpargs1 = { + 1, TlsmRegionAllocRid, ridp, block_ready_mut, @@ -373,7 +416,11 @@ namespace rose { block1_ready_cond, block2_ready_cond, ret->still_open, - NULLRID, + block1_size, + 0, // biggest component computes its size directly. + 0, // No max size for biggest component + R, + new typename LSM_ITER::treeIteratorHandle(NULLRID), block1_scratch, 0 }; @@ -386,6 +433,7 @@ namespace rose { ret->args2 = (merge_args*)malloc(sizeof(merge_args)); merge_args tmpargs2 = { + 2, TlsmRegionAllocRid, ridp, block_ready_mut, @@ -394,7 +442,11 @@ namespace rose { block0_ready_cond, block1_ready_cond, ret->still_open, - NULLRID, + block0_size, + block1_size, + (R * MEM_SIZE) / (PAGE_SIZE * 4), // 4 = estimated compression ratio + R, + new typename LSM_ITER::treeIteratorHandle(NULLRID), block0_scratch, block1_scratch }; @@ -407,16 +459,7 @@ namespace rose { return ret; } template - void TlsmTableStop( lsmTableHandle * h) { - *(h->still_open) = 0; - pthread_join(h->merge1_thread,0); - pthread_join(h->merge2_thread,0); - } - template - void TlsmTableInsert( lsmTableHandle *h, - typename PAGELAYOUT::FMT::TUP &t) { - h->scratch_handle->insert(t); - if(h->scratch_handle->size() > 100000) { // XXX set threshold sanely!!! + void TlsmTableFlush(lsmTableHandle *h) { pthread_mutex_lock(h->mut); while(*h->input_handle) { @@ -429,10 +472,32 @@ namespace rose { *(h->input_handle) = tmp_ptr; pthread_cond_signal(h->input_ready_cond); - h->scratch_handle = new typeof(*h->scratch_handle); pthread_mutex_unlock(h->mut); + + } + template + void TlsmTableStop( lsmTableHandle * h) { + TlsmTableFlush(h); + *(h->still_open) = 0; + pthread_join(h->merge1_thread,0); + pthread_join(h->merge2_thread,0); + } + template + void TlsmTableInsert( lsmTableHandle *h, + typename PAGELAYOUT::FMT::TUP &t) { + h->scratch_handle->insert(t); + //XXX 4 = estimated compression ratio. + + uint64_t handleBytes = h->scratch_handle->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); + uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes()); + uint64_t memSizeThresh = MEM_SIZE; + + if(handleBytes > inputSizeThresh || handleBytes > memSizeThresh) { // XXX ok? + printf("Handle mbytes %ld Input size: %ld input size thresh: %ld mbytes mem size thresh: %ld\n", + handleBytes / (1024*1024), *h->input_size, inputSizeThresh / (1024*1024), memSizeThresh / (1024*1024)); + TlsmTableFlush(h); } } diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index 76fd01b..da34f1d 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -253,6 +253,8 @@ recordid TlsmCreate(int xid, int comparator, writeNodeRecord(xid, p, DEPTH, dummy, keySize, 0); writeNodeRecord(xid, p, COMPARATOR, dummy, keySize, comparator); + free(dummy); + unlock(p->rwlatch); releasePage(p); return ret; diff --git a/src/stasis/page/compression/pageLayout.h b/src/stasis/page/compression/pageLayout.h index ccc2d83..88a263d 100644 --- a/src/stasis/page/compression/pageLayout.h +++ b/src/stasis/page/compression/pageLayout.h @@ -28,7 +28,7 @@ namespace rose { plugin_id_t pluginid = plugin_id(); - plugin_id_t * plugins = new plugin_id_t[column_count]; + plugin_id_t * plugins = (plugin_id_t*)malloc(column_count * sizeof(plugin_id_t)); for(column_number_t c = 0; c < column_count; c++) { plugins[c] = pluginid; } @@ -38,6 +38,7 @@ namespace rose { typename COMPRESSOR::TYP val = *(typename COMPRESSOR::TYP*)(t->get(c)); com->offset(val); } + free(plugins); return f; } static inline int cmp_id() { diff --git a/src/stasis/truncation.c b/src/stasis/truncation.c index 1bc4afc..6eca6e3 100644 --- a/src/stasis/truncation.c +++ b/src/stasis/truncation.c @@ -44,7 +44,11 @@ void dirtyPages_remove(Page * p) { //assert(pblHtLookup(dirtyPages, &(p->id), sizeof(int))); // printf("With lsn = %d\n", (lsn_t)pblHtCurrent(dirtyPages)); p->dirty = 0; + lsn_t * old = pblHtLookup(dirtyPages, &(p->id),sizeof(int)); pblHtRemove(dirtyPages, &(p->id), sizeof(int)); + if(old) { + free(old); + } //assert(!ret); <--- Due to a bug in the PBL compatibility mode, //there is no way to tell whether the value didn't exist, or if it //was null.