Added support for tombstones and rudimentary versioning.

workload 2 is broken at the moment.
This commit is contained in:
Sears Russell 2008-03-13 01:16:37 +00:00
parent fbf5544853
commit 0c7d45f788
7 changed files with 435 additions and 67 deletions

View file

@ -58,7 +58,7 @@ namespace rose {
Tcommit(xid);
lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable);
lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable, INVALID_COL);
typename PAGELAYOUT::FMT::TUP t;
typename PAGELAYOUT::FMT::TUP s;

View file

@ -1,20 +1,36 @@
//#define LEAK_TEST
// XID_COL is 0 indexed.
#define XID_COL 4
#include "roseTableTpcCH.h"
#include "stasis/page/compression/compression.h"
int main(int argc, char **argv) {
typedef int32_t typ0;
typedef int16_t typ1;
typedef int8_t typ2;
typedef int16_t typ1;//typedef int16_t typ1;
typedef int8_t typ2;
typedef int8_t typ3;
typedef int32_t typ4;
typedef int32_t typ5;
// typedef int8_t typ2;
// typedef int8_t typ3;
typedef rose::epoch_t typ4;
typedef int8_t typ5;
typedef int8_t typ6;
typedef int32_t typ7;
typedef int8_t typ8;
typedef int8_t typ9;
/* typedef int64_t typ0;
typedef int64_t typ1;
typedef int64_t typ2;
typedef int64_t typ3;
typedef int64_t typ4;
typedef int64_t typ5;
typedef int64_t typ6;
typedef int64_t typ7;
typedef int64_t typ8;
typedef int64_t typ9; */
/* typedef int32_t typ10;
typedef int16_t typ11;
@ -36,49 +52,52 @@ int main(int argc, char **argv) {
typedef int64_t typ17;
typedef int64_t typ18;
typedef int64_t typ19;
#define COLS 20
typedef rose::StaticTuple<COLS,typ0,typ1,typ2,typ3,typ4,typ5,typ6,typ7,typ8,typ9,typ10,typ11,typ12,typ13,typ14,typ15,typ16,typ17,typ18,typ19> tup;
#define COLS 7
typedef rose::StaticTuple<COLS,typ0,typ1,typ2,typ3,typ4,typ5,typ6,typ7
/*,typ6,typ7,typ8,typ9,typ10,typ11,typ12,typ13,typ14,typ15,typ16,typ17,
typ18,typ19*/ > tup;
using rose::For;
using rose::Rle;
using rose::Nop;
int ret;
// multicolumn is deprecated; want static dispatch!
/*
rose::plugin_id_t * plugins = (rose::plugin_id_t*)malloc(COLS * sizeof(rose::plugin_id_t));
// todo try Rle / For
plugins[0] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ0>, typ0>();
plugins[1] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ1>, typ1>(); // rle
plugins[2] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ2>, typ2>();
plugins[3] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ3>, typ3>();
plugins[2] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ2>, typ2>();
plugins[3] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ3>, typ3>();
// todo try nop / for
plugins[4] = rose::plugin_id<rose::Multicolumn<tup>, For<typ4>, typ4>(); // rle
plugins[5] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ5>, typ5>();
plugins[6] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ6>, typ6>();
// todo try nop
plugins[7] = rose::plugin_id<rose::Multicolumn<tup>, For<typ7>, typ7>(); // for
plugins[8] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ8>, typ8>();
plugins[9] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ9>, typ9>();
plugins[4] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ4>, typ4>(); // rle
plugins[5] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ5>, typ5>();
// todo try Rle / For
plugins[10] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ10>, typ10>();
plugins[11] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ11>, typ11>(); // rle
plugins[12] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ12>, typ12>();
plugins[13] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ13>, typ13>();
// todo try nop / for
plugins[14] = rose::plugin_id<rose::Multicolumn<tup>, For<typ14>, typ14>(); // rle
plugins[15] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ15>, typ15>();
plugins[16] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ16>, typ16>();
// todo try nop
plugins[17] = rose::plugin_id<rose::Multicolumn<tup>, For<typ17>, typ17>(); // for
plugins[18] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ18>, typ18>();
plugins[19] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ19>, typ19>();
plugins[6] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ6>, typ6>();
// // todo try nop
// plugins[7] = rose::plugin_id<rose::Multicolumn<tup>, For<typ7>, typ7>(); // for
// plugins[8] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ8>, typ8>();
// plugins[9] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ9>, typ9>();
//
// // todo try Rle / For
// plugins[10] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ10>, typ10>();
// plugins[11] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ11>, typ11>(); // rle
// plugins[12] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ12>, typ12>();
// plugins[13] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ13>, typ13>();
// // todo try nop / for
// plugins[14] = rose::plugin_id<rose::Multicolumn<tup>, For<typ14>, typ14>(); // rle
// plugins[15] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ15>, typ15>();
// plugins[16] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ16>, typ16>();
// // todo try nop
// plugins[17] = rose::plugin_id<rose::Multicolumn<tup>, For<typ17>, typ17>(); // for
// plugins[18] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ18>, typ18>();
// plugins[19] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ19>, typ19>();
rose::DynamicMultiColumnTypePageLayout<rose::Multicolumn<tup> >::initPageLayout(plugins);
ret = rose::main
<rose::DynamicMultiColumnTypePageLayout<rose::Multicolumn<tup> > >(argc,argv);
*/
/* return rose::main
<rose::MultiColumnTypePageLayout
<COLS,
@ -93,7 +112,7 @@ int main(int argc, char **argv) {
(argc,argv);
*/
rose::StaticMultiColumnTypePageLayout
/* rose::StaticMultiColumnTypePageLayout
<COLS,
rose::StaticMulticolumn<COLS,tup,
Rle<typ0>,Rle<typ1>,
@ -126,7 +145,42 @@ int main(int argc, char **argv) {
>
>
>
(argc,argv);
(argc,argv);*/
/*
rose::StaticMultiColumnTypePageLayout
<COLS,
rose::StaticMulticolumn<COLS,tup,
Rle<typ0>,Rle<typ1>,
Nop<typ2>,Nop<typ3>,
Nop<typ4>,Nop<typ5>
// Nop<typ6>,Nop<typ7>,
// Nop<typ8>,Nop<typ9>,
// Nop<typ10>,Nop<typ11>,
// Nop<typ12>,Nop<typ13>,
// Nop<typ14>,Nop<typ15>,
// Nop<typ16>,Nop<typ17>,
// Nop<typ18>,Nop<typ19>
>
>::initPageLayout();
ret = rose::main
<rose::StaticMultiColumnTypePageLayout
<COLS,
rose::StaticMulticolumn<COLS,tup,
Rle<typ0>,Rle<typ1>,
Nop<typ2>,Nop<typ3>,
Nop<typ4>,Nop<typ5>
// Nop<typ6>,Nop<typ7>,
// Nop<typ8>,Nop<typ9>,
// Nop<typ10>,Nop<typ11>,
// Nop<typ12>,Nop<typ13>,
// Nop<typ14>,Nop<typ15>,
// Nop<typ16>,Nop<typ17>,
// Nop<typ18>,Nop<typ19>
>
>
>
(argc,argv);
*/
return ret;
}

