fix merge deadlock; port to fixed stasis buffer manager api; enable new buffer manager by default
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2330 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
0a0646e75e
commit
e5058da393
6 changed files with 21 additions and 12 deletions
|
@ -114,7 +114,7 @@ void DataPage<TUPLE>::initialize_page(pageid_t pageid) {
|
||||||
//initialize header
|
//initialize header
|
||||||
p->pageType = DATA_PAGE;
|
p->pageType = DATA_PAGE;
|
||||||
|
|
||||||
//clear page (arranges for null-padding)
|
//clear page (arranges for null-padding) XXX null pad more carefully and use sentinel value instead?
|
||||||
memset(p->memAddr, 0, PAGE_SIZE);
|
memset(p->memAddr, 0, PAGE_SIZE);
|
||||||
|
|
||||||
//we're the last page for now.
|
//we're the last page for now.
|
||||||
|
@ -162,7 +162,10 @@ size_t DataPage<TUPLE>::read_bytes(byte * buf, off_t offset, ssize_t remaining)
|
||||||
chunk.size = 0; // eof
|
chunk.size = 0; // eof
|
||||||
} else {
|
} else {
|
||||||
Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page);
|
Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page);
|
||||||
assert(p->pageType == DATA_PAGE);
|
if(p->pageType != DATA_PAGE) {
|
||||||
|
fprintf(stderr, "Page type %d, id %lld lsn %lld\n", (int)p->pageType, (long long)p->id, (long long)p->LSN);
|
||||||
|
assert(p->pageType == DATA_PAGE);
|
||||||
|
}
|
||||||
if((chunk.page + 1 == page_count_ + first_page_)
|
if((chunk.page + 1 == page_count_ + first_page_)
|
||||||
&& (*is_another_page_ptr(p))) {
|
&& (*is_another_page_ptr(p))) {
|
||||||
page_count_++;
|
page_count_++;
|
||||||
|
|
|
@ -398,7 +398,7 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
|
||||||
if(ret.size != INVALID_SLOT) {
|
if(ret.size != INVALID_SLOT) {
|
||||||
stasis_record_alloc_done(xid, p, ret);
|
stasis_record_alloc_done(xid, p, ret);
|
||||||
writeNodeRecord(xid,p,ret,key,key_len,val_page);
|
writeNodeRecord(xid,p,ret,key,key_len,val_page);
|
||||||
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
|
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid)); // XXX remove this (writeNodeRecord calls it for us)
|
||||||
}
|
}
|
||||||
unlock(p->rwlatch);
|
unlock(p->rwlatch);
|
||||||
} else {
|
} else {
|
||||||
|
|
11
logstore.cpp
11
logstore.cpp
|
@ -37,7 +37,8 @@ logtable<TUPLE>::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p
|
||||||
// This bool is purely for external code.
|
// This bool is purely for external code.
|
||||||
this->accepting_new_requests = true;
|
this->accepting_new_requests = true;
|
||||||
this->shutting_down_ = false;
|
this->shutting_down_ = false;
|
||||||
flushing = false;
|
c0_flushing = false;
|
||||||
|
c1_flushing = false;
|
||||||
this->merge_mgr = 0;
|
this->merge_mgr = 0;
|
||||||
tmerger = new tuplemerger(&replace_merger);
|
tmerger = new tuplemerger(&replace_merger);
|
||||||
|
|
||||||
|
@ -85,8 +86,8 @@ void logtable<TUPLE>::init_stasis() {
|
||||||
DataPage<datatuple>::register_stasis_page_impl();
|
DataPage<datatuple>::register_stasis_page_impl();
|
||||||
//stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages:
|
//stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages:
|
||||||
// XXX Workaround Stasis' (still broken) default concurrent buffer manager
|
// XXX Workaround Stasis' (still broken) default concurrent buffer manager
|
||||||
stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
|
// stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory;
|
||||||
stasis_buffer_manager_hint_writes_are_sequential = 0;
|
stasis_buffer_manager_hint_writes_are_sequential = 1;
|
||||||
Tinit();
|
Tinit();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -159,7 +160,7 @@ void logtable<TUPLE>::flushTable()
|
||||||
start = tv_to_double(start_tv);
|
start = tv_to_double(start_tv);
|
||||||
|
|
||||||
|
|
||||||
flushing = true;
|
c0_flushing = true;
|
||||||
bool blocked = false;
|
bool blocked = false;
|
||||||
|
|
||||||
int expmcount = merge_count;
|
int expmcount = merge_count;
|
||||||
|
@ -201,7 +202,7 @@ void logtable<TUPLE>::flushTable()
|
||||||
} else {
|
} else {
|
||||||
DEBUG("signaled c0-c1 merge\n");
|
DEBUG("signaled c0-c1 merge\n");
|
||||||
}
|
}
|
||||||
flushing = false;
|
c0_flushing = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
template<class TUPLE>
|
template<class TUPLE>
|
||||||
|
|
|
@ -124,7 +124,8 @@ public:
|
||||||
if(!shutting_down_) {
|
if(!shutting_down_) {
|
||||||
shutting_down_ = true;
|
shutting_down_ = true;
|
||||||
flushTable();
|
flushTable();
|
||||||
flushing = true;
|
c0_flushing = true;
|
||||||
|
c1_flushing = true;
|
||||||
}
|
}
|
||||||
rwlc_unlock(header_mut);
|
rwlc_unlock(header_mut);
|
||||||
// XXX must need to do other things! (join the threads?)
|
// XXX must need to do other things! (join the threads?)
|
||||||
|
@ -144,7 +145,8 @@ private:
|
||||||
bool c0_is_merging;
|
bool c0_is_merging;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
bool flushing;
|
bool c0_flushing;
|
||||||
|
bool c1_flushing; // this needs to be set to true at shutdown, or when the c0-c1 merger is waiting for c1-c2 to finish its merge
|
||||||
|
|
||||||
//DATA PAGE SETTINGS
|
//DATA PAGE SETTINGS
|
||||||
pageid_t internal_region_size; // in number of pages
|
pageid_t internal_region_size; // in number of pages
|
||||||
|
|
|
@ -103,7 +103,7 @@ void * merge_scheduler::memMergeThread() {
|
||||||
|
|
||||||
// needs to be past the rwlc_unlock...
|
// needs to be past the rwlc_unlock...
|
||||||
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
|
memTreeComponent<datatuple>::batchedRevalidatingIterator *itrB =
|
||||||
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, <able_->flushing, 100, <able_->rb_mut);
|
new memTreeComponent<datatuple>::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, <able_->c0_flushing, 100, <able_->rb_mut);
|
||||||
|
|
||||||
//: do the merge
|
//: do the merge
|
||||||
DEBUG("mmt:\tMerging:\n");
|
DEBUG("mmt:\tMerging:\n");
|
||||||
|
@ -171,7 +171,9 @@ void * merge_scheduler::memMergeThread() {
|
||||||
|
|
||||||
// XXX need to report backpressure here!
|
// XXX need to report backpressure here!
|
||||||
while(ltable_->get_tree_c1_mergeable()) {
|
while(ltable_->get_tree_c1_mergeable()) {
|
||||||
|
ltable_->c1_flushing = true;
|
||||||
rwlc_cond_wait(<able_->c1_needed, ltable_->header_mut);
|
rwlc_cond_wait(<able_->c1_needed, ltable_->header_mut);
|
||||||
|
ltable_->c1_flushing = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
xid = Tbegin();
|
xid = Tbegin();
|
||||||
|
@ -254,7 +256,7 @@ void * merge_scheduler::diskMergeThread()
|
||||||
// 4: do the merge.
|
// 4: do the merge.
|
||||||
//create the iterators
|
//create the iterators
|
||||||
diskTreeComponent::iterator *itrA = ltable_->get_tree_c2()->open_iterator();
|
diskTreeComponent::iterator *itrA = ltable_->get_tree_c2()->open_iterator();
|
||||||
diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(ltable_->merge_mgr, 0.05, <able_->shutting_down_);
|
diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(ltable_->merge_mgr, 0.05, <able_->c1_flushing);
|
||||||
|
|
||||||
//create a new tree
|
//create a new tree
|
||||||
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (uint64_t)(ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
|
diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (uint64_t)(ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000);
|
||||||
|
|
|
@ -121,6 +121,7 @@ public:
|
||||||
|
|
||||||
|
|
||||||
lsn_t get_lsn(int xid) {
|
lsn_t get_lsn(int xid) {
|
||||||
|
// XXX we shouldn't need to have this logic in here anymore...
|
||||||
lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN;
|
lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN;
|
||||||
lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log());
|
lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log());
|
||||||
lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;
|
lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;
|
||||||
|
|
Loading…
Reference in a new issue