network iterator API is now working (though it could use a better test suite)

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@601 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-02-20 01:18:39 +00:00
parent 57c9afc8d8
commit 63a14d8509
18 changed files with 567 additions and 200 deletions

View file

@ -1,3 +1,5 @@
#include <network.h>
#ifndef _DATATUPLE_H_ #ifndef _DATATUPLE_H_
#define _DATATUPLE_H_ #define _DATATUPLE_H_
@ -6,13 +8,12 @@ typedef unsigned char byte;
#include <cstring> #include <cstring>
#include <assert.h> #include <assert.h>
typedef struct datatuple typedef struct datatuple
{ {
public: public:
typedef uint32_t len_t ;
typedef unsigned char* key_t ; typedef unsigned char* key_t ;
typedef unsigned char* data_t ; typedef unsigned char* data_t ;
static const len_t DELETE = ((len_t)0) - 1;
private: private:
len_t datalen_; len_t datalen_;
byte* key_; byte* key_;
@ -62,6 +63,10 @@ public:
return strcmp((char*)k1,(char*)k2); return strcmp((char*)k1,(char*)k2);
} }
static int compare_obj(const datatuple * a, const datatuple* b) {
return compare(a->key(), b->key());
}
inline void setDelete() { inline void setDelete() {
datalen_ = DELETE; datalen_ = DELETE;
} }

View file

@ -61,7 +61,7 @@ void diskTreeComponent::init_stasis() {
void diskTreeComponent::deinit_stasis() { Tdeinit(); } void diskTreeComponent::deinit_stasis() { Tdeinit(); }
void diskTreeComponent::free_region_rid(int xid, recordid tree, void diskTreeComponent::free_region_rid(int xid, recordid tree,
logtree_page_deallocator_t dealloc, void *allocator_state) diskTreeComponent_page_deallocator_t dealloc, void *allocator_state)
{ {
// Tdealloc(xid,tree); // Tdealloc(xid,tree);
dealloc(xid,allocator_state); dealloc(xid,allocator_state);
@ -218,6 +218,7 @@ recordid diskTreeComponent::create(int xid)
return ret; return ret;
} }
// XXX remove the next N records, which are completely redundant.
/** /**
* TODO: what happen if there is already such a record with a different size? * TODO: what happen if there is already such a record with a different size?
@ -272,13 +273,8 @@ const byte* diskTreeComponent::readRecord(int xid, Page * p, slotid_t slot, int6
rid.page = p->id; rid.page = p->id;
rid.slot = slot; rid.slot = slot;
rid.size = size; rid.size = size;
//byte *ret = (byte*)malloc(rid.size);
//stasis_record_read(xid,p,rid,ret);
//return ret;
const byte *nr = stasis_record_read_begin(xid,p,rid); const byte *nr = stasis_record_read_begin(xid,p,rid);
return nr; return nr;
// return readRecord(xid, p, rid);
} }
int32_t diskTreeComponent::readRecordLength(int xid, Page *p, slotid_t slot) int32_t diskTreeComponent::readRecordLength(int xid, Page *p, slotid_t slot)
@ -482,7 +478,7 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p,
int64_t depth, int64_t depth,
const byte *key, size_t key_len, const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf, pageid_t val_page, pageid_t lastLeaf,
logtree_page_allocator_t allocator, diskTreeComponent_page_allocator_t allocator,
void *allocator_state) void *allocator_state)
{ {
// assert(*stasis_page_type_ptr(p) == LOGTREE_ROOT_PAGE || // assert(*stasis_page_type_ptr(p) == LOGTREE_ROOT_PAGE ||
@ -553,7 +549,7 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p,
recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p, recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p,
int64_t depth, const byte *key, size_t key_len, int64_t depth, const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf, pageid_t val_page, pageid_t lastLeaf,
logtree_page_allocator_t allocator, diskTreeComponent_page_allocator_t allocator,
void *allocator_state) void *allocator_state)
{ {
@ -828,10 +824,10 @@ void diskTreeComponent::print_tree(int xid, pageid_t pid, int64_t depth)
} }
///////////////////////////////////////////////// /////////////////////////////////////////////////
//logtreeIterator implementation //diskTreeComponentIterator implementation
///////////////////////////////////////////////// /////////////////////////////////////////////////
lladdIterator_t* logtreeIterator::open(int xid, recordid root) lladdIterator_t* diskTreeComponentIterator::open(int xid, recordid root)
{ {
if(root.page == 0 && root.slot == 0 && root.size == -1) if(root.page == 0 && root.slot == 0 && root.size == -1)
return 0; return 0;
@ -860,7 +856,7 @@ lladdIterator_t* logtreeIterator::open(int xid, recordid root)
assert(depth == 0); assert(depth == 0);
logtreeIterator_s *impl = (logtreeIterator_s*)malloc(sizeof(logtreeIterator_s)); diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*)malloc(sizeof(diskTreeComponentIterator_t));
impl->p = p; impl->p = p;
{ {
recordid rid = { p->id, 1, 0};//keySize }; //TODO: why does this start from 1? recordid rid = { p->id, 1, 0};//keySize }; //TODO: why does this start from 1?
@ -876,7 +872,7 @@ lladdIterator_t* logtreeIterator::open(int xid, recordid root)
return it; return it;
} }
lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key) lladdIterator_t* diskTreeComponentIterator::openAt(int xid, recordid root, const byte* key)
{ {
if(root.page == NULLRID.page && root.slot == NULLRID.slot) if(root.page == NULLRID.page && root.slot == NULLRID.slot)
return 0; return 0;
@ -906,7 +902,7 @@ lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key
readlock(p->rwlatch,0); readlock(p->rwlatch,0);
} }
logtreeIterator_s *impl = (logtreeIterator_s*) malloc(sizeof(logtreeIterator_s)); diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*) malloc(sizeof(diskTreeComponentIterator_t));
impl->p = p; impl->p = p;
impl->current.page = lsm_entry_rid.page; impl->current.page = lsm_entry_rid.page;
@ -925,9 +921,9 @@ lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key
/** /**
* move to the next page * move to the next page
**/ **/
int logtreeIterator::next(int xid, lladdIterator_t *it) int diskTreeComponentIterator::next(int xid, lladdIterator_t *it)
{ {
logtreeIterator_s *impl = (logtreeIterator_s*) it->impl; diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*) it->impl;
impl->current = stasis_record_next(xid, impl->p, impl->current); impl->current = stasis_record_next(xid, impl->p, impl->current);
@ -987,9 +983,9 @@ int logtreeIterator::next(int xid, lladdIterator_t *it)
} }
void logtreeIterator::close(int xid, lladdIterator_t *it) void diskTreeComponentIterator::close(int xid, lladdIterator_t *it)
{ {
logtreeIterator_s *impl = (logtreeIterator_s*)it->impl; diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*)it->impl;
if(impl->p) if(impl->p)
{ {
unlock(impl->p->rwlatch); unlock(impl->p->rwlatch);

View file

@ -32,10 +32,12 @@ typedef struct RegionAllocConf_t
pageid_t regionSize; pageid_t regionSize;
} RegionAllocConf_t; } RegionAllocConf_t;
struct indexnode_rec {
pageid_t ptr;
};
typedef pageid_t(*logtree_page_allocator_t)(int, void *); typedef pageid_t(*diskTreeComponent_page_allocator_t)(int, void *);
typedef void(*logtree_page_deallocator_t)(int, void *); typedef void(*diskTreeComponent_page_deallocator_t)(int, void *);
class diskTreeComponent{ class diskTreeComponent{
public: public:
@ -55,7 +57,7 @@ public:
static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count); static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count);
static void dealloc_region_rid(int xid, recordid rid); static void dealloc_region_rid(int xid, recordid rid);
static void free_region_rid(int xid, recordid tree, static void free_region_rid(int xid, recordid tree,
logtree_page_deallocator_t dealloc, diskTreeComponent_page_deallocator_t dealloc,
void *allocator_state); void *allocator_state);
static void writeNodeRecord(int xid, Page *p, recordid &rid, static void writeNodeRecord(int xid, Page *p, recordid &rid,
@ -92,20 +94,20 @@ public:
//rmLeafID --> rightmost leaf id //rmLeafID --> rightmost leaf id
static recordid appendPage(int xid, recordid tree, pageid_t & rmLeafID, static recordid appendPage(int xid, recordid tree, pageid_t & rmLeafID,
const byte *key,size_t keySize, const byte *key,size_t keySize,
logtree_page_allocator_t allocator, void *allocator_state, diskTreeComponent_page_allocator_t allocator, void *allocator_state,
long val_page); long val_page);
static recordid appendInternalNode(int xid, Page *p, static recordid appendInternalNode(int xid, Page *p,
int64_t depth, int64_t depth,
const byte *key, size_t key_len, const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf, pageid_t val_page, pageid_t lastLeaf,
logtree_page_allocator_t allocator, diskTreeComponent_page_allocator_t allocator,
void *allocator_state); void *allocator_state);
static recordid buildPathToLeaf(int xid, recordid root, Page *root_p, static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
int64_t depth, const byte *key, size_t key_len, int64_t depth, const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf, pageid_t val_page, pageid_t lastLeaf,
logtree_page_allocator_t allocator, diskTreeComponent_page_allocator_t allocator,
void *allocator_state); void *allocator_state);
inline DataPage<datatuple>::RegionAllocator* get_alloc() { return region_alloc; } inline DataPage<datatuple>::RegionAllocator* get_alloc() { return region_alloc; }
@ -143,4 +145,44 @@ private:
}; };
typedef struct {
Page * p;
recordid current;
indexnode_rec *t;
int justOnePage;
} diskTreeComponentIterator_t;
class diskTreeComponentIterator
{
public:
static lladdIterator_t* open(int xid, recordid root);
static lladdIterator_t* openAt(int xid, recordid root, const byte* key);
static int next(int xid, lladdIterator_t *it);
static void close(int xid, lladdIterator_t *it);
static inline size_t key (int xid, lladdIterator_t *it, byte **key)
{
diskTreeComponentIterator_t * impl = (diskTreeComponentIterator_t*)it->impl;
*key = (byte*)(impl->t+1);
return impl->current.size - sizeof(indexnode_rec);
}
static inline size_t value(int xid, lladdIterator_t *it, byte **value)
{
diskTreeComponentIterator_t * impl = (diskTreeComponentIterator_t*)it->impl;
*value = (byte*)&(impl->t->ptr);
return sizeof(impl->t->ptr);
}
static inline void tupleDone(int xid, void *it) { }
static inline void releaseLock(int xid, void *it) { }
};
#endif /* DISKTREECOMPONENT_H_ */ #endif /* DISKTREECOMPONENT_H_ */

