partially remove merger_args structure

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@579 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-02-17 23:38:31 +00:00
parent 5032e77b6c
commit ec747781ac
4 changed files with 28 additions and 85 deletions

View file

@ -834,6 +834,7 @@ logtable::logtable()
tree_c0 = NULL; tree_c0 = NULL;
tree_c1 = NULL; tree_c1 = NULL;
tree_c2 = NULL; tree_c2 = NULL;
this->still_running_ = true;
this->mergedata = 0; this->mergedata = 0;
fixed_page_count = -1; fixed_page_count = -1;
//tmerger = new tuplemerger(&append_merger); //tmerger = new tuplemerger(&append_merger);

View file

@ -189,7 +189,7 @@ public:
datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, logtree *ltree); datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, logtree *ltree);
inline recordid & get_table_rec(){return table_rec;} inline recordid & get_table_rec(){return table_rec;} // TODO This is called by merger.cpp for no good reason. (remove the calls)
inline logtree * get_tree_c2(){return tree_c2;} inline logtree * get_tree_c2(){return tree_c2;}
inline logtree * get_tree_c1(){return tree_c1;} inline logtree * get_tree_c1(){return tree_c1;}
@ -234,9 +234,11 @@ public:
int64_t max_c0_size; int64_t max_c0_size;
private: inline bool is_still_running() { return still_running_; }
inline void stop() {
still_running_ = false;
// XXX must need to do other things!
}
private: private:
recordid table_rec; recordid table_rec;
@ -257,6 +259,8 @@ private:
// logtable_mergedata * mergedata; // logtable_mergedata * mergedata;
tuplemerger *tmerger; tuplemerger *tmerger;
bool still_running_;
}; };

View file

