From 63a14d85094633db881942bcd459c92b3c382fcf Mon Sep 17 00:00:00 2001 From: sears Date: Sat, 20 Feb 2010 01:18:39 +0000 Subject: [PATCH] network iterator API is now working (though it could use a better test suite) git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@601 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- datatuple.h | 9 +- diskTreeComponent.cpp | 30 ++--- diskTreeComponent.h | 56 +++++++-- logiterators.cpp | 71 ++++++++--- logiterators.h | 114 +++++++++++++++--- logserver.cpp | 51 +++++--- logstore.cpp | 37 +++--- logstore.h | 253 ++++++++++++++++++++++++++++++++------- merger.cpp | 42 ++++--- merger.h | 11 +- network.h | 36 ++++-- tcpclient.cpp | 21 +++- tcpclient.h | 3 +- test/check_gen.cpp | 4 +- test/check_logtable.cpp | 4 +- test/check_logtree.cpp | 8 +- test/check_tcpclient.cpp | 13 +- tuplemerger.cpp | 4 +- 18 files changed, 567 insertions(+), 200 deletions(-) diff --git a/datatuple.h b/datatuple.h index d1449de..8817e77 100644 --- a/datatuple.h +++ b/datatuple.h @@ -1,3 +1,5 @@ +#include + #ifndef _DATATUPLE_H_ #define _DATATUPLE_H_ @@ -6,13 +8,12 @@ typedef unsigned char byte; #include #include + typedef struct datatuple { public: - typedef uint32_t len_t ; typedef unsigned char* key_t ; typedef unsigned char* data_t ; - static const len_t DELETE = ((len_t)0) - 1; private: len_t datalen_; byte* key_; @@ -62,6 +63,10 @@ public: return strcmp((char*)k1,(char*)k2); } + static int compare_obj(const datatuple * a, const datatuple* b) { + return compare(a->key(), b->key()); + } + inline void setDelete() { datalen_ = DELETE; } diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 7739fc5..e1da97e 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -61,7 +61,7 @@ void diskTreeComponent::init_stasis() { void diskTreeComponent::deinit_stasis() { Tdeinit(); } void diskTreeComponent::free_region_rid(int xid, recordid tree, - logtree_page_deallocator_t dealloc, void *allocator_state) + diskTreeComponent_page_deallocator_t dealloc, void *allocator_state) { // Tdealloc(xid,tree); dealloc(xid,allocator_state); @@ -218,6 +218,7 @@ recordid diskTreeComponent::create(int xid) return ret; } +// XXX remove the next N records, which are completely redundant. /** * TODO: what happen if there is already such a record with a different size? @@ -272,13 +273,8 @@ const byte* diskTreeComponent::readRecord(int xid, Page * p, slotid_t slot, int6 rid.page = p->id; rid.slot = slot; rid.size = size; - //byte *ret = (byte*)malloc(rid.size); - //stasis_record_read(xid,p,rid,ret); - //return ret; const byte *nr = stasis_record_read_begin(xid,p,rid); return nr; -// return readRecord(xid, p, rid); - } int32_t diskTreeComponent::readRecordLength(int xid, Page *p, slotid_t slot) @@ -482,7 +478,7 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p, int64_t depth, const byte *key, size_t key_len, pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, + diskTreeComponent_page_allocator_t allocator, void *allocator_state) { // assert(*stasis_page_type_ptr(p) == LOGTREE_ROOT_PAGE || @@ -553,7 +549,7 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p, recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p, int64_t depth, const byte *key, size_t key_len, pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, + diskTreeComponent_page_allocator_t allocator, void *allocator_state) { @@ -828,10 +824,10 @@ void diskTreeComponent::print_tree(int xid, pageid_t pid, int64_t depth) } ///////////////////////////////////////////////// -//logtreeIterator implementation +//diskTreeComponentIterator implementation ///////////////////////////////////////////////// -lladdIterator_t* logtreeIterator::open(int xid, recordid root) +lladdIterator_t* diskTreeComponentIterator::open(int xid, recordid root) { if(root.page == 0 && root.slot == 0 && root.size == -1) return 0; @@ -860,7 +856,7 @@ lladdIterator_t* logtreeIterator::open(int xid, recordid root) assert(depth == 0); - logtreeIterator_s *impl = (logtreeIterator_s*)malloc(sizeof(logtreeIterator_s)); + diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*)malloc(sizeof(diskTreeComponentIterator_t)); impl->p = p; { recordid rid = { p->id, 1, 0};//keySize }; //TODO: why does this start from 1? @@ -876,7 +872,7 @@ lladdIterator_t* logtreeIterator::open(int xid, recordid root) return it; } -lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key) +lladdIterator_t* diskTreeComponentIterator::openAt(int xid, recordid root, const byte* key) { if(root.page == NULLRID.page && root.slot == NULLRID.slot) return 0; @@ -906,7 +902,7 @@ lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key readlock(p->rwlatch,0); } - logtreeIterator_s *impl = (logtreeIterator_s*) malloc(sizeof(logtreeIterator_s)); + diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*) malloc(sizeof(diskTreeComponentIterator_t)); impl->p = p; impl->current.page = lsm_entry_rid.page; @@ -925,9 +921,9 @@ lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key /** * move to the next page **/ -int logtreeIterator::next(int xid, lladdIterator_t *it) +int diskTreeComponentIterator::next(int xid, lladdIterator_t *it) { - logtreeIterator_s *impl = (logtreeIterator_s*) it->impl; + diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*) it->impl; impl->current = stasis_record_next(xid, impl->p, impl->current); @@ -987,9 +983,9 @@ int logtreeIterator::next(int xid, lladdIterator_t *it) } -void logtreeIterator::close(int xid, lladdIterator_t *it) +void diskTreeComponentIterator::close(int xid, lladdIterator_t *it) { - logtreeIterator_s *impl = (logtreeIterator_s*)it->impl; + diskTreeComponentIterator_t *impl = (diskTreeComponentIterator_t*)it->impl; if(impl->p) { unlock(impl->p->rwlatch); diff --git a/diskTreeComponent.h b/diskTreeComponent.h index 7f783c2..8a939a1 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -32,10 +32,12 @@ typedef struct RegionAllocConf_t pageid_t regionSize; } RegionAllocConf_t; +struct indexnode_rec { + pageid_t ptr; +}; -typedef pageid_t(*logtree_page_allocator_t)(int, void *); -typedef void(*logtree_page_deallocator_t)(int, void *); - +typedef pageid_t(*diskTreeComponent_page_allocator_t)(int, void *); +typedef void(*diskTreeComponent_page_deallocator_t)(int, void *); class diskTreeComponent{ public: @@ -55,7 +57,7 @@ public: static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count); static void dealloc_region_rid(int xid, recordid rid); static void free_region_rid(int xid, recordid tree, - logtree_page_deallocator_t dealloc, + diskTreeComponent_page_deallocator_t dealloc, void *allocator_state); static void writeNodeRecord(int xid, Page *p, recordid &rid, @@ -92,20 +94,20 @@ public: //rmLeafID --> rightmost leaf id static recordid appendPage(int xid, recordid tree, pageid_t & rmLeafID, const byte *key,size_t keySize, - logtree_page_allocator_t allocator, void *allocator_state, + diskTreeComponent_page_allocator_t allocator, void *allocator_state, long val_page); static recordid appendInternalNode(int xid, Page *p, int64_t depth, const byte *key, size_t key_len, pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, + diskTreeComponent_page_allocator_t allocator, void *allocator_state); static recordid buildPathToLeaf(int xid, recordid root, Page *root_p, int64_t depth, const byte *key, size_t key_len, pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, + diskTreeComponent_page_allocator_t allocator, void *allocator_state); inline DataPage::RegionAllocator* get_alloc() { return region_alloc; } @@ -143,4 +145,44 @@ private: }; +typedef struct { + Page * p; + recordid current; + indexnode_rec *t; + int justOnePage; +} diskTreeComponentIterator_t; + + +class diskTreeComponentIterator +{ + +public: + static lladdIterator_t* open(int xid, recordid root); + static lladdIterator_t* openAt(int xid, recordid root, const byte* key); + static int next(int xid, lladdIterator_t *it); + static void close(int xid, lladdIterator_t *it); + + + static inline size_t key (int xid, lladdIterator_t *it, byte **key) + { + diskTreeComponentIterator_t * impl = (diskTreeComponentIterator_t*)it->impl; + *key = (byte*)(impl->t+1); + return impl->current.size - sizeof(indexnode_rec); + } + + + static inline size_t value(int xid, lladdIterator_t *it, byte **value) + { + diskTreeComponentIterator_t * impl = (diskTreeComponentIterator_t*)it->impl; + *value = (byte*)&(impl->t->ptr); + return sizeof(impl->t->ptr); + } + + static inline void tupleDone(int xid, void *it) { } + static inline void releaseLock(int xid, void *it) { } + +}; + + + #endif /* DISKTREECOMPONENT_H_ */ diff --git a/logiterators.cpp b/logiterators.cpp index a2cf8ba..5dab878 100644 --- a/logiterators.cpp +++ b/logiterators.cpp @@ -6,28 +6,66 @@ ///////////////////////////////////////////////////////////////////// template -treeIterator::treeIterator(recordid tree) : +void diskTreeIterator::init_iterators(TUPLE * key1, TUPLE * key2) { + assert(!key2); // unimplemented + if(tree_.size == INVALID_SIZE) { + lsmIterator_ = NULL; + } else { + if(key1) { + lsmIterator_ = diskTreeComponentIterator::openAt(-1, tree_, key1->key()); + } else { + lsmIterator_ = diskTreeComponentIterator::open(-1, tree_); + } + } + } + + +template +diskTreeIterator::diskTreeIterator(recordid tree) : tree_(tree), - lsmIterator_(logtreeIterator::open(-1,tree)), +// lsmIterator_(diskTreeComponentIterator::open(-1,tree)), curr_tuple(0) { + init_iterators(NULL,NULL); init_helper(); } template -treeIterator::treeIterator(recordid tree, TUPLE& key) : +diskTreeIterator::diskTreeIterator(recordid tree, TUPLE& key) : tree_(tree), - lsmIterator_(logtreeIterator::openAt(-1,tree,key.get_key())) + //lsmIterator_(diskTreeComponentIterator::openAt(-1,tree,key.key())) + curr_tuple(0) { + init_iterators(&key,NULL); init_helper(); +} +template +diskTreeIterator::diskTreeIterator(diskTreeComponent *tree) : + tree_(tree ? tree->get_root_rec() : NULLRID), + //lsmIterator_(diskTreeComponentIterator::open(-1,tree->get_root_rec())), + curr_tuple(0) +{ + init_iterators(NULL, NULL); + init_helper(); +} + +template +diskTreeIterator::diskTreeIterator(diskTreeComponent *tree, TUPLE& key) : + tree_(tree ? tree->get_root_rec() : NULLRID), +// lsmIterator_(diskTreeComponentIterator::openAt(-1,tree->get_root_rec(),key.key())) + curr_tuple(0) +{ + init_iterators(&key,NULL); + init_helper(); + } template -treeIterator::~treeIterator() +diskTreeIterator::~diskTreeIterator() { if(lsmIterator_) - logtreeIterator::close(-1, lsmIterator_); + diskTreeComponentIterator::close(-1, lsmIterator_); if(curr_tuple != NULL) free(curr_tuple); @@ -42,19 +80,19 @@ treeIterator::~treeIterator() } template -void treeIterator::init_helper() +void diskTreeIterator::init_helper() { if(!lsmIterator_) { - printf("treeIterator:\t__error__ init_helper():\tnull lsmIterator_"); + // printf("treeIterator:\t__error__ init_helper():\tnull lsmIterator_"); curr_page = 0; dp_itr = 0; } else { - if(logtreeIterator::next(-1, lsmIterator_) == 0) + if(diskTreeComponentIterator::next(-1, lsmIterator_) == 0) { - //printf("treeIterator:\t__error__ init_helper():\tlogtreeIteratr::next returned 0." ); + //printf("diskTreeIterator:\t__error__ init_helper():\tlogtreeIteratr::next returned 0." ); curr_page = 0; dp_itr = 0; } @@ -62,7 +100,7 @@ void treeIterator::init_helper() { pageid_t * pid_tmp; pageid_t ** hack = &pid_tmp; - logtreeIterator::value(-1,lsmIterator_,(byte**)hack); + diskTreeComponentIterator::value(-1,lsmIterator_,(byte**)hack); curr_pageid = *pid_tmp; curr_page = new DataPage(-1, curr_pageid); @@ -73,9 +111,9 @@ void treeIterator::init_helper() } template -TUPLE * treeIterator::getnext() +TUPLE * diskTreeIterator::getnext() { - assert(this->lsmIterator_); + if(!this->lsmIterator_) { return NULL; } if(dp_itr == 0) return 0; @@ -90,12 +128,12 @@ TUPLE * treeIterator::getnext() delete curr_page; curr_page = 0; - if(logtreeIterator::next(-1,lsmIterator_)) + if(diskTreeComponentIterator::next(-1,lsmIterator_)) { pageid_t *pid_tmp; pageid_t **hack = &pid_tmp; - logtreeIterator::value(-1,lsmIterator_,(byte**)hack); + diskTreeComponentIterator::value(-1,lsmIterator_,(byte**)hack); curr_pageid = *pid_tmp; curr_page = new DataPage(-1, curr_pageid); dp_itr = new DPITR_T(curr_page->begin()); @@ -110,3 +148,6 @@ TUPLE * treeIterator::getnext() curr_tuple = readTuple; return curr_tuple; } + +template class diskTreeIterator; +template class changingMemTreeIterator; diff --git a/logiterators.h b/logiterators.h index e269578..516d02c 100644 --- a/logiterators.h +++ b/logiterators.h @@ -10,6 +10,63 @@ template class DataPage; +template +class changingMemTreeIterator +{ +private: + typedef typename MEMTREE::const_iterator MTITER; + +public: + changingMemTreeIterator( MEMTREE *s, pthread_mutex_t * rb_mut ) : s_(s), mut_(rb_mut) { + pthread_mutex_lock(mut_); + if(s_->begin() == s_->end()) { + next_ret_ = NULL; + } else { + next_ret_ = (*s->begin())->create_copy(); // the create_copy() calls have to happen before we release mut_... + } + pthread_mutex_unlock(mut_); + } + changingMemTreeIterator( MEMTREE *s, pthread_mutex_t * rb_mut, TUPLE *&key ) { + pthread_mutex_lock(mut_); + if(s_->find(key) != s_->end()) { + next_ret_ = (*(s_->find(key)))->create_copy(); + } else if(s_->upper_bound(key) != s->end()) { + next_ret_ = (*(s_->upper_bound(key)))->create_copy(); + } else { + next_ret_ = NULL; + } + pthread_mutex_unlock(mut_); + } + + ~changingMemTreeIterator() { if(next_ret_) delete next_ret_; } + + TUPLE* getnext() { + pthread_mutex_lock(mut_); + TUPLE * ret = next_ret_; + if(next_ret_) { + if(s_->upper_bound(next_ret_) == s_->end()) { + next_ret_ = 0; + } else { + next_ret_ = (*s_->upper_bound(next_ret_))->create_copy(); + } + } + pthread_mutex_unlock(mut_); + return ret; + } + + +private: + explicit changingMemTreeIterator() { abort(); } + void operator=(changingMemTreeIterator & t) { abort(); } + int operator-(changingMemTreeIterator & t) { abort(); } +private: + MEMTREE *s_; + TUPLE * next_ret_; + pthread_mutex_t * mut_; +}; + + + ////////////////////////////////////////////////////////////// // memTreeIterator ///////////////////////////////////////////////////////////// @@ -21,52 +78,77 @@ private: typedef typename MEMTREE::const_iterator MTITER; public: - memTreeIterator( MEMTREE *s ) : first_(true), done_(false), it_(s->begin()), itend_(s->end()) { } - memTreeIterator( MEMTREE *s, TUPLE &key ) : first_(true), done_(false), it_(s->find(key)), itend_(s->end()) { } + memTreeIterator( MEMTREE *s ) + : first_(true), + done_(s == NULL) { + init_iterators(s, NULL, NULL); + } - ~memTreeIterator() { } + memTreeIterator( MEMTREE *s, TUPLE *&key ) + : first_(true), done_(s == NULL) { + init_iterators(s, key, NULL); + } + + ~memTreeIterator() { + delete it_; + delete itend_; + } TUPLE* getnext() { if(done_) { return NULL; } - if(first_) { first_ = 0;} else { it_++; } - if(it_==itend_) { done_= true; return NULL; } + if(first_) { first_ = 0;} else { (*it_)++; } + if(*it_==*itend_) { done_= true; return NULL; } - return (*it_)->create_copy(); + return (*(*it_))->create_copy(); } private: + void init_iterators(MEMTREE * s, TUPLE * key1, TUPLE * key2) { + if(s) { + it_ = key1 ? new MTITER(s->find(key1)) : new MTITER(s->begin()); + itend_ = key2 ? new MTITER(s->find(key2)) : new MTITER(s->end()); + } else { + it_ = NULL; + itend_ = NULL; + } + } explicit memTreeIterator() { abort(); } void operator=(memTreeIterator & t) { abort(); } int operator-(memTreeIterator & t) { abort(); } private: bool first_; bool done_; - MTITER it_; - MTITER itend_; + MTITER *it_; + MTITER *itend_; }; ///////////////////////////////////////////////////////////////// template -class treeIterator +class diskTreeIterator { public: - explicit treeIterator(recordid tree); + explicit diskTreeIterator(recordid tree); - explicit treeIterator(recordid tree,TUPLE &key); - - ~treeIterator(); + explicit diskTreeIterator(recordid tree,TUPLE &key); + + explicit diskTreeIterator(diskTreeComponent *tree); + + explicit diskTreeIterator(diskTreeComponent *tree,TUPLE &key); + + ~diskTreeIterator(); TUPLE * getnext(); private: + void init_iterators(TUPLE * key1, TUPLE * key2); inline void init_helper(); - explicit treeIterator() { abort(); } - void operator=(treeIterator & t) { abort(); } - int operator-(treeIterator & t) { abort(); } + explicit diskTreeIterator() { abort(); } + void operator=(diskTreeIterator & t) { abort(); } + int operator-(diskTreeIterator & t) { abort(); } private: recordid tree_; //root of the tree diff --git a/logserver.cpp b/logserver.cpp index 9cce47c..7294716 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -436,10 +436,13 @@ void * thread_work_fn( void * args) continue; } - int err; + int err = 0; - //step 2: read the tuple from client - datatuple * tuple = readtuplefromsocket(*(item->data->workitem), &err); + //step 2: read the first tuple from client + datatuple *tuple, *tuple2; + if(!err) { tuple = readtuplefromsocket(*(item->data->workitem), &err); } + // read the second tuple from client + if(!err) { tuple2 = readtuplefromsocket(*(item->data->workitem), &err); } //step 3: process the tuple if(opcode == OP_INSERT) @@ -485,7 +488,9 @@ void * thread_work_fn( void * args) //send the tuple err = writetupletosocket(*(item->data->workitem), dt); } - + if(!err) { + writeendofiteratortosocket(*(item->data->workitem)); + } //free datatuple if(dt_needs_free) { datatuple::freetuple(dt); @@ -493,27 +498,36 @@ void * thread_work_fn( void * args) } else if(opcode == OP_SCAN) { - datatuple * end_tuple; - size_t limit; - if(!err) { end_tuple = readtuplefromsocket(*(item->data->workitem), &err); } + size_t limit = -1; + size_t count = 0; if(!err) { limit = readcountfromsocket(*(item->data->workitem), &err); } - if(!err) { - treeIterator * itr; -// if(tuple) { -// itr = new treeIterator(item->data->ltable, *tuple); -// } else { -// itr = new treeIterator(item->data->ltable); -// } - abort(); + if(!err) { err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); } + + if(!err) { + logtableIterator * itr = new logtableIterator(item->data->ltable, tuple); + datatuple * t; + while(!err && (t = itr->getnext())) { + if(tuple2) { // are we at the end of range? + if(datatuple::compare_obj(t, tuple2) >= 0) { + datatuple::freetuple(t); + break; + } + } + err = writetupletosocket(*(item->data->workitem), t); + datatuple::freetuple(t); + count ++; + if(count == limit) { break; } // did we hit limit? + } delete itr; } + if(!err) { writeendofiteratortosocket(*(item->data->workitem)); } } else if(opcode == OP_DBG_BLOCKMAP) { // produce a list of stasis regions int xid = Tbegin(); - readlock(item->data->ltable->getMergeData()->header_lock, 0); + readlock(item->data->ltable->header_lock, 0); // produce a list of regions used by current tree components pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; @@ -538,7 +552,7 @@ void * thread_work_fn( void * args) tree_c1_mergeable_regions = diskTreeComponent::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); } pageid_t * tree_c2_regions = diskTreeComponent::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count); - unlock(item->data->ltable->getMergeData()->header_lock); + unlock(item->data->ltable->header_lock); Tcommit(xid); @@ -597,7 +611,8 @@ void * thread_work_fn( void * args) } //free the tuple - datatuple::freetuple(tuple); + if(tuple) datatuple::freetuple(tuple); + if(tuple2) datatuple::freetuple(tuple2); if(err) { perror("could not respond to client"); diff --git a/logstore.cpp b/logstore.cpp index 756effa..2cb58ca 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -26,7 +26,6 @@ static inline double tv_to_double(struct timeval tv) template class DataPage; - logtable::logtable() { @@ -41,9 +40,12 @@ logtable::logtable() //tmerger = new tuplemerger(&append_merger); tmerger = new tuplemerger(&replace_merger); + header_lock = initlock(); + tsize = 0; tree_bytes = 0; + epoch = 0; } @@ -80,6 +82,7 @@ logtable::~logtable() tearDownTree(tree_c0); } + deletelock(header_lock); delete tmerger; } @@ -121,7 +124,7 @@ void logtable::flushTable() start = tv_to_double(start_tv); - writelock(mergedata->header_lock,0); + writelock(header_lock,0); pthread_mutex_lock(mergedata->rbtree_mut); int expmcount = merge_count; @@ -133,7 +136,7 @@ void logtable::flushTable() while(get_tree_c0_mergeable()) { - unlock(mergedata->header_lock); + unlock(header_lock); // pthread_mutex_lock(mergedata->rbtree_mut); if(tree_bytes >= max_c0_size) pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut); @@ -146,12 +149,12 @@ void logtable::flushTable() pthread_mutex_unlock(mergedata->rbtree_mut); - writelock(mergedata->header_lock,0); + writelock(header_lock,0); pthread_mutex_lock(mergedata->rbtree_mut); if(expmcount != merge_count) { - unlock(mergedata->header_lock); + unlock(header_lock); pthread_mutex_unlock(mergedata->rbtree_mut); return; } @@ -177,7 +180,7 @@ void logtable::flushTable() tree_bytes = 0; pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(mergedata->header_lock); + unlock(header_lock); if(first) { printf("flush waited %f sec\n", stop-start); @@ -197,7 +200,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS //prepare a search tuple datatuple *search_tuple = datatuple::create(key, keySize); - readlock(mergedata->header_lock,0); + readlock(header_lock,0); pthread_mutex_lock(mergedata->rbtree_mut); datatuple *ret_tuple=0; @@ -328,7 +331,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS } //pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(mergedata->header_lock); + unlock(header_lock); datatuple::freetuple(search_tuple); return ret_tuple; @@ -411,12 +414,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS void logtable::insertTuple(datatuple *tuple) { - //static int count = LATCH_INTERVAL; - //static int tsize = 0; //number of tuples - //static int64_t tree_bytes = 0; //number of bytes - //lock the red-black tree - readlock(mergedata->header_lock,0); + readlock(header_lock,0); pthread_mutex_lock(mergedata->rbtree_mut); //find the previous tuple with same key in the memtree if exists rbtree_t::iterator rbitr = tree_c0->find(tuple); @@ -451,20 +450,16 @@ void logtable::insertTuple(datatuple *tuple) { DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(mergedata->header_lock); + unlock(header_lock); flushTable(); - readlock(mergedata->header_lock,0); + readlock(header_lock,0); pthread_mutex_lock(mergedata->rbtree_mut); - - //tsize = 0; - //tree_bytes = 0; - } //unlock pthread_mutex_unlock(mergedata->rbtree_mut); - unlock(mergedata->header_lock); + unlock(header_lock); DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); @@ -525,3 +520,5 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize, } return tup; } + +template class logtableIterator; diff --git a/logstore.h b/logstore.h index 483fd90..beec105 100644 --- a/logstore.h +++ b/logstore.h @@ -36,14 +36,12 @@ #include "tuplemerger.h" #include "datatuple.h" -struct logtable_mergedata; +#include "logiterators.h" typedef std::set rbtree_t; typedef rbtree_t* rbtree_ptr_t; -struct indexnode_rec { - pageid_t ptr; -}; +#include "merger.h" class logtable { @@ -71,23 +69,26 @@ public: inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls) + inline uint64_t get_epoch() { return epoch; } + + inline diskTreeComponent * get_tree_c2(){return tree_c2;} inline diskTreeComponent * get_tree_c1(){return tree_c1;} inline diskTreeComponent * get_tree_c1_mergeable(){return tree_c1_mergeable;} - inline void set_tree_c1(diskTreeComponent *t){tree_c1=t;} - inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t;} - inline void set_tree_c2(diskTreeComponent *t){tree_c2=t;} + inline void set_tree_c1(diskTreeComponent *t){tree_c1=t; epoch++; } + inline void set_tree_c1_mergeable(diskTreeComponent *t){tree_c1_mergeable=t; epoch++; } + inline void set_tree_c2(diskTreeComponent *t){tree_c2=t; epoch++; } inline rbtree_ptr_t get_tree_c0(){return tree_c0;} inline rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;} - void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;} - void set_tree_c0_mergeable(rbtree_ptr_t newtree){tree_c0_mergeable = newtree;} + void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree; epoch++; } + void set_tree_c0_mergeable(rbtree_ptr_t newtree){tree_c0_mergeable = newtree; epoch++; } int get_fixed_page_count(){return fixed_page_count;} void set_fixed_page_count(int count){fixed_page_count = count;} - void setMergeData(logtable_mergedata * mdata) { this->mergedata = mdata;} + void setMergeData(logtable_mergedata * mdata) { this->mergedata = mdata; epoch++; } logtable_mergedata* getMergeData(){return mergedata;} inline tuplemerger * gettuplemerger(){return tmerger;} @@ -106,6 +107,7 @@ public: const static RegionAllocConf_t DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER; logtable_mergedata * mergedata; + rwl * header_lock; int64_t max_c0_size; @@ -118,7 +120,7 @@ public: private: recordid table_rec; struct table_header tbl_header; - + uint64_t epoch; diskTreeComponent *tree_c2; //big tree diskTreeComponent *tree_c1; //small tree diskTreeComponent *tree_c1_mergeable; //small tree: ready to be merged with c2 @@ -137,44 +139,203 @@ private: bool still_running_; }; - -typedef struct logtreeIterator_s { - Page * p; - recordid current; - indexnode_rec *t; - int justOnePage; -} logtreeIterator_s; - - -class logtreeIterator -{ - +template +class mergeManyIterator { public: - static lladdIterator_t* open(int xid, recordid root); - static lladdIterator_t* openAt(int xid, recordid root, const byte* key); - static int next(int xid, lladdIterator_t *it); - static void close(int xid, lladdIterator_t *it); + explicit mergeManyIterator(ITRA* a, ITRN** iters, int num_iters, TUPLE*(*merge)(const TUPLE*,const TUPLE*), int (*cmp)(const TUPLE*,const TUPLE*)) : + num_iters_(num_iters+1), + first_iter_(a), + iters_((ITRN**)malloc(sizeof(*iters_) * num_iters)), // exactly the number passed in + current_((TUPLE**)malloc(sizeof(*current_) * (num_iters_))), // one more than was passed in + last_iter_(-1), + cmp_(cmp), + merge_(merge), + dups((int*)malloc(sizeof(*dups)*num_iters_)) + { + current_[0] = first_iter_->getnext(); + for(int i = 1; i < num_iters_; i++) { + iters_[i-1] = iters[i-1]; + current_[i] = iters_[i-1]->getnext(); + } + } + ~mergeManyIterator() { + delete(first_iter_); + for(int i = 0; i < num_iters_; i++) { + if(i != last_iter_) { + if(current_[i]) TUPLE::freetuple(current_[i]); + } + } + for(int i = 1; i < num_iters_; i++) { + delete iters_[i-1]; + } + free(current_); + free(iters_); + free(dups); + } + TUPLE * getnext() { + int num_dups = 0; + if(last_iter_ != -1) { + // get the value after the one we just returned to the user + //TUPLE::freetuple(current_[last_iter_]); // should never be null + if(last_iter_ == 0) { + current_[last_iter_] = first_iter_->getnext(); + } else { + current_[last_iter_] = iters_[last_iter_-1]->getnext(); + } + } + // find the first non-empty iterator. (Don't need to special-case ITRA since we're looking at current.) + int min = 0; + while(min < num_iters_ && !current_[min]) { + min++; + } + if(min == num_iters_) { return NULL; } + // examine current to decide which tuple to return. + for(int i = min+1; i < num_iters_; i++) { + if(current_[i]) { + int res = cmp_(current_[min], current_[i]); + if(res > 0) { // min > i + min = i; + num_dups = 0; + } else if(res == 0) { // min == i + dups[num_dups] = i; + num_dups++; + } + } + } + TUPLE * ret; + if(!merge_) { + ret = current_[min]; + } else { + // use merge function to build a new ret. + abort(); + } + // advance the iterators that match the tuple we're returning. + for(int i = 0; i < num_dups; i++) { + TUPLE::freetuple(current_[dups[i]]); // should never be null + current_[dups[i]] = iters_[dups[i]-1]->getnext(); + } + last_iter_ = min; // mark the min iter to be advance at the next invocation of next(). This saves us a copy in the non-merging case. + return ret; - - static inline size_t key (int xid, lladdIterator_t *it, byte **key) - { - logtreeIterator_s * impl = (logtreeIterator_s*)it->impl; - *key = (byte*)(impl->t+1); - return impl->current.size - sizeof(indexnode_rec); - } - - - static inline size_t value(int xid, lladdIterator_t *it, byte **value) - { - logtreeIterator_s * impl = (logtreeIterator_s*)it->impl; - *value = (byte*)&(impl->t->ptr); - return sizeof(impl->t->ptr); - } - - static inline void tupleDone(int xid, void *it) { } - static inline void releaseLock(int xid, void *it) { } + } +private: + int num_iters_; + ITRA * first_iter_; + ITRN ** iters_; + TUPLE ** current_; + int last_iter_; + + + int (*cmp_)(const TUPLE*,const TUPLE*); + TUPLE*(*merge_)(const TUPLE*,const TUPLE*); + + // temporary variables initiaized once for effiency + int * dups; +}; + +template +class logtableIterator { +public: + explicit logtableIterator(logtable* ltable) + : ltable(ltable), + epoch(ltable->get_epoch()), + merge_it_(NULL), + last_returned(NULL), + key(NULL) { + readlock(ltable->header_lock, 0); + validate(); + unlock(ltable->header_lock); + } + + explicit logtableIterator(logtable* ltable,TUPLE *key) + : ltable(ltable), + epoch(ltable->get_epoch()), + merge_it_(NULL), + last_returned(NULL), + key(key) { + readlock(ltable->header_lock, 0); + validate(); + unlock(ltable->header_lock); + } + + ~logtableIterator() { + invalidate(); + } + + TUPLE * getnext() { + readlock(ltable->header_lock, 0); + revalidate(); + last_returned = merge_it_->getnext(); + unlock(ltable->header_lock); + return last_returned; + + } + +private: + inline void init_helper(); + + explicit logtableIterator() { abort(); } + void operator=(logtableIterator & t) { abort(); } + int operator-(logtableIterator & t) { abort(); } + +private: + static const int C1 = 0; + static const int C1_MERGEABLE = 1; + static const int C2 = 2; + logtable * ltable; + uint64_t epoch; + typedef mergeManyIterator, memTreeIterator, TUPLE> inner_merge_it_t; +// typedef mergeManyIterator, diskTreeIterator, TUPLE> merge_it_t; + typedef mergeManyIterator, TUPLE> merge_it_t; + + merge_it_t* merge_it_; + + TUPLE * last_returned; + TUPLE * key; + + void revalidate() { + if(ltable->get_epoch() != epoch) { + TUPLE* delme = last_returned = last_returned->create_copy(); + invalidate(); + validate(); + TUPLE::freetuple(delme); + } + } + + + void invalidate() { + delete merge_it_; + } + void validate() { + changingMemTreeIterator * c0_it; + memTreeIterator * c0_mergeable_it[1]; + diskTreeIterator * disk_it[3]; + epoch = ltable->get_epoch(); + if(last_returned) { + c0_it = new changingMemTreeIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, last_returned); + c0_mergeable_it[0] = new memTreeIterator (ltable->get_tree_c0_mergeable(), last_returned); + disk_it[0] = new diskTreeIterator (ltable->get_tree_c1(), *last_returned); + disk_it[1] = new diskTreeIterator (ltable->get_tree_c1_mergeable(), *last_returned); + disk_it[2] = new diskTreeIterator (ltable->get_tree_c2(), *last_returned); + } else if(key) { + c0_it = new changingMemTreeIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, key); + c0_mergeable_it[0] = new memTreeIterator (ltable->get_tree_c0_mergeable(), key); + disk_it[0] = new diskTreeIterator (ltable->get_tree_c1(), *key); + disk_it[1] = new diskTreeIterator (ltable->get_tree_c1_mergeable(), *key); + disk_it[2] = new diskTreeIterator (ltable->get_tree_c2(), *key); + } else { + c0_it = new changingMemTreeIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut ); + c0_mergeable_it[0] = new memTreeIterator (ltable->get_tree_c0_mergeable() ); + disk_it[0] = new diskTreeIterator (ltable->get_tree_c1() ); + disk_it[1] = new diskTreeIterator (ltable->get_tree_c1_mergeable() ); + disk_it[2] = new diskTreeIterator (ltable->get_tree_c2() ); + } + + inner_merge_it_t * inner_merge_it = + new inner_merge_it_t(c0_it, c0_mergeable_it, 1, NULL, TUPLE::compare_obj); + merge_it_ = new merge_it_t(inner_merge_it, disk_it, 3, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges + } }; - #endif diff --git a/merger.cpp b/merger.cpp index f29cfe5..22af8a5 100644 --- a/merger.cpp +++ b/merger.cpp @@ -16,7 +16,6 @@ int merge_scheduler::addlogtable(logtable *ltable) struct logtable_mergedata * mdata = new logtable_mergedata; // initialize merge data - mdata->header_lock = initlock(); mdata->rbtree_mut = new pthread_mutex_t; pthread_mutex_init(mdata->rbtree_mut,0); ltable->set_tree_c0_mergeable(NULL); @@ -47,7 +46,6 @@ merge_scheduler::~merge_scheduler() logtable_mergedata *mdata = mergedata[i].second; //delete the mergedata fields - deletelock(mdata->header_lock); delete mdata->rbtree_mut; delete mdata->input_needed; delete mdata->input_ready_cond; @@ -218,7 +216,7 @@ void* memMergeThread(void*arg) while(true) { - writelock(ltable->mergedata->header_lock,0); + writelock(ltable->header_lock,0); int done = 0; // wait for c0_mergable while(!ltable->get_tree_c0_mergeable()) @@ -235,12 +233,12 @@ void* memMergeThread(void*arg) } printf("mmt:\twaiting for block ready cond\n"); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut); - writelock(ltable->mergedata->header_lock,0); + writelock(ltable->header_lock,0); printf("mmt:\tblock ready\n"); } @@ -251,7 +249,7 @@ void* memMergeThread(void*arg) pthread_mutex_lock(a->block_ready_mut); pthread_cond_signal(a->out_block_ready_cond); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. pthread_mutex_unlock(a->block_ready_mut); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); break; } @@ -261,7 +259,7 @@ void* memMergeThread(void*arg) // 4: Merge //create the iterators - treeIterator *itrA = new treeIterator(ltable->get_tree_c1()->get_root_rec()); // XXX don't want get_root_rec() to be here. + diskTreeIterator *itrA = new diskTreeIterator(ltable->get_tree_c1()->get_root_rec()); // XXX don't want get_root_rec() to be here. memTreeIterator *itrB = new memTreeIterator(ltable->get_tree_c0_mergeable()); @@ -270,7 +268,7 @@ void* memMergeThread(void*arg) diskTreeComponent * c1_prime = new diskTreeComponent(xid); // XXX should not hardcode region size) //pthread_mutex_unlock(a->block_ready_mut); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); //: do the merge printf("mmt:\tMerging:\n"); @@ -299,7 +297,7 @@ void* memMergeThread(void*arg) //now atomically replace the old c1 with new c1 //pthread_mutex_lock(a->block_ready_mut); - writelock(ltable->mergedata->header_lock,0); + writelock(ltable->header_lock,0); merge_count++; printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages); @@ -325,11 +323,11 @@ void* memMergeThread(void*arg) // XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure. while(ltable->get_tree_c1_mergeable()) { pthread_mutex_lock(a->block_ready_mut); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut); - writelock(ltable->mergedata->header_lock,0); + writelock(ltable->header_lock,0); } ltable->set_tree_c1_mergeable(c1_prime); @@ -346,7 +344,7 @@ void* memMergeThread(void*arg) Tcommit(xid); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); //TODO: get the freeing outside of the lock } @@ -372,7 +370,7 @@ void *diskMergeThread(void*arg) while(true) { - writelock(ltable->mergedata->header_lock,0); + writelock(ltable->header_lock,0); int done = 0; // get a new input for merge while(!ltable->get_tree_c1_mergeable()) @@ -388,28 +386,28 @@ void *diskMergeThread(void*arg) } printf("dmt:\twaiting for block ready cond\n"); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut); printf("dmt:\tblock ready\n"); - writelock(ltable->mergedata->header_lock,0); + writelock(ltable->header_lock,0); } *a->in_block_needed = false; if(done==1) { pthread_cond_signal(a->out_block_ready_cond); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); break; } int64_t mergedPages=0; //create the iterators - treeIterator *itrA = new treeIterator(ltable->get_tree_c2()->get_root_rec()); - treeIterator *itrB = - new treeIterator(ltable->get_tree_c1_mergeable()->get_root_rec()); + diskTreeIterator *itrA = new diskTreeIterator(ltable->get_tree_c2()->get_root_rec()); + diskTreeIterator *itrB = + new diskTreeIterator(ltable->get_tree_c1_mergeable()->get_root_rec()); xid = Tbegin(); @@ -417,7 +415,7 @@ void *diskMergeThread(void*arg) //TODO: maybe you want larger regions for the second tree? diskTreeComponent * c2_prime = new diskTreeComponent(xid); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); //do the merge @@ -446,7 +444,7 @@ void *diskMergeThread(void*arg) //writes complete //now atomically replace the old c2 with new c2 //pthread_mutex_lock(a->block_ready_mut); - writelock(ltable->mergedata->header_lock,0); + writelock(ltable->header_lock,0); merge_count++; //update the current optimal R value @@ -463,7 +461,7 @@ void *diskMergeThread(void*arg) Tcommit(xid); - unlock(ltable->mergedata->header_lock); + unlock(ltable->header_lock); } return 0; } diff --git a/merger.h b/merger.h index f0fc738..2683be3 100644 --- a/merger.h +++ b/merger.h @@ -4,12 +4,10 @@ #include #include -#include "logstore.h" -#include "logiterators.h" - //TODO: 400 bytes overhead per tuple, this is nuts, check if this is true... static const int RB_TREE_OVERHEAD = 400; static const double MIN_R = 3.0; +class logtable; struct merger_args { @@ -32,15 +30,12 @@ struct merger_args }; - struct logtable_mergedata { //merge threads pthread_t diskmerge_thread; pthread_t memmerge_thread; - rwl *header_lock; - pthread_mutex_t * rbtree_mut; bool *input_needed; // memmerge-input needed @@ -56,13 +51,15 @@ struct logtable_mergedata }; +#include "logstore.h" // XXX hacky include workaround. +#include "logiterators.h" + class merge_scheduler { std::vector > mergedata; public: - //static pageid_t C0_MEM_SIZE; ~merge_scheduler(); int addlogtable(logtable * ltable); diff --git a/network.h b/network.h index 1617b75..8bc9fa9 100644 --- a/network.h +++ b/network.h @@ -11,8 +11,18 @@ #include #include +#include +typedef unsigned char byte; +#include +#include + typedef uint8_t network_op_t; +typedef uint32_t len_t ; +static const len_t DELETE = ((len_t)0) - 1; + +#include + //server codes static const network_op_t LOGSTORE_FIRST_RESPONSE_CODE = 1; static const network_op_t LOGSTORE_RESPONSE_SUCCESS = 1; @@ -155,10 +165,10 @@ static inline int writeoptosocket(int sockd, network_op_t op) { */ static inline datatuple* readtuplefromsocket(int sockd, int * err) { - datatuple::len_t keylen, datalen, buflen; + len_t keylen, datalen, buflen; if(( *err = readfromsocket(sockd, &keylen, sizeof(keylen)) )) return NULL; - if(keylen == datatuple::DELETE) return NULL; // *err is zero. + if(keylen == DELETE) return NULL; // *err is zero. if(( *err = readfromsocket(sockd, &datalen, sizeof(datalen)) )) return NULL; buflen = datatuple::length_from_header(keylen, datalen); @@ -169,14 +179,21 @@ static inline datatuple* readtuplefromsocket(int sockd, int * err) { return datatuple::from_bytes(keylen, datalen, bytes); // from_bytes consumes the buffer. } +static inline int writeendofiteratortosocket(int sockd) { + return writetosocket(sockd, &DELETE, sizeof(DELETE)); +} static inline int writetupletosocket(int sockd, const datatuple* tup) { - datatuple::len_t keylen, datalen; - - const byte* buf = tup->get_bytes(&keylen, &datalen); + len_t keylen, datalen; int err; - if(( err = writetosocket(sockd, &keylen, sizeof(keylen)) )) return err; - if(( err = writetosocket(sockd, &datalen, sizeof(datalen)) )) return err; - if(( err = writetosocket(sockd, buf, datatuple::length_from_header(keylen, datalen)) )) return err; + + if(tup == NULL) { + if(( err = writeendofiteratortosocket(sockd) )) return err; + } else { + const byte* buf = tup->get_bytes(&keylen, &datalen); + if(( err = writetosocket(sockd, &keylen, sizeof(keylen)) )) return err; + if(( err = writetosocket(sockd, &datalen, sizeof(datalen)) )) return err; + if(( err = writetosocket(sockd, buf, datatuple::length_from_header(keylen, datalen)) )) return err; + } return 0; } @@ -188,8 +205,5 @@ static inline uint64_t readcountfromsocket(int sockd, int *err) { static inline int writecounttosocket(int sockd, uint64_t count) { return writetosocket(sockd, &count, sizeof(count)); } -static inline int writeendofiteratortosocket(int sockd) { - return writetosocket(sockd, &datatuple::DELETE, sizeof(datatuple::DELETE)); -} #endif /* NETWORK_H_ */ diff --git a/tcpclient.cpp b/tcpclient.cpp index c62c297..ceed3fc 100644 --- a/tcpclient.cpp +++ b/tcpclient.cpp @@ -62,7 +62,7 @@ static inline void close_conn(logstore_handle_t *l) { } datatuple * logstore_client_op(logstore_handle_t *l, - uint8_t opcode, datatuple * tuple) + uint8_t opcode, datatuple * tuple, datatuple * tuple2, uint64_t count) { if(l->server_socket < 0) @@ -105,23 +105,34 @@ logstore_client_op(logstore_handle_t *l, //send the opcode if( writetosocket(l->server_socket, &opcode, sizeof(opcode)) ) { close_conn(l); return 0; } - //send the tuple + //send the first tuple if( writetupletosocket(l->server_socket, tuple) ) { close_conn(l); return 0; } + //send the second tuple + if( writetupletosocket(l->server_socket, tuple2) ) { close_conn(l); return 0; } + + if( count != (uint64_t)-1) { + if( writecounttosocket(l->server_socket, count) ) { close_conn(l); return 0; } + } + + network_op_t rcode = readopfromsocket(l->server_socket,LOGSTORE_SERVER_RESPONSE); if( opiserror(rcode) ) { close_conn(l); return 0; } - datatuple * ret; + datatuple * ret = 0; if(rcode == LOGSTORE_RESPONSE_SENDING_TUPLES) { int err; uint64_t count = 0; // XXX - while(( ret = readtuplefromsocket(l->server_socket, &err) )) { + datatuple *nxt; + while(( nxt = readtuplefromsocket(l->server_socket, &err) )) { + if(ret) datatuple::freetuple(ret); // XXX + ret = nxt; if(err) { close_conn(l); return 0; } count++; } - printf("return count: %lld\n", count); + if(count > 1) { printf("return count: %lld\n", count); } } else if(rcode == LOGSTORE_RESPONSE_SUCCESS) { ret = tuple; } else { diff --git a/tcpclient.h b/tcpclient.h index 1458a49..827ebd5 100644 --- a/tcpclient.h +++ b/tcpclient.h @@ -16,7 +16,8 @@ logstore_handle_t * logstore_client_open(const char *host, int portnum, int time datatuple * logstore_client_op(logstore_handle_t* l, uint8_t opcode, - datatuple *tuple); + datatuple *tuple = NULL, datatuple *tuple2 = NULL, + uint64_t count = (uint64_t)-1); int logstore_client_close(logstore_handle_t* l); diff --git a/test/check_gen.cpp b/test/check_gen.cpp index f21bf1a..06c6d6b 100644 --- a/test/check_gen.cpp +++ b/test/check_gen.cpp @@ -24,8 +24,8 @@ int main(int argc, char **argv) // lsmTableHandle* h = TlsmTableStart(lsmTable, INVALID_COL); xid = Tbegin(); - lladdIterator_t * it = logtreeIterator::open(xid,ltable.get_tree_c2()->get_root_rec() ); - logtreeIterator::close(xid, it); + lladdIterator_t * it = diskTreeComponentIterator::open(xid,ltable.get_tree_c2()->get_root_rec() ); + diskTreeComponentIterator::close(xid, it); Tcommit(xid); diskTreeComponent::deinit_stasis(); diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index 2777965..e0d2118 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -18,7 +18,7 @@ #include "check_util.h" -template class treeIterator; +template class diskTreeIterator; void insertProbeIter(size_t NUM_ENTRIES) { @@ -115,7 +115,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES); size_t tuplenum = 0; - treeIterator tree_itr(tree_root); + diskTreeIterator tree_itr(tree_root); datatuple *dt=0; diff --git a/test/check_logtree.cpp b/test/check_logtree.cpp index 5b2a921..8f3f05a 100644 --- a/test/check_logtree.cpp +++ b/test/check_logtree.cpp @@ -129,12 +129,12 @@ void insertProbeIter_str(int NUM_ENTRIES) int64_t count = 0; - lladdIterator_t * it = logtreeIterator::open(xid, tree); + lladdIterator_t * it = diskTreeComponentIterator::open(xid, tree); - while(logtreeIterator::next(xid, it)) { + while(diskTreeComponentIterator::next(xid, it)) { byte * key; byte **key_ptr = &key; - size_t keysize = logtreeIterator::key(xid, it, (byte**)key_ptr); + size_t keysize = diskTreeComponentIterator::key(xid, it, (byte**)key_ptr); pageid_t *value; pageid_t **value_ptr = &value; @@ -147,7 +147,7 @@ void insertProbeIter_str(int NUM_ENTRIES) } assert(count == NUM_ENTRIES); - logtreeIterator::close(xid, it); + diskTreeComponentIterator::close(xid, it); Tcommit(xid); diskTreeComponent::deinit_stasis(); diff --git a/test/check_tcpclient.cpp b/test/check_tcpclient.cpp index e7b30b7..d9dd82d 100644 --- a/test/check_tcpclient.cpp +++ b/test/check_tcpclient.cpp @@ -110,12 +110,12 @@ void insertProbeIter(size_t NUM_ENTRIES) for(size_t i = 0; i < NUM_ENTRIES; i++) { //prepare the key - datatuple::len_t keylen = (*key_arr)[i].length()+1; + len_t keylen = (*key_arr)[i].length()+1; //prepare the data std::string ditem; getnextdata(ditem, 8192); - datatuple::len_t datalen = ditem.length()+1; + len_t datalen = ditem.length()+1; datatuple* newtuple = datatuple::create((*key_arr)[i].c_str(), keylen, ditem.c_str(), datalen); @@ -155,7 +155,7 @@ void insertProbeIter(size_t NUM_ENTRIES) //fflush(stdout); //get the key - datatuple::len_t keylen = (*key_arr)[ri].length()+1; + len_t keylen = (*key_arr)[ri].length()+1; datatuple* searchtuple = datatuple::create((*key_arr)[ri].c_str(), keylen); @@ -175,6 +175,10 @@ void insertProbeIter(size_t NUM_ENTRIES) } printf("found %d\n", found_tuples); + printf("Stage 3: Initiating scan TODO: look at results\n"); + + logstore_client_op(l, OP_SCAN, NULL, NULL, 0); // start = NULL stop = NULL limit = NONE + key_arr->clear(); delete key_arr; @@ -198,6 +202,9 @@ int main(int argc, char* argv[]) } //insertProbeIter(25000); insertProbeIter(100000); + //insertProbeIter(5000); +// insertProbeIter(100); + /* insertProbeIter(5000); insertProbeIter(2500); diff --git a/tuplemerger.cpp b/tuplemerger.cpp index a8a02ef..6f6de9f 100644 --- a/tuplemerger.cpp +++ b/tuplemerger.cpp @@ -36,8 +36,8 @@ datatuple* append_merger(datatuple *t1, datatuple *t2) { assert(!(t1->isDelete() || t2->isDelete())); - datatuple::len_t keylen = t1->keylen(); - datatuple::len_t datalen = t1->datalen() + t2->datalen(); + len_t keylen = t1->keylen(); + len_t datalen = t1->datalen() + t2->datalen(); byte * data = (byte*)malloc(datalen); memcpy(data, t1->data(), t1->datalen()); memcpy(data + t1->datalen(), t2->data(), t2->datalen());