preliminary (and largely untested) support for opening LSM-tree iterators starting at a given tuple

This commit is contained in:
Sears Russell 2008-11-24 01:32:35 +00:00
parent 23205f8cc4
commit 2b63991014
5 changed files with 199 additions and 71 deletions

View file

@ -636,13 +636,16 @@ void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc,
Tdealloc(xid, *(recordid*)allocator_state);
}
static pageid_t lsmLookup(int xid, Page *node, int depth,
static const recordid lsmLookup(int xid, Page *node, int depth,
const byte *key, size_t keySize, lsm_comparator_t cmp) {
if(*recordcount_ptr(node) == FIRST_SLOT) { return -1; }
if(*recordcount_ptr(node) == FIRST_SLOT) {
return NULLRID;
}
assert(*recordcount_ptr(node) > FIRST_SLOT);
const lsmTreeNodeRecord *prev = readNodeRecord(xid,node,FIRST_SLOT,keySize);
slotid_t prev_slot = FIRST_SLOT;
int prev_cmp_key = cmp(prev+1,key);
// @todo binary search within each page
@ -657,7 +660,7 @@ static pageid_t lsmLookup(int xid, Page *node, int depth,
pageid_t child_id = prev->ptr;
Page *child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0);
long ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
unlock(child_page->rwlatch);
releasePage(child_page);
return ret;
@ -666,10 +669,12 @@ static pageid_t lsmLookup(int xid, Page *node, int depth,
} else {
// XXX Doesn't handle runs of duplicates.
if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
return prev->ptr;
recordid ret = {node->id, prev_slot, keySize};
return ret;
}
}
prev = rec;
prev_slot = i;
prev_cmp_key = rec_cmp_key;
if(rec_cmp_key > 0) { break; }
}
@ -680,17 +685,30 @@ static pageid_t lsmLookup(int xid, Page *node, int depth,
pageid_t child_id = prev->ptr;
Page *child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0);
long ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
recordid ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
unlock(child_page->rwlatch);
releasePage(child_page);
return ret;
}
} else {
if(prev_cmp_key <= 0) {
return prev->ptr;
recordid ret = {node->id, prev_slot, keySize};
return ret;
}
}
return -1;
return NULLRID;
}
/**
   Translate the record id produced by lsmLookup() into the page id that
   is stored in that index entry.

   @param xid     transaction id, passed through to loadPage() and
                  readNodeRecord().
   @param rid     index entry to read.  A rid whose page and slot both
                  match NULLRID is treated as "nothing found".
   @param keySize key length, passed through to readNodeRecord().
   @return the ptr field of the node record at rid, or -1 if rid is NULLRID.
*/
static pageid_t lsmLookupLeafPageFromRid(int xid, recordid rid, size_t keySize) {
  if(rid.page == NULLRID.page && rid.slot == NULLRID.slot) {
    return -1;
  }
  Page *node = loadPage(xid, rid.page);
  readlock(node->rwlatch,0);
  // Copy the pointer out while the read latch is still held.
  const lsmTreeNodeRecord *entry = readNodeRecord(xid,node,rid.slot,keySize);
  pageid_t leaf = entry->ptr;
  unlock(node->rwlatch);
  releasePage(node);
  return leaf;
}
/**
@ -714,8 +732,8 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) {
lsm_comparator_t cmp = comparators[cmp_nr->ptr];
pageid_t ret = lsmLookup(xid, p, depth, key, keySize, cmp);
recordid rid = lsmLookup(xid, p, depth, key, keySize, cmp);
pageid_t ret = lsmLookupLeafPageFromRid(xid,rid,keySize);
unlock(p->rwlatch);
releasePage(p);
@ -790,7 +808,7 @@ page_impl lsmRootImpl() {
}
///--------------------- Iterator implementation
lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) {
lladdIterator_t* lsmTreeIterator_open(int xid, recordid root) {
if(root.page == 0 && root.slot == 0 && root.size == -1) { return 0; }
Page *p = loadPage(xid,root.page);
readlock(p->rwlatch,0);
@ -820,17 +838,42 @@ lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) {
lladdIterator_t *it = malloc(sizeof(lladdIterator_t));
it->type = -1; // XXX LSM_TREE_ITERATOR;
it->impl = impl;
/* itdef = { <-- @todo register lsmTree iterators with stasis someday...
lsmTreeIterator_close;
lsmTreeIterator_next;
lsmTreeIterator_next;
lsmTreeIterator_key;
lsmTreeIterator_value;
lsmTreeIterator_tupleDone;
lsmTreeIterator_releaseLock;
} */
return it;
}
/**
   Open a forward iterator positioned one slot before the index entry
   that lsmLookup() finds for key.

   @param xid  transaction id, passed through to the page and record
               accessors.
   @param root record id of the tree's root page.
   @param key  search key handed to lsmLookup().
   @return a malloc()ed iterator whose impl keeps the read latch on the
           page it points into, or 0 if root is NULLRID or lsmLookup()
           returns NULLRID for key.

   NOTE(review): lsmLookup() appears to return NULLRID both for an empty
   tree and when key sorts before every entry; in the latter case callers
   get no iterator rather than one at the start of the tree -- confirm
   that this is the intended contract.
*/
lladdIterator_t* lsmTreeIterator_openAt(int xid, recordid root, const byte* key) {
  if(root.page == NULLRID.page && root.slot == NULLRID.slot) return 0;
  Page *p = loadPage(xid,root.page);
  readlock(p->rwlatch,0);
  size_t keySize = getKeySize(xid,p);
  const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,DEPTH,keySize);
  const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p , COMPARATOR, keySize);

  int depth = nr->ptr;

  recordid lsm_entry_rid = lsmLookup(xid,p,depth,key,keySize,comparators[cmp_nr->ptr]);

  if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
    // The lookup found nothing.  Without this check we would go on to
    // load NULLRID.page and build an iterator at slot NULLRID.slot - 1.
    unlock(p->rwlatch);
    releasePage(p);
    return 0;
  }

  if(root.page != lsm_entry_rid.page) {
    // The entry lives on a different page: swap the latched page.
    unlock(p->rwlatch);
    releasePage(p);
    p = loadPage(xid,lsm_entry_rid.page);
    readlock(p->rwlatch,0);
  }

  lsmIteratorImpl *impl = malloc(sizeof(lsmIteratorImpl));
  impl->p = p;

  impl->current.page = lsm_entry_rid.page;
  impl->current.slot = lsm_entry_rid.slot - 1;  // slot before thing of interest
  impl->current.size = lsm_entry_rid.size;

  impl->t = 0; // value doesn't matter; will be overwritten by next()
  impl->justOnePage = (depth==0);

  lladdIterator_t *it = malloc(sizeof(lladdIterator_t));
  it->type = -1; // XXX LSM_TREE_ITERATOR
  it->impl = impl;
  return it;
}
lladdIterator_t *lsmTreeIterator_copy(int xid, lladdIterator_t* i) {
lsmIteratorImpl *it = i->impl;
lsmIteratorImpl *mine = malloc(sizeof(lsmIteratorImpl));

View file

@ -244,15 +244,6 @@ class treeIterator {
}
}
public:
explicit treeIterator(recordid tree, ROW &scratch, int keylen) :
tree_(tree),
scratch_(scratch),
keylen_(keylen),
lsmIterator_(lsmTreeIterator_open(-1,tree)),
slot_(0)
{
init_helper();
}
// typedef recordid handle;
class treeIteratorHandle {
public:
@ -260,7 +251,7 @@ class treeIterator {
treeIteratorHandle(const recordid r) : r_(r) {}
/* const treeIteratorHandle & operator=(const recordid *r) {
r_ = *r;
return this;
return this;
} */
treeIteratorHandle * operator=(const recordid &r) {
r_ = r;
@ -270,6 +261,27 @@ class treeIterator {
recordid r_;
};
typedef treeIteratorHandle* handle;
// Open an iterator over the tree named by *tree (NULLRID when tree is
// null) and leave it positioned on the first tuple that is not less
// than key, or at end() if there is no such tuple.
explicit treeIterator(treeIteratorHandle* tree, ROW& key) :
tree_(tree?tree->r_:NULLRID),
scratch_(),
keylen_(ROW::sizeofBytes()),
// Seek within the LSM tree using the byte-array form of key.
lsmIterator_(lsmTreeIterator_openAt(-1,tree?tree->r_:NULLRID,key.toByteArray())),
slot_(0)
{
init_helper();
// end() hands back a heap-allocated iterator; it is deleted below.
treeIterator * end = this->end();
// Step forward past every tuple that compares less than key.
for(;*this != *end && **this < key; ++(*this)) { }
delete end;
}
// Open an iterator over tree via lsmTreeIterator_open(), with scratch_
// initialized from the caller's scratch row and keylen_ taken from keylen.
explicit treeIterator(recordid tree, ROW &scratch, int keylen) :
tree_(tree),
scratch_(scratch),
keylen_(keylen),
lsmIterator_(lsmTreeIterator_open(-1,tree)),
slot_(0)
{
init_helper();
}
explicit treeIterator(recordid tree) :
tree_(tree),
scratch_(),
@ -609,13 +621,12 @@ class versioningIterator {
*/
template<class SET,class ROW> class stlSetIterator {
private:
typedef typename SET::iterator STLITER;
typedef typename SET::const_iterator STLITER;
public:
typedef SET * handle;
stlSetIterator( SET * s ) : it_(s->begin()), itend_(s->end()) {}
stlSetIterator( STLITER& it, STLITER& itend ) : it_(it), itend_(itend) {}
explicit stlSetIterator(stlSetIterator &i) : it_(i.it_), itend_(i.itend_){}
const ROW& operator* () { return *it_; }

View file

@ -426,7 +426,7 @@ namespace rose {
bool * input_needed;
typename std::set
<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp> * scratch_handle;
typename PAGELAYOUT::FMT::TUP::stl_cmp> * scratch_tree;
pthread_mutex_t * mut;
pthread_cond_t * input_ready_cond;
pthread_cond_t * input_needed_cond;
@ -512,7 +512,7 @@ namespace rose {
ret->input_handle = block0_scratch;
ret->input_needed = block0_needed;
ret->scratch_handle = new typeof(*ret->scratch_handle);
ret->scratch_tree = new typeof(*ret->scratch_tree);
ret->mut = block_ready_mut;
@ -622,13 +622,13 @@ namespace rose {
gettimeofday(&stop_tv,0);
stop = tv_to_double(stop_tv);
typeof(h->scratch_handle)* tmp_ptr
= (typeof(h->scratch_handle)*) malloc(sizeof(void*));
*tmp_ptr = h->scratch_handle;
typeof(h->scratch_tree)* tmp_ptr
= (typeof(h->scratch_tree)*) malloc(sizeof(void*));
*tmp_ptr = h->scratch_tree;
*(h->input_handle) = tmp_ptr;
pthread_cond_signal(h->input_ready_cond);
h->scratch_handle = new typeof(*h->scratch_handle);
h->scratch_tree = new typeof(*h->scratch_tree);
pthread_mutex_unlock(h->mut);
@ -645,7 +645,7 @@ namespace rose {
template<class PAGELAYOUT>
void TlsmTableStop( lsmTableHandle<PAGELAYOUT> * h) {
TlsmTableFlush(h);
delete(h->scratch_handle);
delete(h->scratch_tree);
*(h->still_open) = 0;
pthread_join(h->merge1_thread,0);
pthread_join(h->merge2_thread,0);
@ -665,9 +665,9 @@ namespace rose {
assert(*((char*)t.get(i)) || *((char*)t.get(i))+1);
} */
h->scratch_handle->insert(t);
h->scratch_tree->insert(t);
uint64_t handleBytes = h->scratch_handle->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
uint64_t handleBytes = h->scratch_tree->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
//XXX 4 = estimated compression ratio.
uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes());
uint64_t memSizeThresh = MEM_SIZE;
@ -692,7 +692,7 @@ namespace rose {
#endif
handleBytes > memSizeThresh ) ) { // XXX ok?
printf("Handle mbytes %lld (%lld) Input size: %lld input size thresh: %lld mbytes mem size thresh: %lld\n",
(long long) handleBytes / (1024*1024), (long long) h->scratch_handle->size(), (long long) *h->input_size,
(long long) handleBytes / (1024*1024), (long long) h->scratch_tree->size(), (long long) *h->input_size,
(long long) inputSizeThresh / (1024*1024), (long long) memSizeThresh / (1024*1024));
TlsmTableFlush<PAGELAYOUT>(h);
}
@ -759,8 +759,8 @@ namespace rose {
// while(it2 != *it2end) { *it2; ++it2; ret++;}
RB_ITER it4(*h->args2->in_tree ? (**h->args2->in_tree)->begin() : h->scratch_handle->end());
RB_ITER it4end(*h->args2->in_tree ? (**h->args2->in_tree)->end() : h->scratch_handle->end());
RB_ITER it4(*h->args2->in_tree ? (**h->args2->in_tree)->begin() : h->scratch_tree->end());
RB_ITER it4end(*h->args2->in_tree ? (**h->args2->in_tree)->end() : h->scratch_tree->end());
// while(it4 != it4end) { *it4; ++it4; ret++; }
@ -774,8 +774,8 @@ namespace rose {
// while(it3 != *it3end) { *it3; ++it3; ret++; }
RB_ITER it5 = h->scratch_handle->begin();
RB_ITER it5end = h->scratch_handle->end();
RB_ITER it5 = h->scratch_tree->begin();
RB_ITER it5end = h->scratch_tree->end();
// while(it5 != it5end) { *it5; ++it5; ret++; }
@ -828,8 +828,8 @@ namespace rose {
typename std::set
<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>::iterator i =
h->scratch_handle->find(val);
if(i != h->scratch_handle->end()) {
h->scratch_tree->find(val);
if(i != h->scratch_tree->end()) {
scratch = *i;
pthread_mutex_unlock(h->mut);
return &scratch;
@ -837,6 +837,49 @@ namespace rose {
pthread_mutex_unlock(h->mut);
return 0;
}
/**
Build the cursors needed to scan table h starting at val.

Returns a malloc()ed array of 10 void* slots:
  [0] std::set const_iterator: h->scratch_tree->lower_bound(val)
  [1] std::set const_iterator: lower_bound(val) in **h->args2->in_tree
  [2] treeIterator opened at val on h->args2->my_tree
  [3] treeIterator opened at val on **h->args1->in_tree (null handle if unset)
  [4] treeIterator opened at val on h->args1->my_tree
  [5]..[9] the matching end iterators for slots [0]..[4]
Slots 0/5 are null when h->scratch_tree is null; slots 1/6 are null when
*h->args2->in_tree is null.

Nothing in this function frees the array or the iterators it holds;
presumably the caller owns them -- TODO confirm.

NOTE(review): xid is not used here.
NOTE(review): this reads h->scratch_tree and the in_tree pointers
without taking h->mut, although other code in this file unlocks h->mut
around scratch_tree access -- confirm that callers hold the mutex.
*/
template<class PAGELAYOUT>
void**
TlsmTableFindGTE(int xid, lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &val) {
// RB_ITER is declared but not used in the body below.
typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> RB_ITER;
typedef std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp> RB_SET;
typedef treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT> LSM_ITER;
// Cursors into the two std::set trees; null when the tree is absent.
typename RB_SET::const_iterator * c0 = h->scratch_tree ?
new typename RB_SET::const_iterator(h->scratch_tree->lower_bound(val))
: 0;
typename RB_SET::const_iterator * c0p = *h->args2->in_tree ?
new typename RB_SET::const_iterator((**h->args2->in_tree)->lower_bound(val))
: 0;
// Cursors into the three LSM trees, each opened at val.
LSM_ITER* c1 = new LSM_ITER( h->args2->my_tree , val);
LSM_ITER* c1p = new LSM_ITER(*h->args1->in_tree ? **h->args1->in_tree : 0 , val);
LSM_ITER* c2 = new LSM_ITER( h->args1->my_tree , val);
// 10 slots: five start cursors followed by their five end cursors.
void ** ret = (void**)malloc(10 * sizeof(void*));
ret[0] = c0;
ret[1] = c0p;
ret[2] = c1;
ret[3] = c1p;
ret[4] = c2;
ret[5] = c0 ? new typename RB_SET::const_iterator(h->scratch_tree->end()) : 0;
ret[6] = c0p ? new typename RB_SET::const_iterator((**h->args2->in_tree)->end()) : 0;
ret[7] = c1->end();
ret[8] = c1p->end();
ret[9] = c2->end();
return ret;
}
template<class PAGELAYOUT>
const typename PAGELAYOUT::FMT::TUP *
TlsmTableFind(int xid, lsmTableHandle<PAGELAYOUT> *h,
@ -847,13 +890,13 @@ namespace rose {
typename std::set
<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>::iterator i =
h->scratch_handle->find(val);
if(i != h->scratch_handle->end()) {
h->scratch_tree->find(val);
if(i != h->scratch_tree->end()) {
scratch = *i;
pthread_mutex_unlock(h->mut);
return &scratch;
}
DEBUG("Not in scratch_handle\n");
DEBUG("Not in scratch_tree\n");
if(*h->args2->in_tree) {
i = (**h->args2->in_tree)->find(val);
if(i != (**h->args2->in_tree)->end()) {

View file

@ -121,8 +121,13 @@ typedef struct lsmIteratorImpl {
@see iterator.h for documentation of lsmTree's iterator interface.
*/
lladdIterator_t * lsmTreeIterator_open(int xid, recordid tree);
lladdIterator_t* lsmTreeIterator_open(int xid, recordid tree);
/**
Return a forward iterator over the tree's leaf pages, starting at
the index entry located by looking up key (rather than at the
beginning of the tree).

@param xid  transaction id.
@param tree record id of the tree's root page; 0 is returned if it
            is NULLRID.
@param key  the key to seek to.
*/
lladdIterator_t* lsmTreeIterator_openAt(int xid, recordid tree, const byte* key);
/*
These are the functions that implement lsmTree's iterator.

View file

@ -296,28 +296,54 @@ namespace rose {
static void printSt(void const * const sp) {
st const * const s = (st const * const)sp;
printf("(");
if(0<N) printf("%lld",(int64_t)s->cols0_);
if(1<N) printf(", %lld",(int64_t)s->cols1_);
if(2<N) printf(", %lld",(int64_t)s->cols2_);
if(3<N) printf(", %lld",(int64_t)s->cols3_);
if(4<N) printf(", %lld",(int64_t)s->cols4_);
if(5<N) printf(", %lld",(int64_t)s->cols5_);
if(6<N) printf(", %lld",(int64_t)s->cols6_);
if(7<N) printf(", %lld",(int64_t)s->cols7_);
if(8<N) printf(", %lld",(int64_t)s->cols8_);
if(9<N) printf(", %lld",(int64_t)s->cols9_);
if(10<N) printf(", %lld",(int64_t)s->cols10_);
if(11<N) printf(", %lld",(int64_t)s->cols11_);
if(12<N) printf(", %lld",(int64_t)s->cols12_);
if(13<N) printf(", %lld",(int64_t)s->cols13_);
if(14<N) printf(", %lld",(int64_t)s->cols14_);
if(15<N) printf(", %lld",(int64_t)s->cols15_);
if(16<N) printf(", %lld",(int64_t)s->cols16_);
if(17<N) printf(", %lld",(int64_t)s->cols17_);
if(18<N) printf(", %lld",(int64_t)s->cols18_);
if(19<N) printf(", %lld",(int64_t)s->cols19_);
if(0<N) printf("%lld", (long long)s->cols0_);
if(1<N) printf(", %lld", (long long)s->cols1_);
if(2<N) printf(", %lld", (long long)s->cols2_);
if(3<N) printf(", %lld", (long long)s->cols3_);
if(4<N) printf(", %lld", (long long)s->cols4_);
if(5<N) printf(", %lld", (long long)s->cols5_);
if(6<N) printf(", %lld", (long long)s->cols6_);
if(7<N) printf(", %lld", (long long)s->cols7_);
if(8<N) printf(", %lld", (long long)s->cols8_);
if(9<N) printf(", %lld", (long long)s->cols9_);
if(10<N) printf(", %lld",(long long)s->cols10_);
if(11<N) printf(", %lld",(long long)s->cols11_);
if(12<N) printf(", %lld",(long long)s->cols12_);
if(13<N) printf(", %lld",(long long)s->cols13_);
if(14<N) printf(", %lld",(long long)s->cols14_);
if(15<N) printf(", %lld",(long long)s->cols15_);
if(16<N) printf(", %lld",(long long)s->cols16_);
if(17<N) printf(", %lld",(long long)s->cols17_);
if(18<N) printf(", %lld",(long long)s->cols18_);
if(19<N) printf(", %lld",(long long)s->cols19_);
printf(")");
}
static void printErrSt(void const * const sp) {
st const * const s = (st const * const)sp;
fprintf(stderr, "(");
if(0<N) fprintf(stderr, "%lld", (long long)s->cols0_);
if(1<N) fprintf(stderr, ", %lld", (long long)s->cols1_);
if(2<N) fprintf(stderr, ", %lld", (long long)s->cols2_);
if(3<N) fprintf(stderr, ", %lld", (long long)s->cols3_);
if(4<N) fprintf(stderr, ", %lld", (long long)s->cols4_);
if(5<N) fprintf(stderr, ", %lld", (long long)s->cols5_);
if(6<N) fprintf(stderr, ", %lld", (long long)s->cols6_);
if(7<N) fprintf(stderr, ", %lld", (long long)s->cols7_);
if(8<N) fprintf(stderr, ", %lld", (long long)s->cols8_);
if(9<N) fprintf(stderr, ", %lld", (long long)s->cols9_);
if(10<N) fprintf(stderr, ", %lld",(long long)s->cols10_);
if(11<N) fprintf(stderr, ", %lld",(long long)s->cols11_);
if(12<N) fprintf(stderr, ", %lld",(long long)s->cols12_);
if(13<N) fprintf(stderr, ", %lld",(long long)s->cols13_);
if(14<N) fprintf(stderr, ", %lld",(long long)s->cols14_);
if(15<N) fprintf(stderr, ", %lld",(long long)s->cols15_);
if(16<N) fprintf(stderr, ", %lld",(long long)s->cols16_);
if(17<N) fprintf(stderr, ", %lld",(long long)s->cols17_);
if(18<N) fprintf(stderr, ", %lld",(long long)s->cols18_);
if(19<N) fprintf(stderr, ", %lld",(long long)s->cols19_);
fprintf(stderr, ")\n");
}
static inline int noisycmp(const void *ap, const void *bp) {
st const * const a = (st const * const)ap;
st const * const b = (st const * const)bp;