No more leaks; "fixed" RB tree memory usage estimates.

This commit is contained in:
Sears Russell 2007-11-04 01:51:37 +00:00
parent 5a6ba6ed94
commit e46dcce461
6 changed files with 143 additions and 39 deletions

View file

@ -38,8 +38,11 @@ namespace rose {
typename PAGELAYOUT::FMT::TUP t; typename PAGELAYOUT::FMT::TUP t;
static const long INSERTS = 1000000; static const long INSERTS = 10000000;
long int count = INSERTS / 20; // static const long INSERTS = 10000000;
// static const long INSERTS = 100000;
static const long COUNT = INSERTS / 100;
long int count = COUNT;
struct timeval start_tv, now_tv; struct timeval start_tv, now_tv;
double start, now, last_start; double start, now, last_start;
@ -48,12 +51,17 @@ namespace rose {
start = rose::tv_to_double(start_tv); start = rose::tv_to_double(start_tv);
last_start = start; last_start = start;
printf("tuple 'size'%d\n", PAGELAYOUT::FMT::TUP::sizeofBytes());
for(long int i = 0; i < INSERTS; i++) { for(long int i = 0; i < INSERTS; i++) {
t.set0(&i); t.set0(&i);
t.set1(&i);
t.set2(&i);
t.set3(&i);
TlsmTableInsert(h,t); TlsmTableInsert(h,t);
count --; count --;
if(!count) { if(!count) {
count = INSERTS / 20; count = COUNT;
gettimeofday(&now_tv,0); gettimeofday(&now_tv,0);
now = tv_to_double(now_tv); now = tv_to_double(now_tv);
printf("%3d%% complete " printf("%3d%% complete "

View file

@ -38,7 +38,6 @@ template <class STLITER, class ROW> class stlSetIterator;
template <class STLITER, class ROW> template <class STLITER, class ROW>
inline const byte * toByteArray(stlSetIterator<STLITER,ROW> * const t); inline const byte * toByteArray(stlSetIterator<STLITER,ROW> * const t);
/** /**
Scans through an LSM tree's leaf pages, each tuple in the tree, in Scans through an LSM tree's leaf pages, each tuple in the tree, in
order. This iterator is designed for maximum forward scan order. This iterator is designed for maximum forward scan
@ -70,7 +69,23 @@ class treeIterator {
{ {
init_helper(); init_helper();
} }
typedef recordid handle; // typedef recordid handle;
class treeIteratorHandle {
public:
treeIteratorHandle() : r_(NULLRID) {}
treeIteratorHandle(const recordid r) : r_(r) {}
/* const treeIteratorHandle & operator=(const recordid *r) {
r_ = *r;
return this;
} */
treeIteratorHandle * operator=(const recordid &r) {
r_ = r;
return this;
}
recordid r_;
};
typedef treeIteratorHandle* handle;
explicit treeIterator(recordid tree) : explicit treeIterator(recordid tree) :
tree_(tree), tree_(tree),
scratch_(), scratch_(),
@ -80,6 +95,15 @@ class treeIterator {
{ {
init_helper(); init_helper();
} }
explicit treeIterator(treeIteratorHandle* tree) :
tree_(tree->r_),
scratch_(),
keylen_(ROW::sizeofBytes()),
lsmIterator_(lsmTreeIterator_open(-1,tree->r_)),
slot_(0)
{
init_helper();
}
explicit treeIterator(treeIterator& t) : explicit treeIterator(treeIterator& t) :
tree_(t.tree_), tree_(t.tree_),
scratch_(t.scratch_), scratch_(t.scratch_),

View file

@ -19,9 +19,14 @@ namespace rose {
dispatched), interface to the underlying primitives dispatched), interface to the underlying primitives
*/ */
// Lower total work by performing one merge at a higher level
// for every FUDGE^2 merges at the immediately lower level.
// (Contrast with R, which controls the ratio of sizes of the trees.)
static const int FUDGE = 1;
template<class PAGELAYOUT, class ITERA, class ITERB> template<class PAGELAYOUT, class ITERA, class ITERB>
struct merge_args { struct merge_args {
int worker_id;
pageid_t(*pageAlloc)(int,void*); pageid_t(*pageAlloc)(int,void*);
void *pageAllocState; void *pageAllocState;
pthread_mutex_t * block_ready_mut; pthread_mutex_t * block_ready_mut;
@ -30,6 +35,10 @@ namespace rose {
pthread_cond_t * in_block_ready_cond; pthread_cond_t * in_block_ready_cond;
pthread_cond_t * out_block_ready_cond; pthread_cond_t * out_block_ready_cond;
bool * still_open; bool * still_open;
pageid_t * my_tree_size;
pageid_t * out_tree_size;
pageid_t max_size;
pageid_t r_i;
typename ITERA::handle in_process_tree; typename ITERA::handle in_process_tree;
typename ITERB::handle ** in_tree; typename ITERB::handle ** in_tree;
typename ITERA::handle ** out_tree; typename ITERA::handle ** out_tree;
@ -110,8 +119,10 @@ namespace rose {
// Initialize tree with an empty tree. // Initialize tree with an empty tree.
// XXX hardcodes ITERA's type: // XXX hardcodes ITERA's type:
typename ITERA::handle oldtree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc, typename ITERA::handle oldtree
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()); = new typename ITERA::treeIteratorHandle(
TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()) );
Tcommit(xid); Tcommit(xid);
// loop around here to produce multiple batches for merge. // loop around here to produce multiple batches for merge.
@ -142,20 +153,12 @@ namespace rose {
gettimeofday(&wait_tv,0); gettimeofday(&wait_tv,0);
// XXX keep in_tree handle around so that it can be freed below.
typename ITERB::handle old_in_tree = **a->in_tree;
ITERA taBegin(oldtree); ITERA taBegin(oldtree);
ITERB tbBegin(**a->in_tree); ITERB tbBegin(**a->in_tree);
ITERA *taEnd = taBegin.end(); ITERA *taEnd = taBegin.end();
ITERB *tbEnd = tbBegin.end(); ITERB *tbEnd = tbBegin.end();
free(*a->in_tree); // free's copy of handle; not tree
*a->in_tree = 0; // free slot for producer
pthread_cond_signal(a->in_block_needed_cond);
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
@ -180,6 +183,7 @@ namespace rose {
delete taEnd; delete taEnd;
delete tbEnd; delete tbEnd;
gettimeofday(&stop_tv,0); gettimeofday(&stop_tv,0);
// TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation! // TlsmFree(wait_queue[0]) /// XXX Need to implement (de)allocation!
@ -195,26 +199,47 @@ namespace rose {
double throughput = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes())) double throughput = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes()))
/ (1024.0 * 1024.0 * total_elapsed); / (1024.0 * 1024.0 * total_elapsed);
printf("merge # %-6d: comp ratio: %-9.3f waited %6.1f sec " printf("worker %d merge # %-6d: comp ratio: %-9.3f waited %6.1f sec "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6d pages\n", merge_count, ratio, "worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6d pages (need %6d)\n", a->worker_id, merge_count, ratio,
wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput, mergedPages); wait_elapsed, work_elapsed, (unsigned long)insertedTuples, throughput, mergedPages, !a->out_tree_size ? -1 : (FUDGE * *a->out_tree_size / a->r_i));
gettimeofday(&start_tv,0); gettimeofday(&start_tv,0);
pthread_mutex_lock(a->block_ready_mut); pthread_mutex_lock(a->block_ready_mut);
static int threshold_calc = 1000; // XXX REALLY NEED TO FIX THIS! // keep actual handle around so that it can be freed below.
if(a->out_tree && // is there a upstream merger (note the lack of the * on a->out_tree)? typename ITERB::handle old_in_tree = **a->in_tree;
mergedPages > threshold_calc // do we have enough data to bother it? delete old_in_tree;
free(*a->in_tree); // free pointer to handle
// XXX should we delay this to this point?
// otherwise, the contents of in_tree become temporarily unavailable to observers.
*a->in_tree = 0; // tell producer that the slot is now open
pthread_cond_signal(a->in_block_needed_cond);
if(a->out_tree_size) {
*a->my_tree_size = *a->out_tree_size / (a->r_i * FUDGE);
} else {
if(*a->my_tree_size < mergedPages) {
*a->my_tree_size = mergedPages;
}
}
if(a->out_tree && // is there a upstream merger? (note the lack of the * on a->out_tree)
((a->max_size && mergedPages > a->max_size )
||
mergedPages > (FUDGE * *a->out_tree_size / a->r_i)) // do we have enough data to bother it?
) { ) {
while(*a->out_tree) { // we probably don't need the "while..." while(*a->out_tree) { // we probably don't need the "while..."
pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut); pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut);
} }
// XXX C++? Objects? Constructors? Who needs them? // XXX C++? Objects? Constructors? Who needs them?
*a->out_tree = (recordid*)malloc(sizeof(tree)); *a->out_tree = (typeof(*a->out_tree))malloc(sizeof(**a->out_tree));
**a->out_tree = tree; **a->out_tree = new typename ITERA::treeIteratorHandle(tree);
pthread_cond_signal(a->out_block_ready_cond); pthread_cond_signal(a->out_block_ready_cond);
// This is a bit wasteful; allocate a new empty tree to merge against. // This is a bit wasteful; allocate a new empty tree to merge against.
@ -228,7 +253,7 @@ namespace rose {
// XXX TlsmFree(xid,oldtree); // XXX TlsmFree(xid,oldtree);
oldtree = tree; *oldtree = tree;
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
@ -287,6 +312,7 @@ namespace rose {
pthread_mutex_t * mut; pthread_mutex_t * mut;
pthread_cond_t * input_ready_cond; pthread_cond_t * input_ready_cond;
pthread_cond_t * input_needed_cond; pthread_cond_t * input_needed_cond;
pageid_t * input_size;
merge_args<PAGELAYOUT, treeIterator<typename PAGELAYOUT::FMT::TUP, merge_args<PAGELAYOUT, treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT>, treeIterator<typename PAGELAYOUT::FMT::TUP, typename PAGELAYOUT::FMT>, treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT> > * args1; typename PAGELAYOUT::FMT> > * args1;
@ -296,6 +322,15 @@ namespace rose {
typename PAGELAYOUT::FMT::TUP> > * args2; typename PAGELAYOUT::FMT::TUP> > * args2;
}; };
// Estimated memory overhead, in bytes, charged per tuple held in the
// in-memory tree (added to the tuple's own size when estimating RAM use).
// This is just a guessed value... it seems about right based on
// experiments, but 450 bytes overhead per tuple is insane!
static const int RB_TREE_OVERHEAD = 450;
// How many bytes of tuples can we afford to keep in RAM?
static const pageid_t MEM_SIZE = 800 * 1000 * 1000;
// How many pages should we try to fill with the first C1 merge?
static const pageid_t START_SIZE = 10 * 1000;
// Ratio between the sizes of the trees; handed to each merge worker as r_i.
// NOTE(review): 40 appears to be an experimentally chosen value -- confirm.
static const int R = 40;
template<class PAGELAYOUT> template<class PAGELAYOUT>
lsmTableHandle <PAGELAYOUT> * TlsmTableStart(recordid& tree) { lsmTableHandle <PAGELAYOUT> * TlsmTableStart(recordid& tree) {
/// XXX xid for daemon processes? /// XXX xid for daemon processes?
@ -335,6 +370,13 @@ namespace rose {
pthread_cond_init(block1_ready_cond,0); pthread_cond_init(block1_ready_cond,0);
pthread_cond_init(block2_ready_cond,0); pthread_cond_init(block2_ready_cond,0);
pageid_t * block0_size = (pageid_t*)malloc(sizeof(pageid_t));
// don't merge until we have enough data to be worthwhile...
*block0_size = START_SIZE;
pageid_t * block1_size = (pageid_t*)malloc(sizeof(pageid_t));
// similarly, wait to merge the next block until we have merged block FUDGE times.
*block1_size = FUDGE * R * *block0_size;
LSM_HANDLE ** block1_scratch = (LSM_HANDLE**) malloc(sizeof(LSM_HANDLE*)); LSM_HANDLE ** block1_scratch = (LSM_HANDLE**) malloc(sizeof(LSM_HANDLE*));
*block1_scratch = 0; *block1_scratch = 0;
@ -357,7 +399,7 @@ namespace rose {
ret->input_ready_cond = block0_ready_cond; ret->input_ready_cond = block0_ready_cond;
ret->input_needed_cond = block0_needed_cond; ret->input_needed_cond = block0_needed_cond;
ret->input_size = block0_size;
recordid * ridp = (recordid*)malloc(sizeof(recordid)); recordid * ridp = (recordid*)malloc(sizeof(recordid));
*ridp = h.bigTreeAllocState; *ridp = h.bigTreeAllocState;
@ -365,6 +407,7 @@ namespace rose {
ret->args1 = (merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>)); ret->args1 = (merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,LSM_ITER>));
merge_args<PAGELAYOUT, LSM_ITER, LSM_ITER> tmpargs1 = merge_args<PAGELAYOUT, LSM_ITER, LSM_ITER> tmpargs1 =
{ {
1,
TlsmRegionAllocRid, TlsmRegionAllocRid,
ridp, ridp,
block_ready_mut, block_ready_mut,
@ -373,7 +416,11 @@ namespace rose {
block1_ready_cond, block1_ready_cond,
block2_ready_cond, block2_ready_cond,
ret->still_open, ret->still_open,
NULLRID, block1_size,
0, // biggest component computes its size directly.
0, // No max size for biggest component
R,
new typename LSM_ITER::treeIteratorHandle(NULLRID),
block1_scratch, block1_scratch,
0 0
}; };
@ -386,6 +433,7 @@ namespace rose {
ret->args2 = (merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>)); ret->args2 = (merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>*)malloc(sizeof(merge_args<PAGELAYOUT,LSM_ITER,RB_ITER>));
merge_args<PAGELAYOUT, LSM_ITER, RB_ITER> tmpargs2 = merge_args<PAGELAYOUT, LSM_ITER, RB_ITER> tmpargs2 =
{ {
2,
TlsmRegionAllocRid, TlsmRegionAllocRid,
ridp, ridp,
block_ready_mut, block_ready_mut,
@ -394,7 +442,11 @@ namespace rose {
block0_ready_cond, block0_ready_cond,
block1_ready_cond, block1_ready_cond,
ret->still_open, ret->still_open,
NULLRID, block0_size,
block1_size,
(R * MEM_SIZE) / (PAGE_SIZE * 4), // 4 = estimated compression ratio
R,
new typename LSM_ITER::treeIteratorHandle(NULLRID),
block0_scratch, block0_scratch,
block1_scratch block1_scratch
}; };
@ -407,16 +459,7 @@ namespace rose {
return ret; return ret;
} }
template<class PAGELAYOUT> template<class PAGELAYOUT>
void TlsmTableStop( lsmTableHandle<PAGELAYOUT> * h) { void TlsmTableFlush(lsmTableHandle<PAGELAYOUT> *h) {
*(h->still_open) = 0;
pthread_join(h->merge1_thread,0);
pthread_join(h->merge2_thread,0);
}
template<class PAGELAYOUT>
void TlsmTableInsert( lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &t) {
h->scratch_handle->insert(t);
if(h->scratch_handle->size() > 100000) { // XXX set threshold sanely!!!
pthread_mutex_lock(h->mut); pthread_mutex_lock(h->mut);
while(*h->input_handle) { while(*h->input_handle) {
@ -429,10 +472,32 @@ namespace rose {
*(h->input_handle) = tmp_ptr; *(h->input_handle) = tmp_ptr;
pthread_cond_signal(h->input_ready_cond); pthread_cond_signal(h->input_ready_cond);
h->scratch_handle = new typeof(*h->scratch_handle); h->scratch_handle = new typeof(*h->scratch_handle);
pthread_mutex_unlock(h->mut); pthread_mutex_unlock(h->mut);
}
template<class PAGELAYOUT>
void TlsmTableStop( lsmTableHandle<PAGELAYOUT> * h) {
TlsmTableFlush(h);
*(h->still_open) = 0;
pthread_join(h->merge1_thread,0);
pthread_join(h->merge2_thread,0);
}
template<class PAGELAYOUT>
void TlsmTableInsert( lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &t) {
h->scratch_handle->insert(t);
//XXX 4 = estimated compression ratio.
uint64_t handleBytes = h->scratch_handle->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes());
uint64_t memSizeThresh = MEM_SIZE;
if(handleBytes > inputSizeThresh || handleBytes > memSizeThresh) { // XXX ok?
printf("Handle mbytes %ld Input size: %ld input size thresh: %ld mbytes mem size thresh: %ld\n",
handleBytes / (1024*1024), *h->input_size, inputSizeThresh / (1024*1024), memSizeThresh / (1024*1024));
TlsmTableFlush<PAGELAYOUT>(h);
} }
} }

View file

@ -253,6 +253,8 @@ recordid TlsmCreate(int xid, int comparator,
writeNodeRecord(xid, p, DEPTH, dummy, keySize, 0); writeNodeRecord(xid, p, DEPTH, dummy, keySize, 0);
writeNodeRecord(xid, p, COMPARATOR, dummy, keySize, comparator); writeNodeRecord(xid, p, COMPARATOR, dummy, keySize, comparator);
free(dummy);
unlock(p->rwlatch); unlock(p->rwlatch);
releasePage(p); releasePage(p);
return ret; return ret;

View file

@ -28,7 +28,7 @@ namespace rose {
plugin_id_t pluginid = plugin_id<FORMAT, COMPRESSOR, typename COMPRESSOR::TYP>(); plugin_id_t pluginid = plugin_id<FORMAT, COMPRESSOR, typename COMPRESSOR::TYP>();
plugin_id_t * plugins = new plugin_id_t[column_count]; plugin_id_t * plugins = (plugin_id_t*)malloc(column_count * sizeof(plugin_id_t));
for(column_number_t c = 0; c < column_count; c++) { for(column_number_t c = 0; c < column_count; c++) {
plugins[c] = pluginid; plugins[c] = pluginid;
} }
@ -38,6 +38,7 @@ namespace rose {
typename COMPRESSOR::TYP val = *(typename COMPRESSOR::TYP*)(t->get(c)); typename COMPRESSOR::TYP val = *(typename COMPRESSOR::TYP*)(t->get(c));
com->offset(val); com->offset(val);
} }
free(plugins);
return f; return f;
} }
static inline int cmp_id() { static inline int cmp_id() {

View file

@ -44,7 +44,11 @@ void dirtyPages_remove(Page * p) {
//assert(pblHtLookup(dirtyPages, &(p->id), sizeof(int))); //assert(pblHtLookup(dirtyPages, &(p->id), sizeof(int)));
// printf("With lsn = %d\n", (lsn_t)pblHtCurrent(dirtyPages)); // printf("With lsn = %d\n", (lsn_t)pblHtCurrent(dirtyPages));
p->dirty = 0; p->dirty = 0;
lsn_t * old = pblHtLookup(dirtyPages, &(p->id),sizeof(int));
pblHtRemove(dirtyPages, &(p->id), sizeof(int)); pblHtRemove(dirtyPages, &(p->id), sizeof(int));
if(old) {
free(old);
}
//assert(!ret); <--- Due to a bug in the PBL compatibility mode, //assert(!ret); <--- Due to a bug in the PBL compatibility mode,
//there is no way to tell whether the value didn't exist, or if it //there is no way to tell whether the value didn't exist, or if it
//was null. //was null.