2010-04-29 01:03:56 +00:00
/*
* mergeStats . h
*
2012-01-19 16:49:54 +00:00
* Copyright 2010-2012 Yahoo! Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
2010-04-29 01:03:56 +00:00
* Created on : Apr 27 , 2010
* Author : sears
*/
# ifndef MERGESTATS_H_
# define MERGESTATS_H_
# include <stasis/common.h>
2010-12-13 22:27:13 +00:00
# define EXTENDED_STATS 1
2010-05-19 23:42:06 +00:00
# include <sys/time.h>
# include <stdio.h>
2012-02-23 01:25:24 +00:00
# include "dataTuple.h"
# include "dataPage.h"
2010-06-02 21:47:58 +00:00
# include <mergeManager.h> // XXX for double_to_ts, etc... create a util class.
2010-04-29 23:13:04 +00:00
2010-12-14 00:06:32 +00:00
# include <stasis/transactional.h>
2010-05-19 23:42:06 +00:00
class mergeStats {
2010-12-17 18:48:26 +00:00
private :
void init_helper ( void ) {
# if EXTENDED_STATS
gettimeofday ( & stats_sleep , 0 ) ;
gettimeofday ( & stats_start , 0 ) ;
gettimeofday ( & stats_done , 0 ) ;
struct timeval last ;
gettimeofday ( & last , 0 ) ;
mergeManager : : double_to_ts ( & stats_last_tick , mergeManager : : tv_to_double ( & last ) ) ;
# endif
}
2011-01-11 22:07:39 +00:00
pageid_t rb_size_estimator ( pageid_t num_bytes , pageid_t num_tuples ) {
// Experimentally determined numbers
// pageid_t small_tup_est = num_bytes + 110L * num_tuples;
// pageid_t big_tup_est = num_bytes + (2L * num_bytes) / 100L;
// Pessimistic numbers
pageid_t small_tup_est = num_bytes + 220L * num_tuples ;
pageid_t big_tup_est = num_bytes + ( 4L * num_bytes ) / 100L ;
return big_tup_est > small_tup_est ? big_tup_est : small_tup_est ; // max
}
2010-04-29 23:13:04 +00:00
public :
2010-06-02 21:47:58 +00:00
mergeStats ( int merge_level , int64_t target_size ) :
2010-04-29 23:13:04 +00:00
merge_level ( merge_level ) ,
2010-05-26 00:58:17 +00:00
base_size ( 0 ) ,
2010-06-21 20:03:05 +00:00
mergeable_size ( 0 ) ,
2010-05-28 21:20:26 +00:00
target_size ( target_size ) ,
2010-04-29 23:13:04 +00:00
bytes_out ( 0 ) ,
bytes_in_small ( 0 ) ,
bytes_in_large ( 0 ) ,
2011-01-11 19:24:16 +00:00
num_tuples_out ( 0 ) ,
num_tuples_in_small ( 0 ) ,
num_tuples_in_large ( 0 ) ,
2010-05-26 00:58:17 +00:00
just_handed_off ( false ) ,
2010-06-05 00:41:52 +00:00
delta ( 0 ) ,
2010-06-18 23:00:23 +00:00
need_tick ( 0 ) ,
2010-06-05 00:41:52 +00:00
in_progress ( 0 ) ,
out_progress ( 0 ) ,
2010-12-13 22:27:13 +00:00
active ( false )
# if EXTENDED_STATS
,
stats_merge_count ( 0 ) ,
stats_bytes_out_with_overhead ( 0 ) ,
stats_num_datapages_out ( 0 ) ,
stats_bytes_in_small_delta ( 0 ) ,
stats_lifetime_elapsed ( 0 ) ,
2010-12-17 18:48:26 +00:00
stats_lifetime_active ( 0 ) ,
stats_elapsed ( 0 ) ,
stats_active ( 0 ) ,
2010-12-13 22:27:13 +00:00
stats_lifetime_consumed ( 0 ) ,
stats_bps ( 10.0 * 1024.0 * 1024.0 )
# endif // EXTENDED_STATS
{
2010-12-17 18:48:26 +00:00
init_helper ( ) ;
2010-04-29 01:03:56 +00:00
}
2010-12-14 00:06:32 +00:00
mergeStats ( int xid , recordid rid ) {
marshalled_header h ;
Tread ( xid , rid , & h ) ;
merge_level = h . merge_level ;
base_size = h . base_size ;
mergeable_size = h . mergeable_size ;
target_size = h . target_size ;
bytes_out = base_size ;
bytes_in_small = 0 ;
bytes_in_large = 0 ;
2011-01-11 19:24:16 +00:00
num_tuples_out = 0 ;
num_tuples_in_small = 0 ;
num_tuples_in_large = 0 ;
2010-12-14 00:06:32 +00:00
just_handed_off = false ;
delta = 0 ;
need_tick = 0 ;
in_progress = 0 ;
out_progress = ( ( double ) base_size ) / ( double ) target_size ;
active = false ;
2010-12-17 18:48:26 +00:00
# if EXTENDED_STATS
stats_merge_count = 0 ;
stats_bytes_out_with_overhead = 0 ;
stats_num_datapages_out = 0 ;
stats_bytes_in_small_delta = 0 ;
stats_lifetime_elapsed = 0 ;
stats_lifetime_active = 0 ;
stats_elapsed = 0 ;
stats_active = 0 ;
stats_lifetime_consumed = 0 ;
stats_bps = 10.0 * 1024.0 * 1024.0 ;
# endif
init_helper ( ) ;
2010-12-14 00:06:32 +00:00
}
recordid talloc ( int xid ) {
return Talloc ( xid , sizeof ( marshalled_header ) ) ;
}
void marshal ( int xid , recordid rid ) {
marshalled_header h ;
h . merge_level = merge_level ;
h . base_size = base_size ;
h . mergeable_size = mergeable_size ;
h . target_size = h . target_size ;
Tset ( xid , rid , & h ) ;
}
2010-12-11 00:51:19 +00:00
~ mergeStats ( ) { }
2010-06-02 21:47:58 +00:00
void new_merge2 ( ) {
2010-05-26 00:58:17 +00:00
if ( just_handed_off ) {
bytes_out = 0 ;
2010-06-05 00:41:52 +00:00
out_progress = 0 ;
2010-05-26 00:58:17 +00:00
just_handed_off = false ;
}
base_size = bytes_out ;
bytes_out = 0 ;
2010-04-29 23:13:04 +00:00
bytes_in_small = 0 ;
2010-05-26 00:58:17 +00:00
bytes_in_large = 0 ;
2011-01-11 19:24:16 +00:00
num_tuples_out = 0 ;
num_tuples_in_small = 0 ;
num_tuples_in_large = 0 ;
2010-06-05 00:41:52 +00:00
in_progress = 0 ;
2010-12-13 22:27:13 +00:00
# if EXTENDED_STATS
stats_merge_count + + ;
stats_bytes_out_with_overhead = 0 ;
stats_num_datapages_out = 0 ;
stats_bytes_in_small_delta = 0 ;
# endif
2010-04-29 01:03:56 +00:00
}
2010-04-29 23:13:04 +00:00
void starting_merge ( ) {
2010-05-12 22:16:41 +00:00
active = true ;
2010-12-13 22:27:13 +00:00
# if EXTENDED_STATS
gettimeofday ( & stats_start , 0 ) ;
struct timeval last ;
2010-05-12 22:16:41 +00:00
gettimeofday ( & last , 0 ) ;
2010-12-13 22:27:13 +00:00
mergeManager : : double_to_ts ( & stats_last_tick , mergeManager : : tv_to_double ( & last ) ) ;
# endif
2010-04-29 23:13:04 +00:00
}
2010-12-15 00:15:59 +00:00
pageid_t get_current_size ( ) {
if ( merge_level = = 0 ) {
2011-01-11 22:07:39 +00:00
return rb_size_estimator ( base_size + bytes_in_small - bytes_in_large - bytes_out ,
/*num_tuples_base + */ num_tuples_in_small - num_tuples_in_large - num_tuples_out ) ; ;
2010-12-15 00:15:59 +00:00
} else {
// s->bytes_out has strange semantics. It's how many bytes our input has written into this tree.
return base_size + bytes_out - bytes_in_large ;
}
}
2010-05-21 23:43:17 +00:00
void handed_off_tree ( ) {
2010-05-26 00:58:17 +00:00
if ( merge_level = = 2 ) {
} else {
2010-12-15 00:15:59 +00:00
mergeable_size = get_current_size ( ) ;
2010-05-26 00:58:17 +00:00
just_handed_off = true ;
}
2010-05-21 23:43:17 +00:00
}
2012-02-23 01:11:55 +00:00
void merged_tuples ( dataTuple * merged , dataTuple * small , dataTuple * large ) {
2010-05-12 22:16:41 +00:00
}
2012-02-23 01:11:55 +00:00
void wrote_datapage ( dataPage * dp ) {
2010-12-13 22:27:13 +00:00
# if EXTENDED_STATS
stats_num_datapages_out + + ;
stats_bytes_out_with_overhead + = ( PAGE_SIZE * dp - > get_page_count ( ) ) ;
# endif
2010-04-29 23:13:04 +00:00
}
pageid_t output_size ( ) {
return bytes_out ;
}
protected :
2010-04-29 01:03:56 +00:00
double float_tv ( struct timeval & tv ) {
return ( ( double ) tv . tv_sec ) + ( ( double ) tv . tv_usec ) / 1000000.0 ;
}
2010-04-29 23:13:04 +00:00
friend class mergeManager ;
2010-06-02 21:47:58 +00:00
2010-12-14 01:49:23 +00:00
protected :
2010-12-14 00:06:32 +00:00
struct marshalled_header {
int merge_level ;
pageid_t base_size ;
pageid_t mergeable_size ;
pageid_t target_size ; // Needed?
} ;
2010-12-17 18:48:26 +00:00
public : // XXX eliminate public fields; these are still required because various bits of calculation (bloom filter size, estimated c0 run length, etc...) are managed outside of mergeManager.
int merge_level ; /// The tree component / merge level that we're tracking. 1 => C0->C1, 2 => C1->C2
pageid_t base_size ; /// size of existing tree component (c[merge_level]') at beginning of current merge.
2010-12-14 00:06:32 +00:00
protected :
2010-12-17 18:48:26 +00:00
pageid_t mergeable_size ; /// The size of c[merge_level]_mergeable, assuming it exists. Protected by mutex.
2010-12-14 00:06:32 +00:00
public :
2010-12-17 18:48:26 +00:00
pageid_t target_size ; /// How big should the c[merge_level] tree component be?
2010-12-14 23:06:21 +00:00
protected :
2010-12-17 18:48:26 +00:00
pageid_t bytes_out ; /// For C0, number of bytes consumed by downstream merger. For merge_level 1 and 2, number of bytes enqueued for the downstream (C1-C2, and nil) mergers.
2010-12-14 00:06:32 +00:00
public :
2010-12-17 18:48:26 +00:00
pageid_t bytes_in_small ; /// For C0, number of bytes inserted by application. For C1, C2, number of bytes read from small tree in C(n-1) - Cn merger.
2010-08-21 03:09:18 +00:00
protected :
2010-12-17 18:48:26 +00:00
pageid_t bytes_in_large ; /// Bytes from the large input? (for C0, bytes deleted due to updates)
2010-05-26 00:58:17 +00:00
2011-01-11 19:24:16 +00:00
pageid_t num_tuples_out ; /// How many tuples did we write? TODO Only used for C0, so not stored on disk.
pageid_t num_tuples_in_small ; /// Tuples from the small input? TODO Only used for C0, so not stored on disk.
pageid_t num_tuples_in_large ; /// Tuples from large input? TODO Only used for C0, so not stored on disk.
2010-12-17 18:48:26 +00:00
// todo: simplify confusing hand off logic, and remove this field?
2010-05-26 00:58:17 +00:00
bool just_handed_off ;
2010-12-17 18:48:26 +00:00
// These fields are used to amortize mutex acquisitions.
2010-06-05 00:41:52 +00:00
int delta ;
2010-06-18 23:00:23 +00:00
int need_tick ;
2010-12-17 18:48:26 +00:00
// todo in_progress and out_progress are derived fields. eliminate them?
2010-06-05 00:41:52 +00:00
double in_progress ;
double out_progress ;
2010-12-17 18:48:26 +00:00
bool active ; /// True if this merger is running, or blocked by rate limiting. False if the upstream input does not exist.
2010-12-13 22:27:13 +00:00
# if EXTENDED_STATS
2010-12-17 18:48:26 +00:00
pageid_t stats_merge_count ; /// This is the stats_merge_count'th merge
struct timeval stats_sleep ; /// When did we go to sleep waiting for input?
struct timeval stats_start ; /// When did we wake up and start merging? (at steady state with max throughput, this should be equal to stats_sleep)
struct timeval stats_done ; /// When did we finish merging?
2010-12-13 22:27:13 +00:00
struct timespec stats_last_tick ;
2010-12-17 18:48:26 +00:00
pageid_t stats_bytes_out_with_overhead ; /// How many bytes did we write (including internal tree nodes)?
pageid_t stats_num_datapages_out ; /// How many datapages?
pageid_t stats_bytes_in_small_delta ; /// How many bytes from the small input tree during this tick (for C0, we ignore tree overheads)?
double stats_lifetime_elapsed ; /// How long has this tree existed, in seconds?
double stats_lifetime_active ; /// How long has this tree been running (i.e.; active = true), in seconds?
double stats_elapsed ; /// How long did this merge take, including idle time (not valid until after merge is complete)?
double stats_active ; /// How long did this merge take once it started running?
double stats_lifetime_consumed ; /// How many bytes has this tree consumed from upstream mergers?
double stats_bps ; /// Effective throughput while active.
2010-12-13 22:27:13 +00:00
# endif
2010-06-17 04:49:19 +00:00
2010-04-29 23:13:04 +00:00
public :
2010-04-29 01:03:56 +00:00
void pretty_print ( FILE * fd ) {
2010-12-13 22:27:13 +00:00
# if EXTENDED_STATS
2010-12-17 18:48:26 +00:00
double sleep_time = stats_elapsed - stats_active ;
double work_time = stats_active ;
double total_time = stats_elapsed ;
double mb_out = ( ( double ) bytes_out ) / ( 1024.0 * 1024.0 ) ;
double phys_mb_out = ( ( double ) stats_bytes_out_with_overhead ) / ( 1024.0 * 1024.0 ) ;
2010-04-29 23:13:04 +00:00
double mb_ins = ( ( double ) bytes_in_small ) / ( 1024.0 * 1024.0 ) ;
double mb_inl = ( ( double ) bytes_in_large ) / ( 1024.0 * 1024.0 ) ;
2011-01-11 19:24:16 +00:00
double kt_out = ( ( double ) num_tuples_out ) / ( 1024.0 ) ;
double kt_ins = ( ( double ) num_tuples_in_small ) / ( 1024.0 ) ;
double kt_inl = ( ( double ) num_tuples_in_large ) / ( 1024.0 ) ;
2010-04-29 23:13:04 +00:00
double mb_hdd = mb_out + mb_inl + ( merge_level = = 1 ? 0.0 : mb_ins ) ;
double kt_hdd = kt_out + kt_inl + ( merge_level = = 1 ? 0.0 : kt_ins ) ;
2010-04-29 01:03:56 +00:00
2010-04-29 23:13:04 +00:00
fprintf ( fd ,
" ===================================================================== \n "
" Thread %d merge %lld: sleep %6.2f sec, run %6.2f sec \n "
" megabytes kTuples datapages MB/s (real) kTup/s (real) \n "
" Wrote %7lld %7lld %9lld " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" Read (small) %7lld %7lld - " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" Read (large) %7lld %7lld - " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" Disk %7lld %7lld - " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" ..................................................................... \n "
2010-12-17 18:48:26 +00:00
" avg tuple len: %6.2fKB w/ disk ovehead: %6.2fKB \n "
2010-04-29 23:13:04 +00:00
" effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active " " \n "
" (%.2f; %.2f) wallclock " " \n "
" ..................................................................... \n "
,
2010-12-13 22:27:13 +00:00
merge_level , stats_merge_count ,
2010-04-29 23:13:04 +00:00
sleep_time ,
work_time ,
2010-12-13 22:27:13 +00:00
( long long ) mb_out , ( long long ) kt_out , stats_num_datapages_out , mb_out / work_time , mb_out / total_time , kt_out / work_time , kt_out / total_time ,
2010-04-29 23:13:04 +00:00
( long long ) mb_ins , ( long long ) kt_ins , mb_ins / work_time , mb_ins / total_time , kt_ins / work_time , kt_ins / total_time ,
( long long ) mb_inl , ( long long ) kt_inl , mb_inl / work_time , mb_inl / total_time , kt_inl / work_time , kt_inl / total_time ,
( long long ) mb_hdd , ( long long ) kt_hdd , mb_hdd / work_time , mb_hdd / total_time , kt_hdd / work_time , kt_hdd / total_time ,
2010-12-17 18:48:26 +00:00
mb_out / kt_out , phys_mb_out / kt_out ,
2010-04-29 23:13:04 +00:00
mb_ins / work_time , 1000.0 * work_time / mb_ins , mb_ins / total_time , 1000.0 * total_time / mb_ins
) ;
2010-12-13 22:27:13 +00:00
# endif
2010-04-29 01:03:56 +00:00
}
2010-04-29 23:13:04 +00:00
} ;
2010-04-29 01:03:56 +00:00
# endif /* MERGESTATS_H_ */