Two unrelated changes: (1) release rb_mut before acquiring header_mut on the read path (this breaks delta tuples), and (2) implement first-cut support for pulling in the persisted merge statistics when reopening logstores (required for correct recovery).

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@868 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-06-21 20:03:05 +00:00
parent 118ae94a9b
commit fded91fec6
5 changed files with 54 additions and 64 deletions

View file

@ -50,10 +50,6 @@ logtable<TUPLE>::logtable(pageid_t internal_region_size, pageid_t datapage_regio
this->internal_region_size = internal_region_size; this->internal_region_size = internal_region_size;
this->datapage_region_size = datapage_region_size; this->datapage_region_size = datapage_region_size;
this->datapage_size = datapage_size; this->datapage_size = datapage_size;
c0_stats = merge_mgr->get_merge_stats(0);
merge_mgr->new_merge(0);
c0_stats->starting_merge();
} }
template<class TUPLE> template<class TUPLE>
@ -104,7 +100,13 @@ recordid logtable<TUPLE>::allocTable(int xid)
//create the small tree //create the small tree
tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats); tree_c1 = new diskTreeComponent(xid, internal_region_size, datapage_region_size, datapage_size, stats);
update_persistent_header(xid); c0_stats = merge_mgr->get_merge_stats(0);
merge_mgr->new_merge(0);
c0_stats->starting_merge();
update_persistent_header(xid, 1);
update_persistent_header(xid, 2);
return table_rec; return table_rec;
} }
@ -114,9 +116,20 @@ void logtable<TUPLE>::openTable(int xid, recordid rid) {
Tread(xid, table_rec, &tbl_header); Tread(xid, table_rec, &tbl_header);
tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0); tree_c2 = new diskTreeComponent(xid, tbl_header.c2_root, tbl_header.c2_state, tbl_header.c2_dp_state, 0);
tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0); tree_c1 = new diskTreeComponent(xid, tbl_header.c1_root, tbl_header.c1_state, tbl_header.c1_dp_state, 0);
merge_mgr->get_merge_stats(1)->bytes_out = tbl_header.c1_base_size;
merge_mgr->get_merge_stats(1)->base_size = tbl_header.c1_base_size;
merge_mgr->get_merge_stats(1)->mergeable_size = tbl_header.c1_mergeable_size;
merge_mgr->get_merge_stats(2)->base_size = tbl_header.c2_base_size;
merge_mgr->get_merge_stats(2)->bytes_out = tbl_header.c2_base_size;
c0_stats = merge_mgr->get_merge_stats(0);
merge_mgr->new_merge(0);
c0_stats->starting_merge();
} }
template<class TUPLE> template<class TUPLE>
void logtable<TUPLE>::update_persistent_header(int xid) { void logtable<TUPLE>::update_persistent_header(int xid, int merge_level) {
tbl_header.c2_root = tree_c2->get_root_rid(); tbl_header.c2_root = tree_c2->get_root_rid();
tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid(); tbl_header.c2_dp_state = tree_c2->get_datapage_allocator_rid();
@ -125,6 +138,17 @@ void logtable<TUPLE>::update_persistent_header(int xid) {
tbl_header.c1_dp_state = tree_c1->get_datapage_allocator_rid(); tbl_header.c1_dp_state = tree_c1->get_datapage_allocator_rid();
tbl_header.c1_state = tree_c1->get_internal_node_allocator_rid(); tbl_header.c1_state = tree_c1->get_internal_node_allocator_rid();
if(merge_level == 1) {
tbl_header.c1_base_size = merge_mgr->get_merge_stats(1)->bytes_out;
tbl_header.c1_mergeable_size = merge_mgr->get_merge_stats(1)->mergeable_size;
} else if(merge_level == 2) {
tbl_header.c1_mergeable_size = 0;
tbl_header.c2_base_size = merge_mgr->get_merge_stats(2)->bytes_out;
} else {
assert(merge_level == 1 || merge_level == 2);
abort();
}
Tset(xid, table_rec, &tbl_header); Tset(xid, table_rec, &tbl_header);
} }
@ -218,8 +242,8 @@ datatuple * logtable<TUPLE>::findTuple(int xid, const datatuple::key_t key, size
ret_tuple = (*rbitr)->create_copy(); ret_tuple = (*rbitr)->create_copy();
} }
rwlc_readlock(header_mut); // has to be before rb_mut, or we could merge the tuple with itself due to an intervening merge
pthread_mutex_unlock(&rb_mut); pthread_mutex_unlock(&rb_mut);
rwlc_readlock(header_mut); // XXX: FIXME with optimisitic concurrency control. Has to be before rb_mut, or we could merge the tuple with itself due to an intervening merge
bool done = false; bool done = false;
//step: 2 look into first in tree if exists (a first level merge going on) //step: 2 look into first in tree if exists (a first level merge going on)
@ -371,8 +395,8 @@ datatuple * logtable<TUPLE>::findTuple_first(int xid, datatuple::key_t key, size
{ {
DEBUG("Not in mem tree %d\n", tree_c0->size()); DEBUG("Not in mem tree %d\n", tree_c0->size());
rwlc_readlock(header_mut);
pthread_mutex_unlock(&rb_mut); pthread_mutex_unlock(&rb_mut);
rwlc_readlock(header_mut); // XXX FIXME WITH OCC!!
//step: 2 look into first in tree if exists (a first level merge going on) //step: 2 look into first in tree if exists (a first level merge going on)
if(get_tree_c0_mergeable() != NULL) if(get_tree_c0_mergeable() != NULL)

View file

@ -81,7 +81,7 @@ public:
merge_mgr->get_merge_stats(1); merge_mgr->get_merge_stats(1);
} }
void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); } void set_tree_c0_mergeable(memTreeComponent<datatuple>::rbtree_ptr_t newtree){tree_c0_mergeable = newtree; bump_epoch(); }
void update_persistent_header(int xid); void update_persistent_header(int xid, int merge_level);
void setMergeData(logtable_mergedata * mdata); void setMergeData(logtable_mergedata * mdata);
logtable_mergedata* getMergeData(){return mergedata;} logtable_mergedata* getMergeData(){return mergedata;}
@ -97,6 +97,9 @@ public:
recordid c1_root; recordid c1_root;
recordid c1_state; recordid c1_state;
recordid c1_dp_state; recordid c1_dp_state;
pageid_t c2_base_size;
pageid_t c1_mergeable_size;
pageid_t c1_base_size;
}; };
logtable_mergedata * mergedata; logtable_mergedata * mergedata;