View file

@ -6,28 +6,66 @@
///////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////
template <class TUPLE> template <class TUPLE>
treeIterator<TUPLE>::treeIterator(recordid tree) : void diskTreeIterator<TUPLE>::init_iterators(TUPLE * key1, TUPLE * key2) {
assert(!key2); // unimplemented
if(tree_.size == INVALID_SIZE) {
lsmIterator_ = NULL;
} else {
if(key1) {
lsmIterator_ = diskTreeComponentIterator::openAt(-1, tree_, key1->key());
} else {
lsmIterator_ = diskTreeComponentIterator::open(-1, tree_);
}
}
}
template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(recordid tree) :
tree_(tree), tree_(tree),
lsmIterator_(logtreeIterator::open(-1,tree)), // lsmIterator_(diskTreeComponentIterator::open(-1,tree)),
curr_tuple(0) curr_tuple(0)
{ {
init_iterators(NULL,NULL);
init_helper(); init_helper();
} }
template <class TUPLE> template <class TUPLE>
treeIterator<TUPLE>::treeIterator(recordid tree, TUPLE& key) : diskTreeIterator<TUPLE>::diskTreeIterator(recordid tree, TUPLE& key) :
tree_(tree), tree_(tree),
lsmIterator_(logtreeIterator::openAt(-1,tree,key.get_key())) //lsmIterator_(diskTreeComponentIterator::openAt(-1,tree,key.key()))
curr_tuple(0)
{ {
init_iterators(&key,NULL);
init_helper(); init_helper();
}
template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(diskTreeComponent *tree) :
tree_(tree ? tree->get_root_rec() : NULLRID),
//lsmIterator_(diskTreeComponentIterator::open(-1,tree->get_root_rec())),
curr_tuple(0)
{
init_iterators(NULL, NULL);
init_helper();
}
template <class TUPLE>
diskTreeIterator<TUPLE>::diskTreeIterator(diskTreeComponent *tree, TUPLE& key) :
tree_(tree ? tree->get_root_rec() : NULLRID),
// lsmIterator_(diskTreeComponentIterator::openAt(-1,tree->get_root_rec(),key.key()))
curr_tuple(0)
{
init_iterators(&key,NULL);
init_helper();
} }
template <class TUPLE> template <class TUPLE>
treeIterator<TUPLE>::~treeIterator() diskTreeIterator<TUPLE>::~diskTreeIterator()
{ {
if(lsmIterator_) if(lsmIterator_)
logtreeIterator::close(-1, lsmIterator_); diskTreeComponentIterator::close(-1, lsmIterator_);
if(curr_tuple != NULL) if(curr_tuple != NULL)
free(curr_tuple); free(curr_tuple);
@ -42,19 +80,19 @@ treeIterator<TUPLE>::~treeIterator()
} }
template <class TUPLE> template <class TUPLE>
void treeIterator<TUPLE>::init_helper() void diskTreeIterator<TUPLE>::init_helper()
{ {
if(!lsmIterator_) if(!lsmIterator_)
{ {
printf("treeIterator:\t__error__ init_helper():\tnull lsmIterator_"); // printf("treeIterator:\t__error__ init_helper():\tnull lsmIterator_");
curr_page = 0; curr_page = 0;
dp_itr = 0; dp_itr = 0;
} }
else else
{ {
if(logtreeIterator::next(-1, lsmIterator_) == 0) if(diskTreeComponentIterator::next(-1, lsmIterator_) == 0)
{ {
//printf("treeIterator:\t__error__ init_helper():\tlogtreeIteratr::next returned 0." ); //printf("diskTreeIterator:\t__error__ init_helper():\tlogtreeIteratr::next returned 0." );
curr_page = 0; curr_page = 0;
dp_itr = 0; dp_itr = 0;
} }
@ -62,7 +100,7 @@ void treeIterator<TUPLE>::init_helper()
{ {
pageid_t * pid_tmp; pageid_t * pid_tmp;
pageid_t ** hack = &pid_tmp; pageid_t ** hack = &pid_tmp;
logtreeIterator::value(-1,lsmIterator_,(byte**)hack); diskTreeComponentIterator::value(-1,lsmIterator_,(byte**)hack);
curr_pageid = *pid_tmp; curr_pageid = *pid_tmp;
curr_page = new DataPage<TUPLE>(-1, curr_pageid); curr_page = new DataPage<TUPLE>(-1, curr_pageid);
@ -73,9 +111,9 @@ void treeIterator<TUPLE>::init_helper()
} }
template <class TUPLE> template <class TUPLE>
TUPLE * treeIterator<TUPLE>::getnext() TUPLE * diskTreeIterator<TUPLE>::getnext()
{ {
assert(this->lsmIterator_); if(!this->lsmIterator_) { return NULL; }
if(dp_itr == 0) if(dp_itr == 0)
return 0; return 0;
@ -90,12 +128,12 @@ TUPLE * treeIterator<TUPLE>::getnext()
delete curr_page; delete curr_page;
curr_page = 0; curr_page = 0;
if(logtreeIterator::next(-1,lsmIterator_)) if(diskTreeComponentIterator::next(-1,lsmIterator_))
{ {
pageid_t *pid_tmp; pageid_t *pid_tmp;
pageid_t **hack = &pid_tmp; pageid_t **hack = &pid_tmp;
logtreeIterator::value(-1,lsmIterator_,(byte**)hack); diskTreeComponentIterator::value(-1,lsmIterator_,(byte**)hack);
curr_pageid = *pid_tmp; curr_pageid = *pid_tmp;
curr_page = new DataPage<TUPLE>(-1, curr_pageid); curr_page = new DataPage<TUPLE>(-1, curr_pageid);
dp_itr = new DPITR_T(curr_page->begin()); dp_itr = new DPITR_T(curr_page->begin());
@ -110,3 +148,6 @@ TUPLE * treeIterator<TUPLE>::getnext()
curr_tuple = readTuple; curr_tuple = readTuple;
return curr_tuple; return curr_tuple;
} }
template class diskTreeIterator<datatuple>;
template class changingMemTreeIterator<rbtree_t, datatuple>;

View file

@ -10,6 +10,63 @@
template <class TUPLE> template <class TUPLE>
class DataPage; class DataPage;
template <class MEMTREE, class TUPLE>
class changingMemTreeIterator
{
private:
typedef typename MEMTREE::const_iterator MTITER;
public:
changingMemTreeIterator( MEMTREE *s, pthread_mutex_t * rb_mut ) : s_(s), mut_(rb_mut) {
pthread_mutex_lock(mut_);
if(s_->begin() == s_->end()) {
next_ret_ = NULL;
} else {
next_ret_ = (*s->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_...
}
pthread_mutex_unlock(mut_);
}
changingMemTreeIterator( MEMTREE *s, pthread_mutex_t * rb_mut, TUPLE *&key ) {
pthread_mutex_lock(mut_);
if(s_->find(key) != s_->end()) {
next_ret_ = (*(s_->find(key)))->create_copy();
} else if(s_->upper_bound(key) != s->end()) {
next_ret_ = (*(s_->upper_bound(key)))->create_copy();
} else {
next_ret_ = NULL;
}
pthread_mutex_unlock(mut_);
}
~changingMemTreeIterator() { if(next_ret_) delete next_ret_; }
TUPLE* getnext() {
pthread_mutex_lock(mut_);
TUPLE * ret = next_ret_;
if(next_ret_) {
if(s_->upper_bound(next_ret_) == s_->end()) {
next_ret_ = 0;
} else {
next_ret_ = (*s_->upper_bound(next_ret_))->create_copy();
}
}
pthread_mutex_unlock(mut_);
return ret;
}
private:
explicit changingMemTreeIterator() { abort(); }
void operator=(changingMemTreeIterator & t) { abort(); }
int operator-(changingMemTreeIterator & t) { abort(); }
private:
MEMTREE *s_;
TUPLE * next_ret_;
pthread_mutex_t * mut_;
};
////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////
// memTreeIterator // memTreeIterator
///////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////
@ -21,52 +78,77 @@ private:
typedef typename MEMTREE::const_iterator MTITER; typedef typename MEMTREE::const_iterator MTITER;
public: public:
memTreeIterator( MEMTREE *s ) : first_(true), done_(false), it_(s->begin()), itend_(s->end()) { } memTreeIterator( MEMTREE *s )
memTreeIterator( MEMTREE *s, TUPLE &key ) : first_(true), done_(false), it_(s->find(key)), itend_(s->end()) { } : first_(true),
done_(s == NULL) {
init_iterators(s, NULL, NULL);
}
~memTreeIterator() { } memTreeIterator( MEMTREE *s, TUPLE *&key )
: first_(true), done_(s == NULL) {
init_iterators(s, key, NULL);
}
~memTreeIterator() {
delete it_;
delete itend_;
}
TUPLE* getnext() { TUPLE* getnext() {
if(done_) { return NULL; } if(done_) { return NULL; }
if(first_) { first_ = 0;} else { it_++; } if(first_) { first_ = 0;} else { (*it_)++; }
if(it_==itend_) { done_= true; return NULL; } if(*it_==*itend_) { done_= true; return NULL; }
return (*it_)->create_copy(); return (*(*it_))->create_copy();
} }
private: private:
void init_iterators(MEMTREE * s, TUPLE * key1, TUPLE * key2) {
if(s) {
it_ = key1 ? new MTITER(s->find(key1)) : new MTITER(s->begin());
itend_ = key2 ? new MTITER(s->find(key2)) : new MTITER(s->end());
} else {
it_ = NULL;
itend_ = NULL;
}
}
explicit memTreeIterator() { abort(); } explicit memTreeIterator() { abort(); }
void operator=(memTreeIterator & t) { abort(); } void operator=(memTreeIterator & t) { abort(); }
int operator-(memTreeIterator & t) { abort(); } int operator-(memTreeIterator & t) { abort(); }
private: private:
bool first_; bool first_;
bool done_; bool done_;
MTITER it_; MTITER *it_;
MTITER itend_; MTITER *itend_;
}; };
///////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////
template <class TUPLE> template <class TUPLE>
class treeIterator class diskTreeIterator
{ {
public: public:
explicit treeIterator(recordid tree); explicit diskTreeIterator(recordid tree);
explicit treeIterator(recordid tree,TUPLE &key); explicit diskTreeIterator(recordid tree,TUPLE &key);
~treeIterator(); explicit diskTreeIterator(diskTreeComponent *tree);
explicit diskTreeIterator(diskTreeComponent *tree,TUPLE &key);
~diskTreeIterator();
TUPLE * getnext(); TUPLE * getnext();
private: private:
void init_iterators(TUPLE * key1, TUPLE * key2);
inline void init_helper(); inline void init_helper();
explicit treeIterator() { abort(); } explicit diskTreeIterator() { abort(); }
void operator=(treeIterator & t) { abort(); } void operator=(diskTreeIterator & t) { abort(); }
int operator-(treeIterator & t) { abort(); } int operator-(diskTreeIterator & t) { abort(); }
private: private:
recordid tree_; //root of the tree recordid tree_; //root of the tree

View file

@ -436,10 +436,13 @@ void * thread_work_fn( void * args)
continue; continue;
} }
int err; int err = 0;
//step 2: read the tuple from client //step 2: read the first tuple from client
datatuple * tuple = readtuplefromsocket(*(item->data->workitem), &err); datatuple *tuple, *tuple2;
if(!err) { tuple = readtuplefromsocket(*(item->data->workitem), &err); }
// read the second tuple from client
if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); }
//step 3: process the tuple //step 3: process the tuple
if(opcode == OP_INSERT) if(opcode == OP_INSERT)
@ -485,7 +488,9 @@ void * thread_work_fn( void * args)
//send the tuple //send the tuple
err = writetupletosocket(*(item->data->workitem), dt); err = writetupletosocket(*(item->data->workitem), dt);
} }
if(!err) {
writeendofiteratortosocket(*(item->data->workitem));
}
//free datatuple //free datatuple
if(dt_needs_free) { if(dt_needs_free) {
datatuple::freetuple(dt); datatuple::freetuple(dt);
@ -493,27 +498,36 @@ void * thread_work_fn( void * args)
} }
else if(opcode == OP_SCAN) else if(opcode == OP_SCAN)
{ {
datatuple * end_tuple; size_t limit = -1;
size_t limit; size_t count = 0;
if(!err) { end_tuple = readtuplefromsocket(*(item->data->workitem), &err); }
if(!err) { limit = readcountfromsocket(*(item->data->workitem), &err); } if(!err) { limit = readcountfromsocket(*(item->data->workitem), &err); }
if(!err) { if(!err) { err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); }
treeIterator<datatuple> * itr;
// if(tuple) { if(!err) {
// itr = new treeIterator<datatuple>(item->data->ltable, *tuple); logtableIterator<datatuple> * itr = new logtableIterator<datatuple>(item->data->ltable, tuple);
// } else { datatuple * t;
// itr = new treeIterator<datatuple>(item->data->ltable); while(!err && (t = itr->getnext())) {
// } if(tuple2) { // are we at the end of range?
abort(); if(datatuple::compare_obj(t, tuple2) >= 0) {
datatuple::freetuple(t);
break;
}
}
err = writetupletosocket(*(item->data->workitem), t);
datatuple::freetuple(t);
count ++;
if(count == limit) { break; } // did we hit limit?
}
delete itr; delete itr;
} }
if(!err) { writeendofiteratortosocket(*(item->data->workitem)); }
} }
else if(opcode == OP_DBG_BLOCKMAP) else if(opcode == OP_DBG_BLOCKMAP)
{ {
// produce a list of stasis regions // produce a list of stasis regions
int xid = Tbegin(); int xid = Tbegin();
readlock(item->data->ltable->getMergeData()->header_lock, 0); readlock(item->data->ltable->header_lock, 0);
// produce a list of regions used by current tree components // produce a list of regions used by current tree components
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
@ -538,7 +552,7 @@ void * thread_work_fn( void * args)
tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
} }
pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count); pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);
unlock(item->data->ltable->getMergeData()->header_lock); unlock(item->data->ltable->header_lock);
Tcommit(xid); Tcommit(xid);
@ -597,7 +611,8 @@ void * thread_work_fn( void * args)
} }
//free the tuple //free the tuple
datatuple::freetuple(tuple); if(tuple) datatuple::freetuple(tuple);
if(tuple2) datatuple::freetuple(tuple2);
if(err) { if(err) {
perror("could not respond to client"); perror("could not respond to client");

View file

@ -26,7 +26,6 @@ static inline double tv_to_double(struct timeval tv)
template class DataPage<datatuple>; template class DataPage<datatuple>;
logtable::logtable() logtable::logtable()
{ {
@ -41,9 +40,12 @@ logtable::logtable()
//tmerger = new tuplemerger(&append_merger); //tmerger = new tuplemerger(&append_merger);
tmerger = new tuplemerger(&replace_merger); tmerger = new tuplemerger(&replace_merger);
header_lock = initlock();
tsize = 0; tsize = 0;
tree_bytes = 0; tree_bytes = 0;
epoch = 0;
} }
@ -80,6 +82,7 @@ logtable::~logtable()
tearDownTree(tree_c0); tearDownTree(tree_c0);
} }
deletelock(header_lock);
delete tmerger; delete tmerger;
} }
@ -121,7 +124,7 @@ void logtable::flushTable()
start = tv_to_double(start_tv); start = tv_to_double(start_tv);
writelock(mergedata->header_lock,0); writelock(header_lock,0);
pthread_mutex_lock(mergedata->rbtree_mut); pthread_mutex_lock(mergedata->rbtree_mut);
int expmcount = merge_count; int expmcount = merge_count;
@ -133,7 +136,7 @@ void logtable::flushTable()
while(get_tree_c0_mergeable()) { while(get_tree_c0_mergeable()) {
unlock(mergedata->header_lock); unlock(header_lock);
// pthread_mutex_lock(mergedata->rbtree_mut); // pthread_mutex_lock(mergedata->rbtree_mut);
if(tree_bytes >= max_c0_size) if(tree_bytes >= max_c0_size)
pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut); pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut);
@ -146,12 +149,12 @@ void logtable::flushTable()
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(mergedata->rbtree_mut);
writelock(mergedata->header_lock,0); writelock(header_lock,0);
pthread_mutex_lock(mergedata->rbtree_mut); pthread_mutex_lock(mergedata->rbtree_mut);
if(expmcount != merge_count) if(expmcount != merge_count)
{ {
unlock(mergedata->header_lock); unlock(header_lock);
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(mergedata->rbtree_mut);
return; return;
} }
@ -177,7 +180,7 @@ void logtable::flushTable()
tree_bytes = 0; tree_bytes = 0;
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(mergedata->rbtree_mut);
unlock(mergedata->header_lock); unlock(header_lock);
if(first) if(first)
{ {
printf("flush waited %f sec\n", stop-start); printf("flush waited %f sec\n", stop-start);
@ -197,7 +200,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
//prepare a search tuple //prepare a search tuple
datatuple *search_tuple = datatuple::create(key, keySize); datatuple *search_tuple = datatuple::create(key, keySize);
readlock(mergedata->header_lock,0); readlock(header_lock,0);
pthread_mutex_lock(mergedata->rbtree_mut); pthread_mutex_lock(mergedata->rbtree_mut);
datatuple *ret_tuple=0; datatuple *ret_tuple=0;
@ -328,7 +331,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
} }
//pthread_mutex_unlock(mergedata->rbtree_mut); //pthread_mutex_unlock(mergedata->rbtree_mut);
unlock(mergedata->header_lock); unlock(header_lock);
datatuple::freetuple(search_tuple); datatuple::freetuple(search_tuple);
return ret_tuple; return ret_tuple;
@ -411,12 +414,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
void logtable::insertTuple(datatuple *tuple) void logtable::insertTuple(datatuple *tuple)
{ {
//static int count = LATCH_INTERVAL;
//static int tsize = 0; //number of tuples
//static int64_t tree_bytes = 0; //number of bytes
//lock the red-black tree //lock the red-black tree
readlock(mergedata->header_lock,0); readlock(header_lock,0);
pthread_mutex_lock(mergedata->rbtree_mut); pthread_mutex_lock(mergedata->rbtree_mut);
//find the previous tuple with same key in the memtree if exists //find the previous tuple with same key in the memtree if exists
rbtree_t::iterator rbitr = tree_c0->find(tuple); rbtree_t::iterator rbitr = tree_c0->find(tuple);
@ -451,20 +450,16 @@ void logtable::insertTuple(datatuple *tuple)
{ {
DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes);
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(mergedata->rbtree_mut);
unlock(mergedata->header_lock); unlock(header_lock);
flushTable(); flushTable();
readlock(mergedata->header_lock,0); readlock(header_lock,0);
pthread_mutex_lock(mergedata->rbtree_mut); pthread_mutex_lock(mergedata->rbtree_mut);
//tsize = 0;
//tree_bytes = 0;
} }
//unlock //unlock
pthread_mutex_unlock(mergedata->rbtree_mut); pthread_mutex_unlock(mergedata->rbtree_mut);
unlock(mergedata->header_lock); unlock(header_lock);
DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes);
@ -525,3 +520,5 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize,
} }
return tup; return tup;
} }
template class logtableIterator<datatuple>;

