removed some more redundant code by unifying merge_iterators

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@532 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-01-28 02:20:49 +00:00
parent 23359c2570
commit 094a0d41ee
6 changed files with 88 additions and 344 deletions

View file

@ -18,6 +18,7 @@ typedef struct datatuple
{ {
typedef uchar* key_t; typedef uchar* key_t;
typedef uchar* data_t; typedef uchar* data_t;
static const size_t isize = sizeof(uint32_t);
uint32_t *keylen; //key length should be size of string + 1 for \n uint32_t *keylen; //key length should be size of string + 1 for \n
uint32_t *datalen; uint32_t *datalen;
key_t key; key_t key;
@ -32,6 +33,26 @@ typedef struct datatuple
//return (*((int32_t*)lhs.key)) <= (*((int32_t*)rhs.key)); //return (*((int32_t*)lhs.key)) <= (*((int32_t*)rhs.key));
} }
void clone(const datatuple& tuple) {
//create a copy
byte * arr = (byte*) malloc(tuple.byte_length());
keylen = (uint32_t*) arr;
*keylen = *tuple.keylen;
datalen = (uint32_t*) (arr+isize);
*datalen = *tuple.datalen;
key = (datatuple::key_t) (arr+isize+isize);
memcpy((byte*)key, (byte*)tuple.key, *keylen);
if(!tuple.isDelete())
{
data = (datatuple::data_t) (arr+isize+isize+ *keylen);
memcpy((byte*)data, (byte*)tuple.data, *datalen);
}
else
data = 0;
}
/** /**
* return -1 if k1 < k2 * return -1 if k1 < k2
* 0 if k1 == k2 * 0 if k1 == k2

View file

@ -1,17 +1,6 @@
#include "logstore.h" #include "logstore.h"
#include "logiterators.h" #include "logiterators.h"
//template <class MEMTREE, class TUPLE>
/*
template <>
const byte* toByteArray<std::set<datatuple,datatuple>, datatuple>(
memTreeIterator<std::set<datatuple,datatuple>, datatuple> * const t)
{
return (*(t->it_)).to_bytes();
}
*/
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
// tree iterator implementation // tree iterator implementation
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
@ -28,21 +17,10 @@ treeIterator<TUPLE>::treeIterator(recordid tree) :
template <class TUPLE> template <class TUPLE>
treeIterator<TUPLE>::treeIterator(recordid tree, TUPLE& key) : treeIterator<TUPLE>::treeIterator(recordid tree, TUPLE& key) :
tree_(tree), tree_(tree),
//scratch_(), lsmIterator_(logtreeIterator::openAt(-1,tree,key.get_key()))
lsmIterator_(logtreeIterator::openAt(-1,tree,key.get_key()))//toByteArray())),
//slot_(0)
{ {
init_helper(); init_helper();
/*
treeIterator * end = this->end();
for(;*this != *end && **this < key; ++(*this))
{
DEBUG("treeIterator was not at the given TUPLE");
}
delete end;
*/
} }
template <class TUPLE> template <class TUPLE>
@ -126,70 +104,9 @@ TUPLE * treeIterator<TUPLE>::getnext()
readTuple = dp_itr->getnext(-1); readTuple = dp_itr->getnext(-1);
assert(readTuple); assert(readTuple);
} }
else // else readTuple is null. We're done.
{
// TODO: what is this?
//past end of iterator! "end" should contain the pageid of the
// last leaf, and 1+ numslots on that page.
//abort();
}
} }
return curr_tuple=readTuple; curr_tuple = readTuple;
return curr_tuple;
} }
/*
template <class TUPLE>
treeIterator<TUPLE>::treeIterator(treeIteratorHandle* tree, TUPLE& key) :
tree_(tree?tree->r_:NULLRID),
scratch_(),
lsmIterator_(logtreeIterator::openAt(-1,tree?tree->r_:NULLRID,key.get_key())),//toByteArray())),
slot_(0)
{
init_helper();
if(lsmIterator_) {
treeIterator * end = this->end();
for(;*this != *end && **this < key; ++(*this)) { }
delete end;
} else {
this->slot_ = 0;
this->pageid_ = 0;
}
}
template <class TUPLE>
treeIterator<TUPLE>::treeIterator(recordid tree, TUPLE &scratch) :
tree_(tree),
scratch_(scratch),
lsmIterator_(logtreeIterator::open(-1,tree)),
slot_(0)
{
init_helper();
}
template <class TUPLE>
treeIterator<TUPLE>::treeIterator(treeIteratorHandle* tree) :
tree_(tree?tree->r_:NULLRID),
scratch_(),
lsmIterator_(logtreeIterator::open(-1,tree?tree->r_:NULLRID)),
slot_(0)
{
init_helper();
}
template <class TUPLE>
treeIterator<TUPLE>::treeIterator(treeIterator& t) :
tree_(t.tree_),
scratch_(t.scratch_),
lsmIterator_(t.lsmIterator_?logtreeIterator::copy(-1,t.lsmIterator_):0),
slot_(t.slot_),
pageid_(t.pageid_),
p_((Page*)((t.p_)?loadPage(-1,t.p_->id):0))
//currentPage_((PAGELAYOUT*)((p_)?p_->impl:0))
{
if(p_)
readlock(p_->rwlatch,0);
}
*/

