diff --git a/datapage.cpp b/datapage.cpp index b607577..096b35a 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -59,21 +59,22 @@ void DataPage::register_stasis_page_impl() { } template -DataPage::DataPage(int xid, pageid_t pid): +DataPage::DataPage(int xid, RegionAllocator * alloc, pageid_t pid): // XXX Hack!! The read-only constructor signature is too close to the other's xid_(xid), page_count_(1), // will be opportunistically incremented as we scan the datapage. initial_page_count_(-1), // used by append. - alloc_(0), // read-only, and we don't free data pages one at a time. + alloc_(alloc), // read-only, and we don't free data pages one at a time. first_page_(pid), - write_offset_(-1) { - assert(pid!=0); - Page *p = loadPage(xid_, first_page_); - if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) { - printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout); - abort(); - } - assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage - releasePage(p); + write_offset_(-1) + { + assert(pid!=0); + Page *p = alloc_ ? alloc_->load_page(xid, first_page_) : loadPage(xid, first_page_); + if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) { + printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout); + abort(); + } + assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage + releasePage(p); } template @@ -99,7 +100,7 @@ void DataPage::initialize_page(pageid_t pageid) { //load the first page Page *p; #ifdef CHECK_FOR_SCRIBBLING - p = loadPage(xid_, pageid); + p = alloc_ ? alloc->load_page(xid_, pageid) : loadPage(xid_, pageid); if(*stasis_page_type_ptr(p) == DATA_PAGE) { printf("Collision on page %lld\n", (long long)pageid); fflush(stdout); assert(*stasis_page_type_ptr(p) != DATA_PAGE); @@ -137,7 +138,7 @@ size_t DataPage::write_bytes(const byte * buf, size_t remaining) { if(chunk.page >= first_page_ + page_count_) { chunk.size = 0; // no space (should not happen) } else { - Page *p = loadPage(xid_, chunk.page); + Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page); memcpy(data_at_offset_ptr(p, chunk.slot), buf, chunk.size); writelock(p->rwlatch,0); stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_)); @@ -156,7 +157,7 @@ size_t DataPage::read_bytes(byte * buf, off_t offset, size_t remaining) { if(chunk.page >= first_page_ + page_count_) { chunk.size = 0; // eof } else { - Page *p = loadPage(xid_, chunk.page); + Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page); assert(p->pageType == DATA_PAGE); if((chunk.page + 1 == page_count_ + first_page_) && (*is_another_page_ptr(p))) { @@ -185,7 +186,7 @@ bool DataPage::initialize_next_page() { abort(); } - Page *p = loadPage(xid_, rid.page-1); + Page *p = alloc_ ? alloc_->load_page(xid_, rid.page-1) : loadPage(xid_, rid.page-1); *is_another_page_ptr(p) = (rid.page-1 == first_page_) ? 2 : 1; writelock(p->rwlatch, 0); stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_)); diff --git a/datapage.h b/datapage.h index 5d769c5..d59c334 100644 --- a/datapage.h +++ b/datapage.h @@ -61,8 +61,10 @@ public: public: - //to be used when reading an existing data page from disk - DataPage( int xid, pageid_t pid ); + /** + * if alloc is non-null, then reads will be optimized for sequential access + */ + DataPage( int xid, RegionAllocator* alloc, pageid_t pid ); //to be used to create new data pages DataPage( int xid, pageid_t page_count, RegionAllocator* alloc); diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 77e6036..65dbbec 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -134,7 +134,7 @@ datatuple * diskTreeComponent::findTuple(int xid, datatuple::key_t key, size_t k if(pid!=-1) { - DataPage * dp = new DataPage(xid, pid); + DataPage * dp = new DataPage(xid, 0, pid); dp->recordRead(key, keySize, &tup); delete dp; } @@ -731,9 +731,10 @@ void diskTreeComponent::internalNodes::print_tree(int xid, pageid_t pid, int64_t //diskTreeComponentIterator implementation ///////////////////////////////////////////////// -diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root) { +diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root) { + ro_alloc_ = ro_alloc; if(root.page == 0 && root.slot == 0 && root.size == -1) abort(); - p = loadPage(xid,root.page); + p = ro_alloc_->load_page(xid,root.page); readlock(p->rwlatch,0); DEBUG("ROOT_REC_SIZE %d\n", diskTreeComponent::internalNodes::root_rec_size); @@ -749,7 +750,7 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root) { unlock(p->rwlatch); releasePage(p); - p = loadPage(xid,leafid); + p = ro_alloc_->load_page(xid,leafid); readlock(p->rwlatch,0); assert(depth != 0); } else { @@ -770,10 +771,10 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root) { justOnePage = (depth == 0); } -diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root, const byte* key, len_t keylen) { +diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root, const byte* key, len_t keylen) { if(root.page == NULLRID.page && root.slot == NULLRID.slot) abort(); - - p = loadPage(xid,root.page); + ro_alloc_ = ro_alloc; + p = ro_alloc_->load_page(xid,root.page); readlock(p->rwlatch,0); recordid rid = {p->id, diskTreeComponent::internalNodes::DEPTH, diskTreeComponent::internalNodes::root_rec_size}; @@ -795,7 +796,7 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root, con { unlock(p->rwlatch); releasePage(p); - p = loadPage(xid,lsm_entry_rid.page); + p = ro_alloc->load_page(xid,lsm_entry_rid.page); readlock(p->rwlatch,0); } @@ -835,7 +836,7 @@ int diskTreeComponent::internalNodes::iterator::next() DEBUG("done with page %lld next = %lld\n", p->id, next_rec.ptr); if(next_rec != -1 && ! justOnePage) { - p = loadPage(xid_, next_rec); + p = ro_alloc_->load_page(xid_, next_rec); readlock(p->rwlatch,0); current.page = next_rec; current.slot = 2; @@ -887,14 +888,15 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k lsmIterator_ = NULL; } else { if(key1) { - lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, tree_, key1->key(), key1->keylen()); + lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_, key1->key(), key1->keylen()); } else { - lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, tree_); + lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_); } } } diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) : + ro_alloc_(new RegionAllocator()), tree_(tree ? tree->get_root_rec() : NULLRID) { init_iterators(NULL, NULL); @@ -902,6 +904,7 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) : } diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) : + ro_alloc_(new RegionAllocator()), tree_(tree ? tree->get_root_rec() : NULLRID) { init_iterators(key,NULL); @@ -909,20 +912,16 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, da } -diskTreeComponent::iterator::~iterator() -{ - if(lsmIterator_) { - lsmIterator_->close(); - delete lsmIterator_; - } - - if(curr_page!=NULL) - { - delete curr_page; - curr_page = 0; - } +diskTreeComponent::iterator::~iterator() { + if(lsmIterator_) { + lsmIterator_->close(); + delete lsmIterator_; + } + delete curr_page; + curr_page = 0; + delete ro_alloc_; } void diskTreeComponent::iterator::init_helper(datatuple* key1) @@ -948,7 +947,7 @@ void diskTreeComponent::iterator::init_helper(datatuple* key1) lsmIterator_->value((byte**)hack); curr_pageid = *pid_tmp; - curr_page = new DataPage(-1, curr_pageid); + curr_page = new DataPage(-1, ro_alloc_, curr_pageid); DEBUG("opening datapage iterator %lld at key %s\n.", curr_pageid, key1 ? (char*)key1->key() : "NULL"); dp_itr = new DPITR_T(curr_page, key1); @@ -982,7 +981,7 @@ datatuple * diskTreeComponent::iterator::next_callerFrees() size_t ret = lsmIterator_->value((byte**)hack); assert(ret == sizeof(pageid_t)); curr_pageid = *pid_tmp; - curr_page = new DataPage(-1, curr_pageid); + curr_page = new DataPage(-1, ro_alloc_, curr_pageid); DEBUG("opening datapage iterator %lld at beginning\n.", curr_pageid); dp_itr = new DPITR_T(curr_page->begin()); diff --git a/diskTreeComponent.h b/diskTreeComponent.h index d9f0aa9..ea996f6 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -143,8 +143,8 @@ class diskTreeComponent { public: class iterator { public: - iterator(int xid, recordid root); - iterator(int xid, recordid root, const byte* key, len_t keylen); + iterator(int xid, RegionAllocator *ro_alloc, recordid root); + iterator(int xid, RegionAllocator *ro_alloc, recordid root, const byte* key, len_t keylen); int next(); void close(); @@ -162,7 +162,7 @@ class diskTreeComponent { inline void releaseLock() { } private: - + RegionAllocator * ro_alloc_; Page * p; int xid_; bool done; @@ -193,14 +193,15 @@ class diskTreeComponent { int operator-(iterator & t) { abort(); } private: - recordid tree_; //root of the tree + RegionAllocator * ro_alloc_; // has a filehandle that we use to optimize sequential scans. + recordid tree_; //root of the tree - diskTreeComponent::internalNodes::iterator* lsmIterator_; + diskTreeComponent::internalNodes::iterator* lsmIterator_; - pageid_t curr_pageid; //current page id - DataPage *curr_page; //current page - typedef DataPage::iterator DPITR_T; - DPITR_T *dp_itr; + pageid_t curr_pageid; //current page id + DataPage *curr_page; //current page + typedef DataPage::iterator DPITR_T; + DPITR_T *dp_itr; }; }; #endif /* DISKTREECOMPONENT_H_ */ diff --git a/logserver.cpp b/logserver.cpp index 863d7b0..47cd663 100644 --- a/logserver.cpp +++ b/logserver.cpp @@ -5,6 +5,7 @@ #include "merger.h" #include "logstore.h" +#include "regionAllocator.h" #include "network.h" @@ -617,7 +618,8 @@ int op_stat_histogram(pthread_data* data, size_t limit) { } int xid = Tbegin(); - diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rid()); + RegionAllocator * ro_alloc = new RegionAllocator(); + diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid()); size_t count = 0; int err = 0; @@ -643,7 +645,7 @@ int op_stat_histogram(pthread_data* data, size_t limit) { size_t cur_stride = 0; size_t i = 0; - it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rid()); // TODO make this method private? + it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid()); // TODO make this method private? while(it->next()) { i++; if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys) @@ -661,6 +663,7 @@ int op_stat_histogram(pthread_data* data, size_t limit) { it->close(); delete(it); + delete(ro_alloc); if(!err){ err = writeendofiteratortosocket(*(data->workitem)); } Tcommit(xid); return err; diff --git a/regionAllocator.h b/regionAllocator.h index fd17570..031dafc 100644 --- a/regionAllocator.h +++ b/regionAllocator.h @@ -20,22 +20,36 @@ public: // Open an existing region allocator. RegionAllocator(int xid, recordid rid) : nextPage_(INVALID_PAGE), - endOfRegion_(INVALID_PAGE) { + endOfRegion_(INVALID_PAGE), + bm_((stasis_buffer_manager_t*)stasis_runtime_buffer_manager()), + bmh_(bm_->openHandleImpl(bm_, 1)) { rid_ = rid; Tread(xid, rid_, &header_); regionCount_ = TarrayListLength(xid, header_.region_list); } // Create a new region allocator. - RegionAllocator(int xid, pageid_t region_length) : - nextPage_(0), - endOfRegion_(0), - regionCount_(0) - { - rid_ = Talloc(xid, sizeof(header_)); - header_.region_list = TarrayListAlloc(xid, 1, 2, sizeof(pageid_t)); - header_.region_length = region_length; - Tset(xid, rid_, &header_); - } + RegionAllocator(int xid, pageid_t region_length) : + nextPage_(0), + endOfRegion_(0), + regionCount_(0), + bm_((stasis_buffer_manager_t*)stasis_runtime_buffer_manager()), + bmh_(bm_->openHandleImpl(bm_, 1)) + { + rid_ = Talloc(xid, sizeof(header_)); + header_.region_list = TarrayListAlloc(xid, 1, 2, sizeof(pageid_t)); + header_.region_length = region_length; + Tset(xid, rid_, &header_); + } + explicit RegionAllocator() : + nextPage_(INVALID_PAGE), + endOfRegion_(INVALID_PAGE), + bm_((stasis_buffer_manager_t*)stasis_runtime_buffer_manager()), + bmh_(bm_->openHandleImpl(bm_, 1)){ + rid_.page = INVALID_PAGE; + regionCount_ = -1; + } + Page * load_page(int xid, pageid_t p) { return bm_->loadPageImpl(bm_, bmh_, xid, p, UNKNOWN_TYPE_PAGE); } + // XXX handle disk full? pageid_t alloc_extent(int xid, pageid_t extent_length) { assert(nextPage_ != INVALID_PAGE); @@ -67,8 +81,9 @@ public: list_entry.slot < regionCount; list_entry.slot++) { pageid_t pid; Tread(xid, list_entry, &pid); - TregionForce(xid, pid); + TregionForce(xid, bm_, bmh_, pid); } + bm_->closeHandleImpl(bm_, bmh_); } void dealloc_regions(int xid) { pageid_t regionCount = TarrayListLength(xid, header_.region_list); @@ -121,6 +136,8 @@ private: pageid_t nextPage_; pageid_t endOfRegion_; pageid_t regionCount_; + stasis_buffer_manager_t * bm_; + stasis_buffer_manager_handle_t *bmh_; persistent_state header_; public: static const size_t header_size = sizeof(persistent_state); diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index 6f33482..ab87a00 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -100,7 +100,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int tuplenum = 0; for(int i = 0; i < dpages ; i++) { - DataPage dp(xid, dsp[i]); + DataPage dp(xid, 0, dsp[i]); DataPage::iterator itr = dp.begin(); datatuple *dt=0; while( (dt=itr.getnext()) != NULL) diff --git a/test/check_gen.cpp b/test/check_gen.cpp index 8121caa..4200076 100644 --- a/test/check_gen.cpp +++ b/test/check_gen.cpp @@ -5,6 +5,7 @@ #include "logstore.h" +#include "regionAllocator.h" int main(int argc, char **argv) { @@ -24,9 +25,12 @@ int main(int argc, char **argv) Tcommit(xid); xid = Tbegin(); - diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ltable.get_tree_c2()->get_root_rid() ); + RegionAllocator * ro_alloc = new RegionAllocator(); + + diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ro_alloc, ltable.get_tree_c2()->get_root_rid() ); it->close(); delete it; + delete ro_alloc; Tcommit(xid); diskTreeComponent::internalNodes::deinit_stasis(); diff --git a/test/check_logtree.cpp b/test/check_logtree.cpp index c807dbe..d207012 100644 --- a/test/check_logtree.cpp +++ b/test/check_logtree.cpp @@ -4,6 +4,7 @@ #include #include #include "logstore.h" +#include "regionAllocator.h" #include "diskTreeComponent.h" #include @@ -114,7 +115,8 @@ void insertProbeIter_str(int NUM_ENTRIES) int64_t count = 0; - diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, lt->get_root_rec()); + RegionAllocator * ro_alloc = new RegionAllocator(); + diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, lt->get_root_rec()); while(it->next()) { byte * key; @@ -133,7 +135,7 @@ void insertProbeIter_str(int NUM_ENTRIES) it->close(); delete it; - + delete ro_alloc; Tcommit(xid); diskTreeComponent::internalNodes::deinit_stasis(); } diff --git a/test/check_merge.cpp b/test/check_merge.cpp index dd24e54..fc19af1 100644 --- a/test/check_merge.cpp +++ b/test/check_merge.cpp @@ -57,7 +57,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int lindex = mscheduler.addlogtable(<able); ltable.setMergeData(mscheduler.getMergeData(lindex)); - mscheduler.startlogtable(lindex); + mscheduler.startlogtable(lindex, 10 * 1024 * 1024); unlock(ltable.header_lock); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index fd8255f..9db8950 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -56,7 +56,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int lindex = mscheduler.addlogtable(<able); ltable.setMergeData(mscheduler.getMergeData(lindex)); - mscheduler.startlogtable(lindex); + mscheduler.startlogtable(lindex, 10 * 1024 * 1024); unlock(ltable.header_lock); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES); diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index 204e489..bc825e1 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -111,7 +111,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int lindex = mscheduler.addlogtable(<able); ltable.setMergeData(mscheduler.getMergeData(lindex)); - mscheduler.startlogtable(lindex); + mscheduler.startlogtable(lindex, 10 * 1024 * 1024); unlock(ltable.header_lock); printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);