From 43425128ba60695a90c9399a0bded80864ed3984 Mon Sep 17 00:00:00 2001 From: sears Date: Tue, 14 Dec 2010 00:06:32 +0000 Subject: [PATCH] clean up initialization / marshalling code. Lets some of the public mergeStat fields to be protected git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@1490 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logstore.cpp | 47 ++++++++++++++---------------------- logstore.h | 18 ++++---------- mergeManager.cpp | 50 +++++++++++++++++++++++++++++++++------ mergeManager.h | 10 +++++++- mergeStats.h | 48 +++++++++++++++++++++++++++++++++++-- merger.cpp | 36 +++++++++++++--------------- newserver.cpp | 5 ++-- server.cpp | 31 +++++++++++------------- test/check_merge.cpp | 3 +-- test/check_mergelarge.cpp | 3 +-- test/check_mergetuple.cpp | 3 +-- test/check_testAndSet.cpp | 3 +-- 12 files changed, 157 insertions(+), 100 deletions(-) diff --git a/logstore.cpp b/logstore.cpp index ec1fcd2..99951f0 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -20,8 +20,11 @@ static inline double tv_to_double(struct timeval tv) ///////////////////////////////////////////////////////////////// template -logtable::logtable(pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) +logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size) { + this->max_c0_size = max_c0_size; + this->mean_c0_effective_size = max_c0_size; + this->num_c0_mergers = 0; r_val = 3.0; // MIN_R tree_c0 = NULL; @@ -35,7 +38,7 @@ logtable::logtable(pageid_t internal_region_size, pageid_t datapage_regio this->accepting_new_requests = true; this->shutting_down_ = false; flushing = false; - this->merge_mgr = new mergeManager(this); + this->merge_mgr = 0; tmerger = new tuplemerger(&replace_merger); header_mut = rwlc_initlock(); @@ -108,15 +111,14 @@ recordid logtable::allocTable(int xid) //create the small tree tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); - c0_stats = merge_mgr->get_merge_stats(0); + merge_mgr = new mergeManager(this); + merge_mgr->set_c0_size(max_c0_size); merge_mgr->new_merge(0); - c0_stats->starting_merge(); + merge_mgr->get_merge_stats(0)->starting_merge(); tree_c0 = new memTreeComponent::rbtree_t; - - update_persistent_header(xid, 1); - update_persistent_header(xid, 2); - + tbl_header.merge_manager = merge_mgr->talloc(xid); + update_persistent_header(xid); return table_rec; } @@ -128,19 +130,15 @@ void logtable::openTable(int xid, recordid rid) { tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0); tree_c0 = new memTreeComponent::rbtree_t; - merge_mgr->get_merge_stats(1)->bytes_out = tbl_header.c1_base_size; - merge_mgr->get_merge_stats(1)->base_size = tbl_header.c1_base_size; - merge_mgr->get_merge_stats(1)->mergeable_size = tbl_header.c1_mergeable_size; - merge_mgr->get_merge_stats(2)->base_size = tbl_header.c2_base_size; - merge_mgr->get_merge_stats(2)->bytes_out = tbl_header.c2_base_size; + merge_mgr = new mergeManager(this, xid, tbl_header.merge_manager); + merge_mgr->set_c0_size(max_c0_size); - c0_stats = merge_mgr->get_merge_stats(0); merge_mgr->new_merge(0); - c0_stats->starting_merge(); + merge_mgr->get_merge_stats(0)->starting_merge(); } template -void logtable::update_persistent_header(int xid, int merge_level) { +void logtable::update_persistent_header(int xid) { tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid(); @@ -149,16 +147,7 @@ void logtable::update_persistent_header(int xid, int merge_level) { tbl_header.c1_dp_state = tree_c1->get_datapage_allocator_rid(); tbl_header.c1_state = tree_c1->get_internal_node_allocator_rid(); - if(merge_level == 1) { - tbl_header.c1_base_size = merge_mgr->get_merge_stats(1)->bytes_out; - tbl_header.c1_mergeable_size = merge_mgr->get_merge_stats(1)->mergeable_size; - } else if(merge_level == 2) { - tbl_header.c1_mergeable_size = 0; - tbl_header.c2_base_size = merge_mgr->get_merge_stats(2)->bytes_out; - } else { - assert(merge_level == 1 || merge_level == 2); - abort(); - } + merge_mgr->marshal(xid, tbl_header.merge_manager); Tset(xid, table_rec, &tbl_header); } @@ -193,7 +182,7 @@ void logtable::flushTable() } set_c0_is_merging(true); - c0_stats->handed_off_tree(); + merge_mgr->get_merge_stats(0)->handed_off_tree(); merge_mgr->new_merge(0); gettimeofday(&stop_tv,0); @@ -202,7 +191,7 @@ void logtable::flushTable() DEBUG("Signaled c0-c1 merge thread\n"); merge_count ++; - c0_stats->starting_merge(); + merge_mgr->get_merge_stats(0)->starting_merge(); tsize = 0; tree_bytes = 0; @@ -503,7 +492,7 @@ datatuple * logtable::insertTupleHelper(datatuple *tuple) pre_t = *rbitr; //do the merging datatuple *new_t = tmerger->merge(pre_t, tuple); - c0_stats->merged_tuples(new_t, tuple, pre_t); + merge_mgr->get_merge_stats(0)->merged_tuples(new_t, tuple, pre_t); t = new_t; tree_c0->erase(pre_t); //remove the previous tuple diff --git a/logstore.h b/logstore.h index 621a58d..4889f38 100644 --- a/logstore.h +++ b/logstore.h @@ -34,7 +34,7 @@ public: // 6GB ~= 100B * 500 GB / (datapage_size * 4KB) // (100B * 500GB) / (6GB * 4KB) = 2.035 // RCS: Set this to 1 so that we do (on average) one seek per b-tree read. - logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1); + logtable(pageid_t max_c0_size = 100 * 1024 * 1024, pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 1); ~logtable(); @@ -88,17 +88,11 @@ public: inline memTreeComponent::rbtree_ptr_t get_tree_c0(){return tree_c0;} inline memTreeComponent::rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;} void set_tree_c0(memTreeComponent::rbtree_ptr_t newtree){tree_c0 = newtree; bump_epoch(); } - void set_max_c0_size(int64_t max_c0_size) { - this->max_c0_size = max_c0_size; - this->mean_c0_effective_size = max_c0_size; - this->num_c0_mergers = 0; - merge_mgr->set_c0_size(max_c0_size); - merge_mgr->get_merge_stats(1); - } + bool get_c0_is_merging() { return c0_is_merging; } void set_c0_is_merging(bool is_merging) { c0_is_merging = is_merging; } void set_tree_c0_mergeable(memTreeComponent::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } - void update_persistent_header(int xid, int merge_level); + void update_persistent_header(int xid); inline tuplemerger * gettuplemerger(){return tmerger;} @@ -111,9 +105,7 @@ public: recordid c1_root; recordid c1_state; recordid c1_dp_state; - pageid_t c2_base_size; - pageid_t c1_mergeable_size; - pageid_t c1_base_size; + recordid merge_manager; }; rwlc * header_mut; pthread_mutex_t tick_mut; @@ -165,8 +157,6 @@ private: std::vector its; - mergeStats * c0_stats; - public: bool shutting_down_; diff --git a/mergeManager.cpp b/mergeManager.cpp index 9bc6617..292463f 100644 --- a/mergeManager.cpp +++ b/mergeManager.cpp @@ -10,6 +10,10 @@ #include "logstore.h" #include "math.h" #include "time.h" +#include +#undef try +#undef end + mergeStats* mergeManager:: get_merge_stats(int mergeLevel) { if (mergeLevel == 0) { return c0; @@ -46,6 +50,7 @@ void mergeManager::new_merge(int mergeLevel) { s->new_merge2(); } void mergeManager::set_c0_size(int64_t size) { + assert(size); c0->target_size = size; } void mergeManager::update_progress(mergeStats * s, int delta) { @@ -241,18 +246,14 @@ void * merge_manager_pretty_print_thread(void * arg) { return m->pretty_print_thread(); } -mergeManager::mergeManager(logtable *ltable): - UPDATE_PROGRESS_PERIOD(0.005), - cur_c1_c2_progress_delta(0.0), - ltable(ltable), - c0(new mergeStats(0, ltable ? ltable->max_c0_size : 10000000)), - c1(new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) )), - c2(new mergeStats(2, 0)) { +void mergeManager::init_helper(void) { struct timeval tv; gettimeofday(&tv, 0); sleeping[0] = false; sleeping[1] = false; sleeping[2] = false; + cur_c1_c2_progress_delta = c2->in_progress - c1->out_progress; + #if EXTENDED_STATS double_to_ts(&c0->stats_last_tick, tv_to_double(&tv)); double_to_ts(&c1->stats_last_tick, tv_to_double(&tv)); @@ -263,6 +264,41 @@ mergeManager::mergeManager(logtable *ltable): pthread_create(&pp_thread, 0, merge_manager_pretty_print_thread, (void*)this); } +mergeManager::mergeManager(logtable *ltable): + UPDATE_PROGRESS_PERIOD(0.005), + ltable(ltable) { + c0 = new mergeStats(0, ltable ? ltable->max_c0_size : 10000000); + c1 = new mergeStats(1, (int64_t)(ltable ? ((double)(ltable->max_c0_size) * *ltable->R()) : 100000000.0) ); + c2 = new mergeStats(2, 0); + init_helper(); +} +mergeManager::mergeManager(logtable *ltable, int xid, recordid rid): + UPDATE_PROGRESS_PERIOD(0.005), + ltable(ltable) { + marshalled_header h; + Tread(xid, rid, &h); + c0 = new mergeStats(xid, h.c0); + c1 = new mergeStats(xid, h.c1); + c2 = new mergeStats(xid, h.c2); + init_helper(); +} +recordid mergeManager::talloc(int xid) { + marshalled_header h; + recordid ret = Talloc(xid, sizeof(h)); + h.c0 = c0->talloc(xid); + h.c1 = c1->talloc(xid); + h.c2 = c2->talloc(xid); + Tset(xid, ret, &h); + return ret; +} +void mergeManager::marshal(int xid, recordid rid) { + marshalled_header h; + Tread(xid, rid, &h); + c0->marshal(xid, h.c0); + c1->marshal(xid, h.c1); + c2->marshal(xid, h.c2); +} + void mergeManager::pretty_print(FILE * out) { #if EXTENDED_STATS diff --git a/mergeManager.h b/mergeManager.h index 48b0396..0cf4f44 100644 --- a/mergeManager.h +++ b/mergeManager.h @@ -38,7 +38,9 @@ public: return (1000000ULL * (uint64_t)tv.tv_sec) + ((uint64_t)tv.tv_usec); } mergeManager(logtable *ltable); - + mergeManager(logtable *ltable, int xid, recordid rid); + void marshal(int xid, recordid rid); + recordid talloc(int xid); ~mergeManager(); void new_merge(int mergelevel); @@ -60,6 +62,12 @@ public: double cur_c1_c2_progress_delta; private: + void init_helper(void); + struct marshalled_header { + recordid c0; + recordid c1; + recordid c2; + }; logtable* ltable; double throttle_seconds; double last_throttle_seconds; diff --git a/mergeStats.h b/mergeStats.h index 15ef37b..cce9f67 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -21,6 +21,10 @@ #include // XXX for double_to_ts, etc... create a util class. +#include +#undef try +#undef end + class mergeStats { public: mergeStats(int merge_level, int64_t target_size) : @@ -59,6 +63,35 @@ class mergeStats { mergeManager::double_to_ts(&stats_last_tick, mergeManager::tv_to_double(&last)); #endif } + mergeStats(int xid, recordid rid) { + marshalled_header h; + Tread(xid, rid, &h); + merge_level = h.merge_level; + base_size = h.base_size; + mergeable_size = h.mergeable_size; + target_size = h.target_size; + current_size = 0; + bytes_out = base_size; + bytes_in_small = 0; + bytes_in_large = 0; + just_handed_off= false; + delta = 0; + need_tick = 0; + in_progress = 0; + out_progress = ((double)base_size) / (double)target_size; + active = false; + } + recordid talloc(int xid) { + return Talloc(xid, sizeof(marshalled_header)); + } + void marshal(int xid, recordid rid) { + marshalled_header h; + h.merge_level = merge_level; + h.base_size = base_size; + h.mergeable_size = mergeable_size; + h.target_size = h.target_size; + Tset(xid, rid, &h); + } ~mergeStats() { } void new_merge2() { if(just_handed_off) { @@ -110,7 +143,7 @@ class mergeStats { pageid_t output_size() { return bytes_out; } - const int merge_level; // 1 => C0->C1, 2 => C1->C2 + int merge_level; // 1 => C0->C1, 2 => C1->C2 protected: double float_tv(struct timeval& tv) { @@ -118,12 +151,23 @@ class mergeStats { } friend class mergeManager; - public: // XXX only accessed during initialization. + protected: // XXX only accessed during initialization. + struct marshalled_header { + int merge_level; + pageid_t base_size; + pageid_t mergeable_size; + pageid_t target_size; // Needed? + }; + public: pageid_t base_size; // size of table at beginning of merge. for c0, size of table at beginning of current c0-c1 merge round, plus data written since then. (this minus c1->bytes_in_small is the current size) + protected: pageid_t mergeable_size; // protected by mutex. + public: pageid_t target_size; + protected: pageid_t current_size; pageid_t bytes_out; // How many bytes worth of tuples did we write? + public: pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? protected: pageid_t bytes_in_large; // Bytes from the large input? diff --git a/merger.cpp b/merger.cpp index 4cc9b2a..263259c 100644 --- a/merger.cpp +++ b/merger.cpp @@ -139,27 +139,28 @@ void * merge_scheduler::memMergeThread() { double new_c1_size = stats->output_size(); pthread_cond_signal(<able_->c0_needed); - ltable_->update_persistent_header(xid, 1); + ltable_->update_persistent_header(xid); Tcommit(xid); //TODO: this is simplistic for now //6: if c1' is too big, signal the other merger - // update c0 effective size. - double frac = 1.0/(double)merge_count; - ltable_->num_c0_mergers = merge_count; - ltable_->mean_c0_effective_size = - (int64_t) ( - ((double)ltable_->mean_c0_effective_size)*(1-frac) + - ((double)stats->bytes_in_small*frac)); - ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size; - double target_R = *ltable_->R(); + if(stats->bytes_in_small) { + // update c0 effective size. + double frac = 1.0/(double)merge_count; + ltable_->num_c0_mergers = merge_count; + ltable_->mean_c0_effective_size = + (int64_t) ( + ((double)ltable_->mean_c0_effective_size)*(1-frac) + + ((double)stats->bytes_in_small*frac)); + ltable_->merge_mgr->get_merge_stats(0)->target_size = ltable_->mean_c0_effective_size; + } - printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", target_R, (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size); + printf("Merge done. R = %f MemSize = %lld Mean = %lld, This = %lld, Count = %d factor %3.3fcur%3.3favg\n", *ltable_->R(), (long long)ltable_->max_c0_size, (long long int)ltable_->mean_c0_effective_size, stats->bytes_in_small, merge_count, ((double)stats->bytes_in_small) / (double)ltable_->max_c0_size, ((double)ltable_->mean_c0_effective_size) / (double)ltable_->max_c0_size); - assert(target_R >= MIN_R); - bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > target_R); - DEBUG("\nc1 size %f R %f\n", new_c1_size, target_R); + assert(*ltable_->R() >= MIN_R); + bool signal_c2 = (new_c1_size / ltable_->mean_c0_effective_size > *ltable_->R()); + DEBUG("\nc1 size %f R %f\n", new_c1_size, *ltable_->R()); if( signal_c2 ) { DEBUG("mmt:\tsignaling C2 for merge\n"); @@ -183,10 +184,7 @@ void * merge_scheduler::memMergeThread() { ltable_->set_tree_c1(new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats)); pthread_cond_signal(<able_->c1_ready); - pageid_t old_bytes_out = stats->bytes_out; - stats->bytes_out = 0; // XXX HACK - ltable_->update_persistent_header(xid, 1); - stats->bytes_out = old_bytes_out; + ltable_->update_persistent_header(xid); Tcommit(xid); } @@ -302,7 +300,7 @@ void * merge_scheduler::diskMergeThread() DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1); // 13 - ltable_->update_persistent_header(xid, 2); + ltable_->update_persistent_header(xid); Tcommit(xid); rwlc_unlock(ltable_->header_mut); diff --git a/newserver.cpp b/newserver.cpp index 5e74709..31964ff 100644 --- a/newserver.cpp +++ b/newserver.cpp @@ -22,8 +22,6 @@ int main(int argc, char *argv[]) int xid = Tbegin(); - logtable ltable; - recordid table_root = ROOT_RECORD; int64_t c0_size = 1024 * 1024 * 512 * 1; @@ -38,6 +36,8 @@ int main(int argc, char *argv[]) printf("note: running w/ 2GB c0 for benchmarking\n"); // XXX build a separate test server and deployment server? } + logtable ltable(c0_size); + if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { printf("Creating empty logstore\n"); table_root = ltable.allocTable(xid); @@ -50,7 +50,6 @@ int main(int argc, char *argv[]) } Tcommit(xid); - ltable.set_max_c0_size(c0_size); merge_scheduler * mscheduler = new merge_scheduler(<able); mscheduler->start(); diff --git a/server.cpp b/server.cpp index 8e99538..9a15aa2 100644 --- a/server.cpp +++ b/server.cpp @@ -54,8 +54,21 @@ int main(int argc, char *argv[]) int xid = Tbegin(); + int64_t c0_size = 1024 * 1024 * 512 * 1; - logtable ltable; + if(argc == 2 && !strcmp(argv[1], "--test")) { + + c0_size = 1024 * 1024 * 100; + printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server? + } + + if(argc == 2 && !strcmp(argv[1], "--benchmark")) { + + c0_size = 1024 * 1024 * 1024 * 1; + printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server? + } + + logtable ltable(c0_size); recordid table_root = ROOT_RECORD; if(TrecordType(xid, ROOT_RECORD) == INVALID_SLOT) { @@ -71,22 +84,6 @@ int main(int argc, char *argv[]) Tcommit(xid); - - int64_t c0_size = 1024 * 1024 * 512 * 1; - - if(argc == 2 && !strcmp(argv[1], "--test")) { - - c0_size = 1024 * 1024 * 100; - printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server? - } - - if(argc == 2 && !strcmp(argv[1], "--benchmark")) { - - c0_size = 1024 * 1024 * 1024 * 1; - printf("note: running w/ 2GB c0 for benchmarking"); // XXX build a separate test server and deployment server? - } - - ltable.set_max_c0_size(c0_size); mscheduler = new merge_scheduler(<able); mscheduler->start(); diff --git a/test/check_merge.cpp b/test/check_merge.cpp index 5020846..3f51962 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -50,8 +50,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - logtable * ltable = new logtable(1000, 10000, 5); - ltable->set_max_c0_size(10 * 1024 * 1024); + logtable * ltable = new logtable(10 * 1024 * 1024, 1000, 10000, 5); merge_scheduler mscheduler(ltable); recordid table_root = ltable->allocTable(xid); diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index e3dc0ea..062c441 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -44,8 +44,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - logtable *ltable = new logtable(1000, 10000, 100); - ltable->set_max_c0_size(10*1024*1024); + logtable *ltable = new logtable(10*1024*1024, 1000, 10000, 100); merge_scheduler mscheduler(ltable); recordid table_root = ltable->allocTable(xid); diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index d0608c9..3475582 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -102,8 +102,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int xid = Tbegin(); - logtable *ltable = new logtable(1000, 1000, 40); - ltable->set_max_c0_size(10 * 1024 * 1024); + logtable *ltable = new logtable(10 * 1024 * 1024, 1000, 1000, 40); merge_scheduler mscheduler(ltable); recordid table_root = ltable->allocTable(xid); diff --git a/test/check_testAndSet.cpp b/test/check_testAndSet.cpp index df909ca..b2a8f59 100644 --- a/test/check_testAndSet.cpp +++ b/test/check_testAndSet.cpp @@ -61,8 +61,7 @@ void insertProbeIter(size_t NUM_ENTRIES) logtable::init_stasis(); int xid = Tbegin(); - ltable = new logtable(1000, 10000, 5); - ltable->set_max_c0_size(10*1024*1024); + ltable = new logtable(10 * 1024 * 1024, 1000, 10000, 5); merge_scheduler mscheduler(ltable);