diff --git a/datapage.h b/datapage.h index de90b54..7770183 100644 --- a/datapage.h +++ b/datapage.h @@ -43,20 +43,19 @@ public: // Open an existing region allocator. RegionAllocator(int xid, recordid rid) : - rid_(rid), nextPage_(INVALID_PAGE), endOfRegion_(INVALID_PAGE) { - Tread(xid, rid, &header_); + rid_ = rid; + Tread(xid, rid_, &header_); regionCount_ = TarrayListLength(xid, header_.region_list); } // Create a new region allocator. - RegionAllocator(int xid, recordid rid, pageid_t region_length) : - rid_(rid), + RegionAllocator(int xid, pageid_t region_length) : nextPage_(0), endOfRegion_(0), regionCount_(0) { - assert(TrecordSize(xid, rid) == sizeof(header_)); + rid_ = Talloc(xid, sizeof(header_)); header_.region_list = TarrayListAlloc(xid, 1, 2, sizeof(pageid_t)); header_.region_length = region_length; Tset(xid, rid_, &header_); @@ -94,7 +93,6 @@ public: Tread(xid, list_entry, &pid); TregionForce(xid, pid); } - Tset(xid, rid_, &header_); } void dealloc_regions(int xid) { pageid_t regionCount = TarrayListLength(xid, header_.region_list); @@ -105,12 +103,13 @@ public: list_entry.slot < regionCount; list_entry.slot++) { pageid_t pid; Tread(xid, list_entry, &pid); -#ifndef CHECK_FOR_SCRIBBLING // Don't actually free the page if we'll be checking that pages are used exactly once below. +//#ifndef CHECK_FOR_SCRIBBLING // Don't actually free the page if we'll be checking that pages are used exactly once below. 
TregionDealloc(xid, pid); -#endif +//#endif } - printf("Warning: leaking arraylist %lld in datapage\n", (long long)header_.region_list.page); - // TarrayListDealloc(xid, header_.region_list); +// printf("Warning: leaking arraylist %lld in datapage\n", (long long)header_.region_list.page); + TarrayListDealloc(xid, header_.region_list); + Tdealloc(xid, rid_); } pageid_t * list_regions(int xid, pageid_t * region_length, pageid_t * region_count) { *region_count = TarrayListLength(xid, header_.region_list); @@ -127,17 +126,17 @@ public: nextPage_ = INVALID_PAGE; endOfRegion_ = INVALID_PAGE; } + recordid header_rid() { return rid_; } private: typedef struct { recordid region_list; pageid_t region_length; } persistent_state; - const recordid rid_; + recordid rid_; pageid_t nextPage_; - pageid_t endOfRegion_; - - pageid_t regionCount_; + pageid_t endOfRegion_; + pageid_t regionCount_; persistent_state header_; public: static const size_t header_size = sizeof(persistent_state); diff --git a/logserver.cpp b/logserver.cpp index 8a057d5..0931476 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -500,18 +500,27 @@ void * thread_work_fn( void * args) readlock(item->data->ltable->getMergeData()->header_lock, 0); // produce a list of regions used by current tree components - pageid_t datapage_c1_region_length, datapage_c2_region_length; - pageid_t datapage_c1_region_count, datapage_c2_region_count; + pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length; + pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count; pageid_t * datapage_c1_regions = item->data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count); + pageid_t * datapage_c1_mergeable_regions = NULL; + if(item->data->ltable->get_tree_c1_mergeable()) { + datapage_c1_mergeable_regions = item->data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, 
&datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count); + } pageid_t * datapage_c2_regions = item->data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count); - pageid_t tree_c1_region_length, tree_c2_region_length; - pageid_t tree_c1_region_count, tree_c2_region_count; + pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length; + pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count; - recordid tree_c1_region_header = item->data->ltable->get_treestate1(); - recordid tree_c2_region_header = item->data->ltable->get_treestate2(); + recordid tree_c1_region_header = item->data->ltable->get_tree_c1()->get_tree_state(); + recordid tree_c2_region_header = item->data->ltable->get_tree_c2()->get_tree_state(); pageid_t * tree_c1_regions = logtree::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count); + pageid_t * tree_c1_mergeable_regions = NULL; + if(item->data->ltable->get_tree_c1_mergeable()) { + recordid tree_c1_mergeable_region_header = item->data->ltable->get_tree_c1_mergeable()->get_tree_state(); + tree_c1_mergeable_regions = logtree::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count); + } pageid_t * tree_c2_regions = logtree::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count); unlock(item->data->ltable->getMergeData()->header_lock); @@ -541,17 +550,31 @@ void * thread_work_fn( void * args) boundary_tag tag; pageid_t pid = ROOT_RECORD.page; TregionReadBoundaryTag(xid, pid, &tag); + pageid_t max_off = 0; bool done; do { + max_off = pid + tag.size; // print tag. printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager); done = ! 
TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/); } while(!done); printf("\n"); + + printf("Tree components are using %lld megabytes. File is using %lld megabytes.", + PAGE_SIZE * (tree_c1_region_length * tree_c1_region_count + + tree_c1_mergeable_region_length * tree_c1_mergeable_region_count + + tree_c2_region_length * tree_c2_region_count + + datapage_c1_region_length * datapage_c1_region_count + + datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count + + datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024), + (PAGE_SIZE * max_off) / (1024*1024)); + free(datapage_c1_regions); + if(datapage_c1_mergeable_regions) free(datapage_c1_mergeable_regions); free(datapage_c2_regions); free(tree_c1_regions); + if(tree_c1_mergeable_regions) free(tree_c1_mergeable_regions); free(tree_c2_regions); err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS); diff --git a/logstore.cpp b/logstore.cpp index 204eafd..e7cfb19 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -62,9 +62,8 @@ void logtree::free_region_rid(int xid, recordid tree, } -void logtree::dealloc_region_rid(int xid, void *conf) +void logtree::dealloc_region_rid(int xid, recordid rid) { - recordid rid = *(recordid*)conf; RegionAllocConf_t a; Tread(xid,rid,&a); DEBUG("{%lld <- dealloc region arraylist}\n", a.regionList.page); @@ -76,14 +75,13 @@ void logtree::dealloc_region_rid(int xid, void *conf) TregionDealloc(xid,pid); } a.regionList.slot = 0; - printf("Warning: leaking arraylist %lld in logtree\n", (long long)a.regionList.page); -// TarrayListDealloc(xid, a.regionList); +// printf("Warning: leaking arraylist %lld in logtree\n", (long long)a.regionList.page); + TarrayListDealloc(xid, a.regionList); } -void logtree::force_region_rid(int xid, void *conf) +void logtree::force_region_rid(int xid, recordid rid) { - recordid rid = *(recordid*)conf; RegionAllocConf_t a; Tread(xid,rid,&a); @@ -832,7 +830,9 @@ logtable::logtable() { tree_c0 = NULL; + 
tree_c0_mergeable = NULL; tree_c1 = NULL; + tree_c1_mergeable = NULL; tree_c2 = NULL; this->still_running_ = true; this->mergedata = 0; @@ -889,17 +889,17 @@ recordid logtable::allocTable(int xid) //create the big tree tbl_header.c2_dp_state = Talloc(xid, DataPage::RegionAllocator::header_size); - tree_c2 = new logtree(new DataPage::RegionAllocator(xid, tbl_header.c2_dp_state, 10000)); /// XXX do not hard code region length. - tree_c2->create(xid); + tree_c2 = new logtree(xid); //create the small tree tbl_header.c1_dp_state = Talloc(xid, DataPage::RegionAllocator::header_size); - tree_c1 = new logtree(new DataPage::RegionAllocator(xid, tbl_header.c1_dp_state, 10000)); /// XXX do not hard code region length. - tree_c1->create(xid); + tree_c1 = new logtree(xid); tbl_header.c2_root = tree_c2->get_root_rec(); + tbl_header.c2_dp_state = tree_c2->get_alloc()->header_rid(); tbl_header.c2_state = tree_c2->get_tree_state(); tbl_header.c1_root = tree_c1->get_root_rec(); + tbl_header.c1_dp_state = tree_c1->get_alloc()->header_rid(); tbl_header.c1_state = tree_c1->get_tree_state(); Tset(xid, table_rec, &tbl_header); @@ -931,7 +931,7 @@ void logtable::flushTable() printf("prv merge not complete\n"); - while(*mergedata->old_c0) { + while(get_tree_c0_mergeable()) { unlock(mergedata->header_lock); // pthread_mutex_lock(mergedata->rbtree_mut); if(tree_bytes >= max_c0_size) @@ -963,15 +963,15 @@ void logtable::flushTable() stop = tv_to_double(stop_tv); //rbtree_ptr *tmp_ptr = new rbtree_ptr_t; //(typeof(h->scratch_tree)*) malloc(sizeof(void*)); - //*tmp_ptr = tree_c0; - *(mergedata->old_c0) = tree_c0; + set_tree_c0_mergeable(get_tree_c0()); // pthread_mutex_lock(mergedata->rbtree_mut); pthread_cond_signal(mergedata->input_ready_cond); // pthread_mutex_unlock(mergedata->rbtree_mut); merge_count ++; - tree_c0 = new rbtree_t; + set_tree_c0(new rbtree_t); + tsize = 0; tree_bytes = 0; @@ -1002,20 +1002,20 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
datatuple *ret_tuple=0; //step 1: look in tree_c0 - rbtree_t::iterator rbitr = tree_c0->find(search_tuple); - if(rbitr != tree_c0->end()) + rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple); + if(rbitr != get_tree_c0()->end()) { - DEBUG("tree_c0 size %d\n", tree_c0->size()); + DEBUG("tree_c0 size %d\n", get_tree_c0()->size()); ret_tuple = (*rbitr)->create_copy(); } bool done = false; //step: 2 look into first in tree if exists (a first level merge going on) - if(*(mergedata->old_c0) != 0) + if(get_tree_c0_mergeable() != 0) { DEBUG("old mem tree not null %d\n", (*(mergedata->old_c0))->size()); - rbitr = (*(mergedata->old_c0))->find(search_tuple); - if(rbitr != (*(mergedata->old_c0))->end()) + rbitr = get_tree_c0_mergeable()->find(search_tuple); + if(rbitr != get_tree_c0_mergeable()->end()) { datatuple *tuple = *rbitr; @@ -1041,7 +1041,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS //step 3: check c1 if(!done) { - datatuple *tuple_c1 = findTuple(xid, key, keySize, tree_c1); + datatuple *tuple_c1 = findTuple(xid, key, keySize, get_tree_c1()); if(tuple_c1 != NULL) { bool use_copy = false; @@ -1057,9 +1057,6 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS { use_copy = true; ret_tuple = tuple_c1; - //byte *barr = (byte*)malloc(tuple_c1->byte_length()); - //memcpy(barr, (byte*)tuple_c1->keylen, tuple_c1->byte_length()); - //ret_tuple = datatuple::from_bytes(barr); } if(!use_copy) @@ -1070,11 +1067,10 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS } //step 4: check old c1 if exists - if(!done && *(mergedata->diskmerge_args->in_tree) != 0) + if(!done && get_tree_c1_mergeable() != 0) { DEBUG("old c1 tree not null\n"); - datatuple *tuple_oc1 = findTuple(xid, key, keySize, - (logtree*)( *(mergedata->diskmerge_args->in_tree))); + datatuple *tuple_oc1 = findTuple(xid, key, keySize, get_tree_c1_mergeable()); if(tuple_oc1 != NULL) { @@ -1107,7 +1103,7 @@ 
datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS if(!done) { DEBUG("Not in old first disk tree\n"); - datatuple *tuple_c2 = findTuple(xid, key, keySize, tree_c2); + datatuple *tuple_c2 = findTuple(xid, key, keySize, get_tree_c2()); if(tuple_c2 != NULL) { @@ -1154,8 +1150,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS datatuple *ret_tuple=0; //step 1: look in tree_c0 - rbtree_t::iterator rbitr = tree_c0->find(search_tuple); - if(rbitr != tree_c0->end()) + rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple); + if(rbitr != get_tree_c0()->end()) { DEBUG("tree_c0 size %d\n", tree_c0->size()); ret_tuple = (*rbitr)->create_copy(); @@ -1165,11 +1161,11 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS { DEBUG("Not in mem tree %d\n", tree_c0->size()); //step: 2 look into first in tree if exists (a first level merge going on) - if(*(mergedata->old_c0) != 0) + if(get_tree_c0_mergeable() != NULL) { DEBUG("old mem tree not null %d\n", (*(mergedata->old_c0))->size()); - rbitr = (*(mergedata->old_c0))->find(search_tuple); - if(rbitr != (*(mergedata->old_c0))->end()) + rbitr = get_tree_c0_mergeable()->find(search_tuple); + if(rbitr != get_tree_c0_mergeable()->end()) { ret_tuple = (*rbitr)->create_copy(); } @@ -1180,7 +1176,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS DEBUG("Not in old mem tree\n"); //step 3: check c1 - ret_tuple = findTuple(xid, key, keySize, tree_c1); + ret_tuple = findTuple(xid, key, keySize, get_tree_c1()); } if(ret_tuple == 0) @@ -1188,11 +1184,10 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS DEBUG("Not in first disk tree\n"); //step 4: check old c1 if exists - if( *(mergedata->diskmerge_args->in_tree) != 0) + if( get_tree_c1_mergeable() != 0) { DEBUG("old c1 tree not null\n"); - ret_tuple = findTuple(xid, key, keySize, - (logtree*)( 
*(mergedata->diskmerge_args->in_tree))); + ret_tuple = findTuple(xid, key, keySize, get_tree_c1_mergeable()); } } diff --git a/logstore.h b/logstore.h index ee848ce..7aeb1b4 100644 --- a/logstore.h +++ b/logstore.h @@ -59,10 +59,10 @@ typedef void(*logtree_page_deallocator_t)(int, void *); class logtree{ public: - logtree(DataPage::RegionAllocator * alloc): region_alloc(alloc) {} - + logtree(int xid): region_alloc(new DataPage::RegionAllocator(xid, 10000)) {create(xid);} // XXX shouldn't hardcode region size. +private: recordid create(int xid); - +public: void print_tree(int xid); static void init_stasis(); @@ -71,9 +71,9 @@ private: static pageid_t alloc_region(int xid, void *conf); public: static pageid_t alloc_region_rid(int xid, void * ridp); - static void force_region_rid(int xid, void *conf); + static void force_region_rid(int xid, recordid rid); static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count); - static void dealloc_region_rid(int xid, void *conf); + static void dealloc_region_rid(int xid, recordid rid); static void free_region_rid(int xid, recordid tree, logtree_page_deallocator_t dealloc, void *allocator_state); @@ -129,8 +129,6 @@ public: void *allocator_state); inline DataPage::RegionAllocator* get_alloc() { return region_alloc; } -// inline void set_alloc(DataPage::RegionAllocator* a1) { region_alloc = a1; } // XXX kludge; must be a better api for this - // (currently, need to get rid from dpstate. add a 'register' method that sets the rid of the region allocator?) /** Initialize a page for use as an internal node of the tree. 
@@ -193,18 +191,16 @@ public: inline logtree * get_tree_c2(){return tree_c2;} inline logtree * get_tree_c1(){return tree_c1;} + inline logtree * get_tree_c1_mergeable(){return tree_c1_mergeable;} inline void set_tree_c1(logtree *t){tree_c1=t;} + inline void set_tree_c1_mergeable(logtree *t){tree_c1_mergeable=t;} inline void set_tree_c2(logtree *t){tree_c2=t;} inline rbtree_ptr_t get_tree_c0(){return tree_c0;} - + inline rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;} void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;} - - inline recordid get_dpstate1(){return tbl_header.c1_dp_state;} - inline recordid get_dpstate2(){return tbl_header.c2_dp_state;} - inline recordid get_treestate1(){return tbl_header.c1_state;} - inline recordid get_treestate2(){return tbl_header.c2_state;} + void set_tree_c0_mergeable(rbtree_ptr_t newtree){tree_c0_mergeable = newtree;} int get_fixed_page_count(){return fixed_page_count;} void set_fixed_page_count(int count){fixed_page_count = count;} @@ -223,9 +219,6 @@ public: recordid c1_root; recordid c1_state; recordid c1_dp_state; - //epoch_t beginning; - //epoch_t end; - }; const static RegionAllocConf_t DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER; @@ -246,8 +239,9 @@ private: logtree *tree_c2; //big tree logtree *tree_c1; //small tree + logtree *tree_c1_mergeable; //small tree: ready to be merged with c2 rbtree_ptr_t tree_c0; // in-mem red black tree - + rbtree_ptr_t tree_c0_mergeable; // in-mem red black tree: ready to be merged with c1. 
int tsize; //number of tuples int64_t tree_bytes; //number of bytes @@ -256,8 +250,6 @@ private: //DATA PAGE SETTINGS int fixed_page_count;//number of pages in a datapage -// logtable_mergedata * mergedata; - tuplemerger *tmerger; bool still_running_; @@ -279,7 +271,6 @@ public: static lladdIterator_t* open(int xid, recordid root); static lladdIterator_t* openAt(int xid, recordid root, const byte* key); static int next(int xid, lladdIterator_t *it); - //static lladdIterator_t *copy(int xid, lladdIterator_t* i); static void close(int xid, lladdIterator_t *it); diff --git a/merger.cpp b/merger.cpp index 4d252cb..97acd9f 100644 --- a/merger.cpp +++ b/merger.cpp @@ -19,8 +19,7 @@ int merge_scheduler::addlogtable(logtable *ltable) mdata->header_lock = initlock(); mdata->rbtree_mut = new pthread_mutex_t; pthread_mutex_init(mdata->rbtree_mut,0); - mdata->old_c0 = new rbtree_ptr_t; - *mdata->old_c0 = 0; + ltable->set_tree_c0_mergeable(NULL); mdata->input_needed = new bool(false); @@ -32,8 +31,8 @@ int merge_scheduler::addlogtable(logtable *ltable) mdata->input_size = new int64_t(100); - mdata->diskmerge_args = new merger_args; - mdata->memmerge_args = new merger_args; + mdata->diskmerge_args = new merger_args; + mdata->memmerge_args = new merger_args; mergedata.push_back(std::make_pair(ltable, mdata)); return mergedata.size()-1; @@ -50,7 +49,6 @@ merge_scheduler::~merge_scheduler() //delete the mergedata fields deletelock(mdata->header_lock); delete mdata->rbtree_mut; - delete mdata->old_c0; delete mdata->input_needed; delete mdata->input_ready_cond; delete mdata->input_needed_cond; @@ -147,7 +145,7 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page); DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page); - struct merger_args diskmerge_args= { + struct merger_args diskmerge_args= { ltable, 1, //worker id mdata->rbtree_mut, //block_ready_mutex @@ 
-159,18 +157,15 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) block2_ready_cond, //out_block_ready_cond 0, //max_tree_size No max size for biggest component &R, //r_i - block1_scratch, //in-tree - 0, //out_tree - ltable->get_tree_c2()->get_root_rec(), // my_tree }; *mdata->diskmerge_args = diskmerge_args; - struct merger_args memmerge_args = + struct merger_args memmerge_args = { ltable, 2, - mdata->rbtree_mut, //block_ready_mutex + mdata->rbtree_mut, mdata->input_needed_cond, mdata->input_needed, block1_needed_cond, @@ -179,9 +174,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) block1_ready_cond, (int64_t)(R * R * MAX_C0_SIZE), &R, - mdata->old_c0, - block1_scratch, - ltable->get_tree_c1()->get_root_rec(), }; *mdata->memmerge_args = memmerge_args; @@ -194,17 +186,32 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) } -// deallocate/free their region -// create new data region for new data pages +/** + * Merge algorithm + *
+  1: while(1)
+  2:    wait for c0_mergable
+  3:    begin
+  4:    merge c0_mergable and c1 into c1'
+  5:    force c1'
+  6:    delete c1
+  7:    if c1' is too big
+  8:       c1 = new_empty
+  9:       c1_mergable = c1'
+ 10:    else
+ 11:       c1 = c1'
+ 12:    commit
+  
+ */ void* memMergeThread(void*arg) { int xid;// = Tbegin(); - merger_args * a = (merger_args*)(arg); - assert(a->my_tree.size != -1); + merger_args * a = (merger_args*)(arg); logtable * ltable = a->ltable; + assert(ltable->get_tree_c1()); int merge_count =0; // pthread_mutex_lock(a->block_ready_mut); @@ -213,8 +220,8 @@ void* memMergeThread(void*arg) { writelock(ltable->mergedata->header_lock,0); int done = 0; - // get a new input for merge - while(!*(a->in_tree)) + // wait for c0_mergable + while(!ltable->get_tree_c0_mergeable()) { pthread_mutex_lock(a->block_ready_mut); *a->in_block_needed = true; @@ -242,40 +249,25 @@ void* memMergeThread(void*arg) if(done==1) { pthread_mutex_lock(a->block_ready_mut); - pthread_cond_signal(a->out_block_ready_cond); + pthread_cond_signal(a->out_block_ready_cond); // no block is ready. this allows the other thread to wake up, and see that we're shutting down. pthread_mutex_unlock(a->block_ready_mut); unlock(ltable->mergedata->header_lock); break; } - if((*a->in_tree)->size()==0) //input empty, this can only happen during shutdown - { - delete *a->in_tree; - *a->in_tree = 0; - unlock(ltable->mergedata->header_lock); - continue; - } - - int64_t mergedPages=0; - - assert(a->my_tree.size != -1); - - //create the iterators - treeIterator *itrA = new treeIterator(a->my_tree); - memTreeIterator *itrB = - new memTreeIterator(*a->in_tree); - - //Tcommit(xid); + // 3: Begin transaction xid = Tbegin(); + + // 4: Merge + + //create the iterators + treeIterator *itrA = new treeIterator(ltable->get_tree_c1()->get_root_rec()); // XXX don't want get_root_rec() to be here. 
+ memTreeIterator *itrB = + new memTreeIterator(ltable->get_tree_c0_mergeable()); + //create a new tree - logtree * scratch_tree = new logtree(new DataPage::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size) - recordid scratch_root = scratch_tree->create(xid); - - //save the old dp state values - DataPage::RegionAllocator *old_alloc = ltable->get_tree_c1()->get_alloc(); - old_alloc->done(); // XXX do this earlier - recordid oldAllocState = ltable->get_tree_c1()->get_tree_state(); + logtree * c1_prime = new logtree(xid); // XXX should not hardcode region size) //pthread_mutex_unlock(a->block_ready_mut); unlock(ltable->mergedata->header_lock); @@ -284,16 +276,24 @@ void* memMergeThread(void*arg) printf("mmt:\tMerging:\n"); int64_t npages = 0; - mergedPages = merge_iterators(xid, itrA, itrB, ltable, scratch_tree, npages, false); + int64_t mergedPages = merge_iterators(xid, itrA, itrB, ltable, c1_prime, npages, false); delete itrA; delete itrB; + // 5: force c1' + //force write the new region to disk - recordid scratch_alloc_state = scratch_tree->get_tree_state(); - logtree::force_region_rid(xid, &scratch_alloc_state); + logtree::force_region_rid(xid, c1_prime->get_tree_state()); //force write the new datapages - scratch_tree->get_alloc()->force_regions(xid); + c1_prime->get_alloc()->force_regions(xid); + + // 6: delete c1 and c0_mergeable + logtree::dealloc_region_rid(xid, ltable->get_tree_c1()->get_tree_state()); + ltable->get_tree_c1()->get_alloc()->dealloc_regions(xid); + + logtable::tearDownTree(ltable->get_tree_c0_mergeable()); + ltable->set_tree_c0_mergeable(NULL); //writes complete //now atomically replace the old c1 with new c1 @@ -303,32 +303,6 @@ void* memMergeThread(void*arg) merge_count++; printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages); - delete ltable->get_tree_c1(); - ltable->set_tree_c1(scratch_tree); - - logtable::table_header h; - Tread(xid, 
ltable->get_table_rec(), &h); - - h.c1_root = scratch_root; - h.c1_state = scratch_alloc_state; - //note we already updated the dpstate before the merge - printf("mmt:\tUpdated C1's position on disk to %lld\n",scratch_root.page); - Tset(xid, ltable->get_table_rec(), &h); - - //Tcommit(xid); - //xid = Tbegin(); - - // free old my_tree here - //TODO: check - logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState); - - //free the old data pages - old_alloc->dealloc_regions(xid); - - Tcommit(xid); - //xid = Tbegin(); - - //TODO: this is simplistic for now //signal the other merger if necessary double target_R = *(a->r_i); @@ -337,12 +311,19 @@ void* memMergeThread(void*arg) if( (new_c1_size / ltable->max_c0_size > target_R) || (a->max_size && new_c1_size > a->max_size ) ) { - printf("mmt:\tsignaling C2 for merge\n"); + // 7: c1' is too big + + // 8: c1 = new empty. + ltable->set_tree_c1(new logtree(xid)); + + printf("mmt:\tsignaling C2 for merge\n"); printf("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size, ltable->max_c0_size, a->max_size, target_R); + + // 9: c1_mergeable = c1' - // XXX need to report backpressure here! - while(*a->out_tree) { + // XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure. 
+ while(ltable->get_tree_c1_mergeable()) { pthread_mutex_lock(a->block_ready_mut); unlock(ltable->mergedata->header_lock); @@ -350,50 +331,24 @@ void* memMergeThread(void*arg) pthread_mutex_unlock(a->block_ready_mut); writelock(ltable->mergedata->header_lock,0); } - - - *a->out_tree = scratch_tree; - xid = Tbegin(); + ltable->set_tree_c1_mergeable(c1_prime); pthread_cond_signal(a->out_block_ready_cond); - - logtree *empty_tree = new logtree(new DataPage::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size); - empty_tree->create(xid); - - a->my_tree = empty_tree->get_root_rec(); - - ltable->set_tree_c1(empty_tree); - - logtable::table_header h; - Tread(xid, ltable->get_table_rec(), &h); - h.c1_root = empty_tree->get_root_rec(); //update root - h.c1_state = empty_tree->get_tree_state(); //update index alloc state - printf("mmt:\tUpdated C1's position on disk to %lld\n",empty_tree->get_root_rec().page); - Tset(xid, ltable->get_table_rec(), &h); - Tcommit(xid); - //xid = Tbegin(); - - } - else //not signaling the C2 for merge yet - { - printf("mmt:\tnot signaling C2 for merge\n"); - a->my_tree = scratch_root; + } else { + // 11: c1 = c1' + ltable->set_tree_c1(c1_prime); } - rbtree_ptr_t deltree = *a->in_tree; - *a->in_tree = 0; + // XXX want to set this stuff somewhere. 
+ logtable::table_header h; + printf("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page); + + Tcommit(xid); - - //Tcommit(xid); unlock(ltable->mergedata->header_lock); //TODO: get the freeing outside of the lock - //// ----------- Free in_tree - logtable::tearDownTree(deltree); - //deltree = 0; - - } //pthread_mutex_unlock(a->block_ready_mut); @@ -407,10 +362,10 @@ void *diskMergeThread(void*arg) { int xid;// = Tbegin(); - merger_args * a = (merger_args*)(arg); - assert(a->my_tree.size != -1); + merger_args * a = (merger_args*)(arg); logtable * ltable = a->ltable; + assert(ltable->get_tree_c2()); int merge_count =0; //pthread_mutex_lock(a->block_ready_mut); @@ -420,7 +375,7 @@ void *diskMergeThread(void*arg) writelock(ltable->mergedata->header_lock,0); int done = 0; // get a new input for merge - while(!*(a->in_tree)) + while(!ltable->get_tree_c1_mergeable()) { pthread_mutex_lock(a->block_ready_mut); *a->in_block_needed = true; @@ -451,25 +406,16 @@ void *diskMergeThread(void*arg) int64_t mergedPages=0; - assert(a->my_tree.size != -1); - //create the iterators - treeIterator *itrA = new treeIterator(a->my_tree); + treeIterator *itrA = new treeIterator(ltable->get_tree_c2()->get_root_rec()); treeIterator *itrB = - new treeIterator((*a->in_tree)->get_root_rec()); + new treeIterator(ltable->get_tree_c1_mergeable()->get_root_rec()); xid = Tbegin(); //create a new tree //TODO: maybe you want larger regions for the second tree? 
- logtree * scratch_tree = new logtree(new DataPage::RegionAllocator(xid, ltable->get_dpstate2() /*rid of old header*/, 10000)); // XXX should not hardcode region size - recordid scratch_root = scratch_tree->create(xid); - - //save the old dp state values - DataPage::RegionAllocator *old_alloc1 = ltable->get_tree_c1()->get_alloc(); - DataPage::RegionAllocator *old_alloc2 = ltable->get_tree_c2()->get_alloc(); - - recordid oldAllocState = ltable->get_tree_c2()->get_tree_state(); + logtree * c2_prime = new logtree(xid); unlock(ltable->mergedata->header_lock); @@ -478,16 +424,24 @@ void *diskMergeThread(void*arg) printf("dmt:\tMerging:\n"); int64_t npages = 0; - mergedPages = merge_iterators(xid, itrA, itrB, ltable, scratch_tree, npages, true); + mergedPages = merge_iterators(xid, itrA, itrB, ltable, c2_prime, npages, true); delete itrA; delete itrB; - + //force write the new region to disk - recordid scratch_alloc_state = scratch_tree->get_tree_state(); - logtree::force_region_rid(xid, &scratch_alloc_state); - //force write the new datapages - scratch_tree->get_alloc()->force_regions(xid); + logtree::force_region_rid(xid, c2_prime->get_tree_state()); + c2_prime->get_alloc()->force_regions(xid); + + logtree::dealloc_region_rid(xid, ltable->get_tree_c1_mergeable()->get_tree_state()); + ltable->get_tree_c1_mergeable()->get_alloc()->dealloc_regions(xid); + delete ltable->get_tree_c1_mergeable(); + ltable->set_tree_c1_mergeable(0); + + logtree::dealloc_region_rid(xid, ltable->get_tree_c2()->get_tree_state()); + ltable->get_tree_c2()->get_alloc()->dealloc_regions(xid); + delete ltable->get_tree_c2(); + //writes complete //now atomically replace the old c2 with new c2 @@ -500,50 +454,18 @@ void *diskMergeThread(void*arg) printf("dmt:\tmerge_count %d\t#written pages: %lld\n optimal r %.2f", merge_count, npages, *(a->r_i)); - delete ltable->get_tree_c2(); - ltable->set_tree_c2(scratch_tree); + // 11: C2 is never too big. 
+ ltable->set_tree_c2(c2_prime); - logtable::table_header h; - Tread(xid, ltable->get_table_rec(), &h); - - h.c2_root = scratch_root; - h.c2_state = scratch_alloc_state; - //note we already updated the dpstate before the merge - printf("dmt:\tUpdated C2's position on disk to %lld\n",scratch_root.page); - Tset(xid, ltable->get_table_rec(), &h); - - // free old my_tree here - //TODO: check - logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState); - - //free the old data pages - old_alloc2->dealloc_regions(xid); + logtable::table_header h; // XXX Need to set header. - a->my_tree = scratch_root; - - //// ----------- Free in_tree - //TODO: check - logtree::free_region_rid(xid, (*a->in_tree)->get_root_rec(), - logtree::dealloc_region_rid, - &((*a->in_tree)->get_tree_state())); - old_alloc1->dealloc_regions(xid); // XXX make sure that both of these are 'unlinked' before this happens + printf("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1); Tcommit(xid); - //xid = Tbegin(); - //Tcommit(xid); - delete *a->in_tree; - *a->in_tree = 0; - unlock(ltable->mergedata->header_lock); - } - - //pthread_mutex_unlock(a->block_ready_mut); - return 0; - - } template diff --git a/merger.h b/merger.h index c38481b..7dcb86b 100644 --- a/merger.h +++ b/merger.h @@ -10,8 +10,7 @@ //TODO: 400 bytes overhead per tuple, this is nuts, check if this is true... 
static const int RB_TREE_OVERHEAD = 400; static const double MIN_R = 3.0; -//T is either logtree or red-black tree -template + struct merger_args { logtable * ltable; @@ -28,14 +27,8 @@ struct merger_args pthread_cond_t * in_block_ready_cond; pthread_cond_t * out_block_ready_cond; - int64_t max_size; //pageid_t + int64_t max_size; double * r_i; - - T ** in_tree; - - logtree ** out_tree; - - recordid my_tree; }; @@ -49,7 +42,6 @@ struct logtable_mergedata rwl *header_lock; pthread_mutex_t * rbtree_mut; - rbtree_ptr_t *old_c0; //in-mem red black tree being merged / to be merged bool *input_needed; // memmerge-input needed @@ -58,9 +50,9 @@ struct logtable_mergedata int64_t * input_size; //merge args 1 - struct merger_args *diskmerge_args; + struct merger_args *diskmerge_args; //merge args 2 - struct merger_args *memmerge_args; + struct merger_args *memmerge_args; }; @@ -79,14 +71,8 @@ public: struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;} void shutdown(); - - - }; - -void* memMergeThread(void* arg); - template int64_t merge_iterators(int xid, ITA *itrA, @@ -96,8 +82,7 @@ int64_t merge_iterators(int xid, int64_t &npages, bool dropDeletes); - +void* memMergeThread(void* arg); void* diskMergeThread(void* arg); - #endif diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index 0f29935..716aebf 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -48,10 +48,8 @@ void insertProbeIter(size_t NUM_ENTRIES) if(data_arr.size() > NUM_ENTRIES) data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end()); - recordid datapage_header_rid = Talloc(xid, DataPage::RegionAllocator::header_size); - DataPage::RegionAllocator * alloc - = new DataPage::RegionAllocator(xid, datapage_header_rid, 10000); // ~ 10 datapages per region. + = new DataPage::RegionAllocator(xid, 10000); // ~ 10 datapages per region. recordid alloc_state = Talloc(xid,sizeof(RegionAllocConf_t));