From 0c7d45f7880be85d5ea58134d6702000dff8f730 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 13 Mar 2008 01:16:37 +0000 Subject: [PATCH] Added support for tombstones and rudimentary versioning. workload 2 is broken at the moment. --- benchmarks/roseTable.h | 2 +- benchmarks/roseTableTpcCH-workload1.cpp | 120 ++++++++++++++------ benchmarks/roseTableTpcCH-workload2.cpp | 48 +++++++- benchmarks/roseTableTpcCH.h | 45 ++++++-- src/stasis/operations/lsmIterators.h | 139 ++++++++++++++++++++--- src/stasis/operations/lsmTable.h | 144 ++++++++++++++++++++++-- src/stasis/operations/lsmTree.c | 4 + 7 files changed, 435 insertions(+), 67 deletions(-) diff --git a/benchmarks/roseTable.h b/benchmarks/roseTable.h index 47ffad9..ec4eb55 100644 --- a/benchmarks/roseTable.h +++ b/benchmarks/roseTable.h @@ -58,7 +58,7 @@ namespace rose { Tcommit(xid); - lsmTableHandle* h = TlsmTableStart(lsmTable); + lsmTableHandle* h = TlsmTableStart(lsmTable, INVALID_COL); typename PAGELAYOUT::FMT::TUP t; typename PAGELAYOUT::FMT::TUP s; diff --git a/benchmarks/roseTableTpcCH-workload1.cpp b/benchmarks/roseTableTpcCH-workload1.cpp index 8921e24..ef26bc2 100644 --- a/benchmarks/roseTableTpcCH-workload1.cpp +++ b/benchmarks/roseTableTpcCH-workload1.cpp @@ -1,20 +1,36 @@ //#define LEAK_TEST + +// XID_COL is 0 indexed. +#define XID_COL 4 + #include "roseTableTpcCH.h" #include "stasis/page/compression/compression.h" int main(int argc, char **argv) { typedef int32_t typ0; - typedef int16_t typ1; - typedef int8_t typ2; + typedef int16_t typ1;//typedef int16_t typ1; + typedef int8_t typ2; typedef int8_t typ3; - typedef int32_t typ4; - typedef int32_t typ5; + // typedef int8_t typ2; + // typedef int8_t typ3; + typedef rose::epoch_t typ4; + typedef int8_t typ5; typedef int8_t typ6; typedef int32_t typ7; typedef int8_t typ8; typedef int8_t typ9; + /* typedef int64_t typ0; + typedef int64_t typ1; + typedef int64_t typ2; + typedef int64_t typ3; + typedef int64_t typ4; + typedef int64_t typ5; + typedef int64_t typ6; + typedef int64_t typ7; + typedef int64_t typ8; + typedef int64_t typ9; */ /* typedef int32_t typ10; typedef int16_t typ11; @@ -36,49 +52,52 @@ int main(int argc, char **argv) { typedef int64_t typ17; typedef int64_t typ18; typedef int64_t typ19; - #define COLS 20 - typedef rose::StaticTuple tup; + #define COLS 7 + typedef rose::StaticTuple tup; using rose::For; using rose::Rle; using rose::Nop; int ret; // multicolumn is deprecated; want static dispatch! - /* + rose::plugin_id_t * plugins = (rose::plugin_id_t*)malloc(COLS * sizeof(rose::plugin_id_t)); // todo try Rle / For plugins[0] = rose::plugin_id, Rle, typ0>(); plugins[1] = rose::plugin_id, Rle, typ1>(); // rle - plugins[2] = rose::plugin_id, Rle, typ2>(); - plugins[3] = rose::plugin_id, Rle, typ3>(); + plugins[2] = rose::plugin_id, Nop, typ2>(); + plugins[3] = rose::plugin_id, Nop, typ3>(); // todo try nop / for - plugins[4] = rose::plugin_id, For, typ4>(); // rle - plugins[5] = rose::plugin_id, Rle, typ5>(); - plugins[6] = rose::plugin_id, Nop, typ6>(); - // todo try nop - plugins[7] = rose::plugin_id, For, typ7>(); // for - plugins[8] = rose::plugin_id, Rle, typ8>(); - plugins[9] = rose::plugin_id, Rle, typ9>(); + plugins[4] = rose::plugin_id, Nop, typ4>(); // rle + plugins[5] = rose::plugin_id, Nop, typ5>(); - // todo try Rle / For - plugins[10] = rose::plugin_id, Rle, typ10>(); - plugins[11] = rose::plugin_id, Rle, typ11>(); // rle - plugins[12] = rose::plugin_id, Rle, typ12>(); - plugins[13] = rose::plugin_id, Rle, typ13>(); - // todo try nop / for - plugins[14] = rose::plugin_id, For, typ14>(); // rle - plugins[15] = rose::plugin_id, Rle, typ15>(); - plugins[16] = rose::plugin_id, Nop, typ16>(); - // todo try nop - plugins[17] = rose::plugin_id, For, typ17>(); // for - plugins[18] = rose::plugin_id, Rle, typ18>(); - plugins[19] = rose::plugin_id, Rle, typ19>(); + plugins[6] = rose::plugin_id, Rle, typ6>(); +// // todo try nop +// plugins[7] = rose::plugin_id, For, typ7>(); // for +// plugins[8] = rose::plugin_id, Rle, typ8>(); +// plugins[9] = rose::plugin_id, Rle, typ9>(); +// +// // todo try Rle / For +// plugins[10] = rose::plugin_id, Rle, typ10>(); +// plugins[11] = rose::plugin_id, Rle, typ11>(); // rle +// plugins[12] = rose::plugin_id, Rle, typ12>(); +// plugins[13] = rose::plugin_id, Rle, typ13>(); +// // todo try nop / for +// plugins[14] = rose::plugin_id, For, typ14>(); // rle +// plugins[15] = rose::plugin_id, Rle, typ15>(); +// plugins[16] = rose::plugin_id, Nop, typ16>(); +// // todo try nop +// plugins[17] = rose::plugin_id, For, typ17>(); // for +// plugins[18] = rose::plugin_id, Rle, typ18>(); +// plugins[19] = rose::plugin_id, Rle, typ19>(); rose::DynamicMultiColumnTypePageLayout >::initPageLayout(plugins); ret = rose::main - > >(argc,argv); - */ + > >(argc,argv); + /* return rose::main ,Rle, @@ -126,7 +145,42 @@ int main(int argc, char **argv) { > > > + (argc,argv);*/ + /* + rose::StaticMultiColumnTypePageLayout + ,Rle, + Nop,Nop, + Nop,Nop + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop + > + >::initPageLayout(); + + ret = rose::main + ,Rle, + Nop,Nop, + Nop,Nop + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop, + // Nop,Nop + > + > + > (argc,argv); - + */ return ret; } diff --git a/benchmarks/roseTableTpcCH-workload2.cpp b/benchmarks/roseTableTpcCH-workload2.cpp index b2d3d4c..c9b85b6 100644 --- a/benchmarks/roseTableTpcCH-workload2.cpp +++ b/benchmarks/roseTableTpcCH-workload2.cpp @@ -1,5 +1,7 @@ //#define LEAK_TEST +#define XID_COL 254 // XXX + #include "roseTableTpcCH.h" #include "stasis/page/compression/compression.h" @@ -16,8 +18,19 @@ int main(int argc, char **argv) { typedef int8_t typ8; typedef int8_t typ9; - #define COLS 10 - typedef rose::StaticTuple tup; + typedef rose::epoch_t typ10; + typedef int64_t typ11; + typedef int64_t typ12; + typedef int64_t typ13; + typedef int64_t typ14; + typedef int64_t typ15; + typedef int64_t typ16; + typedef int64_t typ17; + typedef int64_t typ18; + typedef int64_t typ19; + + #define COLS 11 + typedef rose::StaticTuple tup; using rose::For; using rose::Rle; using rose::Nop; @@ -37,6 +50,21 @@ int main(int argc, char **argv) { plugins[8] = rose::plugin_id, Nop, typ8>(); plugins[9] = rose::plugin_id, Nop, typ9>(); + // todo try Rle / For + plugins[10] = rose::plugin_id, Rle, typ10>(); + plugins[11] = rose::plugin_id, Rle, typ11>(); // rle + plugins[12] = rose::plugin_id, Rle, typ12>(); + plugins[13] = rose::plugin_id, Rle, typ13>(); + // todo try nop / for + plugins[14] = rose::plugin_id, For, typ14>(); // rle + plugins[15] = rose::plugin_id, Rle, typ15>(); + plugins[16] = rose::plugin_id, Nop, typ16>(); + // todo try nop + plugins[17] = rose::plugin_id, For, typ17>(); // for + plugins[18] = rose::plugin_id, Rle, typ18>(); + plugins[19] = rose::plugin_id, Rle, typ19>(); + + rose::DynamicMultiColumnTypePageLayout >::initPageLayout(plugins); ret = rose::main @@ -63,7 +91,13 @@ int main(int argc, char **argv) { For,Rle, Rle,Rle, For,For, - For,Rle > + For,Rle, + Rle,Rle, + Rle,Rle, + For,Rle, + Nop,For, + Rle,Rle + > >::initPageLayout(); ret = rose::main @@ -74,7 +108,13 @@ int main(int argc, char **argv) { For,Rle, Rle,Rle, For,For, - For,Rle > + For,Rle, + Rle,Rle, + Rle,Rle, + For,Rle, + Nop,For, + Rle,Rle + > > > (argc,argv); diff --git a/benchmarks/roseTableTpcCH.h b/benchmarks/roseTableTpcCH.h index a1e9e26..956a290 100644 --- a/benchmarks/roseTableTpcCH.h +++ b/benchmarks/roseTableTpcCH.h @@ -58,7 +58,7 @@ namespace rose { Tcommit(xid); - lsmTableHandle* h = TlsmTableStart(lsmTable); + lsmTableHandle* h = TlsmTableStart(lsmTable, XID_COL); typename PAGELAYOUT::FMT::TUP t; typename PAGELAYOUT::FMT::TUP s; @@ -79,7 +79,8 @@ namespace rose { // int column[] = { 0 , 1, 2, 3, 4, 5, 6, 7, 8, 9 }; // 0 1 2 3 4 5 6 7 8 9 // int column[] = { 3 , 4, 1, 11, 0, 5, 6, 9, 10, 14 }; - int column[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + const int column[] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + static long COUNT = INSERTS / 100; long int count = COUNT; @@ -91,6 +92,8 @@ namespace rose { start = rose::tv_to_double(start_tv); last_start = start; + epoch_t this_xid = 0; + epoch_t last_ts_col = 0; printf("tuple 'size'%d ; %ld\n", PAGELAYOUT::FMT::TUP::sizeofBytes(), sizeof(typename PAGELAYOUT::FMT::TUP)); @@ -98,11 +101,12 @@ namespace rose { typename PAGELAYOUT::FMT::TUP scratch; int max_col_number = 0; - for(int col =0; col < PAGELAYOUT::FMT::TUP::NN ; col++) { + for(int col =0; col < PAGELAYOUT::FMT::TUP::NN; col++) { max_col_number = max_col_number < column[col] ? column[col] : max_col_number; } char ** toks = (char**)malloc(sizeof(char*)*(max_col_number+1)); + char * mode; printf("Reading from file %s\n", file); int inserts = 0; size_t line_len = 100; @@ -127,7 +131,8 @@ namespace rose { { char * saveptr; int i; - toks[0] = strtok_r(line, ",\n", &saveptr); + mode = strtok_r(line, ",\n", &saveptr); + toks[0] = strtok_r(0, ",\n", &saveptr); for(i = 1; i < (max_col_number+1); i++) { toks[i] = strtok_r(0, ",\n", &saveptr); if(!toks[i]) { @@ -451,10 +456,35 @@ namespace rose { scratch.set19(&t); } + int needupdate = 0; + if(last_ts_col != *(epoch_t*)scratch.get(XID_COL)) { + this_xid++; + last_ts_col = *(epoch_t*)scratch.get(XID_COL); + needupdate = 1; + } + if(!strcmp(mode, "add")) { + (*(epoch_t*)scratch.get(XID_COL)) = this_xid * 2; // will be *2 + 1 for deletes + // abort(); + TlsmTableInsert(h,scratch); + if(needupdate) { TlsmTableUpdateTimestamp(h,(this_xid-1) * 2); } - // abort(); - TlsmTableInsert(h,scratch); + } else if(!strcmp(mode, "delete")) { + (*(epoch_t*)scratch.get(XID_COL)) = this_xid * 2 + 1; // + 1 => delete + TlsmTableInsert(h,scratch); + if(needupdate) { TlsmTableUpdateTimestamp(h,(this_xid-1) * 2 + 1); } + } else if(!strcmp(mode, "deliver")) { + TlsmTableInsert(h,scratch); + (*(epoch_t*)scratch.get(XID_COL)) = this_xid * 2 + 1; // + 1 => delete + (*(epoch_t*)scratch.get(PAGELAYOUT::FMT::TUP::NN-1)) = 0; // undelivered tuple + TlsmTableInsert(h,scratch); + if(needupdate) { TlsmTableUpdateTimestamp(h,(this_xid-1) * 2 + 1); } + } else if(!strcmp(mode, "status")) { + typename PAGELAYOUT::FMT::TUP scratch2; + // XXX never finds tuples; gets to correct page, then + // fails because it doesn't know the xid, so no tuples match. + TlsmTableFind(xid,h,scratch, scratch2); + } count --; if(!count) { count = COUNT; @@ -471,6 +501,8 @@ namespace rose { (((double)PAGELAYOUT::FMT::TUP::sizeofBytes())*(double)count/1000000.0)/(now-last_start) ); last_start = now; + int count = TlsmTableCount(xid,h); + printf("counted %d tuples\n", count); } } } @@ -546,4 +578,3 @@ namespace rose { return 0; } } - diff --git a/src/stasis/operations/lsmIterators.h b/src/stasis/operations/lsmIterators.h index e8b1c49..ad20b46 100644 --- a/src/stasis/operations/lsmIterators.h +++ b/src/stasis/operations/lsmIterators.h @@ -38,6 +38,107 @@ template class stlSetIterator; template inline const byte * toByteArray(stlSetIterator * const t); + + +/** + Scans over another iterator, checking for tombstones, and garbage collecting old tuples. + */ +template +class gcIterator { + public: + explicit gcIterator(ITER * i, ITER * iend, epoch_t beginning_of_time, column_number_t ts_col) : i_(i), newest_(), newTS_(-1), iend_(iend), freeIt(0), beginning_of_time_(beginning_of_time), ts_col_(ts_col) { /*++(*i);*/ if(*i_ != *iend_) { ++(*this);} } + explicit gcIterator(gcIterator& t) : i_(new ITER(*(t.i_))), newest_(t.newest_), newTS_(t.newTS_), iend_(t.iend_), freeIt(1), beginning_of_time_(t.beginning_of_time_), ts_col_(t.ts_col_) { } + + ~gcIterator() { + if (freeIt) { + delete i_; + } + } + ROW & operator*() { + return newest_; + } + inline bool operator==(const gcIterator &a) const { + return (*i_) == (*a.i_); + } + inline bool operator!=(const gcIterator &a) const { + return (*i_) != (*a.i_); + } + inline void operator++() { + do { + if(*i_ == *iend_) { return; } + assert(*i_ != *iend_); + newest_ = **i_; + newTS_ = *(epoch_t*)newest_.get(ts_col_); + ++(*i_); + while(ts_col_ != INVALID_COL && (*i_ != *iend_) && myTupCmp(newest_,**i_)) { + const ROW& r = (**i_); + epoch_t ts = *(epoch_t*)r.get(ts_col_); //r.column_count()-1); + + if(ts >= newTS_) { + // if(*(int*)((**i_).get((**i_).column_count()-1)) >= newTS_) { + // newTS_ = *(int*)(**i_).get((**i_).column_count()-1); + newTS_ = ts; + + newest_ = r;//**i_; + } + ++(*i_); + } + /* if (((newTS_ & 0x1) && (newTS_ < beginning_of_time_) && (ts_col_ != INVALID_COL))) { + printf("gc'ed tombstone!!!\n"); + } */ + } while ((newTS_ & 0x1) && (newTS_ < beginning_of_time_) && (ts_col_ != INVALID_COL)); + } + inline void operator--() { + (*i_)--; + } + /* inline gcIterator* end() { + return new gcIterator(i_->end()); + } */ + private: + bool myTupCmp(const ROW &a, const ROW &b) { + /* for(int i = 0; i < cnt; i++) { + if(a.get(i) != b.get(i)) { + return 0; + } + }*/ + if(ROW::NN > 0) if(*a.get0() != *b.get0()) { if(0 != ts_col_) return 0; } + if(ROW::NN > 1) if(*a.get1() != *b.get1()) { if(1 != ts_col_) return 0; } + if(ROW::NN > 2) if(*a.get2() != *b.get2()) { if(2 != ts_col_) return 0; } + if(ROW::NN > 3) if(*a.get3() != *b.get3()) { if(3 != ts_col_) return 0; } + if(ROW::NN > 4) if(*a.get4() != *b.get4()) { if(4 != ts_col_) return 0; } + if(ROW::NN > 5) if(*a.get5() != *b.get5()) { if(5 != ts_col_) return 0; } + if(ROW::NN > 6) if(*a.get6() != *b.get6()) { if(6 != ts_col_) return 0; } + if(ROW::NN > 7) if(*a.get7() != *b.get7()) { if(7 != ts_col_) return 0; } + if(ROW::NN > 8) if(*a.get8() != *b.get8()) { if(8 != ts_col_) return 0; } + if(ROW::NN > 9) if(*a.get9() != *b.get9()) { if(9 != ts_col_) return 0; } + if(ROW::NN > 10) if(*a.get10() != *b.get10()) { if(10 != ts_col_) return 0; } + if(ROW::NN > 11) if(*a.get11() != *b.get11()) { if(11 != ts_col_) return 0; } + if(ROW::NN > 12) if(*a.get12() != *b.get12()) { if(12 != ts_col_) return 0; } + if(ROW::NN > 13) if(*a.get13() != *b.get13()) { if(13 != ts_col_) return 0; } + if(ROW::NN > 14) if(*a.get14() != *b.get14()) { if(14 != ts_col_) return 0; } + if(ROW::NN > 15) if(*a.get15() != *b.get15()) { if(15 != ts_col_) return 0; } + if(ROW::NN > 16) if(*a.get16() != *b.get16()) { if(16 != ts_col_) return 0; } + if(ROW::NN > 17) if(*a.get17() != *b.get17()) { if(17 != ts_col_) return 0; } + if(ROW::NN > 18) if(*a.get18() != *b.get18()) { if(18 != ts_col_) return 0; } + if(ROW::NN > 19) if(*a.get19() != *b.get19()) { if(19 != ts_col_) return 0; } + return 1; + } + + explicit gcIterator() { abort(); } + void operator=(gcIterator & t) { abort(); } + int operator-(gcIterator & t) { abort(); } + ITER * i_; + ROW newest_; + epoch_t newTS_; + ITER * iend_; + bool freeIt; + epoch_t beginning_of_time_; + column_number_t ts_col_; +}; + +//--------------------------------------------------------------------------- + + /** Scans through an LSM tree's leaf pages, each tuple in the tree, in order. This iterator is designed for maximum forward scan @@ -47,7 +148,11 @@ template class treeIterator { private: inline void init_helper() { - if(!lsmTreeIterator_next(-1, lsmIterator_)) { + if(!lsmIterator_) { + currentPage_ = 0; + pageid_ = -1; + p_ = 0; + } else if(!lsmTreeIterator_next(-1, lsmIterator_)) { currentPage_ = 0; pageid_ = -1; p_ = 0; @@ -97,10 +202,10 @@ class treeIterator { init_helper(); } explicit treeIterator(treeIteratorHandle* tree) : - tree_(tree->r_), + tree_(tree?tree->r_:NULLRID), scratch_(), keylen_(ROW::sizeofBytes()), - lsmIterator_(lsmTreeIterator_open(-1,tree->r_)), + lsmIterator_(lsmTreeIterator_open(-1,tree?tree->r_:NULLRID)), slot_(0) { init_helper(); @@ -109,15 +214,16 @@ class treeIterator { tree_(t.tree_), scratch_(t.scratch_), keylen_(t.keylen_), - lsmIterator_(lsmTreeIterator_copy(-1,t.lsmIterator_)), + lsmIterator_(t.lsmIterator_?lsmTreeIterator_copy(-1,t.lsmIterator_):0), slot_(t.slot_), pageid_(t.pageid_), p_((Page*)((t.p_)?loadPage(-1,t.p_->id):0)), currentPage_((PAGELAYOUT*)((p_)?p_->impl:0)) { } ~treeIterator() { - - lsmTreeIterator_close(-1, lsmIterator_); + if(lsmIterator_) { + lsmTreeIterator_close(-1, lsmIterator_); + } if(p_) { releasePage(p_); p_ = 0; @@ -148,10 +254,13 @@ class treeIterator { abort(); } } + /* for(int c = 0; c < (scratch_).column_count(); c++) { + assert(*(byte*)(scratch_).get(c) || !*(byte*)(scratch_).get(c)); + } */ return scratch_; } inline bool operator==(const treeIterator &a) const { - return (slot_ == a.slot_ && pageid_ == a.pageid_); + return (slot_ == a.slot_ && pageid_ == a.pageid_)/* || !(lsmIterator_ && a.lsmIterator_)*/ ; } inline bool operator!=(const treeIterator &a) const { return !(*this==a); @@ -259,8 +368,12 @@ class mergeIterator { { } const ROW& operator* () { - if(curr_ == A || curr_ == BOTH) { return *a_; } - if(curr_ == B) { return *b_; } + if(curr_ == A) { return *a_; } + if(curr_ == B || curr_ == BOTH) { return *b_; } + // abort(); + curr_ = calcCurr(A); + if(curr_ == A) { return *a_; } + if(curr_ == B || curr_ == BOTH) { return *b_; } abort(); } void seekEnd() { @@ -402,15 +515,11 @@ class versioningIterator { } inline unsigned int offset() { return off_; } private: - // unsigned int off_; ITER a_; ITER aend_; int check_tombstone_; ROW tombstone_; - // ROW &scratch_; off_t off_; - // int before_eof_; - // typeof(ROW::TIMESTAMP) beginning_of_time_; friend const byte* toByteArray(versioningIterator * const t); }; @@ -470,9 +579,9 @@ inline const byte * toByteArray(stlSetIterator * const t) { position */ template inline const byte * toByteArray(mergeIterator * const t) { - if(t->curr_ == t->A || t->curr_ == t->BOTH) { + if(t->curr_ == t->A) { return toByteArray(&t->a_); - } else if(t->curr_ == t->B) { + } else if(t->curr_ == t->B || t->curr_ == t->BOTH) { return toByteArray(&t->b_); } abort(); diff --git a/src/stasis/operations/lsmTable.h b/src/stasis/operations/lsmTable.h index e16d24d..d45590d 100644 --- a/src/stasis/operations/lsmTable.h +++ b/src/stasis/operations/lsmTable.h @@ -12,6 +12,7 @@ #include namespace rose { + /** @file @@ -46,6 +47,8 @@ namespace rose { typename ITERA::handle ** out_tree; void * out_tree_allocer; typename ITERA::handle my_tree; + epoch_t * last_complete_xact; + column_number_t ts_col; }; template @@ -72,6 +75,9 @@ namespace rose { typename PAGELAYOUT::FMT * mc = PAGELAYOUT::initPage(p, &**begin); for(ITER i(*begin); i != *end; ++i) { + /* for(int c = 0; c < (*i).column_count(); c++) { + assert(*(byte*)(*i).get(c) || !*(byte*)(*i).get(c)); + } */ rose::slot_index_t ret = mc->append(xid, *i); if(ret == rose::NOSPACE) { @@ -164,6 +170,8 @@ namespace rose { gettimeofday(&wait_tv,0); + epoch_t current_timestamp = a->last_complete_xact ? *(a->last_complete_xact) : 0; + uint64_t insertedTuples; pageid_t mergedPages; ITERA *taBegin = new ITERA(tree); @@ -213,10 +221,19 @@ namespace rose { mergedPages = compressData , typename PAGELAYOUT::FMT::TUP> > (xid, &vBegin, &vEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */ - mergedPages = compressData - > - (xid, &mBegin, &mEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); + /* mergedPages = compressData + > + (xid, &mBegin, &mEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */ + + gcIterator > gcBegin(&mBegin, &mEnd, current_timestamp, a->ts_col); + gcIterator > gcEnd(&mEnd, &mEnd, current_timestamp, a->ts_col); + + //++gcBegin; + + mergedPages = compressData + > > + (xid, &gcBegin, &gcEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); // these tree iterators keep pages pinned! Don't call force until they've been deleted, or we'll deadlock. } // free all the stack allocated iterators... @@ -422,10 +439,12 @@ namespace rose { typename PAGELAYOUT::FMT>, stlSetIterator, typename PAGELAYOUT::FMT::TUP> > * args2; + epoch_t last_xact; }; template - lsmTableHandle * TlsmTableStart(recordid& tree) { + // XXX ts_col should be an argument to TlsmTableAlloc, not start!!! + lsmTableHandle * TlsmTableStart(recordid& tree, column_number_t ts_col) { /// XXX xid for daemon processes? lsmTableHeader_t h; Tread(-1, tree, &h); @@ -502,6 +521,8 @@ namespace rose { ret->input_needed_cond = block0_needed_cond; ret->input_size = block0_size; + ret->last_xact = 0; + recordid * ridp = (recordid*)malloc(sizeof(recordid)); *ridp = h.bigTreeAllocState; recordid * oldridp = (recordid*)malloc(sizeof(recordid)); @@ -532,7 +553,9 @@ namespace rose { allocer_scratch, 0, 0, - new typename LSM_ITER::treeIteratorHandle(NULLRID) + new typename LSM_ITER::treeIteratorHandle(NULLRID), + &(ret->last_xact), + ts_col }; *ret->args1 = tmpargs1; void * (*merger1)(void*) = mergeThread; @@ -566,7 +589,9 @@ namespace rose { 0, block1_scratch, allocer_scratch, - new typename LSM_ITER::treeIteratorHandle(NULLRID) + new typename LSM_ITER::treeIteratorHandle(NULLRID), + 0, + ts_col }; *ret->args2 = tmpargs2; void * (*merger2)(void*) = mergeThread; @@ -626,9 +651,21 @@ namespace rose { pthread_join(h->merge1_thread,0); pthread_join(h->merge2_thread,0); } + template + void TlsmTableUpdateTimestamp(lsmTableHandle *h, + epoch_t ts) { + pthread_mutex_lock(h->mut); + assert(h->last_xact <= ts); + h->last_xact = ts; + pthread_mutex_unlock(h->mut); + } template void TlsmTableInsert( lsmTableHandle *h, typename PAGELAYOUT::FMT::TUP &t) { + /* for(int i = 0; i < t.column_count(); i++) { // This helps valgrind warn earlier... + assert(*((char*)t.get(i)) || *((char*)t.get(i))+1); + } */ + h->scratch_handle->insert(t); uint64_t handleBytes = h->scratch_handle->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); @@ -688,6 +725,99 @@ namespace rose { return ret; } + template + int TlsmTableCount(int xid, lsmTableHandle *h) { + /* treeIterator it1(NULLRID); + treeIterator it2(NULLRID; + stlSetIterator, + typename PAGELAYOUT::FMT::TUP::stl_cmp>, + typename PAGELAYOUT::FMT::TUP> it3(???);*/ + + // typename std::set LSM_ITER; + /* taypedef stlSetIterator, + typename PAGELAYOUT::FMT::TUP> RB_ITER; */ + typedef std::_Rb_tree_const_iterator RB_ITER; + // typedef mergeIterator INNER_MERGE; + typedef mergeIterator LSM_LSM ; + typedef mergeIterator RB_RB ; + typedef mergeIterator LSM_M_LSM_LSM; + typedef mergeIterator M_LSM_LSM_LSM_M_RB_RB; + LSM_ITER * it1end; + LSM_ITER * it2end; + LSM_ITER * it3end; + int ret =0; + pthread_mutex_lock(h->mut); + + { + + LSM_ITER it2(*h->args1->in_tree ? **h->args1->in_tree : 0); + it2end = it2.end(); + + // while(it2 != *it2end) { *it2; ++it2; ret++;} + + + RB_ITER it4(*h->args2->in_tree ? (**h->args2->in_tree)->begin() : h->scratch_handle->end()); + RB_ITER it4end(*h->args2->in_tree ? (**h->args2->in_tree)->end() : h->scratch_handle->end()); + + // while(it4 != it4end) { *it4; ++it4; ret++; } + + LSM_ITER it1(h->args1->my_tree); + it1end = it1.end(); + + // while(it1 != *it1end) { *it1; ++it1; ret++; } + + LSM_ITER it3(h->args2->my_tree); + it3end = it3.end(); + + // while(it3 != *it3end) { *it3; ++it3; ret++; } + + RB_ITER it5 = h->scratch_handle->begin(); + RB_ITER it5end = h->scratch_handle->end(); + + // while(it5 != it5end) { *it5; ++it5; ret++; } + + RB_RB m45(it4,it5,it4end,it5end); + RB_RB m45end(it4,it5,it4end,it5end); + m45end.seekEnd(); + + // while(m45 != m45end) { ++m45; } + + LSM_LSM m23(it2,it3,*it2end,*it3end); + LSM_LSM m23end(it2,it3,*it2end,*it3end); + m23end.seekEnd(); + + // while(m23 != m23end) { ++m23; } + + LSM_M_LSM_LSM m123(it1,m23,*it1end,m23end); + LSM_M_LSM_LSM m123end(it1,m23,*it1end,m23end); + m123end.seekEnd(); + + // if(m123 != m123end) { ++m123; } + + M_LSM_LSM_LSM_M_RB_RB m12345(m123,m45,m123end,m45end); + M_LSM_LSM_LSM_M_RB_RB m12345end(m123,m45,m123end,m45end); + m12345end.seekEnd(); + + while(m12345 != m12345end) { + *m12345; + ++ret; + ++m12345; + } + + + } // free the stack allocated iterators + delete it2end; + delete it1end; + delete it3end; + + pthread_mutex_unlock(h->mut); + + return ret; + } + template const typename PAGELAYOUT::FMT::TUP * TlsmTableFind(int xid, lsmTableHandle *h, @@ -738,7 +868,7 @@ namespace rose { } else { DEBUG("no tree"); } - + pthread_mutex_unlock(h->mut); DEBUG("Not in any tree\n"); assert(r == 0); return r; diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index 231c72d..9534525 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -717,6 +717,9 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) { } pageid_t TlsmLastPage(int xid, recordid tree) { + if(tree.page == 0 && tree.slot == 0 && tree.size == -1) { + return -1; + } Page * root = loadPage(xid, tree.page); readlock(root->rwlatch,0); lsmTreeState *state = root->impl; @@ -781,6 +784,7 @@ page_impl lsmRootImpl() { ///--------------------- Iterator implementation lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) { + if(root.page == 0 && root.slot == 0 && root.size == -1) { return 0; } Page *p = loadPage(xid,root.page); readlock(p->rwlatch,0); size_t keySize = getKeySize(xid,p);