View file

@ -7,11 +7,6 @@
#undef begin #undef begin
#undef end #undef end
template <class MEMTREE, class TUPLE> class memTreeIterator;
template <class MEMTREE, class TUPLE>
const byte* toByteArray(memTreeIterator<MEMTREE,TUPLE> * const t);
template <class TUPLE> template <class TUPLE>
class DataPage; class DataPage;
@ -19,134 +14,52 @@ class DataPage;
// memTreeIterator // memTreeIterator
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
template<class MEMTREE, class TUPLE> template <class MEMTREE, class TUPLE>
class memTreeIterator{ class memTreeIterator
{
private: private:
typedef typename MEMTREE::const_iterator MTITER; typedef typename MEMTREE::const_iterator MTITER;
public: public:
memTreeIterator( MEMTREE *s ) memTreeIterator( MEMTREE *s ) : first_(true), it_(s->begin()), itend_(s->end()) { }
{ memTreeIterator( MEMTREE *s, TUPLE &key ) : first_(true), it_(s->find(key)), itend_(s->end()) { }
it_ = s->begin();
itend_ = s->end(); ~memTreeIterator() { }
TUPLE* getnext() {
if(it_==itend_) { return NULL; }
if(first_) { first_ = 0;} else { it_++; }
TUPLE *t = new TUPLE();
t->clone(*it_);
return t;
} }
memTreeIterator( MTITER& it, MTITER& itend )
{
it_ = it;
itend_ = itend;
}
explicit memTreeIterator(memTreeIterator &i)
{
it_ = i.it_;
itend_ = i.itend_;
}
const TUPLE& operator* ()
{
return *it_;
}
void seekEnd()
{
it_ = itend_;
}
memTreeIterator * end()
{
return new memTreeIterator<MEMTREE,TUPLE>(itend_,itend_);
}
inline bool operator==(const memTreeIterator &o) const {
return it_ == o.it_;
}
inline bool operator!=(const memTreeIterator &o) const {
return !(*this == o);
}
inline void operator++() {
++it_;
}
inline void operator--() {
--it_;
}
inline int operator-(memTreeIterator &i) {
return it_ - i.it_;
}
inline void operator=(memTreeIterator const &i)
{
it_ = i.it_;
itend_ = i.itend_;
}
public:
typedef MEMTREE* handle;
private: private:
explicit memTreeIterator() { abort(); }
void operator=(memTreeIterator & t) { abort(); }
int operator-(memTreeIterator & t) { abort(); }
private:
bool first_;
MTITER it_; MTITER it_;
MTITER itend_; MTITER itend_;
friend const byte* toByteArray<MEMTREE,TUPLE>(memTreeIterator<MEMTREE,TUPLE> * const t);
}; };
template <class MEMTREE, class TUPLE>
const byte* toByteArray(memTreeIterator<MEMTREE,TUPLE> * const t)
{
return (*(t->it_)).to_bytes();//toByteArray();
}
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
/**
Scans through an LSM tree's leaf pages, each tuple in the tree, in
order. This iterator is designed for maximum forward scan
performance, and does not support all STL operations.
**/
template <class TUPLE> template <class TUPLE>
class treeIterator class treeIterator
{ {
public: public:
// typedef recordid handle;
class treeIteratorHandle
{
public:
treeIteratorHandle() : r_(NULLRID) {}
treeIteratorHandle(const recordid r) : r_(r) {}
treeIteratorHandle * operator=(const recordid &r) {
r_ = r;
return this;
}
recordid r_;
};
typedef treeIteratorHandle* handle;
explicit treeIterator(recordid tree); explicit treeIterator(recordid tree);
explicit treeIterator(recordid tree,TUPLE &key); explicit treeIterator(recordid tree,TUPLE &key);
//explicit treeIterator(treeIteratorHandle* tree, TUPLE& key);
//explicit treeIterator(treeIteratorHandle* tree);
//explicit treeIterator(treeIterator& t);
~treeIterator(); ~treeIterator();
TUPLE * getnext(); TUPLE * getnext();
//void advance(int count=1);
private: private:
inline void init_helper(); inline void init_helper();
@ -166,8 +79,5 @@ private:
TUPLE *curr_tuple; //current tuple TUPLE *curr_tuple; //current tuple
}; };
#endif #endif

