Encapsulate allocator state inside logtree. Remove redundant information from merger_args. Rewrite merge threads. This seems to fix the double free issues, and gets rid of disk space leaks. It's probably leaking RAM, and does not keep the logstore headers up to date.

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@587 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-02-18 23:31:57 +00:00
parent ec76d1c244
commit ed9d4fc377
7 changed files with 184 additions and 271 deletions

View file

@ -43,20 +43,19 @@ public:
// Open an existing region allocator.
RegionAllocator(int xid, recordid rid) :
rid_(rid),
nextPage_(INVALID_PAGE),
endOfRegion_(INVALID_PAGE) {
Tread(xid, rid, &header_);
rid_ = rid;
Tread(xid, rid_, &header_);
regionCount_ = TarrayListLength(xid, header_.region_list);
}
// Create a new region allocator.
RegionAllocator(int xid, recordid rid, pageid_t region_length) :
rid_(rid),
RegionAllocator(int xid, pageid_t region_length) :
nextPage_(0),
endOfRegion_(0),
regionCount_(0)
{
assert(TrecordSize(xid, rid) == sizeof(header_));
rid_ = Talloc(xid, sizeof(header_));
header_.region_list = TarrayListAlloc(xid, 1, 2, sizeof(pageid_t));
header_.region_length = region_length;
Tset(xid, rid_, &header_);
@ -94,7 +93,6 @@ public:
Tread(xid, list_entry, &pid);
TregionForce(xid, pid);
}
Tset(xid, rid_, &header_);
}
void dealloc_regions(int xid) {
pageid_t regionCount = TarrayListLength(xid, header_.region_list);
@ -105,12 +103,13 @@ public:
list_entry.slot < regionCount; list_entry.slot++) {
pageid_t pid;
Tread(xid, list_entry, &pid);
#ifndef CHECK_FOR_SCRIBBLING // Don't actually free the page if we'll be checking that pages are used exactly once below.
//#ifndef CHECK_FOR_SCRIBBLING // Don't actually free the page if we'll be checking that pages are used exactly once below.
TregionDealloc(xid, pid);
#endif
//#endif
}
printf("Warning: leaking arraylist %lld in datapage\n", (long long)header_.region_list.page);
// TarrayListDealloc(xid, header_.region_list);
// printf("Warning: leaking arraylist %lld in datapage\n", (long long)header_.region_list.page);
TarrayListDealloc(xid, header_.region_list);
Tdealloc(xid, rid_);
}
pageid_t * list_regions(int xid, pageid_t * region_length, pageid_t * region_count) {
*region_count = TarrayListLength(xid, header_.region_list);
@ -127,17 +126,17 @@ public:
nextPage_ = INVALID_PAGE;
endOfRegion_ = INVALID_PAGE;
}
recordid header_rid() { return rid_; }
private:
typedef struct {
recordid region_list;
pageid_t region_length;
} persistent_state;
const recordid rid_;
recordid rid_;
pageid_t nextPage_;
pageid_t endOfRegion_;
pageid_t regionCount_;
pageid_t endOfRegion_;
pageid_t regionCount_;
persistent_state header_;
public:
static const size_t header_size = sizeof(persistent_state);

View file

