From 632afe6d9c5166a499cd0d1cbfe42cc6f21f2d83 Mon Sep 17 00:00:00 2001 From: sears Date: Fri, 5 Mar 2010 19:07:47 +0000 Subject: [PATCH] cleanup statistics gathering and reporting. constify inputs to merge functions (so they can't call freetuple()) git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@662 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- logserver.cpp | 4 - logstore.cpp | 12 +-- merger.cpp | 201 +++++++++++++++++++++++++++++++++++------------- merger.h | 9 --- tuplemerger.cpp | 7 +- tuplemerger.h | 9 +-- 6 files changed, 158 insertions(+), 84 deletions(-) diff --git a/logserver.cpp b/logserver.cpp index 6c25bb2..d7e4321 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -587,10 +587,6 @@ int op_stat_perf_report(pthread_data* data) { } -//pageid_t diskTreeComponent::build_histogram(int xid, pageid_t bucket_count, const byte **key_array, size_t * size_array) { -// -//} - int op_stat_histogram(pthread_data* data, size_t limit) { diff --git a/logstore.cpp b/logstore.cpp index 79efa75..7010e48 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -139,12 +139,9 @@ void logtable::flushTable() //this is for waiting the previous merger of the mem-tree //hopefullly this wont happen - printf("prv merge not complete\n"); - while(get_tree_c0_mergeable()) { unlock(header_lock); -// pthread_mutex_lock(mergedata->rbtree_mut); if(tree_bytes >= max_c0_size) pthread_cond_wait(mergedata->input_needed_cond, mergedata->rbtree_mut); else @@ -168,17 +165,12 @@ void logtable::flushTable() } - printf("prv merge complete\n"); - gettimeofday(&stop_tv,0); stop = tv_to_double(stop_tv); - //rbtree_ptr *tmp_ptr = new rbtree_ptr_t; //(typeof(h->scratch_tree)*) malloc(sizeof(void*)); set_tree_c0_mergeable(get_tree_c0()); -// pthread_mutex_lock(mergedata->rbtree_mut); pthread_cond_signal(mergedata->input_ready_cond); -// pthread_mutex_unlock(mergedata->rbtree_mut); merge_count ++; set_tree_c0(new rbtree_t); @@ -190,12 +182,12 @@ void logtable::flushTable() unlock(header_lock); if(first) { - printf("flush waited %f sec\n", stop-start); + printf("Blocked writes for %f sec\n", stop-start); first = 0; } else { - printf("flush waited %f sec (worked %f)\n", + printf("Blocked writes for %f sec (serviced writes for %f sec)\n", stop-start, start-last_start); } last_start = stop; diff --git a/merger.cpp b/merger.cpp index e3360c7..ee2c35e 100644 --- a/merger.cpp +++ b/merger.cpp @@ -4,11 +4,67 @@ #include "logiterators.cpp" #include "datapage.h" +typedef struct merge_stats_t { + int merge_level; // 1 => C0->C1, 2 => C1->C2 + pageid_t merge_count; // This is the merge_count'th merge + struct timeval sleep; // When did we go to sleep waiting for input? + struct timeval start; // When did we wake up and start merging? (at steady state with max throughput, this should be equal to sleep) + struct timeval done; // When did we finish merging? + pageid_t bytes_out; // How many bytes did we write (including internal tree nodes)? + pageid_t num_tuples_out; // How many tuples did we write? + pageid_t num_datapages_out; // How many datapages? + pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? + pageid_t num_tuples_in_small; // Tuples from the small input? + pageid_t bytes_in_large; // Bytes from the large input? + pageid_t num_tuples_in_large; // Tuples from large input? +} merge_stats_t; + +void merge_stats_pp(FILE* fd, merge_stats_t &stats) { + long long sleep_time = stats.start.tv_sec - stats.sleep.tv_sec; + long long work_time = stats.done.tv_sec - stats.start.tv_sec; + long long total_time = sleep_time + work_time; + double mb_out = ((double)stats.bytes_out) /(1024.0*1024.0); + double mb_ins= ((double)stats.bytes_in_small) /(1024.0*1024.0); + double mb_inl = ((double)stats.bytes_in_large) /(1024.0*1024.0); + double kt_out = ((double)stats.num_tuples_out) /(1024.0); + double kt_ins= ((double)stats.num_tuples_in_small) /(1024.0); + double kt_inl = ((double)stats.num_tuples_in_large) /(1024.0); + double mb_hdd = mb_out + mb_inl + (stats.merge_level == 1 ? 0.0 : mb_ins); + double kt_hdd = kt_out + kt_inl + (stats.merge_level == 1 ? 0.0 : kt_ins); + + + fprintf(fd, + "=====================================================================\n" + "Thread %d merge %lld: sleep %lld sec, run %lld sec\n" + " megabytes kTuples datapages MB/s (real) kTup/s (real)\n" + "Wrote %7lld %7lld %9lld" " %6.1f %6.1f" " %8.1f %8.1f" "\n" + "Read (small) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" + "Read (large) %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" + "Disk %7lld %7lld - " " %6.1f %6.1f" " %8.1f %8.1f" "\n" + ".....................................................................\n" + "avg tuple len: %6.2fkb\n" + "effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active" "\n" + " (%.2f; %.2f) wallclock" "\n" + ".....................................................................\n" + , + stats.merge_level, stats.merge_count, + sleep_time, + work_time, + (long long)mb_out, (long long)kt_out, stats.num_datapages_out, mb_out / (double)work_time, mb_out / (double)total_time, kt_out / (double)work_time, kt_out / (double)total_time, + (long long)mb_ins, (long long)kt_ins, mb_ins / (double)work_time, mb_ins / (double)total_time, kt_ins / (double)work_time, kt_ins / (double)total_time, + (long long)mb_inl, (long long)kt_inl, mb_inl / (double)work_time, mb_inl / (double)total_time, kt_inl / (double)work_time, kt_inl / (double)total_time, + (long long)mb_hdd, (long long)kt_hdd, mb_hdd / (double)work_time, mb_hdd / (double)total_time, kt_hdd / (double)work_time, kt_hdd / (double)total_time, + mb_out / kt_out, + mb_ins / work_time, 1000.0 * work_time / mb_ins, mb_ins / total_time, 1000.0 * total_time / mb_ins + ); +} + +double merge_stats_nsec_to_merge_in_bytes(merge_stats_t); // how many nsec did we burn on each byte from the small tree (want this to be equal for the two mergers) + inline DataPage* insertTuple(int xid, DataPage *dp, datatuple *t, logtable *ltable, - diskTreeComponent * ltree, - int64_t &dpages, int64_t &npages); + diskTreeComponent * ltree, merge_stats_t*); int merge_scheduler::addlogtable(logtable *ltable) { @@ -184,6 +240,16 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE) } +template +void merge_iterators(int xid, + ITA *itrA, + ITB *itrB, + logtable *ltable, + diskTreeComponent *scratch_tree, + merge_stats_t *stats, + bool dropDeletes); + + /** * Merge algorithm: Outsider's view *
@@ -208,7 +274,7 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
 void* memMergeThread(void*arg)
 {
 
-    int xid;// = Tbegin();
+    int xid;
 
     merger_args * a = (merger_args*)(arg);
 
@@ -219,6 +285,10 @@ void* memMergeThread(void*arg)
     
     while(true) // 1
     {
+    	merge_stats_t stats;
+    	stats.merge_level = 1;
+    	stats.merge_count = merge_count;
+    	gettimeofday(&stats.sleep,0);
         writelock(ltable->header_lock,0);
         int done = 0;
         // 2: wait for c0_mergable
@@ -235,14 +305,14 @@ void* memMergeThread(void*arg)
                 break;
             }
             
-            printf("mmt:\twaiting for block ready cond\n");
+            DEBUG("mmt:\twaiting for block ready cond\n");
             unlock(ltable->header_lock);
             
             pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut);
             pthread_mutex_unlock(a->block_ready_mut);
             
             writelock(ltable->header_lock,0);
-            printf("mmt:\tblock ready\n");
+            DEBUG("mmt:\tblock ready\n");
             
         }        
         *a->in_block_needed = false;
