diff --git a/datapage.cpp b/datapage.cpp index 53b8488..5aaa531 100644 --- a/datapage.cpp +++ b/datapage.cpp @@ -23,8 +23,7 @@ static int notSupported(int xid, Page * p) { return 0; } END_C_DECLS -template -void DataPage::register_stasis_page_impl() { +void DataPage::register_stasis_page_impl() { static page_impl pi = { DATA_PAGE, 1, @@ -58,78 +57,75 @@ void DataPage::register_stasis_page_impl() { } -template -DataPage::DataPage(int xid, RegionAllocator * alloc, pageid_t pid): // XXX Hack!! The read-only constructor signature is too close to the other's - xid_(xid), - page_count_(1), // will be opportunistically incremented as we scan the datapage. - initial_page_count_(-1), // used by append. - alloc_(alloc), // read-only, and we don't free data pages one at a time. - first_page_(pid), - write_offset_(-1) - { - assert(pid!=0); - Page *p = alloc_ ? alloc_->load_page(xid, first_page_) : loadPage(xid, first_page_); - if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) { - printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout); - abort(); - } - assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage - releasePage(p); +DataPage::DataPage(int xid, RegionAllocator * alloc, pageid_t pid): // XXX Hack!! The read-only constructor signature is too close to the other's + xid_(xid), + page_count_(1), // will be opportunistically incremented as we scan the datapage. + initial_page_count_(-1), // used by append. + alloc_(alloc), // read-only, and we don't free data pages one at a time. + first_page_(pid), + write_offset_(-1) + { + assert(pid!=0); + Page *p = alloc_ ? alloc_->load_page(xid, first_page_) : loadPage(xid, first_page_); + if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) { + printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout); + abort(); + } + assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage + releasePage(p); } -template -DataPage::DataPage(int xid, pageid_t page_count, RegionAllocator *alloc) : - xid_(xid), - page_count_(1), - initial_page_count_(page_count), - alloc_(alloc), - first_page_(alloc_->alloc_extent(xid_, page_count_)), - write_offset_(0) +DataPage::DataPage(int xid, pageid_t page_count, RegionAllocator *alloc) : + xid_(xid), + page_count_(1), + initial_page_count_(page_count), + alloc_(alloc), + first_page_(alloc_->alloc_extent(xid_, page_count_)), + write_offset_(0) { - DEBUG("Datapage page count: %lld pid = %lld\n", (long long int)initial_page_count_, (long long int)first_page_); - assert(page_count_ >= 1); - initialize(); + DEBUG("Datapage page count: %lld pid = %lld\n", (long long int)initial_page_count_, (long long int)first_page_); + assert(page_count_ >= 1); + initialize(); } -template -void DataPage::initialize() { - initialize_page(first_page_); +void DataPage::initialize() { + initialize_page(first_page_); } -template -void DataPage::initialize_page(pageid_t pageid) { - //load the first page - Page *p; + +void DataPage::initialize_page(pageid_t pageid) { + //load the first page + Page *p; #ifdef CHECK_FOR_SCRIBBLING - p = alloc_ ? alloc->load_page(xid_, pageid) : loadPage(xid_, pageid); - if(*stasis_page_type_ptr(p) == DATA_PAGE) { - printf("Collision on page %lld\n", (long long)pageid); fflush(stdout); - assert(*stasis_page_type_ptr(p) != DATA_PAGE); - } + p = alloc_ ? alloc->load_page(xid_, pageid) : loadPage(xid_, pageid); + if(*stasis_page_type_ptr(p) == DATA_PAGE) { + printf("Collision on page %lld\n", (long long)pageid); fflush(stdout); + assert(*stasis_page_type_ptr(p) != DATA_PAGE); + } #else - p = loadUninitializedPage(xid_, pageid); + p = loadUninitializedPage(xid_, pageid); #endif - - DEBUG("\t\t\t\t\t\t->%lld\n", pageid); - //initialize header - p->pageType = DATA_PAGE; - - //clear page (arranges for null-padding) XXX null pad more carefully and use sentinel value instead? - memset(p->memAddr, 0, PAGE_SIZE); + DEBUG("\t\t\t\t\t\t->%lld\n", pageid); - //we're the last page for now. - *is_another_page_ptr(p) = 0; - - //write 0 to first data size - *length_at_offset_ptr(p, calc_chunk_from_offset(write_offset_).slot) = 0; + //initialize header + p->pageType = DATA_PAGE; - //set the page dirty - stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_)); + //clear page (arranges for null-padding) XXX null pad more carefully and use sentinel value instead? + memset(p->memAddr, 0, PAGE_SIZE); - releasePage(p); + //we're the last page for now. + *is_another_page_ptr(p) = 0; + + //write 0 to first data size + *length_at_offset_ptr(p, calc_chunk_from_offset(write_offset_).slot) = 0; + + //set the page dirty + stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_)); + + releasePage(p); } -template -size_t DataPage::write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p) { + +size_t DataPage::write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p) { if(latch_p) { *latch_p = NULL; } recordid chunk = calc_chunk_from_offset(write_offset_); if(chunk.size > remaining) { @@ -152,45 +148,43 @@ size_t DataPage::write_bytes(const byte * buf, ssize_t remaining, Page ** } return chunk.size; } -template -size_t DataPage::read_bytes(byte * buf, off_t offset, ssize_t remaining) { - recordid chunk = calc_chunk_from_offset(offset); - if(chunk.size > remaining) { - chunk.size = remaining; - } - if(chunk.page >= first_page_ + page_count_) { - chunk.size = 0; // eof - } else { - Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page); - if(p->pageType != DATA_PAGE) { - fprintf(stderr, "Page type %d, id %lld lsn %lld\n", (int)p->pageType, (long long)p->id, (long long)p->LSN); - assert(p->pageType == DATA_PAGE); - } - if((chunk.page + 1 == page_count_ + first_page_) - && (*is_another_page_ptr(p))) { - page_count_++; - } - memcpy(buf, data_at_offset_ptr(p, chunk.slot), chunk.size); - releasePage(p); - } - return chunk.size; +size_t DataPage::read_bytes(byte * buf, off_t offset, ssize_t remaining) { + recordid chunk = calc_chunk_from_offset(offset); + if(chunk.size > remaining) { + chunk.size = remaining; + } + if(chunk.page >= first_page_ + page_count_) { + chunk.size = 0; // eof + } else { + Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page); + if(p->pageType != DATA_PAGE) { + fprintf(stderr, "Page type %d, id %lld lsn %lld\n", (int)p->pageType, (long long)p->id, (long long)p->LSN); + assert(p->pageType == DATA_PAGE); + } + if((chunk.page + 1 == page_count_ + first_page_) + && (*is_another_page_ptr(p))) { + page_count_++; + } + memcpy(buf, data_at_offset_ptr(p, chunk.slot), chunk.size); + releasePage(p); + } + return chunk.size; } -template -bool DataPage::initialize_next_page() { +bool DataPage::initialize_next_page() { recordid rid = calc_chunk_from_offset(write_offset_); assert(rid.slot == 0); DEBUG("\t\t%lld\n", (long long)rid.page); if(rid.page >= first_page_ + page_count_) { - assert(rid.page == first_page_ + page_count_); - if(alloc_->grow_extent(1)) { - page_count_++; - } else { - return false; // The region is full - } + assert(rid.page == first_page_ + page_count_); + if(alloc_->grow_extent(1)) { + page_count_++; + } else { + return false; // The region is full + } } else { - abort(); + abort(); } Page *p = alloc_ ? alloc_->load_page(xid_, rid.page-1) : loadPage(xid_, rid.page-1); @@ -202,78 +196,70 @@ bool DataPage::initialize_next_page() { return true; } -template -Page * DataPage::write_data_and_latch(const byte * buf, size_t len, bool init_next, bool latch) { +Page * DataPage::write_data_and_latch(const byte * buf, size_t len, bool init_next, bool latch) { bool first = true; Page * p = 0; while(1) { - assert(len > 0); -// if(latch) { -// if(first) { assert(!p); } else { assert(p); } -// } else { -// assert(!p); -// } - size_t written; - if(latch && first ) { - written = write_bytes(buf, len, &p); + assert(len > 0); + size_t written; + if(latch && first ) { + written = write_bytes(buf, len, &p); + } else { + written = write_bytes(buf, len); + } + if(written == 0) { + assert(!p); + return 0; // fail + } + if(written == len) { + if(latch) { + return p; } else { - written = write_bytes(buf, len); + return (Page*)1; } - if(written == 0) { - assert(!p); - return 0; // fail - } - if(written == len) { - if(latch) { - return p; - } else { -// assert(!p); - return (Page*)1; + } + if(len > PAGE_SIZE && ! first) { + assert(written > 4000); + } + buf += written; + len -= written; + if(init_next) { + if(!initialize_next_page()) { + if(p) { + unlock(p->rwlatch); + releasePage(p); } + return 0; // fail } - if(len > PAGE_SIZE && ! first) { - assert(written > 4000); - } - buf += written; - len -= written; - if(init_next) { - if(!initialize_next_page()) { - if(p) { -// assert(latch); - unlock(p->rwlatch); - releasePage(p); - } - return 0; // fail - } - } - first = false; + } + first = false; } } -template -bool DataPage::write_data(const byte * buf, size_t len, bool init_next) { +bool DataPage::write_data(const byte * buf, size_t len, bool init_next) { return 0 != write_data_and_latch(buf, len, init_next, false); } -template -bool DataPage::read_data(byte * buf, off_t offset, size_t len) { - while(1) { - assert(len > 0); - size_t read_count = read_bytes(buf, offset, len); - if(read_count == 0) { - return false; // fail - } - if(read_count == len) { - return true; // success - } - buf += read_count; - offset += read_count; - len -= read_count; - } + +bool DataPage::read_data(byte * buf, off_t offset, size_t len) { + while(1) { + assert(len > 0); + size_t read_count = read_bytes(buf, offset, len); + if(read_count == 0) { + return false; // fail + } + if(read_count == len) { + return true; // success + } + buf += read_count; + offset += read_count; + len -= read_count; + } } -template -bool DataPage::append(TUPLE const * dat) + +bool DataPage::append(datatuple const * dat) { - // First, decide if we should append to this datapage, based on whether appending will waste more or less space than starting a new datapage + // First, decide if we should append to this datapage, based on whether + // appending will waste more or less space than starting a new datapage bool accept_tuple; len_t tup_len = dat->byte_length(); @@ -281,7 +267,10 @@ bool DataPage::append(TUPLE const * dat) if(write_offset_ > (initial_page_count_ * PAGE_SIZE)) { // we already exceeded the page budget if(write_offset_ > (2 * initial_page_count_ * PAGE_SIZE)) { - // ... by a lot. Reject regardless. + // ... by a lot. Reject regardless. This prevents small tuples from + // being stuck behind giant ones without sacrificing much space + // (as a percentage of the whole index), because this path only + // can happen once per giant object. accept_tuple = false; } else { // ... by a little bit. Accept tuple if it fits on this page. @@ -292,7 +281,7 @@ bool DataPage::append(TUPLE const * dat) // tuple fits. contractually obligated to accept it. accept_tuple = true; } else if(write_offset_ == 0) { - // datapage is emptry. contractually obligated to accept tuple. + // datapage is empty. contractually obligated to accept tuple. accept_tuple = true; } else { if(tup_len > initial_page_count_ * PAGE_SIZE) { @@ -315,50 +304,45 @@ bool DataPage::append(TUPLE const * dat) DEBUG("offset %lld continuing datapage\n", write_offset_); - byte * buf = dat->to_bytes(); // TODO could be more efficient; this does a malloc and memcpy. The alternative couples us more strongly to datapage, but simplifies datapage. - len_t dat_len = dat->byte_length(); + // TODO could be more efficient; this does a malloc and memcpy. + // The alternative couples us more strongly to datatuple, but simplifies + // datapage. + byte * buf = dat->to_bytes(); + len_t dat_len = dat->byte_length(); - Page * p = write_data_and_latch((const byte*)&dat_len, sizeof(dat_len)); - bool succ = false; - if(p) { - succ = write_data(buf, dat_len); - unlock(p->rwlatch); - releasePage(p); - } + Page * p = write_data_and_latch((const byte*)&dat_len, sizeof(dat_len)); + bool succ = false; + if(p) { + succ = write_data(buf, dat_len); + unlock(p->rwlatch); + releasePage(p); + } - free(buf); + free(buf); - return succ; + return succ; } -template -bool DataPage::recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf) +bool DataPage::recordRead(const typename datatuple::key_t key, size_t keySize, datatuple ** buf) { iterator itr(this, NULL); - int match = -1; - while((*buf=itr.getnext()) != 0) - { - match = TUPLE::compare((*buf)->strippedkey(), (*buf)->strippedkeylen(), key, keySize); - - if(match<0) //keep searching - { - datatuple::freetuple(*buf); - *buf=0; - } - else if(match==0) //found - { - return true; - } - else // match > 0, then does not exist - { - datatuple::freetuple(*buf); - *buf = 0; - break; - } - } - - return false; + int match = -1; + while((*buf=itr.getnext()) != 0) { + match = datatuple::compare((*buf)->strippedkey(), (*buf)->strippedkeylen(), key, keySize); + + if(match<0) { //keep searching + datatuple::freetuple(*buf); + *buf=0; + } else if(match==0) { //found + return true; + } else { // match > 0, then does not exist + datatuple::freetuple(*buf); + *buf = 0; + break; + } + } + return false; } /////////////////////////////////////////////////////////////// @@ -366,40 +350,36 @@ bool DataPage::recordRead(const typename TUPLE::key_t key, size_t keySize /////////////////////////////////////////////////////////////// -template -TUPLE* DataPage::iterator::getnext() -{ - len_t len; - bool succ; - if(dp == NULL) { return NULL; } - // XXX hack: read latch the page that the record will live on. - // This should be handled by a read_data_in_latch function, or something... - Page * p = loadPage(dp->xid_, dp->calc_chunk_from_offset(read_offset_).page); - readlock(p->rwlatch, 0); - succ = dp->read_data((byte*)&len, read_offset_, sizeof(len)); - if((!succ) || (len == 0)) { - unlock(p->rwlatch); - releasePage(p); - return NULL; - } - read_offset_ += sizeof(len); +datatuple* DataPage::iterator::getnext() { + len_t len; + bool succ; + if(dp == NULL) { return NULL; } + // XXX hack: read latch the page that the record will live on. + // This should be handled by a read_data_in_latch function, or something... + Page * p = loadPage(dp->xid_, dp->calc_chunk_from_offset(read_offset_).page); + readlock(p->rwlatch, 0); + succ = dp->read_data((byte*)&len, read_offset_, sizeof(len)); + if((!succ) || (len == 0)) { + unlock(p->rwlatch); + releasePage(p); + return NULL; + } + read_offset_ += sizeof(len); - byte * buf = (byte*)malloc(len); - succ = dp->read_data(buf, read_offset_, len); + byte * buf = (byte*)malloc(len); + succ = dp->read_data(buf, read_offset_, len); - // release hacky latch - unlock(p->rwlatch); - releasePage(p); + // release hacky latch + unlock(p->rwlatch); + releasePage(p); - if(!succ) { read_offset_ -= sizeof(len); free(buf); return NULL; } + if(!succ) { read_offset_ -= sizeof(len); free(buf); return NULL; } - read_offset_ += len; + read_offset_ += len; - TUPLE *ret = TUPLE::from_bytes(buf); + datatuple *ret = datatuple::from_bytes(buf); - free(buf); + free(buf); - return ret; + return ret; } - -template class DataPage; diff --git a/datapage.h b/datapage.h index b1fcbe6..2ed1672 100644 --- a/datapage.h +++ b/datapage.h @@ -5,142 +5,134 @@ #include #include +#include "datatuple.h" struct RegionAllocator; //#define CHECK_FOR_SCRIBBLING -template class DataPage { public: - - class iterator - { - private: - void scan_to_key(TUPLE * key) { - if(key) { - len_t old_off = read_offset_; - TUPLE * t = getnext(); - while(t && TUPLE::compare(key->strippedkey(), key->strippedkeylen(), t->strippedkey(), t->strippedkeylen()) > 0) { - TUPLE::freetuple(t); - old_off = read_offset_; - t = getnext(); - } - if(t) { - DEBUG("datapage opened at %s\n", t->key()); - TUPLE::freetuple(t); - read_offset_ = old_off; - } else { - DEBUG("datapage key not found. Offset = %lld", read_offset_); - dp = NULL; - } - } - } - public: - iterator(DataPage *dp, TUPLE * key=NULL) : read_offset_(0), dp(dp) { - scan_to_key(key); + class iterator + { + private: + void scan_to_key(datatuple * key) { + if(key) { + len_t old_off = read_offset_; + datatuple * t = getnext(); + while(t && datatuple::compare(key->strippedkey(), key->strippedkeylen(), t->strippedkey(), t->strippedkeylen()) > 0) { + datatuple::freetuple(t); + old_off = read_offset_; + t = getnext(); + } + if(t) { + DEBUG("datapage opened at %s\n", t->key()); + datatuple::freetuple(t); + read_offset_ = old_off; + } else { + DEBUG("datapage key not found. Offset = %lld", read_offset_); + dp = NULL; + } } + } + public: + iterator(DataPage *dp, datatuple * key=NULL) : read_offset_(0), dp(dp) { + scan_to_key(key); + } - void operator=(const iterator &rhs) - { - this->read_offset_ = rhs.read_offset_; - this->dp = rhs.dp; - } + void operator=(const iterator &rhs) { + this->read_offset_ = rhs.read_offset_; + this->dp = rhs.dp; + } - //returns the next tuple and also advances the iterator - TUPLE *getnext(); + //returns the next tuple and also advances the iterator + datatuple *getnext(); - //advance the iterator by count tuples, i.e. skip over count tuples - // void advance(int xid, int count=1); - - off_t read_offset_; - DataPage *dp; - - }; + private: + off_t read_offset_; + DataPage *dp; + }; public: - /** - * if alloc is non-null, then reads will be optimized for sequential access - */ - DataPage( int xid, RegionAllocator* alloc, pageid_t pid ); + /** + * if alloc is non-null, then reads will be optimized for sequential access + */ + DataPage( int xid, RegionAllocator* alloc, pageid_t pid ); - //to be used to create new data pages - DataPage( int xid, pageid_t page_count, RegionAllocator* alloc); + //to be used to create new data pages + DataPage( int xid, pageid_t page_count, RegionAllocator* alloc); - ~DataPage() { - assert(write_offset_ == -1); + ~DataPage() { + assert(write_offset_ == -1); + } + + void writes_done() { + if(write_offset_ != -1) { + len_t dat_len = 0; // write terminating zero. + + write_data((const byte*)&dat_len, sizeof(dat_len), false); + + // if writing the zero fails, later reads will fail as well, and assume EOF. + + write_offset_ = -1; } - void writes_done() { - if(write_offset_ != -1) { - len_t dat_len = 0; // write terminating zero. + } - write_data((const byte*)&dat_len, sizeof(dat_len), false); + bool append(datatuple const * dat); + bool recordRead(const typename datatuple::key_t key, size_t keySize, datatuple ** buf); - // if writing the zero fails, later reads will fail as well, and assume EOF. + inline uint16_t recordCount(); - write_offset_ = -1; - } + iterator begin(){return iterator(this);} - } + pageid_t get_start_pid(){return first_page_;} + int get_page_count(){return page_count_;} - bool append(TUPLE const * dat); - bool recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf); - - inline uint16_t recordCount(); - - - iterator begin(){return iterator(this);} - - pageid_t get_start_pid(){return first_page_;} - int get_page_count(){return page_count_;} - - static void register_stasis_page_impl(); + static void register_stasis_page_impl(); private: - // static pageid_t dp_alloc_region(int xid, void *conf, pageid_t count); + void initialize(); - void initialize(); + static const uint16_t DATA_PAGE_HEADER_SIZE = sizeof(int32_t); + static const uint16_t DATA_PAGE_SIZE = USABLE_SIZE_OF_PAGE - DATA_PAGE_HEADER_SIZE; + typedef uint32_t len_t; -private: - static const uint16_t DATA_PAGE_HEADER_SIZE = sizeof(int32_t); - static const uint16_t DATA_PAGE_SIZE = USABLE_SIZE_OF_PAGE - DATA_PAGE_HEADER_SIZE; - typedef uint32_t len_t; + static inline int32_t* is_another_page_ptr(Page *p) { + return stasis_page_int32_ptr_from_start(p,0); + } + static inline byte * data_at_offset_ptr(Page *p, slotid_t offset) { + return ((byte*)(is_another_page_ptr(p)+1))+offset; + } + static inline len_t * length_at_offset_ptr(Page *p, slotid_t offset) { + return (len_t*)data_at_offset_ptr(p,offset); + } - static inline int32_t* is_another_page_ptr(Page *p) { - return stasis_page_int32_ptr_from_start(p,0); - } - static inline byte * data_at_offset_ptr(Page *p, slotid_t offset) { - return ((byte*)(is_another_page_ptr(p)+1))+offset; - } - static inline len_t * length_at_offset_ptr(Page *p, slotid_t offset) { - return (len_t*)data_at_offset_ptr(p,offset); - } + inline recordid calc_chunk_from_offset(off_t offset) { + recordid ret; + ret.page = first_page_ + offset / DATA_PAGE_SIZE; + ret.slot = offset % DATA_PAGE_SIZE; + ret.size = DATA_PAGE_SIZE - ret.slot; + assert(ret.size); + return ret; + } - inline recordid calc_chunk_from_offset(off_t offset) { - recordid ret; - ret.page = first_page_ + offset / DATA_PAGE_SIZE; - ret.slot = offset % DATA_PAGE_SIZE; - ret.size = DATA_PAGE_SIZE - ret.slot; - assert(ret.size); - return ret; - } - size_t write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p = NULL); - size_t read_bytes(byte * buf, off_t offset, ssize_t remaining); - Page * write_data_and_latch(const byte * buf, size_t len, bool init_next = true, bool latch = true); - bool write_data(const byte * buf, size_t len, bool init_next = true); - bool read_data(byte * buf, off_t offset, size_t len); - bool initialize_next_page(); - void initialize_page(pageid_t pageid); + size_t write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p = NULL); + size_t read_bytes(byte * buf, off_t offset, ssize_t remaining); + Page * write_data_and_latch(const byte * buf, size_t len, bool init_next = true, bool latch = true); + bool write_data(const byte * buf, size_t len, bool init_next = true); + bool read_data(byte * buf, off_t offset, size_t len); + bool initialize_next_page(); + void initialize_page(pageid_t pageid); - int xid_; - pageid_t page_count_; - const pageid_t initial_page_count_; - RegionAllocator *alloc_; - const pageid_t first_page_; - off_t write_offset_; // points to the next free byte (ignoring page boundaries) + int xid_; + pageid_t page_count_; + const pageid_t initial_page_count_; + RegionAllocator *alloc_; + const pageid_t first_page_; + off_t write_offset_; // points to the next free byte (ignoring page boundaries) }; #endif diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 60ca433..5cd1e17 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -82,14 +82,14 @@ int diskTreeComponent::insertTuple(int xid, datatuple *t) return ret; } -DataPage* diskTreeComponent::insertDataPage(int xid, datatuple *tuple) { +DataPage* diskTreeComponent::insertDataPage(int xid, datatuple *tuple) { //create a new data page -- either the last region is full, or the last data page doesn't want our tuple. (or both) - DataPage * dp = 0; + DataPage * dp = 0; int count = 0; while(dp==0) { - dp = new DataPage(xid, datapage_size, ltree->get_datapage_alloc()); + dp = new DataPage(xid, datapage_size, ltree->get_datapage_alloc()); //insert the record into the data page if(!dp->append(tuple)) @@ -131,7 +131,7 @@ datatuple * diskTreeComponent::findTuple(int xid, datatuple::key_t key, size_t k if(pid!=-1) { - DataPage * dp = new DataPage(xid, 0, pid); + DataPage * dp = new DataPage(xid, 0, pid); dp->recordRead(key, keySize, &tup); delete dp; } @@ -974,7 +974,7 @@ void diskTreeComponent::iterator::init_helper(datatuple* key1) lsmIterator_->value((byte**)hack); curr_pageid = *pid_tmp; - curr_page = new DataPage(-1, ro_alloc_, curr_pageid); + curr_page = new DataPage(-1, ro_alloc_, curr_pageid); DEBUG("opening datapage iterator %lld at key %s\n.", curr_pageid, key1 ? (char*)key1->key() : "NULL"); dp_itr = new DPITR_T(curr_page, key1); @@ -1008,7 +1008,7 @@ datatuple * diskTreeComponent::iterator::next_callerFrees() size_t ret = lsmIterator_->value((byte**)hack); assert(ret == sizeof(pageid_t)); curr_pageid = *pid_tmp; - curr_page = new DataPage(-1, ro_alloc_, curr_pageid); + curr_page = new DataPage(-1, ro_alloc_, curr_pageid); DEBUG("opening datapage iterator %lld at beginning\n.", curr_pageid); dp_itr = new DPITR_T(curr_page->begin()); diff --git a/diskTreeComponent.h b/diskTreeComponent.h index 499f5fa..32937ad 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -87,10 +87,10 @@ class diskTreeComponent { private: - DataPage* insertDataPage(int xid, datatuple *tuple); + DataPage* insertDataPage(int xid, datatuple *tuple); internalNodes * ltree; - DataPage* dp; + DataPage* dp; pageid_t datapage_size; /*mergeManager::mergeStats*/ void *stats; // XXX hack to work around circular includes. @@ -226,8 +226,8 @@ class diskTreeComponent { diskTreeComponent::internalNodes::iterator* lsmIterator_; pageid_t curr_pageid; //current page id - DataPage *curr_page; //current page - typedef DataPage::iterator DPITR_T; + DataPage *curr_page; //current page + typedef DataPage::iterator DPITR_T; DPITR_T *dp_itr; }; diff --git a/logstore.cpp b/logstore.cpp index ffe5469..f22cc8d 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -97,7 +97,7 @@ logtable::~logtable() template void logtable::init_stasis() { - DataPage::register_stasis_page_impl(); + DataPage::register_stasis_page_impl(); stasis_buffer_manager_hint_writes_are_sequential = 1; Tinit(); diff --git a/mergeStats.h b/mergeStats.h index d35b7ed..cae6958 100644 --- a/mergeStats.h +++ b/mergeStats.h @@ -174,7 +174,7 @@ class mergeStats { } void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) { } - void wrote_datapage(DataPage *dp) { + void wrote_datapage(DataPage *dp) { #if EXTENDED_STATS stats_num_datapages_out++; stats_bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count()); diff --git a/newserver.cpp b/newserver.cpp index d86c48e..9f99f58 100644 --- a/newserver.cpp +++ b/newserver.cpp @@ -24,6 +24,7 @@ int main(int argc, char *argv[]) int64_t c0_size = 1024 * 1024 * 512 * 1; int log_mode = 0; // do not log by default. int64_t expiry_delta = 0; // do not gc by default + int port = simpleServer::DEFAULT_PORT; stasis_buffer_manager_size = 1 * 1024 * 1024 * 1024 / PAGE_SIZE; // 1.5GB total for(int i = 1; i < argc; i++) { @@ -41,8 +42,11 @@ int main(int argc, char *argv[]) } else if(!strcmp(argv[i], "--expiry-delta")) { i++; expiry_delta = atoi(argv[i]); + } else if(!strcmp(argv[i], "--port")) { + i++; + port = atoi(argv[i]); } else { - fprintf(stderr, "Usage: %s [--test|--benchmark] [--log-mode ] [--expiry-delta ]", argv[0]); + fprintf(stderr, "Usage: %s [--test|--benchmark] [--log-mode ] [--expiry-delta ] [--port ]", argv[0]); abort(); } } @@ -73,7 +77,7 @@ int main(int argc, char *argv[]) mscheduler->start(); ltable.replayLog(); - simpleServer *lserver = new simpleServer(<able); + simpleServer *lserver = new simpleServer(<able, simpleServer::DEFAULT_THREADS, port); lserver->acceptLoop(); diff --git a/test/check_datapage.cpp b/test/check_datapage.cpp index cec840c..0f4582d 100644 --- a/test/check_datapage.cpp +++ b/test/check_datapage.cpp @@ -20,9 +20,6 @@ #undef begin #undef end -template class DataPage; - - void insertWithConcurrentReads(size_t NUM_ENTRIES) { srand(1001); unlink("storefile.txt"); @@ -58,7 +55,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) { int pcount = 1000; int dpages = 0; - DataPage *dp=0; + DataPage *dp=0; int64_t datasize = 0; std::vector dsp; size_t last_i = 0; @@ -84,7 +81,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) { xid = Tbegin(); alloc = new RegionAllocator(xid, 10000); - dp = new DataPage(xid, pcount, alloc); + dp = new DataPage(xid, pcount, alloc); // printf("%lld\n", dp->get_start_pid()); bool succ = dp->append(newtuple); assert(succ); @@ -95,7 +92,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) { if(j >= key_arr.size()) { j = key_arr.size()-1; } bool found = 0; { - DataPage::iterator it = dp->begin(); + DataPage::iterator it = dp->begin(); datatuple * dt; while((dt = it.getnext()) != NULL) { if(!strcmp((char*)dt->rawkey(), key_arr[j].c_str())) { @@ -160,7 +157,7 @@ void insertProbeIter(size_t NUM_ENTRIES) int pcount = 1000; int dpages = 0; - DataPage *dp=0; + DataPage *dp=0; int64_t datasize = 0; std::vector dsp; for(size_t i = 0; i < NUM_ENTRIES; i++) @@ -176,7 +173,7 @@ void insertProbeIter(size_t NUM_ENTRIES) dp->writes_done(); delete dp; - dp = new DataPage(xid, pcount, alloc); + dp = new DataPage(xid, pcount, alloc); bool succ = dp->append(newtuple); assert(succ); @@ -204,8 +201,8 @@ void insertProbeIter(size_t NUM_ENTRIES) int tuplenum = 0; for(int i = 0; i < dpages ; i++) { - DataPage dp(xid, 0, dsp[i]); - DataPage::iterator itr = dp.begin(); + DataPage dp(xid, 0, dsp[i]); + DataPage::iterator itr = dp.begin(); datatuple *dt=0; while( (dt=itr.getnext()) != NULL) {