@ -500,18 +500,27 @@ void * thread_work_fn( void * args)
readlock(item->data->ltable->getMergeData()->header_lock, 0);
// produce a list of regions used by current tree components
pageid_t datapage_c1_region_length, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c2_region_count;
pageid_t datapage_c1_region_length, datapage_c1_mergeable_region_length = 0, datapage_c2_region_length;
pageid_t datapage_c1_region_count, datapage_c1_mergeable_region_count = 0, datapage_c2_region_count;
pageid_t * datapage_c1_regions = item->data->ltable->get_tree_c1()->get_alloc()->list_regions(xid, &datapage_c1_region_length, &datapage_c1_region_count);
pageid_t * datapage_c1_mergeable_regions = NULL;
if(item->data->ltable->get_tree_c1_mergeable()) {
datapage_c1_mergeable_regions = item->data->ltable->get_tree_c1_mergeable()->get_alloc()->list_regions(xid, &datapage_c1_mergeable_region_length, &datapage_c1_mergeable_region_count);
}
pageid_t * datapage_c2_regions = item->data->ltable->get_tree_c2()->get_alloc()->list_regions(xid, &datapage_c2_region_length, &datapage_c2_region_count);
pageid_t tree_c1_region_length, tree_c2_region_length;
pageid_t tree_c1_region_count, tree_c2_region_count;
pageid_t tree_c1_region_length, tree_c1_mergeable_region_length = 0, tree_c2_region_length;
pageid_t tree_c1_region_count, tree_c1_mergeable_region_count = 0, tree_c2_region_count;
recordid tree_c1_region_header = item->data->ltable->get_treestate1();
recordid tree_c2_region_header = item->data->ltable->get_treestate2();
recordid tree_c1_region_header = item->data->ltable->get_tree_c1()->get_tree_state();
recordid tree_c2_region_header = item->data->ltable->get_tree_c2()->get_tree_state();
pageid_t * tree_c1_regions = logtree::list_region_rid(xid, &tree_c1_region_header, &tree_c1_region_length, &tree_c1_region_count);
pageid_t * tree_c1_mergeable_regions = NULL;
if(item->data->ltable->get_tree_c1_mergeable()) {
recordid tree_c1_mergeable_region_header = item->data->ltable->get_tree_c1_mergeable()->get_tree_state();
tree_c1_mergeable_regions = logtree::list_region_rid(xid, &tree_c1_mergeable_region_header, &tree_c1_mergeable_region_length, &tree_c1_mergeable_region_count);
}
pageid_t * tree_c2_regions = logtree::list_region_rid(xid, &tree_c2_region_header, &tree_c2_region_length, &tree_c2_region_count);
unlock(item->data->ltable->getMergeData()->header_lock);
@ -541,17 +550,31 @@ void * thread_work_fn( void * args)
boundary_tag tag;
pageid_t pid = ROOT_RECORD.page;
TregionReadBoundaryTag(xid, pid, &tag);
pageid_t max_off = 0;
bool done;
do {
max_off = pid + tag.size;
// print tag.
printf("\tPage %lld\tSize %lld\tAllocationManager %d\n", (long long)pid, (long long)tag.size, (int)tag.allocation_manager);
done = ! TregionNextBoundaryTag(xid, &pid, &tag, 0/*all allocation managers*/);
} while(!done);
printf("\n");
printf("Tree components are using %lld megabytes. File is using %lld megabytes.",
PAGE_SIZE * (tree_c1_region_length * tree_c1_region_count
+ tree_c1_mergeable_region_length * tree_c1_mergeable_region_count
+ tree_c2_region_length * tree_c2_region_count
+ datapage_c1_region_length * datapage_c1_region_count
+ datapage_c1_mergeable_region_length * datapage_c1_mergeable_region_count
+ datapage_c2_region_length * datapage_c2_region_count) / (1024 * 1024),
(PAGE_SIZE * max_off) / (1024*1024));
free(datapage_c1_regions);
if(datapage_c1_mergeable_regions) free(datapage_c1_mergeable_regions);
free(datapage_c2_regions);
free(tree_c1_regions);
if(tree_c1_mergeable_regions) free(tree_c1_mergeable_regions);
free(tree_c2_regions);
err = writeoptosocket(*(item->data->workitem), LOGSTORE_RESPONSE_SUCCESS);

View file

