diff --git a/logstore.cpp b/logstore.cpp index 3524c0b..a08b270 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -50,10 +50,6 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio this->internal_region_size = internal_region_size; this->datapage_region_size = datapage_region_size; this->datapage_size = datapage_size; - - c0_stats = merge_mgr->get_merge_stats(0); - merge_mgr->new_merge(0); - c0_stats->starting_merge(); } template @@ -104,7 +100,13 @@ recordid logtable::allocTable(int xid) //create the small tree tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); - update_persistent_header(xid); + c0_stats = merge_mgr->get_merge_stats(0); + merge_mgr->new_merge(0); + c0_stats->starting_merge(); + + update_persistent_header(xid, 1); + update_persistent_header(xid, 2); + return table_rec; } @@ -114,9 +116,20 @@ void logtable::openTable(int xid, recordid rid) { Tread(xid, table_rec, &tbl_header); tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0); tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0); + + merge_mgr->get_merge_stats(1)->bytes_out = tbl_header.c1_base_size; + merge_mgr->get_merge_stats(1)->base_size = tbl_header.c1_base_size; + merge_mgr->get_merge_stats(1)->mergeable_size = tbl_header.c1_mergeable_size; + merge_mgr->get_merge_stats(2)->base_size = tbl_header.c2_base_size; + merge_mgr->get_merge_stats(2)->bytes_out = tbl_header.c2_base_size; + + c0_stats = merge_mgr->get_merge_stats(0); + merge_mgr->new_merge(0); + c0_stats->starting_merge(); + } template -void logtable::update_persistent_header(int xid) { +void logtable::update_persistent_header(int xid, int merge_level) { tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid(); @@ -125,6 +138,17 @@ void logtable::update_persistent_header(int xid) { tbl_header.c1_dp_state = tree_c1->get_datapage_allocator_rid(); tbl_header.c1_state = tree_c1->get_internal_node_allocator_rid(); + if(merge_level == 1) { + tbl_header.c1_base_size = merge_mgr->get_merge_stats(1)->bytes_out; + tbl_header.c1_mergeable_size = merge_mgr->get_merge_stats(1)->mergeable_size; + } else if(merge_level == 2) { + tbl_header.c1_mergeable_size = 0; + tbl_header.c2_base_size = merge_mgr->get_merge_stats(2)->bytes_out; + } else { + assert(merge_level == 1 || merge_level == 2); + abort(); + } + Tset(xid, table_rec, &tbl_header); } @@ -218,8 +242,8 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size ret_tuple = (*rbitr)->create_copy(); } - rwlc_readlock(header_mut); // has to be before rb_mut, or we could merge the tuple with itself due to an intervening merge pthread_mutex_unlock(&rb_mut); + rwlc_readlock(header_mut); // XXX: FIXME with optimisitic concurrency control. Has to be before rb_mut, or we could merge the tuple with itself due to an intervening merge bool done = false; //step: 2 look into first in tree if exists (a first level merge going on) @@ -371,8 +395,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size { DEBUG("Not in mem tree %d\n", tree_c0->size()); - rwlc_readlock(header_mut); pthread_mutex_unlock(&rb_mut); + rwlc_readlock(header_mut); // XXX FIXME WITH OCC!! //step: 2 look into first in tree if exists (a first level merge going on) if(get_tree_c0_mergeable() != NULL) diff --git a/logstore.h b/logstore.h index 320bde7..c91683d 100644 --- a/logstore.h +++ b/logstore.h @@ -81,7 +81,7 @@ public: merge_mgr->get_merge_stats(1); } void set_tree_c0_mergeable(memTreeComponent::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } - void update_persistent_header(int xid); + void update_persistent_header(int xid, int merge_level); void setMergeData(logtable_mergedata * mdata); logtable_mergedata* getMergeData(){return mergedata;} @@ -97,6 +97,9 @@ public: recordid c1_root; recordid c1_state; recordid c1_dp_state; + pageid_t c2_base_size; + pageid_t c1_mergeable_size; + pageid_t c1_base_size; }; logtable_mergedata * mergedata; diff --git a/mergeStats.h b/mergeStats.h index c43a12f..4427001 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -25,9 +25,9 @@ class mergeStats { merge_level(merge_level), merge_count(0), base_size(0), + mergeable_size(0), target_size(target_size), current_size(0), - mergeable_size(0), bytes_out_with_overhead(0), bytes_out(0), num_tuples_out(0), @@ -114,14 +114,17 @@ class mergeStats { friend class mergeManager; struct timespec last_tick; - + public: // XXX only accessed during initialization. pageid_t base_size; + pageid_t mergeable_size; // protected by mutex. + protected: pageid_t target_size; pageid_t current_size; - pageid_t mergeable_size; // protected by mutex. pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)? + public: pageid_t bytes_out; // How many bytes worth of tuples did we write? + protected: pageid_t num_tuples_out; // How many tuples did we write? pageid_t num_datapages_out; // How many datapages? pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? diff --git a/merger.cpp b/merger.cpp index 3f7a9ea..0b7fd9f 100644 --- a/merger.cpp +++ b/merger.cpp @@ -14,9 +14,6 @@ int merge_scheduler::addlogtable(logtable *ltable) // initialize merge data ltable->set_tree_c0_mergeable(NULL); - - mdata->diskmerge_args = new merger_args; - mdata->memmerge_args = new merger_args; mergedata.push_back(std::make_pair(ltable, mdata)); return mergedata.size()-1; @@ -25,14 +22,6 @@ int merge_scheduler::addlogtable(logtable *ltable) merge_scheduler::~merge_scheduler() { - for(size_t i=0; i *ltable = mergedata[i].first; - logtable_mergedata *mdata = mergedata[i].second; - - delete mdata->diskmerge_args; - delete mdata->memmerge_args; - } mergedata.clear(); } @@ -76,26 +65,11 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page); DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page); - struct merger_args diskmerge_args= { - ltable, - 0, //max_tree_size No max size for biggest component - }; - - *mdata->diskmerge_args = diskmerge_args; - - struct merger_args memmerge_args = - { - ltable, - (int64_t)(MAX_C0_SIZE), // XXX why did this multiply by R^2 before?? - }; - - *mdata->memmerge_args = memmerge_args; - void * (*diskmerger)(void*) = diskMergeThread; void * (*memmerger)(void*) = memMergeThread; - pthread_create(&mdata->diskmerge_thread, 0, diskmerger, mdata->diskmerge_args); - pthread_create(&mdata->memmerge_thread, 0, memmerger, mdata->memmerge_args); + pthread_create(&mdata->diskmerge_thread, 0, diskmerger, ltable); + pthread_create(&mdata->memmerge_thread, 0, memmerger, ltable); } @@ -135,13 +109,11 @@ void* memMergeThread(void*arg) int xid; - merger_args * a = (merger_args*)(arg); - - logtable * ltable = a->ltable; + logtable * ltable = (logtable*)arg; assert(ltable->get_tree_c1()); int merge_count =0; - mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(1); + mergeStats * stats = ltable->merge_mgr->get_merge_stats(1); while(true) // 1 { @@ -227,7 +199,7 @@ void* memMergeThread(void*arg) double new_c1_size = stats->output_size(); pthread_cond_signal(<able->c0_needed); - ltable->update_persistent_header(xid); + ltable->update_persistent_header(xid, 1); Tcommit(xid); ltable->merge_mgr->finished_merge(1); @@ -261,8 +233,10 @@ void* memMergeThread(void*arg) ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats)); pthread_cond_signal(<able->c1_ready); - - ltable->update_persistent_header(xid); + pageid_t old_bytes_out = stats->bytes_out; + stats->bytes_out = 0; // XXX HACK + ltable->update_persistent_header(xid, 1); + stats->bytes_out = old_bytes_out; Tcommit(xid); } @@ -285,14 +259,12 @@ void *diskMergeThread(void*arg) { int xid; - merger_args * a = (merger_args*)(arg); - - logtable * ltable = a->ltable; + logtable * ltable = (logtable*)arg; assert(ltable->get_tree_c2()); int merge_count =0; - mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(2); + mergeStats * stats = ltable->merge_mgr->get_merge_stats(2); while(true) { @@ -378,7 +350,7 @@ void *diskMergeThread(void*arg) DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1); // 13 - ltable->update_persistent_header(xid); + ltable->update_persistent_header(xid, 2); Tcommit(xid); ltable->merge_mgr->finished_merge(2); diff --git a/merger.h b/merger.h index 46d7373..9206455 100644 --- a/merger.h +++ b/merger.h @@ -12,23 +12,11 @@ static const int RB_TREE_OVERHEAD = 400; static const double MIN_R = 3.0; -struct merger_args -{ - logtable * ltable; - int64_t max_size; -}; - struct logtable_mergedata { //merge threads pthread_t diskmerge_thread; pthread_t memmerge_thread; - - //merge args 1 - struct merger_args *diskmerge_args; - //merge args 2 - struct merger_args *memmerge_args; - }; class merge_scheduler