@@ -256,6 +326,8 @@ void* memMergeThread(void*arg)
             break;
         }
 
+    	gettimeofday(&stats.start, 0);
+
         // 3: Begin transaction
         xid = Tbegin();
 
@@ -274,10 +346,9 @@ void* memMergeThread(void*arg)
         unlock(ltable->header_lock);
 
         //: do the merge
-        printf("mmt:\tMerging:\n");
+        DEBUG("mmt:\tMerging:\n");
 
-        int64_t npages = 0;
-        int64_t mergedPages = merge_iterators(xid, itrA, itrB, ltable, c1_prime, npages, false);
+        merge_iterators(xid, itrA, itrB, ltable, c1_prime, &stats, false);
 
         delete itrA;
         delete itrB;
@@ -290,7 +361,7 @@ void* memMergeThread(void*arg)
         c1_prime->get_alloc()->force_regions(xid);
 
         merge_count++;        
-        printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages);      
+        DEBUG("mmt:\tmerge_count %lld #bytes written %lld\n", stats.merge_count, stats.bytes_out);
 
         writelock(ltable->header_lock,0);
 
@@ -298,14 +369,14 @@ void* memMergeThread(void*arg)
         //TODO: this is simplistic for now
         //6: if c1' is too big, signal the other merger
         double target_R = *(a->r_i);
