c++-ify diskTreeComponentIterator

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@667 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-03-05 22:21:44 +00:00
parent 020f0faf17
commit 06fa7999cb
7 changed files with 113 additions and 139 deletions

View file

@ -755,12 +755,9 @@ void diskTreeComponent::print_tree(int xid, pageid_t pid, int64_t depth) {
//diskTreeComponentIterator implementation
/////////////////////////////////////////////////
lladdIterator_t* diskTreeComponentIterator::open(int xid, recordid root)
{
if(root.page == 0 && root.slot == 0 && root.size == -1)
return 0;
Page *p = loadPage(xid,root.page);
diskTreeComponentIterator::diskTreeComponentIterator(int xid, recordid root) {
if(root.page == 0 && root.slot == 0 && root.size == -1) abort();
p = loadPage(xid,root.page);
readlock(p->rwlatch,0);
DEBUG("ROOT_REC_SIZE %d\n", diskTreeComponent::root_rec_size);
@ -782,32 +779,24 @@ lladdIterator_t* diskTreeComponentIterator::open(int xid, recordid root)
assert(depth == 0);
}
diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*)malloc(sizeof(diskTreeComponentIterator_t));
impl->p = p;
{
// Position just before the first slot.
// The first call to next() will increment us to the first slot, or return NULL.
recordid rid = { p->id, diskTreeComponent::FIRST_SLOT-1, 0};
impl->current = rid;
current = rid;
}
DEBUG("keysize = %d, slot = %d\n", keySize, impl->current.slot);
impl->t = 0;
impl->justOnePage = (depth == 0);
lladdIterator_t *it = (lladdIterator_t*) malloc(sizeof(lladdIterator_t));
it->type = -1; // XXX LSM_TREE_ITERATOR;
it->impl = impl;
return it;
DEBUG("keysize = %d, slot = %d\n", keySize, current.slot);
xid_ = xid;
done = false;
t = 0;
justOnePage = (depth == 0);
}
lladdIterator_t* diskTreeComponentIterator::openAt(int xid, recordid root, const byte* key, len_t keylen)
{
if(root.page == NULLRID.page && root.slot == NULLRID.slot)
return 0;
diskTreeComponentIterator::diskTreeComponentIterator(int xid, recordid root, const byte* key, len_t keylen) {
if(root.page == NULLRID.page && root.slot == NULLRID.slot) abort();
Page *p = loadPage(xid,root.page);
p = loadPage(xid,root.page);
readlock(p->rwlatch,0);
const byte *nr = diskTreeComponent::readRecord(xid,p,diskTreeComponent::DEPTH, diskTreeComponent::root_rec_size);
@ -818,95 +807,85 @@ lladdIterator_t* diskTreeComponentIterator::openAt(int xid, recordid root, const
if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
unlock(p->rwlatch);
return 0;
done = true;
} else {
assert(lsm_entry_rid.size != INVALID_SLOT);
if(root.page != lsm_entry_rid.page)
{
unlock(p->rwlatch);
releasePage(p);
p = loadPage(xid,lsm_entry_rid.page);
readlock(p->rwlatch,0);
}
done = false;
current.page = lsm_entry_rid.page;
current.slot = lsm_entry_rid.slot-1; // this is current rid, which is one less than the first thing next will return (so subtract 1)
current.size = lsm_entry_rid.size;
xid_ = xid;
t = 0; // must be zero so free() doesn't croak.
justOnePage = (depth==0);
DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, current.page, rec->ptr, key);
DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key);
}
assert(lsm_entry_rid.size != INVALID_SLOT);
if(root.page != lsm_entry_rid.page)
{
unlock(p->rwlatch);
releasePage(p);
p = loadPage(xid,lsm_entry_rid.page);
readlock(p->rwlatch,0);
}
diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*) malloc(sizeof(diskTreeComponentIterator_t));
impl->p = p;
impl->current.page = lsm_entry_rid.page;
impl->current.slot = lsm_entry_rid.slot-1; // this is current rid, which is one less than the first thing next will return (so subtract 1)
impl->current.size = lsm_entry_rid.size;
impl->t = 0; // must be zero so free() doesn't croak.
impl->justOnePage = (depth==0);
DEBUG("diskTreeComponentIterator: index root %lld index page %lld data page %lld key %s\n", root.page, impl->current.page, rec->ptr, key);
DEBUG("entry = %s key = %s\n", (char*)(rec+1), (char*)key);
lladdIterator_t *it = (lladdIterator_t*) malloc(sizeof(lladdIterator_t));
it->type = -1; // XXX LSM_TREE_ITERATOR
it->impl = impl;
return it;
}
/**
* move to the next page
**/
int diskTreeComponentIterator::next(int xid, lladdIterator_t *it)
int diskTreeComponentIterator::next()
{
diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*) it->impl;
if(done) return 0;
impl->current = stasis_record_next(xid, impl->p, impl->current);
current = stasis_record_next(xid_, p, current);
if(impl->current.size == INVALID_SLOT) {
if(current.size == INVALID_SLOT) {
const indexnode_rec next_rec = *(const indexnode_rec*)diskTreeComponent::readRecord(xid,impl->p,
const indexnode_rec next_rec = *(const indexnode_rec*)diskTreeComponent::readRecord(xid_,p,
diskTreeComponent::NEXT_LEAF,
0);
unlock(impl->p->rwlatch);
releasePage(impl->p);
unlock(p->rwlatch);
releasePage(p);
DEBUG("done with page %lld next = %lld\n", impl->p->id, next_rec.ptr);
DEBUG("done with page %lld next = %lld\n", p->id, next_rec.ptr);
if(next_rec.ptr != -1 && ! impl->justOnePage) {
impl->p = loadPage(xid, next_rec.ptr);
readlock(impl->p->rwlatch,0);
impl->current.page = next_rec.ptr;
impl->current.slot = 2;
impl->current.size = stasis_record_length_read(xid, impl->p, impl->current); //keySize;
if(next_rec.ptr != -1 && ! justOnePage) {
p = loadPage(xid_, next_rec.ptr);
readlock(p->rwlatch,0);
current.page = next_rec.ptr;
current.slot = 2;
current.size = stasis_record_length_read(xid_, p, current);
} else {
impl->p = 0;
impl->current.size = INVALID_SLOT;
p = 0;
current.size = INVALID_SLOT;
}
}
if(impl->current.size != INVALID_SLOT) {
if(impl->t != NULL) free(impl->t);
if(current.size != INVALID_SLOT) {
if(t != NULL) { free(t); t = NULL; }
impl->t = (indexnode_rec*)malloc(impl->current.size);
memcpy(impl->t, diskTreeComponent::readRecord(xid,impl->p,impl->current), impl->current.size);
t = (indexnode_rec*)malloc(current.size);
memcpy(t, diskTreeComponent::readRecord(xid_,p,current), current.size);
return 1;
} else {
assert(!impl->p);
if(impl->t != NULL) free(impl->t);
impl->t = 0;
assert(!p);
if(t != NULL) { free(t); t = NULL; }
t = 0;
return 0;
}
}
void diskTreeComponentIterator::close(int xid, lladdIterator_t *it) {
void diskTreeComponentIterator::close() {
diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*)it->impl;
if(impl->p) {
unlock(impl->p->rwlatch);
releasePage(impl->p);
if(p) {
unlock(p->rwlatch);
releasePage(p);
}
if(impl->t) free(impl->t);
free(impl);
free(it);
if(t) free(t);
}

View file

@ -147,39 +147,35 @@ private:
};
typedef struct {
Page * p;
recordid current;
indexnode_rec *t;
int justOnePage;
} diskTreeComponentIterator_t;
class diskTreeComponentIterator {
public:
static lladdIterator_t* open(int xid, recordid root);
static lladdIterator_t* openAt(int xid, recordid root, const byte* key, len_t keylen);
static int next(int xid, lladdIterator_t *it);
static void close(int xid, lladdIterator_t *it);
diskTreeComponentIterator(int xid, recordid root);
diskTreeComponentIterator(int xid, recordid root, const byte* key, len_t keylen);
int next();
void close();
static inline size_t key (int xid, lladdIterator_t *it, byte **key) {
diskTreeComponentIterator_t * impl = (diskTreeComponentIterator_t*)it->impl;
*key = (byte*)(impl->t+1);
return impl->current.size - sizeof(indexnode_rec);
inline size_t key (byte **key) {
*key = (byte*)(t+1);
return current.size - sizeof(indexnode_rec);
}
static inline size_t value(int xid, lladdIterator_t *it, byte **value) {
diskTreeComponentIterator_t * impl = (diskTreeComponentIterator_t*)it->impl;
*value = (byte*)&(impl->t->ptr);
return sizeof(impl->t->ptr);
inline size_t value(byte **value) {
*value = (byte*)&(t->ptr);
return sizeof(t->ptr);
}
static inline void tupleDone(int xid, void *it) { }
static inline void releaseLock(int xid, void *it) { }
inline void tupleDone() { }
inline void releaseLock() { }
private:
Page * p;
int xid_;
bool done;
recordid current;
indexnode_rec *t;
int justOnePage;
};
#endif /* DISKTREECOMPONENT_H_ */

View file

@ -12,9 +12,9 @@ void diskTreeIterator<TUPLE>::init_iterators(TUPLE * key1, TUPLE * key2) {
lsmIterator_ = NULL;
} else {
if(key1) {
lsmIterator_ = diskTreeComponentIterator::openAt(-1, tree_, key1->key(), key1->keylen());
lsmIterator_ = new diskTreeComponentIterator(-1, tree_, key1->key(), key1->keylen());
} else {
lsmIterator_ = diskTreeComponentIterator::open(-1, tree_);
lsmIterator_ = new diskTreeComponentIterator(-1, tree_);
}
}
}
@ -48,16 +48,18 @@ template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(diskTreeComponent *tree, TUPLE& key) :
tree_(tree ? tree->get_root_rec() : NULLRID)
{
init_iterators(&key,NULL);
init_helper(&key);
init_iterators(&key,NULL);
init_helper(&key);
}
template <class TUPLE>
diskTreeIterator<TUPLE>::~diskTreeIterator()
{
if(lsmIterator_)
diskTreeComponentIterator::close(-1, lsmIterator_);
if(lsmIterator_) {
lsmIterator_->close();
delete lsmIterator_;
}
if(curr_page!=NULL)
{
@ -79,7 +81,7 @@ void diskTreeIterator<TUPLE>::init_helper(TUPLE* key1)
}
else
{
if(diskTreeComponentIterator::next(-1, lsmIterator_) == 0)
if(lsmIterator_->next() == 0)
{
DEBUG("diskTreeIterator:\t__error__ init_helper():\tlogtreeIteratr::next returned 0." );
curr_page = 0;
@ -89,7 +91,7 @@ void diskTreeIterator<TUPLE>::init_helper(TUPLE* key1)
{
pageid_t * pid_tmp;
pageid_t ** hack = &pid_tmp;
diskTreeComponentIterator::value(-1,lsmIterator_,(byte**)hack);
lsmIterator_->value((byte**)hack);
curr_pageid = *pid_tmp;
curr_page = new DataPage<TUPLE>(-1, curr_pageid);
@ -119,12 +121,13 @@ TUPLE * diskTreeIterator<TUPLE>::getnext()
delete curr_page;
curr_page = 0;
if(diskTreeComponentIterator::next(-1,lsmIterator_))
if(lsmIterator_->next())
{
pageid_t *pid_tmp;
pageid_t **hack = &pid_tmp;
diskTreeComponentIterator::value(-1,lsmIterator_,(byte**)hack);
size_t ret = lsmIterator_->value((byte**)hack);
assert(ret == sizeof(pageid_t));
curr_pageid = *pid_tmp;
curr_page = new DataPage<TUPLE>(-1, curr_pageid);
DEBUG("opening datapage iterator %lld at beginning\n.", curr_pageid);

View file

@ -1,12 +1,6 @@
#ifndef _LOG_ITERATORS_H_
#define _LOG_ITERATORS_H_
#include <assert.h>
#include <stasis/iterator.h>
#undef begin
#undef end
template <class TUPLE>
class DataPage;
@ -172,7 +166,7 @@ private:
private:
recordid tree_; //root of the tree
lladdIterator_t * lsmIterator_; //diskTreeComponent iterator
diskTreeComponentIterator* lsmIterator_;
pageid_t curr_pageid; //current page id
DataPage<TUPLE> *curr_page; //current page

View file

@ -595,12 +595,12 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
}
int xid = Tbegin();
lladdIterator_t * it = diskTreeComponentIterator::open(xid, data->ltable->get_tree_c2()->get_root_rec());
diskTreeComponentIterator * it = new diskTreeComponentIterator(xid, data->ltable->get_tree_c2()->get_root_rec());
size_t count = 0;
int err = 0;
while(diskTreeComponentIterator::next(xid, it)) { count++; }
diskTreeComponentIterator::close(xid, it);
while(it->next()) { count++; }
it->close();
uint64_t stride;
@ -619,12 +619,12 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
size_t cur_stride = 0;
size_t i = 0;
it = diskTreeComponentIterator::open(xid, data->ltable->get_tree_c2()->get_root_rec());
while(diskTreeComponentIterator::next(xid, it)) {
it = new diskTreeComponentIterator(xid, data->ltable->get_tree_c2()->get_root_rec());
while(it->next()) {
i++;
if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys)
byte * key;
size_t keylen= diskTreeComponentIterator::key(xid, it, &key);
size_t keylen= it->key(&key);
tup = datatuple::create(key, keylen);
if(!err) { err = writetupletosocket(*(data->workitem), tup); }
@ -635,7 +635,8 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
cur_stride--;
}
diskTreeComponentIterator::close(xid, it);
it->close();
delete(it);
if(!err){ err = writeendofiteratortosocket(*(data->workitem)); }
Tcommit(xid);
return err;

View file

@ -24,8 +24,9 @@ int main(int argc, char **argv)
// lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable, INVALID_COL);
xid = Tbegin();
lladdIterator_t * it = diskTreeComponentIterator::open(xid,ltable.get_tree_c2()->get_root_rec() );
diskTreeComponentIterator::close(xid, it);
diskTreeComponentIterator * it = new diskTreeComponentIterator(xid,ltable.get_tree_c2()->get_root_rec() );
it->close();
delete it;
Tcommit(xid);
diskTreeComponent::deinit_stasis();

View file

@ -129,17 +129,16 @@ void insertProbeIter_str(int NUM_ENTRIES)
int64_t count = 0;
lladdIterator_t * it = diskTreeComponentIterator::open(xid, tree);
diskTreeComponentIterator * it = new diskTreeComponentIterator(xid, tree);
while(diskTreeComponentIterator::next(xid, it)) {
while(it->next()) {
byte * key;
byte **key_ptr = &key;
size_t keysize = diskTreeComponentIterator::key(xid, it, (byte**)key_ptr);
size_t keysize = it->key((byte**)key_ptr);
pageid_t *value;
pageid_t **value_ptr = &value;
int valsize = lsmTreeIterator_value(xid, it, (byte**)value_ptr);
//printf("keylen %d key %s\n", keysize, (char*)(key)) ;
size_t valsize = it->value((byte**)value_ptr);
assert(valsize == sizeof(pageid_t));
assert(!mycmp(std::string((char*)key), arr[count]) && !mycmp(arr[count],std::string((char*)key)));
assert(keysize == arr[count].length()+1);
@ -147,7 +146,8 @@ void insertProbeIter_str(int NUM_ENTRIES)
}
assert(count == NUM_ENTRIES);
diskTreeComponentIterator::close(xid, it);
it->close();
delete it;
Tcommit(xid);
diskTreeComponent::deinit_stasis();