@ -57,13 +57,6 @@ merge_scheduler::~merge_scheduler()
delete mdata->input_size; delete mdata->input_size;
//delete the merge thread structure variables //delete the merge thread structure variables
delete (recordid*) mdata->memmerge_args->pageAllocState;
delete (recordid*) mdata->memmerge_args->oldAllocState;
delete mdata->memmerge_args->still_open;
delete (recordid*) mdata->diskmerge_args->pageAllocState;
delete (recordid*) mdata->diskmerge_args->oldAllocState;
pthread_cond_destroy(mdata->diskmerge_args->in_block_needed_cond); pthread_cond_destroy(mdata->diskmerge_args->in_block_needed_cond);
delete mdata->diskmerge_args->in_block_needed_cond; delete mdata->diskmerge_args->in_block_needed_cond;
delete mdata->diskmerge_args->in_block_needed; delete mdata->diskmerge_args->in_block_needed;
@ -76,13 +69,9 @@ merge_scheduler::~merge_scheduler()
delete mdata->diskmerge_args->in_block_ready_cond; delete mdata->diskmerge_args->in_block_ready_cond;
pthread_cond_destroy(mdata->diskmerge_args->out_block_ready_cond); pthread_cond_destroy(mdata->diskmerge_args->out_block_ready_cond);
delete mdata->diskmerge_args->out_block_ready_cond; delete mdata->diskmerge_args->out_block_ready_cond;
delete mdata->diskmerge_args->my_tree_size;
delete mdata->diskmerge_args; delete mdata->diskmerge_args;
delete mdata->memmerge_args; delete mdata->memmerge_args;
} }
mergedata.clear(); mergedata.clear();
@ -99,8 +88,8 @@ void merge_scheduler::shutdown()
//flush the in memory table to write any tuples still in memory //flush the in memory table to write any tuples still in memory
ltable->flushTable(); ltable->flushTable();
pthread_mutex_lock(mdata->rbtree_mut); pthread_mutex_lock(mdata->rbtree_mut);
*(mdata->memmerge_args->still_open)=false; ltable->stop();
pthread_cond_signal(mdata->input_ready_cond); pthread_cond_signal(mdata->input_ready_cond);
//*(mdata->diskmerge_args->still_open)=false;//same pointer so no need //*(mdata->diskmerge_args->still_open)=false;//same pointer so no need
@ -138,7 +127,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
bool *block1_needed = new bool(false); bool *block1_needed = new bool(false);
bool *block2_needed = new bool(false); bool *block2_needed = new bool(false);
bool *system_running = new bool(true);
//wait to merge the next block until we have merged block FUDGE times. //wait to merge the next block until we have merged block FUDGE times.
static const int FUDGE = 1; static const int FUDGE = 1;
@ -150,24 +138,18 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
ltable->set_tree_c0(new rbtree_t); ltable->set_tree_c0(new rbtree_t);
//disk merger args //disk merger args
recordid * ridp = new recordid;
*ridp = ltable->get_tree_c2()->get_tree_state(); //h.bigTreeAllocState;
recordid * oldridp = new recordid;
*oldridp = NULLRID;
ltable->max_c0_size = MAX_C0_SIZE; ltable->max_c0_size = MAX_C0_SIZE;
logtree ** block1_scratch = new logtree*; logtree ** block1_scratch = new logtree*;
*block1_scratch=0; *block1_scratch=0;
//recordid * allocer_scratch = new recordid; DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page);
DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page);
struct merger_args<logtree> diskmerge_args= { struct merger_args<logtree> diskmerge_args= {
ltable, ltable,
1, //worker id 1, //worker id
logtree::alloc_region_rid, //pageAlloc
ridp, // pageAllocState
oldridp, // oldAllocState
mdata->rbtree_mut, //block_ready_mutex mdata->rbtree_mut, //block_ready_mutex
block1_needed_cond, //in_block_needed_cond block1_needed_cond, //in_block_needed_cond
block1_needed, //in_block_needed block1_needed, //in_block_needed
@ -175,37 +157,19 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
block2_needed, //out_block_needed block2_needed, //out_block_needed
block1_ready_cond, //in_block_ready_cond block1_ready_cond, //in_block_ready_cond
block2_ready_cond, //out_block_ready_cond block2_ready_cond, //out_block_ready_cond
system_running, //still_open i.e. system running
block1_size, //mytree_size ?
0, //out_tree_size, biggest component computes its size directly.
0, //max_tree_size No max size for biggest component 0, //max_tree_size No max size for biggest component
&R, //r_i &R, //r_i
block1_scratch, //in-tree block1_scratch, //in-tree
0, //out_tree 0, //out_tree
ltable->get_tree_c2()->get_root_rec(), // my_tree ltable->get_tree_c2()->get_root_rec(), // my_tree
ltable->get_table_rec() //tree
}; };
*mdata->diskmerge_args = diskmerge_args; *mdata->diskmerge_args = diskmerge_args;
DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page);
//memory merger args
ridp = new recordid;
*ridp = ltable->get_tree_c1()->get_tree_state();
oldridp = new recordid;
*oldridp = NULLRID;
DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page);
struct merger_args<rbtree_t> memmerge_args = struct merger_args<rbtree_t> memmerge_args =
{ {
ltable, ltable,
2, 2,
logtree::alloc_region_rid, //pageAlloc
ridp, // pageAllocState
oldridp, // oldAllocState
mdata->rbtree_mut, //block_ready_mutex mdata->rbtree_mut, //block_ready_mutex
mdata->input_needed_cond, mdata->input_needed_cond,
mdata->input_needed, mdata->input_needed,
@ -213,15 +177,11 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
block1_needed, block1_needed,
mdata->input_ready_cond, mdata->input_ready_cond,
block1_ready_cond, block1_ready_cond,
system_running,
mdata->input_size,
block1_size,
(int64_t)(R * R * MAX_C0_SIZE), (int64_t)(R * R * MAX_C0_SIZE),
&R, &R,
mdata->old_c0, mdata->old_c0,
block1_scratch, block1_scratch,
ltable->get_tree_c1()->get_root_rec(), ltable->get_tree_c1()->get_root_rec(),
ltable->get_table_rec() //tree
}; };
*mdata->memmerge_args = memmerge_args; *mdata->memmerge_args = memmerge_args;
@ -261,7 +221,7 @@ void* memMergeThread(void*arg)
//pthread_cond_signal(a->in_block_needed_cond); //pthread_cond_signal(a->in_block_needed_cond);
pthread_cond_broadcast(a->in_block_needed_cond); pthread_cond_broadcast(a->in_block_needed_cond);
if(!*(a->still_open)){ if(!ltable->is_still_running()){
done = 1; done = 1;
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
break; break;
@ -315,6 +275,7 @@ void* memMergeThread(void*arg)
//save the old dp state values //save the old dp state values
DataPage<datatuple>::RegionAllocator *old_alloc = ltable->get_tree_c1()->get_alloc(); DataPage<datatuple>::RegionAllocator *old_alloc = ltable->get_tree_c1()->get_alloc();
old_alloc->done(); // XXX do this earlier old_alloc->done(); // XXX do this earlier
recordid oldAllocState = ltable->get_tree_c1()->get_tree_state();
//pthread_mutex_unlock(a->block_ready_mut); //pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->mergedata->header_lock); unlock(ltable->mergedata->header_lock);
@ -340,28 +301,26 @@ void* memMergeThread(void*arg)
writelock(ltable->mergedata->header_lock,0); writelock(ltable->mergedata->header_lock,0);
merge_count++; merge_count++;
*a->my_tree_size = mergedPages;
printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages); printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages);
delete ltable->get_tree_c1(); delete ltable->get_tree_c1();
ltable->set_tree_c1(scratch_tree); ltable->set_tree_c1(scratch_tree);
logtable::table_header h; logtable::table_header h;
void * oldAllocState = a->pageAllocState; Tread(xid, ltable->get_table_rec(), &h);
Tread(xid, a->tree, &h);
h.c1_root = scratch_root; h.c1_root = scratch_root;
h.c1_state = scratch_alloc_state; h.c1_state = scratch_alloc_state;
//note we already updated the dpstate before the merge //note we already updated the dpstate before the merge
printf("mmt:\tUpdated C1's position on disk to %lld\n",scratch_root.page); printf("mmt:\tUpdated C1's position on disk to %lld\n",scratch_root.page);
Tset(xid, a->tree, &h); Tset(xid, ltable->get_table_rec(), &h);
//Tcommit(xid); //Tcommit(xid);
//xid = Tbegin(); //xid = Tbegin();
// free old my_tree here // free old my_tree here
//TODO: check //TODO: check
logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState); logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState);
//free the old data pages //free the old data pages
old_alloc->dealloc_regions(xid); old_alloc->dealloc_regions(xid);
@ -401,19 +360,17 @@ void* memMergeThread(void*arg)
logtree *empty_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size); logtree *empty_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size);
empty_tree->create(xid); empty_tree->create(xid);
*(recordid*)(a->pageAllocState) = empty_tree->get_tree_state();
a->my_tree = empty_tree->get_root_rec(); a->my_tree = empty_tree->get_root_rec();
ltable->set_tree_c1(empty_tree); ltable->set_tree_c1(empty_tree);
logtable::table_header h; logtable::table_header h;
Tread(xid, a->tree, &h); Tread(xid, ltable->get_table_rec(), &h);
h.c1_root = empty_tree->get_root_rec(); //update root h.c1_root = empty_tree->get_root_rec(); //update root
h.c1_state = empty_tree->get_tree_state(); //update index alloc state h.c1_state = empty_tree->get_tree_state(); //update index alloc state
printf("mmt:\tUpdated C1's position on disk to %lld\n",empty_tree->get_root_rec().page); printf("mmt:\tUpdated C1's position on disk to %lld\n",empty_tree->get_root_rec().page);
Tset(xid, a->tree, &h); Tset(xid, ltable->get_table_rec(), &h);
Tcommit(xid); Tcommit(xid);
//xid = Tbegin(); //xid = Tbegin();
@ -421,7 +378,6 @@ void* memMergeThread(void*arg)
else //not signaling the C2 for merge yet else //not signaling the C2 for merge yet
{ {
printf("mmt:\tnot signaling C2 for merge\n"); printf("mmt:\tnot signaling C2 for merge\n");
*(recordid*)a->pageAllocState = scratch_alloc_state;
a->my_tree = scratch_root; a->my_tree = scratch_root;
} }
@ -470,7 +426,7 @@ void *diskMergeThread(void*arg)
*a->in_block_needed = true; *a->in_block_needed = true;
pthread_cond_signal(a->in_block_needed_cond); pthread_cond_signal(a->in_block_needed_cond);
if(!*(a->still_open)){ if(!ltable->is_still_running()){
done = 1; done = 1;
pthread_mutex_unlock(a->block_ready_mut); pthread_mutex_unlock(a->block_ready_mut);
break; break;
@ -513,6 +469,8 @@ void *diskMergeThread(void*arg)
DataPage<datatuple>::RegionAllocator *old_alloc1 = ltable->get_tree_c1()->get_alloc(); DataPage<datatuple>::RegionAllocator *old_alloc1 = ltable->get_tree_c1()->get_alloc();
DataPage<datatuple>::RegionAllocator *old_alloc2 = ltable->get_tree_c2()->get_alloc(); DataPage<datatuple>::RegionAllocator *old_alloc2 = ltable->get_tree_c2()->get_alloc();
recordid oldAllocState = ltable->get_tree_c2()->get_tree_state();
unlock(ltable->mergedata->header_lock); unlock(ltable->mergedata->header_lock);
@ -537,7 +495,6 @@ void *diskMergeThread(void*arg)
writelock(ltable->mergedata->header_lock,0); writelock(ltable->mergedata->header_lock,0);
merge_count++; merge_count++;
*a->my_tree_size = mergedPages;
//update the current optimal R value //update the current optimal R value
*(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (ltable->max_c0_size/PAGE_SIZE) ) ); *(a->r_i) = std::max(MIN_R, sqrt( (npages * 1.0) / (ltable->max_c0_size/PAGE_SIZE) ) );
@ -547,24 +504,21 @@ void *diskMergeThread(void*arg)
ltable->set_tree_c2(scratch_tree); ltable->set_tree_c2(scratch_tree);
logtable::table_header h; logtable::table_header h;
void * oldAllocState = a->pageAllocState; Tread(xid, ltable->get_table_rec(), &h);
Tread(xid, a->tree, &h);
h.c2_root = scratch_root; h.c2_root = scratch_root;
h.c2_state = scratch_alloc_state; h.c2_state = scratch_alloc_state;
//note we already updated the dpstate before the merge //note we already updated the dpstate before the merge
printf("dmt:\tUpdated C2's position on disk to %lld\n",scratch_root.page); printf("dmt:\tUpdated C2's position on disk to %lld\n",scratch_root.page);
Tset(xid, a->tree, &h); Tset(xid, ltable->get_table_rec(), &h);
// free old my_tree here // free old my_tree here
//TODO: check //TODO: check
logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState); logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState);
//TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState);
//free the old data pages //free the old data pages
old_alloc2->dealloc_regions(xid); old_alloc2->dealloc_regions(xid);
*(recordid*)a->pageAllocState = scratch_alloc_state;
a->my_tree = scratch_root; a->my_tree = scratch_root;
//// ----------- Free in_tree //// ----------- Free in_tree
@ -573,11 +527,6 @@ void *diskMergeThread(void*arg)
logtree::dealloc_region_rid, logtree::dealloc_region_rid,
&((*a->in_tree)->get_tree_state())); &((*a->in_tree)->get_tree_state()));
old_alloc1->dealloc_regions(xid); // XXX make sure that both of these are 'unlinked' before this happens old_alloc1->dealloc_regions(xid); // XXX make sure that both of these are 'unlinked' before this happens
//TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState);
//TODO: check
//free the old data pages
// DataPage<datatuple>::dealloc_region_rid(xid, a->in_tree_allocer);//TODO:
Tcommit(xid); Tcommit(xid);

View file

@ -17,11 +17,6 @@ struct merger_args
logtable * ltable; logtable * ltable;
int worker_id; int worker_id;
//page allocation information
pageid_t(*pageAlloc)(int,void*);
void *pageAllocState;
void *oldAllocState;
pthread_mutex_t * block_ready_mut; pthread_mutex_t * block_ready_mut;
pthread_cond_t * in_block_needed_cond; pthread_cond_t * in_block_needed_cond;
@ -33,10 +28,6 @@ struct merger_args
pthread_cond_t * in_block_ready_cond; pthread_cond_t * in_block_ready_cond;
pthread_cond_t * out_block_ready_cond; pthread_cond_t * out_block_ready_cond;
bool * still_open;
int64_t * my_tree_size;
int64_t * out_tree_size;
int64_t max_size; //pageid_t int64_t max_size; //pageid_t
double * r_i; double * r_i;
@ -45,8 +36,6 @@ struct merger_args
logtree ** out_tree; logtree ** out_tree;
recordid my_tree; recordid my_tree;
recordid tree;
}; };