diff --git a/datapage.cpp b/datapage.cpp index 3c5f4f2..9a38d5c 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -114,7 +114,7 @@ void DataPage::initialize_page(pageid_t pageid) { //initialize header p->pageType = DATA_PAGE; - //clear page (arranges for null-padding) + //clear page (arranges for null-padding) XXX null pad more carefully and use sentinel value instead? memset(p->memAddr, 0, PAGE_SIZE); //we're the last page for now. @@ -162,7 +162,10 @@ size_t DataPage::read_bytes(byte * buf, off_t offset, ssize_t remaining) chunk.size = 0; // eof } else { Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page); - assert(p->pageType == DATA_PAGE); + if(p->pageType != DATA_PAGE) { + fprintf(stderr, "Page type %d, id %lld lsn %lld\n", (int)p->pageType, (long long)p->id, (long long)p->LSN); + assert(p->pageType == DATA_PAGE); + } if((chunk.page + 1 == page_count_ + first_page_) && (*is_another_page_ptr(p))) { page_count_++; diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 6c2b442..6334920 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -398,7 +398,7 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p, if(ret.size != INVALID_SLOT) { stasis_record_alloc_done(xid, p, ret); writeNodeRecord(xid,p,ret,key,key_len,val_page); - stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid)); + stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid)); // XXX remove this (writeNodeRecord calls it for us) } unlock(p->rwlatch); } else { diff --git a/logstore.cpp b/logstore.cpp index ddd6863..d3b9fad 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -37,7 +37,8 @@ logtable::logtable(pageid_t max_c0_size, pageid_t internal_region_size, p // This bool is purely for external code. this->accepting_new_requests = true; this->shutting_down_ = false; - flushing = false; + c0_flushing = false; + c1_flushing = false; this->merge_mgr = 0; tmerger = new tuplemerger(&replace_merger); @@ -85,8 +86,8 @@ void logtable::init_stasis() { DataPage::register_stasis_page_impl(); //stasis_buffer_manager_size = 768 * 1024; // 4GB = 2^10 pages: // XXX Workaround Stasis' (still broken) default concurrent buffer manager - stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; - stasis_buffer_manager_hint_writes_are_sequential = 0; +// stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; + stasis_buffer_manager_hint_writes_are_sequential = 1; Tinit(); } @@ -159,7 +160,7 @@ void logtable::flushTable() start = tv_to_double(start_tv); - flushing = true; + c0_flushing = true; bool blocked = false; int expmcount = merge_count; @@ -201,7 +202,7 @@ void logtable::flushTable() } else { DEBUG("signaled c0-c1 merge\n"); } - flushing = false; + c0_flushing = false; } template diff --git a/logstore.h b/logstore.h index e85d1e6..94165a9 100644 --- a/logstore.h +++ b/logstore.h @@ -124,7 +124,8 @@ public: if(!shutting_down_) { shutting_down_ = true; flushTable(); - flushing = true; + c0_flushing = true; + c1_flushing = true; } rwlc_unlock(header_mut); // XXX must need to do other things! (join the threads?) @@ -144,7 +145,8 @@ private: bool c0_is_merging; public: - bool flushing; + bool c0_flushing; + bool c1_flushing; // this needs to be set to true at shutdown, or when the c0-c1 merger is waiting for c1-c2 to finish its merge //DATA PAGE SETTINGS pageid_t internal_region_size; // in number of pages diff --git a/merger.cpp b/merger.cpp index c39c56c..5affb60 100644 --- a/merger.cpp +++ b/merger.cpp @@ -103,7 +103,7 @@ void * merge_scheduler::memMergeThread() { // needs to be past the rwlc_unlock... memTreeComponent::batchedRevalidatingIterator *itrB = - new memTreeComponent::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, <able_->flushing, 100, <able_->rb_mut); + new memTreeComponent::batchedRevalidatingIterator(ltable_->get_tree_c0(), ltable_->merge_mgr, ltable_->max_c0_size, <able_->c0_flushing, 100, <able_->rb_mut); //: do the merge DEBUG("mmt:\tMerging:\n"); @@ -171,7 +171,9 @@ void * merge_scheduler::memMergeThread() { // XXX need to report backpressure here! while(ltable_->get_tree_c1_mergeable()) { + ltable_->c1_flushing = true; rwlc_cond_wait(<able_->c1_needed, ltable_->header_mut); + ltable_->c1_flushing = false; } xid = Tbegin(); @@ -254,7 +256,7 @@ void * merge_scheduler::diskMergeThread() // 4: do the merge. //create the iterators diskTreeComponent::iterator *itrA = ltable_->get_tree_c2()->open_iterator(); - diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(ltable_->merge_mgr, 0.05, <able_->shutting_down_); + diskTreeComponent::iterator *itrB = ltable_->get_tree_c1_mergeable()->open_iterator(ltable_->merge_mgr, 0.05, <able_->c1_flushing); //create a new tree diskTreeComponent * c2_prime = new diskTreeComponent(xid, ltable_->internal_region_size, ltable_->datapage_region_size, ltable_->datapage_size, stats, (uint64_t)(ltable_->max_c0_size * *ltable_->R() + stats->base_size)/ 1000); diff --git a/regionAllocator.h b/regionAllocator.h index 11a8b2b..f6dda95 100644 --- a/regionAllocator.h +++ b/regionAllocator.h @@ -121,6 +121,7 @@ public: lsn_t get_lsn(int xid) { + // XXX we shouldn't need to have this logic in here anymore... lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN; lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log()); lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;