-        double new_c1_size = npages * PAGE_SIZE;
+        double new_c1_size = stats.bytes_out;
         assert(target_R >= MIN_R);
         bool signal_c2 = (new_c1_size / ltable->max_c0_size > target_R) ||
                 (a->max_size && new_c1_size > a->max_size );
         if( signal_c2  )
         {
-        	printf("mmt:\tsignaling C2 for merge\n");
-            printf("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size,
+        	DEBUG("mmt:\tsignaling C2 for merge\n");
+            DEBUG("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size,
                    ltable->max_c0_size, a->max_size, target_R);
 
             // XXX need to report backpressure here!  Also, shouldn't be inside a transaction while waiting on backpressure.
@@ -344,18 +415,19 @@ void* memMergeThread(void*arg)
         	ltable->set_tree_c1(c1_prime);
         }
 
-        printf("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page);
+        DEBUG("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page);
         // 13
         ltable->update_persistent_header(xid);
         Tcommit(xid);
 
         unlock(ltable->header_lock);
         
+        gettimeofday(&stats.done, 0);
+        merge_stats_pp(stdout, stats);
+
         //TODO: get the freeing outside of the lock
     }
 
-    //pthread_mutex_unlock(a->block_ready_mut);
-    
     return 0;
 
 }
@@ -369,12 +441,16 @@ void *diskMergeThread(void*arg)
 
     logtable * ltable = a->ltable;
     assert(ltable->get_tree_c2());
-    
+
+
     int merge_count =0;
-    //pthread_mutex_lock(a->block_ready_mut);
     
     while(true)
     {
+    	merge_stats_t stats;
+    	stats.merge_level = 2;
+    	stats.merge_count = merge_count;
+    	gettimeofday(&stats.sleep,0);
     	// 2: wait for input
     	writelock(ltable->header_lock,0);
         int done = 0;
@@ -391,13 +467,13 @@ void *diskMergeThread(void*arg)
                 break;
             }
             
-            printf("dmt:\twaiting for block ready cond\n");
+            DEBUG("dmt:\twaiting for block ready cond\n");
             unlock(ltable->header_lock);
             
             pthread_cond_wait(a->in_block_ready_cond, a->block_ready_mut);
             pthread_mutex_unlock(a->block_ready_mut);
 
-            printf("dmt:\tblock ready\n");
+            DEBUG("dmt:\tblock ready\n");
             writelock(ltable->header_lock,0);
         }        
         *a->in_block_needed = false;
@@ -407,10 +483,10 @@ void *diskMergeThread(void*arg)
             unlock(ltable->header_lock);
             break;
         }
-        
-        int64_t mergedPages=0;
-        
-        // 3: begin
+
+    	gettimeofday(&stats.start, 0);
+
+    	// 3: begin
         xid = Tbegin();
 
         // 4: do the merge.
@@ -426,10 +502,9 @@ void *diskMergeThread(void*arg)
         unlock(ltable->header_lock);
         
         //do the merge        
-        printf("dmt:\tMerging:\n");
+        DEBUG("dmt:\tMerging:\n");
 
-        int64_t npages = 0;
-        mergedPages = merge_iterators(xid, itrA, itrB, ltable, c2_prime, npages, true);
+        merge_iterators(xid, itrA, itrB, ltable, c2_prime, &stats, true);
       
         delete itrA;
         delete itrB;        
@@ -458,54 +533,74 @@ void *diskMergeThread(void*arg)
 
         merge_count++;        
         //update the current optimal R value