View file

@ -36,14 +36,12 @@
#include "tuplemerger.h" #include "tuplemerger.h"
#include "datatuple.h" #include "datatuple.h"
struct logtable_mergedata; #include "logiterators.h"
typedef std::set<datatuple*, datatuple> rbtree_t; typedef std::set<datatuple*, datatuple> rbtree_t;
typedef rbtree_t* rbtree_ptr_t; typedef rbtree_t* rbtree_ptr_t;
struct indexnode_rec { #include "merger.h"
pageid_t ptr;
};
class logtable class logtable
{ {
@ -71,23 +69,26 @@ public:
inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls) inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls)
inline uint64_t get_epoch() { return epoch; }
inline diskTreeComponent * get_tree_c2(){return tree_c2;} inline diskTreeComponent * get_tree_c2(){return tree_c2;}
inline diskTreeComponent * get_tree_c1(){return tree_c1;} inline diskTreeComponent * get_tree_c1(){return tree_c1;}
inline diskTreeComponent * get_tree_c1_mergeable(){return tree_c1_mergeable;} inline diskTreeComponent * get_tree_c1_mergeable(){return tree_c1_mergeable;}
inline void set_tree_c1(diskTreeComponent *t){tree_c1=t;} inline void set_tree_c1(diskTreeComponent *t){tree_c1=t; epoch++; }
inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t;} inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t; epoch++; }
inline void set_tree_c2(diskTreeComponent *t){tree_c2=t;} inline void set_tree_c2(diskTreeComponent *t){tree_c2=t; epoch++; }
inline rbtree_ptr_t get_tree_c0(){return tree_c0;} inline rbtree_ptr_t get_tree_c0(){return tree_c0;}
inline rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;} inline rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;}
void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;} void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree; epoch++; }
void set_tree_c0_mergeable(rbtree_ptr_t newtree){tree_c0_mergeable = newtree;} void set_tree_c0_mergeable(rbtree_ptr_t newtree){tree_c0_mergeable = newtree; epoch++; }
int get_fixed_page_count(){return fixed_page_count;} int get_fixed_page_count(){return fixed_page_count;}
void set_fixed_page_count(int count){fixed_page_count = count;} void set_fixed_page_count(int count){fixed_page_count = count;}
void setMergeData(logtable_mergedata * mdata) { this->mergedata = mdata;} void setMergeData(logtable_mergedata * mdata) { this->mergedata = mdata; epoch++; }
logtable_mergedata* getMergeData(){return mergedata;} logtable_mergedata* getMergeData(){return mergedata;}
inline tuplemerger * gettuplemerger(){return tmerger;} inline tuplemerger * gettuplemerger(){return tmerger;}
@ -106,6 +107,7 @@ public:
const static RegionAllocConf_t DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER; const static RegionAllocConf_t DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER;
logtable_mergedata * mergedata; logtable_mergedata * mergedata;
rwl * header_lock;
int64_t max_c0_size; int64_t max_c0_size;
@ -118,7 +120,7 @@ public:
private: private:
recordid table_rec; recordid table_rec;
struct table_header tbl_header; struct table_header tbl_header;
uint64_t epoch;
diskTreeComponent *tree_c2; //big tree diskTreeComponent *tree_c2; //big tree
diskTreeComponent *tree_c1; //small tree diskTreeComponent *tree_c1; //small tree
diskTreeComponent *tree_c1_mergeable; //small tree: ready to be merged with c2 diskTreeComponent *tree_c1_mergeable; //small tree: ready to be merged with c2
@ -137,44 +139,203 @@ private:
bool still_running_; bool still_running_;
}; };
template<class ITRA, class ITRN, class TUPLE>
typedef struct logtreeIterator_s { class mergeManyIterator {
Page * p;
recordid current;
indexnode_rec *t;
int justOnePage;
} logtreeIterator_s;
class logtreeIterator
{
public: public:
static lladdIterator_t* open(int xid, recordid root); explicit mergeManyIterator(ITRA* a, ITRN** iters, int num_iters, TUPLE*(*merge)(const TUPLE*,const TUPLE*), int (*cmp)(const TUPLE*,const TUPLE*)) :
static lladdIterator_t* openAt(int xid, recordid root, const byte* key); num_iters_(num_iters+1),
static int next(int xid, lladdIterator_t *it); first_iter_(a),
static void close(int xid, lladdIterator_t *it); iters_((ITRN**)malloc(sizeof(*iters_) * num_iters)), // exactly the number passed in
current_((TUPLE**)malloc(sizeof(*current_) * (num_iters_))), // one more than was passed in
last_iter_(-1),
cmp_(cmp),
merge_(merge),
dups((int*)malloc(sizeof(*dups)*num_iters_))
{
current_[0] = first_iter_->getnext();
for(int i = 1; i < num_iters_; i++) {
iters_[i-1] = iters[i-1];
current_[i] = iters_[i-1]->getnext();
}
}
~mergeManyIterator() {
delete(first_iter_);
for(int i = 0; i < num_iters_; i++) {
if(i != last_iter_) {
if(current_[i]) TUPLE::freetuple(current_[i]);
}
}
for(int i = 1; i < num_iters_; i++) {
delete iters_[i-1];
}
free(current_);
free(iters_);
free(dups);
}
TUPLE * getnext() {
int num_dups = 0;
if(last_iter_ != -1) {
// get the value after the one we just returned to the user
//TUPLE::freetuple(current_[last_iter_]); // should never be null
if(last_iter_ == 0) {
current_[last_iter_] = first_iter_->getnext();
} else {
current_[last_iter_] = iters_[last_iter_-1]->getnext();
}
}
// find the first non-empty iterator. (Don't need to special-case ITRA since we're looking at current.)
int min = 0;
while(min < num_iters_ && !current_[min]) {
min++;
}
if(min == num_iters_) { return NULL; }
// examine current to decide which tuple to return.
for(int i = min+1; i < num_iters_; i++) {
if(current_[i]) {
int res = cmp_(current_[min], current_[i]);
if(res > 0) { // min > i
min = i;
num_dups = 0;
} else if(res == 0) { // min == i
dups[num_dups] = i;
num_dups++;
}
}
}
TUPLE * ret;
if(!merge_) {
ret = current_[min];
} else {
// use merge function to build a new ret.
abort();
}
// advance the iterators that match the tuple we're returning.
for(int i = 0; i < num_dups; i++) {
TUPLE::freetuple(current_[dups[i]]); // should never be null
current_[dups[i]] = iters_[dups[i]-1]->getnext();
}
last_iter_ = min; // mark the min iter to be advance at the next invocation of next(). This saves us a copy in the non-merging case.
return ret;
}
private:
int num_iters_;
ITRA * first_iter_;
ITRN ** iters_;
TUPLE ** current_;
int last_iter_;
static inline size_t key (int xid, lladdIterator_t *it, byte **key) int (*cmp_)(const TUPLE*,const TUPLE*);
{ TUPLE*(*merge_)(const TUPLE*,const TUPLE*);
logtreeIterator_s * impl = (logtreeIterator_s*)it->impl;
*key = (byte*)(impl->t+1); // temporary variables initiaized once for effiency
return impl->current.size - sizeof(indexnode_rec); int * dups;
} };
template<class TUPLE>
class logtableIterator {
public:
explicit logtableIterator(logtable* ltable)
: ltable(ltable),
epoch(ltable->get_epoch()),
merge_it_(NULL),
last_returned(NULL),
key(NULL) {
readlock(ltable->header_lock, 0);
validate();
unlock(ltable->header_lock);
}
explicit logtableIterator(logtable* ltable,TUPLE *key)
: ltable(ltable),
epoch(ltable->get_epoch()),
merge_it_(NULL),
last_returned(NULL),
key(key) {
readlock(ltable->header_lock, 0);
validate();
unlock(ltable->header_lock);
}
~logtableIterator() {
invalidate();
}
TUPLE * getnext() {
readlock(ltable->header_lock, 0);
revalidate();
last_returned = merge_it_->getnext();
unlock(ltable->header_lock);
return last_returned;
}
private:
inline void init_helper();
explicit logtableIterator() { abort(); }
void operator=(logtableIterator<TUPLE> & t) { abort(); }
int operator-(logtableIterator<TUPLE> & t) { abort(); }
private:
static const int C1 = 0;
static const int C1_MERGEABLE = 1;
static const int C2 = 2;
logtable * ltable;
uint64_t epoch;
typedef mergeManyIterator<changingMemTreeIterator<rbtree_t, TUPLE>, memTreeIterator<rbtree_t, TUPLE>, TUPLE> inner_merge_it_t;
// typedef mergeManyIterator<memTreeIterator<rbtree_t, TUPLE>, diskTreeIterator<TUPLE>, TUPLE> merge_it_t;
typedef mergeManyIterator<inner_merge_it_t, diskTreeIterator<TUPLE>, TUPLE> merge_it_t;
merge_it_t* merge_it_;
TUPLE * last_returned;
TUPLE * key;
void revalidate() {
if(ltable->get_epoch() != epoch) {
TUPLE* delme = last_returned = last_returned->create_copy();
invalidate();
validate();
TUPLE::freetuple(delme);
}
}
static inline size_t value(int xid, lladdIterator_t *it, byte **value) void invalidate() {
{ delete merge_it_;
logtreeIterator_s * impl = (logtreeIterator_s*)it->impl; }
*value = (byte*)&(impl->t->ptr); void validate() {
return sizeof(impl->t->ptr); changingMemTreeIterator<rbtree_t, TUPLE> * c0_it;
} memTreeIterator<rbtree_t, TUPLE> * c0_mergeable_it[1];
diskTreeIterator<TUPLE> * disk_it[3];
epoch = ltable->get_epoch();
if(last_returned) {
c0_it = new changingMemTreeIterator<rbtree_t, TUPLE>(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, last_returned);
c0_mergeable_it[0] = new memTreeIterator<rbtree_t, TUPLE> (ltable->get_tree_c0_mergeable(), last_returned);
disk_it[0] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1(), *last_returned);
disk_it[1] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1_mergeable(), *last_returned);
disk_it[2] = new diskTreeIterator<TUPLE> (ltable->get_tree_c2(), *last_returned);
} else if(key) {
c0_it = new changingMemTreeIterator<rbtree_t, TUPLE>(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, key);
c0_mergeable_it[0] = new memTreeIterator<rbtree_t, TUPLE> (ltable->get_tree_c0_mergeable(), key);
disk_it[0] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1(), *key);
disk_it[1] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1_mergeable(), *key);
disk_it[2] = new diskTreeIterator<TUPLE> (ltable->get_tree_c2(), *key);
} else {
c0_it = new changingMemTreeIterator<rbtree_t, TUPLE>(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut );
c0_mergeable_it[0] = new memTreeIterator<rbtree_t, TUPLE> (ltable->get_tree_c0_mergeable() );
disk_it[0] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1() );
disk_it[1] = new diskTreeIterator<TUPLE> (ltable->get_tree_c1_mergeable() );
disk_it[2] = new diskTreeIterator<TUPLE> (ltable->get_tree_c2() );
}
static inline void tupleDone(int xid, void *it) { } inner_merge_it_t * inner_merge_it =
static inline void releaseLock(int xid, void *it) { } new inner_merge_it_t(c0_it, c0_mergeable_it, 1, NULL, TUPLE::compare_obj);
merge_it_ = new merge_it_t(inner_merge_it, disk_it, 3, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges
}
}; };
#endif #endif