@ -62,9 +62,8 @@ void logtree::free_region_rid(int xid, recordid tree,
}
void logtree::dealloc_region_rid(int xid, void *conf)
void logtree::dealloc_region_rid(int xid, recordid rid)
{
recordid rid = *(recordid*)conf;
RegionAllocConf_t a;
Tread(xid,rid,&a);
DEBUG("{%lld <- dealloc region arraylist}\n", a.regionList.page);
@ -76,14 +75,13 @@ void logtree::dealloc_region_rid(int xid, void *conf)
TregionDealloc(xid,pid);
}
a.regionList.slot = 0;
printf("Warning: leaking arraylist %lld in logtree\n", (long long)a.regionList.page);
// TarrayListDealloc(xid, a.regionList);
// printf("Warning: leaking arraylist %lld in logtree\n", (long long)a.regionList.page);
TarrayListDealloc(xid, a.regionList);
}
void logtree::force_region_rid(int xid, void *conf)
void logtree::force_region_rid(int xid, recordid rid)
{
recordid rid = *(recordid*)conf;
RegionAllocConf_t a;
Tread(xid,rid,&a);
@ -832,7 +830,9 @@ logtable::logtable()
{
tree_c0 = NULL;
tree_c0_mergeable = NULL;
tree_c1 = NULL;
tree_c1_mergeable = NULL;
tree_c2 = NULL;
this->still_running_ = true;
this->mergedata = 0;
@ -889,17 +889,17 @@ recordid logtable::allocTable(int xid)
//create the big tree
tbl_header.c2_dp_state = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
tree_c2 = new logtree(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c2_dp_state, 10000)); /// XXX do not hard code region length.
tree_c2->create(xid);
tree_c2 = new logtree(xid);
//create the small tree
tbl_header.c1_dp_state = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
tree_c1 = new logtree(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c1_dp_state, 10000)); /// XXX do not hard code region length.
tree_c1->create(xid);
tree_c1 = new logtree(xid);
tbl_header.c2_root = tree_c2->get_root_rec();
tbl_header.c2_dp_state = tree_c2->get_alloc()->header_rid();
tbl_header.c2_state = tree_c2->get_tree_state();
tbl_header.c1_root = tree_c1->get_root_rec();
tbl_header.c2_dp_state = tree_c1->get_alloc()->header_rid();
tbl_header.c1_state = tree_c1->get_tree_state();
Tset(xid, table_rec, &tbl_header);
@ -931,7 +931,7 @@ void logtable::flushTable()
printf("prv merge not complete\n");
while(*mergedata->old_c0) {
while(get_tree_c0_mergeable()) {
unlock(mergedata->header_lock);
// pthread_mutex_lock(mergedata->rbtree_mut);
if(tree_bytes >= max_c0_size)
@ -963,15 +963,15 @@ void logtable::flushTable()
stop = tv_to_double(stop_tv);
//rbtree_ptr *tmp_ptr = new rbtree_ptr_t; //(typeof(h->scratch_tree)*) malloc(sizeof(void*));
//*tmp_ptr = tree_c0;
*(mergedata->old_c0) = tree_c0;
set_tree_c0_mergeable(get_tree_c0());
// pthread_mutex_lock(mergedata->rbtree_mut);
pthread_cond_signal(mergedata->input_ready_cond);
// pthread_mutex_unlock(mergedata->rbtree_mut);
merge_count ++;
tree_c0 = new rbtree_t;
set_tree_c0(new rbtree_t);
tsize = 0;
tree_bytes = 0;
@ -1002,20 +1002,20 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
datatuple *ret_tuple=0;
//step 1: look in tree_c0
rbtree_t::iterator rbitr = tree_c0->find(search_tuple);
if(rbitr != tree_c0->end())
rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple);
if(rbitr != get_tree_c0()->end())
{
DEBUG("tree_c0 size %d\n", tree_c0->size());
DEBUG("tree_c0 size %d\n", get_tree_c0()->size());
ret_tuple = (*rbitr)->create_copy();
}
bool done = false;
//step: 2 look into first in tree if exists (a first level merge going on)
if(*(mergedata->old_c0) != 0)
if(get_tree_c0_mergeable() != 0)
{
DEBUG("old mem tree not null %d\n", (*(mergedata->old_c0))->size());
rbitr = (*(mergedata->old_c0))->find(search_tuple);
if(rbitr != (*(mergedata->old_c0))->end())
rbitr = get_tree_c0_mergeable()->find(search_tuple);
if(rbitr != get_tree_c0_mergeable()->end())
{
datatuple *tuple = *rbitr;
@ -1041,7 +1041,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
//step 3: check c1
if(!done)
{
datatuple *tuple_c1 = findTuple(xid, key, keySize, tree_c1);
datatuple *tuple_c1 = findTuple(xid, key, keySize, get_tree_c1());
if(tuple_c1 != NULL)
{
bool use_copy = false;
@ -1057,9 +1057,6 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
{
use_copy = true;
ret_tuple = tuple_c1;
//byte *barr = (byte*)malloc(tuple_c1->byte_length());
//memcpy(barr, (byte*)tuple_c1->keylen, tuple_c1->byte_length());
//ret_tuple = datatuple::from_bytes(barr);
}
if(!use_copy)
@ -1070,11 +1067,10 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
}
//step 4: check old c1 if exists
if(!done && *(mergedata->diskmerge_args->in_tree) != 0)
if(!done && get_tree_c1_mergeable() != 0)
{
DEBUG("old c1 tree not null\n");
datatuple *tuple_oc1 = findTuple(xid, key, keySize,
(logtree*)( *(mergedata->diskmerge_args->in_tree)));
datatuple *tuple_oc1 = findTuple(xid, key, keySize, get_tree_c1_mergeable());
if(tuple_oc1 != NULL)
{
@ -1107,7 +1103,7 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS
if(!done)
{
DEBUG("Not in old first disk tree\n");
datatuple *tuple_c2 = findTuple(xid, key, keySize, tree_c2);
datatuple *tuple_c2 = findTuple(xid, key, keySize, get_tree_c2());
if(tuple_c2 != NULL)
{
@ -1154,8 +1150,8 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
datatuple *ret_tuple=0;
//step 1: look in tree_c0
rbtree_t::iterator rbitr = tree_c0->find(search_tuple);
if(rbitr != tree_c0->end())
rbtree_t::iterator rbitr = get_tree_c0()->find(search_tuple);
if(rbitr != get_tree_c0()->end())
{
DEBUG("tree_c0 size %d\n", tree_c0->size());
ret_tuple = (*rbitr)->create_copy();
@ -1165,11 +1161,11 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
{
DEBUG("Not in mem tree %d\n", tree_c0->size());
//step: 2 look into first in tree if exists (a first level merge going on)
if(*(mergedata->old_c0) != 0)
if(get_tree_c0_mergeable() != NULL)
{
DEBUG("old mem tree not null %d\n", (*(mergedata->old_c0))->size());
rbitr = (*(mergedata->old_c0))->find(search_tuple);
if(rbitr != (*(mergedata->old_c0))->end())
rbitr = get_tree_c0_mergeable()->find(search_tuple);
if(rbitr != get_tree_c0_mergeable()->end())
{
ret_tuple = (*rbitr)->create_copy();
}
@ -1180,7 +1176,7 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
DEBUG("Not in old mem tree\n");
//step 3: check c1
ret_tuple = findTuple(xid, key, keySize, tree_c1);
ret_tuple = findTuple(xid, key, keySize, get_tree_c1());
}
if(ret_tuple == 0)
@ -1188,11 +1184,10 @@ datatuple * logtable::findTuple_first(int xid, datatuple::key_t key, size_t keyS
DEBUG("Not in first disk tree\n");
//step 4: check old c1 if exists
if( *(mergedata->diskmerge_args->in_tree) != 0)
if( get_tree_c1_mergeable() != 0)
{
DEBUG("old c1 tree not null\n");
ret_tuple = findTuple(xid, key, keySize,
(logtree*)( *(mergedata->diskmerge_args->in_tree)));
ret_tuple = findTuple(xid, key, keySize, get_tree_c1_mergeable());
}
}

View file

@ -59,10 +59,10 @@ typedef void(*logtree_page_deallocator_t)(int, void *);
class logtree{
public:
logtree(DataPage<datatuple>::RegionAllocator * alloc): region_alloc(alloc) {}
logtree(int xid): region_alloc(new DataPage<datatuple>::RegionAllocator(xid, 10000)) {create(xid);} // XXX shouldn't hardcode region size.
private:
recordid create(int xid);
public:
void print_tree(int xid);
static void init_stasis();
@ -71,9 +71,9 @@ private:
static pageid_t alloc_region(int xid, void *conf);
public:
static pageid_t alloc_region_rid(int xid, void * ridp);
static void force_region_rid(int xid, void *conf);
static void force_region_rid(int xid, recordid rid);
static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count);
static void dealloc_region_rid(int xid, void *conf);
static void dealloc_region_rid(int xid, recordid rid);
static void free_region_rid(int xid, recordid tree,
logtree_page_deallocator_t dealloc,
void *allocator_state);
@ -129,8 +129,6 @@ public:
void *allocator_state);
inline DataPage<datatuple>::RegionAllocator* get_alloc() { return region_alloc; }
// inline void set_alloc(DataPage<datatuple>::RegionAllocator* a1) { region_alloc = a1; } // XXX kludge; must be a better api for this
// (currently, need to get rid from dpstate. add a 'register' method that sets the rid of the region allocator?)
/**
Initialize a page for use as an internal node of the tree.
@ -193,18 +191,16 @@ public:
inline logtree * get_tree_c2(){return tree_c2;}
inline logtree * get_tree_c1(){return tree_c1;}
inline logtree * get_tree_c1_mergeable(){return tree_c1_mergeable;}
inline void set_tree_c1(logtree *t){tree_c1=t;}
inline void set_tree_c1_mergeable(logtree *t){tree_c1_mergeable=t;}
inline void set_tree_c2(logtree *t){tree_c2=t;}
inline rbtree_ptr_t get_tree_c0(){return tree_c0;}
inline rbtree_ptr_t get_tree_c0_mergeable(){return tree_c0_mergeable;}
void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;}
inline recordid get_dpstate1(){return tbl_header.c1_dp_state;}
inline recordid get_dpstate2(){return tbl_header.c2_dp_state;}
inline recordid get_treestate1(){return tbl_header.c1_state;}
inline recordid get_treestate2(){return tbl_header.c2_state;}
void set_tree_c0_mergeable(rbtree_ptr_t newtree){tree_c0_mergeable = newtree;}
int get_fixed_page_count(){return fixed_page_count;}
void set_fixed_page_count(int count){fixed_page_count = count;}
@ -223,9 +219,6 @@ public:
recordid c1_root;
recordid c1_state;
recordid c1_dp_state;
//epoch_t beginning;
//epoch_t end;
};
const static RegionAllocConf_t DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER;
@ -246,8 +239,9 @@ private:
logtree *tree_c2; //big tree
logtree *tree_c1; //small tree
logtree *tree_c1_mergeable; //small tree: ready to be merged with c2
rbtree_ptr_t tree_c0; // in-mem red black tree
rbtree_ptr_t tree_c0_mergeable; // in-mem red black tree: ready to be merged with c1.
int tsize; //number of tuples
int64_t tree_bytes; //number of bytes
@ -256,8 +250,6 @@ private:
//DATA PAGE SETTINGS
int fixed_page_count;//number of pages in a datapage
// logtable_mergedata * mergedata;
tuplemerger *tmerger;
bool still_running_;
@ -279,7 +271,6 @@ public:
static lladdIterator_t* open(int xid, recordid root);
static lladdIterator_t* openAt(int xid, recordid root, const byte* key);
static int next(int xid, lladdIterator_t *it);
//static lladdIterator_t *copy(int xid, lladdIterator_t* i);
static void close(int xid, lladdIterator_t *it);

View file

@ -19,8 +19,7 @@ int merge_scheduler::addlogtable(logtable *ltable)
mdata->header_lock = initlock();
mdata->rbtree_mut = new pthread_mutex_t;
pthread_mutex_init(mdata->rbtree_mut,0);
mdata->old_c0 = new rbtree_ptr_t;
*mdata->old_c0 = 0;
ltable->set_tree_c0_mergeable(NULL);
mdata->input_needed = new bool(false);
@ -32,8 +31,8 @@ int merge_scheduler::addlogtable(logtable *ltable)
mdata->input_size = new int64_t(100);
mdata->diskmerge_args = new merger_args<logtree>;
mdata->memmerge_args = new merger_args<rbtree_t>;
mdata->diskmerge_args = new merger_args;
mdata->memmerge_args = new merger_args;
mergedata.push_back(std::make_pair(ltable, mdata));
return mergedata.size()-1;
@ -50,7 +49,6 @@ merge_scheduler::~merge_scheduler()
//delete the mergedata fields
deletelock(mdata->header_lock);
delete mdata->rbtree_mut;
delete mdata->old_c0;
delete mdata->input_needed;
delete mdata->input_ready_cond;
delete mdata->input_needed_cond;
@ -147,7 +145,7 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page);
DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page);
struct merger_args<logtree> diskmerge_args= {
struct merger_args diskmerge_args= {
ltable,
1, //worker id
mdata->rbtree_mut, //block_ready_mutex
@ -159,18 +157,15 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
block2_ready_cond, //out_block_ready_cond
0, //max_tree_size No max size for biggest component
&R, //r_i
block1_scratch, //in-tree
0, //out_tree
ltable->get_tree_c2()->get_root_rec(), // my_tree
};
*mdata->diskmerge_args = diskmerge_args;
struct merger_args<rbtree_t> memmerge_args =
struct merger_args memmerge_args =
{
ltable,
2,
mdata->rbtree_mut, //block_ready_mutex
mdata->rbtree_mut,
mdata->input_needed_cond,
mdata->input_needed,
block1_needed_cond,
@ -179,9 +174,6 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
block1_ready_cond,
(int64_t)(R * R * MAX_C0_SIZE),
&R,
mdata->old_c0,
block1_scratch,
ltable->get_tree_c1()->get_root_rec(),
};
*mdata->memmerge_args = memmerge_args;
@ -194,17 +186,32 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
}
// deallocate/free their region
// create new data region for new data pages
/**
* Merge algorithm
*<pre>
1: while(1)
2: wait for c0_mergable
3: begin
4: merge c0_mergable and c1 into c1'
5: force c1'
6: delete c1
7: if c1' is too big
8: c1 = new_empty
9: c1_mergable = c1'
10: else
11: c1 = c1'
12: commit
</pre>
*/
void* memMergeThread(void*arg)
{
int xid;// = Tbegin();
merger_args<rbtree_t> * a = (merger_args<rbtree_t>*)(arg);
assert(a->my_tree.size != -1);
merger_args * a = (merger_args*)(arg);
logtable * ltable = a->ltable;
assert(ltable->get_tree_c1());
int merge_count =0;
// pthread_mutex_lock(a->block_ready_mut);
@ -213,8 +220,8 @@ void* memMergeThread(void*arg)
{
writelock(ltable->mergedata->header_lock,0);
int done = 0;
// get a new input for merge
while(!*(a->in_tree))
// wait for c0_mergable
while(!ltable->get_tree_c0_mergeable())
{
pthread_mutex_lock(a->block_ready_mut);
*a->in_block_needed = true;
@ -242,40 +249,25 @@ void* memMergeThread(void*arg)
if(done==1)
{
pthread_mutex_lock(a->block_ready_mut);
pthread_cond_signal(a->out_block_ready_cond);
pthread_cond_signal(a->out_block_ready_cond); // no block is ready. this allows the other thread to wake up, and see that we're shutting down.
pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->mergedata->header_lock);
break;
}
if((*a->in_tree)->size()==0) //input empty, this can only happen during shutdown
{
delete *a->in_tree;
*a->in_tree = 0;
unlock(ltable->mergedata->header_lock);
continue;
}
int64_t mergedPages=0;
assert(a->my_tree.size != -1);
//create the iterators
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(a->my_tree);
memTreeIterator<rbtree_t, datatuple> *itrB =
new memTreeIterator<rbtree_t, datatuple>(*a->in_tree);
//Tcommit(xid);
// 3: Begin transaction
xid = Tbegin();
// 4: Merge
//create the iterators
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(ltable->get_tree_c1()->get_root_rec()); // XXX don't want get_root_rec() to be here.
memTreeIterator<rbtree_t, datatuple> *itrB =
new memTreeIterator<rbtree_t, datatuple>(ltable->get_tree_c0_mergeable());
//create a new tree
logtree * scratch_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size)
recordid scratch_root = scratch_tree->create(xid);
//save the old dp state values
DataPage<datatuple>::RegionAllocator *old_alloc = ltable->get_tree_c1()->get_alloc();
old_alloc->done(); // XXX do this earlier
recordid oldAllocState = ltable->get_tree_c1()->get_tree_state();
logtree * c1_prime = new logtree(xid); // XXX should not hardcode region size)
//pthread_mutex_unlock(a->block_ready_mut);
unlock(ltable->mergedata->header_lock);
@ -284,16 +276,24 @@ void* memMergeThread(void*arg)
printf("mmt:\tMerging:\n");
int64_t npages = 0;
mergedPages = merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, scratch_tree, npages, false);
int64_t mergedPages = merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c1_prime, npages, false);
delete itrA;
delete itrB;
// 5: force c1'
//force write the new region to disk
recordid scratch_alloc_state = scratch_tree->get_tree_state();
logtree::force_region_rid(xid, &scratch_alloc_state);
logtree::force_region_rid(xid, c1_prime->get_tree_state());
//force write the new datapages
scratch_tree->get_alloc()->force_regions(xid);
c1_prime->get_alloc()->force_regions(xid);
// 6: delete c1 and c0_mergeable
logtree::dealloc_region_rid(xid, ltable->get_tree_c1()->get_tree_state());
ltable->get_tree_c1()->get_alloc()->dealloc_regions(xid);
logtable::tearDownTree(ltable->get_tree_c0_mergeable());
ltable->set_tree_c0_mergeable(NULL);
//writes complete
//now atomically replace the old c1 with new c1
@ -303,32 +303,6 @@ void* memMergeThread(void*arg)
merge_count++;
printf("mmt:\tmerge_count %d #pages written %lld\n", merge_count, npages);
delete ltable->get_tree_c1();
ltable->set_tree_c1(scratch_tree);
logtable::table_header h;
Tread(xid, ltable->get_table_rec(), &h);
h.c1_root = scratch_root;
h.c1_state = scratch_alloc_state;
//note we already updated the dpstate before the merge
printf("mmt:\tUpdated C1's position on disk to %lld\n",scratch_root.page);
Tset(xid, ltable->get_table_rec(), &h);
//Tcommit(xid);
//xid = Tbegin();
// free old my_tree here
//TODO: check
logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState);
//free the old data pages
old_alloc->dealloc_regions(xid);
Tcommit(xid);
//xid = Tbegin();
//TODO: this is simplistic for now
//signal the other merger if necessary
double target_R = *(a->r_i);
@ -337,12 +311,19 @@ void* memMergeThread(void*arg)
if( (new_c1_size / ltable->max_c0_size > target_R) ||
(a->max_size && new_c1_size > a->max_size ) )
{
printf("mmt:\tsignaling C2 for merge\n");
// 7: c1' is too big
// 8: c1 = new empty.
ltable->set_tree_c1(new logtree(xid));
printf("mmt:\tsignaling C2 for merge\n");
printf("mmt:\tnew_c1_size %.2f\tMAX_C0_SIZE %lld\ta->max_size %lld\t targetr %.2f \n", new_c1_size,
ltable->max_c0_size, a->max_size, target_R);
// 9: c1_mergeable = c1'
// XXX need to report backpressure here!
while(*a->out_tree) {
// XXX need to report backpressure here! Also, shouldn't be inside a transaction while waiting on backpressure.
while(ltable->get_tree_c1_mergeable()) {
pthread_mutex_lock(a->block_ready_mut);
unlock(ltable->mergedata->header_lock);
@ -350,50 +331,24 @@ void* memMergeThread(void*arg)
pthread_mutex_unlock(a->block_ready_mut);
writelock(ltable->mergedata->header_lock,0);
}
*a->out_tree = scratch_tree;
xid = Tbegin();
ltable->set_tree_c1_mergeable(c1_prime);
pthread_cond_signal(a->out_block_ready_cond);
logtree *empty_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size);
empty_tree->create(xid);
a->my_tree = empty_tree->get_root_rec();
ltable->set_tree_c1(empty_tree);
logtable::table_header h;
Tread(xid, ltable->get_table_rec(), &h);
h.c1_root = empty_tree->get_root_rec(); //update root
h.c1_state = empty_tree->get_tree_state(); //update index alloc state
printf("mmt:\tUpdated C1's position on disk to %lld\n",empty_tree->get_root_rec().page);
Tset(xid, ltable->get_table_rec(), &h);
Tcommit(xid);
//xid = Tbegin();
}
else //not signaling the C2 for merge yet
{
printf("mmt:\tnot signaling C2 for merge\n");
a->my_tree = scratch_root;
} else {
// 11: c1 = c1'
ltable->set_tree_c1(c1_prime);
}
rbtree_ptr_t deltree = *a->in_tree;
*a->in_tree = 0;
// XXX want to set this stuff somewhere.
logtable::table_header h;
printf("mmt:\tUpdated C1's position on disk to %lld\n",ltable->get_tree_c1()->get_root_rec().page);
Tcommit(xid);
//Tcommit(xid);
unlock(ltable->mergedata->header_lock);
//TODO: get the freeing outside of the lock
//// ----------- Free in_tree
logtable::tearDownTree(deltree);
//deltree = 0;
}
//pthread_mutex_unlock(a->block_ready_mut);
@ -407,10 +362,10 @@ void *diskMergeThread(void*arg)
{
int xid;// = Tbegin();
merger_args<logtree> * a = (merger_args<logtree>*)(arg);
assert(a->my_tree.size != -1);
merger_args * a = (merger_args*)(arg);
logtable * ltable = a->ltable;
assert(ltable->get_tree_c2());
int merge_count =0;
//pthread_mutex_lock(a->block_ready_mut);
@ -420,7 +375,7 @@ void *diskMergeThread(void*arg)
writelock(ltable->mergedata->header_lock,0);
int done = 0;
// get a new input for merge
while(!*(a->in_tree))
while(!ltable->get_tree_c1_mergeable())
{
pthread_mutex_lock(a->block_ready_mut);
*a->in_block_needed = true;
@ -451,25 +406,16 @@ void *diskMergeThread(void*arg)
int64_t mergedPages=0;
assert(a->my_tree.size != -1);
//create the iterators
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(a->my_tree);
treeIterator<datatuple> *itrA = new treeIterator<datatuple>(ltable->get_tree_c2()->get_root_rec());
treeIterator<datatuple> *itrB =
new treeIterator<datatuple>((*a->in_tree)->get_root_rec());
new treeIterator<datatuple>(ltable->get_tree_c1_mergeable()->get_root_rec());
xid = Tbegin();
//create a new tree
//TODO: maybe you want larger regions for the second tree?
logtree * scratch_tree = new logtree(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate2() /*rid of old header*/, 10000)); // XXX should not hardcode region size
recordid scratch_root = scratch_tree->create(xid);
//save the old dp state values
DataPage<datatuple>::RegionAllocator *old_alloc1 = ltable->get_tree_c1()->get_alloc();
DataPage<datatuple>::RegionAllocator *old_alloc2 = ltable->get_tree_c2()->get_alloc();
recordid oldAllocState = ltable->get_tree_c2()->get_tree_state();
logtree * c2_prime = new logtree(xid);
unlock(ltable->mergedata->header_lock);
@ -478,16 +424,24 @@ void *diskMergeThread(void*arg)
printf("dmt:\tMerging:\n");
int64_t npages = 0;
mergedPages = merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, scratch_tree, npages, true);
mergedPages = merge_iterators<typeof(*itrA),typeof(*itrB)>(xid, itrA, itrB, ltable, c2_prime, npages, true);
delete itrA;
delete itrB;
//force write the new region to disk
recordid scratch_alloc_state = scratch_tree->get_tree_state();
logtree::force_region_rid(xid, &scratch_alloc_state);
//force write the new datapages
scratch_tree->get_alloc()->force_regions(xid);
logtree::force_region_rid(xid, c2_prime->get_tree_state());
c2_prime->get_alloc()->force_regions(xid);
logtree::dealloc_region_rid(xid, ltable->get_tree_c1_mergeable()->get_tree_state());
ltable->get_tree_c1_mergeable()->get_alloc()->dealloc_regions(xid);
delete ltable->get_tree_c1_mergeable();
ltable->set_tree_c1_mergeable(0);
logtree::dealloc_region_rid(xid, ltable->get_tree_c2()->get_tree_state());
ltable->get_tree_c2()->get_alloc()->dealloc_regions(xid);
delete ltable->get_tree_c2();
//writes complete
//now atomically replace the old c2 with new c2
@ -500,50 +454,18 @@ void *diskMergeThread(void*arg)
printf("dmt:\tmerge_count %d\t#written pages: %lld\n optimal r %.2f", merge_count, npages, *(a->r_i));
delete ltable->get_tree_c2();
ltable->set_tree_c2(scratch_tree);
// 11: C2 is never too big.
ltable->set_tree_c2(c2_prime);
logtable::table_header h;
Tread(xid, ltable->get_table_rec(), &h);
h.c2_root = scratch_root;
h.c2_state = scratch_alloc_state;
//note we already updated the dpstate before the merge
printf("dmt:\tUpdated C2's position on disk to %lld\n",scratch_root.page);
Tset(xid, ltable->get_table_rec(), &h);
// free old my_tree here
//TODO: check
logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, &oldAllocState);
//free the old data pages
old_alloc2->dealloc_regions(xid);
logtable::table_header h; // XXX Need to set header.
a->my_tree = scratch_root;
//// ----------- Free in_tree
//TODO: check
logtree::free_region_rid(xid, (*a->in_tree)->get_root_rec(),
logtree::dealloc_region_rid,
&((*a->in_tree)->get_tree_state()));
old_alloc1->dealloc_regions(xid); // XXX make sure that both of these are 'unlinked' before this happens
printf("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
Tcommit(xid);
//xid = Tbegin();
//Tcommit(xid);
delete *a->in_tree;
*a->in_tree = 0;
unlock(ltable->mergedata->header_lock);
}
//pthread_mutex_unlock(a->block_ready_mut);
return 0;
}
template <class ITA, class ITB>

View file

@ -10,8 +10,7 @@
//TODO: 400 bytes overhead per tuple, this is nuts, check if this is true...
static const int RB_TREE_OVERHEAD = 400;
static const double MIN_R = 3.0;
//T is either logtree or red-black tree
template <class T>
struct merger_args
{
logtable * ltable;
@ -28,14 +27,8 @@ struct merger_args
pthread_cond_t * in_block_ready_cond;
pthread_cond_t * out_block_ready_cond;
int64_t max_size; //pageid_t
int64_t max_size;
double * r_i;
T ** in_tree;
logtree ** out_tree;
recordid my_tree;
};
@ -49,7 +42,6 @@ struct logtable_mergedata
rwl *header_lock;
pthread_mutex_t * rbtree_mut;
rbtree_ptr_t *old_c0; //in-mem red black tree being merged / to be merged
bool *input_needed; // memmerge-input needed
@ -58,9 +50,9 @@ struct logtable_mergedata
int64_t * input_size;
//merge args 1
struct merger_args<logtree> *diskmerge_args;
struct merger_args *diskmerge_args;
//merge args 2
struct merger_args<rbtree_t> *memmerge_args;
struct merger_args *memmerge_args;
};
@ -79,14 +71,8 @@ public:
struct logtable_mergedata *getMergeData(int index){return mergedata[index].second;}
void shutdown();
};
void* memMergeThread(void* arg);
template <class ITA, class ITB>
int64_t merge_iterators(int xid,
ITA *itrA,
@ -96,8 +82,7 @@ int64_t merge_iterators(int xid,
int64_t &npages,
bool dropDeletes);
void* memMergeThread(void* arg);
void* diskMergeThread(void* arg);
#endif

View file

@ -48,10 +48,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
if(data_arr.size() > NUM_ENTRIES)
data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end());
recordid datapage_header_rid = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
DataPage<datatuple>::RegionAllocator * alloc
= new DataPage<datatuple>::RegionAllocator(xid, datapage_header_rid, 10000); // ~ 10 datapages per region.
= new DataPage<datatuple>::RegionAllocator(xid, 10000); // ~ 10 datapages per region.
recordid alloc_state = Talloc(xid,sizeof(RegionAllocConf_t));