-        *(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (ltable->max_c0_size/PAGE_SIZE) ) );
+        *(a->r_i) = std::max(MIN_R, sqrt( (stats.bytes_out * 1.0) / (ltable->max_c0_size) ) );
         
-        printf("dmt:\tmerge_count %d\t#written pages: %lld\n optimal r %.2f", merge_count, npages, *(a->r_i));
+        DEBUG("dmt:\tmerge_count %lld\t#written bytes: %lld\n optimal r %.2f", stats.merge_count, stats.bytes_out, *(a->r_i));
         // 10: C2 is never to big
         ltable->set_tree_c2(c2_prime);
 
-        printf("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
+        DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
         // 13
         ltable->update_persistent_header(xid);
         Tcommit(xid);
         
         unlock(ltable->header_lock);
+
+        gettimeofday(&stats.done, 0);
+        merge_stats_pp(stdout, stats);
+
     }
     return 0;
 }
 
+
+
 template 
-int64_t merge_iterators(int xid,
+void merge_iterators(int xid,
                         ITA *itrA, //iterator on c1 or c2
                         ITB *itrB, //iterator on c0 or c1, respectively
                         logtable *ltable,
-                        diskTreeComponent *scratch_tree,
-                        int64_t &npages,
+                        diskTreeComponent *scratch_tree, merge_stats_t *stats,
                         bool dropDeletes  // should be true iff this is biggest component
                         )
 {
-    int64_t dpages = 0;
-    int64_t ntuples = 0;
+	stats->bytes_out = 0;
+	stats->num_tuples_out = 0;
+	stats->bytes_in_small = 0;
+	stats->num_tuples_in_small = 0;
+	stats->bytes_in_large = 0;
+	stats->num_tuples_in_large = 0;
+
     DataPage *dp = 0;
 
     datatuple *t1 = itrA->getnext();
+    if(t1) {
+		stats->num_tuples_in_large++;
+		stats->bytes_in_large += t1->byte_length();
+    }
     datatuple *t2 = 0;
     
     while( (t2=itrB->getnext()) != 0)
     {        
+        stats->num_tuples_in_small++;
+        stats->bytes_in_small += t2->byte_length();
+
         DEBUG("tuple\t%lld: keylen %d datalen %d\n",
                ntuples, *(t2->keylen),*(t2->datalen) );        
 
         while(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) < 0) // t1 is less than t2
         {
             //insert t1
-            dp = insertTuple(xid, dp, t1, ltable, scratch_tree,
-                             dpages, npages);
+            dp = insertTuple(xid, dp, t1, ltable, scratch_tree, stats);
 
             datatuple::freetuple(t1);
-            ntuples++;      
+            stats->num_tuples_out++;
             //advance itrA
             t1 = itrA->getnext();
+            if(t1) {
+            	stats->num_tuples_in_large++;
+            	stats->bytes_in_large += t1->byte_length();
+            }
         }
 
         if(t1 != 0 && datatuple::compare(t1->key(), t1->keylen(), t2->key(), t2->keylen()) == 0)
@@ -514,41 +609,44 @@ int64_t merge_iterators(int xid,
             
             //insert merged tuple, drop deletes
             if(dropDeletes && !mtuple->isDelete())
-                dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree,
-                                 dpages, npages);
+                dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree, stats);
             
             datatuple::freetuple(t1);
             t1 = itrA->getnext();  //advance itrA
+            if(t1) {
+            	stats->num_tuples_in_large++;
+            	stats->bytes_in_large += t1->byte_length();
+            }
             datatuple::freetuple(mtuple);
         }
         else
         {        
             //insert t2
-            dp = insertTuple(xid, dp, t2, ltable, scratch_tree,
-                             dpages, npages);
+            dp = insertTuple(xid, dp, t2, ltable, scratch_tree, stats);
             // cannot free any tuples here; they may still be read through a lookup
         }
 
         datatuple::freetuple(t2);