View file

@ -25,9 +25,9 @@ class mergeStats {
merge_level(merge_level), merge_level(merge_level),
merge_count(0), merge_count(0),
base_size(0), base_size(0),
mergeable_size(0),
target_size(target_size), target_size(target_size),
current_size(0), current_size(0),
mergeable_size(0),
bytes_out_with_overhead(0), bytes_out_with_overhead(0),
bytes_out(0), bytes_out(0),
num_tuples_out(0), num_tuples_out(0),
@ -114,14 +114,17 @@ class mergeStats {
friend class mergeManager; friend class mergeManager;
struct timespec last_tick; struct timespec last_tick;
public: // XXX only accessed during initialization.
pageid_t base_size; pageid_t base_size;
pageid_t mergeable_size; // protected by mutex.
protected:
pageid_t target_size; pageid_t target_size;
pageid_t current_size; pageid_t current_size;
pageid_t mergeable_size; // protected by mutex.
pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)? pageid_t bytes_out_with_overhead;// How many bytes did we write (including internal tree nodes)?
public:
pageid_t bytes_out; // How many bytes worth of tuples did we write? pageid_t bytes_out; // How many bytes worth of tuples did we write?
protected:
pageid_t num_tuples_out; // How many tuples did we write? pageid_t num_tuples_out; // How many tuples did we write?
pageid_t num_datapages_out; // How many datapages? pageid_t num_datapages_out; // How many datapages?
pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)? pageid_t bytes_in_small; // How many bytes from the small input tree (for C0, we ignore tree overheads)?

View file

@ -15,9 +15,6 @@ int merge_scheduler::addlogtable(logtable<datatuple> *ltable)
// initialize merge data // initialize merge data
ltable->set_tree_c0_mergeable(NULL); ltable->set_tree_c0_mergeable(NULL);
mdata->diskmerge_args = new merger_args;
mdata->memmerge_args = new merger_args;
mergedata.push_back(std::make_pair(ltable, mdata)); mergedata.push_back(std::make_pair(ltable, mdata));
return mergedata.size()-1; return mergedata.size()-1;
@ -25,14 +22,6 @@ int merge_scheduler::addlogtable(logtable<datatuple> *ltable)
merge_scheduler::~merge_scheduler() merge_scheduler::~merge_scheduler()
{ {
for(size_t i=0; i<mergedata.size(); i++)
{
logtable<datatuple> *ltable = mergedata[i].first;
logtable_mergedata *mdata = mergedata[i].second;
delete mdata->diskmerge_args;
delete mdata->memmerge_args;
}
mergedata.clear(); mergedata.clear();
} }
@ -76,26 +65,11 @@ void merge_scheduler::startlogtable(int index, int64_t MAX_C0_SIZE)
DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page); DEBUG("Tree C1 is %lld\n", (long long)ltable->get_tree_c1()->get_root_rec().page);
DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page); DEBUG("Tree C2 is %lld\n", (long long)ltable->get_tree_c2()->get_root_rec().page);
struct merger_args diskmerge_args= {
ltable,
0, //max_tree_size No max size for biggest component
};
*mdata->diskmerge_args = diskmerge_args;
struct merger_args memmerge_args =
{
ltable,
(int64_t)(MAX_C0_SIZE), // XXX why did this multiply by R^2 before??
};
*mdata->memmerge_args = memmerge_args;
void * (*diskmerger)(void*) = diskMergeThread; void * (*diskmerger)(void*) = diskMergeThread;
void * (*memmerger)(void*) = memMergeThread; void * (*memmerger)(void*) = memMergeThread;
pthread_create(&mdata->diskmerge_thread, 0, diskmerger, mdata->diskmerge_args); pthread_create(&mdata->diskmerge_thread, 0, diskmerger, ltable);
pthread_create(&mdata->memmerge_thread, 0, memmerger, mdata->memmerge_args); pthread_create(&mdata->memmerge_thread, 0, memmerger, ltable);
} }
@ -135,13 +109,11 @@ void* memMergeThread(void*arg)
int xid; int xid;
merger_args * a = (merger_args*)(arg); logtable<datatuple> * ltable = (logtable<datatuple>*)arg;
logtable<datatuple> * ltable = a->ltable;
assert(ltable->get_tree_c1()); assert(ltable->get_tree_c1());
int merge_count =0; int merge_count =0;
mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(1); mergeStats * stats = ltable->merge_mgr->get_merge_stats(1);
while(true) // 1 while(true) // 1
{ {
@ -227,7 +199,7 @@ void* memMergeThread(void*arg)
double new_c1_size = stats->output_size(); double new_c1_size = stats->output_size();
pthread_cond_signal(&ltable->c0_needed); pthread_cond_signal(&ltable->c0_needed);
ltable->update_persistent_header(xid); ltable->update_persistent_header(xid, 1);
Tcommit(xid); Tcommit(xid);
ltable->merge_mgr->finished_merge(1); ltable->merge_mgr->finished_merge(1);
@ -261,8 +233,10 @@ void* memMergeThread(void*arg)
ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats)); ltable->set_tree_c1(new diskTreeComponent(xid, ltable->internal_region_size, ltable->datapage_region_size, ltable->datapage_size, stats));
pthread_cond_signal(&ltable->c1_ready); pthread_cond_signal(&ltable->c1_ready);
pageid_t old_bytes_out = stats->bytes_out;
ltable->update_persistent_header(xid); stats->bytes_out = 0; // XXX HACK
ltable->update_persistent_header(xid, 1);
stats->bytes_out = old_bytes_out;
Tcommit(xid); Tcommit(xid);
} }
@ -285,14 +259,12 @@ void *diskMergeThread(void*arg)
{ {
int xid; int xid;
merger_args * a = (merger_args*)(arg); logtable<datatuple> * ltable = (logtable<datatuple>*)arg;
logtable<datatuple> * ltable = a->ltable;
assert(ltable->get_tree_c2()); assert(ltable->get_tree_c2());
int merge_count =0; int merge_count =0;
mergeStats * stats = a->ltable->merge_mgr->get_merge_stats(2); mergeStats * stats = ltable->merge_mgr->get_merge_stats(2);
while(true) while(true)
{ {
@ -378,7 +350,7 @@ void *diskMergeThread(void*arg)
DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1); DEBUG("dmt:\tUpdated C2's position on disk to %lld\n",(long long)-1);
// 13 // 13
ltable->update_persistent_header(xid); ltable->update_persistent_header(xid, 2);
Tcommit(xid); Tcommit(xid);
ltable->merge_mgr->finished_merge(2); ltable->merge_mgr->finished_merge(2);

View file

@ -12,23 +12,11 @@
static const int RB_TREE_OVERHEAD = 400; static const int RB_TREE_OVERHEAD = 400;
static const double MIN_R = 3.0; static const double MIN_R = 3.0;
struct merger_args
{
logtable<datatuple> * ltable;
int64_t max_size;
};
struct logtable_mergedata struct logtable_mergedata
{ {
//merge threads //merge threads
pthread_t diskmerge_thread; pthread_t diskmerge_thread;
pthread_t memmerge_thread; pthread_t memmerge_thread;
//merge args 1
struct merger_args *diskmerge_args;
//merge args 2
struct merger_args *memmerge_args;
}; };
class merge_scheduler class merge_scheduler