2010-03-09 01:42:23 +00:00
# ifndef _MEMTREECOMPONENT_H_
# define _MEMTREECOMPONENT_H_
# include <set>
2010-08-18 17:29:25 +00:00
# include <assert.h>
2010-08-21 03:09:18 +00:00
# include <mergeManager.h> // XXX for double_to_ts.
2010-03-17 21:51:26 +00:00
2010-03-09 01:42:23 +00:00
template < class TUPLE >
class memTreeComponent {
public :
typedef std : : set < TUPLE * , TUPLE > rbtree_t ;
typedef rbtree_t * rbtree_ptr_t ;
static void tearDownTree ( rbtree_ptr_t t ) ;
2010-03-09 19:02:54 +00:00
///////////////////////////////////////////////////////////////
// Plain iterator; cannot cope with changes to underlying tree
///////////////////////////////////////////////////////////////
2010-03-09 01:42:23 +00:00
2010-03-09 19:02:54 +00:00
class iterator
2010-03-09 01:42:23 +00:00
{
private :
typedef typename rbtree_t : : const_iterator MTITER ;
public :
2010-03-09 19:02:54 +00:00
iterator ( rbtree_t * s )
2010-03-09 01:42:23 +00:00
: first_ ( true ) ,
done_ ( s = = NULL ) {
init_iterators ( s , NULL , NULL ) ;
}
2010-03-09 19:02:54 +00:00
iterator ( rbtree_t * s , TUPLE * & key )
2010-03-09 01:42:23 +00:00
: first_ ( true ) , done_ ( s = = NULL ) {
init_iterators ( s , key , NULL ) ;
}
2010-03-09 19:02:54 +00:00
~ iterator ( ) {
2010-03-09 01:42:23 +00:00
delete it_ ;
delete itend_ ;
}
2010-03-09 19:02:54 +00:00
TUPLE * next_callerFrees ( ) {
2010-03-09 01:42:23 +00:00
if ( done_ ) { return NULL ; }
if ( first_ ) { first_ = 0 ; } else { ( * it_ ) + + ; }
if ( * it_ = = * itend_ ) { done_ = true ; return NULL ; }
return ( * ( * it_ ) ) - > create_copy ( ) ;
}
private :
void init_iterators ( rbtree_t * s , TUPLE * key1 , TUPLE * key2 ) {
if ( s ) {
2010-03-09 19:02:54 +00:00
it_ = key1 ? new MTITER ( s - > find ( key1 ) ) : new MTITER ( s - > begin ( ) ) ;
itend_ = key2 ? new MTITER ( s - > find ( key2 ) ) : new MTITER ( s - > end ( ) ) ;
if ( * it_ = = * itend_ ) { done_ = true ; }
if ( key1 ) {
if ( done_ ) {
// DEBUG("memtree opened at eot\n");
} else {
// DEBUG("memtree opened key = %s\n", (**it_)->key());
}
}
2010-03-09 01:42:23 +00:00
} else {
2010-03-09 19:02:54 +00:00
it_ = NULL ;
itend_ = NULL ;
2010-03-09 01:42:23 +00:00
}
}
2010-03-09 19:02:54 +00:00
explicit iterator ( ) { abort ( ) ; }
void operator = ( iterator & t ) { abort ( ) ; }
int operator - ( iterator & t ) { abort ( ) ; }
2010-03-09 01:42:23 +00:00
private :
bool first_ ;
bool done_ ;
MTITER * it_ ;
MTITER * itend_ ;
} ;
2010-03-09 19:02:54 +00:00
///////////////////////////////////////////////////////////////
// Revalidating iterator; automatically copes with changes to underlying tree
///////////////////////////////////////////////////////////////
class revalidatingIterator
2010-03-09 17:47:47 +00:00
{
private :
typedef typename rbtree_t : : const_iterator MTITER ;
public :
2010-03-09 19:02:54 +00:00
revalidatingIterator ( rbtree_t * s , pthread_mutex_t * rb_mut ) : s_ ( s ) , mut_ ( rb_mut ) {
2010-05-19 23:42:06 +00:00
if ( mut_ ) pthread_mutex_lock ( mut_ ) ;
2010-03-09 17:47:47 +00:00
if ( s_ - > begin ( ) = = s_ - > end ( ) ) {
2010-03-09 19:02:54 +00:00
next_ret_ = NULL ;
2010-03-09 17:47:47 +00:00
} else {
2010-03-09 19:02:54 +00:00
next_ret_ = ( * s_ - > begin ( ) ) - > create_copy ( ) ; // the create_copy() calls have to happen before we release mut_...
2010-03-09 17:47:47 +00:00
}
2010-05-19 23:42:06 +00:00
if ( mut_ ) pthread_mutex_unlock ( mut_ ) ;
2010-03-09 17:47:47 +00:00
}
2010-03-09 19:02:54 +00:00
revalidatingIterator ( rbtree_t * s , pthread_mutex_t * rb_mut , TUPLE * & key ) : s_ ( s ) , mut_ ( rb_mut ) {
2010-05-19 23:42:06 +00:00
if ( mut_ ) pthread_mutex_lock ( mut_ ) ;
2010-03-09 17:47:47 +00:00
if ( key ) {
2010-03-09 19:02:54 +00:00
if ( s_ - > find ( key ) ! = s_ - > end ( ) ) {
next_ret_ = ( * ( s_ - > find ( key ) ) ) - > create_copy ( ) ;
} else if ( s_ - > upper_bound ( key ) ! = s_ - > end ( ) ) {
next_ret_ = ( * ( s_ - > upper_bound ( key ) ) ) - > create_copy ( ) ;
} else {
next_ret_ = NULL ;
}
2010-03-09 17:47:47 +00:00
} else {
2010-03-09 19:02:54 +00:00
if ( s_ - > begin ( ) = = s_ - > end ( ) ) {
next_ret_ = NULL ;
} else {
next_ret_ = ( * s_ - > begin ( ) ) - > create_copy ( ) ; // the create_copy() calls have to happen before we release mut_...
}
2010-03-09 17:47:47 +00:00
}
// DEBUG("changing mem next ret = %s key = %s\n", next_ret_ ? (const char*)next_ret_->key() : "NONE", key ? (const char*)key->key() : "NULL");
2010-05-19 23:42:06 +00:00
if ( mut_ ) pthread_mutex_unlock ( mut_ ) ;
2010-03-09 17:47:47 +00:00
}
2010-03-09 19:02:54 +00:00
~ revalidatingIterator ( ) {
2010-03-17 21:51:26 +00:00
if ( next_ret_ ) TUPLE : : freetuple ( next_ret_ ) ;
2010-03-09 17:47:47 +00:00
}
2010-05-28 01:29:10 +00:00
TUPLE * next_callerFrees ( ) {
2010-05-19 23:42:06 +00:00
if ( mut_ ) pthread_mutex_lock ( mut_ ) ;
2010-03-09 17:47:47 +00:00
TUPLE * ret = next_ret_ ;
if ( next_ret_ ) {
2010-03-09 19:02:54 +00:00
if ( s_ - > upper_bound ( next_ret_ ) = = s_ - > end ( ) ) {
next_ret_ = 0 ;
} else {
next_ret_ = ( * s_ - > upper_bound ( next_ret_ ) ) - > create_copy ( ) ;
}
2010-03-09 17:47:47 +00:00
}
2010-05-19 23:42:06 +00:00
if ( mut_ ) pthread_mutex_unlock ( mut_ ) ;
2010-03-09 17:47:47 +00:00
return ret ;
}
private :
2010-03-09 19:02:54 +00:00
explicit revalidatingIterator ( ) { abort ( ) ; }
void operator = ( revalidatingIterator & t ) { abort ( ) ; }
int operator - ( revalidatingIterator & t ) { abort ( ) ; }
2010-03-09 17:47:47 +00:00
rbtree_t * s_ ;
TUPLE * next_ret_ ;
pthread_mutex_t * mut_ ;
} ;
2010-08-18 17:29:25 +00:00
///////////////////////////////////////////////////////////////
// Batched revalidating iterator; copies up to batch_size tuples each time it takes the lock, and copes with changes to underlying tree between batches
///////////////////////////////////////////////////////////////
class batchedRevalidatingIterator
{
private :
typedef typename rbtree_t : : const_iterator MTITER ;
void populate_next_ret_impl ( std : : _Rb_tree_const_iterator < TUPLE * > /*MTITER*/ it ) {
num_batched_ = 0 ;
cur_off_ = 0 ;
while ( it ! = s_ - > end ( ) & & num_batched_ < batch_size_ ) {
next_ret_ [ num_batched_ ] = ( * it ) - > create_copy ( ) ;
num_batched_ + + ;
it + + ;
}
}
void populate_next_ret ( TUPLE * key = NULL ) {
if ( cur_off_ = = num_batched_ ) {
if ( mut_ ) pthread_mutex_lock ( mut_ ) ;
2010-08-21 03:09:18 +00:00
if ( cur_size_ ) {
while ( * cur_size_ < ( 0.7 * ( double ) target_size_ ) & & ! * flushing_ ) { // TODO: how to pick this threshold? Too high, and the disk is idle. Too low, and we waste ram.
pthread_mutex_unlock ( mut_ ) ;
struct timespec ts ;
mergeManager : : double_to_ts ( & ts , 0.1 ) ;
nanosleep ( & ts , 0 ) ;
pthread_mutex_lock ( mut_ ) ;
}
}
2010-08-18 17:29:25 +00:00
if ( key ) {
populate_next_ret_impl ( s_ - > upper_bound ( key ) ) ;
} else {
populate_next_ret_impl ( s_ - > begin ( ) ) ;
}
if ( mut_ ) pthread_mutex_unlock ( mut_ ) ;
}
}
public :
2010-08-21 03:09:18 +00:00
batchedRevalidatingIterator ( rbtree_t * s , int64_t * cur_size , int64_t target_size , bool * flushing , int batch_size , pthread_mutex_t * rb_mut ) : s_ ( s ) , cur_size_ ( cur_size ) , target_size_ ( target_size ) , flushing_ ( flushing ) , batch_size_ ( batch_size ) , num_batched_ ( batch_size ) , cur_off_ ( batch_size ) , mut_ ( rb_mut ) {
2010-08-18 17:29:25 +00:00
next_ret_ = ( TUPLE * * ) malloc ( sizeof ( next_ret_ [ 0 ] ) * batch_size_ ) ;
populate_next_ret ( ) ;
/* if(mut_) pthread_mutex_lock(mut_);
if ( s_ - > begin ( ) = = s_ - > end ( ) ) {
next_ret_ = NULL ;
} else {
next_ret_ = ( * s_ - > begin ( ) ) - > create_copy ( ) ; // the create_copy() calls have to happen before we release mut_...
}
if ( mut_ ) pthread_mutex_unlock ( mut_ ) ; */
}
2010-08-21 03:30:12 +00:00
batchedRevalidatingIterator ( rbtree_t * s , int batch_size , pthread_mutex_t * rb_mut , TUPLE * & key ) : s_ ( s ) , cur_size_ ( 0 ) , target_size_ ( 0 ) , flushing_ ( 0 ) , batch_size_ ( batch_size ) , num_batched_ ( batch_size ) , cur_off_ ( batch_size ) , mut_ ( rb_mut ) {
2010-08-18 17:29:25 +00:00
next_ret_ = ( TUPLE * * ) malloc ( sizeof ( next_ret_ [ 0 ] ) * batch_size_ ) ;
populate_next_ret ( key ) ;
/* if(mut_) pthread_mutex_lock(mut_);
if ( key ) {
if ( s_ - > find ( key ) ! = s_ - > end ( ) ) {
next_ret_ = ( * ( s_ - > find ( key ) ) ) - > create_copy ( ) ;
} else if ( s_ - > upper_bound ( key ) ! = s_ - > end ( ) ) {
next_ret_ = ( * ( s_ - > upper_bound ( key ) ) ) - > create_copy ( ) ;
} else {
next_ret_ = NULL ;
}
} else {
if ( s_ - > begin ( ) = = s_ - > end ( ) ) {
next_ret_ = NULL ;
} else {
next_ret_ = ( * s_ - > begin ( ) ) - > create_copy ( ) ; // the create_copy() calls have to happen before we release mut_...
}
}
// DEBUG("changing mem next ret = %s key = %s\n", next_ret_ ? (const char*)next_ret_->key() : "NONE", key ? (const char*)key->key() : "NULL");
if ( mut_ ) pthread_mutex_unlock ( mut_ ) ; */
}
~ batchedRevalidatingIterator ( ) {
for ( int i = cur_off_ ; i < num_batched_ ; i + + ) {
2010-08-18 18:06:59 +00:00
TUPLE : : freetuple ( next_ret_ [ i ] ) ;
2010-08-18 17:29:25 +00:00
}
free ( next_ret_ ) ;
// if(next_ret_) TUPLE::freetuple(next_ret_);
}
TUPLE * next_callerFrees ( ) {
/* if(mut_) pthread_mutex_lock(mut_);
TUPLE * ret = next_ret_ ;
if ( next_ret_ ) {
if ( s_ - > upper_bound ( next_ret_ ) = = s_ - > end ( ) ) {
next_ret_ = 0 ;
} else {
next_ret_ = ( * s_ - > upper_bound ( next_ret_ ) ) - > create_copy ( ) ;
}
}
if ( mut_ ) pthread_mutex_unlock ( mut_ ) ; */
if ( cur_off_ = = num_batched_ ) { return NULL ; } // the last thing we did is call populate_next_ret_(), which only leaves us in this state at the end of the iterator.
TUPLE * ret = next_ret_ [ cur_off_ ] ;
cur_off_ + + ;
populate_next_ret ( ret ) ;
return ret ;
}
private :
explicit batchedRevalidatingIterator ( ) { abort ( ) ; }
void operator = ( batchedRevalidatingIterator & t ) { abort ( ) ; }
int operator - ( batchedRevalidatingIterator & t ) { abort ( ) ; }
rbtree_t * s_ ;
TUPLE * * next_ret_ ;
2010-08-21 03:09:18 +00:00
int64_t * cur_size_ ; // a pointer to the current size of the red-black tree, in bytes.
int64_t target_size_ ; // the low-water size for the tree. If cur_size_ is not null, and *cur_size_ < C * target_size_, we sleep.
bool * flushing_ ; // never block if *flushing is true.
2010-08-18 17:29:25 +00:00
int batch_size_ ;
int num_batched_ ;
int cur_off_ ;
pthread_mutex_t * mut_ ;
} ;
2010-03-09 01:42:23 +00:00
} ;
# endif //_MEMTREECOMPONENT_H_