View file

@ -16,7 +16,6 @@ int merge_scheduler::addlogtable(logtable *ltable)
struct logtable_mergedata * mdata = new logtable_mergedata; struct logtable_mergedata * mdata = new logtable_mergedata;
// initialize merge data // initialize merge data
mdata->header_lock = initlock();
mdata->rbtree_mut = new pthread_mutex_t; mdata->rbtree_mut = new pthread_mutex_t;
pthread_mutex_init(mdata->rbtree_mut,0); pthread_mutex_init(mdata->rbtree_mut,0);
ltable->set_tree_c0_mergeable(NULL); ltable->set_tree_c0_mergeable(NULL);
@ -47,7 +46,6 @@ merge_scheduler::~merge_scheduler()
logtable_mergedata *mdata = mergedata[i].second; logtable_mergedata *mdata = mergedata[i].second;
//delete the mergedata fields //delete the mergedata fields
deletelock(mdata->header_lock);
delete mdata->rbtree_mut; delete mdata->rbtree_mut;
delete mdata->input_needed; delete mdata->input_needed;
delete mdata->input_ready_cond; delete mdata->input_ready_cond;
@ -218,7 +216,7 @@ void* memMergeThread(void*arg)
while(true) while(true)
{ {
writelock(ltable->mergedata->header_lock,0); writelock(ltable->header_lock,0);
int done = 0; int done = 0;
// wait for c0_mergable // wait for c0_mergable
while(!ltable->get_tree_c0_mergeable()) while(!ltable->get_tree_c0_mergeable())
@ -235,12 +233,12 @@ void* memMergeThread(void*arg)
} }
printf("mmt:\twaiting for block ready cond\n"); printf("mmt:\twaiting for block ready cond\n");
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut);
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
writelock(ltable->mergedata->header_lock,0); writelock(ltable->header_lock,0);
printf("mmt:\tblock ready\n"); printf("mmt:\tblock ready\n");
} }
@ -251,7 +249,7 @@ void* memMergeThread(void*arg)
pthread_mutex_lock(a->block_ready_mut); pthread_mutex_lock(a->block_ready_mut);
pthread_cond_signal(a->out_block_ready_cond); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. pthread_cond_signal(a->out_block_ready_cond); // no block is ready. this allows the other thread to wake up, and see that we're shutting down.
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
break; break;
} }
@ -261,7 +259,7 @@ void* memMergeThread(void*arg)
// 4: Merge // 4: Merge
//create the iterators //create the iterators
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(ltable->get_tree_c1()->get_root_rec()); // XXX don't want get_root_rec() to be here. diskTreeIterator<datatuple> *itrA = new diskTreeIterator<datatuple>(ltable->get_tree_c1()->get_root_rec()); // XXX don't want get_root_rec() to be here.
memTreeIterator<rbtree_t, datatuple> *itrB = memTreeIterator<rbtree_t, datatuple> *itrB =
new memTreeIterator<rbtree_t, datatuple>(ltable->get_tree_c0_mergeable()); new memTreeIterator<rbtree_t, datatuple>(ltable->get_tree_c0_mergeable());
@ -270,7 +268,7 @@ void* memMergeThread(void*arg)
diskTreeComponent * c1_prime = new diskTreeComponent(xid); // XXX should not hardcode region size) diskTreeComponent * c1_prime = new diskTreeComponent(xid); // XXX should not hardcode region size)
//pthread_mutex_unlock(a->block_ready_mut); //pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
//: do the merge //: do the merge
printf("mmt:\tMerging:\n"); printf("mmt:\tMerging:\n");
@ -299,7 +297,7 @@ void* memMergeThread(void*arg)
//now atomically replace the old c1 with new c1 //now atomically replace the old c1 with new c1
//pthread_mutex_lock(a->block_ready_mut); //pthread_mutex_lock(a->block_ready_mut);
writelock(ltable->mergedata->header_lock,0); writelock(ltable->header_lock,0);
merge_count++; merge_count++;
printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages); printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages);
@ -325,11 +323,11 @@ void* memMergeThread(void*arg)
// XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure. // XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure.
while(ltable->get_tree_c1_mergeable()) { while(ltable->get_tree_c1_mergeable()) {
pthread_mutex_lock(a->block_ready_mut); pthread_mutex_lock(a->block_ready_mut);
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut); pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut);
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
writelock(ltable->mergedata->header_lock,0); writelock(ltable->header_lock,0);
} }
ltable->set_tree_c1_mergeable(c1_prime); ltable->set_tree_c1_mergeable(c1_prime);
@ -346,7 +344,7 @@ void* memMergeThread(void*arg)
Tcommit(xid); Tcommit(xid);
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
//TODO: get the freeing outside of the lock //TODO: get the freeing outside of the lock
} }
@ -372,7 +370,7 @@ void *diskMergeThread(void*arg)
while(true) while(true)
{ {
writelock(ltable->mergedata->header_lock,0); writelock(ltable->header_lock,0);
int done = 0; int done = 0;
// get a new input for merge // get a new input for merge
while(!ltable->get_tree_c1_mergeable()) while(!ltable->get_tree_c1_mergeable())
@ -388,28 +386,28 @@ void *diskMergeThread(void*arg)
} }
printf("dmt:\twaiting for block ready cond\n"); printf("dmt:\twaiting for block ready cond\n");
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut);
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
printf("dmt:\tblock ready\n"); printf("dmt:\tblock ready\n");
writelock(ltable->mergedata->header_lock,0); writelock(ltable->header_lock,0);
} }
*a->in_block_needed = false; *a->in_block_needed = false;
if(done==1) if(done==1)
{ {
pthread_cond_signal(a->out_block_ready_cond); pthread_cond_signal(a->out_block_ready_cond);
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
break; break;
} }
int64_t mergedPages=0; int64_t mergedPages=0;
//create the iterators //create the iterators
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(ltable->get_tree_c2()->get_root_rec()); diskTreeIterator<datatuple> *itrA = new diskTreeIterator<datatuple>(ltable->get_tree_c2()->get_root_rec());
treeIterator<datatuple> *itrB = diskTreeIterator<datatuple> *itrB =
new treeIterator<datatuple>(ltable->get_tree_c1_mergeable()->get_root_rec()); new diskTreeIterator<datatuple>(ltable->get_tree_c1_mergeable()->get_root_rec());
xid = Tbegin(); xid = Tbegin();
@ -417,7 +415,7 @@ void *diskMergeThread(void*arg)
//TODO: maybe you want larger regions for the second tree? //TODO: maybe you want larger regions for the second tree?
diskTreeComponent * c2_prime = new diskTreeComponent(xid); diskTreeComponent * c2_prime = new diskTreeComponent(xid);
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
//do the merge //do the merge
@ -446,7 +444,7 @@ void *diskMergeThread(void*arg)
//writes complete //writes complete
//now atomically replace the old c2 with new c2 //now atomically replace the old c2 with new c2
//pthread_mutex_lock(a->block_ready_mut); //pthread_mutex_lock(a->block_ready_mut);
writelock(ltable->mergedata->header_lock,0); writelock(ltable->header_lock,0);
merge_count++; merge_count++;
//update the current optimal R value //update the current optimal R value
@ -463,7 +461,7 @@ void *diskMergeThread(void*arg)
Tcommit(xid); Tcommit(xid);
unlock(ltable->mergedata->header_lock); unlock(ltable->header_lock);
} }
return 0; return 0;
} }

