diff --git a/benchmarks/Makefile.am b/benchmarks/Makefile.am
index 0e3acaf..3ffdefa 100644
--- a/benchmarks/Makefile.am
+++ b/benchmarks/Makefile.am
@@ -3,11 +3,14 @@ LDADD=$(top_builddir)/src/stasis/libstasis.la \
 
 rose_SOURCES=rose.cpp
+roseTable_SOURCES=roseTable.cpp
 
 if BUILD_BENCHMARKS
-noinst_PROGRAMS=lhtableThreaded naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \
-	arrayListSet logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \
-	linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure zeroCopy sequentialThroughput rose
+noinst_PROGRAMS=lhtableThreaded naiveHash logicalHash readLogicalHash \
+	naiveMultiThreaded logicalMultThreaded rawSet arrayListSet \
+	logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \
+	linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests \
+	transitiveClosure zeroCopy sequentialThroughput rose roseTable
 endif
 AM_CFLAGS=${GLOBAL_CFLAGS}
 AM_CXXFLAGS=${GLOBAL_CXXFLAGS} -I ${top_builddir}/src
diff --git a/benchmarks/rose.cpp b/benchmarks/rose.cpp
index f328ee6..0b45a51 100644
--- a/benchmarks/rose.cpp
+++ b/benchmarks/rose.cpp
@@ -47,6 +47,8 @@ using rose::mergeThread;
 using rose::compressData;
 using rose::tv_to_double; // XXX
 
+using rose::initPage;
+
 static const int32_t GB = 1024 * 1024 * 1024;
 
 static int lsm_sim; // XXX this global variable shouldn't be global!
@@ -54,6 +56,7 @@ static int lsm_sim; // XXX this global variable shouldn't be global!
 
 #define FIRST_PAGE 1
+#define FAST_ALLOC 6
 
 /** Bypass stasis' allocation mechanisms.  Stasis' page allocation
     costs should be minimal, but this program doesn't support
@@ -350,8 +353,6 @@ void run_test(unsigned int inserts, column_number_t column_count,
 
   int num_pages = 0;
 
-  TlsmSetPageAllocator(roseFastAlloc, &num_pages);
-
   Tinit();
 
   recordid tree = NULLRID;
@@ -361,7 +362,7 @@ void run_test(unsigned int inserts, column_number_t column_count,
   pthread_cond_t block_ready_cond = PTHREAD_COND_INITIALIZER;
   int max_waiters = 3; // merged, old, new.
   if(lsm_sim) {
-    struct insert_args args = {
+    struct insert_args args = {
      comparator_idx,
      rowsize,
      &begin,
@@ -385,7 +386,13 @@
                    &args);
     pthread_t merger;
     pthread_create(&merger, 0,
-                   (void*(*)(void*))mergeThread,
+                   (void*(*)(void*))mergeThread
+                   ,
+                   treeIterator,
+                   ROW,
+                   TYPE>,
                    &args);
 
     pthread_join(inserter,0);
@@ -398,11 +405,11 @@
   gettimeofday(&start_tv, 0);
 
   if(buildTree) {
-    tree = TlsmCreate(-1, comparator_idx,rowsize);
+    tree = TlsmCreate(-1, comparator_idx, roseFastAlloc, &num_pages, rowsize);
   }
 
   uint64_t insertedByCompress;
-  compressData
+  compressData
     (&begin, &end,buildTree,tree,roseFastAlloc,(void*)&num_pages,
      &insertedByCompress);
diff --git a/benchmarks/roseTable.cpp b/benchmarks/roseTable.cpp
new file mode 100644
index 0000000..a0b03c5
--- /dev/null
+++ b/benchmarks/roseTable.cpp
@@ -0,0 +1,53 @@
+#include <unistd.h> /* header name lost in transit; inferred from unlink()/sync() below */
+#include <stdio.h>  /* likewise inferred, for printf() */
+#include "stasis/operations/lsmTable.h"
+
+#include "stasis/transactional.h"
+
+#include "stasis/page/compression/multicolumn-impl.h"
+#include "stasis/page/compression/for-impl.h"
+#include "stasis/page/compression/rle-impl.h"
+#include "stasis/page/compression/staticTuple.h"
+#include "stasis/page/compression/pageLayout.h"
+
+typedef int32_t val_t; // XXX want multiple types!
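
The harness pins the column type at compile time; everything below is selected by the template instantiation in main(). As a sketch of what the XXX asks for (not part of this patch), the compressor could become a parameter too, since For<> and Rle<> both export a TYP typedef and an offset() hook:

    // Hypothetical helper, not in the tree:
    template <template <class> class COMP>
    int run_rose(int argc, char **argv) {
      typedef rose::StaticTuple<4,int64_t,int32_t,int16_t,int8_t> tup;
      return rose::main<rose::SingleColumnTypePageLayout<
        rose::Multicolumn<tup>, COMP<val_t> > >(argc, argv);
    }
    // usage: run_rose<rose::Rle>(...) or run_rose<rose::For>(...)
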
+
+namespace rose {
+  template<class PAGELAYOUT>
+  int main(int argc, char **argv) {
+    static int cmp_num = 1;
+    static int init_num = 1;
+
+    unlink("storefile.txt");
+    unlink("logfile.txt");
+
+    sync();
+
+    bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
+
+    Tinit();
+
+    int xid = Tbegin();
+
+    recordid lsmTable = TlsmTableAlloc<PAGELAYOUT>(xid);
+
+    Tcommit(xid);
+
+    TlsmTableStart<PAGELAYOUT>(lsmTable);
+
+    TlsmTableStop<PAGELAYOUT>(lsmTable);
+
+    Tdeinit();
+
+    printf("test\n");
+  }
+}
+
+int main(int argc, char **argv) {
+  typedef rose::StaticTuple<4,int64_t,int32_t,int16_t,int8_t> tup;
+  // XXX multicolumn is deprecated; want static dispatch!
+  return rose::main
+    <rose::SingleColumnTypePageLayout<rose::Multicolumn<tup>,rose::Rle<val_t> > >
+    (argc,argv);
+}
diff --git a/src/stasis/operations/lsmIterators.h b/src/stasis/operations/lsmIterators.h
index 335753e..487489b 100644
--- a/src/stasis/operations/lsmIterators.h
+++ b/src/stasis/operations/lsmIterators.h
@@ -22,10 +22,21 @@
 
-template <class ITER, class ROW> class mergeIterator;
+template <class ITERA, class ITERB, class ROW> class mergeIterator;
+
+template <class ITERA, class ITERB, class ROW>
+inline const byte * toByteArray(mergeIterator<ITERA,ITERB,ROW> * const t);
+
+
+template <class ITER, class ROW> class versioningIterator;
 
 template <class ITER, class ROW>
-inline const byte * toByteArray(mergeIterator<ITER,ROW> * const t);
+inline const byte * toByteArray(versioningIterator<ITER,ROW> * const t);
+
+
+template <class STLITER, class ROW> class stlSetIterator;
+template <class STLITER, class ROW>
+inline const byte * toByteArray(stlSetIterator<STLITER,ROW> * const t);
 
 
 /**
@@ -154,12 +165,9 @@ class treeIterator {
 
   This iterator takes two other iterators as arguments, and merges
   their output, dropping duplicate entries.
 
-  @todo LSM tree is not very useful without support for deletion
-  (and, therefore, versioning).  Such support will require
-  modifications to mergeIterator (or perhaps, a new iterator
-  class).
+  It does not understand versioning or tombstones.
 */
-template <class ITER, class ROW>
+template <class ITERA, class ITERB, class ROW>
 class mergeIterator {
  private:
   static const int A = 0;
@@ -192,7 +200,7 @@ class mergeIterator {
     return cur;
   }
  public:
-  mergeIterator(ITER & a, ITER & b, ITER & aend, ITER & bend) :
+  mergeIterator(ITERA & a, ITERB & b, ITERA & aend, ITERB & bend) :
     off_(0),
     a_(a),
     b_(b),
@@ -211,7 +219,7 @@
     before_eof_(i.before_eof_)
   { }
 
-  ROW& operator* () {
+  const ROW& operator* () {
     if(curr_ == A || curr_ == BOTH) { return *a_; }
     if(curr_ == B) { return *b_; }
     abort();
@@ -272,19 +280,153 @@
   inline unsigned int offset() { return off_; }
  private:
   unsigned int off_;
-  ITER a_;
-  ITER b_;
-  ITER aend_;
-  ITER bend_;
+  ITERA a_;
+  ITERB b_;
+  ITERA aend_;
+  ITERB bend_;
   int curr_;
   int before_eof_;
-  friend const byte* toByteArray<ITER,ROW>(mergeIterator<ITER,ROW> * const t);
+  friend const byte*
+    toByteArray<ITERA,ITERB,ROW>(mergeIterator<ITERA,ITERB,ROW> * const t);
 };
 
+/**
+   This iterator takes an iterator that produces rows with versioning
+   information.  The rows should be sorted based on value, then sorted by
+   version, with the newest value first.
+ */
+template <class ITER, class ROW>
+class versioningIterator {
+ public:
+  versioningIterator(ITER & a, ITER & aend,
+                     int beginning_of_time) :
+    a_(a),
+    aend_(aend),
+    check_tombstone_(0),
+    tombstone_(0),
+    off_(0)
+  {}
+  explicit versioningIterator(versioningIterator &i) :
+    a_(i.a_),
+    aend_(i.aend_),
+    check_tombstone_(i.check_tombstone_),
+    tombstone_(i.tombstone_),
+    off_(i.off_)
+  {}
+
+  ROW& operator* () {
+    return *a_;
+  }
+  void seekEnd() {
+    a_ = aend_; // XXX good idea?
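
versioningIterator expects its input sorted by value, newest version first, so a tombstone is the first row it sees for a dead value. An illustrative trace of operator++ (made-up rows, not from the patch):

    rows: (k1, epoch 9, TOMBSTONE) (k1, epoch 7) (k2, epoch 4)
    ++ from (k1,9): tombstone_ = k1, then the do/while skips (k1,7)
    because it compares equal to tombstone_, stopping at (k2,4).

Note that, as written, operator++ dereferences a_ after the loop, so callers must not increment past the last live row.
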
+  }
+  inline bool operator==(const versioningIterator &o) const {
+    return a_ == o.a_;
+  }
+  inline bool operator!=(const versioningIterator &o) const {
+    return !(*this == o);
+  }
+  inline void operator++() {
+    if(check_tombstone_) {
+      do {
+	++a_;
+      } while(a_ != aend_ && *a_ == tombstone_);
+    } else {
+      ++a_;
+    }
+    if((*a_).tombstone()) {
+      tombstone_.copyFrom(*a_);
+      check_tombstone_ = 1;
+    } else {
+      check_tombstone_ = 0;
+    }
+    off_++;
+  }
+  inline void operator--() {
+    --a_;
+    // need to remember that we backed up so that ++ can work...
+    // the cursor is always positioned on a live value, and -- can
+    // only be followed by ++, so this should do the right thing.
+    check_tombstone_ = 0;
+    off_--;
+  }
+  inline int operator-(versioningIterator&i) {
+    return off_ - i.off_;
+  }
+  inline void operator=(versioningIterator const &i) {
+    a_ = i.a_;
+    aend_ = i.aend_;
+    check_tombstone_ = i.check_tombstone_;
+    tombstone_ = i.tombstone_;
+    //    scratch_ = *a_;
+    off_ = i.off_;
+  }
+  inline unsigned int offset() { return off_; }
+ private:
+  //  unsigned int off_;
+  ITER a_;
+  ITER aend_;
+  int check_tombstone_;
+  ROW tombstone_;
+  //  ROW &scratch_;
+  off_t off_;
+  //  int before_eof_;
+  //  typeof(ROW::TIMESTAMP) beginning_of_time_;
+  friend const byte*
+    toByteArray<ITER,ROW>(versioningIterator<ITER,ROW> * const t);
+};
+
+/**
+   This iterator wraps an STL iterator pair (such as the begin()/end() of a
+   std::set), and exposes the interface that the merge machinery expects.
+ */
+template <class STLITER, class ROW> class stlSetIterator {
+ public:
+  stlSetIterator( STLITER& it, STLITER& itend ) : it_(it), itend_(itend) {}
+  explicit stlSetIterator(stlSetIterator &i) : it_(i.it_), itend_(i.itend_){}
+  const ROW& operator* () { return *it_; }
+
+  void seekEnd() {
+    it_ = itend_; // XXX good idea?
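
A usage sketch (assumed, not in the patch): the in-memory stage of a table could be a std::set kept sorted by the ROW's operator<, wrapped so the merge machinery can treat it like any other iterator pair:

    typedef rose::StaticTuple<4,int64_t,int32_t,int16_t,int8_t> tup;
    std::set<tup> buf;                      // ordered by tup::operator<
    std::set<tup>::iterator b = buf.begin(), e = buf.end();
    rose::stlSetIterator<std::set<tup>::iterator, tup> begin(b, e);
    rose::stlSetIterator<std::set<tup>::iterator, tup> *end = begin.end();

One caveat: operator-() below subtracts the wrapped iterators, which only compiles for random-access iterators, not std::set's bidirectional ones.
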
+  }
+  stlSetIterator * end() { return new stlSetIterator(itend_,itend_); }
+  inline bool operator==(const stlSetIterator &o) const {
+    return it_ == o.it_;
+  }
+  inline bool operator!=(const stlSetIterator &o) const {
+    return !(*this == o);
+  }
+  inline void operator++() {
+    ++it_;
+  }
+  inline void operator--() {
+    --it_;
+  }
+  inline int operator-(stlSetIterator&i) {
+    return it_ - i.it_;
+  }
+  inline void operator=(stlSetIterator const &i) {
+    it_ = i.it_;
+    itend_ = i.itend_;
+  }
+  //  inline unsigned int offset() { return off_; }
+ private:
+  //  unsigned int off_;
+  STLITER it_;
+  STLITER itend_;
+  friend const byte*
+    toByteArray<STLITER,ROW>(stlSetIterator<STLITER,ROW> * const t);
+};
+
+template <class STLITER, class ROW>
+inline const byte * toByteArray(stlSetIterator<STLITER,ROW> * const t) {
+  return (*(t->it_)).toByteArray();
+}
 
 /** Produce a byte array from the value stored at t's current
     position */
-template <class ITER, class ROW>
-inline const byte * toByteArray(mergeIterator<ITER,ROW> * const t) {
+template <class ITERA, class ITERB, class ROW>
+  inline const byte * toByteArray(mergeIterator<ITERA,ITERB,ROW> * const t) {
   if(t->curr_ == t->A || t->curr_ == t->BOTH) {
     return toByteArray(&t->a_);
   } else if(t->curr_ == t->B) {
@@ -293,12 +435,19 @@
   abort();
 }
 
+/** Produce a byte array from the value stored at t's current
+    position */
+ template <class ITER, class ROW>
+ inline const byte * toByteArray(versioningIterator<ITER,ROW> * const t) {
+   return toByteArray(&t->a_);
+ }
+
 template
 inline const byte* toByteArray(treeIterator *const t) {
   return (const byte*)&(**t);
 }
-template
-inline const byte* toByteArray(treeIterator,PAGELAYOUT> *const t) {
+template
+inline const byte* toByteArray(treeIterator *const t) {
   return (**t).toByteArray();
 }
diff --git a/src/stasis/operations/lsmTable.h b/src/stasis/operations/lsmTable.h
new file mode 100644
index 0000000..2417d0e
--- /dev/null
+++ b/src/stasis/operations/lsmTable.h
@@ -0,0 +1,311 @@
+#ifndef _ROSE_COMPRESSION_LSMTABLE_H__
+#define _ROSE_COMPRESSION_LSMTABLE_H__
+
+#undef end
+#undef begin
+
+#include <set> /* header name lost in transit; inferred from the std::set use below */
+#include "lsmIterators.h"
+
+namespace rose {
+  /**
+     @file
+
+     This file contains worker threads and the end user interface for Rose's
+     LSM-tree based table implementation.  The page format is set at compile
+     time with a template instantiation.
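
Concretely, roseTable.cpp above is the one in-tree instantiation: a four-column tuple stored in a Multicolumn page, RLE-compressed:

    typedef rose::StaticTuple<4,int64_t,int32_t,int16_t,int8_t> tup;
    typedef rose::SingleColumnTypePageLayout<
        rose::Multicolumn<tup>, rose::Rle<val_t> > layout;
    // layout::initPageLayout() is what registers the comparator and page
    // initializer; TlsmTableAlloc<layout>(xid) then allocates the trees.
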
+
+     @see lsmWorkers.h provides a more general (and dynamically
+          dispatched) interface to the underlying primitives.
+  */
+
+  template<class PAGELAYOUT>
+  struct new_insert_args {
+    int comparator_idx;
+    int rowsize; //typedef int32_t val_t;
+    //  ITER *begin;
+    //  ITER *end;
+    pageid_t(*pageAlloc)(int,void*);
+    void *pageAllocState;
+    pthread_mutex_t * block_ready_mut;
+    pthread_cond_t * block_needed_cond;
+    pthread_cond_t * block_ready_cond;
+    int max_waiters;
+    int wait_count;
+    recordid * wait_queue;
+    typename PAGELAYOUT::FMT::TUP *scratchA;
+    typename PAGELAYOUT::FMT::TUP *scratchB;
+    pageid_t mergedPages;
+  };
+
+  template <class PAGELAYOUT, class ITER>
+  pageid_t compressData(ITER * begin, ITER * end, recordid tree,
+                        pageid_t (*pageAlloc)(int,void*),
+                        void *pageAllocState, uint64_t *inserted) {
+    *inserted = 0;
+
+    if(*begin == *end) {
+      return 0;
+    }
+    pageid_t next_page = pageAlloc(-1,pageAllocState);
+    Page *p = loadPage(-1, next_page);
+    pageid_t pageCount = 0;
+
+    if(*begin != *end) {
+      TlsmAppendPage(-1,tree,toByteArray(begin),pageAlloc,pageAllocState,p->id);
+    }
+    pageCount++;
+
+    typename PAGELAYOUT::FMT * mc = PAGELAYOUT::initPage(p, &**begin);
+
+    int lastEmpty = 0;
+
+    for(ITER i(*begin); i != *end; ++i) {
+      rose::slot_index_t ret = mc->append(-1, *i);
+
+      (*inserted)++;
+
+      if(ret == rose::NOSPACE) {
+        p->dirty = 1;
+        mc->pack();
+        releasePage(p);
+
+        --(*end);
+        if(i != *end) {
+          next_page = pageAlloc(-1,pageAllocState);
+          p = loadPage(-1, next_page);
+
+          mc = PAGELAYOUT::initPage(p, &*i);
+
+          TlsmAppendPage(-1,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id);
+
+          pageCount++;
+          lastEmpty = 0;
+        } else {
+          lastEmpty = 1;
+        }
+        ++(*end);
+        --i;
+      }
+    }
+
+    p->dirty = 1;
+    mc->pack();
+    releasePage(p);
+    return pageCount;
+  }
+
+
+  /**
+     ITERA is an iterator over the data structure that mergeThread creates
+     (an lsm tree iterator).
+     ITERB is an iterator over the data structures that mergeThread takes as
+     input (lsm tree, or rb tree..)
+  */
+  template<class PAGELAYOUT, class ITERA, class ITERB> //class PAGELAYOUTX, class ENGINE, class ITERA, class ITERB,
+  //	   class ROW, class TYPE>
+  void* mergeThread(void* arg) {
+    // The ITER argument of a is unused (we don't look at its begin or end fields...)
+    //insert_args* a =
+    //  (insert_args*)arg;
+    new_insert_args<PAGELAYOUT> * a = (new_insert_args<PAGELAYOUT>*)arg;
+
+    struct timeval start_tv, wait_tv, stop_tv;
+
+    int merge_count = 0;
+    // loop around here to produce multiple batches for merge.
+    while(1) {
+
+      gettimeofday(&start_tv,0);
+
+      pthread_mutex_lock(a->block_ready_mut);
+      while(a->wait_count <2) {
+        pthread_cond_wait(a->block_ready_cond,a->block_ready_mut);
+      }
+
+      gettimeofday(&wait_tv,0);
+
+      recordid * oldTreeA = &a->wait_queue[0];
+      recordid * oldTreeB = &a->wait_queue[1];
+
+      pthread_mutex_unlock(a->block_ready_mut);
+
+      recordid tree = TlsmCreate(-1, a->comparator_idx,a->pageAlloc,a->pageAllocState,a->rowsize);
+
+      ITERA taBegin(*oldTreeA,*(a->scratchA),a->rowsize);
+      ITERB tbBegin(*oldTreeB,*(a->scratchB),a->rowsize);
+
+      ITERA *taEnd = taBegin.end();
+      ITERB *tbEnd = tbBegin.end();
+
+      mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP>
+        mBegin(taBegin, tbBegin, *taEnd, *tbEnd);
+
+      mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP>
+        mEnd(taBegin, tbBegin, *taEnd, *tbEnd);
+
+
+      mEnd.seekEnd();
+      uint64_t insertedTuples;
+      pageid_t mergedPages = compressData<PAGELAYOUT,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> >
+        (&mBegin, &mEnd,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
+      delete taEnd;
+      delete tbEnd;
+
+      gettimeofday(&stop_tv,0);
+
+      pthread_mutex_lock(a->block_ready_mut);
+
+      a->mergedPages = mergedPages;
+
+      // TlsmFree(wait_queue[0])  /// XXX Need to implement (de)allocation!
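
The shuffle below drops the two trees that were just merged and publishes the result at the head of the queue. For example: wait_queue = [A, B, C] with wait_count = 3; after merging A and B into M, the memcpy and the loop leave [M, C] and wait_count = 2.
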
+      // TlsmFree(wait_queue[1])
+
+      memcpy(&a->wait_queue[0],&tree,sizeof(tree));
+      for(int i = 1; i + 1 < a->wait_count; i++) {
+        memcpy(&a->wait_queue[i],&a->wait_queue[i+1],sizeof(tree));
+      }
+      a->wait_count--;
+      pthread_mutex_unlock(a->block_ready_mut);
+
+      merge_count++;
+
+      double wait_elapsed  = tv_to_double(wait_tv) - tv_to_double(start_tv);
+      double work_elapsed  = tv_to_double(stop_tv) - tv_to_double(wait_tv);
+      double total_elapsed = wait_elapsed + work_elapsed;
+      double ratio = ((double)(insertedTuples * (uint64_t)a->rowsize))
+        / (double)(PAGE_SIZE * mergedPages);
+      double throughput = ((double)(insertedTuples * (uint64_t)a->rowsize))
+        / (1024.0 * 1024.0 * total_elapsed);
+
+      printf("merge # %-6d: comp ratio: %-9.3f  waited %6.1f sec   "
+             "worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n", merge_count, ratio,
+             wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput);
+
+      pthread_cond_signal(a->block_needed_cond);
+    }
+    return 0;
+  }
+  /*
+
+  template<class PAGELAYOUT, class ITER>
+  void* insertThread(void* arg) {
+
+    new_insert_args * a = (new_insert_args*)arg;
+    struct timeval start_tv, start_wait_tv, stop_tv;
+
+    int insert_count = 0;
+
+    pageid_t lastTreeBlocks = 0;
+    uint64_t lastTreeInserts = 0;
+    pageid_t desiredInserts = 0;
+
+    // this is a hand-tuned value; it should be set dynamically, not statically
+    double K = 0.18;
+
+    // loop around here to produce multiple batches for merge.
+    while(1) {
+      gettimeofday(&start_tv,0);
+      // XXX this needs to be an iterator over an in-memory tree.
+      ITER i(*(a->begin));
+      ITER j(desiredInserts ? *(a->begin) : *(a->end));
+      if(desiredInserts) {
+        j += desiredInserts;
+      }
+      recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize);
+      lastTreeBlocks =
+        compressData
+        (&i, &j,1,tree,a->pageAlloc,a->pageAllocState, &lastTreeInserts);
+
+      gettimeofday(&start_wait_tv,0);
+      pthread_mutex_lock(a->block_ready_mut);
+      while(a->wait_count >= a->max_waiters) {
+        pthread_cond_wait(a->block_needed_cond,a->block_ready_mut);
+      }
+
+      memcpy(&a->wait_queue[a->wait_count],&tree,sizeof(recordid));
+      a->wait_count++;
+
+      pthread_cond_signal(a->block_ready_cond);
+      gettimeofday(&stop_tv,0);
+      double work_elapsed = tv_to_double(start_wait_tv) - tv_to_double(start_tv);
+      double wait_elapsed = tv_to_double(stop_tv) - tv_to_double(start_wait_tv);
+      double elapsed = tv_to_double(stop_tv) - tv_to_double(start_tv);
+      printf("insert# %-6d waited %6.1f sec   "
+             "worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n",
+             ++insert_count,
+             wait_elapsed,
+             work_elapsed,
+             (long int)lastTreeInserts,
+             (lastTreeInserts*(uint64_t)a->rowsize / (1024.0*1024.0)) / elapsed);
+
+      if(a->mergedPages != -1) {
+        desiredInserts = (pageid_t)(((double)a->mergedPages / K)
+                                    * ((double)lastTreeInserts
+                                       / (double)lastTreeBlocks));
+      }
+      pthread_mutex_unlock(a->block_ready_mut);
+
+    }
+    return 0;
+  }
+  */
+  typedef struct {
+    recordid bigTree;
+    recordid bigTreeAllocState; // this is probably the head of an arraylist of regions used by the tree...
+    recordid mediumTree;
+    recordid mediumTreeAllocState;
+    epoch_t beginning;
+    epoch_t end;
+  } lsmTableHeader_t;
+
+  template<class PAGELAYOUT>
+  inline recordid TlsmTableAlloc(int xid) {
+
+    // XXX use a (slow) page allocator in alloc, then create a new
+    // (fast) region allocator in start.
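
What the XXX sketches (assumption; not implemented here): Talloc/Tset below persist one TlsmRegionAllocConf_t per tree, and TlsmCreate is handed the Tread/Tset-backed TlsmRegionAllocRid. A later TlsmTableStart could swap in the in-memory TlsmRegionAlloc, whose nextPage/endOfRegion cursor is cheap but is lost on crash and rebuilt from a fresh region.
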
+
+    recordid ret = Talloc(xid, sizeof(lsmTableHeader_t));
+    lsmTableHeader_t h;
+    h.bigTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t));
+    Tset(xid,h.bigTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER);
+    h.bigTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),
+                           TlsmRegionAllocRid,&h.bigTreeAllocState,
+                           PAGELAYOUT::FMT::TUP::sizeofBytes());
+    h.mediumTreeAllocState = Talloc(xid,sizeof(TlsmRegionAllocConf_t));
+    Tset(xid,h.mediumTreeAllocState,&LSM_REGION_ALLOC_STATIC_INITIALIZER);
+    h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),
+                              TlsmRegionAllocRid,&h.mediumTreeAllocState,
+                              PAGELAYOUT::FMT::TUP::sizeofBytes());
+    epoch_t beginning = 0;
+    epoch_t end = 0;
+    Tset(xid, ret, &h);
+    return ret;
+  }
+  template<class PAGELAYOUT>
+  void TlsmTableStart(recordid tree) {
+    /// XXX xid for daemon processes?
+
+    void * (*merger)(void*) = mergeThread
+      <PAGELAYOUT,
+      treeIterator<typename PAGELAYOUT::FMT::TUP>,
+      treeIterator<typename PAGELAYOUT::FMT::TUP> >;
+
+    /*mergeThread
+      <PAGELAYOUT,
+      stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP>::iterator,
+      typename PAGELAYOUT::FMT::TUP> >
+      (0); */
+
+    lsmTableHeader_t h;
+    Tread(-1, tree, &h);
+
+  }
+  template<class PAGELAYOUT>
+  void TlsmTableStop(recordid tree) {
+
+  }
+}
+
+#endif  // _ROSE_COMPRESSION_LSMTABLE_H__
diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c
index 39d1c82..76fd01b 100644
--- a/src/stasis/operations/lsmTree.c
+++ b/src/stasis/operations/lsmTree.c
@@ -8,12 +8,40 @@
 #include
 
 static lsm_comparator_t comparators[MAX_LSM_COMPARATORS];
+static lsm_page_initializer_t initializers[MAX_LSM_PAGE_INITIALIZERS];
+
+TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER =
+  { -1, -1, 1000 };
+
+pageid_t TlsmRegionAlloc(int xid, void *conf) {
+  TlsmRegionAllocConf_t * a = (TlsmRegionAllocConf_t*)conf;
+  if(a->nextPage == a->endOfRegion) {
+    a->nextPage = TregionAlloc(xid, a->regionSize,0);
+    a->endOfRegion = a->nextPage + a->regionSize;
+  }
+  pageid_t ret = a->nextPage;
+  (a->nextPage)++;
+  return ret;
+}
+
+pageid_t TlsmRegionAllocRid(int xid, void * ridp) {
+  recordid rid = *(recordid*)ridp;
+  TlsmRegionAllocConf_t conf;
+  Tread(xid,rid,&conf);
+  pageid_t ret = TlsmRegionAlloc(xid,&conf);
+  // XXX get rid of Tset by storing next page in memory, and losing it
+  // on crash.
+  Tset(xid,rid,&conf);
+  return ret;
+}
 
 void lsmTreeRegisterComparator(int id, lsm_comparator_t i) {
   // XXX need to de-init this somewhere...
   assert(!comparators[id]);
   comparators[id] = i;
 }
-
+void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i) {
+  initializers[id] = i;
+}
 
 #define HEADER_SIZE (2 * sizeof(lsmTreeNodeRecord))
@@ -76,20 +104,6 @@
 */
 
-static pageid_t defaultAllocator(int xid, void *ignored) {
-  return TpageAlloc(xid);
-}
-
-static pageid_t (*pageAllocator)(int xid, void *ignored) = defaultAllocator;
-static void *pageAllocatorConfig;
-void TlsmSetPageAllocator(pageid_t (*allocer)(int xid, void * ignored),
-                          void * config) {
-  pageAllocator = allocer;
-  pageAllocatorConfig = config;
-}
-
-
-
 typedef struct lsmTreeState {
   pageid_t lastLeaf;
 } lsmTreeState;
@@ -197,13 +211,15 @@ void writeNodeRecordVirtualMethods(int xid, Page *p, int slot,
   stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
 }
 
-recordid TlsmCreate(int xid, int comparator, int keySize) {
+recordid TlsmCreate(int xid, int comparator,
+                    lsm_page_allocator_t allocator, void *allocator_state,
+                    int keySize) {
 
   // can the pages hold at least two keys?
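
The assert below is the sanity check that a node can hold the two header records plus two keys; e.g. with an 8-byte key it requires HEADER_SIZE + 2*(sizeof(lsmTreeNodeRecord)+8) to fit within USABLE_SIZE_OF_PAGE - 2*sizeof(short), which a default 4KB Stasis page satisfies with plenty of room.
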
  assert(HEADER_SIZE + 2 * (sizeof(lsmTreeNodeRecord) +keySize) <
         USABLE_SIZE_OF_PAGE - 2 * sizeof(short));
 
-  pageid_t root = pageAllocator(xid, pageAllocatorConfig);
+  pageid_t root = allocator(xid, allocator_state); //pageAllocatorConfig);
   DEBUG("Root = %lld\n", root);
   recordid ret = { root, 0, 0 };
@@ -244,13 +260,15 @@ recordid TlsmCreate(int xid, int comparator, int keySize) {
 
 static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
                                 int depth, const byte *key, size_t key_len,
-                                pageid_t val_page, pageid_t lastLeaf) {
+                                pageid_t val_page, pageid_t lastLeaf,
+                                lsm_page_allocator_t allocator,
+                                void *allocator_state) {
 
   // root is the recordid on the root page that should point to the
   // new subtree.
   assert(depth);
   DEBUG("buildPathToLeaf(depth=%d) (lastleaf=%lld) called\n",depth, lastLeaf);
 
-  pageid_t child = pageAllocator(xid, pageAllocatorConfig); // XXX Use some other function...
+  pageid_t child = allocator(xid,allocator_state);
 
   DEBUG("new child = %lld internal? %d\n", child, depth-1);
 
   Page *child_p = loadPage(xid, child);
@@ -266,7 +284,7 @@ static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
     stasis_record_alloc_done(xid, child_p, child_rec);
 
     ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
-                          val_page,lastLeaf);
+                          val_page,lastLeaf, allocator, allocator_state);
 
     unlock(child_p->rwlatch);
     releasePage(child_p);
@@ -328,7 +346,9 @@
 
 static recordid appendInternalNode(int xid, Page *p,
                                    int depth, const byte *key, size_t key_len,
-                                   pageid_t val_page, pageid_t lastLeaf) {
+                                   pageid_t val_page, pageid_t lastLeaf,
+                                   lsm_page_allocator_t allocator,
+                                   void *allocator_state) {
   assert(*stasis_page_type_ptr(p) == LSM_ROOT_PAGE ||
          *stasis_page_type_ptr(p) == FIXED_PAGE);
   if(!depth) {
@@ -350,7 +370,7 @@ static recordid appendInternalNode(int xid, Page *p,
       Page *child_page = loadPage(xid, child_id);
       writelock(child_page->rwlatch,0);
       ret = appendInternalNode(xid, child_page, depth-1, key, key_len,
-                               val_page, lastLeaf);
+                               val_page, lastLeaf, allocator, allocator_state);
 
       unlock(child_page->rwlatch);
       releasePage(child_page);
@@ -360,7 +380,7 @@ static recordid appendInternalNode(int xid, Page *p,
     if(ret.size != INVALID_SLOT) {
       stasis_record_alloc_done(xid, p, ret);
       ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page,
-                            lastLeaf);
+                            lastLeaf, allocator, allocator_state);
 
       DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n",
             p->id, ret.page, ret.slot, ret.size);
@@ -418,6 +438,7 @@ static pageid_t findFirstLeaf(int xid, Page *root, int depth) {
 }
 
 recordid TlsmAppendPage(int xid, recordid tree,
                         const byte *key,
+                        lsm_page_allocator_t allocator, void *allocator_state,
                         long val_page) {
   Page *p = loadPage(xid, tree.page);
   writelock(p->rwlatch, 0);
@@ -461,12 +482,13 @@
     assert(tree.page == p->id);
 
     ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
-                             s->lastLeaf == tree.page ? -1 : s->lastLeaf);
+                             s->lastLeaf == tree.page ? -1 : s->lastLeaf,
+                             allocator, allocator_state);
 
     if(ret.size == INVALID_SLOT) {
       DEBUG("Need to split root; depth = %d\n", depth);
 
-      pageid_t child = pageAllocator(xid, pageAllocatorConfig);
+      pageid_t child = allocator(xid, allocator_state);
       Page *lc = loadPage(xid, child);
       writelock(lc->rwlatch,0);
@@ -519,7 +541,8 @@ recordid TlsmAppendPage(int xid, recordid tree,
       assert(tree.page == p->id);
 
       ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
-                               s->lastLeaf == tree.page ? -1 : s->lastLeaf);
+                               s->lastLeaf == tree.page ? -1 : s->lastLeaf,
+                               allocator, allocator_state);
 
       assert(ret.size != INVALID_SLOT);
@@ -625,8 +648,8 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) {
 
   size_t keySize = getKeySize(xid,p);
 
-  const lsmTreeNodeRecord *depth_nr = readNodeRecord(xid, p , 0, keySize);
-  const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p , 1, keySize);
+  const lsmTreeNodeRecord *depth_nr = readNodeRecord(xid, p , DEPTH, keySize);
+  const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p , COMPARATOR, keySize);
 
   int depth = depth_nr->ptr;
diff --git a/src/stasis/operations/lsmWorkers.h b/src/stasis/operations/lsmWorkers.h
index ad28a03..c463050 100644
--- a/src/stasis/operations/lsmWorkers.h
+++ b/src/stasis/operations/lsmWorkers.h
@@ -22,12 +22,7 @@ inline const byte * toByteArray(Tuple::iterator * const t) {
   return (**t).toByteArray();
 }
 
-double tv_to_double(struct timeval tv) {
-  return static_cast<double>(tv.tv_sec) +
-    (static_cast<double>(tv.tv_usec) / 1000000.0);
-}
-
-template
+template
 struct insert_args {
   int comparator_idx;
   int rowsize;typedef int32_t val_t;
@@ -51,34 +46,34 @@ struct insert_args {
   passed into them, and allocate a new PAGELAYOUT object of the
   appropriate type.
 */
-template
-static Pstar * initPage(Pstar **pstar,
-                        Page *p, TYPE current) {
+template
+inline Pstar * initPage(Pstar **pstar,
+                        Page *p, const TYPE current) {
   *pstar = new Pstar(-1, p);
   (*pstar)->compressor()->offset(current);
   return *pstar;
 }
-template
-static Pstar * initPage(Pstar **pstar,
-                        Page *p, Tuple<TYPE> & current) {
+template
+inline Pstar * initPage(Pstar **pstar,
+                        Page *p, const ROW & current) {
   *pstar = new Pstar(-1, p);
   (*pstar)->compressor()->offset(current);
   return *pstar;
 }
-template
-static Multicolumn<Tuple<TYPE> > * initPage(Multicolumn<Tuple<TYPE> > ** mc,
-                                            Page *p, Tuple<TYPE> & t) {
+template
+inline Multicolumn<ROW> * initPage(Multicolumn<ROW> ** mc,
+                                   Page *p, const ROW & t) {
   column_number_t column_count = t.column_count();
   plugin_id_t plugin_id =
-    rose::plugin_id<Multicolumn<Tuple<TYPE> >,COMPRESSOR,TYPE>();
+    rose::plugin_id<Multicolumn<ROW>,COMPRESSOR,TYPE>();
 
   plugin_id_t * plugins = new plugin_id_t[column_count];
   for(column_number_t c = 0; c < column_count; c++) {
     plugins[c] = plugin_id;
   }
 
-  *mc = new Multicolumn<Tuple<TYPE> >(-1,p,column_count,plugins);
+  *mc = new Multicolumn<ROW>(-1,p,column_count,plugins);
   for(column_number_t c = 0; c < column_count; c++) {
     ((COMPRESSOR*)(*mc)->compressor(c))->offset(*t.get(c));
   }
@@ -86,16 +81,16 @@
 
   delete [] plugins;
   return *mc;
 }
-template
-static Multicolumn<Tuple<TYPE> > * initPage(Multicolumn<Tuple<TYPE> > ** mc,
-                                            Page *p, TYPE t) {
+template
+inline Multicolumn<ROW> * initPage(Multicolumn<ROW> ** mc,
+                                   Page *p, const TYPE t) {
   plugin_id_t plugin_id =
-    rose::plugin_id<Multicolumn<Tuple<TYPE> >,COMPRESSOR,TYPE>();
+    rose::plugin_id<Multicolumn<ROW>,COMPRESSOR,TYPE>();
 
   plugin_id_t * plugins = new plugin_id_t[1];
   plugins[0] = plugin_id;
 
-  *mc = new Multicolumn<Tuple<TYPE> >(-1,p,1,plugins);
+  *mc = new Multicolumn<ROW>(-1,p,1,plugins);
   ((COMPRESSOR*)(*mc)->compressor(0))->offset(t);
 
   delete [] plugins;
@@ -112,10 +107,10 @@
 
   @return the number of pages that were needed to store the
           compressed data.
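
Call shape, for reference (a sketch; template arguments elided):

    uint64_t n;
    recordid tree = TlsmCreate(-1, cmp_idx, pageAlloc, allocState, rowsize);
    pageid_t pages = compressData<...>(&begin, &end, 1 /*buildTree*/,
                                       tree, pageAlloc, allocState, &n);
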
 */
-template
+template
 pageid_t compressData(ITER * const begin, ITER * const end,
-                      int buildTree, recordid tree, pageid_t (*pageAlloc)(int,void*),
-                      void *pageAllocState, uint64_t * inserted) {
+                      int buildTree, recordid tree, pageid_t (*pageAlloc)(int,void*),
+                      void *pageAllocState, uint64_t * inserted) {
 
   *inserted = 0;
@@ -129,13 +124,12 @@ pageid_t compressData(ITER * const begin, ITER * const end,
 
   if(*begin != *end && buildTree) {
-    TlsmAppendPage(-1,tree,toByteArray(begin),p->id);
+    TlsmAppendPage(-1,tree,toByteArray(begin),pageAlloc,pageAllocState,p->id);
   }
   pageCount++;
 
   PAGELAYOUT * mc;
-
-  initPage(&mc, p, **begin);
+  initPage(&mc, p, **begin);
 
   int lastEmpty = 0;
@@ -154,9 +148,10 @@
       next_page = pageAlloc(-1,pageAllocState);
       p = loadPage(-1, next_page);
 
-      mc = initPage(&mc, p, *i);
+      mc = initPage(&mc, p, *i);
+
       if(buildTree) {
-        TlsmAppendPage(-1,tree,toByteArray(&i),p->id);
+        TlsmAppendPage(-1,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id);
       }
       pageCount++;
       lastEmpty = 0;
@@ -176,8 +171,8 @@
 
 template
 void* insertThread(void* arg) {
-  insert_args* a =
-    (insert_args*)arg;
+  insert_args* a =
+    (insert_args*)arg;
 
   struct timeval start_tv, start_wait_tv, stop_tv;
@@ -198,9 +193,10 @@ void* insertThread(void* arg) {
     if(desiredInserts) {
       j += desiredInserts;
     }
-    recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize);
-    lastTreeBlocks = compressData
-      (&i, &j,1,tree,a->pageAlloc,a->pageAllocState, &lastTreeInserts);
+    recordid tree = TlsmCreate(-1, a->comparator_idx,a->pageAlloc, a->pageAllocState, a->rowsize);
+    lastTreeBlocks =
+      compressData
+      (&i, &j,1,tree,a->pageAlloc,a->pageAllocState, &lastTreeInserts);
 
     gettimeofday(&start_wait_tv,0);
     pthread_mutex_lock(a->block_ready_mut);
@@ -235,10 +231,16 @@ void* insertThread(void* arg) {
   return 0;
 }
 
-template
+/**
+   ITERA is an iterator over the data structure that mergeThread creates
+   (an lsm tree iterator).
+   ITERB is an iterator over the data structures that mergeThread takes as
+   input (lsm tree, or rb tree..)
+*/
+template
 void* mergeThread(void* arg) {
-  insert_args* a =
-    (insert_args*)arg;
+  // The ITER argument of a is unused (we don't look at its begin or end fields...)
+  insert_args* a =
+    (insert_args*)arg;
 
   struct timeval start_tv, wait_tv, stop_tv;
@@ -260,27 +262,26 @@ void* mergeThread(void* arg) {
 
     pthread_mutex_unlock(a->block_ready_mut);
 
-    recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize);
+    recordid tree = TlsmCreate(-1, a->comparator_idx,a->pageAlloc,a->pageAllocState,a->rowsize);
 
-    treeIterator taBegin(*oldTreeA,*(a->scratchA),a->rowsize);
-    treeIterator tbBegin(*oldTreeB,*(a->scratchB),a->rowsize);
+    ITERA taBegin(*oldTreeA,*(a->scratchA),a->rowsize);
+    ITERB tbBegin(*oldTreeB,*(a->scratchB),a->rowsize);
 
-    treeIterator *taEnd = taBegin.end();
-    treeIterator *tbEnd = tbBegin.end();
+    ITERA *taEnd = taBegin.end();
+    ITERB *tbEnd = tbBegin.end();
 
-    mergeIterator<treeIterator<ROW>,ROW>
-      mBegin(taBegin, tbBegin, *taEnd, *tbEnd);
+    mergeIterator<ITERA,ITERB,ROW>
+      mBegin(taBegin, tbBegin, *taEnd, *tbEnd);
 
-    mergeIterator<treeIterator<ROW>,ROW>
-      mEnd(taBegin, tbBegin, *taEnd, *tbEnd);
+    mergeIterator<ITERA,ITERB,ROW>
+      mEnd(taBegin, tbBegin, *taEnd, *tbEnd);
 
     mEnd.seekEnd();
     uint64_t insertedTuples;
-    pageid_t mergedPages = compressData,ROW>,ROW>
-      (&mBegin, &mEnd,1,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
-
+    pageid_t mergedPages = compressData >
+      (&mBegin, &mEnd,1,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
     delete taEnd;
     delete tbEnd;
diff --git a/src/stasis/page/compression/compression.h b/src/stasis/page/compression/compression.h
index 22fd333..4519d83 100644
--- a/src/stasis/page/compression/compression.h
+++ b/src/stasis/page/compression/compression.h
@@ -11,6 +11,7 @@ typedef uint32_t slot_index_t;
 typedef uint8_t plugin_id_t;
 typedef uint8_t column_number_t;
 typedef uint16_t column_offset_t;
+typedef uint16_t epoch_t;
 
 static const record_size_t VARIABLE_SIZE = CHAR_MAX;
 static const slot_index_t NOSPACE = UINT_MAX;
@@ -71,6 +72,12 @@ inline plugin_id_t plugin_id() {
   return ret;
 }
 
+double tv_to_double(struct timeval tv) {
+  return static_cast<double>(tv.tv_sec) +
+    (static_cast<double>(tv.tv_usec) / 1000000.0);
+}
+
+
 }
diff --git a/src/stasis/page/compression/for.h b/src/stasis/page/compression/for.h
index d1643a8..12c91b4 100644
--- a/src/stasis/page/compression/for.h
+++ b/src/stasis/page/compression/for.h
@@ -42,6 +42,8 @@ namespace rose {
 template <class TYPE>
 class For {
  public:
+  typedef TYPE TYP;
+
   static const int PLUGIN_ID = 0;
   /**
      Set the page offset.
     For frame of reference, this is used to
diff --git a/src/stasis/page/compression/multicolumn-impl.h b/src/stasis/page/compression/multicolumn-impl.h
index 24a1caf..cd91229 100644
--- a/src/stasis/page/compression/multicolumn-impl.h
+++ b/src/stasis/page/compression/multicolumn-impl.h
@@ -2,7 +2,8 @@
 #define _ROSE_COMPRESSION_MULTICOLUMN_IMPL_H__
 
 #include "multicolumn.h"
-
+#include "for-impl.h"
+#include "rle-impl.h"
 namespace rose {
 
 /**
diff --git a/src/stasis/page/compression/multicolumn.h b/src/stasis/page/compression/multicolumn.h
index 8aa6a5d..9621db0 100644
--- a/src/stasis/page/compression/multicolumn.h
+++ b/src/stasis/page/compression/multicolumn.h
@@ -68,6 +68,7 @@ template <class TUPLE> class Multicolumn {
  public:
   static page_impl impl();
   static const plugin_id_t PAGE_FORMAT_ID = 1;
+  typedef TUPLE TUP;
 
   Multicolumn(int xid, Page *p, column_number_t column_count,
               plugin_id_t * plugins);
diff --git a/src/stasis/page/compression/pageLayout.h b/src/stasis/page/compression/pageLayout.h
new file mode 100644
index 0000000..ccc2d83
--- /dev/null
+++ b/src/stasis/page/compression/pageLayout.h
@@ -0,0 +1,62 @@
+#ifndef _ROSE_COMPRESSION_PAGELAYOUT_H__
+#define _ROSE_COMPRESSION_PAGELAYOUT_H__
+#include "compression.h" // for plugin_id
+namespace rose {
+  // XXX need to be able to de-init this stuff.
+  static int cmp_num = 1;
+  static int init_num = 1;
+  template <class FORMAT, class COMPRESSOR>
+  class SingleColumnTypePageLayout {
+  public:
+    typedef FORMAT FMT;
+    static inline void initPageLayout() {
+      stasis_page_impl_register(FMT::impl());
+
+      // XXX these should register template instantiations of worker
+      // threads that are statically compiled to deal with the tree
+      // we're instantiating.
+      lsmTreeRegisterComparator(cmp_num, FMT::TUP::cmp);
+      lsmTreeRegisterPageInitializer
+	(init_num, (lsm_page_initializer_t)initPage);
+      my_cmp_num = cmp_num;
+      cmp_num++;
+      my_init_num = init_num;
+      init_num++;
+    }
+    static inline FORMAT * initPage(Page *p, const typename FORMAT::TUP * t) {
+      const column_number_t column_count = t->column_count();
+
+      plugin_id_t pluginid = plugin_id<FORMAT,COMPRESSOR,typename COMPRESSOR::TYP>();
+
+      plugin_id_t * plugins = new plugin_id_t[column_count];
+      for(column_number_t c = 0; c < column_count; c++) {
+	plugins[c] = pluginid;
+      }
+      FORMAT * f = new FORMAT(-1,p,column_count,plugins);
+      for(column_number_t c = 0; c < column_count; c++) {
+	COMPRESSOR* com = (COMPRESSOR*) f->compressor(c);
+	typename COMPRESSOR::TYP val = *(typename COMPRESSOR::TYP*)(t->get(c));
+	com->offset(val);
+      }
+      return f;
+    }
+    static inline int cmp_id() {
+      return my_cmp_num;
+    }
+    static inline int init_id() {
+      return my_init_num;
+    }
+  private:
+    static int my_cmp_num;
+    static int my_init_num;
+  };
+  template <class FORMAT, class COMPRESSOR>
+  int SingleColumnTypePageLayout<FORMAT,COMPRESSOR>::my_cmp_num = -1;
+  template <class FORMAT, class COMPRESSOR>
+  int SingleColumnTypePageLayout<FORMAT,COMPRESSOR>::my_init_num = -1;
+
+  template <class PAGELAYOUT>
+  recordid TlsmTableAlloc();
+
+}
+#endif // _ROSE_COMPRESSION_PAGELAYOUT_H__
diff --git a/src/stasis/page/compression/rle.h b/src/stasis/page/compression/rle.h
index 49c9ecd..626125d 100644
--- a/src/stasis/page/compression/rle.h
+++ b/src/stasis/page/compression/rle.h
@@ -16,6 +16,8 @@ class Rle {
  public:
   typedef uint32_t block_index_t;
   typedef uint16_t copy_count_t;
+  typedef TYPE TYP;
+
   static const copy_count_t MAX_COPY_COUNT = USHRT_MAX;
 
   struct triple_t {
diff --git a/src/stasis/page/compression/staticTuple.h b/src/stasis/page/compression/staticTuple.h
index af71dc6..0a6f840 100644
--- a/src/stasis/page/compression/staticTuple.h
+++ b/src/stasis/page/compression/staticTuple.h
@@ -3,16 +3,23 @@
 
 namespace rose {
 
-template
-class StaticTuple {
- public:
+  template
+  class StaticTuple {
+  public:
+    static const char NORMAL = 0;
+    static const char TOMBSTONE = 1;
+    static const int TUPLE_ID = 1;
+
     explicit inline StaticTuple() {
+      s.flag_ = NORMAL;
       initializePointers();
     }
-
     explicit inline StaticTuple(StaticTuple& t) {
+      s.flag_ = t.s.flag_;
+      s.epoch_ = t.s.epoch_;
       if(0 < N) s.cols0_ = t.s.cols0_;
       if(1 < N) s.cols1_ = t.s.cols1_;
       if(2 < N) s.cols2_ = t.s.cols2_;
@@ -28,6 +35,21 @@ class StaticTuple {
 
     inline ~StaticTuple() { }
 
+    static inline byte_off_t sizeofBytes() {
+      return sizeof(flag_t) + sizeof(epoch_t) +
+	(0 < N) ? sizeof(TYPE0) : 0 +
+	(1 < N) ? sizeof(TYPE1) : 0 +
+	(2 < N) ? sizeof(TYPE2) : 0 +
+	(3 < N) ? sizeof(TYPE3) : 0 +
+	(4 < N) ? sizeof(TYPE4) : 0 +
+	(5 < N) ? sizeof(TYPE5) : 0 +
+	(6 < N) ? sizeof(TYPE6) : 0 +
+	(7 < N) ? sizeof(TYPE7) : 0 +
+	(8 < N) ? sizeof(TYPE8) : 0 +
+	(9 < N) ? sizeof(TYPE9) : 0 ;
+
+    }
+
     inline void* set(column_number_t col, void* val) {
       memcpy(cols_[col],val,size_[col]);
       return(cols_[col]);
@@ -44,7 +66,7 @@ class StaticTuple {
     inline TYPE8 * set8(TYPE8* val) { s.cols8_=*val; }
     inline TYPE9 * set9(TYPE9* val) { s.cols9_=*val; }
 
-    inline const void* get(column_number_t col) const {
+    inline void* get(column_number_t col) const {
       return cols_[col];
     }
     inline column_number_t column_count() const { return N; }
 
     inline byte_off_t column_len(column_number_t col) const {
       return size_[col];
     }
-    inline byte* toByteArray() {
-      return &s;
+    inline byte* toByteArray() const {
+      return (byte*)&s;
     }
-    inline bool operator==(StaticTuple &t) {
-      return s == t.s;
+    inline bool operator==(const StaticTuple &t) const {
+      if(0 < N) if(s.cols0_ != t.s.cols0_) return 0;
+      if(1 < N) if(s.cols1_ != t.s.cols1_) return 0;
+      if(2 < N) if(s.cols2_ != t.s.cols2_) return 0;
+      if(3 < N) if(s.cols3_ != t.s.cols3_) return 0;
+      if(4 < N) if(s.cols4_ != t.s.cols4_) return 0;
+      if(5 < N) if(s.cols5_ != t.s.cols5_) return 0;
+      if(6 < N) if(s.cols6_ != t.s.cols6_) return 0;
+      if(7 < N) if(s.cols7_ != t.s.cols7_) return 0;
+      if(8 < N) if(s.cols8_ != t.s.cols8_) return 0;
+      if(9 < N) if(s.cols9_ != t.s.cols9_) return 0;
+      return 1;
     }
-    inline bool operator<(StaticTuple &t) {
+    inline bool operator<(const StaticTuple &t) const {
       if(0 < N) {
	 if(s.cols0_ < t.s.cols0_) return 1;
       }
@@ -101,6 +133,30 @@ class StaticTuple {
       return 0;
     }
 
+    static inline int cmp(const void *ap, const void *bp) {
+      const StaticTuple * a = (const StaticTuple*)ap;
+      const StaticTuple * b = (const StaticTuple*)bp;
+      if(*a < *b) {
+	return -1;
+      } else if(*a == *b) {
+	// Sort *backwards* on epoch values.
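
Note the direction: with equal column values this returns 1 when a's epoch is larger, i.e. it orders smaller (older) epochs first. If versioningIterator is to see the newest version of a value first, the caller must account for that; the comment and the sign appear to be at odds, so treat this comparison with care.
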
+	if(a->s.epoch_ > b->s.epoch_) { return 1; }
+	else if(a->s.epoch_ < b->s.epoch_) { return -1; }
+	else return 0;
+      } else {
+	return 1;
+      }
+
+    }
+
+    struct stl_cmp
+    {
+      bool operator()(const StaticTuple* s1, const StaticTuple* s2) const
+      {
+	return *s1 < *s2;
+      }
+    };
+
     class iterator {
     public:
       inline iterator(void const *const *const dataset, int offset)
@@ -146,12 +202,14 @@ class StaticTuple {
       int off_;
       StaticTuple scratch_;
     };
-  private:
+   private:
     explicit StaticTuple(const StaticTuple& t) { abort(); }
 
     void * cols_[N];
     size_t size_[N];
 
+    typedef char flag_t;
+    typedef unsigned int epoch_t;
     struct {
       TYPE0 cols0_;
       TYPE1 cols1_;
@@ -163,6 +221,8 @@ class StaticTuple {
       TYPE7 cols7_;
       TYPE8 cols8_;
       TYPE9 cols9_;
+      flag_t flag_;
+      epoch_t epoch_;
     } s;
 
     inline void initializePointers() {
@@ -188,7 +248,7 @@ class StaticTuple {
       if(8 < N) size_[8] = sizeof(s.cols8_);
       if(9 < N) size_[9] = sizeof(s.cols9_);
     }
-};
+  };
 }
diff --git a/src/stasis/page/compression/tuple.h b/src/stasis/page/compression/tuple.h
index 8681a01..50a0756 100644
--- a/src/stasis/page/compression/tuple.h
+++ b/src/stasis/page/compression/tuple.h
@@ -47,7 +47,9 @@ class Tuple {
     }
   } */
   inline ~Tuple() { delete[] cols_; delete[] byteArray_; }
-
+  inline bool tombstone() {
+    return false;
+  }
   inline TYPE * set(column_number_t col,void* val) {
     cols_[col] = *(TYPE*)val;
     return (TYPE*)val;
@@ -55,6 +57,11 @@ class Tuple {
   inline TYPE * get(column_number_t col) const {
     return &(cols_[col]);
   }
+  inline void copyFrom(Tuple t) {
+    for(int i = 0; i < count_; i++) {
+      set(i,t.get(i));
+    }
+  }
   inline column_number_t column_count() const {
     return count_;
   }
@@ -156,6 +163,7 @@ class Tuple {
     int off_;
     Tuple scratch_;
   };
+  static const uint32_t TIMESTAMP = 0;
  private:
   Tuple() { abort(); }
   explicit Tuple(const Tuple& t) { abort(); }
diff --git a/stasis/constants.h b/stasis/constants.h
index c05c07d..206b629 100644
--- a/stasis/constants.h
+++ b/stasis/constants.h
@@ -244,6 +244,7 @@ extern const short SLOT_TYPE_LENGTHS[];
 #define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)
 #define LOG_MODE  (O_CREAT | O_RDWR | O_SYNC)
 
-#define MAX_LSM_COMPARATORS 256
-
+#define MAX_LSM_COMPARATORS 16
+#define MAX_LSM_PAGE_INITIALIZERS 256
+#define MAX_LSM_PAGE_ALLOCATORS 16
 #endif
diff --git a/stasis/operations/lsmTree.h b/stasis/operations/lsmTree.h
index 4d82e8a..7ffcfe1 100644
--- a/stasis/operations/lsmTree.h
+++ b/stasis/operations/lsmTree.h
@@ -23,8 +23,20 @@ typedef struct {
 } lladd_lsm_iterator;
 
 typedef int(*lsm_comparator_t)(const void* a, const void* b);
+typedef void*(*lsm_page_initializer_t)(Page *, void *);
+typedef pageid_t(*lsm_page_allocator_t)(int, void *);
 
 void lsmTreeRegisterComparator(int id, lsm_comparator_t i);
+void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i);
+
+pageid_t TlsmRegionAlloc(int xid, void *conf);
+pageid_t TlsmRegionAllocRid(int xid, void *conf);
+typedef struct {
+  pageid_t nextPage;
+  pageid_t endOfRegion;
+  pageid_t regionSize;
+} TlsmRegionAllocConf_t;
+extern TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER;
 
 /**
    Initialize a new LSM tree.
@@ -37,24 +49,23 @@ void lsmTreeRegisterComparator(int id, lsm_comparator_t i);
 
   @param keySize
 */
-recordid TlsmCreate(int xid, int comparator, int keySize);
+recordid TlsmCreate(int xid, int comparator,
+                    lsm_page_allocator_t allocator, void *allocator_state,
+                    int keySize);
 
 /**
    Free the space associated with an LSM tree.
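
Presumably the allocator pair passed here must be the one the tree was created with, so region-based allocators can find the regions to release (a sketch; deallocation itself is still marked unimplemented above):

    TlsmRegionAllocConf_t conf = LSM_REGION_ALLOC_STATIC_INITIALIZER;
    recordid t = TlsmCreate(xid, cmp, TlsmRegionAlloc, &conf, keySize);
    /* ... */
    TlsmDealloc(xid, TlsmRegionAlloc, &conf, t); // same allocator + state
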
*/
-recordid TlsmDealloc(int xid, recordid tree);
+recordid TlsmDealloc(int xid,
+                     lsm_page_allocator_t allocator, void *allocator_state,
+                     recordid tree);
 
 /**
    Append a new leaf page to an LSM tree.  Leaves must be appended in
    ascending order; LSM trees do not support update in place.
 */
 recordid TlsmAppendPage(int xid, recordid tree,
-                        const byte *key,
-                        long pageid);
-
-/**
-   Override the page allocation algorithm that LSM tree uses by default
-*/
-void TlsmSetPageAllocator(pageid_t (*allocer)(int xid, void * ignored),
-                          void * config);
+                        const byte *key,
+                        lsm_page_allocator_t allocator, void *allocator_state,
+                        long pageid);
 
 /**
    Lookup a leaf page.
diff --git a/test/stasis/check_lsmTree.c b/test/stasis/check_lsmTree.c
index f30dfba..83f04c8 100644
--- a/test/stasis/check_lsmTree.c
+++ b/test/stasis/check_lsmTree.c
@@ -32,15 +32,18 @@ int cmp(const void *ap, const void *bp) {
 void insertProbeIter(lsmkey_t NUM_ENTRIES) {
   int intcmp = 0;
   lsmTreeRegisterComparator(intcmp,cmp);
+  TlsmRegionAllocConf_t alloc_conf = LSM_REGION_ALLOC_STATIC_INITIALIZER;
   Tinit();
   int xid = Tbegin();
-  recordid tree = TlsmCreate(xid, intcmp, sizeof(lsmkey_t));
+  recordid tree = TlsmCreate(xid, intcmp,
+                             TlsmRegionAlloc, &alloc_conf,
+                             sizeof(lsmkey_t));
   for(lsmkey_t i = 0; i < NUM_ENTRIES; i++) {
     long pagenum = TlsmFindPage(xid, tree, (byte*)&i);
     assert(pagenum == -1);
     DEBUG("TlsmAppendPage %d\n",i);
-    TlsmAppendPage(xid, tree, (const byte*)&i, i + OFFSET);
+    TlsmAppendPage(xid, tree, (const byte*)&i, TlsmRegionAlloc, &alloc_conf, i + OFFSET);
     pagenum = TlsmFindPage(xid, tree, (byte*)&i);
     assert(pagenum == i + OFFSET);
   }
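
End to end, the reworked API reads like this (condensed from the test above; pagenum stands for whatever leaf page the caller just filled):

    TlsmRegionAllocConf_t conf = LSM_REGION_ALLOC_STATIC_INITIALIZER;
    Tinit();
    int xid = Tbegin();
    recordid tree = TlsmCreate(xid, intcmp, TlsmRegionAlloc, &conf,
                               sizeof(lsmkey_t));
    lsmkey_t k = 42;
    TlsmAppendPage(xid, tree, (const byte*)&k, TlsmRegionAlloc, &conf, pagenum);
    long found = TlsmFindPage(xid, tree, (byte*)&k); // == pagenum
    Tcommit(xid);
    Tdeinit();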