From cd5ec5f70ce0f4b8eeed234ddd3c64f6c9204b72 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 15 Oct 2007 17:46:44 +0000 Subject: [PATCH] Initial ROSE check in. --- benchmarks/Makefile.am | 7 +- benchmarks/rose.cpp | 1495 +++++++++++++++++ benchmarks/rose.sh | 5 + src/stasis/page/compression/compression.h | 77 + src/stasis/page/compression/for-impl.h | 87 + src/stasis/page/compression/for.h | 158 ++ .../page/compression/multicolumn-impl.h | 266 +++ src/stasis/page/compression/multicolumn.h | 167 ++ src/stasis/page/compression/pstar-impl.h | 110 ++ src/stasis/page/compression/pstar.h | 119 ++ src/stasis/page/compression/rle-impl.h | 76 + src/stasis/page/compression/rle.h | 100 ++ src/stasis/page/compression/tuple.h | 336 ++++ 13 files changed, 3001 insertions(+), 2 deletions(-) create mode 100644 benchmarks/rose.cpp create mode 100755 benchmarks/rose.sh create mode 100644 src/stasis/page/compression/compression.h create mode 100644 src/stasis/page/compression/for-impl.h create mode 100644 src/stasis/page/compression/for.h create mode 100644 src/stasis/page/compression/multicolumn-impl.h create mode 100644 src/stasis/page/compression/multicolumn.h create mode 100644 src/stasis/page/compression/pstar-impl.h create mode 100644 src/stasis/page/compression/pstar.h create mode 100644 src/stasis/page/compression/rle-impl.h create mode 100644 src/stasis/page/compression/rle.h create mode 100644 src/stasis/page/compression/tuple.h diff --git a/benchmarks/Makefile.am b/benchmarks/Makefile.am index 7a35faf..0e3acaf 100644 --- a/benchmarks/Makefile.am +++ b/benchmarks/Makefile.am @@ -1,11 +1,14 @@ LDADD=$(top_builddir)/src/stasis/libstasis.la \ $(top_builddir)/src/libdfa/librw.la + +rose_SOURCES=rose.cpp + if BUILD_BENCHMARKS noinst_PROGRAMS=lhtableThreaded naiveHash logicalHash readLogicalHash naiveMultiThreaded logicalMultThreaded rawSet \ arrayListSet logicalMultiReaders linearHashNTA linkedListNTA pageOrientedListNTA \ - linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure zeroCopy sequentialThroughput + linearHashNTAThreaded linearHashNTAMultiReader linearHashNTAWriteRequests transitiveClosure zeroCopy sequentialThroughput rose endif AM_CFLAGS=${GLOBAL_CFLAGS} - +AM_CXXFLAGS=${GLOBAL_CXXFLAGS} -I ${top_builddir}/src SUBDIRS=berkeleyDB diff --git a/benchmarks/rose.cpp b/benchmarks/rose.cpp new file mode 100644 index 0000000..978b90c --- /dev/null +++ b/benchmarks/rose.cpp @@ -0,0 +1,1495 @@ +// Copyright 2007 Google Inc. All Rights Reserved. +// Author: sears@google.com (Rusty Sears) + +#include +#include +#include + +#include + +#include "stasis/page/compression/for-impl.h" +#include "stasis/page/compression/pstar-impl.h" +#include "stasis/page/compression/rle-impl.h" +#include "stasis/page/compression/multicolumn-impl.h" +#include "stasis/page/compression/tuple.h" + +#undef end +#undef begin + +/* + If this is defined, then check to see that the data produced by + decompressing the data actually matches the original dataset. +*/ + +#define CHECK_OUTPUT + +using rose::Pstar; +using rose::Multicolumn; +using rose::Tuple; +using rose::For; +using rose::Rle; +using rose::plugin_id_t; +using rose::column_number_t; +using rose::slot_index_t; + +typedef int32_t val_t; +static const int32_t GB = 1024 * 1024 * 1024; + +static int lsm_sim; // XXX this global variable shouldn't be global! 
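+
+// Background on the two compression plugins exercised below: frame of
+// reference (FOR) stores one base value per column region plus small,
+// non-negative deltas from that base; values whose delta won't fit are
+// escaped into a shared exception region (see for.h / for-impl.h).
+// The helper below is an illustrative sketch of that idea only -- it is
+// not used by the benchmark and is not the rose::For API.
+static size_t forEncodeSketch(const int32_t *in, size_t n,
+                              int32_t *base, int16_t *deltas) {
+  *base = in[0];  // the page-wide reference value
+  size_t out = 0;
+  for (size_t i = 0; i < n; i++) {
+    int64_t d = (int64_t)in[i] - (int64_t)*base;
+    if (d < 0 || d > INT16_MAX) break;  // rose::For escapes these instead
+    deltas[out++] = (int16_t)d;         // 2 bytes stored instead of 4
+  }
+  return out;  // number of values encoded before the first escape
+}
+// Run-length encoding (RLE), the other plugin, stores (value, copy count)
+// pairs, so the sorted runs produced by this benchmark compress well.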
+ + +template +class treeIterator { + public: + explicit treeIterator(recordid tree, ROW &scratch, int keylen) : + tree_(tree), + scratch_(scratch), + keylen_(keylen), + lsmIterator_(lsmTreeIterator_open(-1,tree)), + slot_(0) + { + if(!lsmTreeIterator_next(-1, lsmIterator_)) { + currentPage_ = 0; + pageid_ = -1; + p_ = 0; + } else { + pageid_t * pid_tmp; + lsmTreeIterator_value(-1,lsmIterator_,(byte**)&pid_tmp); + pageid_ = *pid_tmp; + p_ = loadPage(-1,pageid_); + currentPage_ = (PAGELAYOUT*)p_->impl; + } + } + explicit treeIterator(treeIterator& t) : + tree_(t.tree_), + scratch_(t.scratch_), + keylen_(t.keylen_), + lsmIterator_(lsmTreeIterator_copy(-1,t.lsmIterator_)), + slot_(t.slot_), + pageid_(t.pageid_), + p_((Page*)((t.p_)?loadPage(-1,t.p_->id):0)), + currentPage_((PAGELAYOUT*)((p_)?p_->impl:0)) { + } + ~treeIterator() { + + lsmTreeIterator_close(-1, lsmIterator_); + if(p_) { + releasePage(p_); + p_ = 0; + } + } + ROW & operator*() { + ROW* readTuple = currentPage_->recordRead(-1,slot_, &scratch_); + + if(!readTuple) { + releasePage(p_); + p_=0; + if(lsmTreeIterator_next(-1,lsmIterator_)) { + pageid_t *pid_tmp; + + slot_ = 0; + + lsmTreeIterator_value(-1,lsmIterator_,(byte**)&pid_tmp); + pageid_ = *pid_tmp; + p_ = loadPage(-1,pageid_); + + currentPage_ = (PAGELAYOUT*)p_->impl; + + readTuple = currentPage_->recordRead(-1,slot_, &scratch_); + assert(readTuple); + } else { + // past end of iterator! "end" should contain the pageid of the + // last leaf, and 1+ numslots on that page. + abort(); + } + } + return scratch_; + } + inline bool operator==(const treeIterator &a) const { + return (slot_ == a.slot_ && pageid_ == a.pageid_); + } + inline bool operator!=(const treeIterator &a) const { + return !(*this==a); + } + inline void operator++() { + slot_++; + } + inline void operator--() { + // This iterator consumes its input, and only partially supports + // "==". "--" is just for book keeping, so we don't need to worry + // about setting the other state. + slot_--; + } + inline treeIterator* end() { + treeIterator* t = new treeIterator(tree_,scratch_,keylen_); + if(t->p_) { + releasePage(t->p_); + t->p_=0; + } + t->currentPage_ = 0; + + pageid_t pid = TlsmLastPage(-1,tree_); + if(pid != -1) { + t->pageid_= pid; + Page * p = loadPage(-1, t->pageid_); + PAGELAYOUT * lastPage = (PAGELAYOUT*)p->impl; + t->slot_ = 0; + while(lastPage->recordRead(-1,t->slot_,&scratch_)) { t->slot_++; } + releasePage(p); + } else { + // begin == end already; we're done. + } + return t; + } + private: + explicit treeIterator() { abort(); } + void operator=(treeIterator & t) { abort(); } + int operator-(treeIterator & t) { abort(); } + recordid tree_; + ROW & scratch_; + int keylen_; + lladdIterator_t * lsmIterator_; + slot_index_t slot_; + pageid_t pageid_; + Page * p_; + PAGELAYOUT * currentPage_; +}; + +/** + The following initPage() functions initialize the page that is + passed into them, and allocate a new PAGELAYOUT object of the + appropriate type. 
+*/ +template +static Pstar * initPage(Pstar **pstar, + Page *p, TYPE current) { + *pstar = new Pstar(-1, p); + (*pstar)->compressor()->offset(current); + return *pstar; +} +template +static Pstar * initPage(Pstar **pstar, + Page *p, Tuple & current) { + *pstar = new Pstar(-1, p); + (*pstar)->compressor()->offset(current); + return *pstar; +} + +template +static Multicolumn > * initPage(Multicolumn > ** mc, + Page *p, Tuple & t) { + column_number_t column_count = t.column_count(); + plugin_id_t plugin_id = + rose::plugin_id >,COMPRESSOR,TYPE>(); + + plugin_id_t * plugins = new plugin_id_t[column_count]; + for(column_number_t c = 0; c < column_count; c++) { + plugins[c] = plugin_id; + } + + *mc = new Multicolumn >(-1,p,column_count,plugins); + for(column_number_t c = 0; c < column_count; c++) { + ((COMPRESSOR*)(*mc)->compressor(c))->offset(*t.get(c)); + } + + delete [] plugins; + return *mc; +} +template +static Multicolumn > * initPage(Multicolumn > ** mc, + Page *p, TYPE t) { + plugin_id_t plugin_id = + rose::plugin_id >,COMPRESSOR,TYPE>(); + + plugin_id_t * plugins = new plugin_id_t[1]; + plugins[0] = plugin_id; + + *mc = new Multicolumn >(-1,p,1,plugins); + ((COMPRESSOR*)(*mc)->compressor(0))->offset(t); + + delete [] plugins; + return *mc; +} + +#define FIRST_PAGE 1 + +/** + Bypass stasis' allocation mechanisms. Stasis' page allocation + costs should be minimal, but this program doesn't support + durability yet, and using the normal allocator would complicate + things. */ +pageid_t roseFastAlloc(int xid, void * conf) { + static pthread_mutex_t alloc_mut = PTHREAD_MUTEX_INITIALIZER; + int *num_pages = (int*)conf; + pthread_mutex_lock(&alloc_mut); + pageid_t ret = FIRST_PAGE + *num_pages; + (*num_pages)++; + pthread_mutex_unlock(&alloc_mut); + return ret; +} + +/*static pthread_key_t alloc_key; +static pthread_once_t alloc_key_once; +static void alloc_setup(void) { + pthread_once(&alloc_key_once,alloc_key_create); + pthread_setspecific(alloc_key,malloc(sizeof(alloc_struct))); + } */ + +#define INT_CMP 1 +#define TUP_CMP 2 +/** + Comparators for the lsmTrees + */ +template +int intCmp(const void *ap, const void *bp) { + TYPE a = *(TYPE*)ap; + TYPE b = *(TYPE*)bp; + if(ab) { return 1; } + return 0; +} + +template +int tupCmp(const void *ap, const void *bp) { + column_number_t count = *(column_number_t*)ap; + TYPE * a = (TYPE*)(1+(column_number_t*)ap); + TYPE * b = (TYPE*)(1+(column_number_t*)bp); + + for(column_number_t i = 0; i < count; i++) { + if(a[i] < b[i]) { return -1; } + if(a[i] > b[i]) { return 1; } + } + return 0; +} + +/** + cast from a TYPE to a byte *. These were written to make it easier + to write templates that work with different types of iterators. +*/ +inline const byte * toByteArray(int const *const *const i) { + return (const byte*)*i; +} +inline const byte * toByteArray(Tuple::iterator * const t) { + return (**t).toByteArray(); +} + +#define RAND_TUP_CHECK +#define RAND_TUP_YES 0 +#define RAND_TUP_NO 1 +#define RAND_TUP_NEVER -1 +#define RAND_TUP_BROKE -2 +/** + Produce a stream of random tuples. The stream is repeatable, and + supports increment (each value in the stream must be read using + operator*()), and decrement (at least one increment must occur + between consecutive decrements). 
+ */ + +template class randomIterator; + +template +inline const byte * toByteArray(randomIterator * const t); + +template +class randomIterator { + public: + randomIterator(unsigned int seed, unsigned int count, + column_number_t col_count, double bump_prob, int ret_tuple) : + ret_tuple_(ret_tuple), + off_(0), + count_(count), + col_count_(col_count), + bump_thresh_(static_cast(bump_prob*static_cast(RAND_MAX))), + random_state_(seed), + can_deref_(RAND_TUP_YES), + scratch_(col_count_) + { + TYPE val =0; + for(column_number_t c = 0; c < col_count_; c++) { + scratch_.set(c,&val); + } + } + explicit randomIterator(const randomIterator &i) : + ret_tuple_(i.ret_tuple_), + off_(i.off_), + count_(i.count_), + col_count_(i.col_count_), + bump_thresh_(i.bump_thresh_), + random_state_(i.random_state_), + can_deref_(i.can_deref_), + scratch_(col_count_) { + if(lsm_sim) { // XXX hack! + struct timeval s; + gettimeofday(&s, 0); + random_state_ = s.tv_usec; + } + for(column_number_t c = 0; c < col_count_; c++) { + scratch_.set(c,i.scratch_.get(c)); + } + } + + Tuple& operator*() { + if(can_deref_ == RAND_TUP_NO) { return scratch_; } +#ifdef RAND_TUP_CHECK + assert(can_deref_ == RAND_TUP_YES); +#endif + can_deref_ = RAND_TUP_NO; + for(column_number_t i = 0; i < col_count_; i++) { + unsigned int bump = + rand_r(&random_state_) < bump_thresh_; + TYPE val = bump+*scratch_.get(i); + scratch_.set(i,&val); + } + return scratch_; + } + inline bool operator==(const randomIterator &a) const { + return(off_ == a.off_); + } + inline bool operator!=(const randomIterator &a) const { + return(off_ != a.off_); + } + inline void operator++() { + off_++; + if(can_deref_ == RAND_TUP_NO) { + can_deref_ = RAND_TUP_YES; + } else if(can_deref_ == RAND_TUP_NEVER) { + can_deref_ = RAND_TUP_NO; + } else { + assert(can_deref_ == RAND_TUP_BROKE); + } + } + inline void operator+=(int i) { + can_deref_ = RAND_TUP_BROKE; + off_+=i; + } + inline void operator--() { + off_--; +#ifdef RAND_TUP_CHECK + assert(can_deref_ != RAND_TUP_NEVER); +#endif + if(can_deref_ == RAND_TUP_YES) { + can_deref_ = RAND_TUP_NO; + } else if(can_deref_ == RAND_TUP_NO) { + can_deref_ = RAND_TUP_NEVER; + } else { + assert(can_deref_ == RAND_TUP_BROKE); + } + } + inline int operator-(randomIterator&i) { + return off_ - i.off_; + } + inline void operator=(randomIterator const &i) { + off_ = i.off_; + count_ = i.count_; + col_count_ = i.col_count_; + bump_thresh_ = i.bump_thresh_; + random_state_=i.random_state_; + for(column_number_t c = 0; c < col_count_; c++) { + scratch_.set(c,i.scratch_.get(c)); + } + can_deref_ = i.can_deref_; + } + inline void offset(unsigned int off) { + off_ = off; + } + private: + int ret_tuple_; + unsigned int off_; + unsigned int count_; + column_number_t col_count_; + long bump_thresh_; + unsigned int random_state_; + int can_deref_; + Tuple scratch_; + friend const byte* toByteArray(randomIterator * const t); +}; +#undef RAND_TUP_CHECK +/** Produce a byte array from the value stored at t's current + position. If ret_tuple_ is false, it converts the first value + of the tuple into a byte array. Otherwise, it converts the iterator's + current tuple into a byte array. 
+*/ +inline const byte * toByteArray(randomIterator * const t) { + if(t->ret_tuple_ == 0) { + return (const byte*)(**t).get(0); + } else { + return (**t).toByteArray(); + } +} + +template class mergeIterator; + +template +inline const byte * toByteArray(mergeIterator * const t); + +template +class mergeIterator { + private: + static const int A = 0; + static const int B = 1; + static const int NONE = -1; + static const int BOTH = -2; + + inline int calcCurr(int oldcur) { + int cur; + if(oldcur == NONE) { return NONE; } + if(a_ == aend_) { + if(b_ == bend_) { + cur = NONE; + } else { + cur = B; + } + } else { + if(b_ == bend_) { + cur = A; + } else { + if((*a_) < (*b_)) { + cur = A; + } else if((*a_) == (*b_)) { + cur = BOTH; + } else { + cur = B; + } + } + } + return cur; + } + public: + mergeIterator(ITER & a, ITER & b, ITER & aend, ITER & bend) : + off_(0), + a_(a), + b_(b), + aend_(aend), + bend_(bend), + curr_(calcCurr(A)), + before_eof_(0) + {} + explicit mergeIterator(mergeIterator &i) : + off_(i.off_), + a_(i.a_), + b_(i.b_), + aend_(i.aend_), + bend_(i.bend_), + curr_(i.curr_), + before_eof_(i.before_eof_) + { } + + ROW& operator* () { + if(curr_ == A || curr_ == BOTH) { return *a_; } + if(curr_ == B) { return *b_; } + abort(); + } + void seekEnd() { + curr_ = NONE; + } + // XXX Only works if exactly one of the comparators is derived from end. + inline bool operator==(const mergeIterator &o) const { + if(curr_ == NONE && o.curr_ == NONE) { + return 1; + } else if(curr_ != NONE && o.curr_ != NONE) { + return (a_ == o.a_) && (b_ == o.b_); + } + return 0; + } + inline bool operator!=(const mergeIterator &o) const { + return !(*this == o); + } + inline void operator++() { + off_++; + if(curr_ == BOTH) { + ++a_; + ++b_; + } else { + if(curr_ == A) { ++a_; } + if(curr_ == B) { ++b_; } + } + curr_ = calcCurr(curr_); + } + inline void operator--() { + off_--; + if(curr_ == BOTH) { + --a_; + --b_; + } else { + if(curr_ == A) { --a_; } + if(curr_ == B) { --b_; } + } + if(curr_ == NONE) { + before_eof_ = 1; + } else { + before_eof_ = 0; + } + } + inline int operator-(mergeIterator&i) { + return off_ - i.off_; + } + inline void operator=(mergeIterator const &i) { + off_ = i.off_; + a_ = i.a_; + b_ = i.b_; + aend_ = i.aend_; + bend_ = i.bend_; + curr_ = i.curr_; + before_eof_ = i.before_eof; + } + inline unsigned int offset() { return off_; } + private: + unsigned int off_; + ITER a_; + ITER b_; + ITER aend_; + ITER bend_; + int curr_; + int before_eof_; + friend const byte* toByteArray(mergeIterator * const t); +}; +#undef RAND_TUP_CHECK + +/** Produce a byte array from the value stored at t's current + position */ +template +inline const byte * toByteArray(mergeIterator * const t) { + if(t->curr_ == t->A || t->curr_ == t->BOTH) { + return toByteArray(&t->a_); + } else if(t->curr_ == t->B) { + return toByteArray(&t->b_); + } + abort(); +} + +/** + Create pages that are managed by Pstar, and + use them to store a compressed representation of the data set. + + @param dataset A pointer to the data that should be compressed. + @param inserts The number of elements in dataset. + + @return the number of pages that were needed to store the + compressed data. 
+*/ +template +pageid_t compressData(ITER * const begin, ITER * const end, + int buildTree, recordid tree, pageid_t (*pageAlloc)(int,void*), + void *pageAllocState, uint64_t * inserted) { + + *inserted = 0; + + if(*begin == *end) { + return 0; + } + + pageid_t next_page = pageAlloc(-1,pageAllocState); + Page *p = loadPage(-1, next_page); + pageid_t pageCount = 0; + + + if(*begin != *end && buildTree) { + TlsmAppendPage(-1,tree,toByteArray(begin),p->id); + } + pageCount++; + + PAGELAYOUT * mc; + + initPage(&mc, p, **begin); + + int lastEmpty = 0; + + for(ITER i(*begin); i != *end; ++i) { + rose::slot_index_t ret = mc->append(-1, *i); + + (*inserted)++; + + if(ret == rose::NOSPACE) { + p->dirty = 1; + mc->pack(); + releasePage(p); + + --(*end); + if(i != *end) { + next_page = pageAlloc(-1,pageAllocState); + p = loadPage(-1, next_page); + + mc = initPage(&mc, p, *i); + if(buildTree) { + TlsmAppendPage(-1,tree,toByteArray(&i),p->id); + } + pageCount++; + lastEmpty = 0; + } else { + lastEmpty = 1; + } + ++(*end); + --i; + } + } + + p->dirty = 1; + mc->pack(); + releasePage(p); + return pageCount; +} + +template +inline const byte* toByteArray(treeIterator *const t) { + return (const byte*)&(**t); +} +template +inline const byte* toByteArray(treeIterator,PAGELAYOUT> *const t) { + return (**t).toByteArray(); +} + +/** + Read compressed data from pages starting at FIRST_PAGE. + + @param num_pages the number of compressed pages to be read. + @param dataset a pointer to the uncompresed representation of the data + that was inserted. This function uses this as a sanity check to + make sure that the value read from the pages matches the original + data. + @return The nubmer of values read from the compressed pages. +*/ +template +int readData(pageid_t firstPage, unsigned int num_pages, + ITER &iter1, ROW* scratch) { + + // Copy the iterator (use the original to compute the number + // of rows read below). + ITER iter(iter1); + + for(unsigned int j = 0; j < num_pages; j++) { + Page * p = loadPage(-1, firstPage + j); + PAGELAYOUT * mc = (PAGELAYOUT*)(p->impl); + + int slot = 0; + + for(ROW* i = mc->recordRead(-1, slot, scratch); + i; i = mc->recordRead(-1, slot, scratch)) { +#ifdef CHECK_OUTPUT + assert(*i == *iter); +#endif + ++(iter); + slot++; + } + releasePage(p); + } + unsigned int count = iter-iter1; + iter1 = iter; + return count; +} +/** + Like readData, but uses an lsm tree to locate the pages. 
+*/ +template +int readDataFromTree(recordid tree, ITER &iter, ROW *scratch, int keylen) { + + unsigned int count = 0; + lladdIterator_t * it = lsmTreeIterator_open(-1,tree); + + while(lsmTreeIterator_next(-1, it)) { + byte * firstPage; + int valsize = lsmTreeIterator_value(-1,it,&firstPage); +#ifdef CHECK_OUTPUT + assert(valsize == sizeof(pageid_t)); + byte * scratchBuf; + int keysize = lsmTreeIterator_key(-1,it,&scratchBuf); + assert(keysize == keylen); + + const byte * iterBuf = toByteArray(&iter); + assert(!memcmp(iterBuf,scratchBuf,keylen)); +#else + (void)valsize; +#endif + count += + readData(*(pageid_t*)firstPage,1,iter,scratch); + } + lsmTreeIterator_close(-1,it); + return count; +} + +double tv_to_double(struct timeval tv) { + return static_cast(tv.tv_sec) + + (static_cast(tv.tv_usec) / 1000000.0); +} + +template +struct insert_args { + int comparator_idx; + int rowsize; + ITER *begin; + ITER *end; + pageid_t(*pageAlloc)(int,void*); + void *pageAllocState; + pthread_mutex_t * block_ready_mut; + pthread_cond_t * block_needed_cond; + pthread_cond_t * block_ready_cond; + int max_waiters; + int wait_count; + recordid * wait_queue; + ROW *scratchA; + ROW *scratchB; + pageid_t mergedPages; +}; + +template +void* insertThread(void* arg) { + insert_args* a = + (insert_args*)arg; + + struct timeval start_tv, start_wait_tv, stop_tv; + + int insert_count = 0; + + pageid_t lastTreeBlocks = 0; + uint64_t lastTreeInserts = 0; + pageid_t desiredInserts = 0; + + // this is a hand-tuned value; it should be set dynamically, not staticly + double K = 0.18; + + // loop around here to produce multiple batches for merge. + while(1) { + gettimeofday(&start_tv,0); + ITER i(*(a->begin)); + ITER j(desiredInserts ? *(a->begin) : *(a->end)); + if(desiredInserts) { + j += desiredInserts; + } + recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize); + lastTreeBlocks = compressData + (&i, &j,1,tree,a->pageAlloc,a->pageAllocState, &lastTreeInserts); + + gettimeofday(&start_wait_tv,0); + pthread_mutex_lock(a->block_ready_mut); + while(a->wait_count >= a->max_waiters) { + pthread_cond_wait(a->block_needed_cond,a->block_ready_mut); + } + + memcpy(&a->wait_queue[a->wait_count],&tree,sizeof(recordid)); + a->wait_count++; + + pthread_cond_signal(a->block_ready_cond); + gettimeofday(&stop_tv,0); + double work_elapsed = tv_to_double(start_wait_tv) - tv_to_double(start_tv); + double wait_elapsed = tv_to_double(stop_tv) - tv_to_double(start_wait_tv); + double elapsed = tv_to_double(stop_tv) - tv_to_double(start_tv); + printf("insert# %-6d waited %6.1f sec " + "worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n", + ++insert_count, + wait_elapsed, + work_elapsed, + (long int)lastTreeInserts, + (lastTreeInserts*(uint64_t)a->rowsize / (1024.0*1024.0)) / elapsed); + + if(a->mergedPages != -1) { + desiredInserts = (pageid_t)(((double)a->mergedPages / K) + * ((double)lastTreeInserts + / (double)lastTreeBlocks)); + } + pthread_mutex_unlock(a->block_ready_mut); + + } + return 0; +} + +template +void* mergeThread(void* arg) { + insert_args* a = + (insert_args*)arg; + + struct timeval start_tv, wait_tv, stop_tv; + + int merge_count = 0; + // loop around here to produce multiple batches for merge. 
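+  // The handshake implemented below: insertThread appends new runs to
+  // wait_queue[] and signals block_ready_cond; this thread sleeps until at
+  // least two runs are queued, merges them into one new tree, replaces
+  // them in the queue with the merged result, and signals block_needed_cond
+  // so the inserter may continue.  a->mergedPages feeds back into the
+  // inserter's run-size estimate (the hand-tuned K factor above).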
+ while(1) { + + gettimeofday(&start_tv,0); + + pthread_mutex_lock(a->block_ready_mut); + while(a->wait_count <2) { + pthread_cond_wait(a->block_ready_cond,a->block_ready_mut); + } + + gettimeofday(&wait_tv,0); + + recordid * oldTreeA = &a->wait_queue[0]; + recordid * oldTreeB = &a->wait_queue[1]; + + pthread_mutex_unlock(a->block_ready_mut); + + recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize); + + treeIterator taBegin(*oldTreeA,*(a->scratchA),a->rowsize); + treeIterator tbBegin(*oldTreeB,*(a->scratchB),a->rowsize); + + treeIterator *taEnd = taBegin.end(); + treeIterator *tbEnd = tbBegin.end(); + + mergeIterator,ROW> + mBegin(taBegin, tbBegin, *taEnd, *tbEnd); + + mergeIterator,ROW> + mEnd(taBegin, tbBegin, *taEnd, *tbEnd); + + + mEnd.seekEnd(); + uint64_t insertedTuples; + pageid_t mergedPages = compressData,ROW>,ROW> + (&mBegin, &mEnd,1,tree,a->pageAlloc,a->pageAllocState,&insertedTuples); + + delete taEnd; + delete tbEnd; + + gettimeofday(&stop_tv,0); + + pthread_mutex_lock(a->block_ready_mut); + + a->mergedPages = mergedPages; + + // TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation! + // TlsmFree(wait_queue[1]) + + memcpy(&a->wait_queue[0],&tree,sizeof(tree)); + for(int i = 1; i + 1 < a->wait_count; i++) { + memcpy(&a->wait_queue[i],&a->wait_queue[i+1],sizeof(tree)); + } + a->wait_count--; + pthread_mutex_unlock(a->block_ready_mut); + + merge_count++; + + double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv); + double work_elapsed = tv_to_double(stop_tv) - tv_to_double(wait_tv); + double total_elapsed = wait_elapsed + work_elapsed; + double ratio = ((double)(insertedTuples * (uint64_t)a->rowsize)) + / (double)(PAGE_SIZE * mergedPages); + double throughput = ((double)(insertedTuples * (uint64_t)a->rowsize)) + / (1024.0 * 1024.0 * total_elapsed); + + printf("merge # %-6d: comp ratio: %-9.3f waited %6.1f sec " + "worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n", merge_count, ratio, + wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput); + + pthread_cond_signal(a->block_needed_cond); + } + return 0; +} + + + +/** + Test driver for lsm tree. This function dispatches to the correct + invocations of compressData() and readDataFromTree(). +*/ +template +void run_test(unsigned int inserts, column_number_t column_count, + int buildTree, ITER& begin, ITER& end, int comparator_idx, + int rowsize, ROW &scratch) { + + // Init storage -------------------------------------------- + + struct timeval start_tv, stop_tv; + double elapsed, start, stop, decompressed_size; + + unlink("storefile.txt"); + unlink("logfile.txt"); + + // sync to minimize the measured performance impact of the file + // deletions. + sync(); + + stasis_page_impl_register(Pstar, val_t>::impl()); + stasis_page_impl_register(Pstar, val_t>::impl()); + stasis_page_impl_register(Multicolumn >::impl()); + + bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; + + lsmTreeRegisterComparator(INT_CMP, intCmp); + lsmTreeRegisterComparator(TUP_CMP, tupCmp); + + int num_pages = 0; + + TlsmSetPageAllocator(roseFastAlloc, &num_pages); + + Tinit(); + + recordid tree = NULLRID; + + pthread_mutex_t block_ready_mut = PTHREAD_MUTEX_INITIALIZER; + pthread_cond_t block_needed_cond = PTHREAD_COND_INITIALIZER; + pthread_cond_t block_ready_cond = PTHREAD_COND_INITIALIZER; + int max_waiters = 3; // merged, old, new. 
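+  // In replication-simulation mode (-r), run_test doesn't compress the
+  // data itself.  It hands the iterators to an insertThread / mergeThread
+  // pair that share the wait queue initialized here; max_waiters bounds
+  // that queue at three trees: the merged output plus the two input runs.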
+ if(lsm_sim) { + struct insert_args args = { + comparator_idx, + rowsize, + &begin, + &end, + roseFastAlloc, + &num_pages, + &block_ready_mut, + &block_needed_cond, + &block_ready_cond, + max_waiters, // max waiters + 0, // wait count + new recordid[max_waiters], // wait queue + new ROW(scratch), + new ROW(scratch), + -1 // merged pages + }; + + pthread_t inserter; + pthread_create(&inserter, 0, + (void*(*)(void*))insertThread, + &args); + pthread_t merger; + pthread_create(&merger, 0, + (void*(*)(void*))mergeThread, + &args); + + pthread_join(inserter,0); + pthread_join(merger,0); + return; + } + + // Compress data ------------------------------------------- + + gettimeofday(&start_tv, 0); + + if(buildTree) { + tree = TlsmCreate(-1, comparator_idx,rowsize); + } + + uint64_t insertedByCompress; + compressData + (&begin, &end,buildTree,tree,roseFastAlloc,(void*)&num_pages, + &insertedByCompress); + + gettimeofday(&stop_tv, 0); + + start = tv_to_double(start_tv); + stop = tv_to_double(stop_tv); + elapsed = stop - start; + decompressed_size = + static_cast(inserts * sizeof(val_t) * column_count); + + printf("%8d %9.2fx", num_pages, + decompressed_size/static_cast(num_pages*PAGE_SIZE)); + printf(" %10.3f", decompressed_size / (GB*elapsed)); + fflush(stdout); + + // the two commented out bodies of this if test merge iterators + // and tree iterators + + // if(buildTree) { + // gettimeofday(&start_tv, 0); + + // int oldNumPages = num_pages; + + // ROW scratch_row(column_count); // XXX use scratch_row? + + /* + treeIterator treeBegin(tree,scratch,rowsize); + treeIterator * treeEnd = treeBegin.end(); + tree = TlsmCreate(-1, comparator_idx,rowsize); + compressData,ROW> + (&treeBegin, treeEnd,buildTree,tree,&num_pages); + delete treeEnd; + */ + + + /* treeIterator treeBegin(tree,scratch,rowsize); + treeIterator treeBegin2(tree,scratch,rowsize); + treeIterator * treeEnd = treeBegin.end(); + treeIterator * treeEnd2 = treeBegin2.end(); + treeIterator * treeEnd3 = treeBegin2.end(); + + mergeIterator,ROW> mbegin(treeBegin,*treeEnd3,*treeEnd,*treeEnd2); + //mergeIterator,ROW> mbegin(treeBegin,treeBegin,*treeEnd,*treeEnd2); + mergeIterator,ROW> mend(treeBegin,*treeEnd3,*treeEnd,*treeEnd2); + mend.seekEnd(); + + assert(inserts == 0 || mbegin != mend); + + tree = TlsmCreate(-1, comparator_idx,rowsize); + + + compressData< + PAGELAYOUT, + ENGINE, + TYPE, + mergeIterator, ROW>, + ROW> + (&mbegin,&mend,buildTree,tree,&num_pages); + + + delete treeEnd; + delete treeEnd2; + delete treeEnd3; + + + assert(num_pages - oldNumPages == oldNumPages); + + gettimeofday(&stop_tv, 0); + + start = tv_to_double(start_tv); + stop = tv_to_double(stop_tv); + elapsed = stop - start; + decompressed_size = + static_cast(inserts * sizeof(val_t) * column_count); + + + // printf("%8d %9.2fx", num_pages - oldNumPages, + // decompressed_size/static_cast((num_pages-oldNumPages)*PAGE_SIZE)); + printf(" %10.3f", decompressed_size / (GB*elapsed)); + fflush(stdout); + */ + // } + // Read data ------------------------------------------- + + gettimeofday(&start_tv, 0); + + unsigned int count; + + if(!buildTree) { + ITER i(begin); + count = readData + (FIRST_PAGE, num_pages, i, &scratch); + } else { + ITER i(begin); + count = readDataFromTree + (tree, i, &scratch, rowsize); + } + + gettimeofday(&stop_tv, 0); + + assert(count == inserts); + + start = tv_to_double(start_tv); + stop = tv_to_double(stop_tv); + elapsed = stop - start; + decompressed_size = + static_cast(inserts * sizeof(val_t) * column_count); + printf(" %11.3f", decompressed_size 
/ (GB*elapsed)); + fflush(stdout); + Tdeinit(); + +} + +/** + An extra dispatch function. This function and run_test perform + nested template instantiations. Breaking it into two functions + keeps the code size from exploding. +*/ +template +void run_test2(int engine, int multicolumn, unsigned int inserts, + ITER &begin, ITER &end, + column_number_t column_count, int buildTree) { + + if(multicolumn) { + int rowsize = Tuple::sizeofBytes(column_count); + Tuple scratch(column_count); + + switch(engine) { + case Rle::PLUGIN_ID: { + run_test >,Rle,ITER, + Tuple,val_t> + (inserts, column_count, buildTree,begin,end,TUP_CMP,rowsize,scratch); + } break; + case For::PLUGIN_ID: { + run_test >,For,ITER, + Tuple,val_t> + (inserts, column_count, buildTree,begin,end,TUP_CMP,rowsize,scratch); + } break; + default: abort(); + } + } else { + int rowsize = sizeof(val_t); + val_t scratch; + column_count = 1; + switch(engine) { + case Rle::PLUGIN_ID: { + run_test,val_t>,Rle,typeof(begin), + val_t,val_t> + (inserts, column_count,buildTree,begin,end,INT_CMP,rowsize,scratch); + } break; + case For::PLUGIN_ID: { + run_test,val_t>,For,typeof(begin), + val_t,val_t> + (inserts, column_count,buildTree,begin,end,INT_CMP,rowsize,scratch); + } break; + default: abort(); + } + } +} + +const char * usage = +"Usage:\n" +"Mode 1: Generate synthetic data\n" +"\n\t%s [-e engine] [-i insert_count] [-s random_seed] [-p p(bump)]\n" +"\t\t[-n col_count] [-m] [-t] [-l]\n\n" +"Mode 2: Read data from csv file\n" +"\n\t%s -e engine -f filename -c col_num1 -c col_num2 ... [-m] [-t] [-l]\n\n" +"Mode 3: Simulate replicatio- by running a continuous insert / merge job\n" +"\n\t%s -r [synthetic data options]\n\n" +"- engine is 1 for run length encoding, 2 for frame of reference.\n" +" If engine is not specified runs both engines with and without multicolumn\n" +" support.\n" +"- column_number starts at zero\n" +"- p(bump) is the probability of incrementing each generated value.\n" +"- -f reads from a CSV file, repeated -c arguments pick columns\n" +"- -n col_count provides a data set with the given number of columns\n" +"- -m enables multicolumn page format\n" +"- -t builds an lsmTree from the compressed pages\n" +"- -l pipelines the data instead of buffering the whole dataset in RAM\n" +"\n"; +int main(int argc, char **argv) { + // Parse arguments ----------------------------------------- + + unsigned int inserts = 1000000; + double bump_prob = 0.001; + + unsigned int seed = 0; + int engine = -1; + + int file_mode = 0; + int column_count = 0; + int requested_column_count = 0; + int * column = new int[column_count+1]; + int buildTree = 0; + int pipelined = 0; + char * file = 0; + + int multicolumn = 0; + + for (int i = 1; i < argc; i++) { + if (!strcmp(argv[i], "-e")) { + i++; + assert(i < argc); + engine = atoi(argv[i])-1; + } else if (!strcmp(argv[i], "-i")) { + i++; + assert(i < argc); + inserts = atoi(argv[i]); + } else if (!strcmp(argv[i], "-s")) { + i++; + assert(i < argc); + seed = atoi(argv[i]); + } else if (!strcmp(argv[i], "-p")) { + i++; + assert(i < argc); + bump_prob = atof(argv[i]); + } else if (!strcmp(argv[i], "-f")) { + i++; + assert(i < argc); + file_mode = 1; + file = argv[i]; + } else if (!strcmp(argv[i], "-n")) { + i++; + assert(i < argc); + requested_column_count = atoi(argv[i]); + assert(requested_column_count == + (column_number_t)requested_column_count); + } else if (!strcmp(argv[i], "-c")) { + i++; + assert(i < argc); + column[column_count] = atoi(argv[i]); + column_count++; + column = 
reinterpret_cast(realloc(column, + (column_count+1) * sizeof(int))); + } else if (!strcmp(argv[i], "-m")) { + multicolumn = 1; + } else if (!strcmp(argv[i], "-t")) { + buildTree = 1; + } else if (!strcmp(argv[i], "-l")) { + pipelined = 1; + } else if (!strcmp(argv[i], "-r")) { + lsm_sim = 1; + pipelined = 1; // XXX otherwise, we'd core dump later... + } else { + printf("Unrecognized argument: %s\n", argv[i]); + printf(usage, argv[0], argv[0], argv[0]); + return 1; + } + } + + if(lsm_sim && file_mode) { + printf("Sorry, lsm simulation doesn't work with file input.\n"); + printf(usage, argv[0], argv[0], argv[0]); + return 1; + } + + char * engine_name; + + switch (engine) { + case Rle::PLUGIN_ID: { + engine_name = "RLE"; + } break; + case For::PLUGIN_ID: { + engine_name = "PFOR"; + } break; + case -1: { + engine_name = "Time trial (multiple engines)"; + } break; + default: { + printf("Specify a valid compression scheme\n"); + printf(usage, argv[0], argv[0], argv[0]); + return 1; + } + } + printf("Compression scheme: %s\n", engine_name); + printf("Page size: %d\n", PAGE_SIZE); + + srandom(seed); + + // These are used throughout the rest of this file. + + struct timeval start_tv, stop_tv; + double elapsed, start, stop, decompressed_size; + + // Generate data ------------------------------------------- + + val_t current = 0; + + // dataset is managed by malloc so that it can be realloc()'ed + val_t **dataset; + + if(requested_column_count && file_mode) { + printf("-n and -f are incompatible\n"); + printf(usage,argv[0],argv[0],argv[0]); + return 1; + } + if(!file_mode) { + if(!requested_column_count) { + requested_column_count = 1; + } + column_count = requested_column_count; + } + + printf("P(bump): %f\n", bump_prob); + printf("Random seed: %d\n", seed); + printf("Column count: %d\n", column_count); + + + gettimeofday(&start_tv, 0); + + if ((!file_mode)) { + if(!pipelined) { + + dataset = new val_t*[column_count]; + + for(int col = 0; col < column_count; col++) { + current = 0; + dataset[col] + = reinterpret_cast(malloc(sizeof(val_t) * inserts)); + for (unsigned int i = 0; i < inserts; i++) { + if (bump_prob == 1) { + current++; + } else { + while (static_cast(random()) + / static_cast(RAND_MAX) < bump_prob) { + current++; + } + } + dataset[col][i] = current; + } + } + } else { + dataset = 0; + } + } else { + + dataset = new val_t*[column_count]; + int max_col_number = 0; + for(int col = 0; col < column_count; col++) { + max_col_number = max_col_number < column[col] + ? column[col] : max_col_number; + + dataset[col] = reinterpret_cast(malloc(sizeof(val_t))); + } + max_col_number++; + char **toks = reinterpret_cast + (malloc(sizeof(char *) * max_col_number)); + + printf("Reading from file %s ", file); + + inserts = 0; + + size_t line_len = 100; + // getline wants malloced memory (it probably calls realloc...) 
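+    // getline() grows the buffer with realloc() and updates line_len to
+    // the new capacity, so the initial allocation is only a starting size.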
+ char * line = reinterpret_cast(malloc(sizeof(char) * line_len)); + + FILE * input = fopen(file, "r"); + if (!input) { + perror("Couldn't open input"); + return 1; + } + ssize_t read_len; + + while (-1 != (read_len = getline(&line, &line_len, input))) { + int line_tok_count; + { + char *saveptr; + int i; + toks[0] = strtok_r(line, ",\n", &saveptr); + for (i = 1; i < max_col_number; i++) { + toks[i] = strtok_r(0, ",\n", &saveptr); + if (!toks[i]) { + break; + } + // printf("found token: %s\n",toks[i]); + } + line_tok_count = i; + } + if (line_tok_count < max_col_number) { + if (-1 == getline(&line, &line_len, input)) { + // done parsing file + } else { + printf("Not enough tokens on line %d (found: %d expected: %d)\n", + inserts+1, line_tok_count, max_col_number); + return 1; + } + } else { + inserts++; + for(int col = 0; col < column_count; col++) { + dataset[col] = reinterpret_cast( + realloc(dataset[col], sizeof(val_t) * (inserts + 1))); + errno = 0; + char * endptr; + dataset[col][inserts] + = (val_t)strtoll(toks[column[col]], &endptr, 0); + if (strlen(toks[column[col]]) - + (size_t)(endptr-toks[column[col]]) > 1) { + printf("Couldn't parse token #%d: %s\n", col, toks[column[col]]); + return 1; + } + if (errno) { + printf("Couldn't parse token #%d: %s", col,toks[column[col]]); + perror("strtoll error is"); + return 1; + } + // printf("token: %d\n", dataset[inserts]); + } + } + } + fclose(input); + + gettimeofday(&stop_tv, 0); + printf("%10d tuples ", inserts); + + start = tv_to_double(start_tv); + stop = tv_to_double(stop_tv); + + elapsed = stop - start; + decompressed_size = static_cast(inserts * sizeof(val_t)); + + printf ("at %6.3f gb/s\n", + column_count*decompressed_size / (GB * elapsed)); + } + + if(column_count > 1 && (!multicolumn || engine == -1)) { + printf("\nWARNING: Pstar will only use the first column.\n"); + } + + if(!pipelined) { + + Tuple::iterator begin(column_count, dataset,0); + Tuple::iterator end(column_count, dataset,inserts); + val_t * ptr_begin = dataset[0]; + val_t * ptr_end = dataset[0] + inserts; + + if(engine != -1) { + printf("\n #pages ratio comp gb/s decom gb/s\n"); + if(multicolumn) { + run_test2::iterator> + (engine, multicolumn, inserts, begin, end, + column_count, buildTree); + } else { + run_test2(engine,multicolumn,inserts,ptr_begin,ptr_end, + column_count, buildTree); + } + } else { + // if(buildTree) { + // printf("\nCompression scheme #pages ratio comp gb/s " + // "recom gb/s decom gb/s"); + // } else { + printf("\nCompression scheme #pages ratio comp gb/s decom gb/s"); + // } + printf("\nPstar (For) "); + run_test2 + (For::PLUGIN_ID,0,inserts,ptr_begin,ptr_end, + column_count,buildTree); + printf("\nMulticolumn (For) "); + run_test2::iterator> + (For::PLUGIN_ID,1,inserts,begin,end, + column_count,buildTree); + printf("\nPstar (Rle) "); + run_test2 + (Rle::PLUGIN_ID,0,inserts,ptr_begin,ptr_end, + column_count,buildTree); + printf("\nMulticolumn (Rle) "); + run_test2::iterator> + (Rle::PLUGIN_ID,1,inserts,begin,end, + column_count,buildTree); + } + printf("\n"); + + for(int col = 0; col < column_count; col++) { + free(dataset[col]); + } + delete [] dataset; + + } else { + + assert(!file_mode); + + randomIterator begin(seed, inserts, column_count, bump_prob, 1); + randomIterator end(seed,inserts,column_count,bump_prob, 1); + end.offset(inserts); + // These three iterators are for pstar. They hide the fact that they're + // backed by tuples. 
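+    // Passing ret_tuple=0 makes toByteArray() hand back a pointer to the
+    // tuple's first column instead of the packed tuple, which matches what
+    // the single-column Pstar comparator (intCmp) expects.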
+ randomIterator pstrbegin(seed, inserts, column_count, bump_prob, 0); + randomIterator pstrend(seed,inserts, column_count, bump_prob, 0); + pstrend.offset(inserts); + + if(engine != -1) { + printf("\n #pages ratio comp gb/s decom gb/s\n"); + if(multicolumn) { + run_test2 > + (engine, multicolumn, inserts, begin, end, + column_count, buildTree); + } else { + run_test2 >(engine,multicolumn,inserts,pstrbegin,pstrend, + column_count, buildTree); + } + } else { + // if(buildTree) { + // printf("\nCompression scheme #pages ratio comp gb/s " + // "recom gb/s decom gb/s"); + // } else { + printf("\nCompression scheme #pages ratio comp gb/s decom gb/s"); + // } + printf("\nPstar (For) "); + run_test2 > + (For::PLUGIN_ID,0,inserts,pstrbegin,pstrend, + column_count,buildTree); + printf("\nMulticolumn (For) "); + run_test2 > + (For::PLUGIN_ID,1,inserts,begin,end, + column_count,buildTree); + printf("\nPstar (Rle) "); + run_test2 > + (Rle::PLUGIN_ID,0,inserts,pstrbegin,pstrend, + column_count,buildTree); + printf("\nMulticolumn (Rle) "); + run_test2 > + (Rle::PLUGIN_ID,1,inserts,begin,end, + column_count,buildTree); + } + printf("\n"); + } + + delete [] column; + + return 0; +} diff --git a/benchmarks/rose.sh b/benchmarks/rose.sh new file mode 100755 index 0000000..81e9fce --- /dev/null +++ b/benchmarks/rose.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +ROSE=./rose + + $ROSE -i 10000000 -t && $ROSE -i 10000000 && $ROSE -i 100000 -t && $ROSE -i 100000 && $ROSE -i 1000 -t && $ROSE -i 1000 && $ROSE -i 10 -t && $ROSE -i 10 && $ROSE -i 1 -t && $ROSE -i 1 && $ROSE -i 0 -t && $ROSE -i 0 && $ROSE -l -i 10000000 -t && $ROSE -l -i 10000000 && $ROSE -l -i 100000 -t && $ROSE -l -i 100000 && $ROSE -l -i 1000 -t && $ROSE -l -i 1000 && $ROSE -l -i 10 -t && $ROSE -l -i 10 && $ROSE -l -i 1 -t && $ROSE -l -i 1 && $ROSE -l -i 0 -t && $ROSE -l -i 0 && $ROSE -n 5 -i 10000000 -t && $ROSE -n 5 -i 10000000 && $ROSE -n 5 -i 100000 -t && $ROSE -n 5 -i 100000 && $ROSE -n 5 -i 1000 -t && $ROSE -n 5 -i 1000 && $ROSE -n 5 -i 10 -t && $ROSE -n 5 -i 10 && $ROSE -n 5 -i 1 -t && $ROSE -n 5 -i 1 && $ROSE -n 5 -i 0 -t && $ROSE -n 5 -i 0 && $ROSE -l -n 5 -i 10000000 -t && $ROSE -l -n 5 -i 10000000 && $ROSE -l -n 5 -i 100000 -t && $ROSE -l -n 5 -i 100000 && $ROSE -l -n 5 -i 1000 -t && $ROSE -l -n 5 -i 1000 && $ROSE -l -n 5 -i 10 -t && $ROSE -l -n 5 -i 10 && $ROSE -l -n 5 -i 1 -t && $ROSE -l -n 5 -i 1 && $ROSE -l -n 5 -i 0 -t && $ROSE -l -n 5 -i 0 diff --git a/src/stasis/page/compression/compression.h b/src/stasis/page/compression/compression.h new file mode 100644 index 0000000..47767f9 --- /dev/null +++ b/src/stasis/page/compression/compression.h @@ -0,0 +1,77 @@ +#include + +#ifndef _ROSE_COMPRESSION_COMPRESSION_H__ +#define _ROSE_COMPRESSION_COMPRESSION_H__ + +namespace rose { + +typedef int8_t record_size_t; +typedef uint16_t byte_off_t; +typedef uint32_t slot_index_t; +typedef uint8_t plugin_id_t; +typedef uint8_t column_number_t; +typedef uint16_t column_offset_t; + +static const record_size_t VARIABLE_SIZE = CHAR_MAX; +static const slot_index_t NOSPACE = UINT_MAX; +static const slot_index_t EXCEPTIONAL = UINT_MAX-1; +static const slot_index_t MAX_INDEX = UINT_MAX-2; + +/** + This function computes a page type (an integer stored in the page header) + so that Stasis can dispatch calls to the appropriate page implemenation. + + Most page types choose a single constant, but pstar's page layout varies + across different template instantiations. In particular, the page layout + depends on sizeof(TYPE), and upon COMPRESOR. 
Finally, pstar and the
+   compressors behave differently depending on whether or not TYPE is
+   signed or unsigned.  (Non-integer types are currently not supported.)
+
+   Right now, everything happens to be a power of two, and the page
+   type is of this form:
+
+   BASE_PAGEID +
+   00PCSII(base2)
+
+   P  stores the page format PAGE_FORMAT_ID
+   C  stores the compressor PLUGIN_ID.
+   S  is 1 iff the type is signed
+   II is 00, 01, 10, or 11, depending on sizeof(TYPE)
+
+   Although the on-disk representation is bigger, Stasis tries to keep
+   page types within the range 0 - 255.
+
+*/
+template <class PAGEFORMAT, class COMPRESSOR, class TYPE>
+plugin_id_t plugin_id() {
+  /* type_idx maps from sizeof(TYPE) to a portion of a page type:
+
+     (u)int8_t  -> 0
+     (u)int16_t -> 1
+     (u)int32_t -> 2
+     (u)int64_t -> 3
+
+  */
+
+  // Number of bytes in type --->     1  2      4              8
+  static const int type_idx[] = { -1, 0, 1, -1, 2, -1, -1, -1, 3 };
+  static const int idx_count = 4;
+  static const TYPE is_signed = 0 - 1;
+
+  // assert(sizeof(TYPE) <= 8 && type_idx[sizeof(TYPE)] >= 0);
+
+  // XXX first '2' hardcodes the number of COMPRESSOR implementations...
+
+  plugin_id_t ret = USER_DEFINED_PAGE(0)
+      //                       II  S   C
+      + idx_count * 2 * 2 * PAGEFORMAT::PAGE_FORMAT_ID
+      + idx_count * 2     * COMPRESSOR::PLUGIN_ID
+      + idx_count         * (is_signed < 0)
+      + type_idx[sizeof(TYPE)];
+
+  return ret;
+}
+
+}
+
+
+#endif // _ROSE_COMPRESSION_COMPRESSION_H__
diff --git a/src/stasis/page/compression/for-impl.h b/src/stasis/page/compression/for-impl.h
new file mode 100644
index 0000000..87e9dbf
--- /dev/null
+++ b/src/stasis/page/compression/for-impl.h
@@ -0,0 +1,87 @@
+#ifndef _ROSE_COMPRESSION_FOR_IMPL_H__
+#define _ROSE_COMPRESSION_FOR_IMPL_H__
+
+// Copyright 2007 Google Inc. All Rights Reserved.
+// Author: sears@google.com (Rusty Sears)
+
+#include <assert.h>
+
+#include "for.h"
+
+namespace rose {
+template <class TYPE>
+inline void
+For<TYPE>::offset(TYPE o) {
+  assert(*numdeltas_ptr() == 0);
+  *base_ptr() = o;
+}
+/**
+   Store a new value as a delta from the page's base offset, then update
+   numdeltas_ptr so that we remember that we stored the value.
+*/
+template <class TYPE>
+inline slot_index_t
+For<TYPE>::append(int xid, const TYPE dat,
+                  byte_off_t* except, byte* exceptions, //char *exceptional,
+                  int *free_bytes) {
+  // Can dat be represented as a delta from the page's base value?
+  // XXX this can overflow if dat and / or offset are 64 bit...
+  int64_t delta = (int64_t)dat - (int64_t)offset();
+
+  if(delta > DELTA_MAX || delta < 0) {
+
+    // Store dat's location as a delta
+    *next_delta_ptr() = *except - PAGE_SIZE;
+
+    // Store dat as an exception
+    *(((TYPE*)(&exceptions[*except]))-1) = dat;
+
+    // Allocate the delta and the exception (if possible)
+    *free_bytes -= sizeof(TYPE) + sizeof(delta_t);
+    int incr = *free_bytes >= 0;
+    *numdeltas_ptr() += incr;
+    *except -= incr * sizeof(TYPE);
+
+    /* This does the same thing as the last few lines, but with a branch.
+       It's marginally slower:
+
+       *next_delta_ptr() = *except - PAGE_SIZE;
+       *free_bytes -= sizeof(TYPE) + sizeof(delta_t);
+       if(*free_bytes >= 0) {
+         (*numdeltas_ptr())++;
+         *except -= sizeof(TYPE);
+         *(TYPE*)(&exceptions[*except]) = dat;
+       } */
+
+  } else {
+    // Store the delta
+    *next_delta_ptr() = (delta_t) delta;
+
+    // Allocate space for it, if possible
+    *free_bytes -= sizeof(delta_t);
+    *numdeltas_ptr() += *free_bytes >= 0;
+  }
+
+  return *numdeltas_ptr() - 1;
+
+}
+
+template <class TYPE>
+inline TYPE *
+For<TYPE>::recordRead(int xid, slot_index_t slot, byte *exceptions,
+                      TYPE * scratch) {
+  if (slot >= *numdeltas_ptr()) {
+    return 0;
+  }
+  delta_t d = *nth_delta_ptr(slot);
+  if (d >= 0) {
+    *scratch = d + *base_ptr();
+    return scratch;
+  } else {
+    *scratch = *(TYPE*)(exceptions + d + PAGE_SIZE - sizeof(TYPE));
+    return scratch;
+  }
+}
+
+} // namespace rose
+#endif // _ROSE_COMPRESSION_FOR_IMPL_H__
diff --git a/src/stasis/page/compression/for.h b/src/stasis/page/compression/for.h
new file mode 100644
index 0000000..d1643a8
--- /dev/null
+++ b/src/stasis/page/compression/for.h
@@ -0,0 +1,158 @@
+#ifndef _ROSE_COMPRESSION_FOR_H__
+#define _ROSE_COMPRESSION_FOR_H__
+
+// Copyright 2007 Google Inc. All Rights Reserved.
+// Author: sears@google.com (Rusty Sears)
+
+/**
+   @file Implementation of Frame of Reference compression
+
+   This file implements a COMPRESSOR plugin that handles compression
+   for a column of data in a page.  Rather than hardcoding a
+   particular page layout, these plugins work with two ranges of
+   memory that must be contiguous.  The first range contains the
+   compressed data.  A pointer to this region is stored in the mem_
+   member of this class.  The second region is shared amongst multiple
+   compressor implementations and is passed into methods via the
+   "exceptions" parameter.  A second parameter, "except", provides the
+   offset of the first byte of exceptions that is in use.  If
+   necessary, the compressor implementation may store data as an
+   exception, by prepending it to the exceptions array and
+   decrementing "except".
+
+   A third parameter, "free_bytes", is used to manage byte allocation
+   out of some global (per-page) pool.  If free_bytes becomes
+   negative, the page is out of space, and all further allocation
+   attempts will fail.  Compressor plugins modify it as more data is
+   inserted into the page.
+
+   Finally, the compressor may define a volatile region of bytes after
+   the end of the compressed data.  This region is used as scratch
+   space, and will not be written to disk.  However, it is
+   deducted from the total number of free bytes on the page, wasting a
+   small amount of storage on disk.  The max_overrun() method returns
+   the size of this extra scratch space buffer.
+*/
+
+#include <limits.h>
+
+#include "pstar.h"
+namespace rose {
+
+template <class TYPE>
+class For {
+ public:
+  static const int PLUGIN_ID = 0;
+  /**
+     Set the page offset.  For frame of reference, this is used to
+     calculate deltas.
+  */
+  inline void offset(TYPE o);
+  /**
+     The size of the scratch space reserved at the end of the page for
+     speculative execution.
+  */
+  inline size_t max_overrun() { return sizeof(delta_t) + sizeof(TYPE); }
+
+  /**
+     Append a new value to a compressed portion of a page.  This
+     function is meant to be called by pstar, not by end users.
+
+     @param xid the transaction that is appending the value (currently unused)
+     @param dat contains the value to be appended to the end of the page.
+     @param except the offset of the first exceptional value on the page.
This + should initially be set to the end of the exceptional array; + append() will modify it as needed. + @param exceptions a pointer to the beginning of the exceptions region. + @param free_bytes The number of free bytes on the page. This number will be + decremented (or incremented) by append to reflect changes in the + number of bytes in use. + @return The slot index of the newly returned value or an undefined if the + page is too full to accomodate the value (that is, free_bytes is + negative). Implementations may set free_bytes to a negative value if + an implementation defined limit prevents them from accomodating more + data, even if there are free bytes on the page. + */ + inline slot_index_t append(int xid, const TYPE dat, + byte_off_t * except, byte * exceptions, + int * free_bytes); + + /** + Read a compressed value. This function is meant to be called by + pstar, not by end users. + + @param xid Tracked for locking. Currently unused. + @param slot The index number of the slot that should be read. + @param exceptions A byte array that contains the exceptional values. + @param buf Storage space for the value to be read. This function will + return this pointer after succesfully reading the value. + The caller manages the memory passed via buf. + + @return NULL if the slot is off the end of the page, or buf if the + record exists, and has been read. + + @see Pstar::recordRead() and Multicolumn::recordRead() + + */ + inline TYPE *recordRead(int xid, slot_index_t slot, byte *exceptions, + TYPE * buf); + /** + This constructor initializes a new FOR region. + + @param xid the transaction that created the new region. + */ + For(int xid, void * mem): mem_(mem) { + *base_ptr() = -1; + *numdeltas_ptr() = 0; + }; + For(void * mem): mem_(mem) { } + + For() : mem_(0) {} + /** + @return the length of the FOR region, in bytes + */ + inline byte_off_t bytes_used() { + return ((intptr_t)(last_delta_ptr()+1)) - (intptr_t)mem_; + } + + inline void mem(byte * mem) { mem_ = mem; } + + inline void init_mem(byte * mem) { + mem_=mem; + *base_ptr() = -1; + *numdeltas_ptr() = 0; + } + + private: + + /*typedef int8_t delta_t; + static const delta_t DELTA_MAX = CHAR_MAX; + static const delta_t DELTA_MIN = CHAR_MIN;*/ + typedef int16_t delta_t; + static const delta_t DELTA_MAX = SHRT_MAX; + static const delta_t DELTA_MIN = SHRT_MIN; + /*typedef int32_t delta_t; + static const delta_t DELTA_MAX = INT_MAX; + static const delta_t DELTA_MIN = INT_MIN;*/ + + inline TYPE offset() { return *base_ptr(); } + + inline TYPE* base_ptr() { return reinterpret_cast(mem_); } + + inline slot_index_t* numdeltas_ptr() { + return reinterpret_cast(base_ptr()+1); + } + inline delta_t * nth_delta_ptr(slot_index_t n) { + return reinterpret_cast(numdeltas_ptr()+1) + n; + } + inline delta_t * last_delta_ptr() { + return nth_delta_ptr(*numdeltas_ptr()-1); + } + inline delta_t * next_delta_ptr() { + return nth_delta_ptr(*numdeltas_ptr()); + } + void * mem_; +}; + +} // namespace rose +#endif // _ROSE_COMPRESSION_FOR_H__ diff --git a/src/stasis/page/compression/multicolumn-impl.h b/src/stasis/page/compression/multicolumn-impl.h new file mode 100644 index 0000000..24a1caf --- /dev/null +++ b/src/stasis/page/compression/multicolumn-impl.h @@ -0,0 +1,266 @@ +#ifndef _ROSE_COMPRESSION_MULTICOLUMN_IMPL_H__ +#define _ROSE_COMPRESSION_MULTICOLUMN_IMPL_H__ + +#include "multicolumn.h" + +namespace rose { + +/** + Initialize a new multicolumn page +*/ +template +Multicolumn::Multicolumn(int xid, Page *p, column_number_t column_count, + 
plugin_id_t * plugins) : + p_(p), + columns_(new byte*[column_count]), + first_exception_byte_(USABLE_SIZE_OF_PAGE), + exceptions_(new byte[USABLE_SIZE_OF_PAGE]), + dispatcher_(column_count), + unpacked_(1) +{ + + *column_count_ptr() = column_count; + + bytes_left_ = first_header_byte_ptr()- p->memAddr; + + for(int i = 0; i < column_count; i++) { + *column_plugin_id_ptr(i) = plugins[i]; + columns_[i] = new byte[USABLE_SIZE_OF_PAGE]; + dispatcher_.set_plugin(columns_[i],i,plugins[i]); + dispatcher_.init_mem(columns_[i],i); + bytes_left_ -= dispatcher_.bytes_used(i); + } + + *stasis_page_type_ptr(p) = plugin_id(); + p->impl = this; +} + +/** + XXX this eagerly unpacks the page at load; that's a waste of + processor time and RAM, as read-only pages don't need to be + unpacked. +*/ +template +Multicolumn::Multicolumn(Page * p) : + p_(p), + columns_(new byte*[*column_count_ptr()]), + first_exception_byte_(USABLE_SIZE_OF_PAGE - *exceptions_len_ptr()), + exceptions_(p_->memAddr + *exceptions_offset_ptr()), + dispatcher_(*column_count_ptr()), + unpacked_(0) { + byte_off_t first_free = 0; + for(int i = 0; i < *column_count_ptr(); i++) { + + byte * page_column_ptr = p_->memAddr + *column_offset_ptr(i); + + dispatcher_.set_plugin(page_column_ptr,i, *column_plugin_id_ptr(i)); + + byte_off_t column_length = dispatcher_.bytes_used(i); + columns_[i] = p_->memAddr + *column_offset_ptr(i); + dispatcher_.set_plugin(columns_[i],i, *column_plugin_id_ptr(i)); + + first_free = *column_offset_ptr(i) + column_length; + } + + assert(first_free <= *exceptions_offset_ptr()); + assert(first_exception_byte_ <= USABLE_SIZE_OF_PAGE); + + bytes_left_ = *exceptions_offset_ptr() - first_free; + + assert(*stasis_page_type_ptr(p) == Multicolumn::plugin_id()); +} + +template +void Multicolumn::pack() { + byte_off_t first_free = 0; + byte_off_t last_free = (intptr_t)(first_header_byte_ptr() - p_->memAddr); + if(unpacked_) { + *exceptions_len_ptr() = USABLE_SIZE_OF_PAGE - first_exception_byte_; + last_free -= *exceptions_len_ptr(); + + *exceptions_offset_ptr() = last_free; + memcpy(&(p_->memAddr[*exceptions_offset_ptr()]), + exceptions_ + first_exception_byte_, *exceptions_len_ptr()); + + for(int i = 0; i < *column_count_ptr(); i++) { + *column_offset_ptr(i) = first_free; + + byte_off_t bytes_used = dispatcher_.bytes_used(i); + memcpy(column_base_ptr(i), columns_[i], bytes_used); + + first_free += bytes_used; + assert(first_free <= last_free); + + delete [] columns_[i]; + columns_[i] = column_base_ptr(i); + dispatcher_.mem(columns_[i],i); //compressor(i))->mem(columns_[i]); + } + delete [] exceptions_; + exceptions_ = p_->memAddr + *exceptions_offset_ptr(); + unpacked_ = 0; + } +} + +template +Multicolumn::~Multicolumn() { + byte_off_t first_free = 0; + byte_off_t last_free = (intptr_t)(first_header_byte_ptr() - p_->memAddr); + if(unpacked_) { + *exceptions_len_ptr() = USABLE_SIZE_OF_PAGE - first_exception_byte_; + last_free -= *exceptions_len_ptr(); + + *exceptions_offset_ptr() = last_free; + memcpy(&(p_->memAddr[*exceptions_offset_ptr()]), + exceptions_ + first_exception_byte_, *exceptions_len_ptr()); + + for(int i = 0; i < *column_count_ptr(); i++) { + *column_offset_ptr(i) = first_free; + + byte_off_t bytes_used = dispatcher_.bytes_used(i); + memcpy(column_base_ptr(i), columns_[i], bytes_used); + first_free += bytes_used; + assert(first_free <= last_free); + delete [] columns_[i]; + } + + delete [] exceptions_; + } + delete [] columns_; +} + +/// Begin performance-critical code ------------------------------------------- + 
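+
+// Typical call pattern for the two methods below, as driven by
+// compressData() in benchmarks/rose.cpp.  An illustrative sketch, not a
+// verbatim excerpt; page allocation and the lsmTree append are elided:
+//
+//   Multicolumn<Tuple<val_t> > *mc;              // set up via initPage()
+//   ...
+//   if(mc->append(xid, tup) == rose::NOSPACE) {  // page filled up
+//     mc->pack();   // copy the columns and exceptions back into the page
+//     ...           // move to a fresh page, then re-append tup
+//   }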
+/** + Append a record to the page. This function is complicated by + the fact that each column was produced by a potentially + different template instantiation. Rather than harcode + compressor implementations, or fall back on virtual methods, + this function delegates compressor calls to PluginDispatcher. + + Pstar<> (and potential future implementations of multicolumn) + benefit from this scheme as they can hardcode compressors at + compile time, allowing the correct append method to be inlined, + rather than invoked via a virtual method. +*/ +template +inline slot_index_t Multicolumn::append(int xid, + TUPLE const & dat) { + + slot_index_t ret = NOSPACE; + column_number_t i = 0; + + const column_number_t cols = dat.column_count(); + + do { + + slot_index_t newret = dispatcher_.recordAppend(xid, i, dat.get(i), + &first_exception_byte_, + exceptions_, &bytes_left_); + //assert(ret == NOSPACE || newret == NOSPACE || newret == ret); + ret = newret; + i++; + } while(i < cols); + + return bytes_left_ < 0 ? NOSPACE : ret; + +} + +/** + Read a record (tuple) from the page. + + @see append for a discussion of the implementation and + associated design tradeoffs. +*/ +template +inline TUPLE* Multicolumn::recordRead(int xid, slot_index_t slot, + TUPLE *buf) { + column_number_t i = 0; + column_number_t cols = buf->column_count(); + + do { + void * ret = dispatcher_.recordRead(xid,columns_[i],i,slot,exceptions_, + buf->get(i)); + if(!ret) { + return 0; + } + i++; + } while(i < cols); + return buf; +} + +/// End performance-critical code --------------------------------------------- + +/// Stuff below this line interfaces with Stasis' buffer manager -------------- + +/** + Basic page_impl for multicolumn pages + + @see stasis/page.h and pstar-impl.h + +*/ +static const page_impl multicolumn_impl = { + -1, + 0, // multicolumnRead, + 0, // multicolumnWrite, + 0, // multicolumnReadDone, + 0, // multicolumnWriteDone, + 0, // multicolumnGetType, + 0, // multicolumnSetType, + 0, // multicolumnGetLength, + 0, // multicolumnFirst, + 0, // multicolumnNext, + 0, // multicolumnIsBlockSupported, + 0, // multicolumnBlockFirst, + 0, // multicolumnBlockNext, + 0, // multicolumnBlockDone, + 0, // multicolumnFreespace, + 0, // multicolumnCompact, + 0, // multicolumnPreRalloc, + 0, // multicolumnPostRalloc, + 0, // multicolumnFree, + 0, // dereference_identity, + 0, // multicolumnLoaded, + 0, // multicolumnFlushed + 0, // multicolumnCleanup +}; + +// XXX implement plugin_id(). Currently, it treats all instantiations of the +// same TUPLE template interchangably; this will break for binaries that +// manipulate more than one type of tuple.. 
+template <class TUPLE>
+inline plugin_id_t
+Multicolumn<TUPLE>::plugin_id() {
+  return USER_DEFINED_PAGE(0) + 32 + TUPLE::TUPLE_ID;
+}
+
+template <class TUPLE>
+void multicolumnLoaded(Page *p) {
+  p->LSN = *stasis_page_lsn_ptr(p);
+  assert(*stasis_page_type_ptr(p) == Multicolumn<TUPLE>::plugin_id());
+  p->impl = new Multicolumn<TUPLE>(p);
+}
+
+template <class TUPLE>
+static void multicolumnFlushed(Page *p) {
+  *stasis_page_lsn_ptr(p) = p->LSN;
+  ((Multicolumn<TUPLE>*)(p->impl))->pack();
+}
+template <class TUPLE>
+static void multicolumnCleanup(Page *p) {
+  delete (Multicolumn<TUPLE>*)p->impl;
+  p->impl = 0;
+}
+
+template <class TUPLE>
+page_impl Multicolumn<TUPLE>::impl() {
+  page_impl ret = multicolumn_impl;
+  ret.page_type = Multicolumn<TUPLE>::plugin_id();
+  ret.pageLoaded = multicolumnLoaded<TUPLE>;
+  ret.pageFlushed = multicolumnFlushed<TUPLE>;
+  ret.pageCleanup = multicolumnCleanup<TUPLE>;
+  return ret;
+}
+
+} // namespace rose
+
+#endif  // _ROSE_COMPRESSION_MULTICOLUMN_IMPL_H__
diff --git a/src/stasis/page/compression/multicolumn.h b/src/stasis/page/compression/multicolumn.h
new file mode 100644
index 0000000..8cad84c
--- /dev/null
+++ b/src/stasis/page/compression/multicolumn.h
@@ -0,0 +1,167 @@
+#ifndef _ROSE_COMPRESSION_MULTICOLUMN_H__
+#define _ROSE_COMPRESSION_MULTICOLUMN_H__
+
+#include <limits.h>
+
+#include <stasis/page.h>
+#include <stasis/constants.h>
+
+#include "pstar.h"  // for typedefs + consts (XXX add new header?)
+#include "tuple.h"  // XXX rename tuple.h
+
+// Copyright 2007 Google Inc. All Rights Reserved.
+// Author: sears@google.com (Rusty Sears)
+
+/**
+ @file Page implementation for multi-column, compressed data.
+
+ STRUCTURE OF A MULTICOLUMN PAGE
+
+ +----------------------------------------------------------------------+
+ | col #0 compressed data (opaque) | col #1 compressed data (opaque)    |
+ +-----+---------------------------+-----+------------------------------+
+ | ... | col #N compressed data (opaque) |                              |
+ +-----+---------------------------------+                              |
+ |  Free space                                                          |
+ |                                                                      |
+ |                                                                      |
+ |                             +----------------------------------------+
+ |                             | Exceptions:                            |
+ +-----------------------------+ Includes data from multiple cols       |
+ |                                                                      |
+ | Exception data is managed (bytes are copied in and out of this       |
+ | region) by the column implementations.  Multicolumn mediates between |
+ | the columns, by recording the length and offset of this region.      |
+ |                                                                      |
+ |                                      +---------------+---------------+
+ |                                  ... | exception # 1 | exception # 0 |
+ +-----------------------+--------------------+----+--------------------+
+ |  first header byte -> | col #N off, plugin | .. | col #1 off, plugin |
+ +--------------------+--+-------------+------+----+----+-----------+---+
+ | col #0 off, plugin | exceptions len | exceptions off | # of cols | ? |
+ +--------------------+----------------+----------------+-----------+---+
+
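+ For example (back-of-the-envelope, using the declarations below): a
+ page with N columns spends sizeof(column_number_t) +
+ 2 * sizeof(byte_off_t) + N * sizeof(column_header) bytes on this
+ header, so the space left for column data and exceptions shrinks
+ slightly as columns are added.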
+
+ Notes:
+
+ The 'exceptions' portion of the page grows down from
+ first_header_byte, while the column data portion grows up from byte
+ zero...  This was an arbitrary decision, and complicated the
+ implementation somewhat...
+
+ Functions whose names end in "_ptr" return pointers to bytes in the
+ page.  That memory is persistent, and will eventually be written
+ back to the page file.
+
+*/
+
+namespace rose {
+
+/**
+ * A "pageLoaded()" callback function for Stasis' buffer manager.
+ */
+template <class TUPLE>
+void multicolumnLoaded(Page *p);
+
+template <class TUPLE> class Multicolumn {
+ public:
+  static page_impl impl();
+  static const plugin_id_t PAGE_FORMAT_ID = 1;
+
+  Multicolumn(int xid, Page *p, column_number_t column_count,
+              plugin_id_t * plugins);
+
+  ~Multicolumn();
+
+  /**
+     @return the compressor used for a column.  The nature of the
+     mapping between table region and compressor instance is
+     implementation defined, but there will never be more than one
+     compressor per column, per page.
+
+     @param col The column whose compressor should be returned.
+     @return A pointer to a compressor.  This pointer is guaranteed to
+     be valid until the next method call on this Multicolumn object.
+     After that, the pointer returned here is invalid.
+  */
+  void* compressor(column_number_t col) {
+    return dispatcher_.compressor(col);
+  }
+  inline slot_index_t append(int xid, TUPLE const & dat);
+  inline TUPLE * recordRead(int xid, slot_index_t slot, TUPLE * buf);
+  inline void pack();
+ private:
+
+  typedef struct column_header {
+    byte_off_t off;
+    plugin_id_t plugin_id;
+  } column_header;
+
+  /**
+     Load an existing multicolumn Page.
+  */
+  Multicolumn(Page * p);
+
+  /**
+     The following functions perform pointer arithmetic.  This code is
+     performance critical.  These short, inlined functions mostly
+     evaluate simple arithmetic expressions involving constants; g++'s
+     optimizer seems to combine and simplify these expressions for us.
+
+     See the page layout diagram at the top of this file for an
+     explanation of where these pointers are stored.
+  */
+
+  inline column_number_t * column_count_ptr() {
+    return reinterpret_cast<column_number_t*>(p_->memAddr+USABLE_SIZE_OF_PAGE)-1;
+  }
+  inline byte_off_t * exceptions_offset_ptr() {
+    return reinterpret_cast<byte_off_t*>(column_count_ptr())-1;
+  }
+  inline byte_off_t * exceptions_len_ptr() {
+    return exceptions_offset_ptr()-1;
+  }
+  inline column_header * column_header_ptr(column_number_t column_number) {
+    return reinterpret_cast<column_header*>(exceptions_len_ptr())-(1+column_number);
+  }
+  inline byte_off_t * column_offset_ptr(column_number_t column_number) {
+    return &(column_header_ptr(column_number)->off);
+  }
+  /**
+     This stores the plugin_id associated with this column's compressor.
+
+     @see rose::plugin_id()
+  */
+  inline plugin_id_t * column_plugin_id_ptr(column_number_t column_number) {
+    return &(column_header_ptr(column_number)->plugin_id);
+  }
+  /**
+     The first byte that contains data for this column.
+
+     The length of the column data can be determined by calling
+     COMPRESSOR's bytes_used() member function.  (PluginDispatcher
+     can handle this.)
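+
+     For example, column i's data occupies the half-open byte range
+     [*column_offset_ptr(i), *column_offset_ptr(i) + bytes_used(i))
+     of the page.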
+ */ + inline byte * column_base_ptr(column_number_t column_number) { + return *column_offset_ptr(column_number) + p_->memAddr; + } + inline byte * first_header_byte_ptr() { + return reinterpret_cast(column_header_ptr((*column_count_ptr())-1)); + } + + static inline plugin_id_t plugin_id(); + Page * p_; + byte ** columns_; + byte_off_t first_exception_byte_; + byte * exceptions_; + PluginDispatcher dispatcher_; + int bytes_left_; + int unpacked_; + friend void multicolumnLoaded(Page *p); +}; + +} // namespace rose + + +#endif // _ROSE_COMPRESSION_MULTICOLUMN_H__ diff --git a/src/stasis/page/compression/pstar-impl.h b/src/stasis/page/compression/pstar-impl.h new file mode 100644 index 0000000..c3073c1 --- /dev/null +++ b/src/stasis/page/compression/pstar-impl.h @@ -0,0 +1,110 @@ +#ifndef _ROSE_COMPRESSION_PSTAR_IMPL_H__ +#define _ROSE_COMPRESSION_PSTAR_IMPL_H__ + +// Copyright 2007 Google Inc. All Rights Reserved. +// Author: sears@google.com (Rusty Sears) + +#include +#include + +#include "pstar.h" +#include "for.h" + +namespace rose { + +/** + Appends a value to a page managed by pstar. For now, most of the + "real work" is handled by the compression algorithm. + + This function simply checks the return value from the plugin. If + the value should be stored as an exception, then it is prepended to + the list of exceptions at end of the page. The compressed data is kept + at the beginning of the page. +*/ +template +slot_index_t +Pstar::append(int xid, const TYPE dat) { + + slot_index_t ret = plug_.append(xid, dat, freespace_ptr(), p_->memAddr, + &free_bytes_); + + return free_bytes_ >= 0 ? ret : NOSPACE; +} + +// The rest of this file interfaces with Stasis ------------------------- + +/** + Implementation of the Stasis pageLoaded() callback. + + @see stasis/page.h +*/ +template +static void pStarLoaded(Page * p) { + p->LSN = *stasis_page_lsn_ptr(p); + p->impl = new Pstar(p); +} +/** + Implementation of the Stasis pageFlushed() callback. +*/ +template +static void pStarFlushed(Page * p) { + *stasis_page_lsn_ptr(p) = p->LSN; +} + +template +static void pStarCleanup(Page * p) { + delete (Pstar*)p->impl; +} +/** + Basic page_impl for pstar + + @see stasis/page.h + +*/ +static const page_impl pstar_impl = { + -1, + 0, // pStarRead, + 0, // pStarWrite, + 0, // pStarReadDone, + 0, // pStarWriteDone, + 0, // pStarGetType, + 0, // pStarSetType, + 0, // pStarGetLength, + 0, // pStarFirst, + 0, // pStarNext, + 0, // pStarIsBlockSupported, + 0, // pStarBlockFirst, + 0, // pStarBlockNext, + 0, // pStarBlockDone, + 0, // pStarFreespace, + 0, // pStarCompact, + 0, // pStarPreRalloc, + 0, // pStarPostRalloc, + 0, // pStarFree, + 0, // dereference_identity, + 0, // pStarLoaded, + 0, // pStarFlushed + 0, // pStarCleanup +}; + +/** + Be sure to call "registerPageType(Pstar<...>::impl())" once for + each template instantiation that Stasis might encounter, even if a + particular binary might not use that instantiation. This must be + done before calling Tinit(). + + @see registerPageType() from Stasis. 
+*/ +template +page_impl +Pstar::impl() { + page_impl ret = pstar_impl; + ret.page_type = plugin_id,COMPRESSOR,TYPE>(); + ret.pageLoaded = pStarLoaded; + ret.pageFlushed = pStarFlushed; + ret.pageCleanup = pStarCleanup; + return ret; +} + +} // namespace rose +#endif // _ROSE_COMPRESSION_PSTAR_IMPL_H__ diff --git a/src/stasis/page/compression/pstar.h b/src/stasis/page/compression/pstar.h new file mode 100644 index 0000000..a99aeef --- /dev/null +++ b/src/stasis/page/compression/pstar.h @@ -0,0 +1,119 @@ +#ifndef _ROSE_COMPRESSION_PSTAR_H__ +#define _ROSE_COMPRESSION_PSTAR_H__ + +#include + +#include +#include + +#include "compression.h" + +// Copyright 2007 Google Inc. All Rights Reserved. +// Author: sears@google.com (Rusty Sears) + +namespace rose { + +template +void pStarLoaded(Page *p); + +template +inline plugin_id_t plugin_id(); + +template class Pstar { + public: + // Initialize a new Pstar page + Pstar(int xid, Page *p): p_(p), plug_(COMPRESSOR(xid, p->memAddr)) { + *stasis_page_type_ptr(p) = plugin_id,COMPRESSOR,TYPE>(); + *freespace_ptr() = (intptr_t)recordsize_ptr() - (intptr_t)p_->memAddr; + *recordsize_ptr() = sizeof(TYPE); + free_bytes_ = *freespace_ptr() - plug_.bytes_used() - plug_.max_overrun(); + p->impl = this; + } + inline void pack() { }; + /** + Append a new value to a page managed by pstar. + + @param xid the transaction adding the data to the page + @param dat the value to be added to the page. + */ + slot_index_t append(int xid, const TYPE dat); + + // @todo If we want to support multiple columns per page, then recordSize + // and recordType need to be handled by the compressor. + inline record_size_t recordType(int xid, slot_index_t slot) { + return *recordsize_ptr(); + } + inline void recordType(int xid, slot_index_t slot, + record_size_t type) { + *recordsize_ptr() = type; + } + inline record_size_t recordLength(int xid, slot_index_t slot) { + return physical_slot_length(recordType(xid, slot)); + } + /** + Read a value from a page managed by pstar. + + @param xid the transaction reading the record. + @param buf scratch space for recordRead. + + @return NULL if there is no such slot, or a pointer to rhe + value. + + If a pointer is returned, it might point to the memory passed via + scratch, or it might point to memory managed by the page + implementation. The return value will not be invalidated as long as + the following two conditions apply: + + 1) The page is pinned; loadPage() has been called, but releasePage() + has not been called. + + 2) The memory that scratch points to has not been freed, or reused + in a more recent call to recordRead(). 
+
+  */
+  inline TYPE * recordRead(int xid, slot_index_t slot, TYPE * buf) {
+    // byte_off_t except = 0;
+    TYPE * ret = plug_.recordRead(xid, slot, p_->memAddr, buf);
+    // if (ret == reinterpret_cast<TYPE*>(INVALID_SLOT)) { return 0; }
+
+    /* if (ret == reinterpret_cast<TYPE*>(EXCEPTIONAL)) {
+         return reinterpret_cast<TYPE*>(
+             &(p_->memAddr[except-recordLength(xid, rid.slot)]));
+       } */
+    return ret;
+  }
+  inline COMPRESSOR * compressor() { return &plug_; }
+
+  static page_impl impl();
+
+  static const plugin_id_t PAGE_FORMAT_ID = 0;
+
+ private:
+
+  // Load an existing Pstar page.
+  Pstar(Page *p): p_(p), plug_(COMPRESSOR(p->memAddr)) {
+    free_bytes_ = *freespace_ptr() - plug_.bytes_used() - plug_.max_overrun();
+  }
+
+  inline byte_off_t * freespace_ptr() {
+    return reinterpret_cast<byte_off_t*>(p_->memAddr+USABLE_SIZE_OF_PAGE)-1;
+  }
+  inline record_size_t * recordsize_ptr() {
+    return reinterpret_cast<record_size_t*>(freespace_ptr())-1;
+  }
+
+  inline void page(Page * p) {
+    p_ = p;
+    plug_.memAddr(p->memAddr);
+  }
+
+  Page *p_;
+  COMPRESSOR plug_;
+  int free_bytes_;
+  friend void pStarLoaded<COMPRESSOR, TYPE>(Page *p);
+};
+
+} // namespace rose
+#endif  // _ROSE_COMPRESSION_PSTAR_H__
diff --git a/src/stasis/page/compression/rle-impl.h b/src/stasis/page/compression/rle-impl.h
new file mode 100644
index 0000000..f301825
--- /dev/null
+++ b/src/stasis/page/compression/rle-impl.h
@@ -0,0 +1,76 @@
+#ifndef _ROSE_COMPRESSION_RLE_IMPL_H__
+#define _ROSE_COMPRESSION_RLE_IMPL_H__
+
+// Copyright 2007 Google Inc. All Rights Reserved.
+// Author: sears@google.com (Rusty Sears)
+
+#include <assert.h>
+
+#include "rle.h"
+
+namespace rose {
+/**
+  Store a new value in run length encoding.  If this value matches
+  the previous one, increment a counter.  Otherwise, create a new
+  triple_t to hold the new value and its count.  Most of the
+  complexity comes from dealing with integer overflow, and running
+  out of space.
+*/
+template <class TYPE>
+inline slot_index_t
+Rle<TYPE>::append(int xid, const TYPE dat,
+                  byte_off_t* except, byte * exceptions, //char *exceptional,
+                  int *free_bytes) {
+  int64_t ret;
+
+  ret = last_block_ptr()->index + last_block_ptr()->copies;
+
+  if (dat != last_block_ptr()->data ||
+      last_block_ptr()->copies == MAX_COPY_COUNT) {
+    // This key is not the same as the last one, or the block is full.
+
+    *free_bytes -= sizeof(triple_t);
+
+    // Write the changes in our overrun space.
+    triple_t *n = new_block_ptr();
+    n->index = ret;
+    n->copies = 1;
+    n->data = dat;
+
+    // Finalize the changes unless we're out of space.
+    (*block_count_ptr()) += (*free_bytes >= 0);
+
+  } else if(ret == MAX_INDEX) {
+    // Out of address space.
+    *free_bytes = -1;
+    ret = NOSPACE;
+  } else {
+    // Success; bump the number of copies of this item, and return.
+    last_block_ptr()->copies++;
+  }
+
+  return (slot_index_t)ret;
+}
+template <class TYPE>
+inline TYPE *
+Rle<TYPE>::recordRead(int xid, slot_index_t slot, byte* exceptions,
+                      TYPE * scratch) {
+  // Start the scan at the most recently read block if it cannot be past
+  // the requested slot; otherwise fall back to the first block.
+  block_index_t n = nth_block_ptr(last_)->index <= slot ? last_ : 0;
+  do {
+    triple_t * t = nth_block_ptr(n);
+    if (t->index <= slot && t->index + t->copies > slot) {
+      *scratch = t->data;
+      last_ = n;
+      return scratch;
+    }
+    n++;
+  } while (n < *block_count_ptr());
+  return 0;
+}
+
+} // namespace rose
+
+#endif  // _ROSE_COMPRESSION_RLE_IMPL_H__
diff --git a/src/stasis/page/compression/rle.h b/src/stasis/page/compression/rle.h
new file mode 100644
index 0000000..49c9ecd
--- /dev/null
+++ b/src/stasis/page/compression/rle.h
@@ -0,0 +1,100 @@
+#ifndef _ROSE_COMPRESSION_RLE_H__
+#define _ROSE_COMPRESSION_RLE_H__
+
+// Copyright 2007 Google Inc. All Rights Reserved.
+// Author: sears@google.com (Rusty Sears)
+
+#include <limits.h>
+#include <stdint.h>
+
+#include "pstar.h"
+
+namespace rose {
+
+template <class TYPE>
+class Rle {
+ public:
+  typedef uint32_t block_index_t;
+  typedef uint16_t copy_count_t;
+  static const copy_count_t MAX_COPY_COUNT = USHRT_MAX;
+
+  struct triple_t {
+    slot_index_t index;
+    copy_count_t copies;
+    //byte foo[100]; // <-- Uncomment to test boundaries
+    TYPE data;
+  };
+
+  static const int PLUGIN_ID = 1;
+
+  inline void offset(TYPE off) { nth_block_ptr(0)->data = off; }
+  inline size_t max_overrun() { return sizeof(triple_t); }
+
+  /** @see For::append */
+  inline slot_index_t append(int xid, const TYPE dat,
+                             byte_off_t* except, byte * exceptions,
+                             int * free_bytes);
+  /** @see For::recordRead */
+  inline TYPE *recordRead(int xid, slot_index_t slot, byte *exceptions,
+                          TYPE *scratch);
+  /**
+     This constructor initializes a new Rle region.
+
+     @param xid the transaction that created the new region.
+  */
+  Rle(int xid, void * mem) { init_mem(mem); }
+  /**
+     This constructor is called when existing RLE data is read from
+     disk.
+  */
+  Rle(void * mem): mem_(mem), last_(0) { }
+
+  Rle() : mem_(0), last_(0) {}
+
+  /**
+     @see For::bytes_used();
+  */
+  inline byte_off_t bytes_used() {
+    return ((intptr_t)(last_block_ptr()+1))-(intptr_t)mem_;
+  }
+
+  inline void init_mem(void * mem) {
+    mem_=mem;
+    last_=0;
+    *block_count_ptr() = 1;
+    triple_t * n = nth_block_ptr(0);
+    n->index = 0;
+    n->copies = 0;
+    n->data = 0;
+  }
+
+  inline void mem(void * mem) {
+    mem_=mem;
+    last_=0;
+  }
+ private:
+  inline TYPE offset() { return nth_block_ptr(0)->data; }
+  inline block_index_t* block_count_ptr() {
+    return reinterpret_cast<block_index_t*>(mem_);
+  }
+  inline triple_t* nth_block_ptr(block_index_t n) {
+    return reinterpret_cast<triple_t*>(block_count_ptr()+1) + n;
+  }
+  inline triple_t* last_block_ptr() {
+    return nth_block_ptr(*block_count_ptr()-1);
+  }
+  inline triple_t* new_block_ptr() {
+    return nth_block_ptr(*block_count_ptr());
+  }
+  void * mem_;
+  block_index_t last_;
+};
+
+} // namespace rose
+#endif  // _ROSE_COMPRESSION_RLE_H__
diff --git a/src/stasis/page/compression/tuple.h b/src/stasis/page/compression/tuple.h
new file mode 100644
index 0000000..02c288b
--- /dev/null
+++ b/src/stasis/page/compression/tuple.h
@@ -0,0 +1,336 @@
+#ifndef _ROSE_COMPRESSION_TUPLE_H__
+#define _ROSE_COMPRESSION_TUPLE_H__
+
+// Copyright 2007 Google Inc. All Rights Reserved.
+// Author: sears@google.com (Rusty Sears)
+
+/**
+  @file Implementation of tuples (Tuple) and dispatch routines for
+  column-wide compression (PluginDispatcher).
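+
+  For example (sketch): Tuple<int32_t> t(2) holds two int32_t columns;
+  toByteArray() serializes it as a column_number_t count followed by the
+  two column values, for sizeofBytes(2) bytes in all.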
+*/ + +#include +#include + +#include "compression.h" +#include "pstar-impl.h" +#include "multicolumn.h" +namespace rose { + +template class Multicolumn; +template class Tuple; + +/** + PluginDispatcher essentially just wraps calls to compressors in + switch statements. + + It has a number of deficiencies: + + 1) Performance. The switch statement is the main CPU bottleneck + for both of the current compression schemes. + + 2) PluginDispatcher has to "know" about all compression + algorithms and all data types that it may encounter. + + This approach has one advantage; it doesn't preclude other + (templatized) implementations that hardcode schema formats a + compile time. + + Performance could be partially addressed by using a blocking append + algorithm: + + A Queue up multiple append requests (or precompute read requests) + when appropriate. + + B Before appending, calculate a lower (pessimistic) bound on the + number of inserted tuples that can fit in the page: + + n = (free bytes) / (maximum space per tuple) + + C Compress n tuples from each column at a time. Only evaluate the + switch statement once for each column. + + D Repeat steps B and C until n is below some threshold, then + revert the current behavior. + + Batching read requests is simpler, and would be useful for + sequential scans over the data. + +*/ + +class PluginDispatcher{ + public: + +#define dispatchSwitch(col,cases,...) \ + static const int base = USER_DEFINED_PAGE(0) + 2 * 2 * 4;\ + switch(plugin_ids_[col]-base) { \ + cases(0, For, col,uint8_t, __VA_ARGS__); \ + cases(1, For,col,uint16_t,__VA_ARGS__); \ + cases(2, For,col,uint32_t,__VA_ARGS__); \ + cases(3, For,col,uint64_t,__VA_ARGS__); \ + cases(4, For, col,int8_t, __VA_ARGS__); \ + cases(5, For, col,int16_t, __VA_ARGS__); \ + cases(6, For, col,int32_t, __VA_ARGS__); \ + cases(7, For, col,int64_t, __VA_ARGS__); \ + cases(8, Rle, col,uint8_t, __VA_ARGS__); \ + cases(9, Rle,col,uint16_t,__VA_ARGS__); \ + cases(10,Rle,col,uint32_t,__VA_ARGS__); \ + cases(11,Rle,col,uint64_t,__VA_ARGS__); \ + cases(12,Rle, col,int8_t, __VA_ARGS__); \ + cases(13,Rle, col,int16_t, __VA_ARGS__); \ + cases(14,Rle, col,int32_t, __VA_ARGS__); \ + cases(15,Rle, col,int64_t, __VA_ARGS__); \ + default: abort(); \ + }; + +#define caseAppend(off,plug_type,col,type,fcn,ret,xid,dat,...) 
\ + case off: { \ + ret = ((plug_type*)plugins_[col])->fcn(xid,*(type*)dat,__VA_ARGS__); } break + +#define caseSetPlugin(off,plug_type,col,type,m) \ + case off: { plugins_[col] = new plug_type(m); } break + +#define caseDelPlugin(off,plug_type,col,type,m) \ + case off: { delete (plug_type*)plugins_[col]; } break + +#define caseRead(off,plug_type,col,type,m,ret,fcn,xid,slot,except,scratch) \ + case off: { ret = ((plug_type*)plugins_[col])->fcn(xid,slot,except,(type*)scratch); } break + +#define caseNoArg(off,plug_type,col,type,m,ret,fcn) \ + case off: { ret = ((plug_type*)plugins_[col])->fcn(); } break + +#define caseInitMem(off,plug_type,col,type,m) \ + case off: { ((plug_type*)plugins_[col])->init_mem(m); } break + +#define caseMem(off,plug_type,col,type,m) \ + case off: { ((plug_type*)plugins_[col])->mem(m); } break + +#define caseCompressor(off,plug_type,col,type,nil) \ + case off: { ret = (plug_type*)plugins_[col]; } break + + inline slot_index_t recordAppend(int xid, column_number_t col, + const void *dat, byte_off_t* except, + byte *exceptions, int *free_bytes) { + slot_index_t ret; + dispatchSwitch(col,caseAppend,append,ret,xid,dat,except,exceptions, + free_bytes); + return ret; + } + + inline void *recordRead(int xid, byte *mem, column_number_t col, + slot_index_t slot, byte* exceptions, void *scratch) { + void * ret; + dispatchSwitch(col,caseRead,mem,ret,recordRead,xid,slot,exceptions,scratch); + return ret; + } + + inline byte_off_t bytes_used(column_number_t col) { + byte_off_t ret; + dispatchSwitch(col,caseNoArg,mem,ret,bytes_used); + return ret; + } + + inline void init_mem(byte * mem, column_number_t col) { + dispatchSwitch(col,caseInitMem,mem); + } + inline void mem(byte * mem, column_number_t col) { + dispatchSwitch(col,caseMem,mem); + } + + inline void * compressor(column_number_t col) { + void * ret; + dispatchSwitch(col,caseCompressor,0); + return ret; + } + PluginDispatcher(column_number_t column_count) : + column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) { + for(column_number_t i = 0; i < column_count; i++) { + plugin_ids_[i] = 0; + } + } + + PluginDispatcher(int xid, byte *mem,column_number_t column_count, plugin_id_t * plugins) : + column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) { + for(column_number_t i = 0; i < column_count; i++) { + plugin_ids_[i] = 0; + set_plugin(mem,i,plugins[i]); + } + } + + inline void set_plugin(byte *mem,column_number_t c, plugin_id_t p) { + if(plugin_ids_[c]) { + dispatchSwitch(c,caseDelPlugin,0); + } + plugin_ids_[c] = p; + dispatchSwitch(c,caseSetPlugin,mem); + } + + ~PluginDispatcher() { + for(column_number_t i = 0; i < column_count_; i++) { + dispatchSwitch(i,caseDelPlugin,0); + } + delete[] plugin_ids_; + delete[] plugins_; + } + +#undef caseAppend +#undef caseSetPlugin +#undef caseDelPlugin +#undef caseRead +#undef caseNoArg +#undef caseInitMem +#undef caseCompressor + + private: + + column_number_t column_count_; + plugin_id_t * plugin_ids_; + void ** plugins_; +}; + +template +class Tuple { + public: + explicit Tuple(column_number_t count) : count_(count), + cols_(new TYPE[count]), + byteArray_(new byte[sizeof(count_)+count_*sizeof(TYPE)]) {} + + /*explicit Tuple(byte* b) : count_(*(column_number_t*)b), + cols_(new TYPE[count_]), + byteArray_(new byte[sizeof(count_)+count_*sizeof(TYPE)]) { + memcpy(cols_,b+sizeof(column_number_t), sizeof(TYPE)*count_); + } */ + explicit Tuple(Tuple& t) : count_(t.count_), cols_(new 
TYPE[count_]), + byteArray_(new byte[sizeof(count_)+count_*sizeof(TYPE)]) { + for(column_number_t c = 0; c < count_; c++) { + cols_[c] = t.cols_[c]; + } + } + Tuple(TYPE t) : count_(0), cols_(new TYPE[1]), + byteArray_(new byte[sizeof(count_)+sizeof(TYPE)]) { + cols_[0] = t; + } + /* Tuple(Tuple *t) : count_(t->count_),cols_(new TYPE[count_]) { + for(column_number_t c = 0; c < count_; c++) { + cols_[c] = t->cols_[c]; + } + } */ + inline ~Tuple() { delete[] cols_; delete[] byteArray_; } + + inline TYPE * set(column_number_t col,void* val) { + cols_[col] = *(TYPE*)val; + return (TYPE*)val; + } + inline TYPE * get(column_number_t col) const { + return &(cols_[col]); + } + inline column_number_t column_count() const { + return count_; + } + inline byte_off_t column_len(column_number_t col) const { + return sizeof(TYPE); + } + /* inline void fromByteArray(byte * b) { + assert(count_ == *(column_number_t*)b); + // memcpy(cols_,b+sizeof(column_number_t),sizeof(TYPE)*count_); + TYPE *newCols = (int*)(b + sizeof(column_number_t)); + for(column_number_t i = 0; i < count_; i++) { + cols_[i] = newCols[i]; + } + } */ + inline byte* toByteArray() { + byte* ret = byteArray_; + memcpy(ret, &count_, sizeof(count_)); + memcpy(ret+sizeof(count_), cols_, count_ * sizeof(TYPE)); + return ret; + } + /* inline operator const byte * () { + return toByteArray(); + } */ + inline operator TYPE () { + return cols_[0]; //*get(0); + } + /* inline operator TYPE () { + assert(count_ == 0); + return cols_[0]; + } */ + static inline size_t sizeofBytes(column_number_t cols) { + return sizeof(column_number_t) + cols * sizeof(TYPE); + } + static const int TUPLE_ID = 0; + + /* inline bool operator==(Tuple *t) { + return *this == *t; + } */ + inline bool operator==(Tuple &t) { + //if(t.count_ != count_) return 0; + for(column_number_t i = 0; i < count_; i++) { + if(cols_[i] != t.cols_[i]) { return 0;} + } + return 1; + } + inline bool operator<(Tuple &t) { + //if(t.count_ != count_) return 0; + for(column_number_t i = 0; i < count_; i++) { + if(cols_[i] < t.cols_[i]) { return 1;} + } + return 0; + } + + /* inline bool operator==(TYPE val) { + assert(count_ == 1); + return cols_[0] == val; + }*/ + class iterator { + public: + inline iterator(column_number_t c, TYPE const *const *const dataset, int offset) : + c_(c), + dat_(dataset), + off_(offset), + scratch_(c_) {} + inline explicit iterator(const iterator &i) : c_(i.c_), dat_(i.dat_), off_(i.off_), + scratch_(c_) {} + + inline Tuple& operator*() { + for(column_number_t i = 0; i < c_; i++) { + scratch_.set(i,(void*)&dat_[i][off_]); + } + return scratch_; + } + inline bool operator==(const iterator &a) const { + //assert(dat_==a.dat_ && c_==a.c_); + return (off_==a.off_); + } + inline bool operator!=(const iterator &a) const { + //assert(dat_==a.dat_ && c_==a.c_); + return (off_!=a.off_); + } + inline void operator++() { off_++; } + inline void operator--() { off_--; } + inline void operator+=(int i) { abort(); } + inline int operator-(iterator&i) { + return off_ - i.off_; + } + inline void operator=(iterator &i) { + assert(c_==i.c_); + assert(dat_==i.dat_); + off_=i.off_; + } + inline void offset(int off) { + off_=off; + } + private: + column_number_t c_; + TYPE const * const * dat_; + int off_; + Tuple scratch_; + }; + private: + Tuple() { abort(); } + explicit Tuple(const Tuple& t) { abort(); } + column_number_t count_; + TYPE * const cols_; + byte * byteArray_; + }; +} +#endif // _ROSE_COMPRESSION_TUPLE_H__