View file

@ -1232,7 +1232,6 @@ void logtable::insertTuple(struct datatuple &tuple)
//static int count = LATCH_INTERVAL; //static int count = LATCH_INTERVAL;
//static int tsize = 0; //number of tuples //static int tsize = 0; //number of tuples
//static int64_t tree_bytes = 0; //number of bytes //static int64_t tree_bytes = 0; //number of bytes
static const size_t isize = sizeof(uint32_t);
//lock the red-black tree //lock the red-black tree
readlock(mergedata->header_lock,0); readlock(mergedata->header_lock,0);
@ -1257,23 +1256,8 @@ void logtable::insertTuple(struct datatuple &tuple)
else //no tuple with same key exists in mem-tree else //no tuple with same key exists in mem-tree
{ {
//create a copy
datatuple t; datatuple t;
byte *arr = (byte*) malloc(tuple.byte_length()); t.clone(tuple);
t.keylen = (uint32_t*) arr;
*t.keylen = *tuple.keylen;
t.datalen = (uint32_t*) (arr+isize);
*t.datalen = *tuple.datalen;
t.key = (datatuple::key_t) (arr+isize+isize);
memcpy((byte*)t.key, (byte*)tuple.key, *t.keylen);
if(!tuple.isDelete())
{
t.data = (datatuple::data_t) (arr+isize+isize+ *(t.keylen));
memcpy((byte*)t.data, (byte*)tuple.data, *t.datalen);
}
else
t.data = 0;
//insert tuple into the rbtree //insert tuple into the rbtree
tree_c0->insert(t); tree_c0->insert(t);

View file

@ -187,7 +187,7 @@ void merge_scheduler::startlogtable(int index)
allocer_scratch, //in_tree_allocer allocer_scratch, //in_tree_allocer
0, //out_tree 0, //out_tree
0, //out_tree_allocer 0, //out_tree_allocer
new treeIterator<datatuple>::treeIteratorHandle(ltable->get_tree_c2()->get_root_rec()), // my_tree ltable->get_tree_c2()->get_root_rec(), // my_tree
ltable->get_table_rec() //tree ltable->get_table_rec() //tree
}; };
@ -227,7 +227,7 @@ void merge_scheduler::startlogtable(int index)
0, 0,
block1_scratch, block1_scratch,
allocer_scratch, allocer_scratch,
new treeIterator<datatuple>::treeIteratorHandle(ltable->get_tree_c1()->get_root_rec()), ltable->get_tree_c1()->get_root_rec(),
ltable->get_table_rec() //tree ltable->get_table_rec() //tree
}; };
@ -250,7 +250,7 @@ void* memMergeThread(void*arg)
int xid;// = Tbegin(); int xid;// = Tbegin();
merger_args<rbtree_t> * a = (merger_args<rbtree_t>*)(arg); merger_args<rbtree_t> * a = (merger_args<rbtree_t>*)(arg);
assert(a->my_tree->r_.size != -1); assert(a->my_tree.size != -1);
logtable * ltable = a->ltable; logtable * ltable = a->ltable;
@ -307,13 +307,12 @@ void* memMergeThread(void*arg)
uint64_t insertedTuples=0; uint64_t insertedTuples=0;
int64_t mergedPages=0; int64_t mergedPages=0;
assert(a->my_tree->r_.size != -1); assert(a->my_tree.size != -1);
//create the iterators //create the iterators
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(a->my_tree->r_); treeIterator<datatuple> *itrA = new treeIterator<datatuple>(a->my_tree);
memTreeIterator<rbtree_t, datatuple> *itrB = memTreeIterator<rbtree_t, datatuple> *itrB =
new memTreeIterator<rbtree_t, datatuple>(*a->in_tree); new memTreeIterator<rbtree_t, datatuple>(*a->in_tree);
memTreeIterator<rbtree_t, datatuple> *itrBend = itrB->end();
//Tcommit(xid); //Tcommit(xid);
xid = Tbegin(); xid = Tbegin();
@ -335,11 +334,10 @@ void* memMergeThread(void*arg)
printf("mmt:\tMerging:\n"); printf("mmt:\tMerging:\n");
int64_t npages = 0; int64_t npages = 0;
mergedPages = merge_iterators(xid, itrA, itrB, ltable, scratch_tree, npages); mergedPages = merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, scratch_tree, npages, false);
delete itrA; delete itrA;
delete itrB; delete itrB;
delete itrBend;
//force write the new region to disk //force write the new region to disk
recordid scratch_alloc_state = scratch_tree->get_tree_state(); recordid scratch_alloc_state = scratch_tree->get_tree_state();
@ -376,7 +374,7 @@ void* memMergeThread(void*arg)
// free old my_tree here // free old my_tree here
//TODO: check //TODO: check
logtree::free_region_rid(xid, a->my_tree->r_, logtree::dealloc_region_rid, oldAllocState); logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState);
//TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState); //TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState);
@ -423,7 +421,7 @@ void* memMergeThread(void*arg)
*(recordid*)(a->pageAllocState) = empty_tree->get_tree_state(); *(recordid*)(a->pageAllocState) = empty_tree->get_tree_state();
a->my_tree->r_ = empty_tree->get_root_rec(); a->my_tree = empty_tree->get_root_rec();
ltable->set_tree_c1(empty_tree); ltable->set_tree_c1(empty_tree);
@ -444,7 +442,7 @@ void* memMergeThread(void*arg)
{ {
printf("mmt:\tnot signaling C2 for merge\n"); printf("mmt:\tnot signaling C2 for merge\n");
*(recordid*)a->pageAllocState = scratch_alloc_state; *(recordid*)a->pageAllocState = scratch_alloc_state;
a->my_tree->r_ = scratch_root; a->my_tree = scratch_root;
} }
rbtree_ptr_t deltree = *a->in_tree; rbtree_ptr_t deltree = *a->in_tree;
@ -480,12 +478,13 @@ void* memMergeThread(void*arg)
} }
void *diskMergeThread(void*arg) void *diskMergeThread(void*arg)
{ {
int xid;// = Tbegin(); int xid;// = Tbegin();
merger_args<logtree> * a = (merger_args<logtree>*)(arg); merger_args<logtree> * a = (merger_args<logtree>*)(arg);
assert(a->my_tree->r_.size != -1); assert(a->my_tree.size != -1);
logtable * ltable = a->ltable; logtable * ltable = a->ltable;
@ -530,10 +529,10 @@ void *diskMergeThread(void*arg)
uint64_t insertedTuples=0; uint64_t insertedTuples=0;
int64_t mergedPages=0; int64_t mergedPages=0;
assert(a->my_tree->r_.size != -1); assert(a->my_tree.size != -1);
//create the iterators //create the iterators
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(a->my_tree->r_); treeIterator<datatuple> *itrA = new treeIterator<datatuple>(a->my_tree);
treeIterator<datatuple> *itrB = treeIterator<datatuple> *itrB =
new treeIterator<datatuple>((*a->in_tree)->get_root_rec()); new treeIterator<datatuple>((*a->in_tree)->get_root_rec());
@ -559,7 +558,7 @@ void *diskMergeThread(void*arg)
printf("dmt:\tMerging:\n"); printf("dmt:\tMerging:\n");
int64_t npages = 0; int64_t npages = 0;
mergedPages = merge_iterators(xid, itrA, itrB, ltable, scratch_tree, npages); mergedPages = merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, scratch_tree, npages, true);
delete itrA; delete itrA;
delete itrB; delete itrB;
@ -602,7 +601,7 @@ void *diskMergeThread(void*arg)
// free old my_tree here // free old my_tree here
//TODO: check //TODO: check
logtree::free_region_rid(xid, a->my_tree->r_, logtree::dealloc_region_rid, oldAllocState); logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState);
//TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState); //TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState);
//TODO: check //TODO: check
@ -612,7 +611,7 @@ void *diskMergeThread(void*arg)
*(recordid*)a->pageAllocState = scratch_alloc_state; *(recordid*)a->pageAllocState = scratch_alloc_state;
a->my_tree->r_ = scratch_root; a->my_tree = scratch_root;
//// ----------- Free in_tree //// ----------- Free in_tree
//TODO: check //TODO: check
@ -643,97 +642,17 @@ void *diskMergeThread(void*arg)
} }
template <class ITA, class ITB>
int64_t merge_iterators(int xid, int64_t merge_iterators(int xid,
treeIterator<datatuple> *itrA, ITA *itrA, //iterator on c1 or c2
memTreeIterator<rbtree_t, datatuple> * itrB, ITB *itrB, //iterator on c0 or c1, respectively
logtable *ltable, logtable *ltable,
logtree *scratch_tree, logtree *scratch_tree,
int64_t &npages ) int64_t &npages,
bool dropDeletes // should be true iff this is biggest component
)
{ {
int64_t dpages = 0; int64_t dpages = 0;
//int npages = 0;
int64_t ntuples = 0;
DataPage<datatuple> *dp = 0;
memTreeIterator<rbtree_t, datatuple> *itrBend = itrB->end();
datatuple *t1 = itrA->getnext();
while(*itrB != *itrBend)
{
datatuple t2 = **itrB;
DEBUG("tuple\t%lld: keylen %d datalen %d\n", ntuples, *t2.keylen,*t2.datalen );
while(t1 != 0 && datatuple::compare(t1->key, t2.key) < 0) // t1 is less than t2
{
//insert t1
dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, ltable->get_dpstate1(),
dpages, npages);
free(t1->keylen);
free(t1);
ntuples++;
//advance itrA
t1 = itrA->getnext();
}
if(t1 != 0 && datatuple::compare(t1->key, t2.key) == 0)
{
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,&t2);
//insert merged tuple
dp = insertTuple(xid, dp, *mtuple, ltable, scratch_tree, ltable->get_dpstate1(),
dpages, npages);
free(t1->keylen);
free(t1);
t1 = itrA->getnext(); //advance itrA
free(mtuple->keylen);
free(mtuple);
}
else
{
//insert t2
dp = insertTuple(xid, dp, t2, ltable, scratch_tree, ltable->get_dpstate1(),
dpages, npages);
//free(t2.keylen); //cannot free here it may still be read through a lookup
}
ntuples++;
++(*itrB);
}
while(t1 != 0) // t1 is less than t2
{
dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, ltable->get_dpstate1(),
dpages, npages);
free(t1->keylen);
free(t1);
ntuples++;
//advance itrA
t1 = itrA->getnext();
}
delete itrBend;
if(dp!=NULL)
delete dp;
DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);
fflush(stdout);
return dpages;
}
int64_t merge_iterators(int xid,
treeIterator<datatuple> *itrA, //iterator on c2
treeIterator<datatuple> *itrB, //iterator on c1
logtable *ltable,
logtree *scratch_tree,
int64_t &npages)
{
int64_t dpages = 0;
//int npages = 0;
int64_t ntuples = 0; int64_t ntuples = 0;
DataPage<datatuple> *dp = 0; DataPage<datatuple> *dp = 0;
@ -764,7 +683,7 @@ int64_t merge_iterators(int xid,
datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2); datatuple *mtuple = ltable->gettuplemerger()->merge(t1,t2);
//insert merged tuple, drop deletes //insert merged tuple, drop deletes
if(!mtuple->isDelete()) if(dropDeletes && !mtuple->isDelete())
dp = insertTuple(xid, dp, *mtuple, ltable, scratch_tree, ltable->get_dpstate2(), dp = insertTuple(xid, dp, *mtuple, ltable, scratch_tree, ltable->get_dpstate2(),
dpages, npages); dpages, npages);
@ -779,6 +698,7 @@ int64_t merge_iterators(int xid,
//insert t2 //insert t2
dp = insertTuple(xid, dp, *t2, ltable, scratch_tree, ltable->get_dpstate2(), dp = insertTuple(xid, dp, *t2, ltable, scratch_tree, ltable->get_dpstate2(),
dpages, npages); dpages, npages);
// cannot free any tuples here; they may still be read through a lookup
} }
free(t2->keylen); free(t2->keylen);
@ -786,7 +706,7 @@ int64_t merge_iterators(int xid,
ntuples++; ntuples++;
} }
while(t1 != 0) while(t1 != 0) // t1 is less than t2
{ {
dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, ltable->get_dpstate2(), dp = insertTuple(xid, dp, *t1, ltable, scratch_tree, ltable->get_dpstate2(),
dpages, npages); dpages, npages);
@ -801,7 +721,6 @@ int64_t merge_iterators(int xid,
if(dp!=NULL) if(dp!=NULL)
delete dp; delete dp;
DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples); DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);
fflush(stdout);
return dpages; return dpages;

View file

@ -50,7 +50,7 @@ struct merger_args
logtree ** out_tree; logtree ** out_tree;
void * out_tree_allocer; void * out_tree_allocer;
treeIterator<datatuple>::treeIteratorHandle *my_tree; recordid my_tree;
recordid tree; recordid tree;
}; };
@ -104,21 +104,14 @@ public:
void* memMergeThread(void* arg); void* memMergeThread(void* arg);
//merges and returns the number of data pages used template <class ITA, class ITB>
int64_t merge_iterators(int xid, int64_t merge_iterators(int xid,
treeIterator<datatuple> *itrA, ITA *itrA,
memTreeIterator<rbtree_t, datatuple> * itrB, ITB *itrB,
logtable *ltable, logtable *ltable,
logtree *scratch_tree, logtree *scratch_tree,
int64_t &npages); int64_t &npages,
bool dropDeletes);
int64_t merge_iterators(int xid,
treeIterator<datatuple> *itrA,
treeIterator<datatuple> *itrB,
logtable *ltable,
logtree *scratch_tree,
int64_t &npages);
void* diskMergeThread(void* arg); void* diskMergeThread(void* arg);