#ifndef _LOGSTORE_H_
#define _LOGSTORE_H_

#include <stasis/common.h>
#undef try
#undef end

#include <vector>
// Statistics describing a single merge pass between two adjacent LSM-tree
// components (C0->C1 or C1->C2): timing, byte counts, and tuple counts for
// both inputs and the output.
typedef struct merge_stats_t {
  int merge_level;               // 1 => C0->C1, 2 => C1->C2
  pageid_t merge_count;          // This is the merge_count'th merge
  struct timeval sleep;          // When did we go to sleep waiting for input?
  struct timeval start;          // When did we wake up and start merging?  (at steady state with max throughput, this should be equal to sleep)
  struct timeval done;           // When did we finish merging?
  pageid_t bytes_out;            // How many bytes did we write (including internal tree nodes)?
  pageid_t num_tuples_out;       // How many tuples did we write?
  pageid_t num_datapages_out;    // How many datapages?
  pageid_t bytes_in_small;       // How many bytes from the small input tree (for C0, we ignore tree overheads)?
  pageid_t num_tuples_in_small;  // Tuples from the small input?
  pageid_t bytes_in_large;       // Bytes from the large input?
  pageid_t num_tuples_in_large;  // Tuples from large input?
} merge_stats_t;

#include "diskTreeComponent.h"
#include "memTreeComponent.h"

#include "tuplemerger.h"

struct logtable_mergedata;

// An LSM-tree ("log table") built from three components:
//   C0 - in-memory red-black tree (memTreeComponent)
//   C1 - small on-disk tree (diskTreeComponent)
//   C2 - large on-disk tree (diskTreeComponent)
// plus "mergeable" snapshots of C0 and C1 that are in the process of being
// merged into the next larger component.  Swapping any component bumps an
// epoch counter so that open iterators know to revalidate themselves.
template<class TUPLE>
class logtable {
public:
  class iterator;

  // Default datapage_size scans 160KB / 2 per lookup on average.  At 100MB/s,
  // this is 0.7 ms.  XXX pick datapage_size in principled way.
  logtable(pageid_t internal_region_size = 1000, pageid_t datapage_region_size = 10000, pageid_t datapage_size = 40);

  ~logtable();

  // --- user access functions ---

  datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize);
  datatuple * findTuple_first(int xid, datatuple::key_t key, size_t keySize);
  void insertTuple(struct datatuple *tuple);

  // --- other class functions ---
  recordid allocTable(int xid);
  void openTable(int xid, recordid rid);
  void flushTable();

  inline recordid & get_table_rec() { return table_rec; } // TODO This is called by merger.cpp for no good reason.  (remove the calls)

  // Current epoch; incremented by bump_epoch() whenever a component pointer
  // is swapped (see the set_tree_* methods below).
  inline uint64_t get_epoch() { return epoch; }

  // Iterators register themselves so the table can track which of them need
  // revalidation; see the nested iterator class.
  void registerIterator(iterator * it);
  void forgetIterator(iterator * it);
  void bump_epoch();

  inline diskTreeComponent * get_tree_c2() { return tree_c2; }
  inline diskTreeComponent * get_tree_c1() { return tree_c1; }
  inline diskTreeComponent * get_tree_c1_mergeable() { return tree_c1_mergeable; }

  // Each setter bumps the epoch, invalidating outstanding iterators.
  inline void set_tree_c1(diskTreeComponent *t) { tree_c1 = t; bump_epoch(); }
  inline void set_tree_c1_mergeable(diskTreeComponent *t) { tree_c1_mergeable = t; bump_epoch(); }
  inline void set_tree_c2(diskTreeComponent *t) { tree_c2 = t; bump_epoch(); }

  inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0() { return tree_c0; }
  inline memTreeComponent<datatuple>::rbtree_ptr_t get_tree_c0_mergeable() { return tree_c0_mergeable; }
  void set_tree_c0(memTreeComponent<datatuple>::rbtree_ptr_t newtree) { tree_c0 = newtree; bump_epoch(); }
  void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree) { tree_c0_mergeable = newtree; bump_epoch(); }

  void update_persistent_header(int xid);

  void setMergeData(logtable_mergedata * mdata);
  logtable_mergedata* getMergeData() { return mergedata; }
  inline tuplemerger * gettuplemerger() { return tmerger; }

