From 151373d09a3d5d0a84c70504d61cd2e63de1b03a Mon Sep 17 00:00:00 2001 From: sears Date: Wed, 28 Apr 2010 19:21:25 +0000 Subject: [PATCH] refactoring; move init_stasis into logtable, put merge stats into their own module git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@792 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- diskTreeComponent.cpp | 19 +----------- diskTreeComponent.h | 6 ++-- logstore.cpp | 15 +++++++++ logstore.h | 22 ++----------- merger.cpp | 65 +++++---------------------------------- merger.h | 2 -- server.cpp | 4 +-- test/check_datapage.cpp | 4 +-- test/check_gen.cpp | 4 +-- test/check_logtable.cpp | 6 ++-- test/check_logtree.cpp | 4 +-- test/check_merge.cpp | 4 +-- test/check_mergelarge.cpp | 7 ++--- test/check_mergetuple.cpp | 4 +-- 14 files changed, 46 insertions(+), 120 deletions(-) diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 65dbbec..600041f 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -17,8 +17,6 @@ #include #include #include -#include -#include ///////////////////////////////////////////////////////////////// // LOGTREE implementation @@ -33,20 +31,6 @@ const size_t diskTreeComponent::internalNodes::root_rec_size = sizeof(int64_t); const int64_t diskTreeComponent::internalNodes::PREV_LEAF = 0; //pointer to prev leaf page const int64_t diskTreeComponent::internalNodes::NEXT_LEAF = 1; //pointer to next leaf page -// TODO move init_stasis to a more appropriate module - -void diskTreeComponent::internalNodes::init_stasis() { - - bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE; - - DataPage::register_stasis_page_impl(); - - stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; // XXX workaround stasis issue #22. - - Tinit(); - -} - recordid diskTreeComponent::get_root_rid() { return ltree->get_root_rec(); } recordid diskTreeComponent::get_datapage_allocator_rid() { return ltree->get_datapage_alloc()->header_rid(); } recordid diskTreeComponent::get_internal_node_allocator_rid() { return ltree->get_internal_node_alloc()->header_rid(); } @@ -76,7 +60,7 @@ void diskTreeComponent::writes_done() { } } -int diskTreeComponent::insertTuple(int xid, datatuple *t, merge_stats_t *stats) +int diskTreeComponent::insertTuple(int xid, datatuple *t, mergeStats *stats) { int ret = 0; // no error. if(dp==0) { @@ -141,7 +125,6 @@ datatuple * diskTreeComponent::findTuple(int xid, datatuple::key_t key, size_t k return tup; } -void diskTreeComponent::internalNodes::deinit_stasis() { Tdeinit(); } recordid diskTreeComponent::internalNodes::create(int xid) { pageid_t root = internal_node_alloc->alloc_extent(xid, 1); diff --git a/diskTreeComponent.h b/diskTreeComponent.h index ea996f6..836a5f8 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -10,6 +10,7 @@ #include "datapage.h" #include "datatuple.h" +#include "mergeStats.h" class diskTreeComponent { public: @@ -36,7 +37,7 @@ class diskTreeComponent { recordid get_internal_node_allocator_rid(); internalNodes * get_internal_nodes() { return ltree; } datatuple* findTuple(int xid, datatuple::key_t key, size_t keySize); - int insertTuple(int xid, /*DataPage *dp,*/ datatuple *t, merge_stats_t *stats); + int insertTuple(int xid, datatuple *t, mergeStats *stats); void writes_done(); @@ -72,9 +73,6 @@ class diskTreeComponent { public: class internalNodes{ public: - // XXX move these to another module. - static void init_stasis(); - static void deinit_stasis(); internalNodes(int xid, pageid_t internal_region_size, pageid_t datapage_region_size, pageid_t datapage_size); internalNodes(int xid, recordid root, recordid internal_node_state, recordid datapage_state); diff --git a/logstore.cpp b/logstore.cpp index e9f6ef4..5741dd8 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -2,6 +2,9 @@ #include "merger.h" #include +#include +#include + #undef try #undef end @@ -58,6 +61,18 @@ logtable::~logtable() delete tmerger; } +template +void logtable::init_stasis() { + + DataPage::register_stasis_page_impl(); + + Tinit(); + +} + +template +void logtable::deinit_stasis() { Tdeinit(); } + template recordid logtable::allocTable(int xid) { diff --git a/logstore.h b/logstore.h index 52ac373..cd4fb93 100644 --- a/logstore.h +++ b/logstore.h @@ -7,28 +7,11 @@ #include - -typedef struct merge_stats_t { - int merge_level; // 1 => C0->C1, 2 => C1->C2 - pageid_t merge_count; // This is the merge_count'th merge - struct timeval sleep; // When did we go to sleep waiting for input? - struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep) - struct timeval done; // When did we finish merging? - pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)? - pageid_t num_tuples_out; // How many tuples did we write? - pageid_t num_datapages_out; // How many datapages? - pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? - pageid_t num_tuples_in_small; // Tuples from the small input? - pageid_t bytes_in_large; // Bytes from the large input? - pageid_t num_tuples_in_large; // Tuples from large input? -} merge_stats_t; - #include "diskTreeComponent.h" #include "memTreeComponent.h" #include "tuplemerger.h" - struct logtable_mergedata; template @@ -63,8 +46,9 @@ public: void openTable(int xid, recordid rid); void flushTable(); - inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls) - + static void init_stasis(); + static void deinit_stasis(); + inline uint64_t get_epoch() { return epoch; } void registerIterator(iterator * it); diff --git a/merger.cpp b/merger.cpp index 738386e..2024321 100644 --- a/merger.cpp +++ b/merger.cpp @@ -7,48 +7,6 @@ #undef try #undef end -void merge_stats_pp(FILE* fd, merge_stats_t &stats) { - long long sleep_time = stats.start.tv_sec - stats.sleep.tv_sec; - long long work_time = stats.done.tv_sec - stats.start.tv_sec; - long long total_time = sleep_time + work_time; - double mb_out = ((double)stats.bytes_out) /(1024.0*1024.0); - double mb_ins= ((double)stats.bytes_in_small) /(1024.0*1024.0); - double mb_inl = ((double)stats.bytes_in_large) /(1024.0*1024.0); - double kt_out = ((double)stats.num_tuples_out) /(1024.0); - double kt_ins= ((double)stats.num_tuples_in_small) /(1024.0); - double kt_inl = ((double)stats.num_tuples_in_large) /(1024.0); - double mb_hdd = mb_out + mb_inl + (stats.merge_level == 1 ? 0.0 : mb_ins); - double kt_hdd = kt_out + kt_inl + (stats.merge_level == 1 ? 0.0 : kt_ins); - - - fprintf(fd, - "=====================================================================\n" - "Thread %d merge %lld: sleep %lld sec, run %lld sec\n" - " megabytes kTuples datapages MB/s (real) kTup/s (real)\n" - "Wrote %7lld %7lld %9lld" " %6.1f %6.1f" " %8.1f %8.1f" "\n" - "Read (small) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" - "Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" - "Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" - ".....................................................................\n" - "avg tuple len: %6.2fkb\n" - "effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n" - " (%.2f; %.2f) wallclock" "\n" - ".....................................................................\n" - , - stats.merge_level, stats.merge_count, - sleep_time, - work_time, - (long long)mb_out, (long long)kt_out, stats.num_datapages_out, mb_out / (double)work_time, mb_out / (double)total_time, kt_out / (double)work_time, kt_out / (double)total_time, - (long long)mb_ins, (long long)kt_ins, mb_ins / (double)work_time, mb_ins / (double)total_time, kt_ins / (double)work_time, kt_ins / (double)total_time, - (long long)mb_inl, (long long)kt_inl, mb_inl / (double)work_time, mb_inl / (double)total_time, kt_inl / (double)work_time, kt_inl / (double)total_time, - (long long)mb_hdd, (long long)kt_hdd, mb_hdd / (double)work_time, mb_hdd / (double)total_time, kt_hdd / (double)work_time, kt_hdd / (double)total_time, - mb_out / kt_out, - mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins - ); -} - -double merge_stats_nsec_to_merge_in_bytes(merge_stats_t); // how many nsec did we burn on each byte from the small tree (want this to be equal for the two mergers) - int merge_scheduler::addlogtable(logtable *ltable) { @@ -235,7 +193,7 @@ void merge_iterators(int xid, ITB *itrB, logtable *ltable, diskTreeComponent *scratch_tree, - merge_stats_t *stats, + mergeStats *stats, bool dropDeletes); @@ -274,11 +232,7 @@ void* memMergeThread(void*arg) while(true) // 1 { - merge_stats_t stats; - memset((void*)&stats, 0, sizeof(stats)); - stats.merge_level = 1; - stats.merge_count = merge_count; - gettimeofday(&stats.sleep,0); + mergeStats stats(1, merge_count); writelock(ltable->header_lock,0); int done = 0; // 2: wait for c0_mergable @@ -410,7 +364,7 @@ void* memMergeThread(void*arg) unlock(ltable->header_lock); gettimeofday(&stats.done, 0); - merge_stats_pp(stdout, stats); + stats.pretty_print(stdout); //TODO: get the freeing outside of the lock } @@ -434,11 +388,8 @@ void *diskMergeThread(void*arg) while(true) { - merge_stats_t stats; - memset((void*)&stats, 0, sizeof(stats)); - stats.merge_level = 2; - stats.merge_count = merge_count; - gettimeofday(&stats.sleep,0); + mergeStats stats(2, merge_count); + // 2: wait for input writelock(ltable->header_lock,0); int done = 0; @@ -479,7 +430,7 @@ void *diskMergeThread(void*arg) // 4: do the merge. //create the iterators - diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); //new iterator(ltable->get_tree_c2()->get_root_rec()); + diskTreeComponent::iterator *itrA = ltable->get_tree_c2()->open_iterator(); diskTreeComponent::iterator *itrB = ltable->get_tree_c1_mergeable()->open_iterator(); //create a new tree @@ -530,7 +481,7 @@ void *diskMergeThread(void*arg) unlock(ltable->header_lock); gettimeofday(&stats.done, 0); - merge_stats_pp(stdout, stats); + stats.pretty_print(stdout); } return 0; @@ -543,7 +494,7 @@ void merge_iterators(int xid, ITA *itrA, //iterator on c1 or c2 ITB *itrB, //iterator on c0 or c1, respectively logtable *ltable, - diskTreeComponent *scratch_tree, merge_stats_t *stats, + diskTreeComponent *scratch_tree, mergeStats *stats, bool dropDeletes // should be true iff this is biggest component ) { diff --git a/merger.h b/merger.h index 3a31540..94064be 100644 --- a/merger.h +++ b/merger.h @@ -62,8 +62,6 @@ struct logtable_mergedata }; -#include "logstore.h" // XXX hacky include workaround. - class merge_scheduler { std::vector *, logtable_mergedata*> > mergedata; diff --git a/server.cpp b/server.cpp index 00e7dac..55bd95d 100644 --- a/server.cpp +++ b/server.cpp @@ -37,7 +37,7 @@ void terminate (int param) printf("Deinitializing stasis...\n"); fflush(stdout); - diskTreeComponent::internalNodes::deinit_stasis(); + logtable::deinit_stasis(); exit(0); } @@ -50,7 +50,7 @@ int main(int argc, char *argv[]) prev_fn = signal (SIGINT,terminate); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); int xid = Tbegin(); diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index ab87a00..b3aa3d8 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -30,7 +30,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); int xid = Tbegin(); @@ -118,7 +118,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - diskTreeComponent::internalNodes::deinit_stasis(); + logtable::deinit_stasis(); } diff --git a/test/check_gen.cpp b/test/check_gen.cpp index 4200076..574ae61 100644 --- a/test/check_gen.cpp +++ b/test/check_gen.cpp @@ -14,7 +14,7 @@ int main(int argc, char **argv) sync(); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); int xid = Tbegin(); @@ -33,7 +33,7 @@ int main(int argc, char **argv) delete ro_alloc; Tcommit(xid); - diskTreeComponent::internalNodes::deinit_stasis(); + logtable::deinit_stasis(); diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp index fcff909..5deba61 100644 --- a/test/check_logtable.cpp +++ b/test/check_logtable.cpp @@ -26,7 +26,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); int xid = Tbegin(); @@ -56,7 +56,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); - merge_stats_t *stats = (merge_stats_t*)calloc(sizeof(stats), 1); + mergeStats *stats = (mergeStats*)calloc(sizeof(stats), 1); for(size_t i = 0; i < NUM_ENTRIES; i++) { @@ -121,7 +121,7 @@ void insertProbeIter(size_t NUM_ENTRIES) printf("Random Reads completed.\n"); Tcommit(xid); - diskTreeComponent::internalNodes::deinit_stasis(); + logtable::deinit_stasis(); } /** @test diff --git a/test/check_logtree.cpp b/test/check_logtree.cpp index d207012..8cd0a5c 100644 --- a/test/check_logtree.cpp +++ b/test/check_logtree.cpp @@ -35,7 +35,7 @@ void insertProbeIter_str(int NUM_ENTRIES) sync(); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); int xid = Tbegin(); @@ -137,7 +137,7 @@ void insertProbeIter_str(int NUM_ENTRIES) delete it; delete ro_alloc; Tcommit(xid); - diskTreeComponent::internalNodes::deinit_stasis(); + logtable::deinit_stasis(); } diff --git a/test/check_merge.cpp b/test/check_merge.cpp index fc19af1..a3904e2 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -24,7 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES) unlink("storefile.txt"); unlink("logfile.txt"); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); //data generation std::vector * data_arr = new std::vector; @@ -147,7 +147,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - diskTreeComponent::internalNodes::deinit_stasis(); + logtable::deinit_stasis(); } diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index 9db8950..e17c57f 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -26,7 +26,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); //data generation // std::vector * data_arr = new std::vector; @@ -97,10 +97,7 @@ void insertProbeIter(size_t NUM_ENTRIES) gettimeofday(&stop_tv,0); printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); - //Tcommit(xid); - - diskTreeComponent::internalNodes::deinit_stasis(); - + logtable::deinit_stasis(); } diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index bc825e1..3e35080 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -26,7 +26,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - diskTreeComponent::internalNodes::init_stasis(); + logtable::init_stasis(); double delete_freq = .05; double update_freq = .15; @@ -249,7 +249,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - diskTreeComponent::internalNodes::deinit_stasis(); + logtable::deinit_stasis(); }