Implemented TlsmTableFind(); fixed quite a few bugs in the merge code, comparators, etc...

Sears Russell 2007-11-06 02:27:04 +00:00
parent d03a4e0c1f
commit f2df518338
13 changed files with 559 additions and 213 deletions
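
The new lookup is exercised the way the updated benchmark below does: regenerate the tuple with getTuple(), insert it, then probe for it immediately. A minimal usage sketch, assuming the rose/stasis headers from this tree, the benchmark's getTuple() helper, and a handle opened with TlsmTableStart<PAGELAYOUT>(); insertAndVerify() itself is a hypothetical wrapper, but every call it makes appears in the diff:

#include <cassert>

// Sketch only -- insertAndVerify() is illustrative, not part of the commit.
template<class PAGELAYOUT>
void insertAndVerify(int xid, rose::lsmTableHandle<PAGELAYOUT> *h, long i) {
  typename PAGELAYOUT::FMT::TUP t;        // value to insert / probe key
  typename PAGELAYOUT::FMT::TUP scratch;  // caller-supplied scratch tuple
  rose::getTuple<PAGELAYOUT>(i, t);       // fill t deterministically from i
  rose::TlsmTableInsert(h, t);
  // TlsmTableFind() returns a pointer to the found tuple (normally into
  // `scratch`), or NULL when no in-memory or on-disk component contains it.
  typename PAGELAYOUT::FMT::TUP const * const sp =
      rose::TlsmTableFind(xid, h, t, scratch);
  assert(sp);
}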

View file

@ -13,16 +13,34 @@
#include "stasis/page/compression/pageLayout.h"
namespace rose {
template<class PAGELAYOUT>
void getTuple(long int i, typename PAGELAYOUT::FMT::TUP & t) {
typename PAGELAYOUT::FMT::TUP::TYP0 m = i;
typename PAGELAYOUT::FMT::TUP::TYP1 j = i / 65536;
typename PAGELAYOUT::FMT::TUP::TYP2 k = i / 12514500;
typename PAGELAYOUT::FMT::TUP::TYP3 l = i / 10000000;
t.set0(&m);
t.set1(&j);
t.set2(&k);
t.set3(&l);
t.set4(&j);
t.set5(&k);
t.set6(&l);
t.set7(&j);
t.set8(&k);
t.set9(&l);
}
template<class PAGELAYOUT>
int main(int argc, char **argv) {
static int cmp_num = 1;
static int init_num = 1;
unlink("storefile.txt");
unlink("logfile.txt");
sync();
stasis_page_impl_register(PAGELAYOUT::FMT::impl());
PAGELAYOUT::initPageLayout();
bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
Tinit();
@ -36,6 +54,7 @@ namespace rose {
lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable);
typename PAGELAYOUT::FMT::TUP t;
typename PAGELAYOUT::FMT::TUP s;
long INSERTS;
if(argc == 2) {
@ -43,8 +62,6 @@ namespace rose {
} else {
INSERTS = 10 * 1000 * 1000;
}
// static const long INSERTS = 10000000;
// static const long INSERTS = 100000;
static const long COUNT = INSERTS / 100;
long int count = COUNT;
@ -55,31 +72,19 @@ namespace rose {
start = rose::tv_to_double(start_tv);
last_start = start;
printf("tuple 'size'%d ; %d\n", PAGELAYOUT::FMT::TUP::sizeofBytes(), sizeof(typename PAGELAYOUT::FMT::TUP));
printf("tuple 'size'%d ; %ld\n", PAGELAYOUT::FMT::TUP::sizeofBytes(), sizeof(typename PAGELAYOUT::FMT::TUP));
for(long int i = 0; i < INSERTS; i++) {
typename PAGELAYOUT::FMT::TUP::TYP0 m = i;// % 65536;
typename PAGELAYOUT::FMT::TUP::TYP1 j = 0 / 65536;
typename PAGELAYOUT::FMT::TUP::TYP2 k = 0 / 12514500;
typename PAGELAYOUT::FMT::TUP::TYP3 l = 0 / 10000000;
t.set0(&m);
t.set1(&l);
t.set2(&l);
t.set3(&l);
t.set4(&l);
t.set5(&l);
t.set6(&l);
t.set7(&l);
t.set8(&l);
t.set9(&l);
getTuple<PAGELAYOUT>(i,t);
TlsmTableInsert(h,t);
getTuple<PAGELAYOUT>(i,t);
assert(TlsmTableFind(xid,h,t,s));
count --;
if(!count) {
count = COUNT;
gettimeofday(&now_tv,0);
now = tv_to_double(now_tv);
printf("%3d%% complete "
printf("%3ld%% write "
"%9.3f Mtup/sec (avg) %9.3f Mtup/sec (cur) "
"%9.3f Mbyte/sec (avg) %9.3f Mbyte/sec (cur)\n",
((i+1) * 100) / INSERTS,
@ -91,11 +96,45 @@ namespace rose {
last_start = now;
}
}
printf("insertions done.\n"); fflush(stdout);
count = COUNT;
gettimeofday(&start_tv,0);
start = rose::tv_to_double(start_tv);
last_start = start;
for(long int i = 0; i < INSERTS; i++) {
getTuple<PAGELAYOUT>(i,t);
typename PAGELAYOUT::FMT::TUP const * const sp = TlsmTableFind(xid,h,t,s);
assert(sp);
assert(*sp == s);
count--;
if(!count) {
count = COUNT;
gettimeofday(&now_tv,0);
now = tv_to_double(now_tv);
printf("%3ld%% read "
"%9.3f Mtup/sec (avg) %9.3f Mtup/sec (cur) "
"%9.3f Mbyte/sec (avg) %9.3f Mbyte/sec (cur)\n",
((i+1) * 100) / INSERTS,
((double)i/1000000.0)/(now-start),
((double)count/1000000.0)/(now-last_start),
(((double)PAGELAYOUT::FMT::TUP::sizeofBytes())*(double)i/1000000.0)/(now-start),
(((double)PAGELAYOUT::FMT::TUP::sizeofBytes())*(double)count/1000000.0)/(now-last_start)
);
last_start = now;
}
}
TlsmTableStop<PAGELAYOUT>(h);
Tdeinit();
printf("test\n");
return 0;
}
}
@ -133,7 +172,7 @@ int main(int argc, char **argv) {
Rle<typ2>,Rle<typ3>,
Rle<typ4>,Rle<typ5>,
Rle<typ6>,Rle<typ7>,
Rle<typ8>,Rle<typ9> >
Rle<typ8>,For<typ9> >
>
>
(argc,argv);

View file

@ -39,9 +39,9 @@ namespace rose {
pageid_t * out_tree_size;
pageid_t max_size;
pageid_t r_i;
typename ITERA::handle in_process_tree;
typename ITERB::handle ** in_tree;
typename ITERA::handle ** out_tree;
typename ITERA::handle my_tree;
};
template <class PAGELAYOUT, class ITER>
@ -58,41 +58,29 @@ namespace rose {
pageid_t pageCount = 0;
if(*begin != *end) {
TlsmAppendPage(xid,tree,toByteArray(begin),pageAlloc,pageAllocState,p->id);
TlsmAppendPage(xid,tree,(**begin).toByteArray(),pageAlloc,
pageAllocState,p->id);
}
pageCount++;
typename PAGELAYOUT::FMT * mc = PAGELAYOUT::initPage(p, &**begin);
int lastEmpty = 0;
for(ITER i(*begin); i != *end; ++i) {
rose::slot_index_t ret = mc->append(xid, *i);
(*inserted)++;
if(ret == rose::NOSPACE) {
p->dirty = 1;
mc->pack();
releasePage(p);
--(*end);
if(i != *end) {
next_page = pageAlloc(xid,pageAllocState);
p = loadPage(xid, next_page);
mc = PAGELAYOUT::initPage(p, &*i);
TlsmAppendPage(xid,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id);
pageCount++;
lastEmpty = 0;
} else {
lastEmpty = 1;
}
++(*end);
--i;
next_page = pageAlloc(xid,pageAllocState);
p = loadPage(xid, next_page);
mc = PAGELAYOUT::initPage(p, &*i);
TlsmAppendPage(xid,tree,(*i).toByteArray(),pageAlloc,pageAllocState,p->id);
pageCount++;
ret = mc->append(xid, *i);
assert(ret != rose::NOSPACE);
}
(*inserted)++;
}
p->dirty = 1;
@ -110,28 +98,22 @@ namespace rose {
void* mergeThread(void* arg) {
// The ITER argument of a is unused (we don't look at its begin or end fields...)
merge_args<PAGELAYOUT, ITERA, ITERB> * a = (merge_args<PAGELAYOUT, ITERA, ITERB>*)arg;
struct timeval start_tv, wait_tv, stop_tv;
int merge_count = 0;
int xid = Tbegin();
// Initialize tree with an empty tree.
// XXX hardcodes ITERA's type:
typename ITERA::handle oldtree
typename ITERA::handle tree
= new typename ITERA::treeIteratorHandle(
TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()) );
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()) );
Tcommit(xid);
// loop around here to produce multiple batches for merge.
// loop around here to produce multiple batches for merge.
gettimeofday(&start_tv,0);
while(1) {
pthread_mutex_lock(a->block_ready_mut);
int done = 0;
@ -145,7 +127,6 @@ namespace rose {
pthread_cond_wait(a->in_block_ready_cond,a->block_ready_mut);
}
if(done) {
a->in_process_tree = oldtree;
pthread_cond_signal(a->out_block_ready_cond);
pthread_mutex_unlock(a->block_ready_mut);
break;
@ -153,7 +134,7 @@ namespace rose {
gettimeofday(&wait_tv,0);
ITERA taBegin(oldtree);
ITERA taBegin(tree);
ITERB tbBegin(**a->in_tree);
ITERA *taEnd = taBegin.end();
@ -164,7 +145,7 @@ namespace rose {
xid = Tbegin();
recordid tree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
mergeIterator<ITERA, ITERB, typename PAGELAYOUT::FMT::TUP>
@ -178,7 +159,7 @@ namespace rose {
pageid_t mergedPages = compressData
<PAGELAYOUT,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> >
(xid, &mBegin, &mEnd,tree,a->pageAlloc,a->pageAllocState,&insertedTuples);
(xid, &mBegin, &mEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples);
delete taEnd;
delete tbEnd;
@ -200,7 +181,7 @@ namespace rose {
/ (1024.0 * 1024.0 * total_elapsed);
printf("worker %d merge # %-6d: comp ratio: %-9.3f waited %6.1f sec "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6d pages (need %6d)\n", a->worker_id, merge_count, ratio,
"worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6ld pages (need %6ld)\n", a->worker_id, merge_count, ratio,
wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput, mergedPages, !a->out_tree_size ? -1 : (FUDGE * *a->out_tree_size / a->r_i));
@ -239,21 +220,22 @@ namespace rose {
// XXX C++? Objects? Constructors? Who needs them?
*a->out_tree = (typeof(*a->out_tree))malloc(sizeof(**a->out_tree));
**a->out_tree = new typename ITERA::treeIteratorHandle(tree);
**a->out_tree = new typename ITERA::treeIteratorHandle(tree->r_);
pthread_cond_signal(a->out_block_ready_cond);
// This is a bit wasteful; allocate a new empty tree to merge against.
// We don't want to ever look at the one we just handed upstream...
// We could wait for an in tree to be ready, and then pass it directly
// to compress data (to avoid all those merging comparisons...)
tree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
}
// XXX TlsmFree(xid,oldtree);
// XXX TlsmFree(xid,*a->tree);
*oldtree = tree;
assert(a->my_tree->r_.page != tree->r_.page);
*a->my_tree = *tree;
pthread_mutex_unlock(a->block_ready_mut);
@ -290,8 +272,8 @@ namespace rose {
h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),
TlsmRegionAllocRid,&h.mediumTreeAllocState,
PAGELAYOUT::FMT::TUP::sizeofBytes());
epoch_t beginning = 0;
epoch_t end = 0;
//XXX epoch_t beginning = 0;
//XXX epoch_t end = 0;
Tset(xid, ret, &h);
return ret;
}
@ -420,9 +402,9 @@ namespace rose {
0, // biggest component computes its size directly.
0, // No max size for biggest component
R,
new typename LSM_ITER::treeIteratorHandle(NULLRID),
block1_scratch,
0
0,
new typename LSM_ITER::treeIteratorHandle(NULLRID)
};
*ret->args1 = tmpargs1;
void * (*merger1)(void*) = mergeThread<PAGELAYOUT, LSM_ITER, LSM_ITER>;
@ -446,9 +428,10 @@ namespace rose {
block1_size,
(R * MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio
R,
new typename LSM_ITER::treeIteratorHandle(NULLRID),
//new typename LSM_ITER::treeIteratorHandle(NULLRID),
block0_scratch,
block1_scratch
block1_scratch,
new typename LSM_ITER::treeIteratorHandle(NULLRID)
};
*ret->args2 = tmpargs2;
void * (*merger2)(void*) = mergeThread<PAGELAYOUT, LSM_ITER, RB_ITER>;
@ -488,18 +471,99 @@ namespace rose {
void TlsmTableInsert( lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &t) {
h->scratch_handle->insert(t);
//XXX 4 = estimated compression ratio.
uint64_t handleBytes = h->scratch_handle->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
//XXX 4 = estimated compression ratio.
uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes());
uint64_t memSizeThresh = MEM_SIZE;
if(handleBytes > inputSizeThresh || handleBytes > memSizeThresh) { // XXX ok?
printf("Handle mbytes %ld Input size: %ld input size thresh: %ld mbytes mem size thresh: %ld\n",
handleBytes / (1024*1024), *h->input_size, inputSizeThresh / (1024*1024), memSizeThresh / (1024*1024));
printf("Handle mbytes %ld (%ld) Input size: %ld input size thresh: %ld mbytes mem size thresh: %ld\n",
handleBytes / (1024*1024), h->scratch_handle->size(), *h->input_size, inputSizeThresh / (1024*1024), memSizeThresh / (1024*1024));
TlsmTableFlush<PAGELAYOUT>(h);
}
}
template<class PAGELAYOUT>
inline typename PAGELAYOUT::FMT::TUP *
getRecordHelper(int xid, recordid r,
typename PAGELAYOUT::FMT::TUP& val,
typename PAGELAYOUT::FMT::TUP& scratch,
byte *arry) {
if(r.size == -1) {
DEBUG("no tree\n");
return 0;
}
pageid_t pid = TlsmFindPage(xid, r, arry);
if(pid == -1) {
DEBUG("no page\n");
return 0;
}
Page * p = loadPage(xid,pid);
typename PAGELAYOUT::FMT * f =
(typename PAGELAYOUT::FMT*)p->impl;
typename PAGELAYOUT::FMT::TUP * ret =
f->recordFind(xid, val, scratch);
if(!ret) {
DEBUG("not in tree");
}
releasePage(p);
return ret;
}
template<class PAGELAYOUT>
const typename PAGELAYOUT::FMT::TUP *
TlsmTableFind(int xid, lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &val,
typename PAGELAYOUT::FMT::TUP &scratch) {
pthread_mutex_lock(h->mut);
typename std::set
<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>::iterator i =
h->scratch_handle->find(val);
if(i != h->scratch_handle->end()) {
scratch = *i;
pthread_mutex_unlock(h->mut);
return &scratch;
}
DEBUG("Not in scratch_handle\n");
if(*h->args2->in_tree) {
i = (**h->args2->in_tree)->find(val);
if(i != (**h->args2->in_tree)->end()) {
scratch = *i;
pthread_mutex_unlock(h->mut);
return &scratch;
}
}
DEBUG("Not in first in_tree\n");
// need to check LSM trees.
byte * arry = val.toByteArray();
typename PAGELAYOUT::FMT::TUP * r = 0;
r = getRecordHelper<PAGELAYOUT>(xid, h->args2->my_tree->r_, val, scratch, arry);
if(r) { pthread_mutex_unlock(h->mut); return r; }
DEBUG("Not in first my_tree {%lld}\n", h->args2->my_tree->r_.size);
if(*h->args1->in_tree) {
r = getRecordHelper<PAGELAYOUT>(xid, (**h->args1->in_tree)->r_, val, scratch, arry);
if(r) { pthread_mutex_unlock(h->mut); return r; }
} else {
DEBUG("no tree");
}
DEBUG("Not in second in_tree\n");
if(h->args1->my_tree) {
r = getRecordHelper<PAGELAYOUT>(xid, h->args1->my_tree->r_, val, scratch, arry);
if(r) { pthread_mutex_unlock(h->mut); return r; }
} else {
DEBUG("no tree");
}
DEBUG("Not in any tree\n");
assert(r == 0);
return r;
}
}

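Condensed, the on-disk probe that TlsmTableFind() falls back to above (getRecordHelper) is: serialize the probe tuple, walk the LSM index down to a page id, load that page, and ask its format object for the record. A sketch of that path with the DEBUG output dropped; probeTree is a hypothetical name, but every type and call it uses appears in the diff:

template<class PAGELAYOUT>
typename PAGELAYOUT::FMT::TUP *
probeTree(int xid, recordid r,
          typename PAGELAYOUT::FMT::TUP &val,
          typename PAGELAYOUT::FMT::TUP &scratch, byte *key) {
  if(r.size == -1) return 0;                 // this component has no tree yet
  pageid_t pid = TlsmFindPage(xid, r, key);  // descend the index on the key bytes
  if(pid == -1) return 0;                    // no candidate page for this key
  Page *p = loadPage(xid, pid);
  typename PAGELAYOUT::FMT *f = (typename PAGELAYOUT::FMT*)p->impl;
  typename PAGELAYOUT::FMT::TUP *ret = f->recordFind(xid, val, scratch);
  releasePage(p);                            // ret points into scratch, not the page
  return ret;                                // NULL if the page doesn't hold val
}
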
View file

@ -604,8 +604,8 @@ static pageid_t lsmLookup(int xid, Page *node, int depth,
}
} else {
if(prev_cmp_key == 0) {
// XXX Doesn't handle runs of duplicates.
if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
return prev->ptr;
}
}
@ -615,7 +615,7 @@ static pageid_t lsmLookup(int xid, Page *node, int depth,
}
if(depth) {
// this handles the rhs of the tree.
if(prev_cmp_key <= 0) {
pageid_t child_id = prev->ptr;
Page *child_page = loadPage(xid, child_id);
@ -625,13 +625,10 @@ static pageid_t lsmLookup(int xid, Page *node, int depth,
releasePage(child_page);
return ret;
}
} else {
if(prev_cmp_key == 0) {
if(prev_cmp_key <= 0) {
return prev->ptr;
}
}
return -1;
}

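The lsmLookup() change above relaxes the descent test from an exact-match check (prev_cmp_key == 0) to an interval check. The tree keys each page by that page's first tuple (see the TlsmAppendPage() calls in the merge code), so an arbitrary probe rarely equals a stored key; it belongs to the last entry whose key is less than or equal to it. A small stand-alone model of that rule, assuming (as the surrounding code suggests) that the comparator compares the stored key against the probe:

#include <stdint.h>

// firstKey[i] is the first tuple of page i; pages are in key order.
// Return the page that could contain `probe`: the last page whose first key
// is <= probe.  This is what the new `prev_cmp_key <= 0 && rec_cmp_key > 0`
// test selects; the old `== 0` test only matched exact hits on a first key.
static int coveringPage(const int64_t *firstKey, int npages, int64_t probe) {
  int found = -1;                       // -1: probe precedes every page
  for(int i = 0; i < npages && firstKey[i] <= probe; i++)
    found = i;
  return found;
}
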
View file

@ -142,9 +142,12 @@ pageid_t compressData(ITER * const begin, ITER * const end,
p->dirty = 1;
mc->pack();
releasePage(p);
--(*end);
if(i != *end) {
// XXX this used to work by decrementing i, and then running
// through again. That failed when i was right before end.
// figure out why it was broken, fix the iterators (?), and write
// a test case for this situation...
// --(*end);
// if(i != *end) {
next_page = pageAlloc(-1,pageAllocState);
p = loadPage(-1, next_page);
@ -154,12 +157,14 @@ pageid_t compressData(ITER * const begin, ITER * const end,
TlsmAppendPage(-1,tree,toByteArray(&i),pageAlloc,pageAllocState,p->id);
}
pageCount++;
lastEmpty = 0;
} else {
lastEmpty = 1;
}
++(*end);
--i;
ret = mc->append(-1, *i);
assert(ret != rose::NOSPACE);
// lastEmpty = 0;
// } else {
// lastEmpty = 1;
// }
// ++(*end);
// --i;
}
}

View file

@ -37,10 +37,10 @@ For<TYPE>::append(int xid, const TYPE dat,
*(((TYPE*)(&exceptions[*except]))-1) = dat;
// Allocate the delta and the exception (if possible)
*free_bytes -= sizeof(TYPE) + sizeof(delta_t);
(*free_bytes) -= (sizeof(TYPE) + sizeof(delta_t));
int incr = *free_bytes >= 0;
*numdeltas_ptr() += incr;
*except -= incr * sizeof(TYPE);
(*numdeltas_ptr()) += incr;
(*except) -= incr * sizeof(TYPE);
/* This does the same thing as the last few lines, but with a branch. It's
marginally slower:
@ -58,8 +58,8 @@ For<TYPE>::append(int xid, const TYPE dat,
*next_delta_ptr() = (delta_t) delta;
// Allocate space for it, if possible
*free_bytes -= sizeof(delta_t);
*numdeltas_ptr() += *free_bytes >= 0;
(*free_bytes) -= sizeof(delta_t);
(*numdeltas_ptr()) += (*free_bytes >= 0);
}
return *numdeltas_ptr() - 1;
@ -82,6 +82,50 @@ For<TYPE>::recordRead(int xid, slot_index_t slot, byte *exceptions,
return scratch;
}
}
template <class TYPE>
inline std::pair<slot_index_t,slot_index_t>*
For<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
byte *exceptions, TYPE value,
std::pair<slot_index_t,slot_index_t>& scratch) {
std::pair<slot_index_t,slot_index_t>* ret = 0;
delta_t delta = value - *base_ptr();
slot_index_t i;
for(i = start; i < stop; i++) {
delta_t d = *nth_delta_ptr(i);
if(d >= 0) {
if(d == delta) {
scratch.first = i;
scratch.second = stop;
ret = &scratch;
i++;
break;
}
} else {
if(value == *(TYPE*)(exceptions + d + PAGE_SIZE - sizeof(TYPE))) {
scratch.first = i;
scratch.second = stop;
ret = &scratch;
i++;
break;
}
}
}
for(;i < stop; i++) {
delta_t d = *nth_delta_ptr(i);
if(d >= 0) {
if(d != delta) {
scratch.second = i;
break;
}
} else {
if(value != *(TYPE*)(exceptions +d + PAGE_SIZE - sizeof(TYPE))) {
scratch.second = i;
break;
}
}
}
assert(ret); //XXX
return ret;
}
} // namespace rose
#endif // _ROSE_COMPRESSION_FOR_IMPL_H__

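The new For<TYPE>::recordFind() above returns the half-open slot range [scratch.first, scratch.second) over which the (sorted) column equals `value`, decoding each slot as base + delta or, for negative deltas, via the exception region; as written, a miss still trips the `assert(ret); //XXX`. A self-contained model of the same two-loop scan over a plain sorted array, which is what the delta/exception storage decodes to:

#include <cstdint>
#include <utility>

// findRun() is an illustration only: it mirrors For<TYPE>::recordFind()'s two
// loops (find the first matching slot, then the end of the run) without the
// frame-of-reference decoding.
std::pair<int,int>* findRun(const int32_t *col, int start, int stop,
                            int32_t value, std::pair<int,int> &scratch) {
  int i;
  for(i = start; i < stop; i++) {           // first loop: find a match
    if(col[i] == value) { scratch.first = i; scratch.second = stop; break; }
  }
  if(i == stop) return 0;                   // absent (the real code asserts)
  for(++i; i < stop; i++) {                 // second loop: end of the run
    if(col[i] != value) { scratch.second = i; break; }
  }
  return &scratch;
}

// e.g. col = {100, 103, 103, 103, 107, 110}: findRun(col, 0, 6, 103, s)
// sets s to [1, 4) and returns &s.
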
View file

@ -1,6 +1,12 @@
#ifndef _ROSE_COMPRESSION_FOR_H__
#define _ROSE_COMPRESSION_FOR_H__
#undef try
#undef catch
#undef end
#include <algorithm>
// Copyright 2007 Google Inc. All Rights Reserved.
// Author: sears@google.com (Rusty Sears)
@ -98,6 +104,16 @@ class For {
*/
inline TYPE *recordRead(int xid, slot_index_t slot, byte *exceptions,
TYPE * buf);
/**
Find the offset of a compressed value, assuming it falls within
the given range.
@return NULL if the value is not found.
*/
inline std::pair<slot_index_t,slot_index_t>*
recordFind(int xid, slot_index_t start, slot_index_t stop,
byte *exceptions, TYPE value,
std::pair<slot_index_t,slot_index_t>& scratch);
/**
This constructor initializes a new FOR region.
@ -109,6 +125,10 @@ class For {
};
For(void * mem): mem_(mem) { }
inline slot_index_t recordCount(int xid) {
return *numdeltas_ptr();
}
For() : mem_(0) {}
/**
@return the length of the FOR region, in bytes

View file

@ -99,28 +99,31 @@ void Multicolumn<TUPLE>::pack() {
template <class TUPLE>
Multicolumn<TUPLE>::~Multicolumn() {
byte_off_t first_free = 0;
byte_off_t last_free = (intptr_t)(first_header_byte_ptr() - p_->memAddr);
if(unpacked_) {
*exceptions_len_ptr() = USABLE_SIZE_OF_PAGE - first_exception_byte_;
last_free -= *exceptions_len_ptr();
// XXX this is doing the wrong thing; it should be freeing memory, and
// doing nothing else; instead, it's pack()ing the page and leaking
// space.
//byte_off_t first_free = 0;
//byte_off_t last_free = (intptr_t)(first_header_byte_ptr() - p_->memAddr);
// if(unpacked_) {
//*exceptions_len_ptr() = USABLE_SIZE_OF_PAGE - first_exception_byte_;
//last_free -= *exceptions_len_ptr();
*exceptions_offset_ptr() = last_free;
memcpy(&(p_->memAddr[*exceptions_offset_ptr()]),
exceptions_ + first_exception_byte_, *exceptions_len_ptr());
//*exceptions_offset_ptr() = last_free;
//memcpy(&(p_->memAddr[*exceptions_offset_ptr()]),
//exceptions_ + first_exception_byte_, *exceptions_len_ptr());
for(int i = 0; i < *column_count_ptr(); i++) {
*column_offset_ptr(i) = first_free;
for(int i = 0; i < *column_count_ptr(); i++) {
//*column_offset_ptr(i) = first_free;
byte_off_t bytes_used = dispatcher_.bytes_used(i);
memcpy(column_base_ptr(i), columns_[i], bytes_used);
first_free += bytes_used;
assert(first_free <= last_free);
delete [] columns_[i];
}
delete [] exceptions_;
//byte_off_t bytes_used = dispatcher_.bytes_used(i);
//memcpy(column_base_ptr(i), columns_[i], bytes_used);
//first_free += bytes_used;
//assert(first_free <= last_free);
if(unpacked_) delete [] columns_[i];
}
if(unpacked_) delete [] exceptions_;
delete [] columns_;
}

View file

@ -80,9 +80,6 @@ namespace rose {
init_num++;
}
static inline FORMAT * initPage(Page *p, const typename FORMAT::TUP * t) {
const column_number_t column_count = t->column_count();
//plugin_id_t pluginid = plugin_id<FORMAT, COMPRESSOR, typename COMPRESSOR::TYP>();
plugin_id_t plugins[N];
if(0 < N) plugins[0] = plugin_id<FORMAT, typename FORMAT::CMP0, typename FORMAT::CMP0::TYP>();
@ -96,14 +93,8 @@ namespace rose {
if(8 < N) plugins[8] = plugin_id<FORMAT, typename FORMAT::CMP8, typename FORMAT::CMP8::TYP>();
if(9 < N) plugins[9] = plugin_id<FORMAT, typename FORMAT::CMP9, typename FORMAT::CMP9::TYP>();
FORMAT * f = new FORMAT(-1,p);//N,plugins);
FORMAT * f = new FORMAT(-1,p);
//plugin_id_t * plugins = (plugin_id_t*)malloc(column_count * sizeof(plugin_id_t));
//for(column_number_t c = 0; c < column_count; c++) {
//plugins[c] = pluginid;
//}
// FORMAT * f = new FORMAT(-1,p,column_count,plugins);
if(0 < N) f->compressor0()->offset(*t->get0());
if(1 < N) f->compressor1()->offset(*t->get1());
if(2 < N) f->compressor2()->offset(*t->get2());
@ -115,12 +106,6 @@ namespace rose {
if(8 < N) f->compressor8()->offset(*t->get8());
if(9 < N) f->compressor9()->offset(*t->get9());
/*for(column_number_t c = 0; c < column_count; c++) {
COMPRESSOR* com = (COMPRESSOR*) f->compressor(c);
typename COMPRESSOR::TYP val = *(typename COMPRESSOR::TYP*)(t->get(c));
com->offset(val);
}*/
// free(plugins);
return f;
}
static inline int cmp_id() {

View file

@ -5,6 +5,7 @@
// Author: sears@google.com (Rusty Sears)
#include <assert.h>
#include <algorithm>
#include "rle.h"
@ -31,7 +32,7 @@ Rle<TYPE>::append(int xid, const TYPE dat,
// this key is not the same as the last one, or
// the block is full
*free_bytes -= sizeof(triple_t);
(*free_bytes) -= sizeof(triple_t);
// Write the changes in our overrun space
triple_t *n = new_block_ptr();
@ -70,7 +71,31 @@ Rle<TYPE>::recordRead(int xid, slot_index_t slot, byte* exceptions,
} while (n < *block_count_ptr());
return 0;
}
template <class TYPE>
inline std::pair<slot_index_t,slot_index_t>*
Rle<TYPE>::recordFind(int xid, slot_index_t start, slot_index_t stop,
byte *exceptions, TYPE value,
std::pair<slot_index_t,slot_index_t>& scratch) {
// TYPE v = nth_block_ptr(last_)->value <= value ? lastV_ : 0;
block_index_t n = 0;
std::pair<slot_index_t,slot_index_t>* ret = 0;
do {
triple_t * t = nth_block_ptr(n);
if(t->data >= value) {
scratch.first = t->index;
do {
scratch.second = t->index + t->copies;
n++;
t = nth_block_ptr(n);
} while(n < *block_count_ptr() && t->data == value);
ret = &scratch;
break;
}
n++;
} while (n < *block_count_ptr());
assert(ret); //XXX
return ret;
}
} // namespace rose
#endif // _ROSE_COMPRESSION_RLE_IMPL_H__

View file

@ -39,6 +39,11 @@ class Rle {
/** @see For::recordRead */
inline TYPE *recordRead(int xid, slot_index_t slot, byte *exceptions,
TYPE *scratch);
/** @see For::recordFind */
inline std::pair<slot_index_t,slot_index_t>*
recordFind(int xid, slot_index_t start, slot_index_t stop,
byte *exceptions, TYPE value,
std::pair<slot_index_t,slot_index_t>& scratch);
/**
This constructor initializes a new Rle region.
@ -51,6 +56,10 @@ class Rle {
n->copies = 0;
n->data = 0;
}
inline slot_index_t recordCount(int xid) {
triple_t *n = last_block_ptr();
return (n->index) + (n->copies);
}
/**
This constructor is called when existing RLE data is read from
disk.

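Rle<TYPE>::recordFind() above does the analogous scan over run-length triples (index, copies, data): it stops at the first run whose value is >= the probe and extends the range across adjacent runs repeating the same value, so the caller is expected to know the value is actually present (a probe past the last run still hits the `assert(ret); //XXX`). A stand-alone model of that scan:

#include <cstdint>
#include <utility>

struct Run { int index; int copies; int32_t data; };  // shaped like rose's triple_t

// Illustration of Rle<TYPE>::recordFind(): return the slot range covered by
// the run(s) holding `value` in a sorted, run-length-encoded column.
std::pair<int,int>* findRleRun(const Run *runs, int nruns, int32_t value,
                               std::pair<int,int> &scratch) {
  for(int n = 0; n < nruns; n++) {
    if(runs[n].data >= value) {              // first run at or past the probe
      scratch.first = runs[n].index;
      scratch.second = runs[n].index + runs[n].copies;
      while(n + 1 < nruns && runs[n + 1].data == value) {
        n++;                                 // fold in adjacent equal-value runs
        scratch.second = runs[n].index + runs[n].copies;
      }
      return &scratch;
    }
  }
  return 0;  // value beyond the last run; the real code asserts here
}

// e.g. runs = {{0,2,5},{2,3,9},{5,1,12}}: findRleRun(runs, 3, 9, s) -> [2, 5).
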
View file

@ -102,20 +102,12 @@ class StaticMulticolumn {
bytes_left_ = first_header_byte_ptr()- p->memAddr;
/* for(int i = 0; i < N; i++) {
*column_plugin_id_ptr(i) = plugins[i];
dispatcher_.set_plugin(columns_[i],i,plugins[i]);
dispatcher_.init_mem(columns_[i],i);
bytes_left_ -= dispatcher_.bytes_used(i);
} */
#define STATIC_MC_INIT(i,typ,cmp) \
if(i < N) { \
*column_plugin_id_ptr(i) = cmp->PLUGIN_ID; \
columns_[i] = new byte[USABLE_SIZE_OF_PAGE]; \
/* if(plugin0) delete plugin0; */ \
cmp = new typ(xid,columns_[i]); \
cmp = new typ(xid,(void*)columns_[i]); \
cmp->init_mem(columns_[i]); \
*column_plugin_id_ptr(i) = cmp->PLUGIN_ID; \
bytes_left_ -= cmp->bytes_used(); \
}
@ -131,24 +123,11 @@ class StaticMulticolumn {
}
~StaticMulticolumn() {
byte_off_t first_free = 0;
byte_off_t last_free = (intptr_t)(first_header_byte_ptr() - p_->memAddr);
if(unpacked_) {
*exceptions_len_ptr() = USABLE_SIZE_OF_PAGE - first_exception_byte_;
last_free -= *exceptions_len_ptr();
*exceptions_offset_ptr() = last_free;
memcpy(&(p_->memAddr[*exceptions_offset_ptr()]),
exceptions_ + first_exception_byte_, *exceptions_len_ptr());
#define STATIC_MC_DEINIT(i,plug) \
if(i < N) { \
*column_offset_ptr(i) = first_free; \
byte_off_t bytes_used = plug->bytes_used(); \
memcpy(column_base_ptr(i), columns_[i], bytes_used); \
first_free += bytes_used; \
assert(first_free <= last_free); \
delete [] columns_[i]; \
if(unpacked_) delete [] columns_[i]; \
delete plug; \
}
STATIC_MC_DEINIT(0,plugin0);
@ -162,8 +141,8 @@ class StaticMulticolumn {
STATIC_MC_DEINIT(8,plugin8);
STATIC_MC_DEINIT(9,plugin9);
delete [] exceptions_;
}
if(unpacked_) delete [] exceptions_;
}
/**
@ -193,27 +172,47 @@ class StaticMulticolumn {
inline slot_index_t append(int xid, TUPLE const & dat) {
slot_index_t ret = 0;
slot_index_t newret = 0;
if(0 < N) ret = plugin0->append(xid, *dat.get0(),&first_exception_byte_,
exceptions_, &bytes_left_);
// if(bytes_left_ >= 0) {
if(1 < N) newret = plugin1->append(xid, *dat.get1(),&first_exception_byte_,
exceptions_, &bytes_left_);
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(2 < N) newret = plugin2->append(xid, *dat.get2(),&first_exception_byte_,
exceptions_, &bytes_left_);
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(3 < N) newret = plugin3->append(xid, *dat.get3(),&first_exception_byte_,
exceptions_, &bytes_left_);
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(4 < N) newret = plugin4->append(xid, *dat.get4(),&first_exception_byte_,
exceptions_, &bytes_left_);
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(5 < N) newret = plugin5->append(xid, *dat.get5(),&first_exception_byte_,
exceptions_, &bytes_left_);
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(6 < N) newret = plugin6->append(xid, *dat.get6(),&first_exception_byte_,
exceptions_, &bytes_left_);
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(7 < N) newret = plugin7->append(xid, *dat.get7(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(1 < N) ret = plugin1->append(xid, *dat.get1(),&first_exception_byte_,
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(8 < N) newret = plugin8->append(xid, *dat.get8(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(2 < N) ret = plugin2->append(xid, *dat.get2(),&first_exception_byte_,
// if(bytes_left_ >= 0) {
// assert(newret == ret);
if(9 < N) newret = plugin9->append(xid, *dat.get9(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(3 < N) ret = plugin3->append(xid, *dat.get3(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(4 < N) ret = plugin4->append(xid, *dat.get4(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(5 < N) ret = plugin5->append(xid, *dat.get5(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(6 < N) ret = plugin6->append(xid, *dat.get6(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(7 < N) ret = plugin7->append(xid, *dat.get7(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(8 < N) ret = plugin8->append(xid, *dat.get8(),&first_exception_byte_,
exceptions_, &bytes_left_);
if(9 < N) ret = plugin9->append(xid, *dat.get9(),&first_exception_byte_,
exceptions_, &bytes_left_);
return bytes_left_ < 0 ? NOSPACE : ret;
// }}}}}}}}}
assert(N == 1 || bytes_left_ < 0 || newret == ret);
return (bytes_left_ < 0) ? NOSPACE : ret;
}
inline TUPLE * recordRead(int xid, slot_index_t slot, TUPLE * buf) {
bool ret = 1;
@ -228,6 +227,78 @@ class StaticMulticolumn {
if(8 < N) ret = plugin8->recordRead(xid,slot,exceptions_,const_cast<typename TUP::TYP8*>(buf->get8())) ? ret : 0;
if(9 < N) ret = plugin9->recordRead(xid,slot,exceptions_,const_cast<typename TUP::TYP9*>(buf->get9())) ? ret : 0;
return ret ? buf : 0;
}
inline slot_index_t recordCount(int xid) {
slot_index_t recordCount;
slot_index_t c;
// XXX memoize this function
if(0 < N) recordCount = plugin0->recordCount(xid);
if(1 < N) { c = plugin1->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(2 < N) { c = plugin2->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(3 < N) { c = plugin3->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(4 < N) { c = plugin4->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(5 < N) { c = plugin5->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(6 < N) { c = plugin6->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(7 < N) { c = plugin7->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(8 < N) { c = plugin8->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
if(9 < N) { c = plugin9->recordCount(xid); recordCount = recordCount > c ? c :recordCount; }
return recordCount;
}
/* inline slot_index_t recordCount(int xid) {
if(1 == N) return plugin0->recordCount(xid);
if(2 == N) return plugin1->recordCount(xid);
if(3 == N) return plugin2->recordCount(xid);
if(4 == N) return plugin3->recordCount(xid);
if(5 == N) return plugin4->recordCount(xid);
if(6 == N) return plugin5->recordCount(xid);
if(7 == N) return plugin6->recordCount(xid);
if(8 == N) return plugin7->recordCount(xid);
if(9 == N) return plugin8->recordCount(xid);
if(10 == N) return plugin9->recordCount(xid);
abort();
} */
inline TUPLE * recordFind(int xid, TUPLE& val, TUPLE& scratch) {
std::pair<slot_index_t,slot_index_t> pair_scratch;
std::pair<slot_index_t,slot_index_t> * ret;
// printf("static multiclumn record find\n"); fflush(stdout);
if(0 < N) ret = plugin0->recordFind(xid, 0, recordCount(xid),
exceptions_, *val.get0(), pair_scratch);
//assert(ret);
if(1 < N) if(ret) ret = plugin1->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get1(), pair_scratch);
//assert(ret);
if(2 < N) if(ret) ret = plugin2->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get2(), pair_scratch);
//assert(ret);
if(3 < N) if(ret) ret = plugin3->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get3(), pair_scratch);
//assert(ret);
if(4 < N) if(ret) ret = plugin4->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get4(), pair_scratch);
//assert(ret);
if(5 < N) if(ret) ret = plugin5->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get5(), pair_scratch);
//assert(ret);
if(6 < N) if(ret) ret = plugin6->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get6(), pair_scratch);
//assert(ret);
if(7 < N) if(ret) ret = plugin7->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get7(), pair_scratch);
//assert(ret);
if(8 < N) if(ret) ret = plugin8->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get8(), pair_scratch);
//assert(ret);
if(9 < N) if(ret) ret = plugin9->recordFind(xid, ret->first, ret->second,
exceptions_, *val.get9(), pair_scratch);
//assert(ret);
if(ret) {
// XXX slow, doesn't return whole range...
recordRead(xid, ret->first, &scratch);
return &scratch;
} else {
return 0;
}
}
inline void pack() {
byte_off_t first_free = 0;
@ -281,16 +352,15 @@ class StaticMulticolumn {
StaticMulticolumn(Page * p) :
p_(p),
first_exception_byte_(USABLE_SIZE_OF_PAGE - *exceptions_len_ptr()),
exceptions_(p_->memAddr + *exceptions_offset_ptr()),
exceptions_(p_->memAddr + *exceptions_offset_ptr()),
unpacked_(0) {
byte_off_t first_free = 0;
assert(N == *column_count_ptr());
#define STATIC_MC_INIT(i,plug,cmp) \
#define STATIC_MC_INIT(i,plug,cmp) \
if(i < N) { \
/*byte * page_column_ptr = p_->memAddr + *column_offset_ptr(i);*/ \
columns_[i] = p_->memAddr + *column_offset_ptr(i); \
plug = new cmp((void*)columns_[i]); \
plug = new cmp((void*)columns_[i]); \
first_free = *column_offset_ptr(i) + plug->bytes_used(); \
}
@ -431,6 +501,7 @@ template <int N, class TUPLE,
class COMP0, class COMP1, class COMP2, class COMP3, class COMP4,
class COMP5, class COMP6, class COMP7, class COMP8, class COMP9>
static void staticMulticolumnCleanup(Page *p) {
// printf("cleanup %d\n", N); fflush(stdout);
delete (StaticMulticolumn<N,TUPLE,COMP0,COMP1,COMP2,COMP3,COMP4,COMP5,COMP6,COMP7,COMP8,COMP9>*)p->impl;
p->impl = 0;
}

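StaticMulticolumn::recordFind() above intersects per-column results: it starts with the full slot range [0, recordCount()) and lets each column's compressor narrow it to the slots whose value matches that column of `val`; if any column comes back empty the tuple is absent, otherwise the first surviving slot is recordRead() into `scratch`. A self-contained model of that narrowing over plain arrays, assuming rows are sorted lexicographically as they are in an LSM page:

#include <cstdint>
#include <utility>
#include <vector>

// Shrink [range.first, range.second) to the slots where `col` equals `key`.
// Returns false when the key is absent, mirroring the NULL return above.
static bool narrowColumn(const std::vector<int32_t> &col, int32_t key,
                         std::pair<int,int> &range) {
  int i = range.first;
  while(i < range.second && col[i] != key) i++;
  if(i == range.second) return false;
  range.first = i;
  while(i < range.second && col[i] == key) i++;
  range.second = i;
  return true;
}

// Return the first slot whose row equals `key`, or -1 if there is none.
static int findRow(const std::vector<std::vector<int32_t> > &cols,
                   const std::vector<int32_t> &key, int nrows) {
  std::pair<int,int> range(0, nrows);
  for(size_t c = 0; c < cols.size(); c++)
    if(!narrowColumn(cols[c], key[c], range)) return -1;
  return range.first;
}
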
View file

@ -25,7 +25,7 @@ namespace rose {
explicit inline StaticTuple() {
s.flag_ = NORMAL;
s.flag_ = NORMAL; s.epoch_ = 0 ;
initializePointers();
}
explicit inline StaticTuple(const StaticTuple& t) {
@ -47,34 +47,38 @@ namespace rose {
inline ~StaticTuple() { }
static inline byte_off_t sizeofBytes() {
return sizeof(flag_t) + sizeof(epoch_t) +
((0 < N) ? sizeof(TYPE0) : 0) +
((1 < N) ? sizeof(TYPE1) : 0) +
((2 < N) ? sizeof(TYPE2) : 0) +
((3 < N) ? sizeof(TYPE3) : 0) +
((4 < N) ? sizeof(TYPE4) : 0) +
((5 < N) ? sizeof(TYPE5) : 0) +
((6 < N) ? sizeof(TYPE6) : 0) +
((7 < N) ? sizeof(TYPE7) : 0) +
((8 < N) ? sizeof(TYPE8) : 0) +
((9 < N) ? sizeof(TYPE9) : 0) ;
// Computing by starting from zero, and adding up column costs wouldn't
// take struct padding into account. This might over-estimate the
// size, but that's fine, since any in-memory copy will either be malloced
// according to what we say here, or will be an actual st struct.
return sizeof(st) -
((0 >= N) ? sizeof(TYPE0) : 0) -
((1 >= N) ? sizeof(TYPE1) : 0) -
((2 >= N) ? sizeof(TYPE2) : 0) -
((3 >= N) ? sizeof(TYPE3) : 0) -
((4 >= N) ? sizeof(TYPE4) : 0) -
((5 >= N) ? sizeof(TYPE5) : 0) -
((6 >= N) ? sizeof(TYPE6) : 0) -
((7 >= N) ? sizeof(TYPE7) : 0) -
((8 >= N) ? sizeof(TYPE8) : 0) -
((9 >= N) ? sizeof(TYPE9) : 0) ;
}
inline void* set(column_number_t col, void* val) {
/* inline void* set(column_number_t col, void* val) {
memcpy(((byte*)&s)+cols_[col],val,size_[col]);
return(((byte*)&s)+cols_[col]);
}
} */
inline TYPE0 * set0(TYPE0* val) { s.cols0_=*val; }
inline TYPE1 * set1(TYPE1* val) { s.cols1_=*val; }
inline TYPE2 * set2(TYPE2* val) { s.cols2_=*val; }
inline TYPE3 * set3(TYPE3* val) { s.cols3_=*val; }
inline TYPE4 * set4(TYPE4* val) { s.cols4_=*val; }
inline TYPE5 * set5(TYPE5* val) { s.cols5_=*val; }
inline TYPE6 * set6(TYPE6* val) { s.cols6_=*val; }
inline TYPE7 * set7(TYPE7* val) { s.cols7_=*val; }
inline TYPE8 * set8(TYPE8* val) { s.cols8_=*val; }
inline TYPE9 * set9(TYPE9* val) { s.cols9_=*val; }
inline void set0(TYPE0* val) { s.cols0_=*val; }
inline void set1(TYPE1* val) { s.cols1_=*val; }
inline void set2(TYPE2* val) { s.cols2_=*val; }
inline void set3(TYPE3* val) { s.cols3_=*val; }
inline void set4(TYPE4* val) { s.cols4_=*val; }
inline void set5(TYPE5* val) { s.cols5_=*val; }
inline void set6(TYPE6* val) { s.cols6_=*val; }
inline void set7(TYPE7* val) { s.cols7_=*val; }
inline void set8(TYPE8* val) { s.cols8_=*val; }
inline void set9(TYPE9* val) { s.cols9_=*val; }
inline const TYPE0 * get0() const { return &s.cols0_; }
inline const TYPE1 * get1() const { return &s.cols1_; }
@ -90,11 +94,11 @@ namespace rose {
/* inline void* get(column_number_t col) const {
return ((byte*)&s) + cols_[col];
} */
inline column_number_t column_count() const { return N; }
//inline column_number_t column_count() const { return N; }
inline byte_off_t column_len(column_number_t col) const {
/* inline byte_off_t column_len(column_number_t col) const {
return size_[col];
}
} */
inline byte* toByteArray() const {
return (byte*)&s;
}
@ -154,7 +158,88 @@ namespace rose {
return 0;
}
static void printSt(void const * const sp) {
st const * const s = (st const * const)sp;
printf("(");
if(0<N) printf("%lld",(int64_t)s->cols0_);
if(1<N) printf(", %lld",(int64_t)s->cols1_);
if(2<N) printf(", %lld",(int64_t)s->cols2_);
if(3<N) printf(", %lld",(int64_t)s->cols3_);
if(4<N) printf(", %lld",(int64_t)s->cols4_);
if(5<N) printf(", %lld",(int64_t)s->cols5_);
if(6<N) printf(", %lld",(int64_t)s->cols6_);
if(7<N) printf(", %lld",(int64_t)s->cols7_);
if(8<N) printf(", %lld",(int64_t)s->cols8_);
if(9<N) printf(", %lld",(int64_t)s->cols9_);
printf(")");
}
static inline int noisycmp(const void *ap, const void *bp) {
st const * const a = (st const * const)ap;
st const * const b = (st const * const)bp;
int ret = cmp(ap,bp);
printSt(a); printf(" cmp "); printSt(b); printf(" = %d", ret); printf("\n");
return ret;
}
static inline int cmp(const void *ap, const void *bp) {
st const * const a = (st const * const)ap;
st const * const b = (st const * const)bp;
if(0<N) {
if(a->cols0_ < b->cols0_) return -1;
if(a->cols0_ != b->cols0_) return 1;
DEBUG("0 matched\n");;
}
if(1<N) {
if(a->cols1_ < b->cols1_) return -1;
if(a->cols1_ != b->cols1_) return 1;
DEBUG("1 matched\n");
}
if(2<N) {
if(a->cols2_ < b->cols2_) return -1;
if(a->cols2_ != b->cols2_) return 1;
DEBUG("2 matched\n");
}
if(3<N) {
if(a->cols3_ < b->cols3_) return -1;
if(a->cols3_ != b->cols3_) return 1;
DEBUG("3 matched\n");
}
if(4<N) {
if(a->cols4_ < b->cols4_) return -1;
if(a->cols4_ != b->cols4_) return 1;
DEBUG("4 matched\n");
}
if(5<N) {
if(a->cols5_ < b->cols5_) return -1;
if(a->cols5_ != b->cols5_) return 1;
DEBUG("5 matched\n");
}
if(6<N) {
if(a->cols6_ < b->cols6_) return -1;
if(a->cols6_ != b->cols6_) return 1;
DEBUG("6 matched\n");
}
if(7<N) {
if(a->cols7_ < b->cols7_) return -1;
if(a->cols7_ != b->cols7_) return 1;
DEBUG("7 matched\n");
}
if(8<N) {
if(a->cols8_ < b->cols8_) return -1;
if(a->cols8_ != b->cols8_) return 1;
DEBUG("8 matched\n");
}
if(9<N) {
if(a->cols9_ < b->cols9_) return -1;
if(a->cols9_ != b->cols9_) return 1;
DEBUG("9 matched\n");
}
DEBUG("N matched\n");
return 0;
}
/* static inline int cmp(const void *ap, const void *bp) {
const StaticTuple * a = (const StaticTuple*)ap;
const StaticTuple * b = (const StaticTuple*)bp;
if(*a < *b) {
@ -167,8 +252,7 @@ namespace rose {
} else {
return 1;
}
}
} */
struct stl_cmp
{
@ -231,6 +315,8 @@ namespace rose {
typedef char flag_t;
typedef unsigned int epoch_t;
typedef struct {
flag_t flag_;
epoch_t epoch_;
TYPE0 cols0_;
TYPE1 cols1_;
TYPE2 cols2_;
@ -241,8 +327,6 @@ namespace rose {
TYPE7 cols7_;
TYPE8 cols8_;
TYPE9 cols9_;
flag_t flag_ : 1;
epoch_t epoch_ : 31;
} st;
st s;

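The sizeofBytes() rewrite above leans on the point made in its comment: summing member sizes ignores struct padding, while sizeof(st) already includes it, so starting from sizeof(st) and subtracting the unused trailing columns can only over-estimate. A tiny stand-alone illustration of the difference; the layout is compiler-dependent, so the numbers in the comments are only typical LP64 values:

#include <cstddef>
#include <cstdint>
#include <cstdio>

// Shaped loosely like StaticTuple's `st`: a flag, an epoch, then columns.
struct st_like {
  char         flag_;
  unsigned int epoch_;
  int64_t      cols0_;
  int16_t      cols1_;
};

int main() {
  std::size_t summed = sizeof(char) + sizeof(unsigned int)
                     + sizeof(int64_t) + sizeof(int16_t);
  // On a typical LP64 target this prints "sum of members = 15, sizeof = 24":
  // the compiler inserts alignment padding that a per-column sum never sees.
  std::printf("sum of members = %zu, sizeof = %zu\n", summed, sizeof(st_like));
  return 0;
}
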
View file

@ -100,9 +100,9 @@ extern int errno;
typedef unsigned char byte;
//@todo lsn_t should be unsigned.
// If it were unsigned, it could be typedef'ed from size_t.
typedef long long lsn_t;
typedef int64_t lsn_t;
#define LSN_T_MAX INT64_MAX
typedef long long pageid_t;
typedef int64_t pageid_t;
#define PAGEID_T_MAX INT64_MAX
/*#define DEBUGGING */