public:
  // Persistent root of the table: records describing each on-disk component.
  struct table_header {
    recordid c2_root;     // tree root record --> points to the root of the b-tree
    recordid c2_state;    // tree state --> describes the regions used by the index tree
    recordid c2_dp_state; // data pages state --> regions used by the data pages
    recordid c1_root;
    recordid c1_state;
    recordid c1_dp_state;
  };
  logtable_mergedata * mergedata;

  rwl * header_lock;

  int64_t max_c0_size;

  inline bool is_still_running() { return still_running_; }
  inline void stop() {
    still_running_ = false;
    // XXX must need to do other things!
  }

private:
  recordid table_rec;
  struct table_header tbl_header;
  uint64_t epoch;
  diskTreeComponent *tree_c2;           // big tree
  diskTreeComponent *tree_c1;           // small tree
  diskTreeComponent *tree_c1_mergeable; // small tree: ready to be merged with c2
  memTreeComponent<datatuple>::rbtree_ptr_t tree_c0;           // in-mem red black tree
  memTreeComponent<datatuple>::rbtree_ptr_t tree_c0_mergeable; // in-mem red black tree: ready to be merged with c1.

  int tsize;          // number of tuples
  int64_t tree_bytes; // number of bytes
  // DATA PAGE SETTINGS
  pageid_t internal_region_size; // in number of pages
  pageid_t datapage_region_size; // "
  pageid_t datapage_size;        // "

  tuplemerger *tmerger;

  // Iterators currently registered via registerIterator().
  std::vector<iterator *> its;

  bool still_running_;

