Initial refactoring of rose.cpp into reusable components. (Just shuffles the code around.)

Sears Russell 2007-10-18 18:52:12 +00:00
parent cd5ec5f70c
commit 1c408dd2f1
8 changed files with 828 additions and 753 deletions

View file

@@ -2,7 +2,6 @@
// Author: sears@google.com (Rusty Sears)
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <stasis/transactional.h>
@@ -13,6 +12,12 @@
#include "stasis/page/compression/multicolumn-impl.h"
#include "stasis/page/compression/tuple.h"
#include "stasis/operations/lsmIterators.h"
typedef int32_t val_t; // XXX needed by lsmWorkers..
#include "stasis/operations/lsmWorkers.h"
#undef end
#undef begin
@@ -32,185 +37,21 @@ using rose::plugin_id_t;
using rose::column_number_t;
using rose::slot_index_t;
typedef int32_t val_t;
using rose::treeIterator;
using rose::mergeIterator;
using rose::toByteArray;
using rose::insert_args; // XXX
using rose::insertThread;
using rose::mergeThread;
using rose::compressData;
using rose::tv_to_double; // XXX
static const int32_t GB = 1024 * 1024 * 1024;
static int lsm_sim; // XXX this global variable shouldn't be global!
template <class ROW, class PAGELAYOUT>
class treeIterator {
public:
explicit treeIterator(recordid tree, ROW &scratch, int keylen) :
tree_(tree),
scratch_(scratch),
keylen_(keylen),
lsmIterator_(lsmTreeIterator_open(-1,tree)),
slot_(0)
{
if(!lsmTreeIterator_next(-1, lsmIterator_)) {
currentPage_ = 0;
pageid_ = -1;
p_ = 0;
} else {
pageid_t * pid_tmp;
lsmTreeIterator_value(-1,lsmIterator_,(byte**)&pid_tmp);
pageid_ = *pid_tmp;
p_ = loadPage(-1,pageid_);
currentPage_ = (PAGELAYOUT*)p_->impl;
}
}
explicit treeIterator(treeIterator& t) :
tree_(t.tree_),
scratch_(t.scratch_),
keylen_(t.keylen_),
lsmIterator_(lsmTreeIterator_copy(-1,t.lsmIterator_)),
slot_(t.slot_),
pageid_(t.pageid_),
p_((Page*)((t.p_)?loadPage(-1,t.p_->id):0)),
currentPage_((PAGELAYOUT*)((p_)?p_->impl:0)) {
}
~treeIterator() {
lsmTreeIterator_close(-1, lsmIterator_);
if(p_) {
releasePage(p_);
p_ = 0;
}
}
ROW & operator*() {
ROW* readTuple = currentPage_->recordRead(-1,slot_, &scratch_);
if(!readTuple) {
releasePage(p_);
p_=0;
if(lsmTreeIterator_next(-1,lsmIterator_)) {
pageid_t *pid_tmp;
slot_ = 0;
lsmTreeIterator_value(-1,lsmIterator_,(byte**)&pid_tmp);
pageid_ = *pid_tmp;
p_ = loadPage(-1,pageid_);
currentPage_ = (PAGELAYOUT*)p_->impl;
readTuple = currentPage_->recordRead(-1,slot_, &scratch_);
assert(readTuple);
} else {
// past end of iterator! "end" should contain the pageid of the
// last leaf, and 1+ numslots on that page.
abort();
}
}
return scratch_;
}
inline bool operator==(const treeIterator &a) const {
return (slot_ == a.slot_ && pageid_ == a.pageid_);
}
inline bool operator!=(const treeIterator &a) const {
return !(*this==a);
}
inline void operator++() {
slot_++;
}
inline void operator--() {
// This iterator consumes its input, and only partially supports
// "==". "--" is just for book keeping, so we don't need to worry
// about setting the other state.
slot_--;
}
inline treeIterator* end() {
treeIterator* t = new treeIterator(tree_,scratch_,keylen_);
if(t->p_) {
releasePage(t->p_);
t->p_=0;
}
t->currentPage_ = 0;
pageid_t pid = TlsmLastPage(-1,tree_);
if(pid != -1) {
t->pageid_= pid;
Page * p = loadPage(-1, t->pageid_);
PAGELAYOUT * lastPage = (PAGELAYOUT*)p->impl;
t->slot_ = 0;
while(lastPage->recordRead(-1,t->slot_,&scratch_)) { t->slot_++; }
releasePage(p);
} else {
// begin == end already; we're done.
}
return t;
}
private:
explicit treeIterator() { abort(); }
void operator=(treeIterator & t) { abort(); }
int operator-(treeIterator & t) { abort(); }
recordid tree_;
ROW & scratch_;
int keylen_;
lladdIterator_t * lsmIterator_;
slot_index_t slot_;
pageid_t pageid_;
Page * p_;
PAGELAYOUT * currentPage_;
};
/**
The following initPage() functions initialize the page that is
passed into them, and allocate a new PAGELAYOUT object of the
appropriate type.
*/
template <class COMPRESSOR, class TYPE>
static Pstar<COMPRESSOR, TYPE> * initPage(Pstar<COMPRESSOR,TYPE> **pstar,
Page *p, TYPE current) {
*pstar = new Pstar<COMPRESSOR, TYPE>(-1, p);
(*pstar)->compressor()->offset(current);
return *pstar;
}
template <class COMPRESSOR, class TYPE>
static Pstar<COMPRESSOR, TYPE> * initPage(Pstar<COMPRESSOR,TYPE> **pstar,
Page *p, Tuple<TYPE> & current) {
*pstar = new Pstar<COMPRESSOR, TYPE>(-1, p);
(*pstar)->compressor()->offset(current);
return *pstar;
}
template <class COMPRESSOR, class TYPE >
static Multicolumn<Tuple<TYPE> > * initPage(Multicolumn<Tuple<TYPE> > ** mc,
Page *p, Tuple<TYPE> & t) {
column_number_t column_count = t.column_count();
plugin_id_t plugin_id =
rose::plugin_id<Multicolumn<Tuple<TYPE> >,COMPRESSOR,TYPE>();
plugin_id_t * plugins = new plugin_id_t[column_count];
for(column_number_t c = 0; c < column_count; c++) {
plugins[c] = plugin_id;
}
*mc = new Multicolumn<Tuple<TYPE> >(-1,p,column_count,plugins);
for(column_number_t c = 0; c < column_count; c++) {
((COMPRESSOR*)(*mc)->compressor(c))->offset(*t.get(c));
}
delete [] plugins;
return *mc;
}
template <class COMPRESSOR, class TYPE >
static Multicolumn<Tuple<TYPE> > * initPage(Multicolumn<Tuple<TYPE> > ** mc,
Page *p, TYPE t) {
plugin_id_t plugin_id =
rose::plugin_id<Multicolumn<Tuple<TYPE> >,COMPRESSOR,TYPE>();
plugin_id_t * plugins = new plugin_id_t[1];
plugins[0] = plugin_id;
*mc = new Multicolumn<Tuple<TYPE> >(-1,p,1,plugins);
((COMPRESSOR*)(*mc)->compressor(0))->offset(t);
delete [] plugins;
return *mc;
}
#define FIRST_PAGE 1
/**
@@ -262,17 +103,6 @@ int tupCmp(const void *ap, const void *bp) {
return 0;
}
/**
cast from a TYPE to a byte *. These were written to make it easier
to write templates that work with different types of iterators.
*/
inline const byte * toByteArray(int const *const *const i) {
return (const byte*)*i;
}
inline const byte * toByteArray(Tuple<val_t>::iterator * const t) {
return (**t).toByteArray();
}
#define RAND_TUP_CHECK
#define RAND_TUP_YES 0
#define RAND_TUP_NO 1
@@ -417,227 +247,9 @@ inline const byte * toByteArray(randomIterator<val_t> * const t) {
}
}
template <class ITER, class ROW> class mergeIterator;
template <class ITER, class ROW>
inline const byte * toByteArray(mergeIterator<ITER,ROW> * const t);
template<class ITER, class ROW>
class mergeIterator {
private:
static const int A = 0;
static const int B = 1;
static const int NONE = -1;
static const int BOTH = -2;
inline int calcCurr(int oldcur) {
int cur;
if(oldcur == NONE) { return NONE; }
if(a_ == aend_) {
if(b_ == bend_) {
cur = NONE;
} else {
cur = B;
}
} else {
if(b_ == bend_) {
cur = A;
} else {
if((*a_) < (*b_)) {
cur = A;
} else if((*a_) == (*b_)) {
cur = BOTH;
} else {
cur = B;
}
}
}
return cur;
}
public:
mergeIterator(ITER & a, ITER & b, ITER & aend, ITER & bend) :
off_(0),
a_(a),
b_(b),
aend_(aend),
bend_(bend),
curr_(calcCurr(A)),
before_eof_(0)
{}
explicit mergeIterator(mergeIterator &i) :
off_(i.off_),
a_(i.a_),
b_(i.b_),
aend_(i.aend_),
bend_(i.bend_),
curr_(i.curr_),
before_eof_(i.before_eof_)
{ }
ROW& operator* () {
if(curr_ == A || curr_ == BOTH) { return *a_; }
if(curr_ == B) { return *b_; }
abort();
}
void seekEnd() {
curr_ = NONE;
}
// XXX Only works if exactly one of the comparators is derived from end.
inline bool operator==(const mergeIterator &o) const {
if(curr_ == NONE && o.curr_ == NONE) {
return 1;
} else if(curr_ != NONE && o.curr_ != NONE) {
return (a_ == o.a_) && (b_ == o.b_);
}
return 0;
}
inline bool operator!=(const mergeIterator &o) const {
return !(*this == o);
}
inline void operator++() {
off_++;
if(curr_ == BOTH) {
++a_;
++b_;
} else {
if(curr_ == A) { ++a_; }
if(curr_ == B) { ++b_; }
}
curr_ = calcCurr(curr_);
}
inline void operator--() {
off_--;
if(curr_ == BOTH) {
--a_;
--b_;
} else {
if(curr_ == A) { --a_; }
if(curr_ == B) { --b_; }
}
if(curr_ == NONE) {
before_eof_ = 1;
} else {
before_eof_ = 0;
}
}
inline int operator-(mergeIterator&i) {
return off_ - i.off_;
}
inline void operator=(mergeIterator const &i) {
off_ = i.off_;
a_ = i.a_;
b_ = i.b_;
aend_ = i.aend_;
bend_ = i.bend_;
curr_ = i.curr_;
before_eof_ = i.before_eof_;
}
inline unsigned int offset() { return off_; }
private:
unsigned int off_;
ITER a_;
ITER b_;
ITER aend_;
ITER bend_;
int curr_;
int before_eof_;
friend const byte* toByteArray<ITER,ROW>(mergeIterator<ITER,ROW> * const t);
};
#undef RAND_TUP_CHECK
/** Produce a byte array from the value stored at t's current
position */
template <class ITER, class ROW>
inline const byte * toByteArray(mergeIterator<ITER,ROW> * const t) {
if(t->curr_ == t->A || t->curr_ == t->BOTH) {
return toByteArray(&t->a_);
} else if(t->curr_ == t->B) {
return toByteArray(&t->b_);
}
abort();
}
/**
Create pages that are managed by Pstar<COMPRESSOR, TYPE>, and
use them to store a compressed representation of the data set.
@param begin Iterator positioned at the first tuple to be compressed.
@param end Iterator positioned one past the last tuple.
@param inserted Set to the number of tuples that were appended.
@return the number of pages that were needed to store the
compressed data.
*/
template <class PAGELAYOUT, class COMPRESSOR, class TYPE, class ITER, class ROW>
pageid_t compressData(ITER * const begin, ITER * const end,
int buildTree, recordid tree, pageid_t (*pageAlloc)(int,void*),
void *pageAllocState, uint64_t * inserted) {
*inserted = 0;
if(*begin == *end) {
return 0;
}
pageid_t next_page = pageAlloc(-1,pageAllocState);
Page *p = loadPage(-1, next_page);
pageid_t pageCount = 0;
if(*begin != *end && buildTree) {
TlsmAppendPage(-1,tree,toByteArray(begin),p->id);
}
pageCount++;
PAGELAYOUT * mc;
initPage<COMPRESSOR,TYPE>(&mc, p, **begin);
int lastEmpty = 0;
for(ITER i(*begin); i != *end; ++i) {
rose::slot_index_t ret = mc->append(-1, *i);
(*inserted)++;
if(ret == rose::NOSPACE) {
p->dirty = 1;
mc->pack();
releasePage(p);
--(*end);
if(i != *end) {
next_page = pageAlloc(-1,pageAllocState);
p = loadPage(-1, next_page);
mc = initPage<COMPRESSOR, TYPE>(&mc, p, *i);
if(buildTree) {
TlsmAppendPage(-1,tree,toByteArray(&i),p->id);
}
pageCount++;
lastEmpty = 0;
} else {
lastEmpty = 1;
}
++(*end);
--i;
}
}
p->dirty = 1;
mc->pack();
releasePage(p);
return pageCount;
}
template <class PAGELAYOUT>
inline const byte* toByteArray(treeIterator<int,PAGELAYOUT> *const t) {
return (const byte*)&(**t);
}
template <class PAGELAYOUT,class TYPE>
inline const byte* toByteArray(treeIterator<Tuple<TYPE>,PAGELAYOUT> *const t) {
return (**t).toByteArray();
}
/**
Read compressed data from pages starting at FIRST_PAGE.
@@ -706,177 +318,6 @@ int readDataFromTree(recordid tree, ITER &iter, ROW *scratch, int keylen) {
return count;
}
double tv_to_double(struct timeval tv) {
return static_cast<double>(tv.tv_sec) +
(static_cast<double>(tv.tv_usec) / 1000000.0);
}
template<class PAGELAYOUT,class ENGINE,class ITER,class ROW,class TYPE>
struct insert_args {
int comparator_idx;
int rowsize;
ITER *begin;
ITER *end;
pageid_t(*pageAlloc)(int,void*);
void *pageAllocState;
pthread_mutex_t * block_ready_mut;
pthread_cond_t * block_needed_cond;
pthread_cond_t * block_ready_cond;
int max_waiters;
int wait_count;
recordid * wait_queue;
ROW *scratchA;
ROW *scratchB;
pageid_t mergedPages;
};
template<class PAGELAYOUT,class ENGINE,class ITER,class ROW,class TYPE>
void* insertThread(void* arg) {
insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>* a =
(insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>*)arg;
struct timeval start_tv, start_wait_tv, stop_tv;
int insert_count = 0;
pageid_t lastTreeBlocks = 0;
uint64_t lastTreeInserts = 0;
pageid_t desiredInserts = 0;
// this is a hand-tuned value; it should be set dynamically, not statically
double K = 0.18;
// loop around here to produce multiple batches for merge.
while(1) {
gettimeofday(&start_tv,0);
ITER i(*(a->begin));
ITER j(desiredInserts ? *(a->begin) : *(a->end));
if(desiredInserts) {
j += desiredInserts;
}
recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize);
lastTreeBlocks = compressData<PAGELAYOUT,ENGINE,TYPE,ITER,ROW>
(&i, &j,1,tree,a->pageAlloc,a->pageAllocState, &lastTreeInserts);
gettimeofday(&start_wait_tv,0);
pthread_mutex_lock(a->block_ready_mut);
while(a->wait_count >= a->max_waiters) {
pthread_cond_wait(a->block_needed_cond,a->block_ready_mut);
}
memcpy(&a->wait_queue[a->wait_count],&tree,sizeof(recordid));
a->wait_count++;
pthread_cond_signal(a->block_ready_cond);
gettimeofday(&stop_tv,0);
double work_elapsed = tv_to_double(start_wait_tv) - tv_to_double(start_tv);
double wait_elapsed = tv_to_double(stop_tv) - tv_to_double(start_wait_tv);
double elapsed = tv_to_double(stop_tv) - tv_to_double(start_tv);
printf("insert# %-6d waited %6.1f sec "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n",
++insert_count,
wait_elapsed,
work_elapsed,
(long int)lastTreeInserts,
(lastTreeInserts*(uint64_t)a->rowsize / (1024.0*1024.0)) / elapsed);
if(a->mergedPages != -1) {
desiredInserts = (pageid_t)(((double)a->mergedPages / K)
* ((double)lastTreeInserts
/ (double)lastTreeBlocks));
}
pthread_mutex_unlock(a->block_ready_mut);
}
return 0;
}
template<class PAGELAYOUT,class ENGINE,class ITER,class ROW,class TYPE>
void* mergeThread(void* arg) {
insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>* a =
(insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>*)arg;
struct timeval start_tv, wait_tv, stop_tv;
int merge_count = 0;
// loop around here to produce multiple batches for merge.
while(1) {
gettimeofday(&start_tv,0);
pthread_mutex_lock(a->block_ready_mut);
while(a->wait_count <2) {
pthread_cond_wait(a->block_ready_cond,a->block_ready_mut);
}
gettimeofday(&wait_tv,0);
recordid * oldTreeA = &a->wait_queue[0];
recordid * oldTreeB = &a->wait_queue[1];
pthread_mutex_unlock(a->block_ready_mut);
recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize);
treeIterator<ROW,PAGELAYOUT> taBegin(*oldTreeA,*(a->scratchA),a->rowsize);
treeIterator<ROW,PAGELAYOUT> tbBegin(*oldTreeB,*(a->scratchB),a->rowsize);
treeIterator<ROW,PAGELAYOUT> *taEnd = taBegin.end();
treeIterator<ROW,PAGELAYOUT> *tbEnd = tbBegin.end();
mergeIterator<treeIterator<ROW,PAGELAYOUT>,ROW>
mBegin(taBegin, tbBegin, *taEnd, *tbEnd);
mergeIterator<treeIterator<ROW,PAGELAYOUT>,ROW>
mEnd(taBegin, tbBegin, *taEnd, *tbEnd);
mEnd.seekEnd();
uint64_t insertedTuples;
pageid_t mergedPages = compressData<PAGELAYOUT,ENGINE,TYPE,
mergeIterator<treeIterator<ROW,PAGELAYOUT>,ROW>,ROW>
(&mBegin, &mEnd,1,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
delete taEnd;
delete tbEnd;
gettimeofday(&stop_tv,0);
pthread_mutex_lock(a->block_ready_mut);
a->mergedPages = mergedPages;
// TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation!
// TlsmFree(wait_queue[1])
memcpy(&a->wait_queue[0],&tree,sizeof(tree));
for(int i = 1; i + 1 < a->wait_count; i++) {
memcpy(&a->wait_queue[i],&a->wait_queue[i+1],sizeof(tree));
}
a->wait_count--;
pthread_mutex_unlock(a->block_ready_mut);
merge_count++;
double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv);
double work_elapsed = tv_to_double(stop_tv) - tv_to_double(wait_tv);
double total_elapsed = wait_elapsed + work_elapsed;
double ratio = ((double)(insertedTuples * (uint64_t)a->rowsize))
/ (double)(PAGE_SIZE * mergedPages);
double throughput = ((double)(insertedTuples * (uint64_t)a->rowsize))
/ (1024.0 * 1024.0 * total_elapsed);
printf("merge # %-6d: comp ratio: %-9.3f waited %6.1f sec "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n", merge_count, ratio,
wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput);
pthread_cond_signal(a->block_needed_cond);
}
return 0;
}
/**
Test driver for the LSM tree. This function dispatches to the correct
invocations of compressData() and readDataFromTree().

View file

@@ -0,0 +1,306 @@
#ifndef _LSMITERATORS_H__
#define _LSMITERATORS_H__
#include "stasis/page.h"
#include "stasis/bufferManager.h"
#include "stasis/page/compression/compression.h"
#include "stasis/page/compression/tuple.h"
/**
@file
This file contains a number of C++ STL-style iterators that are
used during the LSM tree merges.
*/
/**
@todo get rid of these undefs once compensation.h has been removed...
*/
#undef end
#undef begin
namespace rose {
template <class ITER, class ROW> class mergeIterator;
template <class ITER, class ROW>
inline const byte * toByteArray(mergeIterator<ITER,ROW> * const t);
/**
Scans through an LSM tree's leaf pages, visiting each tuple in the
tree in order. This iterator is designed for maximum forward-scan
performance, and does not support all STL operations.
*/
template <class ROW, class PAGELAYOUT>
class treeIterator {
public:
explicit treeIterator(recordid tree, ROW &scratch, int keylen) :
tree_(tree),
scratch_(scratch),
keylen_(keylen),
lsmIterator_(lsmTreeIterator_open(-1,tree)),
slot_(0)
{
if(!lsmTreeIterator_next(-1, lsmIterator_)) {
currentPage_ = 0;
pageid_ = -1;
p_ = 0;
} else {
pageid_t * pid_tmp;
lsmTreeIterator_value(-1,lsmIterator_,(byte**)&pid_tmp);
pageid_ = *pid_tmp;
p_ = loadPage(-1,pageid_);
currentPage_ = (PAGELAYOUT*)p_->impl;
}
}
explicit treeIterator(treeIterator& t) :
tree_(t.tree_),
scratch_(t.scratch_),
keylen_(t.keylen_),
lsmIterator_(lsmTreeIterator_copy(-1,t.lsmIterator_)),
slot_(t.slot_),
pageid_(t.pageid_),
p_((Page*)((t.p_)?loadPage(-1,t.p_->id):0)),
currentPage_((PAGELAYOUT*)((p_)?p_->impl:0)) {
}
~treeIterator() {
lsmTreeIterator_close(-1, lsmIterator_);
if(p_) {
releasePage(p_);
p_ = 0;
}
}
ROW & operator*() {
ROW* readTuple = currentPage_->recordRead(-1,slot_, &scratch_);
if(!readTuple) {
releasePage(p_);
p_=0;
if(lsmTreeIterator_next(-1,lsmIterator_)) {
pageid_t *pid_tmp;
slot_ = 0;
lsmTreeIterator_value(-1,lsmIterator_,(byte**)&pid_tmp);
pageid_ = *pid_tmp;
p_ = loadPage(-1,pageid_);
currentPage_ = (PAGELAYOUT*)p_->impl;
readTuple = currentPage_->recordRead(-1,slot_, &scratch_);
assert(readTuple);
} else {
// past end of iterator! "end" should contain the pageid of the
// last leaf, and 1+ numslots on that page.
abort();
}
}
return scratch_;
}
inline bool operator==(const treeIterator &a) const {
return (slot_ == a.slot_ && pageid_ == a.pageid_);
}
inline bool operator!=(const treeIterator &a) const {
return !(*this==a);
}
inline void operator++() {
slot_++;
}
inline void operator--() {
// This iterator consumes its input, and only partially supports
// "==". "--" is just for book keeping, so we don't need to worry
// about setting the other state.
slot_--;
}
inline treeIterator* end() {
treeIterator* t = new treeIterator(tree_,scratch_,keylen_);
if(t->p_) {
releasePage(t->p_);
t->p_=0;
}
t->currentPage_ = 0;
pageid_t pid = TlsmLastPage(-1,tree_);
if(pid != -1) {
t->pageid_= pid;
Page * p = loadPage(-1, t->pageid_);
PAGELAYOUT * lastPage = (PAGELAYOUT*)p->impl;
t->slot_ = 0;
while(lastPage->recordRead(-1,t->slot_,&scratch_)) { t->slot_++; }
releasePage(p);
} else {
// begin == end already; we're done.
}
return t;
}
private:
explicit treeIterator() { abort(); }
void operator=(treeIterator & t) { abort(); }
int operator-(treeIterator & t) { abort(); }
recordid tree_;
ROW & scratch_;
int keylen_;
lladdIterator_t * lsmIterator_;
slot_index_t slot_;
pageid_t pageid_;
Page * p_;
PAGELAYOUT * currentPage_;
};
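For reference, a minimal usage sketch of this class (not part of this commit; `tree`, `scratch`, `rowsize`, and the layout type `MyLayout` are placeholders), mirroring the begin/end protocol used by mergeThread() in lsmWorkers.h:

  // Walk every tuple in an LSM tree in order.  treeIterator consumes
  // its input, so a tree can only be scanned once per begin/end pair.
  treeIterator<Tuple<val_t>, MyLayout> it(tree, scratch, rowsize);
  treeIterator<Tuple<val_t>, MyLayout> *itEnd = it.end(); // heap-allocated
  while(it != *itEnd) {
    Tuple<val_t> &tup = *it; // may release one leaf page and load the next
    // ... consume tup ...
    ++it;
  }
  delete itEnd;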
/**
This iterator takes two other iterators as arguments, and merges
their output, dropping duplicate entries.
@todo The LSM tree will not be very useful without support for
deletion (and, therefore, versioning). Such support will require
modifications to mergeIterator (or perhaps a new iterator
class).
*/
template<class ITER, class ROW>
class mergeIterator {
private:
static const int A = 0;
static const int B = 1;
static const int NONE = -1;
static const int BOTH = -2;
inline int calcCurr(int oldcur) {
int cur;
if(oldcur == NONE) { return NONE; }
if(a_ == aend_) {
if(b_ == bend_) {
cur = NONE;
} else {
cur = B;
}
} else {
if(b_ == bend_) {
cur = A;
} else {
if((*a_) < (*b_)) {
cur = A;
} else if((*a_) == (*b_)) {
cur = BOTH;
} else {
cur = B;
}
}
}
return cur;
}
public:
mergeIterator(ITER & a, ITER & b, ITER & aend, ITER & bend) :
off_(0),
a_(a),
b_(b),
aend_(aend),
bend_(bend),
curr_(calcCurr(A)),
before_eof_(0)
{}
explicit mergeIterator(mergeIterator &i) :
off_(i.off_),
a_(i.a_),
b_(i.b_),
aend_(i.aend_),
bend_(i.bend_),
curr_(i.curr_),
before_eof_(i.before_eof_)
{ }
ROW& operator* () {
if(curr_ == A || curr_ == BOTH) { return *a_; }
if(curr_ == B) { return *b_; }
abort();
}
void seekEnd() {
curr_ = NONE;
}
// XXX Only works if exactly one of the comparators is derived from end.
inline bool operator==(const mergeIterator &o) const {
if(curr_ == NONE && o.curr_ == NONE) {
return 1;
} else if(curr_ != NONE && o.curr_ != NONE) {
return (a_ == o.a_) && (b_ == o.b_);
}
return 0;
}
inline bool operator!=(const mergeIterator &o) const {
return !(*this == o);
}
inline void operator++() {
off_++;
if(curr_ == BOTH) {
++a_;
++b_;
} else {
if(curr_ == A) { ++a_; }
if(curr_ == B) { ++b_; }
}
curr_ = calcCurr(curr_);
}
inline void operator--() {
off_--;
if(curr_ == BOTH) {
--a_;
--b_;
} else {
if(curr_ == A) { --a_; }
if(curr_ == B) { --b_; }
}
if(curr_ == NONE) {
before_eof_ = 1;
} else {
before_eof_ = 0;
}
}
inline int operator-(mergeIterator&i) {
return off_ - i.off_;
}
inline void operator=(mergeIterator const &i) {
off_ = i.off_;
a_ = i.a_;
b_ = i.b_;
aend_ = i.aend_;
bend_ = i.bend_;
curr_ = i.curr_;
before_eof_ = i.before_eof_;
}
inline unsigned int offset() { return off_; }
private:
unsigned int off_;
ITER a_;
ITER b_;
ITER aend_;
ITER bend_;
int curr_;
int before_eof_;
friend const byte* toByteArray<ITER,ROW>(mergeIterator<ITER,ROW> * const t);
};
/** Produce a byte array from the value stored at t's current
position */
template <class ITER, class ROW>
inline const byte * toByteArray(mergeIterator<ITER,ROW> * const t) {
if(t->curr_ == t->A || t->curr_ == t->BOTH) {
return toByteArray(&t->a_);
} else if(t->curr_ == t->B) {
return toByteArray(&t->b_);
}
abort();
}
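A condensed sketch of the merge protocol (hypothetical; ITER, ROW, and the four input iterators a, b, aEnd, bEnd are assumed to be set up as in mergeThread() in lsmWorkers.h):

  // Both sentinels are built from the same inputs; seekEnd() turns the
  // second one into the end-of-merge marker (curr_ == NONE).
  mergeIterator<ITER,ROW> mBegin(a, b, aEnd, bEnd);
  mergeIterator<ITER,ROW> mEnd(a, b, aEnd, bEnd);
  mEnd.seekEnd();
  for(; mBegin != mEnd; ++mBegin) {
    ROW &r = *mBegin; // when *a == *b, both inputs advance; r appears once
    // ... consume r ...
  }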
template <class PAGELAYOUT>
inline const byte* toByteArray(treeIterator<int,PAGELAYOUT> *const t) {
return (const byte*)&(**t);
}
template <class PAGELAYOUT,class TYPE>
inline const byte* toByteArray(treeIterator<Tuple<TYPE>,PAGELAYOUT> *const t) {
return (**t).toByteArray();
}
}
#endif // _LSMITERATORS_H__

View file

@@ -0,0 +1,324 @@
#ifndef _ROSE_COMPRESSION_LSMWORKERS_H__
#define _ROSE_COMPRESSION_LSMWORKERS_H__
#include <unistd.h>
/**
@file
This file contains the worker thread implementations required for
the LSM tree and its benchmarking code.
*/
namespace rose {
/**
cast from a TYPE to a byte *. These were written to make it easier
to write templates that work with different types of iterators.
*/
inline const byte * toByteArray(int const *const *const i) {
return (const byte*)*i;
}
inline const byte * toByteArray(Tuple<val_t>::iterator * const t) {
return (**t).toByteArray();
}
double tv_to_double(struct timeval tv) {
return static_cast<double>(tv.tv_sec) +
(static_cast<double>(tv.tv_usec) / 1000000.0);
}
template<class PAGELAYOUT,class ENGINE,class ITER,class ROW,class TYPE>
struct insert_args {
int comparator_idx;
int rowsize;
ITER *begin;
ITER *end;
pageid_t(*pageAlloc)(int,void*);
void *pageAllocState;
pthread_mutex_t * block_ready_mut;
pthread_cond_t * block_needed_cond;
pthread_cond_t * block_ready_cond;
int max_waiters;
int wait_count;
recordid * wait_queue;
ROW *scratchA;
ROW *scratchB;
pageid_t mergedPages;
};
/**
The following initPage() functions initialize the page that is
passed into them, and allocate a new PAGELAYOUT object of the
appropriate type.
*/
template <class COMPRESSOR, class TYPE>
static Pstar<COMPRESSOR, TYPE> * initPage(Pstar<COMPRESSOR,TYPE> **pstar,
Page *p, TYPE current) {
*pstar = new Pstar<COMPRESSOR, TYPE>(-1, p);
(*pstar)->compressor()->offset(current);
return *pstar;
}
template <class COMPRESSOR, class TYPE>
static Pstar<COMPRESSOR, TYPE> * initPage(Pstar<COMPRESSOR,TYPE> **pstar,
Page *p, Tuple<TYPE> & current) {
*pstar = new Pstar<COMPRESSOR, TYPE>(-1, p);
(*pstar)->compressor()->offset(current);
return *pstar;
}
template <class COMPRESSOR, class TYPE >
static Multicolumn<Tuple<TYPE> > * initPage(Multicolumn<Tuple<TYPE> > ** mc,
Page *p, Tuple<TYPE> & t) {
column_number_t column_count = t.column_count();
plugin_id_t plugin_id =
rose::plugin_id<Multicolumn<Tuple<TYPE> >,COMPRESSOR,TYPE>();
plugin_id_t * plugins = new plugin_id_t[column_count];
for(column_number_t c = 0; c < column_count; c++) {
plugins[c] = plugin_id;
}
*mc = new Multicolumn<Tuple<TYPE> >(-1,p,column_count,plugins);
for(column_number_t c = 0; c < column_count; c++) {
((COMPRESSOR*)(*mc)->compressor(c))->offset(*t.get(c));
}
delete [] plugins;
return *mc;
}
template <class COMPRESSOR, class TYPE >
static Multicolumn<Tuple<TYPE> > * initPage(Multicolumn<Tuple<TYPE> > ** mc,
Page *p, TYPE t) {
plugin_id_t plugin_id =
rose::plugin_id<Multicolumn<Tuple<TYPE> >,COMPRESSOR,TYPE>();
plugin_id_t * plugins = new plugin_id_t[1];
plugins[0] = plugin_id;
*mc = new Multicolumn<Tuple<TYPE> >(-1,p,1,plugins);
((COMPRESSOR*)(*mc)->compressor(0))->offset(t);
delete [] plugins;
return *mc;
}
/**
Create pages that are managed by Pstar<COMPRESSOR, TYPE>, and
use them to store a compressed representation of the data set.
@param begin Iterator positioned at the first tuple to be compressed.
@param end Iterator positioned one past the last tuple.
@param inserted Set to the number of tuples that were appended.
@return the number of pages that were needed to store the
compressed data.
*/
template <class PAGELAYOUT, class COMPRESSOR, class TYPE, class ITER, class ROW>
pageid_t compressData(ITER * const begin, ITER * const end,
int buildTree, recordid tree, pageid_t (*pageAlloc)(int,void*),
void *pageAllocState, uint64_t * inserted) {
*inserted = 0;
if(*begin == *end) {
return 0;
}
pageid_t next_page = pageAlloc(-1,pageAllocState);
Page *p = loadPage(-1, next_page);
pageid_t pageCount = 0;
if(*begin != *end && buildTree) {
TlsmAppendPage(-1,tree,toByteArray(begin),p->id);
}
pageCount++;
PAGELAYOUT * mc;
initPage<COMPRESSOR,TYPE>(&mc, p, **begin);
int lastEmpty = 0;
for(ITER i(*begin); i != *end; ++i) {
rose::slot_index_t ret = mc->append(-1, *i);
(*inserted)++;
if(ret == rose::NOSPACE) {
p->dirty = 1;
mc->pack();
releasePage(p);
--(*end);
if(i != *end) {
next_page = pageAlloc(-1,pageAllocState);
p = loadPage(-1, next_page);
mc = initPage<COMPRESSOR, TYPE>(&mc, p, *i);
if(buildTree) {
TlsmAppendPage(-1,tree,toByteArray(&i),p->id);
}
pageCount++;
lastEmpty = 0;
} else {
lastEmpty = 1;
}
++(*end);
--i;
}
}
p->dirty = 1;
mc->pack();
releasePage(p);
return pageCount;
}
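A minimal call sketch (hypothetical names; the iterator pair and the page allocator are assumed to exist). Passing 0 for buildTree skips the TlsmAppendPage() calls, so compressData() can also emit a run of compressed pages without indexing them in an LSM tree; the tree argument is never touched in that case:

  uint64_t n = 0;
  recordid unusedTree; // ignored when buildTree == 0
  ITER b(first), e(past_end);
  pageid_t pages = compressData<PAGELAYOUT,COMPRESSOR,TYPE,ITER,ROW>
    (&b, &e, 0, unusedTree, pageAlloc, pageAllocState, &n);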
template<class PAGELAYOUT,class ENGINE,class ITER,class ROW,class TYPE>
void* insertThread(void* arg) {
insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>* a =
(insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>*)arg;
struct timeval start_tv, start_wait_tv, stop_tv;
int insert_count = 0;
pageid_t lastTreeBlocks = 0;
uint64_t lastTreeInserts = 0;
pageid_t desiredInserts = 0;
// this is a hand-tuned value; it should be set dynamically, not statically
double K = 0.18;
// loop around here to produce multiple batches for merge.
while(1) {
gettimeofday(&start_tv,0);
ITER i(*(a->begin));
ITER j(desiredInserts ? *(a->begin) : *(a->end));
if(desiredInserts) {
j += desiredInserts;
}
recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize);
lastTreeBlocks = compressData<PAGELAYOUT,ENGINE,TYPE,ITER,ROW>
(&i, &j,1,tree,a->pageAlloc,a->pageAllocState, &lastTreeInserts);
gettimeofday(&start_wait_tv,0);
pthread_mutex_lock(a->block_ready_mut);
while(a->wait_count >= a->max_waiters) {
pthread_cond_wait(a->block_needed_cond,a->block_ready_mut);
}
memcpy(&a->wait_queue[a->wait_count],&tree,sizeof(recordid));
a->wait_count++;
pthread_cond_signal(a->block_ready_cond);
gettimeofday(&stop_tv,0);
double work_elapsed = tv_to_double(start_wait_tv) - tv_to_double(start_tv);
double wait_elapsed = tv_to_double(stop_tv) - tv_to_double(start_wait_tv);
double elapsed = tv_to_double(stop_tv) - tv_to_double(start_tv);
printf("insert# %-6d waited %6.1f sec "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n",
++insert_count,
wait_elapsed,
work_elapsed,
(long int)lastTreeInserts,
(lastTreeInserts*(uint64_t)a->rowsize / (1024.0*1024.0)) / elapsed);
if(a->mergedPages != -1) {
desiredInserts = (pageid_t)(((double)a->mergedPages / K)
* ((double)lastTreeInserts
/ (double)lastTreeBlocks));
}
pthread_mutex_unlock(a->block_ready_mut);
}
return 0;
}
template<class PAGELAYOUT,class ENGINE,class ITER,class ROW,class TYPE>
void* mergeThread(void* arg) {
insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>* a =
(insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>*)arg;
struct timeval start_tv, wait_tv, stop_tv;
int merge_count = 0;
// loop around here to produce multiple batches for merge.
while(1) {
gettimeofday(&start_tv,0);
pthread_mutex_lock(a->block_ready_mut);
while(a->wait_count <2) {
pthread_cond_wait(a->block_ready_cond,a->block_ready_mut);
}
gettimeofday(&wait_tv,0);
recordid * oldTreeA = &a->wait_queue[0];
recordid * oldTreeB = &a->wait_queue[1];
pthread_mutex_unlock(a->block_ready_mut);
recordid tree = TlsmCreate(-1, a->comparator_idx,a->rowsize);
treeIterator<ROW,PAGELAYOUT> taBegin(*oldTreeA,*(a->scratchA),a->rowsize);
treeIterator<ROW,PAGELAYOUT> tbBegin(*oldTreeB,*(a->scratchB),a->rowsize);
treeIterator<ROW,PAGELAYOUT> *taEnd = taBegin.end();
treeIterator<ROW,PAGELAYOUT> *tbEnd = tbBegin.end();
mergeIterator<treeIterator<ROW,PAGELAYOUT>,ROW>
mBegin(taBegin, tbBegin, *taEnd, *tbEnd);
mergeIterator<treeIterator<ROW,PAGELAYOUT>,ROW>
mEnd(taBegin, tbBegin, *taEnd, *tbEnd);
mEnd.seekEnd();
uint64_t insertedTuples;
pageid_t mergedPages = compressData<PAGELAYOUT,ENGINE,TYPE,
mergeIterator<treeIterator<ROW,PAGELAYOUT>,ROW>,ROW>
(&mBegin, &mEnd,1,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
delete taEnd;
delete tbEnd;
gettimeofday(&stop_tv,0);
pthread_mutex_lock(a->block_ready_mut);
a->mergedPages = mergedPages;
// TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation!
// TlsmFree(wait_queue[1])
memcpy(&a->wait_queue[0],&tree,sizeof(tree));
for(int i = 1; i + 1 < a->wait_count; i++) {
memcpy(&a->wait_queue[i],&a->wait_queue[i+1],sizeof(tree));
}
a->wait_count--;
pthread_mutex_unlock(a->block_ready_mut);
merge_count++;
double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv);
double work_elapsed = tv_to_double(stop_tv) - tv_to_double(wait_tv);
double total_elapsed = wait_elapsed + work_elapsed;
double ratio = ((double)(insertedTuples * (uint64_t)a->rowsize))
/ (double)(PAGE_SIZE * mergedPages);
double throughput = ((double)(insertedTuples * (uint64_t)a->rowsize))
/ (1024.0 * 1024.0 * total_elapsed);
printf("merge # %-6d: comp ratio: %-9.3f waited %6.1f sec "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s)\n", merge_count, ratio,
wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput);
pthread_cond_signal(a->block_needed_cond);
}
return 0;
}
}
#endif // _ROSE_COMPRESSION_LSMWORKERS_H__
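For reference, a hedged sketch of how a driver can wire the two workers together (hypothetical; the template arguments, iterators, scratch rows, and allocator are placeholders — rose.cpp's test driver is the authoritative example):

  // A two-slot queue of trees awaiting merge, guarded by one mutex and
  // a pair of condition variables; mergedPages == -1 means no merge has
  // finished yet, so insertThread uses its default batch size.
  pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
  pthread_cond_t needed = PTHREAD_COND_INITIALIZER;
  pthread_cond_t ready  = PTHREAD_COND_INITIALIZER;
  recordid queue[2];
  insert_args<PAGELAYOUT,ENGINE,ITER,ROW,TYPE> a = {
    comparator_idx, rowsize, &begin, &end, pageAlloc, pageAllocState,
    &mut, &needed, &ready,
    2,  // max_waiters: insertThread blocks once two trees are queued
    0,  // wait_count
    queue, &scratchA, &scratchB,
    -1  // mergedPages
  };
  pthread_t ins, mrg;
  pthread_create(&ins, 0, insertThread<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>, &a);
  pthread_create(&mrg, 0, mergeThread<PAGELAYOUT,ENGINE,ITER,ROW,TYPE>, &a);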

View file

@@ -42,7 +42,7 @@ static const slot_index_t MAX_INDEX = UINT_MAX-2;
*/
template <class PAGEFORMAT, class COMPRESSOR, class TYPE>
plugin_id_t plugin_id() {
inline plugin_id_t plugin_id() {
/* type_idx maps from sizeof(TYPE) to a portion of a page type:
(u)int8_t -> 0

View file

@@ -8,7 +8,7 @@
#include "pstar.h" // for typedefs + consts (XXX add new header?)
#include "tuple.h" // XXX rename tuple.hx
#include "pluginDispatcher.h"
// Copyright 2007 Google Inc. All Rights Reserved.
// Author: sears@google.com (Rusty Sears)

View file

@@ -0,0 +1,176 @@
#ifndef _ROSE_COMPRESSION_PLUGINDISPATCHER_H__
#define _ROSE_COMPRESSION_PLUGINDISPATCHER_H__
namespace rose {
/**
PluginDispatcher essentially just wraps calls to compressors in
switch statements.
It has a number of deficiencies:
1) Performance. The switch statement is the main CPU bottleneck
for both of the current compression schemes.
2) PluginDispatcher has to "know" about all compression
algorithms and all data types that it may encounter.
This approach has one advantage: it doesn't preclude other
(templatized) implementations that hardcode schema formats at
compile time.
Performance could be partially addressed by using a blocking append
algorithm:
A Queue up multiple append requests (or precompute read requests)
when appropriate.
B Before appending, calculate a lower (pessimistic) bound on the
number of inserted tuples that can fit in the page:
n = (free bytes) / (maximum space per tuple)
C Compress n tuples from each column at a time. Only evaluate the
switch statement once for each column.
D Repeat steps B and C until n is below some threshold, then
revert the current behavior.
Batching read requests is simpler, and would be useful for
sequential scans over the data.
*/
class PluginDispatcher{
public:
#define dispatchSwitch(col,cases,...) \
static const int base = USER_DEFINED_PAGE(0) + 2 * 2 * 4;\
switch(plugin_ids_[col]-base) { \
cases(0, For<uint8_t>, col,uint8_t, __VA_ARGS__); \
cases(1, For<uint16_t>,col,uint16_t,__VA_ARGS__); \
cases(2, For<uint32_t>,col,uint32_t,__VA_ARGS__); \
cases(3, For<uint64_t>,col,uint64_t,__VA_ARGS__); \
cases(4, For<int8_t>, col,int8_t, __VA_ARGS__); \
cases(5, For<int16_t>, col,int16_t, __VA_ARGS__); \
cases(6, For<int32_t>, col,int32_t, __VA_ARGS__); \
cases(7, For<int64_t>, col,int64_t, __VA_ARGS__); \
cases(8, Rle<uint8_t>, col,uint8_t, __VA_ARGS__); \
cases(9, Rle<uint16_t>,col,uint16_t,__VA_ARGS__); \
cases(10,Rle<uint32_t>,col,uint32_t,__VA_ARGS__); \
cases(11,Rle<uint64_t>,col,uint64_t,__VA_ARGS__); \
cases(12,Rle<int8_t>, col,int8_t, __VA_ARGS__); \
cases(13,Rle<int16_t>, col,int16_t, __VA_ARGS__); \
cases(14,Rle<int32_t>, col,int32_t, __VA_ARGS__); \
cases(15,Rle<int64_t>, col,int64_t, __VA_ARGS__); \
default: abort(); \
};
#define caseAppend(off,plug_type,col,type,fcn,ret,xid,dat,...) \
case off: { \
ret = ((plug_type*)plugins_[col])->fcn(xid,*(type*)dat,__VA_ARGS__); } break
#define caseSetPlugin(off,plug_type,col,type,m) \
case off: { plugins_[col] = new plug_type(m); } break
#define caseDelPlugin(off,plug_type,col,type,m) \
case off: { delete (plug_type*)plugins_[col]; } break
#define caseRead(off,plug_type,col,type,m,ret,fcn,xid,slot,except,scratch) \
case off: { ret = ((plug_type*)plugins_[col])->fcn(xid,slot,except,(type*)scratch); } break
#define caseNoArg(off,plug_type,col,type,m,ret,fcn) \
case off: { ret = ((plug_type*)plugins_[col])->fcn(); } break
#define caseInitMem(off,plug_type,col,type,m) \
case off: { ((plug_type*)plugins_[col])->init_mem(m); } break
#define caseMem(off,plug_type,col,type,m) \
case off: { ((plug_type*)plugins_[col])->mem(m); } break
#define caseCompressor(off,plug_type,col,type,nil) \
case off: { ret = (plug_type*)plugins_[col]; } break
inline slot_index_t recordAppend(int xid, column_number_t col,
const void *dat, byte_off_t* except,
byte *exceptions, int *free_bytes) {
slot_index_t ret;
dispatchSwitch(col,caseAppend,append,ret,xid,dat,except,exceptions,
free_bytes);
return ret;
}
inline void *recordRead(int xid, byte *mem, column_number_t col,
slot_index_t slot, byte* exceptions, void *scratch) {
void * ret;
dispatchSwitch(col,caseRead,mem,ret,recordRead,xid,slot,exceptions,scratch);
return ret;
}
inline byte_off_t bytes_used(column_number_t col) {
byte_off_t ret;
dispatchSwitch(col,caseNoArg,mem,ret,bytes_used);
return ret;
}
inline void init_mem(byte * mem, column_number_t col) {
dispatchSwitch(col,caseInitMem,mem);
}
inline void mem(byte * mem, column_number_t col) {
dispatchSwitch(col,caseMem,mem);
}
inline void * compressor(column_number_t col) {
void * ret;
dispatchSwitch(col,caseCompressor,0);
return ret;
}
PluginDispatcher(column_number_t column_count) :
column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) {
for(column_number_t i = 0; i < column_count; i++) {
plugin_ids_[i] = 0;
}
}
PluginDispatcher(int xid, byte *mem,column_number_t column_count, plugin_id_t * plugins) :
column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) {
for(column_number_t i = 0; i < column_count; i++) {
plugin_ids_[i] = 0;
set_plugin(mem,i,plugins[i]);
}
}
inline void set_plugin(byte *mem,column_number_t c, plugin_id_t p) {
if(plugin_ids_[c]) {
dispatchSwitch(c,caseDelPlugin,0);
}
plugin_ids_[c] = p;
dispatchSwitch(c,caseSetPlugin,mem);
}
~PluginDispatcher() {
for(column_number_t i = 0; i < column_count_; i++) {
dispatchSwitch(i,caseDelPlugin,0);
}
delete[] plugin_ids_;
delete[] plugins_;
}
#undef caseAppend
#undef caseSetPlugin
#undef caseDelPlugin
#undef caseRead
#undef caseNoArg
#undef caseInitMem
#undef caseCompressor
private:
column_number_t column_count_;
plugin_id_t * plugin_ids_;
void ** plugins_;
};
}
#endif // _ROSE_COMPRESSION_PLUGINDISPATCHER_H__
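The batched append suggested in the class comment might look roughly like this (a sketch under stated assumptions: recordAppendBatch() does not exist in this commit, and max_tuple_bytes, threshold, and vals are assumed names):

  // B: pessimistic lower bound on how many tuples are certain to fit.
  int n = *free_bytes / max_tuple_bytes;
  while(n > threshold) {
    for(column_number_t c = 0; c < column_count; c++) {
      // C: evaluate the dispatch switch once per column, then append a
      // run of n values for that column with direct calls into the plugin.
      pd.recordAppendBatch(xid, c, &vals[c][next], n, exceptions, free_bytes);
    }
    next += n;
    n = *free_bytes / max_tuple_bytes; // D: recompute and repeat
  }
  // below the threshold, fall back to per-tuple recordAppend() calls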

View file

@@ -16,9 +16,6 @@ namespace rose {
template <class COMPRESSOR, class TYPE>
void pStarLoaded(Page *p);
template<class PAGEFORMAT, class COMPRESSOR, class TYPE>
inline plugin_id_t plugin_id();
template <class COMPRESSOR, class TYPE> class Pstar {
public:
// Initialize a new Pstar page

View file

@@ -13,181 +13,12 @@
#include <ctype.h>
#include "compression.h"
#include "pstar-impl.h"
#include "multicolumn.h"
#include "for.h"
#include "rle.h"
#include "string.h"
namespace rose {
template <class TUPLE> class Multicolumn;
template<class TYPE> class Tuple;
/**
PluginDispatcher essentially just wraps calls to compressors in
switch statements.
It has a number of deficiencies:
1) Performance. The switch statement is the main CPU bottleneck
for both of the current compression schemes.
2) PluginDispatcher has to "know" about all compression
algorithms and all data types that it may encounter.
This approach has one advantage: it doesn't preclude other
(templatized) implementations that hardcode schema formats at
compile time.
Performance could be partially addressed by using a blocking append
algorithm:
A Queue up multiple append requests (or precompute read requests)
when appropriate.
B Before appending, calculate a lower (pessimistic) bound on the
number of inserted tuples that can fit in the page:
n = (free bytes) / (maximum space per tuple)
C Compress n tuples from each column at a time. Only evaluate the
switch statement once for each column.
D Repeat steps B and C until n is below some threshold, then
revert the current behavior.
Batching read requests is simpler, and would be useful for
sequential scans over the data.
*/
class PluginDispatcher{
public:
#define dispatchSwitch(col,cases,...) \
static const int base = USER_DEFINED_PAGE(0) + 2 * 2 * 4;\
switch(plugin_ids_[col]-base) { \
cases(0, For<uint8_t>, col,uint8_t, __VA_ARGS__); \
cases(1, For<uint16_t>,col,uint16_t,__VA_ARGS__); \
cases(2, For<uint32_t>,col,uint32_t,__VA_ARGS__); \
cases(3, For<uint64_t>,col,uint64_t,__VA_ARGS__); \
cases(4, For<int8_t>, col,int8_t, __VA_ARGS__); \
cases(5, For<int16_t>, col,int16_t, __VA_ARGS__); \
cases(6, For<int32_t>, col,int32_t, __VA_ARGS__); \
cases(7, For<int64_t>, col,int64_t, __VA_ARGS__); \
cases(8, Rle<uint8_t>, col,uint8_t, __VA_ARGS__); \
cases(9, Rle<uint16_t>,col,uint16_t,__VA_ARGS__); \
cases(10,Rle<uint32_t>,col,uint32_t,__VA_ARGS__); \
cases(11,Rle<uint64_t>,col,uint64_t,__VA_ARGS__); \
cases(12,Rle<int8_t>, col,int8_t, __VA_ARGS__); \
cases(13,Rle<int16_t>, col,int16_t, __VA_ARGS__); \
cases(14,Rle<int32_t>, col,int32_t, __VA_ARGS__); \
cases(15,Rle<int64_t>, col,int64_t, __VA_ARGS__); \
default: abort(); \
};
#define caseAppend(off,plug_type,col,type,fcn,ret,xid,dat,...) \
case off: { \
ret = ((plug_type*)plugins_[col])->fcn(xid,*(type*)dat,__VA_ARGS__); } break
#define caseSetPlugin(off,plug_type,col,type,m) \
case off: { plugins_[col] = new plug_type(m); } break
#define caseDelPlugin(off,plug_type,col,type,m) \
case off: { delete (plug_type*)plugins_[col]; } break
#define caseRead(off,plug_type,col,type,m,ret,fcn,xid,slot,except,scratch) \
case off: { ret = ((plug_type*)plugins_[col])->fcn(xid,slot,except,(type*)scratch); } break
#define caseNoArg(off,plug_type,col,type,m,ret,fcn) \
case off: { ret = ((plug_type*)plugins_[col])->fcn(); } break
#define caseInitMem(off,plug_type,col,type,m) \
case off: { ((plug_type*)plugins_[col])->init_mem(m); } break
#define caseMem(off,plug_type,col,type,m) \
case off: { ((plug_type*)plugins_[col])->mem(m); } break
#define caseCompressor(off,plug_type,col,type,nil) \
case off: { ret = (plug_type*)plugins_[col]; } break
inline slot_index_t recordAppend(int xid, column_number_t col,
const void *dat, byte_off_t* except,
byte *exceptions, int *free_bytes) {
slot_index_t ret;
dispatchSwitch(col,caseAppend,append,ret,xid,dat,except,exceptions,
free_bytes);
return ret;
}
inline void *recordRead(int xid, byte *mem, column_number_t col,
slot_index_t slot, byte* exceptions, void *scratch) {
void * ret;
dispatchSwitch(col,caseRead,mem,ret,recordRead,xid,slot,exceptions,scratch);
return ret;
}
inline byte_off_t bytes_used(column_number_t col) {
byte_off_t ret;
dispatchSwitch(col,caseNoArg,mem,ret,bytes_used);
return ret;
}
inline void init_mem(byte * mem, column_number_t col) {
dispatchSwitch(col,caseInitMem,mem);
}
inline void mem(byte * mem, column_number_t col) {
dispatchSwitch(col,caseMem,mem);
}
inline void * compressor(column_number_t col) {
void * ret;
dispatchSwitch(col,caseCompressor,0);
return ret;
}
PluginDispatcher(column_number_t column_count) :
column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) {
for(column_number_t i = 0; i < column_count; i++) {
plugin_ids_[i] = 0;
}
}
PluginDispatcher(int xid, byte *mem,column_number_t column_count, plugin_id_t * plugins) :
column_count_(column_count), plugin_ids_(new plugin_id_t[column_count]), plugins_(new void*[column_count]) {
for(column_number_t i = 0; i < column_count; i++) {
plugin_ids_[i] = 0;
set_plugin(mem,i,plugins[i]);
}
}
inline void set_plugin(byte *mem,column_number_t c, plugin_id_t p) {
if(plugin_ids_[c]) {
dispatchSwitch(c,caseDelPlugin,0);
}
plugin_ids_[c] = p;
dispatchSwitch(c,caseSetPlugin,mem);
}
~PluginDispatcher() {
for(column_number_t i = 0; i < column_count_; i++) {
dispatchSwitch(i,caseDelPlugin,0);
}
delete[] plugin_ids_;
delete[] plugins_;
}
#undef caseAppend
#undef caseSetPlugin
#undef caseDelPlugin
#undef caseRead
#undef caseNoArg
#undef caseInitMem
#undef caseCompressor
private:
column_number_t column_count_;
plugin_id_t * plugin_ids_;
void ** plugins_;
};
template<class TYPE>
class Tuple {
public: