diff --git a/benchmarks/roseTable.cpp b/benchmarks/roseTable.cpp index a0b03c5..8b20e15 100644 --- a/benchmarks/roseTable.cpp +++ b/benchmarks/roseTable.cpp @@ -22,7 +22,7 @@ namespace rose { unlink("logfile.txt"); sync(); - + stasis_page_impl_register(Multicolumn::impl()); bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; Tinit(); @@ -33,9 +33,24 @@ namespace rose { Tcommit(xid); - TlsmTableStart(lsmTable); + lsmTableHandle* h = TlsmTableStart(lsmTable); - TlsmTableStop(lsmTable); + typename PAGELAYOUT::FMT::TUP t; + + + static const long INSERTS = 1000000; + long int count = INSERTS / 20; + + for(long int i = 0; i < INSERTS; i++) { + t.set0(&i); + TlsmTableInsert(h,t); + count --; + if(!count) { + count = INSERTS / 20; + printf("%d pct complete\n", (i * 100) / INSERTS); + } + } + TlsmTableStop(h); Tdeinit(); @@ -48,6 +63,7 @@ int main(int argc, char **argv) { // XXX multicolumn is deprecated; want static dispatch! return rose::main ,rose::Rle > > + ,rose::For > > (argc,argv); + return 0; } diff --git a/src/stasis/operations/lsmIterators.h b/src/stasis/operations/lsmIterators.h index ee3d66c..bf0252b 100644 --- a/src/stasis/operations/lsmIterators.h +++ b/src/stasis/operations/lsmIterators.h @@ -46,14 +46,8 @@ inline const byte * toByteArray(stlSetIterator * const t); */ template class treeIterator { - public: - explicit treeIterator(recordid tree, ROW &scratch, int keylen) : - tree_(tree), - scratch_(scratch), - keylen_(keylen), - lsmIterator_(lsmTreeIterator_open(-1,tree)), - slot_(0) - { + private: + inline void init_helper() { if(!lsmTreeIterator_next(-1, lsmIterator_)) { currentPage_ = 0; pageid_ = -1; @@ -66,12 +60,26 @@ class treeIterator { currentPage_ = (PAGELAYOUT*)p_->impl; } } + public: + explicit treeIterator(recordid tree, ROW &scratch, int keylen) : + tree_(tree), + scratch_(scratch), + keylen_(keylen), + lsmIterator_(lsmTreeIterator_open(-1,tree)), + slot_(0) + { + init_helper(); + } typedef recordid handle; explicit treeIterator(recordid tree) : tree_(tree), scratch_(), - keylen_(ROW::sizeofBytes()) - { } + keylen_(ROW::sizeofBytes()), + lsmIterator_(lsmTreeIterator_open(-1,tree)), + slot_(0) + { + init_helper(); + } explicit treeIterator(treeIterator& t) : tree_(t.tree_), scratch_(t.scratch_), @@ -391,9 +399,9 @@ class versioningIterator { private: typedef typename SET::iterator STLITER; public: - typedef SET handle; + typedef SET * handle; - stlSetIterator( SET& s ) : it_(s.begin()), itend_(s.end()) {} + stlSetIterator( SET * s ) : it_(s->begin()), itend_(s->end()) {} stlSetIterator( STLITER& it, STLITER& itend ) : it_(it), itend_(itend) {} explicit stlSetIterator(stlSetIterator &i) : it_(i.it_), itend_(i.itend_){} diff --git a/src/stasis/operations/lsmTable.h b/src/stasis/operations/lsmTable.h index d06fd94..672757c 100644 --- a/src/stasis/operations/lsmTable.h +++ b/src/stasis/operations/lsmTable.h @@ -30,12 +30,13 @@ namespace rose { pthread_cond_t * in_block_ready_cond; pthread_cond_t * out_block_ready_cond; bool * still_open; - typename ITERA::handle ** out_tree; + typename ITERA::handle in_process_tree; typename ITERB::handle ** in_tree; + typename ITERA::handle ** out_tree; }; template - pageid_t compressData(ITER * begin, ITER * end, recordid tree, + pageid_t compressData(int xid, ITER * begin, ITER * end, recordid tree, pageid_t (*pageAlloc)(int,void*), void *pageAllocState, uint64_t *inserted) { *inserted = 0; @@ -43,12 +44,12 @@ namespace rose { if(*begin == *end) { return 0; } - pageid_t next_page = pageAlloc(-1,pageAllocState); - Page *p = loadPage(-1, next_page); + pageid_t next_page = pageAlloc(xid,pageAllocState); + Page *p = loadPage(xid, next_page); pageid_t pageCount = 0; if(*begin != *end) { - TlsmAppendPage(-1,tree,toByteArray(begin),pageAlloc,pageAllocState,p->id); + TlsmAppendPage(xid,tree,toByteArray(begin),pageAlloc,pageAllocState,p->id); } pageCount++; @@ -57,7 +58,7 @@ namespace rose { int lastEmpty = 0; for(ITER i(*begin); i != *end; ++i) { - rose::slot_index_t ret = mc->append(-1, *i); + rose::slot_index_t ret = mc->append(xid, *i); (*inserted)++; @@ -68,12 +69,12 @@ namespace rose { --(*end); if(i != *end) { - next_page = pageAlloc(-1,pageAllocState); - p = loadPage(-1, next_page); + next_page = pageAlloc(xid,pageAllocState); + p = loadPage(xid, next_page); mc = PAGELAYOUT::initPage(p, &*i); - TlsmAppendPage(-1,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id); + TlsmAppendPage(xid,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id); pageCount++; lastEmpty = 0; @@ -109,49 +110,58 @@ namespace rose { // Initialize tree with an empty tree. // XXX hardcodes ITERA's type: - recordid oldtree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, + typename ITERA::handle oldtree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); Tcommit(xid); // loop around here to produce multiple batches for merge. while(1) { - gettimeofday(&start_tv,0); pthread_mutex_lock(a->block_ready_mut); - if(!*(a->still_open)) { + int done = 0; + + while(!*(a->in_tree)) { + pthread_cond_signal(a->in_block_needed_cond); + if(!*(a->still_open)) { + done = 1; + break; + } + pthread_cond_wait(a->in_block_ready_cond,a->block_ready_mut); + } + if(done) { + a->in_process_tree = oldtree; + pthread_cond_signal(a->out_block_ready_cond); pthread_mutex_unlock(a->block_ready_mut); break; } - while(!*(a->in_tree)) { - pthread_cond_signal(a->in_block_needed_cond); - pthread_cond_wait(a->in_block_ready_cond,a->block_ready_mut); - } - gettimeofday(&wait_tv,0); + // XXX keep in_tree handle around so that it can be freed below. + + typename ITERB::handle old_in_tree = **a->in_tree; + + ITERA taBegin(oldtree); + ITERB tbBegin(**a->in_tree); + + ITERA *taEnd = taBegin.end(); + ITERB *tbEnd = tbBegin.end(); + + free(*a->in_tree); // free's copy of handle; not tree + *a->in_tree = 0; // free slot for producer + + pthread_cond_signal(a->in_block_needed_cond); + + pthread_mutex_unlock(a->block_ready_mut); + xid = Tbegin(); recordid tree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); - ITERA taBegin(oldtree); - ITERB tbBegin(**a->in_tree); - - // XXX keep in_tree handle around so that it can be freed below. - - free(*a->in_tree); // free's copy of handle; not tree - *a->in_tree = 0; // free slot for producer - pthread_cond_signal(a->in_block_needed_cond); - - pthread_mutex_unlock(a->block_ready_mut); - - ITERA *taEnd = taBegin.end(); - ITERB *tbEnd = tbBegin.end(); - mergeIterator mBegin(taBegin, tbBegin, *taEnd, *tbEnd); @@ -161,8 +171,9 @@ namespace rose { mEnd.seekEnd(); uint64_t insertedTuples; - pageid_t mergedPages = compressData > - (&mBegin, &mEnd,tree,a->pageAlloc,a->pageAllocState,&insertedTuples); + pageid_t mergedPages = compressData + > + (xid, &mBegin, &mEnd,tree,a->pageAlloc,a->pageAllocState,&insertedTuples); delete taEnd; delete tbEnd; @@ -195,6 +206,11 @@ namespace rose { a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); } + + // XXX TlsmFree(xid,oldtree); + + oldtree = tree; + pthread_mutex_unlock(a->block_ready_mut); merge_count++; @@ -208,8 +224,8 @@ namespace rose { / (1024.0 * 1024.0 * total_elapsed); printf("merge # %-6d: comp ratio: %-9.3f waited %6.1f sec " - "worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n", merge_count, ratio, - wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput); + "worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6d pages\n", merge_count, ratio, + wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput, mergedPages); Tcommit(xid); @@ -250,13 +266,33 @@ namespace rose { return ret; } - /// XXX start should return a struct that contains these! - pthread_t merge1_thread; - pthread_t merge2_thread; - bool * still_open; + template + struct lsmTableHandle { + pthread_t merge1_thread; + pthread_t merge2_thread; + bool * still_open; + typename stlSetIterator + , + typename PAGELAYOUT::FMT::TUP>::handle ** input_handle; + typename std::set + * scratch_handle; + pthread_mutex_t * mut; + pthread_cond_t * input_ready_cond; + pthread_cond_t * input_needed_cond; + merge_args, treeIterator > * args1; + merge_args, stlSetIterator, + typename PAGELAYOUT::FMT::TUP> > * args2; + }; template - void TlsmTableStart(recordid tree) { + lsmTableHandle * TlsmTableStart(recordid& tree) { /// XXX xid for daemon processes? lsmTableHeader_t h; Tread(-1, tree, &h); @@ -268,6 +304,9 @@ namespace rose { typename PAGELAYOUT::FMT::TUP::stl_cmp>, typename PAGELAYOUT::FMT::TUP> RB_ITER; + typedef typename LSM_ITER::handle LSM_HANDLE; + typedef typename RB_ITER::handle RB_HANDLE; + pthread_mutex_t * block_ready_mut = (pthread_mutex_t*)malloc(sizeof(pthread_mutex_t)); pthread_cond_t * block0_needed_cond = @@ -291,17 +330,34 @@ namespace rose { pthread_cond_init(block1_ready_cond,0); pthread_cond_init(block2_ready_cond,0); - typename LSM_ITER::handle * block1_scratch = - (typename LSM_ITER::handle*) malloc(sizeof(typename LSM_ITER::handle)); - still_open = (bool*)malloc(sizeof(bool)); - *still_open = 1; + LSM_HANDLE ** block1_scratch = (LSM_HANDLE**) malloc(sizeof(LSM_HANDLE*)); + *block1_scratch = 0; + + RB_HANDLE ** block0_scratch = (RB_HANDLE**) malloc(sizeof(RB_HANDLE*)); + *block0_scratch = 0; + + lsmTableHandle * ret = (lsmTableHandle*) + malloc(sizeof(lsmTableHandle)); + + // merge1_thread initialized during pthread_create, below. + // merge2_thread initialized during pthread_create, below. + + ret->still_open = (bool*)malloc(sizeof(bool)); + *ret->still_open = 1; + + ret->input_handle = block0_scratch; + ret->scratch_handle = new typeof(*ret->scratch_handle); + + ret->mut = block_ready_mut; + + ret->input_ready_cond = block0_ready_cond; + ret->input_needed_cond = block0_needed_cond; + + recordid * ridp = (recordid*)malloc(sizeof(recordid)); *ridp = h.bigTreeAllocState; - recordid ** block1_scratch_p = (recordid**)malloc(sizeof(block1_scratch)); - *block1_scratch_p = block1_scratch; - - merge_args * args1 = (merge_args*)malloc(sizeof(merge_args)); + ret->args1 = (merge_args*)malloc(sizeof(merge_args)); merge_args tmpargs1 = { TlsmRegionAllocRid, @@ -311,18 +367,18 @@ namespace rose { block2_needed_cond, block1_ready_cond, block2_ready_cond, - still_open, - 0, - block1_scratch_p + ret->still_open, + NULLRID, + block1_scratch, + 0 }; - *args1 = tmpargs1; - void * (*merger1)(void*) = mergeThread - ; + *ret->args1 = tmpargs1; + void * (*merger1)(void*) = mergeThread; ridp = (recordid*)malloc(sizeof(recordid)); *ridp = h.mediumTreeAllocState; - merge_args * args2 = (merge_args*)malloc(sizeof(merge_args)); + ret->args2 = (merge_args*)malloc(sizeof(merge_args)); merge_args tmpargs2 = { TlsmRegionAllocRid, @@ -332,24 +388,48 @@ namespace rose { block1_needed_cond, block0_ready_cond, block1_ready_cond, - still_open, - block1_scratch_p, - 0 // XXX how does this thing get fed new trees of tuples? + ret->still_open, + NULLRID, + block0_scratch, + block1_scratch }; - *args2 = tmpargs2; - void * (*merger2)(void*) = mergeThread - ; + *ret->args2 = tmpargs2; + void * (*merger2)(void*) = mergeThread; + pthread_create(&ret->merge1_thread, 0, merger1, ret->args1); + pthread_create(&ret->merge2_thread, 0, merger2, ret->args2); - pthread_create(&merge1_thread, 0, merger1, args1); - pthread_create(&merge2_thread, 0, merger2, args2); - + return ret; } template - void TlsmTableStop(recordid tree) { - *still_open = 0; - pthread_join(merge1_thread,0); - pthread_join(merge2_thread,0); + void TlsmTableStop( lsmTableHandle * h) { + *(h->still_open) = 0; + pthread_join(h->merge1_thread,0); + pthread_join(h->merge2_thread,0); + } + template + void TlsmTableInsert( lsmTableHandle *h, + typename PAGELAYOUT::FMT::TUP &t) { + h->scratch_handle->insert(t); + if(h->scratch_handle->size() > 100000) { // XXX set threshold sanely!!! + pthread_mutex_lock(h->mut); + + while(*h->input_handle) { + pthread_cond_wait(h->input_needed_cond, h->mut); + } + + typeof(h->scratch_handle)* tmp_ptr + = (typeof(h->scratch_handle)*) malloc(sizeof(void*)); + *tmp_ptr = h->scratch_handle; + *(h->input_handle) = tmp_ptr; + + pthread_cond_signal(h->input_ready_cond); + + h->scratch_handle = new typeof(*h->scratch_handle); + + pthread_mutex_unlock(h->mut); + } + } } diff --git a/src/stasis/page/compression/staticTuple.h b/src/stasis/page/compression/staticTuple.h index 0a6f840..6ab786f 100644 --- a/src/stasis/page/compression/staticTuple.h +++ b/src/stasis/page/compression/staticTuple.h @@ -17,7 +17,7 @@ namespace rose { s.flag_ = NORMAL; initializePointers(); } - explicit inline StaticTuple(StaticTuple& t) { + explicit inline StaticTuple(const StaticTuple& t) { s.flag_ = t.s.flag_; s.epoch_ = t.s.epoch_; if(0 < N) s.cols0_ = t.s.cols0_; @@ -151,9 +151,9 @@ namespace rose { struct stl_cmp { - bool operator()(const StaticTuple* s1, const StaticTuple* s2) const + bool operator()(const StaticTuple& s1, const StaticTuple& s2) const { - return *s1 < *s2; + return s1 < s2; } }; @@ -178,6 +178,7 @@ namespace rose { if(9 < N) scratch_.set9((TYPE9*)dat_[9][off_]); return scratch_; } + inline bool operator==(const iterator &a) const { return (off_==a.off_); } @@ -204,8 +205,6 @@ namespace rose { }; private: - explicit StaticTuple(const StaticTuple& t) { abort(); } - void * cols_[N]; size_t size_[N]; typedef char flag_t;