2010-05-19 23:42:06 +00:00
/*
* mergeManager . cpp
*
* Created on : May 19 , 2010
* Author : sears
*/
# include "mergeManager.h"
# include "mergeStats.h"
# include "logstore.h"
# include "math.h"
2010-05-26 00:58:17 +00:00
// Return the statistics object for a merge level: 0 -> c0, 1 -> c1, 2 -> c2.
// Any other level is a programming error and terminates the process via abort().
mergeStats* mergeManager::get_merge_stats(int mergeLevel) {
  switch (mergeLevel) {
    case 0:
      return c0;
    case 1:
      return c1;
    case 2:
      return c2;
    default:
      abort();
  }
}
// Release everything the constructor created: first the throttling mutexes and
// condition variables, then the three per-level statistics objects.
mergeManager::~mergeManager() {
  // Synchronization primitives.
  pthread_mutex_destroy(&mut);
  pthread_mutex_destroy(&throttle_mut);
  pthread_mutex_destroy(&dummy_throttle_mut);
  pthread_cond_destroy(&dummy_throttle_cond);
  pthread_cond_destroy(&throttle_wokeup_cond);

  // Per-level statistics (allocated with new in the constructor).
  delete c0;
  delete c1;
  delete c2;
}
2010-06-02 21:47:58 +00:00
void mergeManager : : new_merge ( int mergeLevel ) {
mergeStats * s = get_merge_stats ( mergeLevel ) ;
2010-05-19 23:42:06 +00:00
pthread_mutex_lock ( & mut ) ;
if ( s - > merge_level = = 0 ) {
2010-05-26 00:58:17 +00:00
// target_size was set during startup
2010-05-19 23:42:06 +00:00
} else if ( s - > merge_level = = 1 ) {
2010-05-26 00:58:17 +00:00
assert ( c0 - > target_size ) ;
2010-05-28 21:20:26 +00:00
c1 - > target_size = ( pageid_t ) ( * ltable - > R ( ) * ( double ) c0 - > target_size ) ;
2010-05-26 00:58:17 +00:00
assert ( c1 - > target_size ) ;
2010-05-19 23:42:06 +00:00
} else if ( s - > merge_level = = 2 ) {
2010-05-26 00:58:17 +00:00
// target_size is infinity...
2010-05-19 23:42:06 +00:00
} else { abort ( ) ; }
pthread_mutex_unlock ( & mut ) ;
2010-06-02 21:47:58 +00:00
s - > new_merge2 ( ) ;
2010-05-19 23:42:06 +00:00
}
void mergeManager : : set_c0_size ( int64_t size ) {
2010-05-26 00:58:17 +00:00
c0 - > target_size = size ;
2010-05-19 23:42:06 +00:00
}
2010-05-21 23:43:17 +00:00
/**
 * This function is invoked periodically by the merge threads. It updates mergeManager's statistics and applies
 * backpressure as necessary.
 *
 * Here is the backpressure algorithm.
 *
 * We want to maintain these two invariants:
 *  - for each byte consumed by the app->c0 threads, a byte is consumed by the c0->c1 merge thread.
 *  - for each byte consumed by the c0->c1 thread, a byte is consumed by the c1->c2 thread.
 *
 * More concretely (and taking into account varying values of R):
 *   capacity(C_i) - current_size(C_i) >= size(C_i_mergeable) - bytes_consumed_by_next_merger(C_i)
 *
 * where:
 *   capacity(C_i)                      = target_size of C_i (for c1, R times c0's target_size)
 *   current_size(C_i)                  = base_size + sum(bytes_out) - sum(bytes_in_large)
 *   bytes_consumed_by_next_merger(C_i) = sum(bytes_in_small) of the merge thread that reads C_i
 */