View file

@ -1,5 +1,7 @@
//#define LEAK_TEST
#define XID_COL 254 // XXX
#include "roseTableTpcCH.h"
#include "stasis/page/compression/compression.h"
@ -16,8 +18,19 @@ int main(int argc, char **argv) {
typedef int8_t typ8;
typedef int8_t typ9;
#define COLS 10
typedef rose::StaticTuple<COLS,typ0,typ1,typ2,typ3,typ4,typ5,typ6,typ7,typ8,typ9> tup;
typedef rose::epoch_t typ10;
typedef int64_t typ11;
typedef int64_t typ12;
typedef int64_t typ13;
typedef int64_t typ14;
typedef int64_t typ15;
typedef int64_t typ16;
typedef int64_t typ17;
typedef int64_t typ18;
typedef int64_t typ19;
#define COLS 11
typedef rose::StaticTuple<COLS,typ0,typ1,typ2,typ3,typ4,typ5,typ6,typ7,typ8,typ9,typ10,typ11,typ12,typ13,typ14,typ15,typ16,typ17,typ18,typ19> tup;
using rose::For;
using rose::Rle;
using rose::Nop;
@ -37,6 +50,21 @@ int main(int argc, char **argv) {
plugins[8] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ8>, typ8>();
plugins[9] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ9>, typ9>();
// todo try Rle / For
plugins[10] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ10>, typ10>();
plugins[11] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ11>, typ11>(); // rle
plugins[12] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ12>, typ12>();
plugins[13] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ13>, typ13>();
// todo try nop / for
plugins[14] = rose::plugin_id<rose::Multicolumn<tup>, For<typ14>, typ14>(); // rle
plugins[15] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ15>, typ15>();
plugins[16] = rose::plugin_id<rose::Multicolumn<tup>, Nop<typ16>, typ16>();
// todo try nop
plugins[17] = rose::plugin_id<rose::Multicolumn<tup>, For<typ17>, typ17>(); // for
plugins[18] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ18>, typ18>();
plugins[19] = rose::plugin_id<rose::Multicolumn<tup>, Rle<typ19>, typ19>();
rose::DynamicMultiColumnTypePageLayout<rose::Multicolumn<tup> >::initPageLayout(plugins);
ret = rose::main
@ -63,7 +91,13 @@ int main(int argc, char **argv) {
For<typ2>,Rle<typ3>,
Rle<typ4>,Rle<typ5>,
For<typ6>,For<typ7>,
For<typ8>,Rle<typ9> >
For<typ8>,Rle<typ9>,
Rle<typ10>,Rle<typ11>,
Rle<typ12>,Rle<typ13>,
For<typ14>,Rle<typ15>,
Nop<typ16>,For<typ17>,
Rle<typ18>,Rle<typ19>
>
>::initPageLayout();
ret = rose::main
@ -74,7 +108,13 @@ int main(int argc, char **argv) {
For<typ2>,Rle<typ3>,
Rle<typ4>,Rle<typ5>,
For<typ6>,For<typ7>,
For<typ8>,Rle<typ9> >
For<typ8>,Rle<typ9>,
Rle<typ10>,Rle<typ11>,
Rle<typ12>,Rle<typ13>,
For<typ14>,Rle<typ15>,
Nop<typ16>,For<typ17>,
Rle<typ18>,Rle<typ19>
>
>
>
(argc,argv);

View file

@ -58,7 +58,7 @@ namespace rose {
Tcommit(xid);
lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable);
lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable, XID_COL);
typename PAGELAYOUT::FMT::TUP t;
typename PAGELAYOUT::FMT::TUP s;
@ -79,7 +79,8 @@ namespace rose {
// int column[] = { 0 , 1, 2, 3, 4, 5, 6, 7, 8, 9 };
// 0 1 2 3 4 5 6 7 8 9
// int column[] = { 3 , 4, 1, 11, 0, 5, 6, 9, 10, 14 };
int column[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
const int column[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
static long COUNT = INSERTS / 100;
long int count = COUNT;
@ -91,6 +92,8 @@ namespace rose {
start = rose::tv_to_double(start_tv);
last_start = start;
epoch_t this_xid = 0;
epoch_t last_ts_col = 0;
printf("tuple 'size'%d ; %ld\n", PAGELAYOUT::FMT::TUP::sizeofBytes(), sizeof(typename PAGELAYOUT::FMT::TUP));
@ -98,11 +101,12 @@ namespace rose {
typename PAGELAYOUT::FMT::TUP scratch;
int max_col_number = 0;
for(int col =0; col < PAGELAYOUT::FMT::TUP::NN ; col++) {
for(int col =0; col < PAGELAYOUT::FMT::TUP::NN; col++) {
max_col_number = max_col_number < column[col]
? column[col] : max_col_number;
}
char ** toks = (char**)malloc(sizeof(char*)*(max_col_number+1));
char * mode;
printf("Reading from file %s\n", file);
int inserts = 0;
size_t line_len = 100;
@ -127,7 +131,8 @@ namespace rose {
{
char * saveptr;
int i;
toks[0] = strtok_r(line, ",\n", &saveptr);
mode = strtok_r(line, ",\n", &saveptr);
toks[0] = strtok_r(0, ",\n", &saveptr);
for(i = 1; i < (max_col_number+1); i++) {
toks[i] = strtok_r(0, ",\n", &saveptr);
if(!toks[i]) {
@ -451,10 +456,35 @@ namespace rose {
scratch.set19(&t);
}
int needupdate = 0;
if(last_ts_col != *(epoch_t*)scratch.get(XID_COL)) {
this_xid++;
last_ts_col = *(epoch_t*)scratch.get(XID_COL);
needupdate = 1;
}
if(!strcmp(mode, "add")) {
(*(epoch_t*)scratch.get(XID_COL)) = this_xid * 2; // will be *2 + 1 for deletes
// abort();
TlsmTableInsert(h,scratch);
if(needupdate) { TlsmTableUpdateTimestamp(h,(this_xid-1) * 2); }
// abort();
TlsmTableInsert(h,scratch);
} else if(!strcmp(mode, "delete")) {
(*(epoch_t*)scratch.get(XID_COL)) = this_xid * 2 + 1; // + 1 => delete
TlsmTableInsert(h,scratch);
if(needupdate) { TlsmTableUpdateTimestamp(h,(this_xid-1) * 2 + 1); }
} else if(!strcmp(mode, "deliver")) {
TlsmTableInsert(h,scratch);
(*(epoch_t*)scratch.get(XID_COL)) = this_xid * 2 + 1; // + 1 => delete
(*(epoch_t*)scratch.get(PAGELAYOUT::FMT::TUP::NN-1)) = 0; // undelivered tuple
TlsmTableInsert(h,scratch);
if(needupdate) { TlsmTableUpdateTimestamp(h,(this_xid-1) * 2 + 1); }
} else if(!strcmp(mode, "status")) {
typename PAGELAYOUT::FMT::TUP scratch2;
// XXX never finds tuples; gets to correct page, then
// fails because it doesn't know the xid, so no tuples match.
TlsmTableFind(xid,h,scratch, scratch2);
}
count --;
if(!count) {
count = COUNT;
@ -471,6 +501,8 @@ namespace rose {
(((double)PAGELAYOUT::FMT::TUP::sizeofBytes())*(double)count/1000000.0)/(now-last_start)
);
last_start = now;
int count = TlsmTableCount(xid,h);
printf("counted %d tuples\n", count);
}
}
}
@ -546,4 +578,3 @@ namespace rose {
return 0;
}
}

View file

@ -38,6 +38,107 @@ template <class STLITER, class ROW> class stlSetIterator;
template <class STLITER, class ROW>
inline const byte * toByteArray(stlSetIterator<STLITER,ROW> * const t);
/**
Scans over another iterator, checking for tombstones, and garbage collecting old tuples.
*/
template <class ROW, class ITER>
class gcIterator {
public:
explicit gcIterator(ITER * i, ITER * iend, epoch_t beginning_of_time, column_number_t ts_col) : i_(i), newest_(), newTS_(-1), iend_(iend), freeIt(0), beginning_of_time_(beginning_of_time), ts_col_(ts_col) { /*++(*i);*/ if(*i_ != *iend_) { ++(*this);} }
explicit gcIterator(gcIterator& t) : i_(new ITER(*(t.i_))), newest_(t.newest_), newTS_(t.newTS_), iend_(t.iend_), freeIt(1), beginning_of_time_(t.beginning_of_time_), ts_col_(t.ts_col_) { }
~gcIterator() {
if (freeIt) {
delete i_;
}
}
ROW & operator*() {
return newest_;
}
inline bool operator==(const gcIterator &a) const {
return (*i_) == (*a.i_);
}
inline bool operator!=(const gcIterator &a) const {
return (*i_) != (*a.i_);
}
inline void operator++() {
do {
if(*i_ == *iend_) { return; }
assert(*i_ != *iend_);
newest_ = **i_;
newTS_ = *(epoch_t*)newest_.get(ts_col_);
++(*i_);
while(ts_col_ != INVALID_COL && (*i_ != *iend_) && myTupCmp(newest_,**i_)) {
const ROW& r = (**i_);
epoch_t ts = *(epoch_t*)r.get(ts_col_); //r.column_count()-1);
if(ts >= newTS_) {
// if(*(int*)((**i_).get((**i_).column_count()-1)) >= newTS_) {
// newTS_ = *(int*)(**i_).get((**i_).column_count()-1);
newTS_ = ts;
newest_ = r;//**i_;
}
++(*i_);
}
/* if (((newTS_ & 0x1) && (newTS_ < beginning_of_time_) && (ts_col_ != INVALID_COL))) {
printf("gc'ed tombstone!!!\n");
} */
} while ((newTS_ & 0x1) && (newTS_ < beginning_of_time_) && (ts_col_ != INVALID_COL));
}
inline void operator--() {
(*i_)--;
}
/* inline gcIterator* end() {
return new gcIterator(i_->end());
} */
private:
bool myTupCmp(const ROW &a, const ROW &b) {
/* for(int i = 0; i < cnt; i++) {
if(a.get(i) != b.get(i)) {
return 0;
}
}*/
if(ROW::NN > 0) if(*a.get0() != *b.get0()) { if(0 != ts_col_) return 0; }
if(ROW::NN > 1) if(*a.get1() != *b.get1()) { if(1 != ts_col_) return 0; }
if(ROW::NN > 2) if(*a.get2() != *b.get2()) { if(2 != ts_col_) return 0; }
if(ROW::NN > 3) if(*a.get3() != *b.get3()) { if(3 != ts_col_) return 0; }
if(ROW::NN > 4) if(*a.get4() != *b.get4()) { if(4 != ts_col_) return 0; }
if(ROW::NN > 5) if(*a.get5() != *b.get5()) { if(5 != ts_col_) return 0; }
if(ROW::NN > 6) if(*a.get6() != *b.get6()) { if(6 != ts_col_) return 0; }
if(ROW::NN > 7) if(*a.get7() != *b.get7()) { if(7 != ts_col_) return 0; }
if(ROW::NN > 8) if(*a.get8() != *b.get8()) { if(8 != ts_col_) return 0; }
if(ROW::NN > 9) if(*a.get9() != *b.get9()) { if(9 != ts_col_) return 0; }
if(ROW::NN > 10) if(*a.get10() != *b.get10()) { if(10 != ts_col_) return 0; }
if(ROW::NN > 11) if(*a.get11() != *b.get11()) { if(11 != ts_col_) return 0; }
if(ROW::NN > 12) if(*a.get12() != *b.get12()) { if(12 != ts_col_) return 0; }
if(ROW::NN > 13) if(*a.get13() != *b.get13()) { if(13 != ts_col_) return 0; }
if(ROW::NN > 14) if(*a.get14() != *b.get14()) { if(14 != ts_col_) return 0; }
if(ROW::NN > 15) if(*a.get15() != *b.get15()) { if(15 != ts_col_) return 0; }
if(ROW::NN > 16) if(*a.get16() != *b.get16()) { if(16 != ts_col_) return 0; }
if(ROW::NN > 17) if(*a.get17() != *b.get17()) { if(17 != ts_col_) return 0; }
if(ROW::NN > 18) if(*a.get18() != *b.get18()) { if(18 != ts_col_) return 0; }
if(ROW::NN > 19) if(*a.get19() != *b.get19()) { if(19 != ts_col_) return 0; }
return 1;
}
explicit gcIterator() { abort(); }
void operator=(gcIterator & t) { abort(); }
int operator-(gcIterator & t) { abort(); }
ITER * i_;
ROW newest_;
epoch_t newTS_;
ITER * iend_;
bool freeIt;
epoch_t beginning_of_time_;
column_number_t ts_col_;
};
//---------------------------------------------------------------------------
/**
Scans through an LSM tree's leaf pages, each tuple in the tree, in
order. This iterator is designed for maximum forward scan
@ -47,7 +148,11 @@ template <class ROW, class PAGELAYOUT>
class treeIterator {
private:
inline void init_helper() {
if(!lsmTreeIterator_next(-1, lsmIterator_)) {
if(!lsmIterator_) {
currentPage_ = 0;
pageid_ = -1;
p_ = 0;
} else if(!lsmTreeIterator_next(-1, lsmIterator_)) {
currentPage_ = 0;
pageid_ = -1;
p_ = 0;
@ -97,10 +202,10 @@ class treeIterator {
init_helper();
}
explicit treeIterator(treeIteratorHandle* tree) :
tree_(tree->r_),
tree_(tree?tree->r_:NULLRID),
scratch_(),
keylen_(ROW::sizeofBytes()),
lsmIterator_(lsmTreeIterator_open(-1,tree->r_)),
lsmIterator_(lsmTreeIterator_open(-1,tree?tree->r_:NULLRID)),
slot_(0)
{
init_helper();
@ -109,15 +214,16 @@ class treeIterator {
tree_(t.tree_),
scratch_(t.scratch_),
keylen_(t.keylen_),
lsmIterator_(lsmTreeIterator_copy(-1,t.lsmIterator_)),
lsmIterator_(t.lsmIterator_?lsmTreeIterator_copy(-1,t.lsmIterator_):0),
slot_(t.slot_),
pageid_(t.pageid_),
p_((Page*)((t.p_)?loadPage(-1,t.p_->id):0)),
currentPage_((PAGELAYOUT*)((p_)?p_->impl:0)) {
}
~treeIterator() {
lsmTreeIterator_close(-1, lsmIterator_);
if(lsmIterator_) {
lsmTreeIterator_close(-1, lsmIterator_);
}
if(p_) {
releasePage(p_);
p_ = 0;
@ -148,10 +254,13 @@ class treeIterator {
abort();
}
}
/* for(int c = 0; c < (scratch_).column_count(); c++) {
assert(*(byte*)(scratch_).get(c) || !*(byte*)(scratch_).get(c));
} */
return scratch_;
}
inline bool operator==(const treeIterator &a) const {
return (slot_ == a.slot_ && pageid_ == a.pageid_);
return (slot_ == a.slot_ && pageid_ == a.pageid_)/* || !(lsmIterator_ && a.lsmIterator_)*/ ;
}
inline bool operator!=(const treeIterator &a) const {
return !(*this==a);
@ -259,8 +368,12 @@ class mergeIterator {
{ }
const ROW& operator* () {
if(curr_ == A || curr_ == BOTH) { return *a_; }
if(curr_ == B) { return *b_; }
if(curr_ == A) { return *a_; }
if(curr_ == B || curr_ == BOTH) { return *b_; }
// abort();
curr_ = calcCurr(A);
if(curr_ == A) { return *a_; }
if(curr_ == B || curr_ == BOTH) { return *b_; }
abort();
}
void seekEnd() {
@ -402,15 +515,11 @@ class versioningIterator {
}
inline unsigned int offset() { return off_; }
private:
// unsigned int off_;
ITER a_;
ITER aend_;
int check_tombstone_;
ROW tombstone_;
// ROW &scratch_;
off_t off_;
// int before_eof_;
// typeof(ROW::TIMESTAMP) beginning_of_time_;
friend const byte*
toByteArray<ITER,ROW>(versioningIterator<ITER,ROW> * const t);
};
@ -470,9 +579,9 @@ inline const byte * toByteArray(stlSetIterator<SET,ROW> * const t) {
position */
template <class ITERA, class ITERB, class ROW>
inline const byte * toByteArray(mergeIterator<ITERA,ITERB,ROW> * const t) {
if(t->curr_ == t->A || t->curr_ == t->BOTH) {
if(t->curr_ == t->A) {
return toByteArray(&t->a_);
} else if(t->curr_ == t->B) {
} else if(t->curr_ == t->B || t->curr_ == t->BOTH) {
return toByteArray(&t->b_);
}
abort();

View file

@ -12,6 +12,7 @@
#include <stasis/truncation.h>
namespace rose {
/**
@file
@ -46,6 +47,8 @@ namespace rose {
typename ITERA::handle ** out_tree;
void * out_tree_allocer;
typename ITERA::handle my_tree;
epoch_t * last_complete_xact;
column_number_t ts_col;
};
template <class PAGELAYOUT, class ITER>
@ -72,6 +75,9 @@ namespace rose {
typename PAGELAYOUT::FMT * mc = PAGELAYOUT::initPage(p, &**begin);
for(ITER i(*begin); i != *end; ++i) {
/* for(int c = 0; c < (*i).column_count(); c++) {
assert(*(byte*)(*i).get(c) || !*(byte*)(*i).get(c));
} */
rose::slot_index_t ret = mc->append(xid, *i);
if(ret == rose::NOSPACE) {
@ -164,6 +170,8 @@ namespace rose {
gettimeofday(&wait_tv,0);
epoch_t current_timestamp = a->last_complete_xact ? *(a->last_complete_xact) : 0;
uint64_t insertedTuples;
pageid_t mergedPages;
ITERA *taBegin = new ITERA(tree);
@ -213,10 +221,19 @@ namespace rose {
mergedPages = compressData
<PAGELAYOUT,versioningIterator<mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP>, typename PAGELAYOUT::FMT::TUP> >
(xid, &vBegin, &vEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */
mergedPages = compressData
<PAGELAYOUT,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> >
(xid, &mBegin, &mEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples);
/* mergedPages = compressData
<PAGELAYOUT,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> >
(xid, &mBegin, &mEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */
gcIterator<typename PAGELAYOUT::FMT::TUP,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> > gcBegin(&mBegin, &mEnd, current_timestamp, a->ts_col);
gcIterator<typename PAGELAYOUT::FMT::TUP,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> > gcEnd(&mEnd, &mEnd, current_timestamp, a->ts_col);
//++gcBegin;
mergedPages = compressData
<PAGELAYOUT, gcIterator<typename PAGELAYOUT::FMT::TUP,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> > >
(xid, &gcBegin, &gcEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples);
// these tree iterators keep pages pinned! Don't call force until they've been deleted, or we'll deadlock.
} // free all the stack allocated iterators...
@ -422,10 +439,12 @@ namespace rose {
typename PAGELAYOUT::FMT>, stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> > * args2;
epoch_t last_xact;
};
template<class PAGELAYOUT>
lsmTableHandle <PAGELAYOUT> * TlsmTableStart(recordid& tree) {
// XXX ts_col should be an argument to TlsmTableAlloc, not start!!!
lsmTableHandle <PAGELAYOUT> * TlsmTableStart(recordid& tree, column_number_t ts_col) {
/// XXX xid for daemon processes?
lsmTableHeader_t h;
Tread(-1, tree, &h);
@ -502,6 +521,8 @@ namespace rose {
ret->input_needed_cond = block0_needed_cond;
ret->input_size = block0_size;
ret->last_xact = 0;
recordid * ridp = (recordid*)malloc(sizeof(recordid));
*ridp = h.bigTreeAllocState;
recordid * oldridp = (recordid*)malloc(sizeof(recordid));
@ -532,7 +553,9 @@ namespace rose {
allocer_scratch,
0,
0,
new typename LSM_ITER::treeIteratorHandle(NULLRID)
new typename LSM_ITER::treeIteratorHandle(NULLRID),
&(ret->last_xact),
ts_col
};
*ret->args1 = tmpargs1;
void * (*merger1)(void*) = mergeThread<PAGELAYOUT, LSM_ITER, LSM_ITER>;
@ -566,7 +589,9 @@ namespace rose {
0,
block1_scratch,
allocer_scratch,
new typename LSM_ITER::treeIteratorHandle(NULLRID)
new typename LSM_ITER::treeIteratorHandle(NULLRID),
0,
ts_col
};
*ret->args2 = tmpargs2;
void * (*merger2)(void*) = mergeThread<PAGELAYOUT, LSM_ITER, RB_ITER>;
@ -626,9 +651,21 @@ namespace rose {
pthread_join(h->merge1_thread,0);
pthread_join(h->merge2_thread,0);
}
template<class PAGELAYOUT>
void TlsmTableUpdateTimestamp(lsmTableHandle<PAGELAYOUT> *h,
epoch_t ts) {
pthread_mutex_lock(h->mut);
assert(h->last_xact <= ts);
h->last_xact = ts;
pthread_mutex_unlock(h->mut);
}
template<class PAGELAYOUT>
void TlsmTableInsert( lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &t) {
/* for(int i = 0; i < t.column_count(); i++) { // This helps valgrind warn earlier...
assert(*((char*)t.get(i)) || *((char*)t.get(i))+1);
} */
h->scratch_handle->insert(t);
uint64_t handleBytes = h->scratch_handle->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
@ -688,6 +725,99 @@ namespace rose {
return ret;
}
template<class PAGELAYOUT>
int TlsmTableCount(int xid, lsmTableHandle<PAGELAYOUT> *h) {
/* treeIterator<typename PAGELAYOUT::FMT::TUP, typename PAGELAYOUT::FMT> it1(NULLRID);
treeIterator<typename PAGELAYOUT::FMT::TUP, typename PAGELAYOUT::FMT> it2(NULLRID;
stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP>,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> it3(???);*/
// typename std::set<typename
typedef treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT> LSM_ITER;
/* taypedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> RB_ITER; */
typedef std::_Rb_tree_const_iterator<typename PAGELAYOUT::FMT::TUP> RB_ITER;
// typedef mergeIterator<LSM_ITER,RB_ITER,typename PAGELAYOUT::FMT::TUP> INNER_MERGE;
typedef mergeIterator<LSM_ITER,LSM_ITER,typename PAGELAYOUT::FMT::TUP> LSM_LSM ;
typedef mergeIterator<RB_ITER,RB_ITER,typename PAGELAYOUT::FMT::TUP> RB_RB ;
typedef mergeIterator<LSM_ITER,LSM_LSM,typename PAGELAYOUT::FMT::TUP> LSM_M_LSM_LSM;
typedef mergeIterator<LSM_M_LSM_LSM,RB_RB,typename PAGELAYOUT::FMT::TUP> M_LSM_LSM_LSM_M_RB_RB;
LSM_ITER * it1end;
LSM_ITER * it2end;
LSM_ITER * it3end;
int ret =0;
pthread_mutex_lock(h->mut);
{
LSM_ITER it2(*h->args1->in_tree ? **h->args1->in_tree : 0);
it2end = it2.end();
// while(it2 != *it2end) { *it2; ++it2; ret++;}
RB_ITER it4(*h->args2->in_tree ? (**h->args2->in_tree)->begin() : h->scratch_handle->end());
RB_ITER it4end(*h->args2->in_tree ? (**h->args2->in_tree)->end() : h->scratch_handle->end());
// while(it4 != it4end) { *it4; ++it4; ret++; }
LSM_ITER it1(h->args1->my_tree);
it1end = it1.end();
// while(it1 != *it1end) { *it1; ++it1; ret++; }
LSM_ITER it3(h->args2->my_tree);
it3end = it3.end();
// while(it3 != *it3end) { *it3; ++it3; ret++; }
RB_ITER it5 = h->scratch_handle->begin();
RB_ITER it5end = h->scratch_handle->end();
// while(it5 != it5end) { *it5; ++it5; ret++; }
RB_RB m45(it4,it5,it4end,it5end);
RB_RB m45end(it4,it5,it4end,it5end);
m45end.seekEnd();
// while(m45 != m45end) { ++m45; }
LSM_LSM m23(it2,it3,*it2end,*it3end);
LSM_LSM m23end(it2,it3,*it2end,*it3end);
m23end.seekEnd();
// while(m23 != m23end) { ++m23; }
LSM_M_LSM_LSM m123(it1,m23,*it1end,m23end);
LSM_M_LSM_LSM m123end(it1,m23,*it1end,m23end);
m123end.seekEnd();
// if(m123 != m123end) { ++m123; }
M_LSM_LSM_LSM_M_RB_RB m12345(m123,m45,m123end,m45end);
M_LSM_LSM_LSM_M_RB_RB m12345end(m123,m45,m123end,m45end);
m12345end.seekEnd();
while(m12345 != m12345end) {
*m12345;
++ret;
++m12345;
}
} // free the stack allocated iterators
delete it2end;
delete it1end;
delete it3end;
pthread_mutex_unlock(h->mut);
return ret;
}
template<class PAGELAYOUT>
const typename PAGELAYOUT::FMT::TUP *
TlsmTableFind(int xid, lsmTableHandle<PAGELAYOUT> *h,
@ -738,7 +868,7 @@ namespace rose {
} else {
DEBUG("no tree");
}
pthread_mutex_unlock(h->mut);
DEBUG("Not in any tree\n");
assert(r == 0);
return r;

View file

@ -717,6 +717,9 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) {
}
pageid_t TlsmLastPage(int xid, recordid tree) {
if(tree.page == 0 && tree.slot == 0 && tree.size == -1) {
return -1;
}
Page * root = loadPage(xid, tree.page);
readlock(root->rwlatch,0);
lsmTreeState *state = root->impl;
@ -781,6 +784,7 @@ page_impl lsmRootImpl() {
///--------------------- Iterator implementation
lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) {
if(root.page == 0 && root.slot == 0 && root.size == -1) { return 0; }
Page *p = loadPage(xid,root.page);
readlock(p->rwlatch,0);
size_t keySize = getKeySize(xid,p);