2010-01-23 02:13:59 +00:00
# include <math.h>
# include "merger.h"
2010-03-05 19:07:47 +00:00
void merge_stats_pp ( FILE * fd , merge_stats_t & stats ) {
long long sleep_time = stats . start . tv_sec - stats . sleep . tv_sec ;
long long work_time = stats . done . tv_sec - stats . start . tv_sec ;
long long total_time = sleep_time + work_time ;
double mb_out = ( ( double ) stats . bytes_out ) / ( 1024.0 * 1024.0 ) ;
double mb_ins = ( ( double ) stats . bytes_in_small ) / ( 1024.0 * 1024.0 ) ;
double mb_inl = ( ( double ) stats . bytes_in_large ) / ( 1024.0 * 1024.0 ) ;
double kt_out = ( ( double ) stats . num_tuples_out ) / ( 1024.0 ) ;
double kt_ins = ( ( double ) stats . num_tuples_in_small ) / ( 1024.0 ) ;
double kt_inl = ( ( double ) stats . num_tuples_in_large ) / ( 1024.0 ) ;
double mb_hdd = mb_out + mb_inl + ( stats . merge_level = = 1 ? 0.0 : mb_ins ) ;
double kt_hdd = kt_out + kt_inl + ( stats . merge_level = = 1 ? 0.0 : kt_ins ) ;
fprintf ( fd ,
" ===================================================================== \n "
" Thread %d merge %lld: sleep %lld sec, run %lld sec \n "
" megabytes kTuples datapages MB/s (real) kTup/s (real) \n "
" Wrote %7lld %7lld %9lld " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" Read (small) %7lld %7lld - " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" Read (large) %7lld %7lld - " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" Disk %7lld %7lld - " " %6.1f %6.1f " " %8.1f %8.1f " " \n "
" ..................................................................... \n "
" avg tuple len: %6.2fkb \n "
" effective throughput: (mb/s ; nsec/byte): (%.2f; %.2f) active " " \n "
" (%.2f; %.2f) wallclock " " \n "
" ..................................................................... \n "
,
stats . merge_level , stats . merge_count ,
sleep_time ,
work_time ,
( long long ) mb_out , ( long long ) kt_out , stats . num_datapages_out , mb_out / ( double ) work_time , mb_out / ( double ) total_time , kt_out / ( double ) work_time , kt_out / ( double ) total_time ,
( long long ) mb_ins , ( long long ) kt_ins , mb_ins / ( double ) work_time , mb_ins / ( double ) total_time , kt_ins / ( double ) work_time , kt_ins / ( double ) total_time ,
( long long ) mb_inl , ( long long ) kt_inl , mb_inl / ( double ) work_time , mb_inl / ( double ) total_time , kt_inl / ( double ) work_time , kt_inl / ( double ) total_time ,
( long long ) mb_hdd , ( long long ) kt_hdd , mb_hdd / ( double ) work_time , mb_hdd / ( double ) total_time , kt_hdd / ( double ) work_time , kt_hdd / ( double ) total_time ,
mb_out / kt_out ,
mb_ins / work_time , 1000.0 * work_time / mb_ins , mb_ins / total_time , 1000.0 * total_time / mb_ins
) ;
}
double merge_stats_nsec_to_merge_in_bytes ( merge_stats_t ) ; // how many nsec did we burn on each byte from the small tree (want this to be equal for the two mergers)
2010-01-23 02:13:59 +00:00
int merge_scheduler : : addlogtable ( logtable * ltable )
{
struct logtable_mergedata * mdata = new logtable_mergedata ;
// initialize merge data
mdata - > rbtree_mut = new pthread_mutex_t ;
pthread_mutex_init ( mdata - > rbtree_mut , 0 ) ;
2010-02-18 23:31:57 +00:00
ltable - > set_tree_c0_mergeable ( NULL ) ;
2010-01-23 02:13:59 +00:00
mdata - > input_needed = new bool ( false ) ;
mdata - > input_ready_cond = new pthread_cond_t ;
pthread_cond_init ( mdata - > input_ready_cond , 0 ) ;
mdata - > input_needed_cond = new pthread_cond_t ;
pthread_cond_init ( mdata - > input_needed_cond , 0 ) ;
mdata - > input_size = new int64_t ( 100 ) ;
2010-02-18 23:31:57 +00:00
mdata - > diskmerge_args = new merger_args ;
mdata - > memmerge_args = new merger_args ;
2010-01-23 02:13:59 +00:00
mergedata . push_back ( std : : make_pair ( ltable , mdata ) ) ;
return mergedata . size ( ) - 1 ;
}
merge_scheduler : : ~ merge_scheduler ( )
{
2010-01-27 23:34:33 +00:00
for ( size_t i = 0 ; i < mergedata . size ( ) ; i + + )
2010-01-23 02:13:59 +00:00
{
logtable * ltable = mergedata [ i ] . first ;
logtable_mergedata * mdata = mergedata [ i ] . second ;
//delete the mergedata fields
delete mdata - > rbtree_mut ;
delete mdata - > input_needed ;
delete mdata - > input_ready_cond ;
delete mdata - > input_needed_cond ;
delete mdata - > input_size ;
//delete the merge thread structure variables
pthread_cond_destroy ( mdata - > diskmerge_args - > in_block_needed_cond ) ;
delete mdata - > diskmerge_args - > in_block_needed_cond ;
delete mdata - > diskmerge_args - > in_block_needed ;
pthread_cond_destroy ( mdata - > diskmerge_args - > out_block_needed_cond ) ;
delete mdata - > diskmerge_args - > out_block_needed_cond ;
delete mdata - > diskmerge_args - > out_block_needed ;
pthread_cond_destroy ( mdata - > diskmerge_args - > in_block_ready_cond ) ;
delete mdata - > diskmerge_args - > in_block_ready_cond ;
pthread_cond_destroy ( mdata - > diskmerge_args - > out_block_ready_cond ) ;
delete mdata - > diskmerge_args - > out_block_ready_cond ;
delete mdata - > diskmerge_args ;
delete mdata - > memmerge_args ;
}
mergedata . clear ( ) ;
}
void merge_scheduler : : shutdown ( )
{
//signal shutdown
2010-01-27 23:34:33 +00:00
for ( size_t i = 0 ; i < mergedata . size ( ) ; i + + )
2010-01-23 02:13:59 +00:00
{
logtable * ltable = mergedata [ i ] . first ;
logtable_mergedata * mdata = mergedata [ i ] . second ;
//flush the in memory table to write any tuples still in memory
ltable - > flushTable ( ) ;
2010-02-17 23:38:31 +00:00
pthread_mutex_lock ( mdata - > rbtree_mut ) ;
ltable - > stop ( ) ;
2010-01-23 02:13:59 +00:00
pthread_cond_signal ( mdata - > input_ready_cond ) ;
//*(mdata->diskmerge_args->still_open)=false;//same pointer so no need
pthread_mutex_unlock ( mdata - > rbtree_mut ) ;
}
2010-01-27 23:34:33 +00:00
for ( size_t i = 0 ; i < mergedata . size ( ) ; i + + )
2010-01-23 02:13:59 +00:00
{
logtable_mergedata * mdata = mergedata [ i ] . second ;
pthread_join ( mdata - > memmerge_thread , 0 ) ;
pthread_join ( mdata - > diskmerge_thread , 0 ) ;
}
}
2010-02-10 21:49:50 +00:00
void merge_scheduler : : startlogtable ( int index , int64_t MAX_C0_SIZE )
2010-01-23 02:13:59 +00:00
{
2010-02-10 21:49:50 +00:00
2010-01-23 02:13:59 +00:00
logtable * ltable = mergedata [ index ] . first ;
struct logtable_mergedata * mdata = mergedata [ index ] . second ;
pthread_cond_t * block1_needed_cond = new pthread_cond_t ;
pthread_cond_init ( block1_needed_cond , 0 ) ;
pthread_cond_t * block2_needed_cond = new pthread_cond_t ;
pthread_cond_init ( block2_needed_cond , 0 ) ;
pthread_cond_t * block1_ready_cond = new pthread_cond_t ;
pthread_cond_init ( block1_ready_cond , 0 ) ;
pthread_cond_t * block2_ready_cond = new pthread_cond_t ;
pthread_cond_init ( block2_ready_cond , 0 ) ;
bool * block1_needed = new bool ( false ) ;
bool * block2_needed = new bool ( false ) ;
//wait to merge the next block until we have merged block FUDGE times.
static const int FUDGE = 1 ;
static double R = MIN_R ;
int64_t * block1_size = new int64_t ;
* block1_size = FUDGE * ( ( int ) R ) * ( * ( mdata - > input_size ) ) ;
//initialize rb-tree
2010-03-09 01:42:23 +00:00
ltable - > set_tree_c0 ( new memTreeComponent < datatuple > : : rbtree_t ) ;
2010-01-23 02:13:59 +00:00
//disk merger args
2010-02-10 21:49:50 +00:00
ltable - > max_c0_size = MAX_C0_SIZE ;
2010-02-19 00:59:14 +00:00
diskTreeComponent * * block1_scratch = new diskTreeComponent * ;
2010-01-23 02:13:59 +00:00
* block1_scratch = 0 ;
2010-02-17 23:38:31 +00:00
DEBUG ( " Tree C1 is %lld \n " , ( long long ) ltable - > get_tree_c1 ( ) - > get_root_rec ( ) . page ) ;
DEBUG ( " Tree C2 is %lld \n " , ( long long ) ltable - > get_tree_c2 ( ) - > get_root_rec ( ) . page ) ;
2010-02-18 23:31:57 +00:00
struct merger_args diskmerge_args = {
2010-01-23 02:13:59 +00:00
ltable ,
1 , //worker id
mdata - > rbtree_mut , //block_ready_mutex
block1_needed_cond , //in_block_needed_cond
block1_needed , //in_block_needed
block2_needed_cond , //out_block_needed_cond
block2_needed , //out_block_needed
block1_ready_cond , //in_block_ready_cond
block2_ready_cond , //out_block_ready_cond
2010-03-13 00:05:06 +00:00
mdata - > internal_region_size ,
mdata - > datapage_region_size ,
mdata - > datapage_size ,
2010-01-23 02:13:59 +00:00
0 , //max_tree_size No max size for biggest component
& R , //r_i
} ;
* mdata - > diskmerge_args = diskmerge_args ;
2010-02-18 23:31:57 +00:00
struct merger_args memmerge_args =
2010-01-23 02:13:59 +00:00
{
ltable ,
2 ,
2010-02-18 23:31:57 +00:00
mdata - > rbtree_mut ,
2010-01-23 02:13:59 +00:00
mdata - > input_needed_cond ,
mdata - > input_needed ,
block1_needed_cond ,
block1_needed ,
mdata - > input_ready_cond ,
block1_ready_cond ,
2010-03-13 00:05:06 +00:00
mdata - > internal_region_size , // TODO different region / datapage sizes for C1?
mdata - > datapage_region_size ,
mdata - > datapage_size ,
2010-01-23 02:13:59 +00:00
( int64_t ) ( R * R * MAX_C0_SIZE ) ,
& R ,
} ;
* mdata - > memmerge_args = memmerge_args ;
void * ( * diskmerger ) ( void * ) = diskMergeThread ;
void * ( * memmerger ) ( void * ) = memMergeThread ;
pthread_create ( & mdata - > diskmerge_thread , 0 , diskmerger , mdata - > diskmerge_args ) ;
pthread_create ( & mdata - > memmerge_thread , 0 , memmerger , mdata - > memmerge_args ) ;
}
2010-03-05 19:07:47 +00:00
template < class ITA , class ITB >
void merge_iterators ( int xid ,
ITA * itrA ,
ITB * itrB ,
logtable * ltable ,
2010-03-13 00:05:06 +00:00
diskTreeComponent * scratch_tree ,
2010-03-05 19:07:47 +00:00
merge_stats_t * stats ,
bool dropDeletes ) ;
2010-02-18 23:31:57 +00:00
/**
2010-02-25 01:29:32 +00:00
* Merge algorithm : Outsider ' s view
2010-02-18 23:31:57 +00:00
* < pre >
1 : while ( 1 )
2 : wait for c0_mergable
3 : begin
2010-02-25 01:29:32 +00:00
4 : merge c0_mergable and c1 into c1 ' # Blocks ; tree must be consistent at this point
5 : force c1 ' # Blocks
6 : if c1 ' is too big # Blocks ; tree must be consistent at this point .
7 : c1_mergable = c1 '
2010-02-18 23:31:57 +00:00
8 : c1 = new_empty
2010-02-25 01:29:32 +00:00
8.5 : delete old c1_mergeable # Happens in other thread ( not here )
9 : else
10 : c1 = c1 '
11 : c0_mergeable = NULL
11.5 : delete old c0_mergeable
12 : delete old c1
13 : commit
2010-02-18 23:31:57 +00:00
< / pre >
2010-02-25 01:29:32 +00:00
Merge algorithm : actual order : 1 2 3 4 5 6 12 11.5 11 [ 7 8 ( 9 ) 10 ] 13
2010-02-18 23:31:57 +00:00
*/
2010-01-23 02:13:59 +00:00
void * memMergeThread ( void * arg )
{
2010-03-05 19:07:47 +00:00
int xid ;
2010-01-23 02:13:59 +00:00
2010-02-18 23:31:57 +00:00
merger_args * a = ( merger_args * ) ( arg ) ;
2010-01-23 02:13:59 +00:00
logtable * ltable = a - > ltable ;
2010-02-18 23:31:57 +00:00
assert ( ltable - > get_tree_c1 ( ) ) ;
2010-01-23 02:13:59 +00:00
int merge_count = 0 ;
2010-02-25 01:29:32 +00:00
while ( true ) // 1
2010-01-23 02:13:59 +00:00
{
2010-03-05 19:07:47 +00:00
merge_stats_t stats ;
stats . merge_level = 1 ;
stats . merge_count = merge_count ;
gettimeofday ( & stats . sleep , 0 ) ;
2010-02-20 01:18:39 +00:00
writelock ( ltable - > header_lock , 0 ) ;
2010-01-23 02:13:59 +00:00
int done = 0 ;
2010-02-25 01:29:32 +00:00
// 2: wait for c0_mergable
2010-02-18 23:31:57 +00:00
while ( ! ltable - > get_tree_c0_mergeable ( ) )
2010-01-23 02:13:59 +00:00
{
pthread_mutex_lock ( a - > block_ready_mut ) ;
* a - > in_block_needed = true ;
//pthread_cond_signal(a->in_block_needed_cond);
pthread_cond_broadcast ( a - > in_block_needed_cond ) ;
2010-02-17 23:38:31 +00:00
if ( ! ltable - > is_still_running ( ) ) {
2010-01-23 02:13:59 +00:00
done = 1 ;
pthread_mutex_unlock ( a - > block_ready_mut ) ;
break ;
}
2010-03-05 19:07:47 +00:00
DEBUG ( " mmt: \t waiting for block ready cond \n " ) ;
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-01-23 02:13:59 +00:00
pthread_cond_wait ( a - > in_block_ready_cond , a - > block_ready_mut ) ;
pthread_mutex_unlock ( a - > block_ready_mut ) ;
2010-02-20 01:18:39 +00:00
writelock ( ltable - > header_lock , 0 ) ;
2010-03-05 19:07:47 +00:00
DEBUG ( " mmt: \t block ready \n " ) ;
2010-01-23 02:13:59 +00:00
}
* a - > in_block_needed = false ;
if ( done = = 1 )
{
pthread_mutex_lock ( a - > block_ready_mut ) ;
2010-02-18 23:31:57 +00:00
pthread_cond_signal ( a - > out_block_ready_cond ) ; // no block is ready. this allows the other thread to wake up, and see that we're shutting down.
2010-01-23 02:13:59 +00:00
pthread_mutex_unlock ( a - > block_ready_mut ) ;
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-01-23 02:13:59 +00:00
break ;
}
2010-03-05 19:07:47 +00:00
gettimeofday ( & stats . start , 0 ) ;
2010-02-18 23:31:57 +00:00
// 3: Begin transaction
xid = Tbegin ( ) ;
// 4: Merge
2010-01-23 02:13:59 +00:00
//create the iterators
2010-03-13 00:05:06 +00:00
diskTreeComponent : : diskTreeIterator * itrA = ltable - > get_tree_c1 ( ) - > iterator ( ) ;
2010-03-09 19:02:54 +00:00
memTreeComponent < datatuple > : : iterator * itrB =
new memTreeComponent < datatuple > : : iterator ( ltable - > get_tree_c0_mergeable ( ) ) ;
2010-01-28 02:20:49 +00:00
2010-01-23 02:13:59 +00:00
//create a new tree
2010-03-13 00:05:06 +00:00
diskTreeComponent * c1_prime = new diskTreeComponent ( xid , a - > internal_region_size , a - > datapage_region_size , a - > datapage_size ) ;
2010-02-15 23:02:01 +00:00
//pthread_mutex_unlock(a->block_ready_mut);
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-02-15 23:02:01 +00:00
//: do the merge
2010-03-05 19:07:47 +00:00
DEBUG ( " mmt: \t Merging: \n " ) ;
2010-01-23 02:13:59 +00:00
2010-03-05 19:07:47 +00:00
merge_iterators < typeof ( * itrA ) , typeof ( * itrB ) > ( xid , itrA , itrB , ltable , c1_prime , & stats , false ) ;
2010-02-15 23:02:01 +00:00
2010-01-23 02:13:59 +00:00
delete itrA ;
delete itrB ;
2010-02-15 23:02:01 +00:00
2010-02-18 23:31:57 +00:00
// 5: force c1'
2010-03-13 00:05:06 +00:00
//force write the new tree to disk
c1_prime - > force ( xid ) ;
2010-02-18 23:31:57 +00:00
2010-01-23 02:13:59 +00:00
merge_count + + ;
2010-03-05 19:07:47 +00:00
DEBUG ( " mmt: \t merge_count %lld #bytes written %lld \n " , stats . merge_count , stats . bytes_out ) ;
2010-01-23 02:13:59 +00:00
2010-02-25 01:29:32 +00:00
writelock ( ltable - > header_lock , 0 ) ;
2010-01-23 02:13:59 +00:00
//TODO: this is simplistic for now
2010-02-25 01:29:32 +00:00
//6: if c1' is too big, signal the other merger
2010-01-23 02:13:59 +00:00
double target_R = * ( a - > r_i ) ;
2010-03-05 19:07:47 +00:00
double new_c1_size = stats . bytes_out ;
2010-01-23 02:13:59 +00:00
assert ( target_R > = MIN_R ) ;
2010-02-25 01:29:32 +00:00
bool signal_c2 = ( new_c1_size / ltable - > max_c0_size > target_R ) | |
( a - > max_size & & new_c1_size > a - > max_size ) ;
if ( signal_c2 )
2010-01-23 02:13:59 +00:00
{
2010-03-05 19:07:47 +00:00
DEBUG ( " mmt: \t signaling C2 for merge \n " ) ;
DEBUG ( " mmt: \t new_c1_size %.2f \t MAX_C0_SIZE %lld \t a->max_size %lld \t targetr %.2f \n " , new_c1_size ,
2010-02-10 21:49:50 +00:00
ltable - > max_c0_size , a - > max_size , target_R ) ;
2010-02-18 23:31:57 +00:00
// XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure.
while ( ltable - > get_tree_c1_mergeable ( ) ) {
2010-01-23 02:13:59 +00:00
pthread_mutex_lock ( a - > block_ready_mut ) ;
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-01-23 02:13:59 +00:00
pthread_cond_wait ( a - > out_block_needed_cond , a - > block_ready_mut ) ;
pthread_mutex_unlock ( a - > block_ready_mut ) ;
2010-02-20 01:18:39 +00:00
writelock ( ltable - > header_lock , 0 ) ;
2010-01-23 02:13:59 +00:00
}
2010-02-25 01:29:32 +00:00
}
// 12: delete old c1
2010-03-13 00:05:06 +00:00
ltable - > get_tree_c1 ( ) - > dealloc ( xid ) ;
2010-02-25 01:29:32 +00:00
delete ltable - > get_tree_c1 ( ) ;
// 11.5: delete old c0_mergeable
2010-03-09 01:42:23 +00:00
memTreeComponent < datatuple > : : tearDownTree ( ltable - > get_tree_c0_mergeable ( ) ) ;
2010-02-25 01:29:32 +00:00
// 11: c0_mergeable = NULL
ltable - > set_tree_c0_mergeable ( NULL ) ;
if ( signal_c2 ) {
// 7: and perhaps c1_mergeable
ltable - > set_tree_c1_mergeable ( c1_prime ) ;
// 8: c1 = new empty.
2010-03-13 00:05:06 +00:00
ltable - > set_tree_c1 ( new diskTreeComponent ( xid , a - > internal_region_size , a - > datapage_region_size , a - > datapage_size ) ) ;
2010-01-23 02:13:59 +00:00
pthread_cond_signal ( a - > out_block_ready_cond ) ;
2010-02-18 23:31:57 +00:00
} else {
2010-02-25 01:29:32 +00:00
// 10: c1 = c1'
2010-02-18 23:31:57 +00:00
ltable - > set_tree_c1 ( c1_prime ) ;
2010-01-23 02:13:59 +00:00
}
2010-03-05 19:07:47 +00:00
DEBUG ( " mmt: \t Updated C1's position on disk to %lld \n " , ltable - > get_tree_c1 ( ) - > get_root_rec ( ) . page ) ;
2010-02-25 01:29:32 +00:00
// 13
2010-02-27 00:35:13 +00:00
ltable - > update_persistent_header ( xid ) ;
2010-02-25 01:29:32 +00:00
Tcommit ( xid ) ;
2010-01-23 02:13:59 +00:00
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-01-23 02:13:59 +00:00
2010-03-05 19:07:47 +00:00
gettimeofday ( & stats . done , 0 ) ;
merge_stats_pp ( stdout , stats ) ;
2010-01-23 02:13:59 +00:00
//TODO: get the freeing outside of the lock
}
return 0 ;
}
2010-01-28 02:20:49 +00:00
2010-01-23 02:13:59 +00:00
void * diskMergeThread ( void * arg )
{
int xid ; // = Tbegin();
2010-02-18 23:31:57 +00:00
merger_args * a = ( merger_args * ) ( arg ) ;
2010-01-23 02:13:59 +00:00
logtable * ltable = a - > ltable ;
2010-02-18 23:31:57 +00:00
assert ( ltable - > get_tree_c2 ( ) ) ;
2010-03-05 19:07:47 +00:00
2010-01-23 02:13:59 +00:00
int merge_count = 0 ;
while ( true )
{
2010-03-05 19:07:47 +00:00
merge_stats_t stats ;
stats . merge_level = 2 ;
stats . merge_count = merge_count ;
gettimeofday ( & stats . sleep , 0 ) ;
2010-02-25 01:29:32 +00:00
// 2: wait for input
writelock ( ltable - > header_lock , 0 ) ;
2010-01-23 02:13:59 +00:00
int done = 0 ;
// get a new input for merge
2010-02-18 23:31:57 +00:00
while ( ! ltable - > get_tree_c1_mergeable ( ) )
2010-01-23 02:13:59 +00:00
{
pthread_mutex_lock ( a - > block_ready_mut ) ;
* a - > in_block_needed = true ;
pthread_cond_signal ( a - > in_block_needed_cond ) ;
2010-02-17 23:38:31 +00:00
if ( ! ltable - > is_still_running ( ) ) {
2010-01-23 02:13:59 +00:00
done = 1 ;
pthread_mutex_unlock ( a - > block_ready_mut ) ;
break ;
}
2010-03-05 19:07:47 +00:00
DEBUG ( " dmt: \t waiting for block ready cond \n " ) ;
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-01-23 02:13:59 +00:00
pthread_cond_wait ( a - > in_block_ready_cond , a - > block_ready_mut ) ;
pthread_mutex_unlock ( a - > block_ready_mut ) ;
2010-03-05 19:07:47 +00:00
DEBUG ( " dmt: \t block ready \n " ) ;
2010-02-20 01:18:39 +00:00
writelock ( ltable - > header_lock , 0 ) ;
2010-01-23 02:13:59 +00:00
}
* a - > in_block_needed = false ;
if ( done = = 1 )
{
pthread_cond_signal ( a - > out_block_ready_cond ) ;
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-01-23 02:13:59 +00:00
break ;
}
2010-03-05 19:07:47 +00:00
gettimeofday ( & stats . start , 0 ) ;
// 3: begin
2010-02-25 01:29:32 +00:00
xid = Tbegin ( ) ;
// 4: do the merge.
2010-01-23 02:13:59 +00:00
//create the iterators
2010-03-13 00:05:06 +00:00
diskTreeComponent : : diskTreeIterator * itrA = ltable - > get_tree_c2 ( ) - > iterator ( ) ; //new diskTreeIterator<datatuple>(ltable->get_tree_c2()->get_root_rec());
diskTreeComponent : : diskTreeIterator * itrB = ltable - > get_tree_c1_mergeable ( ) - > iterator ( ) ;
2010-02-25 01:29:32 +00:00
2010-01-23 02:13:59 +00:00
//create a new tree
2010-03-13 00:05:06 +00:00
diskTreeComponent * c2_prime = new diskTreeComponent ( xid , a - > internal_region_size , a - > datapage_region_size , a - > datapage_size ) ;
2010-02-17 23:38:31 +00:00
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-01-23 02:13:59 +00:00
//do the merge
2010-03-05 19:07:47 +00:00
DEBUG ( " dmt: \t Merging: \n " ) ;
2010-01-23 02:13:59 +00:00
2010-03-05 19:07:47 +00:00
merge_iterators < typeof ( * itrA ) , typeof ( * itrB ) > ( xid , itrA , itrB , ltable , c2_prime , & stats , true ) ;
2010-01-23 02:13:59 +00:00
delete itrA ;
delete itrB ;
2010-02-18 23:31:57 +00:00
2010-02-25 01:29:32 +00:00
//5: force write the new region to disk
2010-03-13 00:05:06 +00:00
c2_prime - > force ( xid ) ;
2010-02-18 23:31:57 +00:00
2010-02-25 01:29:32 +00:00
// (skip 6, 7, 8, 8.5, 9))
2010-02-18 23:31:57 +00:00
2010-02-25 01:29:32 +00:00
writelock ( ltable - > header_lock , 0 ) ;
//12
2010-03-13 00:05:06 +00:00
ltable - > get_tree_c2 ( ) - > dealloc ( xid ) ;
2010-02-18 23:31:57 +00:00
delete ltable - > get_tree_c2 ( ) ;
2010-02-25 01:29:32 +00:00
//11.5
2010-03-13 00:05:06 +00:00
ltable - > get_tree_c1_mergeable ( ) - > dealloc ( xid ) ;
2010-02-25 01:29:32 +00:00
//11
delete ltable - > get_tree_c1_mergeable ( ) ;
ltable - > set_tree_c1_mergeable ( 0 ) ;
2010-01-23 02:13:59 +00:00
//writes complete
2010-02-17 22:11:22 +00:00
//now atomically replace the old c2 with new c2
2010-01-23 02:13:59 +00:00
//pthread_mutex_lock(a->block_ready_mut);
2010-02-25 01:29:32 +00:00
2010-01-23 02:13:59 +00:00
merge_count + + ;
//update the current optimal R value
2010-03-05 19:07:47 +00:00
* ( a - > r_i ) = std : : max ( MIN_R , sqrt ( ( stats . bytes_out * 1.0 ) / ( ltable - > max_c0_size ) ) ) ;
2010-01-23 02:13:59 +00:00
2010-03-05 19:07:47 +00:00
DEBUG ( " dmt: \t merge_count %lld \t #written bytes: %lld \n optimal r %.2f " , stats . merge_count , stats . bytes_out , * ( a - > r_i ) ) ;
2010-02-25 01:29:32 +00:00
// 10: C2 is never to big
2010-02-18 23:31:57 +00:00
ltable - > set_tree_c2 ( c2_prime ) ;
2010-01-23 02:13:59 +00:00
2010-03-05 19:07:47 +00:00
DEBUG ( " dmt: \t Updated C2's position on disk to %lld \n " , ( long long ) - 1 ) ;
2010-02-25 01:29:32 +00:00
// 13
2010-02-27 00:35:13 +00:00
ltable - > update_persistent_header ( xid ) ;
2010-01-23 02:13:59 +00:00
Tcommit ( xid ) ;
2010-02-20 01:18:39 +00:00
unlock ( ltable - > header_lock ) ;
2010-03-05 19:07:47 +00:00
gettimeofday ( & stats . done , 0 ) ;
merge_stats_pp ( stdout , stats ) ;
2010-01-23 02:13:59 +00:00
}
return 0 ;
}
2010-03-05 19:07:47 +00:00
2010-01-28 02:20:49 +00:00
template < class ITA , class ITB >
2010-03-05 19:07:47 +00:00
void merge_iterators ( int xid ,
2010-01-28 02:20:49 +00:00
ITA * itrA , //iterator on c1 or c2
ITB * itrB , //iterator on c0 or c1, respectively
2010-01-23 02:13:59 +00:00
logtable * ltable ,
2010-03-13 00:05:06 +00:00
diskTreeComponent * scratch_tree , merge_stats_t * stats ,
2010-01-28 02:20:49 +00:00
bool dropDeletes // should be true iff this is biggest component
)
2010-01-23 02:13:59 +00:00
{
2010-03-05 19:07:47 +00:00
stats - > bytes_out = 0 ;
stats - > num_tuples_out = 0 ;
stats - > bytes_in_small = 0 ;
stats - > num_tuples_in_small = 0 ;
stats - > bytes_in_large = 0 ;
stats - > num_tuples_in_large = 0 ;
2010-03-09 19:02:54 +00:00
datatuple * t1 = itrA - > next_callerFrees ( ) ;
2010-03-05 19:07:47 +00:00
if ( t1 ) {
stats - > num_tuples_in_large + + ;
stats - > bytes_in_large + = t1 - > byte_length ( ) ;
}
2010-01-23 02:13:59 +00:00
datatuple * t2 = 0 ;
2010-03-09 19:02:54 +00:00
while ( ( t2 = itrB - > next_callerFrees ( ) ) ! = 0 )
2010-01-23 02:13:59 +00:00
{
2010-03-05 19:07:47 +00:00
stats - > num_tuples_in_small + + ;
stats - > bytes_in_small + = t2 - > byte_length ( ) ;
2010-01-23 02:13:59 +00:00
DEBUG ( " tuple \t %lld: keylen %d datalen %d \n " ,
ntuples , * ( t2 - > keylen ) , * ( t2 - > datalen ) ) ;
2010-02-23 17:05:47 +00:00
while ( t1 ! = 0 & & datatuple : : compare ( t1 - > key ( ) , t1 - > keylen ( ) , t2 - > key ( ) , t2 - > keylen ( ) ) < 0 ) // t1 is less than t2
2010-01-23 02:13:59 +00:00
{
//insert t1
2010-03-13 00:05:06 +00:00
scratch_tree - > insertTuple ( xid , t1 , stats ) ;
2010-02-10 21:49:50 +00:00
datatuple : : freetuple ( t1 ) ;
2010-03-05 19:07:47 +00:00
stats - > num_tuples_out + + ;
2010-01-23 02:13:59 +00:00
//advance itrA
2010-03-09 19:02:54 +00:00
t1 = itrA - > next_callerFrees ( ) ;
2010-03-05 19:07:47 +00:00
if ( t1 ) {
stats - > num_tuples_in_large + + ;
stats - > bytes_in_large + = t1 - > byte_length ( ) ;
}
2010-01-23 02:13:59 +00:00
}
2010-02-23 17:05:47 +00:00
if ( t1 ! = 0 & & datatuple : : compare ( t1 - > key ( ) , t1 - > keylen ( ) , t2 - > key ( ) , t2 - > keylen ( ) ) = = 0 )
2010-01-23 02:13:59 +00:00
{
datatuple * mtuple = ltable - > gettuplemerger ( ) - > merge ( t1 , t2 ) ;
//insert merged tuple, drop deletes
2010-03-13 00:05:06 +00:00
if ( dropDeletes & & ! mtuple - > isDelete ( ) ) {
scratch_tree - > insertTuple ( xid , mtuple , stats ) ;
}
2010-02-10 21:49:50 +00:00
datatuple : : freetuple ( t1 ) ;
2010-03-09 19:02:54 +00:00
t1 = itrA - > next_callerFrees ( ) ; //advance itrA
2010-03-05 19:07:47 +00:00
if ( t1 ) {
stats - > num_tuples_in_large + + ;
stats - > bytes_in_large + = t1 - > byte_length ( ) ;
}
2010-02-10 21:49:50 +00:00
datatuple : : freetuple ( mtuple ) ;
2010-01-23 02:13:59 +00:00
}
else
{
//insert t2
2010-03-13 00:05:06 +00:00
scratch_tree - > insertTuple ( xid , t2 , stats ) ;
2010-01-28 02:20:49 +00:00
// cannot free any tuples here; they may still be read through a lookup
2010-01-23 02:13:59 +00:00
}
2010-02-10 21:49:50 +00:00
datatuple : : freetuple ( t2 ) ;
2010-03-05 19:07:47 +00:00
stats - > num_tuples_out + + ;
2010-01-23 02:13:59 +00:00
}
2010-03-13 00:05:06 +00:00
while ( t1 ! = 0 ) { // t1 is less than t2
scratch_tree - > insertTuple ( xid , t1 , stats ) ;
2010-01-23 02:13:59 +00:00
2010-02-10 21:49:50 +00:00
datatuple : : freetuple ( t1 ) ;
2010-03-05 19:07:47 +00:00
stats - > num_tuples_out + + ;
2010-01-23 02:13:59 +00:00
//advance itrA
2010-03-09 19:02:54 +00:00
t1 = itrA - > next_callerFrees ( ) ;
2010-03-05 19:07:47 +00:00
if ( t1 ) {
stats - > num_tuples_in_large + + ;
stats - > bytes_in_large + = t1 - > byte_length ( ) ;
}
2010-03-13 00:05:06 +00:00
}
2010-01-23 02:13:59 +00:00
DEBUG ( " dpages: %d \t npages: %d \t ntuples: %d \n " , dpages , npages , ntuples ) ;
2010-03-13 00:05:06 +00:00
scratch_tree - > writes_done ( ) ;
2010-01-23 02:13:59 +00:00
}