From ec747781ac3f0fcd2a47a2bda3d8303918e2ac55 Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 17 Feb 2010 23:38:31 +0000 Subject: [PATCH] partially remove merger_args structure git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@579 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 1 + logstore.h | 12 ++++--- merger.cpp | 89 +++++++++++----------------------------------------- merger.h | 11 ------- 4 files changed, 28 insertions(+), 85 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index ad6843f..204eafd 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -834,6 +834,7 @@ logtable::logtable() tree_c0 = NULL; tree_c1 = NULL; tree_c2 = NULL; + this->still_running_ = true; this->mergedata = 0; fixed_page_count = -1; //tmerger = new tuplemerger(&append_merger); diff --git a/logstore.h b/logstore.h index 9203601..ee848ce 100644 --- a/logstore.h +++ b/logstore.h @@ -189,7 +189,7 @@ public: datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, logtree *ltree); - inline recordid & get_table_rec(){return table_rec;} + inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls) inline logtree * get_tree_c2(){return tree_c2;} inline logtree * get_tree_c1(){return tree_c1;} @@ -234,9 +234,11 @@ public: int64_t max_c0_size; -private: - - + inline bool is_still_running() { return still_running_; } + inline void stop() { + still_running_ = false; + // XXX must need to do other things! + } private: recordid table_rec; @@ -257,6 +259,8 @@ private: // logtable_mergedata * mergedata; tuplemerger *tmerger; + + bool still_running_; }; diff --git a/merger.cpp b/merger.cpp index fb5b7a1..4d252cb 100644 --- a/merger.cpp +++ b/merger.cpp @@ -57,13 +57,6 @@ merge_scheduler::~merge_scheduler() delete mdata->input_size; //delete the merge thread structure variables - delete (recordid*) mdata->memmerge_args->pageAllocState; - delete (recordid*) mdata->memmerge_args->oldAllocState; - delete mdata->memmerge_args->still_open; - - delete (recordid*) mdata->diskmerge_args->pageAllocState; - delete (recordid*) mdata->diskmerge_args->oldAllocState; - pthread_cond_destroy(mdata->diskmerge_args->in_block_needed_cond); delete mdata->diskmerge_args->in_block_needed_cond; delete mdata->diskmerge_args->in_block_needed; @@ -76,13 +69,9 @@ merge_scheduler::~merge_scheduler() delete mdata->diskmerge_args->in_block_ready_cond; pthread_cond_destroy(mdata->diskmerge_args->out_block_ready_cond); delete mdata->diskmerge_args->out_block_ready_cond; - - delete mdata->diskmerge_args->my_tree_size; delete mdata->diskmerge_args; delete mdata->memmerge_args; - - } mergedata.clear(); @@ -99,8 +88,8 @@ void merge_scheduler::shutdown() //flush the in memory table to write any tuples still in memory ltable->flushTable(); - pthread_mutex_lock(mdata->rbtree_mut); - *(mdata->memmerge_args->still_open)=false; + pthread_mutex_lock(mdata->rbtree_mut); + ltable->stop(); pthread_cond_signal(mdata->input_ready_cond); //*(mdata->diskmerge_args->still_open)=false;//same pointer so no need @@ -138,7 +127,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) bool *block1_needed = new bool(false); bool *block2_needed = new bool(false); - bool *system_running = new bool(true); //wait to merge the next block until we have merged block FUDGE times. static const int FUDGE = 1; @@ -150,24 +138,18 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) ltable->set_tree_c0(new rbtree_t); //disk merger args - recordid * ridp = new recordid; - *ridp = ltable->get_tree_c2()->get_tree_state(); //h.bigTreeAllocState; - recordid * oldridp = new recordid; - *oldridp = NULLRID; ltable->max_c0_size = MAX_C0_SIZE; logtree ** block1_scratch = new logtree*; *block1_scratch=0; - //recordid * allocer_scratch = new recordid; - + DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page); + DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page); + struct merger_args diskmerge_args= { ltable, 1, //worker id - logtree::alloc_region_rid, //pageAlloc - ridp, // pageAllocState - oldridp, // oldAllocState mdata->rbtree_mut, //block_ready_mutex block1_needed_cond, //in_block_needed_cond block1_needed, //in_block_needed @@ -175,37 +157,19 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) block2_needed, //out_block_needed block1_ready_cond, //in_block_ready_cond block2_ready_cond, //out_block_ready_cond - system_running, //still_open i.e. system running - block1_size, //mytree_size ? - 0, //out_tree_size, biggest component computes its size directly. 0, //max_tree_size No max size for biggest component &R, //r_i block1_scratch, //in-tree 0, //out_tree ltable->get_tree_c2()->get_root_rec(), // my_tree - ltable->get_table_rec() //tree }; *mdata->diskmerge_args = diskmerge_args; - DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page); - - - //memory merger args - ridp = new recordid; - *ridp = ltable->get_tree_c1()->get_tree_state(); - oldridp = new recordid; - *oldridp = NULLRID; - - DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page); - struct merger_args memmerge_args = { ltable, 2, - logtree::alloc_region_rid, //pageAlloc - ridp, // pageAllocState - oldridp, // oldAllocState mdata->rbtree_mut, //block_ready_mutex mdata->input_needed_cond, mdata->input_needed, @@ -213,15 +177,11 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) block1_needed, mdata->input_ready_cond, block1_ready_cond, - system_running, - mdata->input_size, - block1_size, (int64_t)(R * R * MAX_C0_SIZE), &R, mdata->old_c0, block1_scratch, ltable->get_tree_c1()->get_root_rec(), - ltable->get_table_rec() //tree }; *mdata->memmerge_args = memmerge_args; @@ -261,7 +221,7 @@ void* memMergeThread(void*arg) //pthread_cond_signal(a->in_block_needed_cond); pthread_cond_broadcast(a->in_block_needed_cond); - if(!*(a->still_open)){ + if(!ltable->is_still_running()){ done = 1; pthread_mutex_unlock(a->block_ready_mut); break; @@ -315,6 +275,7 @@ void* memMergeThread(void*arg) //save the old dp state values DataPage::RegionAllocator *old_alloc = ltable->get_tree_c1()->get_alloc(); old_alloc->done(); // XXX do this earlier + recordid oldAllocState = ltable->get_tree_c1()->get_tree_state(); //pthread_mutex_unlock(a->block_ready_mut); unlock(ltable->mergedata->header_lock); @@ -340,28 +301,26 @@ void* memMergeThread(void*arg) writelock(ltable->mergedata->header_lock,0); merge_count++; - *a->my_tree_size = mergedPages; printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages); delete ltable->get_tree_c1(); ltable->set_tree_c1(scratch_tree); logtable::table_header h; - void * oldAllocState = a->pageAllocState; - Tread(xid, a->tree, &h); + Tread(xid, ltable->get_table_rec(), &h); h.c1_root = scratch_root; h.c1_state = scratch_alloc_state; //note we already updated the dpstate before the merge printf("mmt:\tUpdated C1's position on disk to %lld\n",scratch_root.page); - Tset(xid, a->tree, &h); + Tset(xid, ltable->get_table_rec(), &h); //Tcommit(xid); //xid = Tbegin(); // free old my_tree here //TODO: check - logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState); + logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState); //free the old data pages old_alloc->dealloc_regions(xid); @@ -401,19 +360,17 @@ void* memMergeThread(void*arg) logtree *empty_tree = new logtree(new DataPage::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size); empty_tree->create(xid); - - *(recordid*)(a->pageAllocState) = empty_tree->get_tree_state(); a->my_tree = empty_tree->get_root_rec(); ltable->set_tree_c1(empty_tree); logtable::table_header h; - Tread(xid, a->tree, &h); + Tread(xid, ltable->get_table_rec(), &h); h.c1_root = empty_tree->get_root_rec(); //update root h.c1_state = empty_tree->get_tree_state(); //update index alloc state printf("mmt:\tUpdated C1's position on disk to %lld\n",empty_tree->get_root_rec().page); - Tset(xid, a->tree, &h); + Tset(xid, ltable->get_table_rec(), &h); Tcommit(xid); //xid = Tbegin(); @@ -421,7 +378,6 @@ void* memMergeThread(void*arg) else //not signaling the C2 for merge yet { printf("mmt:\tnot signaling C2 for merge\n"); - *(recordid*)a->pageAllocState = scratch_alloc_state; a->my_tree = scratch_root; } @@ -470,7 +426,7 @@ void *diskMergeThread(void*arg) *a->in_block_needed = true; pthread_cond_signal(a->in_block_needed_cond); - if(!*(a->still_open)){ + if(!ltable->is_still_running()){ done = 1; pthread_mutex_unlock(a->block_ready_mut); break; @@ -513,6 +469,8 @@ void *diskMergeThread(void*arg) DataPage::RegionAllocator *old_alloc1 = ltable->get_tree_c1()->get_alloc(); DataPage::RegionAllocator *old_alloc2 = ltable->get_tree_c2()->get_alloc(); + recordid oldAllocState = ltable->get_tree_c2()->get_tree_state(); + unlock(ltable->mergedata->header_lock); @@ -537,7 +495,6 @@ void *diskMergeThread(void*arg) writelock(ltable->mergedata->header_lock,0); merge_count++; - *a->my_tree_size = mergedPages; //update the current optimal R value *(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (ltable->max_c0_size/PAGE_SIZE) ) ); @@ -547,24 +504,21 @@ void *diskMergeThread(void*arg) ltable->set_tree_c2(scratch_tree); logtable::table_header h; - void * oldAllocState = a->pageAllocState; - Tread(xid, a->tree, &h); + Tread(xid, ltable->get_table_rec(), &h); h.c2_root = scratch_root; h.c2_state = scratch_alloc_state; //note we already updated the dpstate before the merge printf("dmt:\tUpdated C2's position on disk to %lld\n",scratch_root.page); - Tset(xid, a->tree, &h); + Tset(xid, ltable->get_table_rec(), &h); // free old my_tree here //TODO: check - logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState); - //TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState); + logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState); //free the old data pages old_alloc2->dealloc_regions(xid); - - *(recordid*)a->pageAllocState = scratch_alloc_state; + a->my_tree = scratch_root; //// ----------- Free in_tree @@ -573,11 +527,6 @@ void *diskMergeThread(void*arg) logtree::dealloc_region_rid, &((*a->in_tree)->get_tree_state())); old_alloc1->dealloc_regions(xid); // XXX make sure that both of these are 'unlinked' before this happens - //TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState); - - //TODO: check - //free the old data pages -// DataPage::dealloc_region_rid(xid, a->in_tree_allocer);//TODO: Tcommit(xid); diff --git a/merger.h b/merger.h index fc0eee7..c38481b 100644 --- a/merger.h +++ b/merger.h @@ -17,11 +17,6 @@ struct merger_args logtable * ltable; int worker_id; - //page allocation information - pageid_t(*pageAlloc)(int,void*); - void *pageAllocState; - void *oldAllocState; - pthread_mutex_t * block_ready_mut; pthread_cond_t * in_block_needed_cond; @@ -33,10 +28,6 @@ struct merger_args pthread_cond_t * in_block_ready_cond; pthread_cond_t * out_block_ready_cond; - bool * still_open; - - int64_t * my_tree_size; - int64_t * out_tree_size; int64_t max_size; //pageid_t double * r_i; @@ -45,8 +36,6 @@ struct merger_args logtree ** out_tree; recordid my_tree; - - recordid tree; };