-        ntuples++;
+        stats->num_tuples_out++;
     }
 
     while(t1 != 0) // t1 is less than t2
         {
-            dp = insertTuple(xid, dp, t1, ltable, scratch_tree,
-                             dpages, npages);
+            dp = insertTuple(xid, dp, t1, ltable, scratch_tree, stats);
 
             datatuple::freetuple(t1);
-            ntuples++;      
+            stats->num_tuples_out++;
             //advance itrA
             t1 = itrA->getnext();
+            if(t1) {
+            	stats->num_tuples_in_large++;
+            	stats->bytes_in_large += t1->byte_length();
+            }
         }
 
     if(dp!=NULL)
         delete dp;
     DEBUG("dpages: %d\tnpages: %d\tntuples: %d\n", dpages, npages, ntuples);
-    
-    return dpages;
 
 }
 
@@ -556,20 +654,19 @@ int64_t merge_iterators(int xid,
 inline DataPage*
 insertTuple(int xid, DataPage *dp, datatuple *t,
             logtable *ltable,
-            diskTreeComponent * ltree,
-            int64_t &dpages, int64_t &npages)
+            diskTreeComponent * ltree, merge_stats_t * stats)
 {
     if(dp==0)
     {
         dp = ltable->insertTuple(xid, t, ltree);
-        dpages++;
+        stats->num_datapages_out++;
     }
     else if(!dp->append(t))
     {
-        npages += dp->get_page_count();
+    	stats->bytes_out += (PAGE_SIZE * dp->get_page_count());
         delete dp;
         dp = ltable->insertTuple(xid, t, ltree);
-        dpages++;
+        stats->num_datapages_out++;
     }
 
     return dp;    
diff --git a/merger.h b/merger.h
index 2683be3..eaf8e8c 100644
--- a/merger.h
+++ b/merger.h
@@ -70,15 +70,6 @@ public:
     void shutdown();
 };
 
-template 
-int64_t merge_iterators(int xid,
-                    ITA *itrA,
-                    ITB *itrB,
-                    logtable *ltable,
-                    diskTreeComponent *scratch_tree,
-                    int64_t &npages,
-                    bool dropDeletes);
-
 void* memMergeThread(void* arg);
 void* diskMergeThread(void* arg);
 
diff --git a/tuplemerger.cpp b/tuplemerger.cpp
index 2324687..ed31a1d 100644
--- a/tuplemerger.cpp
+++ b/tuplemerger.cpp
@@ -1,9 +1,8 @@
 #include "tuplemerger.h"
 #include "logstore.h"
 
-// XXX make the imputs 'const'
 // XXX test / reason about this...
-datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2)
+datatuple* tuplemerger::merge(const datatuple *t1, const datatuple *t2)
 {
     datatuple *t;
     if(t1->isDelete() && t2->isDelete()) {
@@ -31,7 +30,7 @@ datatuple* tuplemerger::merge(datatuple *t1, datatuple *t2)
  * deletes are handled by the tuplemerger::merge function
  * so here neither t1 nor t2 is a delete datatuple
  **/
-datatuple* append_merger(datatuple *t1, datatuple *t2)
+datatuple* append_merger(const datatuple *t1, const datatuple *t2)
 {
 	assert(!(t1->isDelete() || t2->isDelete()));
     len_t keylen = t1->keylen();
@@ -49,7 +48,7 @@ datatuple* append_merger(datatuple *t1, datatuple *t2)
  * deletes are handled by the tuplemerger::merge function
  * so here neither t1 nor t2 is a delete datatuple
  **/
-datatuple* replace_merger(datatuple *t1, datatuple *t2)
+datatuple* replace_merger(const datatuple *t1, const datatuple *t2)
 {
 	return t2->create_copy();
 }
diff --git a/tuplemerger.h b/tuplemerger.h
index b8314ba..abc5138 100644
--- a/tuplemerger.h
+++ b/tuplemerger.h
@@ -3,11 +3,10 @@
 
 struct datatuple;
 
-typedef datatuple* (*merge_fn_t) (datatuple*, datatuple *);
+typedef datatuple* (*merge_fn_t) (const datatuple*, const datatuple *);
 
-datatuple* append_merger(datatuple *t1, datatuple *t2);
-
-datatuple* replace_merger(datatuple *t1, datatuple *t2);
+datatuple* append_merger(const datatuple *t1, const datatuple *t2);
+datatuple* replace_merger(const datatuple *t1, const datatuple *t2);
 
 
 class tuplemerger
@@ -21,7 +20,7 @@ public:
         }
 
     
-    datatuple* merge(datatuple *t1, datatuple *t2);
+    datatuple* merge(const datatuple *t1, const datatuple *t2);
 
 private: