rewrote mergeThread, added ability to start iterators mid lsmTable

This commit is contained in:
Sears Russell 2008-11-26 07:14:23 +00:00
parent 1985ec6424
commit 549f97d297

View file

@ -71,23 +71,23 @@ namespace rose {
pageCount++; pageCount++;
Page *p = loadPage(xid, next_page); Page *p = loadPage(xid, next_page);
writelock(p->rwlatch,0);
stasis_page_cleanup(p); stasis_page_cleanup(p);
typename PAGELAYOUT::FMT * mc = PAGELAYOUT::initPage(p, &**begin); typename PAGELAYOUT::FMT * mc = PAGELAYOUT::initPage(p, &**begin);
for(ITER i(*begin); i != *end; ++i) { for(ITER i(*begin); i != *end; ++i) {
/* for(int c = 0; c < (*i).column_count(); c++) {
assert(*(byte*)(*i).get(c) || !*(byte*)(*i).get(c));
} */
rose::slot_index_t ret = mc->append(xid, *i); rose::slot_index_t ret = mc->append(xid, *i);
if(ret == rose::NOSPACE) { if(ret == rose::NOSPACE) {
dirtyPages_add(p); dirtyPages_add(p);
// p->dirty = 1;
mc->pack(); mc->pack();
unlock(p->rwlatch);
releasePage(p); releasePage(p);
next_page = pageAlloc(xid,pageAllocState); next_page = pageAlloc(xid,pageAllocState);
TlsmAppendPage(xid,tree,(*i).toByteArray(),pageAlloc,pageAllocState,next_page); TlsmAppendPage(xid,tree,(*i).toByteArray(),pageAlloc,pageAllocState,next_page);
p = loadPage(xid, next_page); p = loadPage(xid, next_page);
writelock(p->rwlatch,0);
mc = PAGELAYOUT::initPage(p, &*i); mc = PAGELAYOUT::initPage(p, &*i);
pageCount++; pageCount++;
ret = mc->append(xid, *i); ret = mc->append(xid, *i);
@ -96,8 +96,8 @@ namespace rose {
(*inserted)++; (*inserted)++;
} }
dirtyPages_add(p); dirtyPages_add(p);
// p->dirty = 1;
mc->pack(); mc->pack();
unlock(p->rwlatch);
releasePage(p); releasePage(p);
return pageCount; return pageCount;
} }
@ -107,28 +107,33 @@ namespace rose {
// this is just a guessed value... it seems about right based on // this is just a guessed value... it seems about right based on
// experiments, but 450 bytes overhead per tuple is insane! // experiments, but 450 bytes overhead per tuple is insane!
static const int RB_TREE_OVERHEAD = 400; // = 450; static const int RB_TREE_OVERHEAD = 400; // = 450;
static const pageid_t MEM_SIZE = 1000 * 1000 * 1000; static pageid_t C0_MEM_SIZE = 1000 * 1000 * 1000;
// static const pageid_t MEM_SIZE = 100 * 1000;
// How many pages should we try to fill with the first C1 merge? // How many pages should we try to fill with the first C1 merge?
static int R = 10; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size)) static int R = 10; // XXX set this as low as possible (for dynamic setting. = sqrt(C2 size / C0 size))
#ifdef THROTTLED #ifdef THROTTLED
static const pageid_t START_SIZE = 100; //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. static const pageid_t START_SIZE = 100; //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead.
#else #else
static const pageid_t START_SIZE = MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead. Do not run this code
static const pageid_t START_SIZE = C0_MEM_SIZE * R /( PAGE_SIZE * 4); //10 * 1000; /*10 **/ //1000; // XXX 4 is fudge related to RB overhead.
#endif #endif
// Lower total work by perfomrming one merge at higher level // Lower total work by perfomrming one merge at higher level
// for every FUDGE^2 merges at the immediately lower level. // for every FUDGE^2 merges at the immediately lower level.
// (Constrast to R, which controls the ratio of sizes of the trees.) // (Constrast to R, which controls the ratio of sizes of the trees.)
static const int FUDGE = 1; static const int FUDGE = 1;
/** /**
ITERA is an iterator over the data structure that mergeThread creates (a lsm tree iterator). ITERA is an iterator over the data structure that mergeThread creates (a lsm tree iterator).
ITERB is an iterator over the data structures that mergeThread takes as input (lsm tree, or rb tree..) ITERB is an iterator over the data structures that mergeThread takes as input (lsm tree, or rb tree..)
*/ */
template<class PAGELAYOUT, class ITERA, class ITERB> template<class PAGELAYOUT, class ITERA, class ITERB>
void* mergeThread(void* arg) { void* mergeThread(void* arg) {
typedef typename PAGELAYOUT::FMT::TUP TUP;
typedef mergeIterator<ITERA, ITERB, TUP> MERGE_ITER;
typedef gcIterator<TUP,MERGE_ITER> GC_ITER;
typedef stlSetIterator<std::set<TUP, typename TUP::stl_cmp>, TUP> STL_ITER;
// The ITER argument of a is unused (we don't look at it's begin or end fields...) // The ITER argument of a is unused (we don't look at it's begin or end fields...)
merge_args<PAGELAYOUT, ITERA, ITERB> * a = (merge_args<PAGELAYOUT, ITERA, ITERB>*)arg; merge_args<PAGELAYOUT, ITERA, ITERB> * a = (merge_args<PAGELAYOUT, ITERA, ITERB>*)arg;
struct timeval start_tv, start_push_tv, wait_tv, stop_tv; struct timeval start_tv, start_push_tv, wait_tv, stop_tv;
@ -139,19 +144,19 @@ namespace rose {
// XXX hardcodes ITERA's type: // XXX hardcodes ITERA's type:
// We assume that the caller set pageAllocState for us; oldPageAllocState // We assume that the caller set pageAllocState for us; oldPageAllocState
// shouldn't be set (it should be NULLRID) // shouldn't be set (it should be NULLRID)
typename ITERA::handle tree assert(a->my_tree->r_.size != -1);
= new typename ITERA::treeIteratorHandle(
TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes()) );
// loop around here to produce multiple batches for merge. // loop around here to produce multiple batches for merge.
gettimeofday(&start_push_tv,0); gettimeofday(&start_push_tv,0);
gettimeofday(&start_tv,0); gettimeofday(&start_tv,0);
pthread_mutex_lock(a->block_ready_mut);
while(1) { while(1) {
pthread_mutex_lock(a->block_ready_mut);
int done = 0; int done = 0;
// get a new input for merge
while(!*(a->in_tree)) { while(!*(a->in_tree)) {
*a->in_block_needed = true; *a->in_block_needed = true;
pthread_cond_signal(a->in_block_needed_cond); pthread_cond_signal(a->in_block_needed_cond);
@ -164,216 +169,180 @@ namespace rose {
*a->in_block_needed = false; *a->in_block_needed = false;
if(done) { if(done) {
pthread_cond_signal(a->out_block_ready_cond); pthread_cond_signal(a->out_block_ready_cond);
pthread_mutex_unlock(a->block_ready_mut);
break; break;
} }
gettimeofday(&wait_tv,0); gettimeofday(&wait_tv,0);
epoch_t current_timestamp = a->last_complete_xact ? *(a->last_complete_xact) : 0; epoch_t current_timestamp = a->last_complete_xact ? *(a->last_complete_xact) : 0;
uint64_t insertedTuples; uint64_t insertedTuples;
pageid_t mergedPages; pageid_t mergedPages;
ITERA *taBegin = new ITERA(tree);
assert(a->my_tree->r_.size != -1);
ITERA *taBegin = new ITERA(a->my_tree);
ITERB *tbBegin = new ITERB(**a->in_tree); ITERB *tbBegin = new ITERB(**a->in_tree);
ITERA *taEnd = taBegin->end(); ITERA *taEnd = taBegin->end();
ITERB *tbEnd = tbBegin->end(); ITERB *tbEnd = tbBegin->end();
{ // this { protects us from recalcitrant iterators below (tree iterators hold stasis page latches...)
pthread_mutex_unlock(a->block_ready_mut);
Tcommit(xid); Tcommit(xid);
xid = Tbegin(); xid = Tbegin();
// XXX hardcodes allocator type.
if(((recordid*)a->oldAllocState)->size != -1) {
// free the tree that we merged against during the last round.
TlsmFree(xid,tree->r_,TlsmRegionDeallocRid,a->oldAllocState);
}
// we're merging against old alloc state this round.
*(recordid*)(a->oldAllocState) = *(recordid*)(a->pageAllocState);
// we're merging into pagealloc state.
*(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t));
Tset(xid, *(recordid*)(a->pageAllocState),
&LSM_REGION_ALLOC_STATIC_INITIALIZER);
tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
mergeIterator<ITERA, ITERB, typename PAGELAYOUT::FMT::TUP> recordid * scratchAllocState = (recordid*)malloc(sizeof(recordid));
mBegin(*taBegin, *tbBegin, *taEnd, *tbEnd); *scratchAllocState = Talloc(xid, sizeof(TlsmRegionAllocConf_t));
Tset(xid, *scratchAllocState, &LSM_REGION_ALLOC_STATIC_INITIALIZER);
mergeIterator<ITERA, ITERB, typename PAGELAYOUT::FMT::TUP> typename ITERA::handle scratch_tree
mEnd(*taBegin, *tbBegin, *taEnd, *tbEnd); = new typename ITERA::treeIteratorHandle(
TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
scratchAllocState,TUP::sizeofBytes()) );
// XXX
pthread_mutex_unlock(a->block_ready_mut);
mEnd.seekEnd(); { // this { allows us to explicitly free the iterators mid-function
/* versioningIterator<mergeIterator MERGE_ITER mBegin(*taBegin, *tbBegin, *taEnd, *tbEnd);
<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP>, MERGE_ITER mEnd(*taBegin, *tbBegin, *taEnd, *tbEnd);
typename PAGELAYOUT::FMT::TUP> vBegin(mBegin,mEnd,0);
versioningIterator<mergeIterator mEnd.seekEnd();
<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP>,
typename PAGELAYOUT::FMT::TUP> vEnd(mBegin,mEnd,0);
vEnd.seekEnd(); GC_ITER gcBegin(&mBegin, &mEnd, current_timestamp, a->ts_col);
GC_ITER gcEnd;
mergedPages = compressData mergedPages = compressData<PAGELAYOUT, GC_ITER>
<PAGELAYOUT,versioningIterator<mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP>, typename PAGELAYOUT::FMT::TUP> > (xid, &gcBegin, &gcEnd,scratch_tree->r_,a->pageAlloc,scratchAllocState,&insertedTuples);
(xid, &vBegin, &vEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */
/*mergedPages = compressData
<PAGELAYOUT,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> >
(xid, &mBegin, &mEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples); */
gcIterator<typename PAGELAYOUT::FMT::TUP,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> > gcBegin(&mBegin, &mEnd, current_timestamp, a->ts_col);
gcIterator<typename PAGELAYOUT::FMT::TUP,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> > gcEnd;
mergedPages = compressData
<PAGELAYOUT, gcIterator<typename PAGELAYOUT::FMT::TUP,mergeIterator<ITERA,ITERB,typename PAGELAYOUT::FMT::TUP> > >
(xid, &gcBegin, &gcEnd,tree->r_,a->pageAlloc,a->pageAllocState,&insertedTuples);
// these tree iterators keep pages pinned! Don't call force until they've been deleted, or we'll deadlock.
// tree iterators are pinning pages we want to force, so free them.
} // free all the stack allocated iterators... } // free all the stack allocated iterators...
delete taBegin; delete taBegin;
delete tbBegin; delete tbBegin;
delete taEnd; delete taEnd;
delete tbEnd; delete tbEnd;
// XXX hardcodes tree type. // XXX hardcodes tree type.
TlsmForce(xid,tree->r_,TlsmRegionForceRid,a->pageAllocState); TlsmForce(xid,scratch_tree->r_,TlsmRegionForceRid,scratchAllocState);
//XXX
pthread_mutex_lock(a->block_ready_mut);
gettimeofday(&stop_tv,0); gettimeofday(&stop_tv,0);
merge_count++; merge_count++;
double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv);
double work_elapsed = tv_to_double(stop_tv) - tv_to_double(wait_tv);
double push_elapsed = tv_to_double(start_tv) - tv_to_double(start_push_tv);
double total_elapsed = wait_elapsed + work_elapsed;
double ratio = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes())) double ratio = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes()))
/ (double)(PAGE_SIZE * mergedPages); / (double)(PAGE_SIZE * mergedPages);
double throughput = ((double)(insertedTuples * (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes()))
/ (1024.0 * 1024.0 * total_elapsed);
printf("worker %d merge # %-6d: comp ratio: %-9.3f stalled %6.1f sec backpressure %6.1f "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6lld pages (need %6lld)\n", a->worker_id, merge_count, ratio,
wait_elapsed, push_elapsed, work_elapsed,(unsigned long)insertedTuples, throughput, mergedPages, !a->out_tree_size ? -1 : (FUDGE * *a->out_tree_size / a->r_i));
{ // print timing statistics
double wait_elapsed = tv_to_double(wait_tv) - tv_to_double(start_tv);
double work_elapsed = tv_to_double(stop_tv) - tv_to_double(wait_tv);
double push_elapsed = tv_to_double(start_tv) - tv_to_double(start_push_tv);
double total_elapsed = wait_elapsed + work_elapsed;
double throughput = ((double)(insertedTuples
* (uint64_t)PAGELAYOUT::FMT::TUP::sizeofBytes()))
/ (1024.0 * 1024.0 * total_elapsed);
printf("worker %d merge # %-6d: comp ratio: %-9.3f stalled %6.1f sec backpressure %6.1f "
"worked %6.1f sec inserts %-12ld (%9.3f mb/s) %6lld pages (need %6lld)\n",
a->worker_id, merge_count, ratio,
wait_elapsed, push_elapsed, work_elapsed,(unsigned long)insertedTuples,
throughput, mergedPages,
!a->out_tree_size ? -1
: (FUDGE * *a->out_tree_size / a->r_i));
}
gettimeofday(&start_push_tv,0); gettimeofday(&start_push_tv,0);
pthread_mutex_lock(a->block_ready_mut); *a->my_tree_size = mergedPages;
// keep actual handle around so that it can be freed below. // always free in tree and old my tree
typename ITERB::handle old_in_tree = **a->in_tree;
if(a->in_tree_allocer) { // free old my_tree here
// TlsmFree(xid, ((typename ITERB::handle)old_in_tree)->r_,TlsmRegionDeallocRid,a->in_tree_allocer); TlsmFree(xid,a->my_tree->r_,TlsmRegionDeallocRid,a->pageAllocState);
// XXX kludge; assumes C1 and C2 have same type of handle....
TlsmFree(xid, ((typename ITERA::handle)old_in_tree)->r_,TlsmRegionDeallocRid,a->in_tree_allocer); if(a->out_tree) {
delete old_in_tree; double frac_wasted =
} else { ((double)RB_TREE_OVERHEAD)
((typename stlSetIterator<std::set<typename PAGELAYOUT::FMT::TUP, / (double)(RB_TREE_OVERHEAD + TUP::sizeofBytes());
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP>::handle)old_in_tree)->clear(); double target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size))
delete / ((C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio)));
((typename stlSetIterator<std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>, printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n",
typename PAGELAYOUT::FMT::TUP>::handle)old_in_tree); ((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size),
((double)*a->my_tree_size) / ((double)(C0_MEM_SIZE*(1-frac_wasted))/(4096*ratio)),
target_R);
if(((double)*a->out_tree_size / ((double)*a->my_tree_size) < target_R)
|| (a->max_size && mergedPages > a->max_size )) {
// XXX need to report backpressure here!
while(*a->out_tree) { // we probably don't need the "while..."
pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut);
}
*a->out_tree = (typeof(*a->out_tree))malloc(sizeof(**a->out_tree));
**a->out_tree = new typename ITERA::treeIteratorHandle(scratch_tree->r_);
*(recordid*)(a->out_tree_allocer) = *scratchAllocState;
pthread_cond_signal(a->out_block_ready_cond);
// This is a bit wasteful; allocate a new empty tree to merge against.
// We don't want to ever look at the one we just handed upstream...
// We could wait for an in tree to be ready, and then pretend
// that we've just finished merging against it (to avoid all
// those merging comparisons, and a table scan...)
// old alloc state contains the tree that we used as input for this merge...
// we can still free it below
// create a new allocator.
*(recordid*)(a->pageAllocState)
= Talloc(xid, sizeof(TlsmRegionAllocConf_t));
Tset(xid, *(recordid*)(a->pageAllocState),
&LSM_REGION_ALLOC_STATIC_INITIALIZER);
a->my_tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,TUP::sizeofBytes());
} else {
// there is an out tree, but we don't want to push updates to it yet
// replace my_tree with output of merge
*(recordid*)a->pageAllocState = *scratchAllocState;
free(scratchAllocState);
*a->my_tree = *scratch_tree;
}
} else { // ! a->out_tree
*(recordid*)a->pageAllocState = *scratchAllocState;
free(scratchAllocState);
*a->my_tree = *scratch_tree;
} }
free(*a->in_tree); // free pointer to handle //// ----------- Free in_tree
// XXX should we delay this to this point? if(a->in_tree_allocer) { // is in tree an lsm tree or a rb tree?
// otherwise, the contents of in_tree become temporarily unavailable to observers. // @todo don't assume C1 and C2 have same type of handle....
TlsmFree(xid,
(**(typename ITERA::handle**)a->in_tree)->r_,
TlsmRegionDeallocRid,a->in_tree_allocer);
} else {
(**(typename STL_ITER::handle**)a->in_tree)->clear();
}
delete **a->in_tree;
free (*a->in_tree);
*a->in_tree = 0; // tell producer that the slot is now open *a->in_tree = 0; // tell producer that the slot is now open
// todo: do this above the frees by copying state.
// then we wouldn't need to hold the mutex while freeing regions, etc.
// don't set in_block_needed = true; we're not blocked yet. // don't set in_block_needed = true; we're not blocked yet.
pthread_cond_signal(a->in_block_needed_cond); pthread_cond_signal(a->in_block_needed_cond);
#ifdef INFINITE_RESOURCES
*a->my_tree_size = mergedPages;
double target_R = 0;
if(a->out_tree) {
double frac_wasted = ((double)RB_TREE_OVERHEAD)/(double)(RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
target_R = sqrt(((double)(*a->out_tree_size+*a->my_tree_size)) / ((MEM_SIZE*(1-frac_wasted))/(4096*ratio)));
printf("R_C2-C1 = %6.1f R_C1-C0 = %6.1f target = %6.1f\n",
((double)(*a->out_tree_size/*+*a->my_tree_size*/)) / ((double)*a->my_tree_size),
((double)*a->my_tree_size) / ((double)(MEM_SIZE*(1-frac_wasted))/(4096*ratio)),target_R);
}
#else
if(a->out_tree_size) {
*a->my_tree_size = *a->out_tree_size / (a->r_i * FUDGE);
} else {
if(*a->my_tree_size < mergedPages) {
*a->my_tree_size = mergedPages;
}
}
#endif
if(a->out_tree && // is there a upstream merger? (note the lack of the * on a->out_tree)
(
(
#ifdef INFINITE_RESOURCES
#ifndef THROTTLED
(*a->out_block_needed)
#endif
#ifdef THROTTLED
((double)*a->out_tree_size / ((double)*a->my_tree_size) < target_R)
#endif
#else
mergedPages > (FUDGE * *a->out_tree_size / a->r_i) // do we have enough data to bother it?
#endif
)
||
(a->max_size && mergedPages > a->max_size )
)) {
// XXX need to report backpressure here!
while(*a->out_tree) { // we probably don't need the "while..."
pthread_cond_wait(a->out_block_needed_cond, a->block_ready_mut);
}
#ifdef INFINITE_RESOURCES
// printf("pushing tree R_eff = %6.1f target = %6.1f\n", ((double)*a->out_tree_size) / ((double)*a->my_tree_size), target_R);
#endif
// XXX C++? Objects? Constructors? Who needs them?
*a->out_tree = (typeof(*a->out_tree))malloc(sizeof(**a->out_tree));
**a->out_tree = new typename ITERA::treeIteratorHandle(tree->r_);
pthread_cond_signal(a->out_block_ready_cond);
// This is a bit wasteful; allocate a new empty tree to merge against.
// We don't want to ever look at the one we just handed upstream...
// We could wait for an in tree to be ready, and then pretend
// that we've just finished merging against it (to avoid all
// those merging comparisons, and a table scan...)
// old alloc state contains the tree that we used as input for this merge... we can still free it
*(recordid*)(a->out_tree_allocer) = *(recordid*)(a->pageAllocState);
*(recordid*)(a->pageAllocState) = NULLRID;
// create a new allocator.
*(recordid*)(a->pageAllocState) = Talloc(xid, sizeof(TlsmRegionAllocConf_t));
Tset(xid, *(recordid*)(a->pageAllocState),
&LSM_REGION_ALLOC_STATIC_INITIALIZER);
tree->r_ = TlsmCreate(xid, PAGELAYOUT::cmp_id(),a->pageAlloc,
a->pageAllocState,PAGELAYOUT::FMT::TUP::sizeofBytes());
}
// XXX TlsmFree(xid,*a->tree);
assert(a->my_tree->r_.page != tree->r_.page);
*a->my_tree = *tree;
pthread_mutex_unlock(a->block_ready_mut);
gettimeofday(&start_tv,0); gettimeofday(&start_tv,0);
} }
pthread_mutex_unlock(a->block_ready_mut);
Tcommit(xid); Tcommit(xid);
return 0; return 0;
@ -407,8 +376,6 @@ namespace rose {
h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(), h.mediumTree = TlsmCreate(xid, PAGELAYOUT::cmp_id(),
TlsmRegionAllocRid,&h.mediumTreeAllocState, TlsmRegionAllocRid,&h.mediumTreeAllocState,
PAGELAYOUT::FMT::TUP::sizeofBytes()); PAGELAYOUT::FMT::TUP::sizeofBytes());
//XXX epoch_t beginning = 0;
//XXX epoch_t end = 0;
Tset(xid, ret, &h); Tset(xid, ret, &h);
return ret; return ret;
} }
@ -534,8 +501,8 @@ namespace rose {
{ {
1, 1,
TlsmRegionAllocRid, TlsmRegionAllocRid,
ridp, ridp, // XXX should be renamed to my_tree_alloc_state
oldridp, oldridp, // XXX no longer needed?!?
block_ready_mut, block_ready_mut,
block1_needed_cond, block1_needed_cond,
block1_needed, block1_needed,
@ -552,7 +519,7 @@ namespace rose {
allocer_scratch, allocer_scratch,
0, 0,
0, 0,
new typename LSM_ITER::treeIteratorHandle(NULLRID), new typename LSM_ITER::treeIteratorHandle(h.bigTree), // my_tree
&(ret->last_xact), &(ret->last_xact),
ts_col ts_col
}; };
@ -569,8 +536,8 @@ namespace rose {
{ {
2, 2,
TlsmRegionAllocRid, TlsmRegionAllocRid,
ridp, ridp,
oldridp, oldridp,
block_ready_mut, block_ready_mut,
block0_needed_cond, block0_needed_cond,
block0_needed, block0_needed,
@ -581,14 +548,13 @@ namespace rose {
ret->still_open, ret->still_open,
block0_size, block0_size,
block1_size, block1_size,
(R * MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio (R * C0_MEM_SIZE) / (PAGE_SIZE * 4), // XXX 4 = estimated compression ratio
R, R,
//new typename LSM_ITER::treeIteratorHandle(NULLRID),
block0_scratch, block0_scratch,
0, 0,
block1_scratch, block1_scratch,
allocer_scratch, allocer_scratch,
new typename LSM_ITER::treeIteratorHandle(NULLRID), new typename LSM_ITER::treeIteratorHandle(h.mediumTree),
0, 0,
ts_col ts_col
}; };
@ -600,7 +566,8 @@ namespace rose {
return ret; return ret;
} }
// XXX this does not force the table to disk... it simply forces everything out of the in-memory tree. // XXX this does not force the table to disk...
// it simply forces everything out of the in-memory tree.
template<class PAGELAYOUT> template<class PAGELAYOUT>
void TlsmTableFlush(lsmTableHandle<PAGELAYOUT> *h) { void TlsmTableFlush(lsmTableHandle<PAGELAYOUT> *h) {
@ -661,36 +628,31 @@ namespace rose {
template<class PAGELAYOUT> template<class PAGELAYOUT>
void TlsmTableInsert( lsmTableHandle<PAGELAYOUT> *h, void TlsmTableInsert( lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &t) { typename PAGELAYOUT::FMT::TUP &t) {
/* for(int i = 0; i < t.column_count(); i++) { // This helps valgrind warn earlier...
assert(*((char*)t.get(i)) || *((char*)t.get(i))+1); pthread_mutex_lock(h->mut); //XXX
} */
h->scratch_tree->insert(t); h->scratch_tree->insert(t);
uint64_t handleBytes = h->scratch_tree->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes()); uint64_t handleBytes = h->scratch_tree->size() * (RB_TREE_OVERHEAD + PAGELAYOUT::FMT::TUP::sizeofBytes());
//XXX 4 = estimated compression ratio. //XXX 4 = estimated compression ratio.
uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size); // / (PAGELAYOUT::FMT::TUP::sizeofBytes()); uint64_t inputSizeThresh = (4 * PAGE_SIZE * *h->input_size);
uint64_t memSizeThresh = MEM_SIZE; uint64_t memSizeThresh = C0_MEM_SIZE;
#ifdef INFINITE_RESOURCES
static const int LATCH_INTERVAL = 10000; static const int LATCH_INTERVAL = 10000;
static int count = LATCH_INTERVAL; /// XXX HACK static int count = LATCH_INTERVAL; /// XXX HACK
bool go = false; bool go = false;
if(!count) { if(!count) {
pthread_mutex_lock(h->mut); ///XXX pthread_mutex_lock(h->mut);
go = *h->input_needed; go = *h->input_needed;
pthread_mutex_unlock(h->mut); ///XXX pthread_mutex_unlock(h->mut);
count = LATCH_INTERVAL; count = LATCH_INTERVAL;
} }
count --; count --;
#endif
if( (handleBytes > memSizeThresh / 2) && ( pthread_mutex_unlock(h->mut);
#ifdef INFINITE_RESOURCES
go || if( (handleBytes > memSizeThresh / 2)
#else && ( go || handleBytes > memSizeThresh ) ) { // XXX ok?
handleBytes > inputSizeThresh ||
#endif
handleBytes > memSizeThresh ) ) { // XXX ok?
printf("Handle mbytes %lld (%lld) Input size: %lld input size thresh: %lld mbytes mem size thresh: %lld\n", printf("Handle mbytes %lld (%lld) Input size: %lld input size thresh: %lld mbytes mem size thresh: %lld\n",
(long long) handleBytes / (1024*1024), (long long) h->scratch_tree->size(), (long long) *h->input_size, (long long) handleBytes / (1024*1024), (long long) h->scratch_tree->size(), (long long) *h->input_size,
(long long) inputSizeThresh / (1024*1024), (long long) memSizeThresh / (1024*1024)); (long long) inputSizeThresh / (1024*1024), (long long) memSizeThresh / (1024*1024));
@ -727,20 +689,9 @@ namespace rose {
template<class PAGELAYOUT> template<class PAGELAYOUT>
int TlsmTableCount(int xid, lsmTableHandle<PAGELAYOUT> *h) { int TlsmTableCount(int xid, lsmTableHandle<PAGELAYOUT> *h) {
/* treeIterator<typename PAGELAYOUT::FMT::TUP, typename PAGELAYOUT::FMT> it1(NULLRID);
treeIterator<typename PAGELAYOUT::FMT::TUP, typename PAGELAYOUT::FMT> it2(NULLRID; typedef treeIterator<typename PAGELAYOUT::FMT::TUP, typename PAGELAYOUT::FMT> LSM_ITER;
stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP>,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> it3(???);*/
// typename std::set<typename
typedef treeIterator<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT> LSM_ITER;
/* taypedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> RB_ITER; */
typedef std::_Rb_tree_const_iterator<typename PAGELAYOUT::FMT::TUP> RB_ITER; typedef std::_Rb_tree_const_iterator<typename PAGELAYOUT::FMT::TUP> RB_ITER;
// typedef mergeIterator<LSM_ITER,RB_ITER,typename PAGELAYOUT::FMT::TUP> INNER_MERGE;
typedef mergeIterator<LSM_ITER,LSM_ITER,typename PAGELAYOUT::FMT::TUP> LSM_LSM ; typedef mergeIterator<LSM_ITER,LSM_ITER,typename PAGELAYOUT::FMT::TUP> LSM_LSM ;
typedef mergeIterator<RB_ITER,RB_ITER,typename PAGELAYOUT::FMT::TUP> RB_RB ; typedef mergeIterator<RB_ITER,RB_ITER,typename PAGELAYOUT::FMT::TUP> RB_RB ;
typedef mergeIterator<LSM_ITER,LSM_LSM,typename PAGELAYOUT::FMT::TUP> LSM_M_LSM_LSM; typedef mergeIterator<LSM_ITER,LSM_LSM,typename PAGELAYOUT::FMT::TUP> LSM_M_LSM_LSM;
@ -841,8 +792,8 @@ namespace rose {
void** void**
TlsmTableFindGTE(int xid, lsmTableHandle<PAGELAYOUT> *h, TlsmTableFindGTE(int xid, lsmTableHandle<PAGELAYOUT> *h,
typename PAGELAYOUT::FMT::TUP &val) { typename PAGELAYOUT::FMT::TUP &val) {
pthread_mutex_lock(h->mut);
// typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP, typedef stlSetIterator<typename std::set<typename PAGELAYOUT::FMT::TUP,
typename PAGELAYOUT::FMT::TUP::stl_cmp>, typename PAGELAYOUT::FMT::TUP::stl_cmp>,
typename PAGELAYOUT::FMT::TUP> RB_ITER; typename PAGELAYOUT::FMT::TUP> RB_ITER;
@ -880,6 +831,11 @@ namespace rose {
return ret; return ret;
} }
template<class PAGELAYOUT>
void
TlsmTableFindGTEDone(lsmTableHandle<PAGELAYOUT> *h) {
pthread_mutex_unlock(h->mut);
}
template<class PAGELAYOUT> template<class PAGELAYOUT>
const typename PAGELAYOUT::FMT::TUP * const typename PAGELAYOUT::FMT::TUP *
TlsmTableFind(int xid, lsmTableHandle<PAGELAYOUT> *h, TlsmTableFind(int xid, lsmTableHandle<PAGELAYOUT> *h,