public:
  // Merges one iterator of type ITRA (exposing getnext()) with num_iters
  // iterators of type ITRN (exposing next_callerFrees()) into a single
  // sorted stream.  When several inputs hold equal tuples (per cmp), the
  // tuple from the lowest-index input is returned and the duplicates from
  // the other inputs are consumed and freed.  Returned tuples are owned by
  // the caller.
  template<class ITRA, class ITRN>
  class mergeManyIterator {
  public:
    // a:     first input iterator (slot 0); ownership is taken.
    // iters: array of num_iters further inputs (entries may be NULL);
    //        ownership is taken.
    // merge: optional tuple-combining callback; passing non-NULL currently
    //        abort()s in getnext() (unimplemented).
    // cmp:   three-way comparator over tuples.
    explicit mergeManyIterator(ITRA* a, ITRN** iters, int num_iters, TUPLE*(*merge)(const TUPLE*, const TUPLE*), int(*cmp)(const TUPLE*, const TUPLE*)) :
      num_iters_(num_iters + 1),
      first_iter_(a),
      iters_((ITRN**)malloc(sizeof(*iters_) * num_iters)),         // exactly the number passed in
      current_((TUPLE**)malloc(sizeof(*current_) * (num_iters_))), // one more than was passed in
      last_iter_(-1),
      cmp_(cmp),
      merge_(merge),
      dups((int*)malloc(sizeof(*dups) * num_iters_))
    {
      // Prime current_ with the head tuple of each input; slot 0 belongs to
      // first_iter_, slot i (i >= 1) to iters_[i-1].
      current_[0] = first_iter_->getnext();
      for(int i = 1; i < num_iters_; i++) {
        iters_[i-1] = iters[i-1];
        current_[i] = iters_[i-1] ? iters_[i-1]->next_callerFrees() : NULL;
      }
    }
    ~mergeManyIterator() {
      delete(first_iter_);
      // current_[last_iter_] was handed to the caller on the last getnext(),
      // so it must not be freed here.
      for(int i = 0; i < num_iters_; i++) {
        if(i != last_iter_) {
          if(current_[i]) TUPLE::freetuple(current_[i]);
        }
      }
      for(int i = 1; i < num_iters_; i++) {
        delete iters_[i-1];
      }
      free(current_);
      free(iters_);
      free(dups);
    }
    // Returns the next tuple without consuming it: the following peek() or
    // getnext() call will return the same tuple again.
    TUPLE * peek() {
      TUPLE * ret = getnext();
      last_iter_ = -1; // don't advance iterator on next peek() or getnext() call.
      return ret;
    }
    // Returns the smallest remaining tuple (caller frees it), or NULL when
    // all inputs are exhausted.
    TUPLE * getnext() {
      int num_dups = 0;
      if(last_iter_ != -1) {
        // get the value after the one we just returned to the user
        //TUPLE::freetuple(current_[last_iter_]); // should never be null
        if(last_iter_ == 0) {
          current_[last_iter_] = first_iter_->getnext();
        } else if(last_iter_ != -1) { // NOTE(review): always true here (outer if already checked); the trailing else is dead code.
          current_[last_iter_] = iters_[last_iter_ - 1]->next_callerFrees();
        } else {
          // last call was 'peek'
        }
      }
      // find the first non-empty iterator.  (Don't need to special-case ITRA since we're looking at current.)
      int min = 0;
      while(min < num_iters_ && !current_[min]) {
        min++;
      }
      if(min == num_iters_) { return NULL; }
      // examine current to decide which tuple to return.
      for(int i = min + 1; i < num_iters_; i++) {
        if(current_[i]) {
          int res = cmp_(current_[min], current_[i]);
          if(res > 0) { // min > i
            min = i;
            num_dups = 0;
          } else if(res == 0) { // min == i
            dups[num_dups] = i;
            num_dups++;
          }
        }
      }
      TUPLE * ret;
      if(!merge_) {
        ret = current_[min];
      } else {
        // XXX use merge function to build a new ret.
        abort();
      }
      // advance the iterators that match the tuple we're returning.
      // (dups[] only ever holds indices >= 1, so iters_[dups[i]-1] is valid.)
      for(int i = 0; i < num_dups; i++) {
        TUPLE::freetuple(current_[dups[i]]); // should never be null
        current_[dups[i]] = iters_[dups[i] - 1]->next_callerFrees();
      }
      last_iter_ = min; // mark the min iter to be advanced at the next invocation of next().  This saves us a copy in the non-merging case.
      return ret;
    }
  private:
    int num_iters_;
    ITRA * first_iter_;
    ITRN ** iters_;
    TUPLE ** current_;   // head tuple of each input; NULL = input exhausted
    int last_iter_;      // index whose tuple was last returned; -1 = none/peeked

    int (*cmp_)(const TUPLE*, const TUPLE*);
    TUPLE*(*merge_)(const TUPLE*, const TUPLE*);

    // temporary variables initialized once for efficiency
    int * dups;

  };

  // Merging iterator over the whole logtable (C0, C0-mergeable, C1,
  // C1-mergeable, C2).  Registers itself with the logtable; when the table's
  // epoch changes (a component was swapped), the iterator is invalidated and
  // re-opens its component iterators on the next call, resuming just after
  // the last tuple it returned.
  class iterator {
  public:
    explicit iterator(logtable* ltable)
      : ltable(ltable),
        epoch(ltable->get_epoch()),
        merge_it_(NULL),
        last_returned(NULL),
        key(NULL),
        valid(false) {
      writelock(ltable->header_lock, 0);
      ltable->registerIterator(this);
      validate();
      unlock(ltable->header_lock);
    }

    // Starts the scan at key rather than at the beginning of the table.
    explicit iterator(logtable* ltable, TUPLE *key)
      : ltable(ltable),
        epoch(ltable->get_epoch()),
        merge_it_(NULL),
        last_returned(NULL),
        key(key),
        valid(false)
    {
      writelock(ltable->header_lock, 0);
      ltable->registerIterator(this);
      validate();
      unlock(ltable->header_lock);
    }

    ~iterator() {
      ltable->forgetIterator(this);
      invalidate();
      if(last_returned) TUPLE::freetuple(last_returned);
    }
  private:
    // Advances the merge iterator and remembers the returned tuple in
    // last_returned so a later validate() can resume after it.  The assert
    // checks that the merged stream is strictly increasing by key.
    TUPLE * getnextHelper() {
      TUPLE * tmp = merge_it_->getnext();
      if(last_returned && tmp) {
        assert(TUPLE::compare(last_returned->key(), last_returned->keylen(), tmp->key(), tmp->keylen()) < 0);
        TUPLE::freetuple(last_returned);
      }
      last_returned = tmp;
      return last_returned;
    }
  public:
    // Returns a copy of the next tuple, tombstones included (caller frees),
    // or NULL at end of table.
    TUPLE * getnextIncludingTombstones() {
      readlock(ltable->header_lock, 0);
      revalidate();
      TUPLE * ret = getnextHelper();
      unlock(ltable->header_lock);
      return ret ? ret->create_copy() : NULL;
    }

    // Returns a copy of the next non-deleted tuple (caller frees), or NULL
    // at end of table.
    TUPLE * getnext() {
      readlock(ltable->header_lock, 0);
      revalidate();
      TUPLE * ret;
      while((ret = getnextHelper()) && ret->isDelete()) { } // getNextHelper handles its own memory.
      unlock(ltable->header_lock);
      return ret ? ret->create_copy() : NULL; // XXX hate making copy!  Caller should not manage our memory.
    }

    // Tears down the component iterators; the next getnext*() call will
    // rebuild them via revalidate()/validate().
    void invalidate() {
      if(valid) {
        delete merge_it_;
        merge_it_ = NULL;
        valid = false;
      }
    }

  private:
    inline void init_helper();
    explicit iterator() { abort(); }          // not default-constructible
    void operator=(iterator & t) { abort(); } // not assignable
    int operator-(iterator & t) { abort(); }  // not subtractable
  private:
    static const int C1 = 0;
    static const int C1_MERGEABLE = 1;
    static const int C2 = 2;
    logtable * ltable;
    uint64_t epoch; // table epoch the current component iterators were opened under
    typedef mergeManyIterator<
      typename memTreeComponent<TUPLE>::revalidatingIterator,
      typename memTreeComponent<TUPLE>::iterator> inner_merge_it_t;
    typedef mergeManyIterator<
      inner_merge_it_t,
      diskTreeComponent::iterator> merge_it_t;
    merge_it_t* merge_it_;
    TUPLE * last_returned; // last tuple handed to the caller; resume point for validate()
    TUPLE * key;           // optional start key from the two-argument constructor
    bool valid;
    // Re-opens the component iterators if we have been invalidated; otherwise
    // asserts that the epoch has not moved under us.
    void revalidate() {
      if(!valid) {
        validate();
      } else {
        assert(epoch == ltable->get_epoch());
      }
    }

    // Opens an iterator over each component, positioned at last_returned (or
    // at key for a fresh scan), and stitches them together with two nested
    // mergeManyIterators.  If the resume position holds the tuple we already
    // returned, it is skipped.
    void validate() {
      typename memTreeComponent<TUPLE>::revalidatingIterator * c0_it;
      typename memTreeComponent<TUPLE>::iterator * c0_mergeable_it[1];
      diskTreeComponent::iterator * disk_it[3];
      epoch = ltable->get_epoch();

      datatuple *t;
      if(last_returned) {
        t = last_returned;
      } else if(key) {
        t = key;
      } else {
        t = NULL;
      }
      c0_it = new typename memTreeComponent<TUPLE>::revalidatingIterator(ltable->get_tree_c0(), ltable->getMergeData()->rbtree_mut, t);
      c0_mergeable_it[0] = new typename memTreeComponent<TUPLE>::iterator(ltable->get_tree_c0_mergeable(), t);
      disk_it[0] = ltable->get_tree_c1()->open_iterator(t);
      if(ltable->get_tree_c1_mergeable()) {
        disk_it[1] = ltable->get_tree_c1_mergeable()->open_iterator(t);
      } else {
        disk_it[1] = NULL; // mergeManyIterator tolerates NULL inputs
      }
      disk_it[2] = ltable->get_tree_c2()->open_iterator(t);

      inner_merge_it_t * inner_merge_it =
        new inner_merge_it_t(c0_it, c0_mergeable_it, 1, NULL, TUPLE::compare_obj);
      merge_it_ = new merge_it_t(inner_merge_it, disk_it, 3, NULL, TUPLE::compare_obj); // XXX Hardcodes comparator, and does not handle merges
      if(last_returned) {
        TUPLE * junk = merge_it_->peek();
        if(junk && !TUPLE::compare(junk->key(), junk->keylen(), last_returned->key(), last_returned->keylen())) {
          // we already returned junk
          TUPLE::freetuple(merge_it_->getnext());
        }
      }
      valid = true;
    }
  };
};
#endif