diff --git a/datapage.cpp b/datapage.cpp
index 820505e..65c0e99 100644
--- a/datapage.cpp
+++ b/datapage.cpp
@@ -1,4 +1,3 @@
-
 #include "logstore.h"
 #include "datapage.h"
 #include
@@ -8,9 +7,8 @@ static const int DATA_PAGE = USER_DEFINED_PAGE(1);
 BEGIN_C_DECLS
 static void dataPageFsck(Page* p) {
-  int32_t pageCount = *stasis_page_int32_cptr_from_start(p, 0);
-  assert(pageCount >= 0);
-  assert(pageCount <= MAX_PAGE_COUNT);
+  int32_t is_last_page = *stasis_page_int32_cptr_from_start(p, 0);
+  assert(is_last_page == 0 || is_last_page == 1 || is_last_page == 2);
 }
 static void dataPageLoaded(Page* p) {
   dataPageFsck(p);
@@ -20,6 +18,15 @@ static void dataPageFlushed(Page* p) {
   dataPageFsck(p);
 }
 static int notSupported(int xid, Page * p) { return 0; }
+
+static lsn_t get_lsn(int xid) {
+  lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN;
+  lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log());
+  lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;
+  assert(ret != INVALID_LSN);
+  return ret;
+}
+
 END_C_DECLS

 template <class TUPLE>
@@ -56,54 +63,59 @@ void DataPage<TUPLE>::register_stasis_page_impl() {
   stasis_page_impl_register(pi);
 }

-template <class TUPLE>
-int32_t DataPage<TUPLE>::readPageCount(int xid, pageid_t pid) {
-  Page *p = loadPage(xid, pid);
-  int32_t ret = *page_count_ptr(p);
-  releasePage(p);
-  return ret;
-}
-
 template <class TUPLE>
 DataPage<TUPLE>::DataPage(int xid, pageid_t pid):
-  page_count_(readPageCount(xid, pid)),
+  xid_(xid),
+  page_count_(1), // will be opportunistically incremented as we scan the datapage.
+  initial_page_count_(-1), // used by append.
+  alloc_(0), // read-only, and we don't free data pages one at a time.
   first_page_(pid),
-  write_offset_(-1) { assert(pid!=0); }
+  write_offset_(-1) {
+  assert(pid!=0);
+  Page *p = loadPage(xid_, first_page_);
+  if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) {
+    printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout);
+    abort();
+  }
+  assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for a page in the middle of a datapage
+  releasePage(p);
+}

 template <class TUPLE>
-DataPage<TUPLE>::DataPage(int xid, int fix_pcount, pageid_t (*alloc_region)(int, void*), void * alloc_state) :
-  page_count_(MAX_PAGE_COUNT), // XXX fix_pcount),
-  first_page_(alloc_region(xid, alloc_state)),
+DataPage<TUPLE>::DataPage(int xid, pageid_t page_count, RegionAllocator *alloc) :
+  xid_(xid),
+  page_count_(1),
+  initial_page_count_(page_count),
+  alloc_(alloc),
+  first_page_(alloc_->alloc_extent(xid_, page_count_)),
   write_offset_(0)
 {
-  assert(fix_pcount >= 1);
-  initialize(xid);
+  DEBUG("Datapage page count: %lld pid = %lld\n", (long long int)page_count_, (long long int)first_page_);
+  assert(page_count_ >= 1);
+  initialize();
 }

 template <class TUPLE>
-DataPage<TUPLE>::~DataPage() { }
-
-
-template <class TUPLE>
-void DataPage<TUPLE>::initialize(int xid)
+void DataPage<TUPLE>::initialize()
 {
   //load the first page
-  Page *p = loadUninitializedPage(xid, first_page_);
+  Page *p = loadUninitializedPage(xid_, first_page_);
   writelock(p->rwlatch,0);

+  DEBUG("\t\t\t\t\t\t->%lld\n", first_page_);
+
   //initialize header
+  p->pageType = DATA_PAGE;

-  //set number of pages to 1
-  *page_count_ptr(p) = page_count_;
+  //we're the last page for now.
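// (For reference, not part of the patch: the int32 page header that replaces
//  page_count_ encodes each physical page's position in the datapage chain.
//  Assuming the values written just below and in initialize_next_page():
//    0 - last (or only) page of this datapage
//    1 - interior page; another page follows
//    2 - first page of a multi-page datapage; another page follows
//  A datapage is now opened given only its first page; its real length is
//  rediscovered lazily as read_bytes() walks the chain.)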
+  *is_another_page_ptr(p) = 0;

   //write 0 to first data size
   *length_at_offset_ptr(p, write_offset_) = 0;

   //set the page dirty
-  stasis_dirty_page_table_set_dirty((stasis_dirty_page_table_t*)stasis_runtime_dirty_page_table(), p);
-
-  p->pageType = DATA_PAGE;
+  stasis_page_lsn_write(xid_, p, get_lsn(xid_));

   //release the page
   unlock(p->rwlatch);
@@ -111,18 +123,18 @@
 }

 template <class TUPLE>
-size_t DataPage<TUPLE>::write_bytes(int xid, const byte * buf, size_t remaining) {
+size_t DataPage<TUPLE>::write_bytes(const byte * buf, size_t remaining) {
   recordid chunk = calc_chunk_from_offset(write_offset_);
   if(chunk.size > remaining) {
     chunk.size = remaining;
   }
   if(chunk.page >= first_page_ + page_count_) {
-    chunk.size = 0; // no space
+    chunk.size = 0; // no space (should not happen)
   } else {
-    Page *p = loadPage(xid, chunk.page);
+    Page *p = loadPage(xid_, chunk.page);
     memcpy(data_at_offset_ptr(p, chunk.slot), buf, chunk.size);
     writelock(p->rwlatch,0);
-    stasis_dirty_page_table_set_dirty((stasis_dirty_page_table_t*)stasis_runtime_dirty_page_table(), p);
+    stasis_page_lsn_write(xid_, p, get_lsn(xid_));
     unlock(p->rwlatch);
     releasePage(p);
     write_offset_ += chunk.size;
@@ -130,7 +142,7 @@ size_t DataPage<TUPLE>::write_bytes(int xid, const byte * buf, size_t remaining)
   return chunk.size;
 }
 template <class TUPLE>
-size_t DataPage<TUPLE>::read_bytes(int xid, byte * buf, off_t offset, size_t remaining) {
+size_t DataPage<TUPLE>::read_bytes(byte * buf, off_t offset, size_t remaining) {
   recordid chunk = calc_chunk_from_offset(offset);
   if(chunk.size > remaining) {
     chunk.size = remaining;
@@ -138,7 +150,12 @@ size_t DataPage<TUPLE>::read_bytes(int xid, byte * buf, off_t offset, size_t remaining)
   if(chunk.page >= first_page_ + page_count_) {
     chunk.size = 0; // eof
   } else {
-    Page *p = loadPage(xid, chunk.page);
+    Page *p = loadPage(xid_, chunk.page);
+    assert(p->pageType == DATA_PAGE);
+    if((chunk.page + 1 == page_count_ + first_page_)
+       && (*is_another_page_ptr(p))) {
+      page_count_++;
+    }
     memcpy(buf, data_at_offset_ptr(p, chunk.slot), chunk.size);
     releasePage(p);
   }
@@ -146,47 +163,81 @@ size_t DataPage<TUPLE>::read_bytes(int xid, byte * buf, off_t offset, size_t remaining)
 }

 template <class TUPLE>
-bool DataPage<TUPLE>::initialize_next_page(int xid) {
+bool DataPage<TUPLE>::initialize_next_page() {
   recordid rid = calc_chunk_from_offset(write_offset_);
   assert(rid.slot == 0);
+  DEBUG("\t\t%lld\n", (long long)rid.page);
+
   if(rid.page >= first_page_ + page_count_) {
-    return false;
+    assert(rid.page == first_page_ + page_count_);
+    if(alloc_->grow_extent(1)) {
+      page_count_++;
+    } else {
+      return false; // The region is full
+    }
   } else {
-    Page *p = loadUninitializedPage(xid, rid.page);
-    p->pageType = DATA_PAGE;
-    *page_count_ptr(p) = 0;
-    *length_at_offset_ptr(p,0) = 0;
-    writelock(p->rwlatch, 0); //XXX this is pretty obnoxious. Perhaps stasis shouldn't check for the latch
-    stasis_dirty_page_table_set_dirty((stasis_dirty_page_table_t*)stasis_runtime_dirty_page_table(), p);
-    unlock(p->rwlatch);
-    releasePage(p);
-    return true;
+    abort();
   }

+  Page *p = loadPage(xid_, rid.page-1);
+  *is_another_page_ptr(p) = (rid.page-1 == first_page_) ? 2 : 1;
+  writelock(p->rwlatch, 0);
+  stasis_page_lsn_write(xid_, p, get_lsn(xid_));
+  unlock(p->rwlatch);
+  releasePage(p);
+
+
+  // XXX this is repeated in initialize()!
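// (Not part of the patch: at this point rid.page is the page about to be
//  formatted and rid.page-1 is the page that just filled up; the store above
//  marks the filled page as having a successor -- 2 if it was the datapage's
//  first page, 1 otherwise -- before the new last page is initialized below.)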
+#ifdef CHECK_FOR_SCRIBBLING
+  p = loadPage(xid_, rid.page);
+  if(*stasis_page_type_ptr(p) == DATA_PAGE) {
+    printf("Collision on page %lld\n", (long long)rid.page); fflush(stdout);
+    assert(*stasis_page_type_ptr(p) != DATA_PAGE); // XXX DO NOT COMMIT THIS LINE
+  }
+#else
+  p = loadUninitializedPage(xid_, rid.page);
+#endif
+  DEBUG("\t\t\t\t%lld\n", (long long)rid.page);
+
+  p->pageType = DATA_PAGE;
+  *is_another_page_ptr(p) = 0;
+  writelock(p->rwlatch, 0); //XXX this is pretty obnoxious. Perhaps stasis shouldn't check for the latch
+  stasis_page_lsn_write(xid_, p, get_lsn(xid_));
+  unlock(p->rwlatch);
+  releasePage(p);
+  return true;
 }

 template <class TUPLE>
-bool DataPage<TUPLE>::write_data(int xid, const byte * buf, size_t len) {
+bool DataPage<TUPLE>::write_data(const byte * buf, size_t len, bool init_next) {
+  bool first = true;
   while(1) {
     assert(len > 0);
-    size_t written = write_bytes(xid, buf, len);
+    size_t written = write_bytes(buf, len);
     if(written == 0) {
       return false; // fail
     }
     if(written == len) {
       return true; // success
     }
+    if(len > PAGE_SIZE && ! first) {
+      assert(written > 4000);
+    }
     buf += written;
     len -= written;
-    if(!initialize_next_page(xid)) {
-      return false; // fail
+    if(init_next) {
+      if(!initialize_next_page()) {
+        return false; // fail
+      }
     }
+    first = false;
   }
 }

 template <class TUPLE>
-bool DataPage<TUPLE>::read_data(int xid, byte * buf, off_t offset, size_t len) {
+bool DataPage<TUPLE>::read_data(byte * buf, off_t offset, size_t len) {
   while(1) {
     assert(len > 0);
-    size_t read_count = read_bytes(xid, buf, offset, len);
+    size_t read_count = read_bytes(buf, offset, len);
     if(read_count == 0) {
       return false; // fail
     }
@@ -199,29 +250,17 @@ bool DataPage<TUPLE>::read_data(int xid, byte * buf, off_t offset, size_t len)
   }
 }
 template <class TUPLE>
-bool DataPage<TUPLE>::append(int xid, TUPLE const * dat)
+bool DataPage<TUPLE>::append(TUPLE const * dat)
 {
+  // Don't append record to already-full datapage. The record could push us over the page limit, but that's OK.
+  if(write_offset_ > (initial_page_count_ * PAGE_SIZE)) { return false; }
+
   byte * buf = dat->to_bytes(); // TODO could be more efficient; this does a malloc and memcpy. The alternative couples us more strongly to datapage, but simplifies datapage.
   len_t dat_len = dat->byte_length();

-  bool succ = write_data(xid, (const byte*)&dat_len, sizeof(dat_len));
+  bool succ = write_data((const byte*)&dat_len, sizeof(dat_len));
   if(succ) {
-    succ = write_data(xid, buf, dat_len);
-  }
-
-  if(succ) {
-
-    dat_len = 0; // write terminating zero.
-
-    succ = write_data(xid, (const byte*)&dat_len, sizeof(dat_len));
-
-    if(succ) {
-      write_offset_ -= sizeof(dat_len); // want to overwrite zero next time around.
-    }
-
-    // if writing the zero fails, later reads will fail as well, and assume EOF.
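// (Not part of the patch: records are framed as [len][payload]...[0], where a
//  len_t of zero terminates the datapage. The deleted code here rewrote the
//  zero after every append and then backed write_offset_ up over it; the
//  replacement writes the terminator exactly once, from ~DataPage() in
//  datapage.h, when the writer is destroyed.)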
-    succ = true; // return true (the tuple has been written regardless of whether the zero fit)
-
+    succ = write_data(buf, dat_len);
   }

   free(buf);
@@ -230,12 +269,12 @@ bool DataPage<TUPLE>::append(TUPLE const * dat)
 }

 template <class TUPLE>
-bool DataPage<TUPLE>::recordRead(int xid, typename TUPLE::key_t key, size_t keySize, TUPLE ** buf)
+bool DataPage<TUPLE>::recordRead(typename TUPLE::key_t key, size_t keySize, TUPLE ** buf)
 {
   RecordIterator itr(this);
   int match = -1;

-  while((*buf=itr.getnext(xid)) != 0)
+  while((*buf=itr.getnext()) != 0)
   {
     match = TUPLE::compare((*buf)->key(), key);
@@ -259,108 +298,24 @@ bool DataPage<TUPLE>::recordRead(typename TUPLE::key_t key, size_t keySize, TUPLE ** buf)
   return false;
 }

-template <class TUPLE>
-pageid_t DataPage<TUPLE>::dp_alloc_region(int xid, void *conf)
-{
-  RegionAllocConf_t* a = (RegionAllocConf_t*)conf;
-
-  if(a->nextPage == a->endOfRegion) {
-    if(a->regionList.size == -1) {
-      //DEBUG("nextPage: %lld\n", a->nextPage);
-      a->regionList = TarrayListAlloc(xid, 1, 4, sizeof(pageid_t));
-      DEBUG("regionList.page: %lld\n", a->regionList.page);
-      DEBUG("regionList.slot: %d\n", a->regionList.slot);
-      DEBUG("regionList.size: %lld\n", a->regionList.size);
-
-      a->regionCount = 0;
-    }
-    DEBUG("{%lld <- alloc region arraylist}\n", a->regionList.page);
-    TarrayListExtend(xid,a->regionList,1);
-    a->regionList.slot = a->regionCount;
-    DEBUG("region lst slot %d\n",a->regionList.slot);
-    a->regionCount++;
-    DEBUG("region count %lld\n",a->regionCount);
-    a->nextPage = TregionAlloc(xid, a->regionSize,12);
-    DEBUG("next page %lld\n",a->nextPage);
-    a->endOfRegion = a->nextPage + a->regionSize;
-    Tset(xid,a->regionList,&a->nextPage);
-    DEBUG("next page %lld\n",a->nextPage);
-  }
-
-  DEBUG("%lld ?= %lld\n", a->nextPage,a->endOfRegion);
-  pageid_t ret = a->nextPage;
-  a->nextPage = a->endOfRegion; // Allocate everything all at once.
-
-  DEBUG("ret %lld\n",ret);
-  return ret;
-
-}
-
-template <class TUPLE>
-pageid_t DataPage<TUPLE>::dp_alloc_region_rid(int xid, void * ridp) {
-  recordid rid = *(recordid*)ridp;
-  RegionAllocConf_t conf;
-  Tread(xid,rid,&conf);
-  pageid_t ret = dp_alloc_region(xid,&conf);
-  //DEBUG("{%lld <- alloc region extend}\n", conf.regionList.page);
-  // XXX get rid of Tset by storing next page in memory, and losing it
-  // on crash.
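// (Not part of the patch: the dp_alloc_region* family deleted here persisted
//  a RegionAllocConf_t back with Tset() on every allocation; its replacement,
//  DataPage::RegionAllocator in datapage.h, keeps nextPage_/endOfRegion_ in
//  memory and persists only the region list and header, accepting -- as the
//  removed XXX comment proposed -- that an in-flight region's tail is lost
//  on crash.)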
-  Tset(xid,rid,&conf);
-  return ret;
-}
-
-template <class TUPLE>
-void DataPage<TUPLE>::dealloc_region_rid(int xid, void *conf)
-{
-  RegionAllocConf_t a = *((RegionAllocConf_t*)conf);
-  DEBUG("{%lld <- dealloc region arraylist}\n", a.regionList.page);
-
-  for(int i = 0; i < a.regionCount; i++) {
-    a.regionList.slot = i;
-    pageid_t pid;
-    Tread(xid,a.regionList,&pid);
-    TregionDealloc(xid,pid);
-  }
-}
-
-template <class TUPLE>
-void DataPage<TUPLE>::force_region_rid(int xid, void *conf)
-{
-  recordid rid = *(recordid*)conf;
-  RegionAllocConf_t a;
-  Tread(xid,rid,&a);
-
-  for(int i = 0; i < a.regionCount; i++)
-  {
-    a.regionList.slot = i;
-    pageid_t pid;
-    Tread(xid,a.regionList,&pid);
-    stasis_dirty_page_table_flush_range((stasis_dirty_page_table_t*)stasis_runtime_dirty_page_table(), pid, pid+a.regionSize);
-    stasis_buffer_manager_t *bm =
-      (stasis_buffer_manager_t*)stasis_runtime_buffer_manager();
-    bm->forcePageRange(bm, pid, pid+a.regionSize);
-  }
-}
-
-
 ///////////////////////////////////////////////////////////////
 //RECORD ITERATOR
 ///////////////////////////////////////////////////////////////

 template <class TUPLE>
-TUPLE* DataPage<TUPLE>::RecordIterator::getnext(int xid)
+TUPLE* DataPage<TUPLE>::RecordIterator::getnext()
 {
   len_t len;
   bool succ;
-  succ = dp->read_data(xid, (byte*)&len, read_offset_, sizeof(len));
+  succ = dp->read_data((byte*)&len, read_offset_, sizeof(len));
   if((!succ) || (len == 0)) { return NULL; }
   read_offset_ += sizeof(len);

   byte * buf = (byte*)malloc(len);
-  succ = dp->read_data(xid, buf, read_offset_, len);
+  succ = dp->read_data(buf, read_offset_, len);
   if(!succ) { read_offset_ -= sizeof(len); free(buf); return NULL; }
@@ -373,7 +328,7 @@ TUPLE* DataPage<TUPLE>::RecordIterator::getnext()
   return ret;
 }

-template <class TUPLE>
+/*template <class TUPLE>
 void DataPage<TUPLE>::RecordIterator::advance(int xid, int count)
 {
   len_t len;
@@ -384,4 +339,4 @@ void DataPage<TUPLE>::RecordIterator::advance(int xid, int count)
     read_offset_ += sizeof(len);
     read_offset_ += len;
   }
-}
+}*/
diff --git a/datapage.h b/datapage.h
index 89ba23e..6b4b72d 100644
--- a/datapage.h
+++ b/datapage.h
@@ -7,6 +7,7 @@
 #include

+#define CHECK_FOR_SCRIBBLING

 template <class TUPLE>
 class DataPage
@@ -26,18 +27,108 @@ public:
   }

   //returns the next tuple and also advances the iterator
-  TUPLE *getnext(int xid);
+  TUPLE *getnext();

   //advance the iterator by count tuples, i.e. skip over count tuples
-  void advance(int xid, int count=1);
-
-
+  // void advance(int xid, int count=1);
+
   off_t read_offset_;
   DataPage<TUPLE> *dp;
-
 };

+  class RegionAllocator
+  {
+  public:
+
+    // Open an existing region allocator.
+    RegionAllocator(int xid, recordid rid) :
+      rid_(rid),
+      nextPage_(INVALID_PAGE),
+      endOfRegion_(INVALID_PAGE) {
+      Tread(xid, rid, &header_);
+      regionCount_ = TarrayListLength(xid, header_.region_list);
+    }
+    // Create a new region allocator.
+    RegionAllocator(int xid, recordid rid, pageid_t region_length) :
+      rid_(rid),
+      nextPage_(0),
+      endOfRegion_(0),
+      regionCount_(0)
+    {
+      assert(TrecordSize(xid, rid) == sizeof(header_));
+      header_.region_list = TarrayListAlloc(xid, 1, 2, sizeof(pageid_t));
+      header_.region_length = region_length;
+      Tset(xid, rid_, &header_);
+    }
+    // XXX handle disk full?
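+    // (A usage sketch, not part of the patch; rid must be a preallocated
+    //  record of header_size bytes, as in logtable::allocTable() and
+    //  test/check_datapage.cpp:
+    //
+    //    RegionAllocator ra(xid, rid, 10000);        // create; persists the header
+    //    pageid_t first = ra.alloc_extent(xid, 20);  // carve out 20 pages, opening a new region if needed
+    //    ra.force_regions(xid);                      // flush regions + header to disk
+    //    ra.done();                                  // invalidate before handing off
+    //    RegionAllocator ro(xid, rid);               // reopen from the persisted header
+    //  )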
+    pageid_t alloc_extent(int xid, pageid_t extent_length) {
+      assert(nextPage_ != INVALID_PAGE);
+      pageid_t ret = nextPage_;
+      nextPage_ += extent_length;
+      if(nextPage_ >= endOfRegion_) {
+        ret = TregionAlloc(xid, header_.region_length, 42); // XXX assign a region allocator id
+        TarrayListExtend(xid, header_.region_list, 1);
+        recordid rid = header_.region_list;
+        rid.slot = regionCount_;
+        Tset(xid, rid, &ret);
+        assert(extent_length <= header_.region_length); // XXX could handle this case if we wanted to. Would remove this error case, and not be hard.
+        nextPage_ = ret + extent_length;
+        endOfRegion_ = ret + header_.region_length;
+        regionCount_++;
+        assert(regionCount_ == TarrayListLength(xid, header_.region_list));
+      }
+      return ret;
+    }
+    bool grow_extent(pageid_t extension_length) {
+      assert(nextPage_ != INVALID_PAGE);
+      nextPage_ += extension_length;
+      return(nextPage_ < endOfRegion_);
+    }
+    void force_regions(int xid) {
+      assert(nextPage_ != INVALID_PAGE);
+      pageid_t regionCount = TarrayListLength(xid, header_.region_list);
+      for(recordid list_entry = header_.region_list;
+          list_entry.slot < regionCount; list_entry.slot++) {
+        pageid_t pid;
+        Tread(xid, list_entry, &pid);
+        TregionForce(xid, pid);
+      }
+      Tset(xid, rid_, &header_);
+    }
+    void dealloc_regions(int xid) {
+      pageid_t regionCount = TarrayListLength(xid, header_.region_list);
+
+      printf("{%lld %lld %lld}\n", header_.region_list.page, (long long)header_.region_list.slot, (long long)header_.region_list.size);
+
+      for(recordid list_entry = header_.region_list;
+          list_entry.slot < regionCount; list_entry.slot++) {
+        pageid_t pid;
+        Tread(xid, list_entry, &pid);
+#ifndef CHECK_FOR_SCRIBBLING // Don't actually free the page if we'll be checking that pages are used exactly once below.
+        TregionDealloc(xid, pid);
+#endif
+      }
+    }
+    void done() {
+      nextPage_ = INVALID_PAGE;
+      endOfRegion_ = INVALID_PAGE;
+    }
+  private:
+    typedef struct {
+      recordid region_list;
+      pageid_t region_length;
+    } persistent_state;
+
+    const recordid rid_;
+    pageid_t nextPage_;
+    pageid_t endOfRegion_;
+
+    pageid_t regionCount_;
+    persistent_state header_;
+  public:
+    static const size_t header_size = sizeof(persistent_state);
+  };

 public:

@@ -45,14 +136,24 @@ public:
   DataPage( int xid, pageid_t pid );

   //to be used to create new data pages
-  DataPage( int xid, int fix_pcount, pageid_t (*alloc_region)(int, void*), void * alloc_state);
+  DataPage( int xid, pageid_t page_count, RegionAllocator* alloc);

-  ~DataPage();
+  ~DataPage() {
+    if(write_offset_ != -1) {
+      len_t dat_len = 0; // write terminating zero.

-  bool append(int xid, TUPLE const * dat);
-  bool recordRead(int xid, typename TUPLE::key_t key, size_t keySize, TUPLE ** buf);
+      write_data((const byte*)&dat_len, sizeof(dat_len), false);

-  inline uint16_t recordCount(int xid);
+      // if writing the zero fails, later reads will fail as well, and assume EOF.
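+      // (Not part of the patch: write_data() is called with init_next=false
+      //  here so a terminator that falls past the last initialized page does
+      //  not allocate a fresh page just to hold the zero; as the comment
+      //  above notes, readers treat the missing terminator as EOF.)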
+
+    }
+
+  }
+
+  bool append(TUPLE const * dat);
+  bool recordRead(typename TUPLE::key_t key, size_t keySize, TUPLE ** buf);
+
+  inline uint16_t recordCount();

   RecordIterator begin(){return RecordIterator(this);}

@@ -60,33 +161,24 @@ public:
   pageid_t get_start_pid(){return first_page_;}
   int get_page_count(){return page_count_;}

-  static pageid_t dp_alloc_region(int xid, void *conf);
-
-  static pageid_t dp_alloc_region_rid(int xid, void * ridp);
-
-  static void dealloc_region_rid(int xid, void* conf);
-
-  static void force_region_rid(int xid, void *conf);
-
   static void register_stasis_page_impl();

 private:
-  void initialize(int xid);
+  // static pageid_t dp_alloc_region(int xid, void *conf, pageid_t count);

-  //reads the page count information from the first page
-  static int32_t readPageCount(int xid, pageid_t pid);
+  void initialize();

 private:

   static const uint16_t DATA_PAGE_HEADER_SIZE = sizeof(int32_t);
   static const uint16_t DATA_PAGE_SIZE = USABLE_SIZE_OF_PAGE - DATA_PAGE_HEADER_SIZE;
   typedef uint32_t len_t;

-  static inline int32_t* page_count_ptr(Page *p) {
+  static inline int32_t* is_another_page_ptr(Page *p) {
     return stasis_page_int32_ptr_from_start(p,0);
   }
   static inline byte * data_at_offset_ptr(Page *p, slotid_t offset) {
-    return ((byte*)(page_count_ptr(p)+1))+offset;
+    return ((byte*)(is_another_page_ptr(p)+1))+offset;
   }
   static inline len_t * length_at_offset_ptr(Page *p, slotid_t offset) {
     return (len_t*)data_at_offset_ptr(p,offset);
@@ -97,15 +189,19 @@ private:
     ret.page = first_page_ + offset / DATA_PAGE_SIZE;
     ret.slot = offset % DATA_PAGE_SIZE;
     ret.size = DATA_PAGE_SIZE - ret.slot;
-    assert(ret.size);
+    assert(ret.size);
     return ret;
   }
-  size_t write_bytes(int xid, const byte * buf, size_t remaining);
-  size_t read_bytes(int xid, byte * buf, off_t offset, size_t remaining);
-  bool write_data(int xid, const byte * buf, size_t len);
-  bool read_data(int xid, byte * buf, off_t offset, size_t len);
-  bool initialize_next_page(int xid);
-  const int32_t page_count_;
+  size_t write_bytes(const byte * buf, size_t remaining);
+  size_t read_bytes(byte * buf, off_t offset, size_t remaining);
+  bool write_data(const byte * buf, size_t len, bool init_next = true);
+  bool read_data(byte * buf, off_t offset, size_t len);
+  bool initialize_next_page();
+
+  int xid_;
+  pageid_t page_count_;
+  const pageid_t initial_page_count_;
+  RegionAllocator *alloc_;
   const pageid_t first_page_;
   off_t write_offset_; // points to the next free byte (ignoring page boundaries)
 };
diff --git a/logiterators.cpp b/logiterators.cpp
index c5444cd..a2cf8ba 100644
--- a/logiterators.cpp
+++ b/logiterators.cpp
@@ -80,7 +80,7 @@ TUPLE * treeIterator<TUPLE>::getnext()
   if(dp_itr == 0)
     return 0;

-  TUPLE* readTuple = dp_itr->getnext(-1);
+  TUPLE* readTuple = dp_itr->getnext();

   if(!readTuple)
@@ -101,7 +101,7 @@ TUPLE * treeIterator<TUPLE>::getnext()

       dp_itr = new DPITR_T(curr_page->begin());

-      readTuple = dp_itr->getnext(-1);
+      readTuple = dp_itr->getnext();
       assert(readTuple);
     }
     // else readTuple is null. We're done.
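// (A sketch, not part of the patch, of the iteration idiom these changes
//  produce; the xid is captured by DataPage at construction instead of being
//  threaded through every getnext() call. Assumes xid and first_pid are live:
//
//    DataPage<datatuple> dp(xid, first_pid);            // open read-only
//    DataPage<datatuple>::RecordIterator itr = dp.begin();
//    while(datatuple * t = itr.getnext()) {             // no xid argument anymore
//      /* ... use t ... */
//      datatuple::freetuple(t);
//    }
//  )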
diff --git a/logstore.cpp b/logstore.cpp
index e6c793f..2601b62 100644
--- a/logstore.cpp
+++ b/logstore.cpp
@@ -8,8 +8,18 @@
 #include "logiterators.h"
 #include "datapage.cpp"

+#include
 #include
 #include
+#include
+#include
+
+
+static inline double tv_to_double(struct timeval tv)
+{
+  return static_cast<double>(tv.tv_sec) +
+    (static_cast<double>(tv.tv_usec) / 1000000.0);
+}

 /////////////////////////////////////////////////////////////////
 // LOGTREE implementation
@@ -39,6 +49,20 @@ logtree::logtree()
 }

+void logtree::init_stasis() {
+
+  bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE;
+
+  DataPage<datatuple>::register_stasis_page_impl();
+
+  stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; // XXX workaround stasis issue #22.
+
+  Tinit();
+
+}
+
+void logtree::deinit_stasis() { Tdeinit(); }
+
 void logtree::free_region_rid(int xid, recordid tree,
                               logtree_page_deallocator_t dealloc, void *allocator_state)
 {
@@ -114,15 +138,8 @@ pageid_t logtree::alloc_region(int xid, void *conf)
   DEBUG("%lld ?= %lld\n", a->nextPage,a->endOfRegion);
   pageid_t ret = a->nextPage;
-  // Ensure the page is in buffer cache without accessing disk (this
-  // sets it to clean and all zeros if the page is not in cache).
-  // Hopefully, future reads will get a cache hit, and avoid going to
-  // disk.
-
-  Page * p = loadUninitializedPage(xid, ret);
-  releasePage(p);
-  DEBUG("ret %lld\n",ret);
   (a->nextPage)++;
+  DEBUG("tree %lld-%lld\n", (long long)ret, a->endOfRegion);
   return ret;
 }
@@ -157,18 +174,11 @@ recordid logtree::create(int xid)

   Page *p = loadPage(xid, ret.page);
   writelock(p->rwlatch,0);
-
-  stasis_page_slotted_initialize_page(p);
-
-  //*stasis_page_type_ptr(p) = SLOTTED_PAGE; //LOGTREE_ROOT_PAGE;
-
-  //logtree_state *state = (logtree_state*) ( malloc(sizeof(logtree_state)));
-  //state->lastLeaf = -1;
-
-  //p->impl = state;
+  lastLeaf = -1;

   //initialize root node
+  stasis_page_slotted_initialize_page(p);
   recordid tmp = stasis_record_alloc_begin(xid, p, root_rec_size);
   stasis_record_alloc_done(xid,p,tmp);

@@ -186,8 +196,7 @@ recordid logtree::create(int xid)
      && tmp.size == root_rec_size);

   writeRecord(xid, p, tmp, (byte*)&COMPARATOR, root_rec_size);
-
-
+
   unlock(p->rwlatch);
   releasePage(p);

@@ -207,10 +216,11 @@ void logtree::writeRecord(int xid, Page *p, recordid &rid,
   byte *byte_arr = stasis_record_write_begin(xid, p, rid);
   memcpy(byte_arr, data, datalen); //TODO: stasis write call
   stasis_record_write_done(xid, p, rid, byte_arr);
-  stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
+  stasis_page_lsn_write(xid, p, get_lsn(xid));
 }

+
 void logtree::writeNodeRecord(int xid, Page * p, recordid & rid,
                               const byte *key, size_t keylen, pageid_t ptr)
 {
@@ -220,7 +230,7 @@ void logtree::writeNodeRecord(int xid, Page * p, recordid & rid,
   nr->ptr = ptr;
   memcpy(nr+1, key, keylen);
   stasis_record_write_done(xid, p, rid, (byte*)nr);
-  stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
+  stasis_page_lsn_write(xid, p, get_lsn(xid));
 }

 void logtree::writeRecord(int xid, Page *p, slotid_t slot,
@@ -233,24 +243,14 @@ void logtree::writeRecord(int xid, Page *p, slotid_t slot,
   byte *byte_arr = stasis_record_write_begin(xid, p, rid);
   memcpy(byte_arr, data, datalen); //TODO: stasis write call
   stasis_record_write_done(xid, p, rid, byte_arr);
-  stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
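// (Not part of the patch: every stasis_page_lsn_write(xid, p, 0) in this file
//  becomes get_lsn(xid), defined in datapage.cpp, which returns the
//  transaction's prevLSN when there is one and otherwise an LSN just below
//  the log's next available one -- so written pages carry a plausible
//  recovery LSN instead of 0.)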
+  stasis_page_lsn_write(xid, p, get_lsn(xid));
 }

 const byte* logtree::readRecord(int xid, Page * p, recordid &rid)
 {
-  //byte *ret = (byte*)malloc(rid.size);
-  //const byte *nr = stasis_record_read_begin(xid,p,rid);
-  //memcpy(ret, nr, rid.size);
-  //stasis_record_read_done(xid,p,rid,nr);
-
-  const byte *nr = stasis_record_read_begin(xid,p,rid);
+  const byte *nr = stasis_record_read_begin(xid,p,rid); // XXX API violation?
   return nr;
-
-  //DEBUG("reading {%lld, %d, %d}\n",
-  //      p->id, rid.slot, rid.size );
-
-  //return ret;
 }

 const byte* logtree::readRecord(int xid, Page * p, slotid_t slot, int64_t size)
@@ -394,7 +394,7 @@ recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID,
     // don't overwrite key...
     nr->ptr = child;
     stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr);
-    stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
+    stasis_page_lsn_write(xid, p, get_lsn(xid));

     if(!depth) {
       rmLeafID = lc->id;
@@ -827,7 +827,6 @@ logtable::logtable()
   tree_c0 = NULL;
   tree_c1 = NULL;
   tree_c2 = NULL;
-//  rbtree_mut = NULL;
   this->mergedata = 0;
   fixed_page_count = -1;
   //tmerger = new tuplemerger(&append_merger);
@@ -841,16 +840,22 @@ logtable::logtable()

 void logtable::tearDownTree(rbtree_ptr_t tree) {
   datatuple * t = 0;
+  rbtree_t::iterator old;
   for(rbtree_t::iterator delitr = tree->begin();
       delitr != tree->end();
       delitr++) {
     if(t) {
+      tree->erase(old);
       datatuple::freetuple(t);
+      t = 0;
     }
     t = *delitr;
-    tree->erase(delitr);
+    old = delitr;
   }
-  if(t) { datatuple::freetuple(t); }
+  if(t) {
+    tree->erase(old);
+    datatuple::freetuple(t);
+  }
   delete tree;
 }
@@ -878,15 +883,14 @@ recordid logtable::allocTable(int xid)

   tree_c2 = new logtree();
   tree_c2->create(xid);
-  tbl_header.c2_dp_state = Talloc(xid, sizeof(RegionAllocConf_t));
-  Tset(xid, tbl_header.c2_dp_state, &DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER);
-
+  tbl_header.c2_dp_state = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
+  tree_c2->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c2_dp_state, 10000)); /// XXX do not hard code region length.

   //create the small tree
   tree_c1 = new logtree();
   tree_c1->create(xid);
-  tbl_header.c1_dp_state = Talloc(xid, sizeof(RegionAllocConf_t));
-  Tset(xid, tbl_header.c1_dp_state, &DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER);
+  tbl_header.c1_dp_state = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
+  tree_c1->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, tbl_header.c1_dp_state, 10000)); /// XXX do not hard code region length.

   tbl_header.c2_root = tree_c2->get_root_rec();
   tbl_header.c2_state = tree_c2->get_tree_state();
@@ -1278,24 +1282,25 @@ void logtable::insertTuple(datatuple *tuple)
 }

-DataPage<datatuple>* logtable::insertTuple(int xid, datatuple *tuple, recordid &dpstate, logtree *ltree)
+DataPage<datatuple>* logtable::insertTuple(int xid, datatuple *tuple, DataPage<datatuple>::RegionAllocator * alloc, logtree *ltree)
 {
-  //create a new data page
+  //create a new data page -- either the last region is full, or the last data page doesn't want our tuple. (or both)
   DataPage<datatuple> * dp = 0;
-
+  int count = 0;
   while(dp==0)
   {
-    dp = new DataPage<datatuple>(xid, fixed_page_count,
-                                 &DataPage<datatuple>::dp_alloc_region_rid,
-                                 &dpstate );
+    dp = new DataPage<datatuple>(xid, fixed_page_count, alloc);

     //insert the record into the data page
-    if(!dp->append(xid, tuple))
-    {
-      delete dp;
+    if(!dp->append(tuple))
+    {
+      // the last datapage must not have wanted the tuple, and then this datapage figured out the region is full.
+      delete dp;
       dp = 0;
+      assert(count == 0); // only retry once.
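+      // (Not part of the patch: the first append can fail when this datapage
+      //  was carved from the tail of a full region -- grow_extent() refuses
+      //  and append() returns false. The retry constructs a second datapage
+      //  whose alloc_extent() opens a fresh region, so a second failure would
+      //  be a real bug; hence the assert rather than looping.)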
+      count ++;
     }
   }

@@ -1327,7 +1332,7 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize,
       if(pid!=-1)
       {
         DataPage<datatuple> * dp = new DataPage<datatuple>(xid, pid);
-        dp->recordRead(xid, key, keySize, &tup);
+        dp->recordRead(key, keySize, &tup);
         delete dp;
       }
       return tup;
@@ -1494,39 +1499,6 @@ int logtreeIterator::next(int xid, lladdIterator_t *it)
 }

-/*
-lladdIterator_t *logtreeIterator::copy(int xid, lladdIterator_t* i)
-{
-  logtreeIterator_s *it = (logtreeIterator_s*) i->impl;
-  logtreeIterator_s *mine = (logtreeIterator_s*) malloc(sizeof(logtreeIterator_s));
-
-  if(it->p)
-  {
-    mine->p = loadPage(xid, it->p->id);
-    readlock(mine->p->rwlatch,0);
-  }
-  else
-    mine->p = 0;
-
-  memcpy(&mine->current, &it->current,sizeof(recordid));
-
-  if(it->t)
-  {
-    mine->t = (datatuple*)malloc(sizeof(*it->t)); //TODO: DATA IS NOT COPIED, MIGHT BE WRONG
-    //mine->t = malloc(sizeof(*it->t) + it->current.size);
-    memcpy(mine->t, it->t, sizeof(*it->t));// + it->current.size);
-  }
-  else
-    mine->t = 0;
-
-  mine->justOnePage = it->justOnePage;
-  lladdIterator_t * ret = (lladdIterator_t*)malloc(sizeof(lladdIterator_t));
-  ret->type = -1; // XXX LSM_TREE_ITERATOR
-  ret->impl = mine;
-  return ret;
-}
-*/
-
 void logtreeIterator::close(int xid, lladdIterator_t *it)
 {
   logtreeIterator_s *impl = (logtreeIterator_s*)it->impl;
@@ -1542,20 +1514,3 @@ void logtreeIterator::close(int xid, lladdIterator_t *it)
   free(impl);
   free(it);
 }
-
-
-/////////////////////////////////////////////////////////////////
-/////////////////////////////////////////////////////////////////
-
-
-
-
-double tv_to_double(struct timeval tv)
-{
-  return static_cast<double>(tv.tv_sec) +
-    (static_cast<double>(tv.tv_usec) / 1000000.0);
-}
-
-
-///////////////////////////////////////////////////////////////////
-
diff --git a/logstore.h b/logstore.h
index be1dd06..04b4a77 100644
--- a/logstore.h
+++ b/logstore.h
@@ -35,10 +35,6 @@
 #include "tuplemerger.h"
 #include "datatuple.h"

-
-double tv_to_double(struct timeval tv);
-
-
 struct logtable_mergedata;

 typedef std::set rbtree_t;
@@ -75,6 +71,9 @@ public:

   void print_tree(int xid);

+  static void init_stasis();
+  static void deinit_stasis();
+
   static pageid_t alloc_region(int xid, void *conf);
   static pageid_t alloc_region_rid(int xid, void * ridp);
   static void force_region_rid(int xid, void *conf);
@@ -134,6 +133,9 @@ public:
                      void *allocator_state);

+  inline DataPage<datatuple>::RegionAllocator* get_alloc() { return region_alloc; }
+  inline void set_alloc(DataPage<datatuple>::RegionAllocator* a1) { region_alloc = a1; } // XXX kludge; must be a better api for this
+  //    (currently, need to get rid from dpstate. add a 'register' method that sets the rid of the region allocator?)

   /**
      Initialize a page for use as an internal node of the tree.
@@ -162,7 +164,7 @@ private:

   recordid tree_state;
   recordid root_rec;
-
+  DataPage<datatuple>::RegionAllocator* region_alloc;
 };

@@ -189,7 +191,7 @@ public:

   static void tearDownTree(rbtree_ptr_t t);

-  DataPage<datatuple>* insertTuple(int xid, datatuple *tuple, recordid &dpstate,logtree *ltree);
+  DataPage<datatuple>* insertTuple(int xid, datatuple *tuple, DataPage<datatuple>::RegionAllocator * alloc,logtree *ltree);

   datatuple * findTuple(int xid, const datatuple::key_t key, size_t keySize, logtree *ltree);
@@ -205,8 +207,8 @@ public:

   void set_tree_c0(rbtree_ptr_t newtree){tree_c0 = newtree;}

-  inline recordid & get_dpstate1(){return tbl_header.c1_dp_state;}
-  inline recordid & get_dpstate2(){return tbl_header.c2_dp_state;}
+  inline recordid get_dpstate1(){return tbl_header.c1_dp_state;}
+  inline recordid get_dpstate2(){return tbl_header.c2_dp_state;}

   int get_fixed_page_count(){return fixed_page_count;}
   void set_fixed_page_count(int count){fixed_page_count = count;}
@@ -243,7 +245,7 @@ private:
 private:
   recordid table_rec;
   struct table_header tbl_header;
-
+
   logtree *tree_c2; //big tree
   logtree *tree_c1; //small tree
   rbtree_ptr_t tree_c0; // in-mem red black tree
diff --git a/merger.cpp b/merger.cpp
index 469c140..d77f084 100644
--- a/merger.cpp
+++ b/merger.cpp
@@ -12,7 +12,7 @@ inline DataPage<datatuple>*
 insertTuple(int xid, DataPage<datatuple> *dp, datatuple *t,
             logtable *ltable,
             logtree * ltree,
-            recordid & dpstate,
+            DataPage<datatuple>::RegionAllocator* alloc,
             int64_t &dpages, int64_t &npages);

 int merge_scheduler::addlogtable(logtable *ltable)
@@ -325,30 +325,33 @@ void* memMergeThread(void*arg)
     recordid scratch_root = scratch_tree->create(xid);

     //save the old dp state values
-    RegionAllocConf_t olddp_state;
-    Tread(xid, ltable->get_dpstate1(), &olddp_state);
-    //reinitialize the dp state
-    Tset(xid, ltable->get_dpstate1(), &logtable::DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER);
-
-    //pthread_mutex_unlock(a->block_ready_mut);
+    //  RegionAllocConf_t olddp_state;
+    //  Tread(xid, ltable->get_dpstate1(), &olddp_state);
+    DataPage<datatuple>::RegionAllocator *old_alloc = ltable->get_tree_c1()->get_alloc();
+    old_alloc->done(); // XXX do this earlier
+
+    //reinitialize the dp state
+    scratch_tree->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate1() /*rid of old header*/, 10000)); // XXX should not hardcode region size
+//    Tset(xid, ltable->get_dpstate1(), &logtable::DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER);
+
+    //pthread_mutex_unlock(a->block_ready_mut);
     unlock(ltable->mergedata->header_lock);
-
-    //: do the merge
+
+    //: do the merge
     printf("mmt:\tMerging:\n");

     int64_t npages = 0;
     mergedPages = merge_iterators(xid, itrA, itrB, ltable, scratch_tree, npages, false);
-
+
     delete itrA;
     delete itrB;
-
+
     //force write the new region to disk
     recordid scratch_alloc_state = scratch_tree->get_tree_state();
     //TlsmForce(xid,scratch_root,logtree::force_region_rid, &scratch_alloc_state);
+    // XXX When called by merger_check (at least), we hold a pin on a page that should be forced.  This causes stasis to abort() the process.
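// (A sketch, not part of the patch, of the allocator hand-off both merge
//  threads now perform; "tree" stands for the tree being replaced, and rid is
//  the same persistent header record throughout:
//
//    DataPage<datatuple>::RegionAllocator * old_alloc = tree->get_alloc();
//    old_alloc->done();                              // stop handing out pages
//    scratch_tree->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, rid, 10000));
//    /* ... merge into scratch_tree ... */
//    scratch_tree->get_alloc()->force_regions(xid);  // durable before publishing
//    old_alloc->dealloc_regions(xid);                // reclaim the replaced tree's pages
//  )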
     logtree::force_region_rid(xid, &scratch_alloc_state);
     //force write the new datapages
-    DataPage<datatuple>::force_region_rid(xid, &ltable->get_dpstate1());
+    scratch_tree->get_alloc()->force_regions(xid);

     //writes complete
     //now atomically replace the old c1 with new c1
@@ -378,12 +381,9 @@ void* memMergeThread(void*arg)

     // free old my_tree here
     //TODO: check
     logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState);
-
-    //TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState);
-
     //TODO: check
     //free the old data pages
-    DataPage<datatuple>::dealloc_region_rid(xid, &olddp_state);
+    old_alloc->dealloc_regions(xid);

     Tcommit(xid);
     //xid = Tbegin();
@@ -533,11 +533,17 @@ void *diskMergeThread(void*arg)
     recordid scratch_root = scratch_tree->create(xid);

     //save the old dp state values
-    RegionAllocConf_t olddp_state;
-    Tread(xid, ltable->get_dpstate2(), &olddp_state);
+    DataPage<datatuple>::RegionAllocator *old_alloc1 = ltable->get_tree_c1()->get_alloc();
+    DataPage<datatuple>::RegionAllocator *old_alloc2 = ltable->get_tree_c2()->get_alloc();
+
     //reinitialize the dp state
     //TODO: maybe you want larger regions for the second tree?
-    Tset(xid, ltable->get_dpstate2(), &logtable::DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER);
+//  foo XXX Tset(xid, ltable->get_dpstate2(), &logtable::DATAPAGE_REGION_ALLOC_STATIC_INITIALIZER);
+
+    //DataPage<datatuple>::RegionAllocator *newAlloc2 = new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate2(), 10000); // XXX don't hardcode region length
+
+    scratch_tree->set_alloc(new DataPage<datatuple>::RegionAllocator(xid, ltable->get_dpstate2() /*rid of old header*/, 10000)); // XXX should not hardcode region size
+
     //pthread_mutex_unlock(a->block_ready_mut);
     unlock(ltable->mergedata->header_lock);
@@ -558,7 +564,7 @@ void *diskMergeThread(void*arg)
     //TlsmForce(xid,scratch_root,logtree::force_region_rid, &scratch_alloc_state);
     logtree::force_region_rid(xid, &scratch_alloc_state);
     //force write the new datapages
-    DataPage<datatuple>::force_region_rid(xid, &ltable->get_dpstate2());
+    scratch_tree->get_alloc()->force_regions(xid);

     //writes complete

@@ -586,18 +592,13 @@ void *diskMergeThread(void*arg)
     printf("dmt:\tUpdated C2's position on disk to %lld\n",scratch_root.page);
     Tset(xid, a->tree, &h);

-
-
     // free old my_tree here
     //TODO: check
     logtree::free_region_rid(xid, a->my_tree, logtree::dealloc_region_rid, oldAllocState);
-
-    //TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState);
-
     //TODO: check
     //free the old data pages
-    DataPage<datatuple>::dealloc_region_rid(xid, &olddp_state);
-
-
+    old_alloc2->dealloc_regions(xid);

     *(recordid*)a->pageAllocState = scratch_alloc_state;
     a->my_tree = scratch_root;
@@ -607,11 +608,12 @@ void *diskMergeThread(void*arg)
     logtree::free_region_rid(xid, (*a->in_tree)->get_root_rec(),
                              logtree::dealloc_region_rid,
                              &((*a->in_tree)->get_tree_state()));
+    old_alloc1->dealloc_regions(xid); // XXX make sure that both of these are 'unlinked' before this happens
     //TlsmFree(xid,a->my_tree->r_,logtree::dealloc_region_rid,oldAllocState);
     //TODO: check
     //free the old data pages
-    DataPage<datatuple>::dealloc_region_rid(xid, a->in_tree_allocer);//TODO:
+//    DataPage<datatuple>::dealloc_region_rid(xid, a->in_tree_allocer);//TODO:

     Tcommit(xid);

@@ -657,7 +659,7 @@ int64_t merge_iterators(int xid,
     {
       //insert t1
       dp = insertTuple(xid, dp, t1, ltable, scratch_tree,
-                       ltable->get_dpstate2(),
+                       scratch_tree->get_alloc(), // XXX insertTuple should do this for us
                        dpages, npages);

       datatuple::freetuple(t1);
@@ -672,7 +674,7 @@ int64_t merge_iterators(int xid,
       //insert merged tuple, drop deletes
       if(dropDeletes &&
          !mtuple->isDelete())
-        dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree, ltable->get_dpstate2(),
+        dp = insertTuple(xid, dp, mtuple, ltable, scratch_tree, scratch_tree->get_alloc(),
                          dpages, npages);

       datatuple::freetuple(t1);
@@ -682,7 +684,7 @@ int64_t merge_iterators(int xid,
     else
     {
       //insert t2
-      dp = insertTuple(xid, dp, t2, ltable, scratch_tree, ltable->get_dpstate2(),
+      dp = insertTuple(xid, dp, t2, ltable, scratch_tree, scratch_tree->get_alloc(),
                        dpages, npages);
       // cannot free any tuples here; they may still be read through a lookup
     }
@@ -693,7 +695,7 @@ int64_t merge_iterators(int xid,

   while(t1 != 0) // t1 is less than t2
   {
-    dp = insertTuple(xid, dp, t1, ltable, scratch_tree, ltable->get_dpstate2(),
+    dp = insertTuple(xid, dp, t1, ltable, scratch_tree, scratch_tree->get_alloc(),
                      dpages, npages);

     datatuple::freetuple(t1);
@@ -715,19 +717,19 @@ inline DataPage<datatuple>*
 insertTuple(int xid, DataPage<datatuple> *dp, datatuple *t,
             logtable *ltable,
             logtree * ltree,
-            recordid & dpstate,
+            DataPage<datatuple>::RegionAllocator * alloc,
             int64_t &dpages, int64_t &npages)
 {
   if(dp==0)
   {
-    dp = ltable->insertTuple(xid, t, dpstate, ltree);
+    dp = ltable->insertTuple(xid, t, alloc, ltree);
     dpages++;
   }
-  else if(!dp->append(xid, t))
+  else if(!dp->append(t))
   {
     npages += dp->get_page_count();
     delete dp;
-    dp = ltable->insertTuple(xid, t, dpstate, ltree);
+    dp = ltable->insertTuple(xid, t, alloc, ltree);
     dpages++;
   }

diff --git a/server.cpp b/server.cpp
index c9889d3..b57cedb 100644
--- a/server.cpp
+++ b/server.cpp
@@ -33,7 +33,7 @@ void terminate (int param)
   printf("Deinitializing stasis...\n");
   fflush(stdout);

-  Tdeinit();
+  logtree::deinit_stasis();

   exit(0);
 }
@@ -43,14 +43,10 @@ void initialize_server()
   //signal handling
   void (*prev_fn)(int);

+  logtree::init_stasis();
+
   prev_fn = signal (SIGINT,terminate);

-  bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE;
-
-  DataPage<datatuple>::register_stasis_page_impl();
-
-  Tinit();
-
   int xid = Tbegin();

   mscheduler = new merge_scheduler;
diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp
index 698ee14..0f29935 100644
--- a/test/check_datapage.cpp
+++ b/test/check_datapage.cpp
@@ -28,11 +28,7 @@ void insertProbeIter(size_t NUM_ENTRIES)

   sync();

-  bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
-
-  DataPage<datatuple>::register_stasis_page_impl();
-
-  Tinit();
+  logtree::init_stasis();

   int xid = Tbegin();

@@ -52,6 +48,11 @@ void insertProbeIter(size_t NUM_ENTRIES)
   if(data_arr.size() > NUM_ENTRIES)
     data_arr.erase(data_arr.begin()+NUM_ENTRIES, data_arr.end());

+  recordid datapage_header_rid = Talloc(xid, DataPage<datatuple>::RegionAllocator::header_size);
+
+  DataPage<datatuple>::RegionAllocator * alloc
+    = new DataPage<datatuple>::RegionAllocator(xid, datapage_header_rid, 10000); // ~ 10 datapages per region.
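+  // (Not part of the patch: region_length is measured in pages, so a
+  //  10000-page region holds 10000/pcount datapages; the "~ 10" above
+  //  assumes the test's pcount is on the order of 1000 pages.)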
+
   recordid alloc_state = Talloc(xid,sizeof(RegionAllocConf_t));
   Tset(xid,alloc_state, &logtree::REGION_ALLOC_STATIC_INITIALIZER);

@@ -69,15 +70,15 @@ void insertProbeIter(size_t NUM_ENTRIES)
     datatuple *newtuple = datatuple::create(key_arr[i].c_str(), key_arr[i].length()+1, data_arr[i].c_str(), data_arr[i].length()+1);
     datasize += newtuple->byte_length();

-    if(dp==NULL || !dp->append(xid, newtuple))
+    if(dp==NULL || !dp->append(newtuple))
     {
       dpages++;
       if(dp)
         delete dp;
+
+      dp = new DataPage<datatuple>(xid, pcount, alloc);

-      dp = new DataPage<datatuple>(xid, pcount, &DataPage<datatuple>::dp_alloc_region_rid, &alloc_state );
-
-      bool succ = dp->append(xid, newtuple);
+      bool succ = dp->append(newtuple);
       assert(succ);

       dsp.push_back(dp->get_start_pid());
@@ -104,7 +105,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
     DataPage<datatuple> dp(xid, dsp[i]);
     DataPage<datatuple>::RecordIterator itr = dp.begin();
     datatuple *dt=0;
-    while( (dt=itr.getnext(xid)) != NULL)
+    while( (dt=itr.getnext()) != NULL)
     {
       assert(dt->keylen() == key_arr[tuplenum].length()+1);
       assert(dt->datalen() == data_arr[tuplenum].length()+1);
@@ -118,7 +119,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
   printf("Reads completed.\n");

   Tcommit(xid);
-  Tdeinit();
+
+  logtree::deinit_stasis();
 }

diff --git a/test/check_gen.cpp b/test/check_gen.cpp
index 5a0e719..6412468 100644
--- a/test/check_gen.cpp
+++ b/test/check_gen.cpp
@@ -9,10 +9,7 @@ int main(int argc, char **argv)

   sync();

-  // PAGELAYOUT::initPageLayout();
-  bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
-
-  Tinit();
+  logtree::init_stasis();

   int xid = Tbegin();

@@ -31,7 +28,7 @@ int main(int argc, char **argv)
   logtreeIterator::close(xid, it);

   Tcommit(xid);
-  Tdeinit();
+  logtree::deinit_stasis();

diff --git a/test/check_logtable.cpp b/test/check_logtable.cpp
index 65f65a0..8058616 100644
--- a/test/check_logtable.cpp
+++ b/test/check_logtable.cpp
@@ -28,11 +28,7 @@ void insertProbeIter(size_t NUM_ENTRIES)

   sync();

-  bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
-
-  DataPage<datatuple>::register_stasis_page_impl();
-
-  Tinit();
+  logtree::init_stasis();

   int xid = Tbegin();

@@ -86,17 +82,17 @@ void insertProbeIter(size_t NUM_ENTRIES)

     if(dp == NULL)
     {
-      dp = ltable.insertTuple(xid, newtuple, ltable.get_dpstate1(), lt);
+      dp = ltable.insertTuple(xid, newtuple, ltable.get_tree_c1()->get_alloc(), lt);
       dpages++;
       dsp.push_back(dp->get_start_pid());
     }
     else
     {
-      if(!dp->append(xid, newtuple))
+      if(!dp->append(newtuple))
       {
         npages += dp->get_page_count();
         delete dp;
-        dp = ltable.insertTuple(xid, newtuple, ltable.get_dpstate1(), lt);
+        dp = ltable.insertTuple(xid, newtuple, ltable.get_tree_c1()->get_alloc(), lt);
         dpages++;
         dsp.push_back(dp->get_start_pid());
       }
@@ -155,8 +151,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
   printf("Random Reads completed.\n");

   Tcommit(xid);
-  Tdeinit();
-
+  logtree::deinit_stasis();
 }

 /** @test
diff --git a/test/check_logtree.cpp b/test/check_logtree.cpp
index 096b6e8..f8e3248 100644
--- a/test/check_logtree.cpp
+++ b/test/check_logtree.cpp
@@ -32,9 +32,7 @@ void insertProbeIter_str(int NUM_ENTRIES)

   sync();

-  bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE;
-
-  Tinit();
+  logtree::init_stasis();

   int xid = Tbegin();

@@ -151,12 +149,8 @@ void insertProbeIter_str(int NUM_ENTRIES)

   logtreeIterator::close(xid, it);

-
-
-
-
-  Tcommit(xid);
-  Tdeinit();
+  Tcommit(xid);
+  logtree::deinit_stasis();
 }

diff --git a/test/check_merge.cpp b/test/check_merge.cpp
index 97fc434..dacccfd 100644
--- a/test/check_merge.cpp
+++ b/test/check_merge.cpp
@@ -24,9 +24,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
unlink("storefile.txt"); unlink("logfile.txt"); - DataPage::register_stasis_page_impl(); - - sync(); + logtree::init_stasis(); //data generation std::vector * data_arr = new std::vector; @@ -45,11 +43,6 @@ void insertProbeIter(size_t NUM_ENTRIES) if(data_arr->size() > NUM_ENTRIES) data_arr->erase(data_arr->begin()+NUM_ENTRIES, data_arr->end()); - - - bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; - - Tinit(); int xid = Tbegin(); @@ -159,9 +152,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - Tdeinit(); - - + logtree::deinit_stasis(); } diff --git a/test/check_mergelarge.cpp b/test/check_mergelarge.cpp index 5f2837c..a4a8401 100644 --- a/test/check_mergelarge.cpp +++ b/test/check_mergelarge.cpp @@ -26,7 +26,7 @@ void insertProbeIter(size_t NUM_ENTRIES) sync(); - DataPage::register_stasis_page_impl(); + logtree::init_stasis(); //data generation // std::vector * data_arr = new std::vector; @@ -43,10 +43,6 @@ void insertProbeIter(size_t NUM_ENTRIES) NUM_ENTRIES=key_arr->size(); - bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; - - Tinit(); - int xid = Tbegin(); merge_scheduler mscheduler; @@ -106,7 +102,7 @@ void insertProbeIter(size_t NUM_ENTRIES) //Tcommit(xid); - Tdeinit(); + logtree::deinit_stasis(); } diff --git a/test/check_mergetuple.cpp b/test/check_mergetuple.cpp index 5522003..38dbd7f 100644 --- a/test/check_mergetuple.cpp +++ b/test/check_mergetuple.cpp @@ -25,11 +25,12 @@ void insertProbeIter(size_t NUM_ENTRIES) unlink("logfile.txt"); sync(); + + logtree::init_stasis(); + double delete_freq = .05; double update_freq = .15; - DataPage::register_stasis_page_impl(); - //data generation typedef std::vector key_v_t; const static size_t max_partition_size = 100000; @@ -100,10 +101,6 @@ void insertProbeIter(size_t NUM_ENTRIES) NUM_ENTRIES=key_arr->size(); - bufferManagerNonBlockingSlowHandleType = IO_HANDLE_PFILE; - - Tinit(); - int xid = Tbegin(); merge_scheduler mscheduler; @@ -259,9 +256,7 @@ void insertProbeIter(size_t NUM_ENTRIES) Tcommit(xid); - Tdeinit(); - - + logtree::deinit_stasis(); } diff --git a/test/check_tcpclient.cpp b/test/check_tcpclient.cpp index 5a19aaf..e7b30b7 100644 --- a/test/check_tcpclient.cpp +++ b/test/check_tcpclient.cpp @@ -129,8 +129,7 @@ void insertProbeIter(size_t NUM_ENTRIES) assert(ret); gettimeofday(&ti_end,0); -// insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); - insert_time ++; // XXX + insert_time += tv_to_double(ti_end) - tv_to_double(ti_st); datatuple::freetuple(newtuple); @@ -140,7 +139,7 @@ void insertProbeIter(size_t NUM_ENTRIES) } gettimeofday(&stop_tv,0); printf("insert time: %6.1f\n", insert_time); - printf("insert time: %6.1f\n", -1.0); // XXX (tv_to_double(stop_tv) - tv_to_double(start_tv))); + printf("insert time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); printf("#deletions: %d\n#updates: %d\n", delcount, upcount); @@ -182,7 +181,7 @@ void insertProbeIter(size_t NUM_ENTRIES) logstore_client_close(l); gettimeofday(&stop_tv,0); - printf("run time: %6.1f\n", -1.0); // XXX (tv_to_double(stop_tv) - tv_to_double(start_tv))); + printf("run time: %6.1f\n", (tv_to_double(stop_tv) - tv_to_double(start_tv))); } diff --git a/test/check_util.h b/test/check_util.h index cc3f171..4dcbec8 100644 --- a/test/check_util.h +++ b/test/check_util.h @@ -153,4 +153,10 @@ void preprandstr(int count, std::vector &arr, int avg_len=50, bool } +static inline double tv_to_double(struct timeval tv) +{ + return static_cast(tv.tv_sec) + + (static_cast(tv.tv_usec) / 1000000.0); +} + #endif /* CHECK_UTIL_H_ */