View file

@ -4,12 +4,10 @@
#include <vector> #include <vector>
#include <utility> #include <utility>
#include "logstore.h"
#include "logiterators.h"
//TODO: 400 bytes overhead per tuple, this is nuts, check if this is true... //TODO: 400 bytes overhead per tuple, this is nuts, check if this is true...
static const int RB_TREE_OVERHEAD = 400; static const int RB_TREE_OVERHEAD = 400;
static const double MIN_R = 3.0; static const double MIN_R = 3.0;
class logtable;
struct merger_args struct merger_args
{ {
@ -32,15 +30,12 @@ struct merger_args
}; };
struct logtable_mergedata struct logtable_mergedata
{ {
//merge threads //merge threads
pthread_t diskmerge_thread; pthread_t diskmerge_thread;
pthread_t memmerge_thread; pthread_t memmerge_thread;
rwl *header_lock;
pthread_mutex_t * rbtree_mut; pthread_mutex_t * rbtree_mut;
bool *input_needed; // memmerge-input needed bool *input_needed; // memmerge-input needed
@ -56,13 +51,15 @@ struct logtable_mergedata
}; };
#include "logstore.h" // XXX hacky include workaround.
#include "logiterators.h"
class merge_scheduler class merge_scheduler
{ {
std::vector<std::pair<logtable *, logtable_mergedata*> > mergedata; std::vector<std::pair<logtable *, logtable_mergedata*> > mergedata;
public: public:
//static pageid_t C0_MEM_SIZE;
~merge_scheduler(); ~merge_scheduler();
int addlogtable(logtable * ltable); int addlogtable(logtable * ltable);

View file

@ -11,8 +11,18 @@
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
#include <string>
typedef unsigned char byte;
#include <cstring>
#include <assert.h>
typedef uint8_t network_op_t; typedef uint8_t network_op_t;
typedef uint32_t len_t ;
static const len_t DELETE = ((len_t)0) - 1;
#include <datatuple.h>
//server codes //server codes
static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1; static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1;
static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1; static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1;
@ -155,10 +165,10 @@ static inline int writeoptosocket(int sockd, network_op_t op) {
*/ */
static inline datatuple* readtuplefromsocket(int sockd, int * err) { static inline datatuple* readtuplefromsocket(int sockd, int * err) {
datatuple::len_t keylen, datalen, buflen; len_t keylen, datalen, buflen;
if(( *err = readfromsocket(sockd, &keylen, sizeof(keylen)) )) return NULL; if(( *err = readfromsocket(sockd, &keylen, sizeof(keylen)) )) return NULL;
if(keylen == datatuple::DELETE) return NULL; // *err is zero. if(keylen == DELETE) return NULL; // *err is zero.
if(( *err = readfromsocket(sockd, &datalen, sizeof(datalen)) )) return NULL; if(( *err = readfromsocket(sockd, &datalen, sizeof(datalen)) )) return NULL;
buflen = datatuple::length_from_header(keylen, datalen); buflen = datatuple::length_from_header(keylen, datalen);
@ -169,14 +179,21 @@ static inline datatuple* readtuplefromsocket(int sockd, int * err) {
return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer.
} }
static inline int writeendofiteratortosocket(int sockd) {
return writetosocket(sockd, &DELETE, sizeof(DELETE));
}
static inline int writetupletosocket(int sockd, const datatuple* tup) { static inline int writetupletosocket(int sockd, const datatuple* tup) {
datatuple::len_t keylen, datalen; len_t keylen, datalen;
const byte* buf = tup->get_bytes(&keylen, &datalen);
int err; int err;
if(( err = writetosocket(sockd, &keylen, sizeof(keylen)) )) return err;
if(( err = writetosocket(sockd, &datalen, sizeof(datalen)) )) return err; if(tup == NULL) {
if(( err = writetosocket(sockd, buf, datatuple::length_from_header(keylen, datalen)) )) return err; if(( err = writeendofiteratortosocket(sockd) )) return err;
} else {
const byte* buf = tup->get_bytes(&keylen, &datalen);
if(( err = writetosocket(sockd, &keylen, sizeof(keylen)) )) return err;
if(( err = writetosocket(sockd, &datalen, sizeof(datalen)) )) return err;
if(( err = writetosocket(sockd, buf, datatuple::length_from_header(keylen, datalen)) )) return err;
}
return 0; return 0;
} }
@ -188,8 +205,5 @@ static inline uint64_t readcountfromsocket(int sockd, int *err) {
static inline int writecounttosocket(int sockd, uint64_t count) { static inline int writecounttosocket(int sockd, uint64_t count) {
return writetosocket(sockd, &count, sizeof(count)); return writetosocket(sockd, &count, sizeof(count));
} }
static inline int writeendofiteratortosocket(int sockd) {
return writetosocket(sockd, &datatuple::DELETE, sizeof(datatuple::DELETE));
}
#endif /* NETWORK_H_ */ #endif /* NETWORK_H_ */

View file

@ -62,7 +62,7 @@ static inline void close_conn(logstore_handle_t *l) {
} }
datatuple * datatuple *
logstore_client_op(logstore_handle_t *l, logstore_client_op(logstore_handle_t *l,
uint8_t opcode, datatuple * tuple) uint8_t opcode, datatuple * tuple, datatuple * tuple2, uint64_t count)
{ {
if(l->server_socket < 0) if(l->server_socket < 0)
@ -105,23 +105,34 @@ logstore_client_op(logstore_handle_t *l,
//send the opcode //send the opcode
if( writetosocket(l->server_socket, &opcode, sizeof(opcode)) ) { close_conn(l); return 0; } if( writetosocket(l->server_socket, &opcode, sizeof(opcode)) ) { close_conn(l); return 0; }
//send the tuple //send the first tuple
if( writetupletosocket(l->server_socket, tuple) ) { close_conn(l); return 0; } if( writetupletosocket(l->server_socket, tuple) ) { close_conn(l); return 0; }
//send the second tuple
if( writetupletosocket(l->server_socket, tuple2) ) { close_conn(l); return 0; }
if( count != (uint64_t)-1) {
if( writecounttosocket(l->server_socket, count) ) { close_conn(l); return 0; }
}
network_op_t rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE); network_op_t rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE);
if( opiserror(rcode) ) { close_conn(l); return 0; } if( opiserror(rcode) ) { close_conn(l); return 0; }
datatuple * ret; datatuple * ret = 0;
if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES)
{ int err; { int err;
uint64_t count = 0; // XXX uint64_t count = 0; // XXX
while(( ret = readtuplefromsocket(l->server_socket, &err) )) { datatuple *nxt;
while(( nxt = readtuplefromsocket(l->server_socket, &err) )) {
if(ret) datatuple::freetuple(ret); // XXX
ret = nxt;
if(err) { close_conn(l); return 0; } if(err) { close_conn(l); return 0; }
count++; count++;
} }
printf("return count: %lld\n", count); if(count > 1) { printf("return count: %lld\n", count); }
} else if(rcode == LOGSTORE_RESPONSE_SUCCESS) { } else if(rcode == LOGSTORE_RESPONSE_SUCCESS) {
ret = tuple; ret = tuple;
} else { } else {

View file

@ -16,7 +16,8 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time
datatuple * logstore_client_op(logstore_handle_t* l, datatuple * logstore_client_op(logstore_handle_t* l,
uint8_t opcode, uint8_t opcode,
datatuple *tuple); datatuple *tuple = NULL, datatuple *tuple2 = NULL,
uint64_t count = (uint64_t)-1);
int logstore_client_close(logstore_handle_t* l); int logstore_client_close(logstore_handle_t* l);

View file

@ -24,8 +24,8 @@ int main(int argc, char **argv)
// lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable, INVALID_COL); // lsmTableHandle<PAGELAYOUT>* h = TlsmTableStart<PAGELAYOUT>(lsmTable, INVALID_COL);
xid = Tbegin(); xid = Tbegin();
lladdIterator_t * it = logtreeIterator::open(xid,ltable.get_tree_c2()->get_root_rec() ); lladdIterator_t * it = diskTreeComponentIterator::open(xid,ltable.get_tree_c2()->get_root_rec() );
logtreeIterator::close(xid, it); diskTreeComponentIterator::close(xid, it);
Tcommit(xid); Tcommit(xid);
diskTreeComponent::deinit_stasis(); diskTreeComponent::deinit_stasis();

View file

@ -18,7 +18,7 @@
#include "check_util.h" #include "check_util.h"
template class treeIterator<datatuple>; template class diskTreeIterator<datatuple>;
void insertProbeIter(size_t NUM_ENTRIES) void insertProbeIter(size_t NUM_ENTRIES)
{ {
@ -115,7 +115,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES); printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES);
size_t tuplenum = 0; size_t tuplenum = 0;
treeIterator<datatuple> tree_itr(tree_root); diskTreeIterator<datatuple> tree_itr(tree_root);
datatuple *dt=0; datatuple *dt=0;

View file

@ -129,12 +129,12 @@ void insertProbeIter_str(int NUM_ENTRIES)
int64_t count = 0; int64_t count = 0;
lladdIterator_t * it = logtreeIterator::open(xid, tree); lladdIterator_t * it = diskTreeComponentIterator::open(xid, tree);
while(logtreeIterator::next(xid, it)) { while(diskTreeComponentIterator::next(xid, it)) {
byte * key; byte * key;
byte **key_ptr = &key; byte **key_ptr = &key;
size_t keysize = logtreeIterator::key(xid, it, (byte**)key_ptr); size_t keysize = diskTreeComponentIterator::key(xid, it, (byte**)key_ptr);
pageid_t *value; pageid_t *value;
pageid_t **value_ptr = &value; pageid_t **value_ptr = &value;
@ -147,7 +147,7 @@ void insertProbeIter_str(int NUM_ENTRIES)
} }
assert(count == NUM_ENTRIES); assert(count == NUM_ENTRIES);
logtreeIterator::close(xid, it); diskTreeComponentIterator::close(xid, it);
Tcommit(xid); Tcommit(xid);
diskTreeComponent::deinit_stasis(); diskTreeComponent::deinit_stasis();

View file

@ -110,12 +110,12 @@ void insertProbeIter(size_t NUM_ENTRIES)
for(size_t i = 0; i < NUM_ENTRIES; i++) for(size_t i = 0; i < NUM_ENTRIES; i++)
{ {
//prepare the key //prepare the key
datatuple::len_t keylen = (*key_arr)[i].length()+1; len_t keylen = (*key_arr)[i].length()+1;
//prepare the data //prepare the data
std::string ditem; std::string ditem;
getnextdata(ditem, 8192); getnextdata(ditem, 8192);
datatuple::len_t datalen = ditem.length()+1; len_t datalen = ditem.length()+1;
datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen, datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen,
ditem.c_str(), datalen); ditem.c_str(), datalen);
@ -155,7 +155,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
//fflush(stdout); //fflush(stdout);
//get the key //get the key
datatuple::len_t keylen = (*key_arr)[ri].length()+1; len_t keylen = (*key_arr)[ri].length()+1;
datatuple* searchtuple = datatuple::create((*key_arr)[ri].c_str(), keylen); datatuple* searchtuple = datatuple::create((*key_arr)[ri].c_str(), keylen);
@ -175,6 +175,10 @@ void insertProbeIter(size_t NUM_ENTRIES)
} }
printf("found %d\n", found_tuples); printf("found %d\n", found_tuples);
printf("Stage 3: Initiating scan TODO: look at results\n");
logstore_client_op(l, OP_SCAN, NULL, NULL, 0); // start = NULL stop = NULL limit = NONE
key_arr->clear(); key_arr->clear();
delete key_arr; delete key_arr;
@ -198,6 +202,9 @@ int main(int argc, char* argv[])
} }
//insertProbeIter(25000); //insertProbeIter(25000);
insertProbeIter(100000); insertProbeIter(100000);
//insertProbeIter(5000);
// insertProbeIter(100);
/* /*
insertProbeIter(5000); insertProbeIter(5000);
insertProbeIter(2500); insertProbeIter(2500);

View file

@ -36,8 +36,8 @@ datatuple* append_merger(datatuple *t1, datatuple *t2)
{ {
assert(!(t1->isDelete() || t2->isDelete())); assert(!(t1->isDelete() || t2->isDelete()));
datatuple::len_t keylen = t1->keylen(); len_t keylen = t1->keylen();
datatuple::len_t datalen = t1->datalen() + t2->datalen(); len_t datalen = t1->datalen() + t2->datalen();
byte * data = (byte*)malloc(datalen); byte * data = (byte*)malloc(datalen);
memcpy(data, t1->data(), t1->datalen()); memcpy(data, t1->data(), t1->datalen());
memcpy(data + t1->datalen(), t2->data(), t2->datalen()); memcpy(data + t1->datalen(), t2->data(), t2->datalen());