From e1c937a60213a6872575b1c66647aaa2a6ec02e1 Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 17 Mar 2010 21:51:26 +0000 Subject: [PATCH] Refactoring of logtable. Cleaned a bunch of includes, and logtable is now a template. This is in preparation for the new merge policy git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@698 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- datapage.cpp | 2 + datapage.h | 3 +- diskTreeComponent.cpp | 48 +++- diskTreeComponent.h | 71 ++--- logserver.cpp | 8 +- logserver.h | 10 +- logstore.cpp | 63 +++-- logstore.h | 558 ++++++++++++++++++-------------------- memTreeComponent.cpp | 2 + memTreeComponent.h | 4 +- merger.cpp | 27 +- merger.h | 26 +- regionAllocator.h | 5 + server.cpp | 3 +- test/check_datapage.cpp | 2 + test/check_gen.cpp | 6 +- test/check_logtable.cpp | 5 +- test/check_logtree.cpp | 1 + test/check_merge.cpp | 3 +- test/check_mergelarge.cpp | 3 +- test/check_mergetuple.cpp | 3 +- 21 files changed, 433 insertions(+), 420 deletions(-) diff --git a/datapage.cpp b/datapage.cpp index da84fe5..c8e6e2e 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -1,5 +1,7 @@ #include "logstore.h" #include "datapage.h" +#include "regionAllocator.h" + #include static const int DATA_PAGE = USER_DEFINED_PAGE(1); diff --git a/datapage.h b/datapage.h index a2f2438..5d769c5 100644 --- a/datapage.h +++ b/datapage.h @@ -5,7 +5,8 @@ #include #include -#include "regionAllocator.h" + +struct RegionAllocator; //#define CHECK_FOR_SCRIBBLING diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 08ac58b..f36b715 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -12,6 +12,7 @@ #include "merger.h" #include "diskTreeComponent.h" +#include "regionAllocator.h" #include #include @@ -46,6 +47,27 @@ void diskTreeComponent::internalNodes::init_stasis() { } +recordid diskTreeComponent::get_root_rid() { return ltree->get_root_rec(); } +recordid diskTreeComponent::get_datapage_allocator_rid() { return 
ltree->get_datapage_alloc()->header_rid(); } +recordid diskTreeComponent::get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); } + + + +void diskTreeComponent::force(int xid) { + ltree->get_datapage_alloc()->force_regions(xid); + ltree->get_internal_node_alloc()->force_regions(xid); +} +void diskTreeComponent::dealloc(int xid) { + ltree->get_datapage_alloc()->dealloc_regions(xid); + ltree->get_internal_node_alloc()->dealloc_regions(xid); +} +void diskTreeComponent::list_regions(int xid, pageid_t *internal_node_region_length, pageid_t *internal_node_region_count, pageid_t **internal_node_regions, + pageid_t *datapage_region_length, pageid_t *datapage_region_count, pageid_t **datapage_regions) { + *internal_node_regions = ltree->get_internal_node_alloc()->list_regions(xid, internal_node_region_length, internal_node_region_count); + *datapage_regions = ltree->get_datapage_alloc() ->list_regions(xid, datapage_region_length, datapage_region_count); +} + + void diskTreeComponent::writes_done() { if(dp) { dp->writes_done(); @@ -335,6 +357,20 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid, return ret; } +diskTreeComponent::internalNodes::internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) +: lastLeaf(-1), + internal_node_alloc(new RegionAllocator(xid, internal_region_size)), + datapage_alloc(new RegionAllocator(xid, datapage_region_size)) +{ create(xid); } + +diskTreeComponent::internalNodes::internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state) +: lastLeaf(-1), + root_rec(root), + internal_node_alloc(new RegionAllocator(xid, internal_node_state)), + datapage_alloc(new RegionAllocator(xid, datapage_state)) +{ } + + /* adding pages: 1) Try to append value to lsmTreeState->lastLeaf @@ -842,7 +878,7 @@ void diskTreeComponent::internalNodes::iterator::close() { // tree iterator implementation 
///////////////////////////////////////////////////////////////////// -void diskTreeComponent::diskTreeIterator::init_iterators(datatuple * key1, datatuple * key2) { +void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * key2) { assert(!key2); // unimplemented if(tree_.size == INVALID_SIZE) { lsmIterator_ = NULL; @@ -855,14 +891,14 @@ void diskTreeComponent::diskTreeIterator::init_iterators(datatuple * key1, datat } } -diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::internalNodes *tree) : +diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) : tree_(tree ? tree->get_root_rec() : NULLRID) { init_iterators(NULL, NULL); init_helper(NULL); } -diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::internalNodes *tree, datatuple* key) : +diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) : tree_(tree ? tree->get_root_rec() : NULLRID) { init_iterators(key,NULL); @@ -870,7 +906,7 @@ diskTreeComponent::diskTreeIterator::diskTreeIterator(diskTreeComponent::interna } -diskTreeComponent::diskTreeIterator::~diskTreeIterator() +diskTreeComponent::iterator::~iterator() { if(lsmIterator_) { lsmIterator_->close(); @@ -886,7 +922,7 @@ diskTreeComponent::diskTreeIterator::~diskTreeIterator() } -void diskTreeComponent::diskTreeIterator::init_helper(datatuple* key1) +void diskTreeComponent::iterator::init_helper(datatuple* key1) { if(!lsmIterator_) { @@ -918,7 +954,7 @@ void diskTreeComponent::diskTreeIterator::init_helper(datatuple* key1) } } -datatuple * diskTreeComponent::diskTreeIterator::next_callerFrees() +datatuple * diskTreeComponent::iterator::next_callerFrees() { if(!this->lsmIterator_) { return NULL; } diff --git a/diskTreeComponent.h b/diskTreeComponent.h index f449e39..3f627c5 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -8,25 +8,13 @@ #ifndef DISKTREECOMPONENT_H_ #define DISKTREECOMPONENT_H_ -#include - -#include 
-#include -#include -#include -#include -#include - -#include "merger.h" -#include "regionAllocator.h" #include "datapage.h" -#include "tuplemerger.h" #include "datatuple.h" class diskTreeComponent { public: class internalNodes; - class diskTreeIterator; + class iterator; diskTreeComponent(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) : ltree(new diskTreeComponent::internalNodes(xid, internal_region_size, datapage_region_size, datapage_size)), @@ -43,35 +31,26 @@ class diskTreeComponent { delete ltree; } - recordid get_root_rid() { return ltree->get_root_rec(); } - recordid get_datapage_allocator_rid() { return ltree->get_datapage_alloc()->header_rid(); } - recordid get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); } + recordid get_root_rid(); + recordid get_datapage_allocator_rid(); + recordid get_internal_node_allocator_rid(); internalNodes * get_internal_nodes() { return ltree; } datatuple* findTuple(int xid, datatuple::key_t key, size_t keySize); int insertTuple(int xid, /*DataPage *dp,*/ datatuple *t, merge_stats_t *stats); void writes_done(); - diskTreeIterator * iterator() { - return new diskTreeIterator(ltree); + iterator * open_iterator() { + return new iterator(ltree); } - diskTreeIterator * iterator(datatuple * key) { - return new diskTreeIterator(ltree, key); + iterator * open_iterator(datatuple * key) { + return new iterator(ltree, key); } - void force(int xid) { - ltree->get_datapage_alloc()->force_regions(xid); - ltree->get_internal_node_alloc()->force_regions(xid); - } - void dealloc(int xid) { - ltree->get_datapage_alloc()->dealloc_regions(xid); - ltree->get_internal_node_alloc()->dealloc_regions(xid); - } + void force(int xid); + void dealloc(int xid); void list_regions(int xid, pageid_t *internal_node_region_length, pageid_t *internal_node_region_count, pageid_t **internal_node_regions, - pageid_t *datapage_region_length, pageid_t *datapage_region_count, 
pageid_t **datapage_regions) { - *internal_node_regions = ltree->get_internal_node_alloc()->list_regions(xid, internal_node_region_length, internal_node_region_count); - *datapage_regions = ltree->get_datapage_alloc() ->list_regions(xid, datapage_region_length, datapage_region_count); - } + pageid_t *datapage_region_length, pageid_t *datapage_region_count, pageid_t **datapage_regions); void print_tree(int xid) { ltree->print_tree(xid); @@ -93,18 +72,8 @@ class diskTreeComponent { static void init_stasis(); static void deinit_stasis(); - internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) - : lastLeaf(-1), - internal_node_alloc(new RegionAllocator(xid, internal_region_size)), - datapage_alloc(new RegionAllocator(xid, datapage_region_size)) - { create(xid); } - - internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state) - : lastLeaf(-1), - root_rec(root), - internal_node_alloc(new RegionAllocator(xid, internal_node_state)), - datapage_alloc(new RegionAllocator(xid, datapage_state)) - { } + internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size); + internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state); void print_tree(int xid); @@ -199,15 +168,15 @@ class diskTreeComponent { }; }; - class diskTreeIterator + class iterator { public: - explicit diskTreeIterator(diskTreeComponent::internalNodes *tree); + explicit iterator(diskTreeComponent::internalNodes *tree); - explicit diskTreeIterator(diskTreeComponent::internalNodes *tree,datatuple *key); + explicit iterator(diskTreeComponent::internalNodes *tree,datatuple *key); - ~diskTreeIterator(); + ~iterator(); datatuple * next_callerFrees(); @@ -215,9 +184,9 @@ class diskTreeComponent { void init_iterators(datatuple * key1, datatuple * key2); inline void init_helper(datatuple * key1); - explicit diskTreeIterator() { abort(); } - void 
operator=(diskTreeIterator & t) { abort(); } - int operator-(diskTreeIterator & t) { abort(); } + explicit iterator() { abort(); } + void operator=(iterator & t) { abort(); } + int operator-(iterator & t) { abort(); } private: recordid tree_; //root of the tree diff --git a/logserver.cpp b/logserver.cpp index 2ce50bd..863d7b0 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -1,3 +1,5 @@ +#include + #include "logserver.h" #include "datatuple.h" #include "merger.h" @@ -27,7 +29,7 @@ void *serverLoop(void *args); -void logserver::startserver(logtable *ltable) +void logserver::startserver(logtable *ltable) { sys_alive = true; this->ltable = ltable; @@ -479,7 +481,7 @@ int op_scan(pthread_data *data, datatuple * tuple, datatuple * tuple2, size_t li int err = writeoptosocket(*(data->workitem), LOGSTORE_RESPONSE_SENDING_TUPLES); if(!err) { - logtableIterator * itr = new logtableIterator(data->ltable, tuple); + logtable::iterator * itr = new logtable::iterator(data->ltable, tuple); datatuple * t; while(!err && (t = itr->getnext())) { if(tuple2) { // are we at the end of range? 
@@ -767,7 +769,7 @@ int op_dbg_blockmap(pthread_data* data) { } int op_dbg_drop_database(pthread_data * data) { - logtableIterator * itr = new logtableIterator(data->ltable); + logtable::iterator * itr = new logtable::iterator(data->ltable); datatuple * del; fprintf(stderr, "DROPPING DATABASE...\n"); long long n = 0; diff --git a/logserver.h b/logserver.h index ab26744..6125986 100644 --- a/logserver.h +++ b/logserver.h @@ -9,6 +9,8 @@ #include #include +#include "logstore.h" + #undef begin #undef try #undef end @@ -21,8 +23,6 @@ #include #endif -class logtable; - struct pthread_item; struct pthread_data { @@ -39,7 +39,7 @@ struct pthread_data { int *workitem; //id of the socket to work - logtable *ltable; + logtable *ltable; bool *sys_alive; #ifdef STATS_ENABLED @@ -97,7 +97,7 @@ public: delete qlock; } - void startserver(logtable *ltable); + void startserver(logtable *ltable); void stopserver(); @@ -126,7 +126,7 @@ private: int * self_pipe; // write a byte to self_pipe[1] to wake up select(). 
std::vector th_list; // list of threads - logtable *ltable; + logtable *ltable; #ifdef STATS_ENABLED int num_reqs; diff --git a/logstore.cpp b/logstore.cpp index a712de5..a73707f 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -1,17 +1,9 @@ -#include -#include -#include -#include - -#include "merger.h" #include "logstore.h" -#include "datapage.h" +#include "merger.h" #include -#include -#include -#include -#include +#undef try +#undef end static inline double tv_to_double(struct timeval tv) { @@ -23,7 +15,8 @@ static inline double tv_to_double(struct timeval tv) // LOG TABLE IMPLEMENTATION ///////////////////////////////////////////////////////////////// -logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) +template +logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) { tree_c0 = NULL; @@ -48,7 +41,8 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, this->datapage_size = datapage_size; } -logtable::~logtable() +template +logtable::~logtable() { if(tree_c1 != NULL) delete tree_c1; @@ -64,7 +58,8 @@ logtable::~logtable() delete tmerger; } -recordid logtable::allocTable(int xid) +template +recordid logtable::allocTable(int xid) { table_rec = Talloc(xid, sizeof(tbl_header)); @@ -79,13 +74,15 @@ recordid logtable::allocTable(int xid) return table_rec; } -void logtable::openTable(int xid, recordid rid) { +template +void logtable::openTable(int xid, recordid rid) { table_rec = rid; Tread(xid, table_rec, &tbl_header); tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state); tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state); } -void logtable::update_persistent_header(int xid) { +template +void logtable::update_persistent_header(int xid) { tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_dp_state = 
tree_c2->get_datapage_allocator_rid(); @@ -97,7 +94,18 @@ void logtable::update_persistent_header(int xid) { Tset(xid, table_rec, &tbl_header); } -void logtable::flushTable() +template +void logtable::setMergeData(logtable_mergedata * mdata){ + this->mergedata = mdata; + mdata->internal_region_size = internal_region_size; + mdata->datapage_region_size = datapage_region_size; + mdata->datapage_size = datapage_size; + + bump_epoch(); +} + +template +void logtable::flushTable() { struct timeval start_tv, stop_tv; double start, stop; @@ -173,7 +181,8 @@ void logtable::flushTable() } -datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize) +template +datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keySize) { //prepare a search tuple datatuple *search_tuple = datatuple::create(key, keySize); @@ -319,7 +328,8 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS * returns the first record found with the matching key * (not to be used together with diffs) **/ -datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keySize) +template +datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keySize) { //prepare a search tuple datatuple * search_tuple = datatuple::create(key, keySize); @@ -387,7 +397,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS } -void logtable::insertTuple(datatuple *tuple) +template +void logtable::insertTuple(datatuple *tuple) { //lock the red-black tree readlock(header_lock,0); @@ -440,10 +451,12 @@ void logtable::insertTuple(datatuple *tuple) DEBUG("tree size %d tuples %lld bytes.\n", tsize, tree_bytes); } -void logtable::registerIterator(logtableIterator * it) { +template +void logtable::registerIterator(iterator * it) { its.push_back(it); } -void logtable::forgetIterator(logtableIterator * it) { +template +void logtable::forgetIterator(iterator * it) { for(unsigned int i = 0; i < 
its.size(); i++) { if(its[i] == it) { its.erase(its.begin()+i); @@ -451,12 +464,12 @@ void logtable::forgetIterator(logtableIterator * it) { } } } -void logtable::bump_epoch() { +template +void logtable::bump_epoch() { assert(!trywritelock(header_lock,0)); epoch++; for(unsigned int i = 0; i < its.size(); i++) { its[i]->invalidate(); } } - -template class logtableIterator; +template class logtable; diff --git a/logstore.h b/logstore.h index 9f86a3d..674fa72 100644 --- a/logstore.h +++ b/logstore.h @@ -1,49 +1,41 @@ #ifndef _LOGSTORE_H_ #define _LOGSTORE_H_ +#include +#undef try #undef end -#undef begin -#include -//#include -#include -#include -#include #include -#include "logserver.h" -#include -#include -#include - -#include - - - -#include - -#include -#include -#include -#include -#include -#include +typedef struct merge_stats_t { + int merge_level; // 1 => C0->C1, 2 => C1->C2 + pageid_t merge_count; // This is the merge_count'th merge + struct timeval sleep; // When did we go to sleep waiting for input? + struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep) + struct timeval done; // When did we finish merging? + pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)? + pageid_t num_tuples_out; // How many tuples did we write? + pageid_t num_datapages_out; // How many datapages? + pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? + pageid_t num_tuples_in_small; // Tuples from the small input? + pageid_t bytes_in_large; // Bytes from the large input? + pageid_t num_tuples_in_large; // Tuples from large input? 
+} merge_stats_t; #include "diskTreeComponent.h" #include "memTreeComponent.h" -#include "datapage.h" -#include "tuplemerger.h" -#include "datatuple.h" -#include "merger.h" +#include "tuplemerger.h" + + +struct logtable_mergedata; template -class logtableIterator ; - -class logtable -{ +class logtable { public: + + class iterator; logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 40); // scans 160KB / 2 per lookup on average. at 100MB/s, this is 0.7 ms. XXX pick datapage_size in principled way. ~logtable(); @@ -58,16 +50,13 @@ public: recordid allocTable(int xid); void openTable(int xid, recordid rid); void flushTable(); - - // DataPage* insertTuple(int xid, datatuple *tuple,diskTreeComponent::internalNodes *ltree); - // datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, diskTreeComponent::internalNodes *ltree); inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. 
(remove the calls) inline uint64_t get_epoch() { return epoch; } - void registerIterator(logtableIterator * it); - void forgetIterator(logtableIterator * it); + void registerIterator(iterator * it); + void forgetIterator(iterator * it); void bump_epoch() ; inline diskTreeComponent * get_tree_c2(){return tree_c2;} @@ -85,14 +74,7 @@ public: void update_persistent_header(int xid); - void setMergeData(logtable_mergedata * mdata) { - this->mergedata = mdata; - - mdata->internal_region_size = internal_region_size; - mdata->datapage_region_size = datapage_region_size; - mdata->datapage_size = datapage_size; - - bump_epoch(); } + void setMergeData(logtable_mergedata * mdata); logtable_mergedata* getMergeData(){return mergedata;} inline tuplemerger * gettuplemerger(){return tmerger;} @@ -139,256 +121,256 @@ private: tuplemerger *tmerger; - std::vector *> its; + std::vector its; bool still_running_; -}; - -template -class mergeManyIterator { public: - explicit mergeManyIterator(ITRA* a, ITRN** iters, int num_iters, TUPLE*(*merge)(const TUPLE*,const TUPLE*), int (*cmp)(const TUPLE*,const TUPLE*)) : - num_iters_(num_iters+1), - first_iter_(a), - iters_((ITRN**)malloc(sizeof(*iters_) * num_iters)), // exactly the number passed in - current_((TUPLE**)malloc(sizeof(*current_) * (num_iters_))), // one more than was passed in - last_iter_(-1), - cmp_(cmp), - merge_(merge), - dups((int*)malloc(sizeof(*dups)*num_iters_)) - { - current_[0] = first_iter_->getnext(); - for(int i = 1; i < num_iters_; i++) { - iters_[i-1] = iters[i-1]; - current_[i] = iters_[i-1]->next_callerFrees(); - } - } - ~mergeManyIterator() { - delete(first_iter_); - for(int i = 0; i < num_iters_; i++) { - if(i != last_iter_) { - if(current_[i]) TUPLE::freetuple(current_[i]); - } - } - for(int i = 1; i < num_iters_; i++) { - delete iters_[i-1]; - } - free(current_); - free(iters_); - free(dups); - } - TUPLE * peek() { - TUPLE * ret = getnext(); - last_iter_ = -1; // don't advance iterator on next peek() or 
getnext() call. - return ret; - } - TUPLE * getnext() { - int num_dups = 0; - if(last_iter_ != -1) { - // get the value after the one we just returned to the user - //TUPLE::freetuple(current_[last_iter_]); // should never be null - if(last_iter_ == 0) { - current_[last_iter_] = first_iter_->getnext(); - } else if(last_iter_ != -1){ - current_[last_iter_] = iters_[last_iter_-1]->next_callerFrees(); - } else { - // last call was 'peek' - } - } - // find the first non-empty iterator. (Don't need to special-case ITRA since we're looking at current.) - int min = 0; - while(min < num_iters_ && !current_[min]) { - min++; - } - if(min == num_iters_) { return NULL; } - // examine current to decide which tuple to return. - for(int i = min+1; i < num_iters_; i++) { - if(current_[i]) { - int res = cmp_(current_[min], current_[i]); - if(res > 0) { // min > i - min = i; - num_dups = 0; - } else if(res == 0) { // min == i - dups[num_dups] = i; - num_dups++; - } - } - } - TUPLE * ret; - if(!merge_) { - ret = current_[min]; - } else { - // XXX use merge function to build a new ret. - abort(); - } - // advance the iterators that match the tuple we're returning. - for(int i = 0; i < num_dups; i++) { - TUPLE::freetuple(current_[dups[i]]); // should never be null - current_[dups[i]] = iters_[dups[i]-1]->next_callerFrees(); - } - last_iter_ = min; // mark the min iter to be advance at the next invocation of next(). This saves us a copy in the non-merging case. 
- return ret; - } -private: - int num_iters_; - ITRA * first_iter_; - ITRN ** iters_; - TUPLE ** current_; - int last_iter_; - - - int (*cmp_)(const TUPLE*,const TUPLE*); - TUPLE*(*merge_)(const TUPLE*,const TUPLE*); - - // temporary variables initiaized once for effiency - int * dups; -}; - -template -class logtableIterator { -public: - explicit logtableIterator(logtable* ltable) - : ltable(ltable), - epoch(ltable->get_epoch()), - merge_it_(NULL), - last_returned(NULL), - key(NULL), - valid(false) { - writelock(ltable->header_lock, 0); - ltable->registerIterator(this); - validate(); - unlock(ltable->header_lock); - } - - explicit logtableIterator(logtable* ltable,TUPLE *key) - : ltable(ltable), - epoch(ltable->get_epoch()), - merge_it_(NULL), - last_returned(NULL), - key(key), - valid(false) - { - writelock(ltable->header_lock, 0); - ltable->registerIterator(this); - validate(); - unlock(ltable->header_lock); - } - - ~logtableIterator() { - ltable->forgetIterator(this); - invalidate(); - if(last_returned) TUPLE::freetuple(last_returned); - } -private: - TUPLE * getnextHelper() { - TUPLE * tmp = merge_it_->getnext(); - if(last_returned && tmp) { - assert(TUPLE::compare(last_returned->key(), last_returned->keylen(), tmp->key(), tmp->keylen()) < 0); - TUPLE::freetuple(last_returned); - } - last_returned = tmp; - return last_returned; - } -public: - TUPLE * getnextIncludingTombstones() { - readlock(ltable->header_lock, 0); - revalidate(); - TUPLE * ret = getnextHelper(); - unlock(ltable->header_lock); - return ret ? ret->create_copy() : NULL; - } - - TUPLE * getnext() { - readlock(ltable->header_lock, 0); - revalidate(); - TUPLE * ret; - while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory. - unlock(ltable->header_lock); - return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory. 
- } - - void invalidate() { - if(valid) { - delete merge_it_; - merge_it_ = NULL; - valid = false; - } - } - -private: - inline void init_helper(); - - explicit logtableIterator() { abort(); } - void operator=(logtableIterator & t) { abort(); } - int operator-(logtableIterator & t) { abort(); } - -private: - static const int C1 = 0; - static const int C1_MERGEABLE = 1; - static const int C2 = 2; - logtable * ltable; - uint64_t epoch; - typedef mergeManyIterator< - typename memTreeComponent::revalidatingIterator, - typename memTreeComponent::iterator, - TUPLE> inner_merge_it_t; - typedef mergeManyIterator< - inner_merge_it_t, - diskTreeComponent::diskTreeIterator, - TUPLE> merge_it_t; - - merge_it_t* merge_it_; - - TUPLE * last_returned; - TUPLE * key; - bool valid; - void revalidate() { - if(!valid) { - validate(); - } else { - assert(epoch == ltable->get_epoch()); - } - } - - - void validate() { - typename memTreeComponent::revalidatingIterator * c0_it; - typename memTreeComponent::iterator *c0_mergeable_it[1]; - diskTreeComponent::diskTreeIterator * disk_it[3]; - epoch = ltable->get_epoch(); - if(last_returned) { - c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, last_returned); - c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable(), last_returned); - disk_it[0] = ltable->get_tree_c1()->iterator(last_returned); - disk_it[1] = ltable->get_tree_c1_mergeable()->iterator(last_returned); - disk_it[2] = ltable->get_tree_c2()->iterator(last_returned); - } else if(key) { - c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, key); - c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable(), key); - disk_it[0] = ltable->get_tree_c1()->iterator(key); - disk_it[1] = ltable->get_tree_c1_mergeable()->iterator(key); - disk_it[2] = ltable->get_tree_c2()->iterator(key); 
- } else { - c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut ); - c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable() ); - disk_it[0] = ltable->get_tree_c1()->iterator(); - disk_it[1] = ltable->get_tree_c1_mergeable()->iterator(); - disk_it[2] = ltable->get_tree_c2()->iterator(); - } - - inner_merge_it_t * inner_merge_it = - new inner_merge_it_t(c0_it, c0_mergeable_it, 1, NULL, TUPLE::compare_obj); - merge_it_ = new merge_it_t(inner_merge_it, disk_it, 3, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges - if(last_returned) { - TUPLE * junk = merge_it_->peek(); - if(junk && !TUPLE::compare(junk->key(), junk->keylen(), last_returned->key(), last_returned->keylen())) { - // we already returned junk - TUPLE::freetuple(merge_it_->getnext()); + template + class mergeManyIterator { + public: + explicit mergeManyIterator(ITRA* a, ITRN** iters, int num_iters, TUPLE*(*merge)(const TUPLE*,const TUPLE*), int (*cmp)(const TUPLE*,const TUPLE*)) : + num_iters_(num_iters+1), + first_iter_(a), + iters_((ITRN**)malloc(sizeof(*iters_) * num_iters)), // exactly the number passed in + current_((TUPLE**)malloc(sizeof(*current_) * (num_iters_))), // one more than was passed in + last_iter_(-1), + cmp_(cmp), + merge_(merge), + dups((int*)malloc(sizeof(*dups)*num_iters_)) + { + current_[0] = first_iter_->getnext(); + for(int i = 1; i < num_iters_; i++) { + iters_[i-1] = iters[i-1]; + current_[i] = iters_[i-1]->next_callerFrees(); } } - valid = true; - } + ~mergeManyIterator() { + delete(first_iter_); + for(int i = 0; i < num_iters_; i++) { + if(i != last_iter_) { + if(current_[i]) TUPLE::freetuple(current_[i]); + } + } + for(int i = 1; i < num_iters_; i++) { + delete iters_[i-1]; + } + free(current_); + free(iters_); + free(dups); + } + TUPLE * peek() { + TUPLE * ret = getnext(); + last_iter_ = -1; // don't advance iterator on next peek() or 
getnext() call. + return ret; + } + TUPLE * getnext() { + int num_dups = 0; + if(last_iter_ != -1) { + // get the value after the one we just returned to the user + //TUPLE::freetuple(current_[last_iter_]); // should never be null + if(last_iter_ == 0) { + current_[last_iter_] = first_iter_->getnext(); + } else if(last_iter_ != -1){ + current_[last_iter_] = iters_[last_iter_-1]->next_callerFrees(); + } else { + // last call was 'peek' + } + } + // find the first non-empty iterator. (Don't need to special-case ITRA since we're looking at current.) + int min = 0; + while(min < num_iters_ && !current_[min]) { + min++; + } + if(min == num_iters_) { return NULL; } + // examine current to decide which tuple to return. + for(int i = min+1; i < num_iters_; i++) { + if(current_[i]) { + int res = cmp_(current_[min], current_[i]); + if(res > 0) { // min > i + min = i; + num_dups = 0; + } else if(res == 0) { // min == i + dups[num_dups] = i; + num_dups++; + } + } + } + TUPLE * ret; + if(!merge_) { + ret = current_[min]; + } else { + // XXX use merge function to build a new ret. + abort(); + } + // advance the iterators that match the tuple we're returning. + for(int i = 0; i < num_dups; i++) { + TUPLE::freetuple(current_[dups[i]]); // should never be null + current_[dups[i]] = iters_[dups[i]-1]->next_callerFrees(); + } + last_iter_ = min; // mark the min iter to be advance at the next invocation of next(). This saves us a copy in the non-merging case. 
+ return ret; + + } + private: + int num_iters_; + ITRA * first_iter_; + ITRN ** iters_; + TUPLE ** current_; + int last_iter_; + + + int (*cmp_)(const TUPLE*,const TUPLE*); + TUPLE*(*merge_)(const TUPLE*,const TUPLE*); + + // temporary variables initialized once for efficiency + int * dups; + + }; + + + class iterator { + public: + explicit iterator(logtable* ltable) + : ltable(ltable), + epoch(ltable->get_epoch()), + merge_it_(NULL), + last_returned(NULL), + key(NULL), + valid(false) { + writelock(ltable->header_lock, 0); + ltable->registerIterator(this); + validate(); + unlock(ltable->header_lock); + } + + explicit iterator(logtable* ltable,TUPLE *key) + : ltable(ltable), + epoch(ltable->get_epoch()), + merge_it_(NULL), + last_returned(NULL), + key(key), + valid(false) + { + writelock(ltable->header_lock, 0); + ltable->registerIterator(this); + validate(); + unlock(ltable->header_lock); + } + + ~iterator() { + ltable->forgetIterator(this); + invalidate(); + if(last_returned) TUPLE::freetuple(last_returned); + } + private: + TUPLE * getnextHelper() { + TUPLE * tmp = merge_it_->getnext(); + if(last_returned && tmp) { + assert(TUPLE::compare(last_returned->key(), last_returned->keylen(), tmp->key(), tmp->keylen()) < 0); + TUPLE::freetuple(last_returned); + } + last_returned = tmp; + return last_returned; + } + public: + TUPLE * getnextIncludingTombstones() { + readlock(ltable->header_lock, 0); + revalidate(); + TUPLE * ret = getnextHelper(); + unlock(ltable->header_lock); + return ret ? ret->create_copy() : NULL; + } + + TUPLE * getnext() { + readlock(ltable->header_lock, 0); + revalidate(); + TUPLE * ret; + while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory. + unlock(ltable->header_lock); + return ret ? ret->create_copy() : NULL; // XXX hate making copy! Caller should not manage our memory. 
+ } + + void invalidate() { + if(valid) { + delete merge_it_; + merge_it_ = NULL; + valid = false; + } + } + + private: + inline void init_helper(); + + explicit iterator() { abort(); } + void operator=(iterator & t) { abort(); } + int operator-(iterator & t) { abort(); } + + private: + static const int C1 = 0; + static const int C1_MERGEABLE = 1; + static const int C2 = 2; + logtable * ltable; + uint64_t epoch; + typedef mergeManyIterator< + typename memTreeComponent::revalidatingIterator, + typename memTreeComponent::iterator> inner_merge_it_t; + typedef mergeManyIterator< + inner_merge_it_t, + diskTreeComponent::iterator> merge_it_t; + + merge_it_t* merge_it_; + + TUPLE * last_returned; + TUPLE * key; + bool valid; + void revalidate() { + if(!valid) { + validate(); + } else { + assert(epoch == ltable->get_epoch()); + } + } + + + void validate() { + typename memTreeComponent::revalidatingIterator * c0_it; + typename memTreeComponent::iterator *c0_mergeable_it[1]; + diskTreeComponent::iterator * disk_it[3]; + epoch = ltable->get_epoch(); + if(last_returned) { + c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, last_returned); + c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable(), last_returned); + disk_it[0] = ltable->get_tree_c1()->open_iterator(last_returned); + disk_it[1] = ltable->get_tree_c1_mergeable()->open_iterator(last_returned); + disk_it[2] = ltable->get_tree_c2()->open_iterator(last_returned); + } else if(key) { + c0_it = new typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, key); + c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable(), key); + disk_it[0] = ltable->get_tree_c1()->open_iterator(key); + disk_it[1] = ltable->get_tree_c1_mergeable()->open_iterator(key); + disk_it[2] = ltable->get_tree_c2()->open_iterator(key); + } else { + c0_it = new 
typename memTreeComponent::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut ); + c0_mergeable_it[0] = new typename memTreeComponent::iterator (ltable->get_tree_c0_mergeable() ); + disk_it[0] = ltable->get_tree_c1()->open_iterator(); + disk_it[1] = ltable->get_tree_c1_mergeable()->open_iterator(); + disk_it[2] = ltable->get_tree_c2()->open_iterator(); + } + + inner_merge_it_t * inner_merge_it = + new inner_merge_it_t(c0_it, c0_mergeable_it, 1, NULL, TUPLE::compare_obj); + merge_it_ = new merge_it_t(inner_merge_it, disk_it, 3, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges + if(last_returned) { + TUPLE * junk = merge_it_->peek(); + if(junk && !TUPLE::compare(junk->key(), junk->keylen(), last_returned->key(), last_returned->keylen())) { + // we already returned junk + TUPLE::freetuple(merge_it_->getnext()); + } + } + valid = true; + } + }; }; diff --git a/memTreeComponent.cpp b/memTreeComponent.cpp index 3acd6d5..a9a9d2f 100644 --- a/memTreeComponent.cpp +++ b/memTreeComponent.cpp @@ -1,4 +1,6 @@ #include "memTreeComponent.h" +#include "datatuple.h" + template void memTreeComponent::tearDownTree(rbtree_ptr_t tree) { TUPLE * t = 0; diff --git a/memTreeComponent.h b/memTreeComponent.h index 584a65b..f7b3d45 100644 --- a/memTreeComponent.h +++ b/memTreeComponent.h @@ -1,7 +1,7 @@ #ifndef _MEMTREECOMPONENT_H_ #define _MEMTREECOMPONENT_H_ #include -#include "datatuple.h" + template class memTreeComponent { public: @@ -115,7 +115,7 @@ public: } ~revalidatingIterator() { - if(next_ret_) datatuple::freetuple(next_ret_); + if(next_ret_) TUPLE::freetuple(next_ret_); } TUPLE* getnext() { diff --git a/merger.cpp b/merger.cpp index 3d63f8c..fa0f770 100644 --- a/merger.cpp +++ b/merger.cpp @@ -2,6 +2,11 @@ #include #include "merger.h" + +#include +#undef try +#undef end + void merge_stats_pp(FILE* fd, merge_stats_t &stats) { long long sleep_time = stats.start.tv_sec - stats.sleep.tv_sec; long long work_time = 
stats.done.tv_sec - stats.start.tv_sec; @@ -44,7 +49,7 @@ void merge_stats_pp(FILE* fd, merge_stats_t &stats) { double merge_stats_nsec_to_merge_in_bytes(merge_stats_t); // how many nsec did we burn on each byte from the small tree (want this to be equal for the two mergers) -int merge_scheduler::addlogtable(logtable *ltable) +int merge_scheduler::addlogtable(logtable *ltable) { struct logtable_mergedata * mdata = new logtable_mergedata; @@ -76,7 +81,7 @@ merge_scheduler::~merge_scheduler() { for(size_t i=0; i *ltable = mergedata[i].first; logtable_mergedata *mdata = mergedata[i].second; //delete the mergedata fields @@ -112,7 +117,7 @@ void merge_scheduler::shutdown() //signal shutdown for(size_t i=0; i *ltable = mergedata[i].first; logtable_mergedata *mdata = mergedata[i].second; //flush the in memory table to write any tuples still in memory @@ -142,7 +147,7 @@ void merge_scheduler::shutdown() void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) { - logtable * ltable = mergedata[index].first; + logtable * ltable = mergedata[index].first; struct logtable_mergedata *mdata = mergedata[index].second; pthread_cond_t * block1_needed_cond = new pthread_cond_t; @@ -228,7 +233,7 @@ template void merge_iterators(int xid, ITA *itrA, ITB *itrB, - logtable *ltable, + logtable *ltable, diskTreeComponent *scratch_tree, merge_stats_t *stats, bool dropDeletes); @@ -262,7 +267,7 @@ void* memMergeThread(void*arg) merger_args * a = (merger_args*)(arg); - logtable * ltable = a->ltable; + logtable * ltable = a->ltable; assert(ltable->get_tree_c1()); int merge_count =0; @@ -318,7 +323,7 @@ void* memMergeThread(void*arg) // 4: Merge //create the iterators - diskTreeComponent::diskTreeIterator *itrA = ltable->get_tree_c1()->iterator(); + diskTreeComponent::iterator *itrA = ltable->get_tree_c1()->open_iterator(); memTreeComponent::iterator *itrB = new memTreeComponent::iterator(ltable->get_tree_c0_mergeable()); @@ -420,7 +425,7 @@ void *diskMergeThread(void*arg) 
merger_args * a = (merger_args*)(arg); - logtable * ltable = a->ltable; + logtable * ltable = a->ltable; assert(ltable->get_tree_c2()); @@ -472,8 +477,8 @@ void *diskMergeThread(void*arg) // 4: do the merge. //create the iterators - diskTreeComponent::diskTreeIterator *itrA = ltable->get_tree_c2()->iterator(); //new diskTreeIterator(ltable->get_tree_c2()->get_root_rec()); - diskTreeComponent::diskTreeIterator *itrB = ltable->get_tree_c1_mergeable()->iterator(); + diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); //new iterator(ltable->get_tree_c2()->get_root_rec()); + diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); //create a new tree diskTreeComponent * c2_prime = new diskTreeComponent(xid, a->internal_region_size, a->datapage_region_size, a->datapage_size); @@ -535,7 +540,7 @@ template void merge_iterators(int xid, ITA *itrA, //iterator on c1 or c2 ITB *itrB, //iterator on c0 or c1, respectively - logtable *ltable, + logtable *ltable, diskTreeComponent *scratch_tree, merge_stats_t *stats, bool dropDeletes // should be true iff this is biggest component ) diff --git a/merger.h b/merger.h index b59fb83..3a31540 100644 --- a/merger.h +++ b/merger.h @@ -1,8 +1,8 @@ #ifndef _MERGER_H_ #define _MERGER_H_ -#include -#include +#include "logstore.h" +#include "datatuple.h" #include #undef try @@ -11,26 +11,10 @@ //TODO: 400 bytes overhead per tuple, this is nuts, check if this is true... static const int RB_TREE_OVERHEAD = 400; static const double MIN_R = 3.0; -class logtable; - -typedef struct merge_stats_t { - int merge_level; // 1 => C0->C1, 2 => C1->C2 - pageid_t merge_count; // This is the merge_count'th merge - struct timeval sleep; // When did we go to sleep waiting for input? - struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep) - struct timeval done; // When did we finish merging? 
- pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)? - pageid_t num_tuples_out; // How many tuples did we write? - pageid_t num_datapages_out; // How many datapages? - pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? - pageid_t num_tuples_in_small; // Tuples from the small input? - pageid_t bytes_in_large; // Bytes from the large input? - pageid_t num_tuples_in_large; // Tuples from large input? -} merge_stats_t; struct merger_args { - logtable * ltable; + logtable * ltable; int worker_id; pthread_mutex_t * block_ready_mut; @@ -82,12 +66,12 @@ struct logtable_mergedata class merge_scheduler { - std::vector > mergedata; + std::vector *, logtable_mergedata*> > mergedata; public: ~merge_scheduler(); - int addlogtable(logtable * ltable); + int addlogtable(logtable * ltable); void startlogtable(int index, int64_t MAX_C0_SIZE = 100*1024*1024); struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;} diff --git a/regionAllocator.h b/regionAllocator.h index 9772f16..fd17570 100644 --- a/regionAllocator.h +++ b/regionAllocator.h @@ -8,6 +8,11 @@ #ifndef REGIONALLOCATOR_H_ #define REGIONALLOCATOR_H_ +#include +#undef try +#undef end + + class RegionAllocator { public: diff --git a/server.cpp b/server.cpp index a0d8fbf..6ac2d59 100644 --- a/server.cpp +++ b/server.cpp @@ -3,6 +3,7 @@ #include #include #include "logstore.h" +#include "logserver.h" #include "datapage.h" #include "merger.h" #include @@ -55,7 +56,7 @@ void initialize_server() mscheduler = new merge_scheduler; - logtable ltable; + logtable ltable; recordid table_root = ROOT_RECORD; if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index a9acfe7..6f33482 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -13,7 +13,9 @@ #include #include "check_util.h" +#include "regionAllocator.h" +#include #undef begin #undef end diff 
--git a/test/check_gen.cpp b/test/check_gen.cpp index 720488f..8121caa 100644 --- a/test/check_gen.cpp +++ b/test/check_gen.cpp @@ -1,4 +1,8 @@ +#include +#undef begin +#undef end + #include "logstore.h" @@ -13,7 +17,7 @@ int main(int argc, char **argv) int xid = Tbegin(); - logtable ltable(1000, 10000, 5); + logtable ltable(1000, 10000, 5); recordid table_root = ltable.allocTable(xid); diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index 15317b0..fcff909 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -12,6 +12,7 @@ #include #include +#include #undef begin #undef end @@ -29,7 +30,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - logtable ltable(1000, 10000, 5); + logtable ltable(1000, 10000, 5); recordid table_root = ltable.allocTable(xid); Tcommit(xid); @@ -84,7 +85,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("Stage 2: Sequentially reading %d tuples\n", NUM_ENTRIES); size_t tuplenum = 0; - diskTreeComponent::diskTreeIterator * tree_itr = ltable_c1->iterator(); + diskTreeComponent::iterator * tree_itr = ltable_c1->open_iterator(); datatuple *dt=0; diff --git a/test/check_logtree.cpp b/test/check_logtree.cpp index 7333461..c807dbe 100644 --- a/test/check_logtree.cpp +++ b/test/check_logtree.cpp @@ -20,6 +20,7 @@ #define OFFSET (NUM_ENTRIES * 10) +#include #undef begin #undef end diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 3de38cf..dd24e54 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -12,6 +12,7 @@ #include #include +#include #undef begin #undef end @@ -46,7 +47,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); merge_scheduler mscheduler; - logtable ltable(1000, 10000, 5); + logtable ltable(1000, 10000, 5); recordid table_root = ltable.allocTable(xid); diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index 0d5c471..fd8255f 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -12,6 +12,7 @@ #include #include 
+#include #undef begin #undef end @@ -45,7 +46,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); merge_scheduler mscheduler; - logtable ltable(1000, 10000, 100); + logtable ltable(1000, 10000, 100); recordid table_root = ltable.allocTable(xid); diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index 2293e3c..204e489 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -12,6 +12,7 @@ #include #include +#include #undef begin #undef end @@ -101,7 +102,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); merge_scheduler mscheduler; - logtable ltable(1000, 1000, 40); + logtable ltable(1000, 1000, 40); recordid table_root = ltable.allocTable(xid);