2010-05-26 00:58:17 +00:00
void mergeManager : : tick ( mergeStats * s , bool block ) {
2010-06-02 21:47:58 +00:00
# define PRINT_SKIP 20
2010-06-03 00:12:31 +00:00
pageid_t tick_length_bytes = 64 * 1024 ; // probably lower than it could be for production machines. 256KB leads to whining on my dev box.
2010-05-26 00:58:17 +00:00
2010-06-02 21:47:58 +00:00
// if(s->bytes_in_small_delta > tick_length_bytes) {
pageid_t new_current_size = s - > base_size + s - > bytes_out - s - > bytes_in_large ;
2010-05-26 00:58:17 +00:00
2010-06-02 21:47:58 +00:00
if ( ( ( ! block ) & & ( new_current_size - s - > current_size > tick_length_bytes ) ) | |
( block & & s - > bytes_in_small_delta > tick_length_bytes ) ) { // other than R, these are protected by a mutex, but this is the only thread that can write them
s - > current_size = new_current_size ; // s->base_size + s->bytes_out - s->bytes_in_large;
2010-05-26 00:58:17 +00:00
if ( block ) {
2010-05-27 23:15:24 +00:00
while ( sleeping [ s - > merge_level ] ) {
rwlc_cond_wait ( & throttle_wokeup_cond , ltable - > header_mut ) ;
}
2010-05-26 00:58:17 +00:00
struct timeval now ;
gettimeofday ( & now , 0 ) ;
double elapsed_delta = tv_to_double ( & now ) - ts_to_double ( & s - > last_tick ) ;
double bps = 0 ; // = (double)s->bytes_in_small_delta / (double)elapsed_delta;
s - > lifetime_elapsed + = elapsed_delta ;
s - > lifetime_consumed + = s - > bytes_in_small_delta ;
double decay = 0.9999 ; // XXX set this in some principled way. Surely, it should depend on tick_length (once that's working...)
s - > window_elapsed = ( decay * s - > window_elapsed ) + elapsed_delta ;
s - > window_consumed = ( decay * s - > window_consumed ) + s - > bytes_in_small_delta ;
double_to_ts ( & s - > last_tick , tv_to_double ( & now ) ) ;
s - > bytes_in_small_delta = 0 ;
int64_t overshoot = 0 ;
2010-05-26 23:53:21 +00:00
int64_t raw_overshoot = 0 ;
2010-05-28 21:20:26 +00:00
int64_t overshoot_fudge = ( int64_t ) ( ( ( ( ( double ) s - > current_size ) / ( double ) ( s - > target_size ) ) - 0.5 ) * 2.0 * 1024.0 * 1024.0 ) ; // XXX set based on avg / max tuple size?
2010-05-26 00:58:17 +00:00
int spin = 0 ;
double total_sleep = 0.0 ;
do {
2010-05-26 23:53:21 +00:00
overshoot = 0 ;
raw_overshoot = 0 ;
2010-05-28 21:20:26 +00:00
double c0_out_progress , c0_c1_out_progress ;
if ( c0 - > mergeable_size ) {
c0_out_progress = ( ( double ) c0 - > current_size ) / ( double ) c0 - > target_size ;
} else {
c0_out_progress = 0 ; // don't throttle if our consumer is blocked on us.
}
if ( c1 - > mergeable_size ) {
c0_c1_out_progress = ( ( double ) c1 - > current_size ) / ( double ) c1 - > target_size ;
} else {
c0_c1_out_progress = 0 ;
}
double c0_c1_in_progress , c1_c2_in_progress ;
if ( c1 - > active ) {
c0_c1_in_progress = ( ( double ) ( c1 - > bytes_in_large + c1 - > bytes_in_small ) ) / ( double ) ( c0 - > mergeable_size + c1 - > base_size ) ;
} else {
c0_c1_in_progress = 0 ; // if our consumer is not active, it hasn't made any progress on our most recent output
}
if ( c2 - > active ) {
c1_c2_in_progress = ( ( double ) ( c2 - > bytes_in_large + c2 - > bytes_in_small ) ) / ( double ) ( c1 - > mergeable_size + c2 - > base_size ) ;
} else {
c1_c2_in_progress = 0 ;
}
2010-05-26 00:58:17 +00:00
double c0_c1_bps = c1 - > window_consumed / c1 - > window_elapsed ;
double c1_c2_bps = c2 - > window_consumed / c2 - > window_elapsed ;
if ( s - > merge_level = = 0 ) {
2010-05-28 21:20:26 +00:00
if ( ! ( c1 - > active & & c0 - > mergeable_size ) ) { overshoot_fudge = 0 ; }
raw_overshoot = ( int64_t ) ( ( ( double ) c0 - > target_size ) * ( c0_out_progress - c0_c1_in_progress ) ) ;
2010-05-26 23:53:21 +00:00
overshoot = raw_overshoot + overshoot_fudge ;
2010-05-26 00:58:17 +00:00
bps = c0_c1_bps ;
} else if ( s - > merge_level = = 1 ) {
2010-05-28 21:20:26 +00:00
if ( ! ( c2 - > active & & c1 - > mergeable_size ) ) { overshoot_fudge = 0 ; }
raw_overshoot = ( int64_t ) ( ( ( double ) c1 - > target_size ) * ( c0_c1_out_progress - c1_c2_in_progress ) ) ;
2010-05-26 23:53:21 +00:00
overshoot = raw_overshoot + overshoot_fudge ;
2010-05-26 00:58:17 +00:00
bps = c1_c2_bps ;
2010-05-19 23:42:06 +00:00
}
2010-05-26 00:58:17 +00:00
2010-05-28 21:20:26 +00:00
//#define PP_THREAD_INFO
2010-05-26 00:58:17 +00:00
# ifdef PP_THREAD_INFO
2010-05-28 21:20:26 +00:00
printf ( " \n Merge thread %d %6f %6f Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f \n " , s - > merge_level , c0_out_progress , c0_c1_in_progress , raw_overshoot , overshoot_fudge , overshoot , - 1.0 , spin , total_sleep ) ;
2010-05-26 00:58:17 +00:00
# endif
2010-06-02 21:47:58 +00:00
if ( s - > print_skipped = = PRINT_SKIP ) {
2010-05-26 00:58:17 +00:00
pretty_print ( stdout ) ;
2010-06-02 21:47:58 +00:00
s - > print_skipped = 0 ;
2010-05-27 23:15:24 +00:00
} else {
2010-06-02 21:47:58 +00:00
s - > print_skipped + + ;
2010-05-27 23:15:24 +00:00
}
2010-05-26 00:58:17 +00:00
if ( overshoot > 0 ) {
// throttle
// it took "elapsed" seconds to process "tick_length_bytes" mb
double sleeptime = 2.0 * ( double ) overshoot / bps ;
struct timespec sleep_until ;
double max_c0_sleep = 0.1 ;
double max_c1_sleep = 0.1 ;
double max_sleep = s - > merge_level = = 0 ? max_c0_sleep : max_c1_sleep ;
2010-05-26 23:53:21 +00:00
if ( sleeptime < 0.1 ) { sleeptime = 0.1 ; }
2010-05-26 00:58:17 +00:00
if ( sleeptime > max_sleep ) { sleeptime = max_sleep ; }
spin + + ;
total_sleep + = sleeptime ;
if ( ( spin > 20 ) | | ( total_sleep > ( max_sleep * 10 ) ) ) {
2010-05-28 21:20:26 +00:00
printf ( " \n Merge thread %d Overshoot: raw=%lld, d=%lld eff=%lld Throttle min(1, %6f) spin %d, total_sleep %6.3f \n " , s - > merge_level , raw_overshoot , overshoot_fudge , overshoot , sleeptime , spin , total_sleep ) ;
2010-05-26 00:58:17 +00:00
}
double_to_ts ( & sleep_until , sleeptime + tv_to_double ( & now ) ) ;
2010-05-27 23:15:24 +00:00
sleeping [ s - > merge_level ] = true ;
2010-05-27 01:49:27 +00:00
rwlc_cond_timedwait ( & dummy_throttle_cond , ltable - > header_mut , & sleep_until ) ;
2010-05-27 23:15:24 +00:00
sleeping [ s - > merge_level ] = false ;
pthread_cond_broadcast ( & throttle_wokeup_cond ) ;
2010-05-26 00:58:17 +00:00
gettimeofday ( & now , 0 ) ;
2010-05-21 23:43:17 +00:00
}
2010-05-26 23:53:21 +00:00
} while ( ( overshoot > 0 ) & & ( raw_overshoot > 0 ) ) ;
2010-05-26 00:58:17 +00:00
} else {
2010-06-02 21:47:58 +00:00
if ( s - > print_skipped = = PRINT_SKIP ) {
2010-05-27 23:15:24 +00:00
pretty_print ( stdout ) ;
2010-06-02 21:47:58 +00:00
s - > print_skipped = 0 ;
2010-05-27 23:15:24 +00:00
} else {
2010-06-02 21:47:58 +00:00
s - > print_skipped + + ;
2010-05-27 23:15:24 +00:00
}
2010-05-26 00:58:17 +00:00
}
// pthread_mutex_unlock(&mut);
2010-05-19 23:42:06 +00:00
}
}
2010-06-02 21:47:58 +00:00
void mergeManager : : read_tuple_from_small_component ( int merge_level , datatuple * tup ) {
if ( tup ) {
mergeStats * s = get_merge_stats ( merge_level ) ;
( s - > num_tuples_in_small ) + + ;
( s - > bytes_in_small_delta ) + = tup - > byte_length ( ) ;
( s - > bytes_in_small ) + = tup - > byte_length ( ) ;
tick ( s , true ) ;
}
}
void mergeManager : : wrote_tuple ( int merge_level , datatuple * tup ) {
mergeStats * s = get_merge_stats ( merge_level ) ;
( s - > num_tuples_out ) + + ;
( s - > bytes_out ) + = tup - > byte_length ( ) ;
// XXX this just updates stat's current size, and (perhaps) does a pretty print. It should not need a mutex.
tick ( s , false ) ;
}
void mergeManager : : finished_merge ( int merge_level ) {
get_merge_stats ( merge_level ) - > active = false ;
if ( merge_level ! = 0 ) {
get_merge_stats ( merge_level - 1 ) - > mergeable_size = 0 ;
}
gettimeofday ( & get_merge_stats ( merge_level ) - > done , 0 ) ;
}
2010-05-21 23:43:17 +00:00
// Build the manager for `ltable`. It creates per-level statistics: c0 sized at
// max_c0_size, c1 at R times that, and c2 with 0, which is its unbounded case.
// The second mergeStats argument is presumably the initial target_size.
// It also creates the throttling mutexes / condition variables and stamps every
// level's last_tick with the same starting time.
mergeManager::mergeManager(logtable<datatuple> *ltable)
  : ltable(ltable),
    c0(new mergeStats(0, ltable->max_c0_size)),
    c1(new mergeStats(1, (int64_t)(((double)ltable->max_c0_size) * *ltable->R()))),
    c2(new mergeStats(2, 0)) {
  pthread_mutex_init(&mut, 0);
  pthread_mutex_init(&throttle_mut, 0);
  pthread_mutex_init(&dummy_throttle_mut, 0);
  pthread_cond_init(&dummy_throttle_cond, 0);
  pthread_cond_init(&throttle_wokeup_cond, 0);

  // No merge level starts out asleep in the throttle.
  for(int level = 0; level < 3; ++level) {
    sleeping[level] = false;
  }

  struct timeval tv;
  gettimeofday(&tv, 0);
  const double start = tv_to_double(&tv);
  double_to_ts(&c0->last_tick, start);
  double_to_ts(&c1->last_tick, start);
  double_to_ts(&c2->last_tick, start);
}
void mergeManager : : pretty_print ( FILE * out ) {
pageid_t mb = 1024 * 1024 ;
logtable < datatuple > * lt = ( logtable < datatuple > * ) ltable ;
bool have_c0 = false ;
bool have_c0m = false ;
bool have_c1 = false ;
bool have_c1m = false ;
bool have_c2 = false ;
if ( lt ) {
2010-05-26 00:58:17 +00:00
// pthread_mutex_lock(<->header_mut);
2010-05-19 23:42:06 +00:00
have_c0 = NULL ! = lt - > get_tree_c0 ( ) ;
have_c0m = NULL ! = lt - > get_tree_c0_mergeable ( ) ;
have_c1 = NULL ! = lt - > get_tree_c1 ( ) ;
have_c1m = NULL ! = lt - > get_tree_c1_mergeable ( ) ;
have_c2 = NULL ! = lt - > get_tree_c2 ( ) ;
2010-05-26 00:58:17 +00:00
// pthread_mutex_unlock(<->header_mut);
2010-05-19 23:42:06 +00:00
}
2010-05-26 00:58:17 +00:00
2010-05-26 23:53:21 +00:00
double c0_out_progress = 100.0 * c0 - > current_size / c0 - > target_size ;
2010-05-28 21:20:26 +00:00
double c0_c1_in_progress = 100.0 * ( c1 - > bytes_in_large + c1 - > bytes_in_small ) / ( c0 - > mergeable_size + c1 - > base_size ) ;
2010-05-26 23:53:21 +00:00
double c0_c1_out_progress = 100.0 * c1 - > current_size / c1 - > target_size ;
2010-05-28 21:20:26 +00:00
double c1_c2_progress = 100.0 * ( c2 - > bytes_in_large + c2 - > bytes_in_small ) / ( c1 - > mergeable_size + c2 - > base_size ) ;
2010-05-26 00:58:17 +00:00
2010-05-26 23:53:21 +00:00
assert ( ( ! c1 - > active ) | | ( c0_c1_in_progress > = - 1 & & c0_c1_in_progress < 102 ) ) ;
2010-05-26 00:58:17 +00:00
assert ( ( ! c2 - > active ) | | ( c1_c2_progress > = - 1 & & c1_c2_progress < 102 ) ) ;
2010-05-26 23:53:21 +00:00
fprintf ( out , " [merge progress MB/s window (lifetime)]: app [%s %6lldMB ~ %3.0f%% %6.1fsec %4.1f (%4.1f)] %s %s [%s %3.0f%% ~ %3.0f%% %4.1f (%4.1f)] %s %s [%s %3.0f%% %4.1f (%4.1f)] %s " ,
c0 - > active ? " RUN " : " --- " , ( uint64_t ) ( c0 - > lifetime_consumed / mb ) , c0_out_progress , c0 - > lifetime_elapsed , c0 - > window_consumed / ( ( ( double ) mb ) * c0 - > window_elapsed ) , c0 - > lifetime_consumed / ( ( ( double ) mb ) * c0 - > lifetime_elapsed ) ,
2010-05-19 23:42:06 +00:00
have_c0 ? " C0 " : " .. " ,
have_c0m ? " C0' " : " ... " ,
2010-05-26 23:53:21 +00:00
c1 - > active ? " RUN " : " --- " , c0_c1_in_progress , c0_c1_out_progress , c1 - > window_consumed / ( ( ( double ) mb ) * c1 - > window_elapsed ) , c1 - > lifetime_consumed / ( ( ( double ) mb ) * c1 - > lifetime_elapsed ) ,
2010-05-19 23:42:06 +00:00
have_c1 ? " C1 " : " .. " ,
have_c1m ? " C1' " : " ... " ,
2010-05-26 00:58:17 +00:00
c2 - > active ? " RUN " : " --- " , c1_c2_progress , c2 - > window_consumed / ( ( ( double ) mb ) * c2 - > window_elapsed ) , c2 - > lifetime_consumed / ( ( ( double ) mb ) * c2 - > lifetime_elapsed ) ,
2010-05-19 23:42:06 +00:00
have_c2 ? " C2 " : " .. " ) ;
2010-06-02 21:47:58 +00:00
//#define PP_SIZES
# ifdef PP_SIZES
2010-05-26 00:58:17 +00:00
fprintf ( out , " [size in small/large, out, mergeable] C0 %4lld %4lld %4lld %4lld %4lld %4lld " ,
c0 - > target_size / mb , c0 - > current_size / mb , c0 - > bytes_in_small / mb ,
c0 - > bytes_in_large / mb , c0 - > bytes_out / mb , c0 - > mergeable_size / mb ) ;
fprintf ( out , " C1 %4lld %4lld %4lld %4lld %4lld %4lld " ,
c1 - > target_size / mb , c1 - > current_size / mb , c1 - > bytes_in_small / mb ,
c1 - > bytes_in_large / mb , c1 - > bytes_out / mb , c1 - > mergeable_size / mb ) ;
fprintf ( out , " C2 ---- %4lld %4lld %4lld %4lld %4lld " ,
/*----*/ c2 - > current_size / mb , c2 - > bytes_in_small / mb ,
c2 - > bytes_in_large / mb , c2 - > bytes_out / mb , c2 - > mergeable_size / mb ) ;
# endif
2010-05-21 23:43:17 +00:00
// fprintf(out, "Throttle: %6.1f%% (cur) %6.1f%% (overall) ", 100.0*(last_throttle_seconds/(last_elapsed_seconds)), 100.0*(throttle_seconds/(elapsed_seconds)));
// fprintf(out, "C0 size %4lld resident %4lld ",
// 2*c0_queueSize/mb,
// (c0->bytes_out - c0->bytes_in_large)/mb);
// fprintf(out, "C1 size %4lld resident %4lld\r",
// 2*c1_queueSize/mb,
// (c1->bytes_out - c1->bytes_in_large)/mb);
// fprintf(out, "C2 size %4lld\r",
// 2*c2_queueSize/mb);
// fprintf(out, "C1 MB/s (eff; active) %6.1f C2 MB/s %6.1f\r",
// ((double)c1_totalConsumed)/((double)c1_totalWorktime),
// ((double)c2_totalConsumed)/((double)c2_totalWorktime));
2010-05-19 23:42:06 +00:00
fflush ( out ) ;
2010-05-21 23:43:17 +00:00
fprintf ( out , " \r " ) ;
2010-05-19 23:42:06 +00:00
}