datapage is no longer a template
git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@2666 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
parent
f0ba49a649
commit
0923cd9d96
8 changed files with 318 additions and 345 deletions
412
datapage.cpp
412
datapage.cpp
|
@ -23,8 +23,7 @@ static int notSupported(int xid, Page * p) { return 0; }
|
|||
|
||||
END_C_DECLS
|
||||
|
||||
template <class TUPLE>
|
||||
void DataPage<TUPLE>::register_stasis_page_impl() {
|
||||
void DataPage::register_stasis_page_impl() {
|
||||
static page_impl pi = {
|
||||
DATA_PAGE,
|
||||
1,
|
||||
|
@ -58,78 +57,75 @@ void DataPage<TUPLE>::register_stasis_page_impl() {
|
|||
|
||||
}
|
||||
|
||||
template <class TUPLE>
|
||||
DataPage<TUPLE>::DataPage(int xid, RegionAllocator * alloc, pageid_t pid): // XXX Hack!! The read-only constructor signature is too close to the other's
|
||||
xid_(xid),
|
||||
page_count_(1), // will be opportunistically incremented as we scan the datapage.
|
||||
initial_page_count_(-1), // used by append.
|
||||
alloc_(alloc), // read-only, and we don't free data pages one at a time.
|
||||
first_page_(pid),
|
||||
write_offset_(-1)
|
||||
{
|
||||
assert(pid!=0);
|
||||
Page *p = alloc_ ? alloc_->load_page(xid, first_page_) : loadPage(xid, first_page_);
|
||||
if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) {
|
||||
printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout);
|
||||
abort();
|
||||
}
|
||||
assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage
|
||||
releasePage(p);
|
||||
DataPage::DataPage(int xid, RegionAllocator * alloc, pageid_t pid): // XXX Hack!! The read-only constructor signature is too close to the other's
|
||||
xid_(xid),
|
||||
page_count_(1), // will be opportunistically incremented as we scan the datapage.
|
||||
initial_page_count_(-1), // used by append.
|
||||
alloc_(alloc), // read-only, and we don't free data pages one at a time.
|
||||
first_page_(pid),
|
||||
write_offset_(-1)
|
||||
{
|
||||
assert(pid!=0);
|
||||
Page *p = alloc_ ? alloc_->load_page(xid, first_page_) : loadPage(xid, first_page_);
|
||||
if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) {
|
||||
printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout);
|
||||
abort();
|
||||
}
|
||||
assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage
|
||||
releasePage(p);
|
||||
}
|
||||
|
||||
template <class TUPLE>
|
||||
DataPage<TUPLE>::DataPage(int xid, pageid_t page_count, RegionAllocator *alloc) :
|
||||
xid_(xid),
|
||||
page_count_(1),
|
||||
initial_page_count_(page_count),
|
||||
alloc_(alloc),
|
||||
first_page_(alloc_->alloc_extent(xid_, page_count_)),
|
||||
write_offset_(0)
|
||||
DataPage::DataPage(int xid, pageid_t page_count, RegionAllocator *alloc) :
|
||||
xid_(xid),
|
||||
page_count_(1),
|
||||
initial_page_count_(page_count),
|
||||
alloc_(alloc),
|
||||
first_page_(alloc_->alloc_extent(xid_, page_count_)),
|
||||
write_offset_(0)
|
||||
{
|
||||
DEBUG("Datapage page count: %lld pid = %lld\n", (long long int)initial_page_count_, (long long int)first_page_);
|
||||
assert(page_count_ >= 1);
|
||||
initialize();
|
||||
DEBUG("Datapage page count: %lld pid = %lld\n", (long long int)initial_page_count_, (long long int)first_page_);
|
||||
assert(page_count_ >= 1);
|
||||
initialize();
|
||||
}
|
||||
|
||||
template<class TUPLE>
|
||||
void DataPage<TUPLE>::initialize() {
|
||||
initialize_page(first_page_);
|
||||
void DataPage::initialize() {
|
||||
initialize_page(first_page_);
|
||||
}
|
||||
template<class TUPLE>
|
||||
void DataPage<TUPLE>::initialize_page(pageid_t pageid) {
|
||||
//load the first page
|
||||
Page *p;
|
||||
|
||||
void DataPage::initialize_page(pageid_t pageid) {
|
||||
//load the first page
|
||||
Page *p;
|
||||
#ifdef CHECK_FOR_SCRIBBLING
|
||||
p = alloc_ ? alloc->load_page(xid_, pageid) : loadPage(xid_, pageid);
|
||||
if(*stasis_page_type_ptr(p) == DATA_PAGE) {
|
||||
printf("Collision on page %lld\n", (long long)pageid); fflush(stdout);
|
||||
assert(*stasis_page_type_ptr(p) != DATA_PAGE);
|
||||
}
|
||||
p = alloc_ ? alloc->load_page(xid_, pageid) : loadPage(xid_, pageid);
|
||||
if(*stasis_page_type_ptr(p) == DATA_PAGE) {
|
||||
printf("Collision on page %lld\n", (long long)pageid); fflush(stdout);
|
||||
assert(*stasis_page_type_ptr(p) != DATA_PAGE);
|
||||
}
|
||||
#else
|
||||
p = loadUninitializedPage(xid_, pageid);
|
||||
p = loadUninitializedPage(xid_, pageid);
|
||||
#endif
|
||||
|
||||
DEBUG("\t\t\t\t\t\t->%lld\n", pageid);
|
||||
|
||||
//initialize header
|
||||
p->pageType = DATA_PAGE;
|
||||
|
||||
//clear page (arranges for null-padding) XXX null pad more carefully and use sentinel value instead?
|
||||
memset(p->memAddr, 0, PAGE_SIZE);
|
||||
DEBUG("\t\t\t\t\t\t->%lld\n", pageid);
|
||||
|
||||
//we're the last page for now.
|
||||
*is_another_page_ptr(p) = 0;
|
||||
|
||||
//write 0 to first data size
|
||||
*length_at_offset_ptr(p, calc_chunk_from_offset(write_offset_).slot) = 0;
|
||||
//initialize header
|
||||
p->pageType = DATA_PAGE;
|
||||
|
||||
//set the page dirty
|
||||
stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_));
|
||||
//clear page (arranges for null-padding) XXX null pad more carefully and use sentinel value instead?
|
||||
memset(p->memAddr, 0, PAGE_SIZE);
|
||||
|
||||
releasePage(p);
|
||||
//we're the last page for now.
|
||||
*is_another_page_ptr(p) = 0;
|
||||
|
||||
//write 0 to first data size
|
||||
*length_at_offset_ptr(p, calc_chunk_from_offset(write_offset_).slot) = 0;
|
||||
|
||||
//set the page dirty
|
||||
stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_));
|
||||
|
||||
releasePage(p);
|
||||
}
|
||||
template <class TUPLE>
|
||||
size_t DataPage<TUPLE>::write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p) {
|
||||
|
||||
size_t DataPage::write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p) {
|
||||
if(latch_p) { *latch_p = NULL; }
|
||||
recordid chunk = calc_chunk_from_offset(write_offset_);
|
||||
if(chunk.size > remaining) {
|
||||
|
@ -152,45 +148,43 @@ size_t DataPage<TUPLE>::write_bytes(const byte * buf, ssize_t remaining, Page **
|
|||
}
|
||||
return chunk.size;
|
||||
}
|
||||
template <class TUPLE>
|
||||
size_t DataPage<TUPLE>::read_bytes(byte * buf, off_t offset, ssize_t remaining) {
|
||||
recordid chunk = calc_chunk_from_offset(offset);
|
||||
if(chunk.size > remaining) {
|
||||
chunk.size = remaining;
|
||||
}
|
||||
if(chunk.page >= first_page_ + page_count_) {
|
||||
chunk.size = 0; // eof
|
||||
} else {
|
||||
Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page);
|
||||
if(p->pageType != DATA_PAGE) {
|
||||
fprintf(stderr, "Page type %d, id %lld lsn %lld\n", (int)p->pageType, (long long)p->id, (long long)p->LSN);
|
||||
assert(p->pageType == DATA_PAGE);
|
||||
}
|
||||
if((chunk.page + 1 == page_count_ + first_page_)
|
||||
&& (*is_another_page_ptr(p))) {
|
||||
page_count_++;
|
||||
}
|
||||
memcpy(buf, data_at_offset_ptr(p, chunk.slot), chunk.size);
|
||||
releasePage(p);
|
||||
}
|
||||
return chunk.size;
|
||||
size_t DataPage::read_bytes(byte * buf, off_t offset, ssize_t remaining) {
|
||||
recordid chunk = calc_chunk_from_offset(offset);
|
||||
if(chunk.size > remaining) {
|
||||
chunk.size = remaining;
|
||||
}
|
||||
if(chunk.page >= first_page_ + page_count_) {
|
||||
chunk.size = 0; // eof
|
||||
} else {
|
||||
Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page);
|
||||
if(p->pageType != DATA_PAGE) {
|
||||
fprintf(stderr, "Page type %d, id %lld lsn %lld\n", (int)p->pageType, (long long)p->id, (long long)p->LSN);
|
||||
assert(p->pageType == DATA_PAGE);
|
||||
}
|
||||
if((chunk.page + 1 == page_count_ + first_page_)
|
||||
&& (*is_another_page_ptr(p))) {
|
||||
page_count_++;
|
||||
}
|
||||
memcpy(buf, data_at_offset_ptr(p, chunk.slot), chunk.size);
|
||||
releasePage(p);
|
||||
}
|
||||
return chunk.size;
|
||||
}
|
||||
|
||||
template <class TUPLE>
|
||||
bool DataPage<TUPLE>::initialize_next_page() {
|
||||
bool DataPage::initialize_next_page() {
|
||||
recordid rid = calc_chunk_from_offset(write_offset_);
|
||||
assert(rid.slot == 0);
|
||||
DEBUG("\t\t%lld\n", (long long)rid.page);
|
||||
|
||||
if(rid.page >= first_page_ + page_count_) {
|
||||
assert(rid.page == first_page_ + page_count_);
|
||||
if(alloc_->grow_extent(1)) {
|
||||
page_count_++;
|
||||
} else {
|
||||
return false; // The region is full
|
||||
}
|
||||
assert(rid.page == first_page_ + page_count_);
|
||||
if(alloc_->grow_extent(1)) {
|
||||
page_count_++;
|
||||
} else {
|
||||
return false; // The region is full
|
||||
}
|
||||
} else {
|
||||
abort();
|
||||
abort();
|
||||
}
|
||||
|
||||
Page *p = alloc_ ? alloc_->load_page(xid_, rid.page-1) : loadPage(xid_, rid.page-1);
|
||||
|
@ -202,78 +196,70 @@ bool DataPage<TUPLE>::initialize_next_page() {
|
|||
return true;
|
||||
}
|
||||
|
||||
template<class TUPLE>
|
||||
Page * DataPage<TUPLE>::write_data_and_latch(const byte * buf, size_t len, bool init_next, bool latch) {
|
||||
Page * DataPage::write_data_and_latch(const byte * buf, size_t len, bool init_next, bool latch) {
|
||||
bool first = true;
|
||||
Page * p = 0;
|
||||
while(1) {
|
||||
assert(len > 0);
|
||||
// if(latch) {
|
||||
// if(first) { assert(!p); } else { assert(p); }
|
||||
// } else {
|
||||
// assert(!p);
|
||||
// }
|
||||
size_t written;
|
||||
if(latch && first ) {
|
||||
written = write_bytes(buf, len, &p);
|
||||
assert(len > 0);
|
||||
size_t written;
|
||||
if(latch && first ) {
|
||||
written = write_bytes(buf, len, &p);
|
||||
} else {
|
||||
written = write_bytes(buf, len);
|
||||
}
|
||||
if(written == 0) {
|
||||
assert(!p);
|
||||
return 0; // fail
|
||||
}
|
||||
if(written == len) {
|
||||
if(latch) {
|
||||
return p;
|
||||
} else {
|
||||
written = write_bytes(buf, len);
|
||||
return (Page*)1;
|
||||
}
|
||||
if(written == 0) {
|
||||
assert(!p);
|
||||
return 0; // fail
|
||||
}
|
||||
if(written == len) {
|
||||
if(latch) {
|
||||
return p;
|
||||
} else {
|
||||
// assert(!p);
|
||||
return (Page*)1;
|
||||
}
|
||||
if(len > PAGE_SIZE && ! first) {
|
||||
assert(written > 4000);
|
||||
}
|
||||
buf += written;
|
||||
len -= written;
|
||||
if(init_next) {
|
||||
if(!initialize_next_page()) {
|
||||
if(p) {
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
}
|
||||
return 0; // fail
|
||||
}
|
||||
if(len > PAGE_SIZE && ! first) {
|
||||
assert(written > 4000);
|
||||
}
|
||||
buf += written;
|
||||
len -= written;
|
||||
if(init_next) {
|
||||
if(!initialize_next_page()) {
|
||||
if(p) {
|
||||
// assert(latch);
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
}
|
||||
return 0; // fail
|
||||
}
|
||||
}
|
||||
first = false;
|
||||
}
|
||||
first = false;
|
||||
}
|
||||
}
|
||||
|
||||
template <class TUPLE>
|
||||
bool DataPage<TUPLE>::write_data(const byte * buf, size_t len, bool init_next) {
|
||||
bool DataPage::write_data(const byte * buf, size_t len, bool init_next) {
|
||||
return 0 != write_data_and_latch(buf, len, init_next, false);
|
||||
}
|
||||
template <class TUPLE>
|
||||
bool DataPage<TUPLE>::read_data(byte * buf, off_t offset, size_t len) {
|
||||
while(1) {
|
||||
assert(len > 0);
|
||||
size_t read_count = read_bytes(buf, offset, len);
|
||||
if(read_count == 0) {
|
||||
return false; // fail
|
||||
}
|
||||
if(read_count == len) {
|
||||
return true; // success
|
||||
}
|
||||
buf += read_count;
|
||||
offset += read_count;
|
||||
len -= read_count;
|
||||
}
|
||||
|
||||
bool DataPage::read_data(byte * buf, off_t offset, size_t len) {
|
||||
while(1) {
|
||||
assert(len > 0);
|
||||
size_t read_count = read_bytes(buf, offset, len);
|
||||
if(read_count == 0) {
|
||||
return false; // fail
|
||||
}
|
||||
if(read_count == len) {
|
||||
return true; // success
|
||||
}
|
||||
buf += read_count;
|
||||
offset += read_count;
|
||||
len -= read_count;
|
||||
}
|
||||
}
|
||||
template <class TUPLE>
|
||||
bool DataPage<TUPLE>::append(TUPLE const * dat)
|
||||
|
||||
bool DataPage::append(datatuple const * dat)
|
||||
{
|
||||
// First, decide if we should append to this datapage, based on whether appending will waste more or less space than starting a new datapage
|
||||
// First, decide if we should append to this datapage, based on whether
|
||||
// appending will waste more or less space than starting a new datapage
|
||||
|
||||
bool accept_tuple;
|
||||
len_t tup_len = dat->byte_length();
|
||||
|
@ -281,7 +267,10 @@ bool DataPage<TUPLE>::append(TUPLE const * dat)
|
|||
if(write_offset_ > (initial_page_count_ * PAGE_SIZE)) {
|
||||
// we already exceeded the page budget
|
||||
if(write_offset_ > (2 * initial_page_count_ * PAGE_SIZE)) {
|
||||
// ... by a lot. Reject regardless.
|
||||
// ... by a lot. Reject regardless. This prevents small tuples from
|
||||
// being stuck behind giant ones without sacrificing much space
|
||||
// (as a percentage of the whole index), because this path only
|
||||
// can happen once per giant object.
|
||||
accept_tuple = false;
|
||||
} else {
|
||||
// ... by a little bit. Accept tuple if it fits on this page.
|
||||
|
@ -292,7 +281,7 @@ bool DataPage<TUPLE>::append(TUPLE const * dat)
|
|||
// tuple fits. contractually obligated to accept it.
|
||||
accept_tuple = true;
|
||||
} else if(write_offset_ == 0) {
|
||||
// datapage is emptry. contractually obligated to accept tuple.
|
||||
// datapage is empty. contractually obligated to accept tuple.
|
||||
accept_tuple = true;
|
||||
} else {
|
||||
if(tup_len > initial_page_count_ * PAGE_SIZE) {
|
||||
|
@ -315,50 +304,45 @@ bool DataPage<TUPLE>::append(TUPLE const * dat)
|
|||
|
||||
DEBUG("offset %lld continuing datapage\n", write_offset_);
|
||||
|
||||
byte * buf = dat->to_bytes(); // TODO could be more efficient; this does a malloc and memcpy. The alternative couples us more strongly to datapage, but simplifies datapage.
|
||||
len_t dat_len = dat->byte_length();
|
||||
// TODO could be more efficient; this does a malloc and memcpy.
|
||||
// The alternative couples us more strongly to datatuple, but simplifies
|
||||
// datapage.
|
||||
byte * buf = dat->to_bytes();
|
||||
len_t dat_len = dat->byte_length();
|
||||
|
||||
Page * p = write_data_and_latch((const byte*)&dat_len, sizeof(dat_len));
|
||||
bool succ = false;
|
||||
if(p) {
|
||||
succ = write_data(buf, dat_len);
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
}
|
||||
Page * p = write_data_and_latch((const byte*)&dat_len, sizeof(dat_len));
|
||||
bool succ = false;
|
||||
if(p) {
|
||||
succ = write_data(buf, dat_len);
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
}
|
||||
|
||||
free(buf);
|
||||
free(buf);
|
||||
|
||||
return succ;
|
||||
return succ;
|
||||
}
|
||||
|
||||
template <class TUPLE>
|
||||
bool DataPage<TUPLE>::recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf)
|
||||
bool DataPage::recordRead(const typename datatuple::key_t key, size_t keySize, datatuple ** buf)
|
||||
{
|
||||
iterator itr(this, NULL);
|
||||
|
||||
int match = -1;
|
||||
while((*buf=itr.getnext()) != 0)
|
||||
{
|
||||
match = TUPLE::compare((*buf)->strippedkey(), (*buf)->strippedkeylen(), key, keySize);
|
||||
|
||||
if(match<0) //keep searching
|
||||
{
|
||||
datatuple::freetuple(*buf);
|
||||
*buf=0;
|
||||
}
|
||||
else if(match==0) //found
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else // match > 0, then does not exist
|
||||
{
|
||||
datatuple::freetuple(*buf);
|
||||
*buf = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
int match = -1;
|
||||
while((*buf=itr.getnext()) != 0) {
|
||||
match = datatuple::compare((*buf)->strippedkey(), (*buf)->strippedkeylen(), key, keySize);
|
||||
|
||||
if(match<0) { //keep searching
|
||||
datatuple::freetuple(*buf);
|
||||
*buf=0;
|
||||
} else if(match==0) { //found
|
||||
return true;
|
||||
} else { // match > 0, then does not exist
|
||||
datatuple::freetuple(*buf);
|
||||
*buf = 0;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////
|
||||
|
@ -366,40 +350,36 @@ bool DataPage<TUPLE>::recordRead(const typename TUPLE::key_t key, size_t keySize
|
|||
///////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
template <class TUPLE>
|
||||
TUPLE* DataPage<TUPLE>::iterator::getnext()
|
||||
{
|
||||
len_t len;
|
||||
bool succ;
|
||||
if(dp == NULL) { return NULL; }
|
||||
// XXX hack: read latch the page that the record will live on.
|
||||
// This should be handled by a read_data_in_latch function, or something...
|
||||
Page * p = loadPage(dp->xid_, dp->calc_chunk_from_offset(read_offset_).page);
|
||||
readlock(p->rwlatch, 0);
|
||||
succ = dp->read_data((byte*)&len, read_offset_, sizeof(len));
|
||||
if((!succ) || (len == 0)) {
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
return NULL;
|
||||
}
|
||||
read_offset_ += sizeof(len);
|
||||
datatuple* DataPage::iterator::getnext() {
|
||||
len_t len;
|
||||
bool succ;
|
||||
if(dp == NULL) { return NULL; }
|
||||
// XXX hack: read latch the page that the record will live on.
|
||||
// This should be handled by a read_data_in_latch function, or something...
|
||||
Page * p = loadPage(dp->xid_, dp->calc_chunk_from_offset(read_offset_).page);
|
||||
readlock(p->rwlatch, 0);
|
||||
succ = dp->read_data((byte*)&len, read_offset_, sizeof(len));
|
||||
if((!succ) || (len == 0)) {
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
return NULL;
|
||||
}
|
||||
read_offset_ += sizeof(len);
|
||||
|
||||
byte * buf = (byte*)malloc(len);
|
||||
succ = dp->read_data(buf, read_offset_, len);
|
||||
byte * buf = (byte*)malloc(len);
|
||||
succ = dp->read_data(buf, read_offset_, len);
|
||||
|
||||
// release hacky latch
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
// release hacky latch
|
||||
unlock(p->rwlatch);
|
||||
releasePage(p);
|
||||
|
||||
if(!succ) { read_offset_ -= sizeof(len); free(buf); return NULL; }
|
||||
if(!succ) { read_offset_ -= sizeof(len); free(buf); return NULL; }
|
||||
|
||||
read_offset_ += len;
|
||||
read_offset_ += len;
|
||||
|
||||
TUPLE *ret = TUPLE::from_bytes(buf);
|
||||
datatuple *ret = datatuple::from_bytes(buf);
|
||||
|
||||
free(buf);
|
||||
free(buf);
|
||||
|
||||
return ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
template class DataPage<datatuple>;
|
||||
|
|
202
datapage.h
202
datapage.h
|
@ -5,142 +5,134 @@
|
|||
|
||||
#include <stasis/page.h>
|
||||
#include <stasis/constants.h>
|
||||
#include "datatuple.h"
|
||||
|
||||
struct RegionAllocator;
|
||||
|
||||
//#define CHECK_FOR_SCRIBBLING
|
||||
|
||||
template<class TUPLE>
|
||||
class DataPage
|
||||
{
|
||||
public:
|
||||
|
||||
class iterator
|
||||
{
|
||||
private:
|
||||
void scan_to_key(TUPLE * key) {
|
||||
if(key) {
|
||||
len_t old_off = read_offset_;
|
||||
TUPLE * t = getnext();
|
||||
while(t && TUPLE::compare(key->strippedkey(), key->strippedkeylen(), t->strippedkey(), t->strippedkeylen()) > 0) {
|
||||
TUPLE::freetuple(t);
|
||||
old_off = read_offset_;
|
||||
t = getnext();
|
||||
}
|
||||
if(t) {
|
||||
DEBUG("datapage opened at %s\n", t->key());
|
||||
TUPLE::freetuple(t);
|
||||
read_offset_ = old_off;
|
||||
} else {
|
||||
DEBUG("datapage key not found. Offset = %lld", read_offset_);
|
||||
dp = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
public:
|
||||
iterator(DataPage *dp, TUPLE * key=NULL) : read_offset_(0), dp(dp) {
|
||||
scan_to_key(key);
|
||||
class iterator
|
||||
{
|
||||
private:
|
||||
void scan_to_key(datatuple * key) {
|
||||
if(key) {
|
||||
len_t old_off = read_offset_;
|
||||
datatuple * t = getnext();
|
||||
while(t && datatuple::compare(key->strippedkey(), key->strippedkeylen(), t->strippedkey(), t->strippedkeylen()) > 0) {
|
||||
datatuple::freetuple(t);
|
||||
old_off = read_offset_;
|
||||
t = getnext();
|
||||
}
|
||||
if(t) {
|
||||
DEBUG("datapage opened at %s\n", t->key());
|
||||
datatuple::freetuple(t);
|
||||
read_offset_ = old_off;
|
||||
} else {
|
||||
DEBUG("datapage key not found. Offset = %lld", read_offset_);
|
||||
dp = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
public:
|
||||
iterator(DataPage *dp, datatuple * key=NULL) : read_offset_(0), dp(dp) {
|
||||
scan_to_key(key);
|
||||
}
|
||||
|
||||
void operator=(const iterator &rhs)
|
||||
{
|
||||
this->read_offset_ = rhs.read_offset_;
|
||||
this->dp = rhs.dp;
|
||||
}
|
||||
void operator=(const iterator &rhs) {
|
||||
this->read_offset_ = rhs.read_offset_;
|
||||
this->dp = rhs.dp;
|
||||
}
|
||||
|
||||
//returns the next tuple and also advances the iterator
|
||||
TUPLE *getnext();
|
||||
//returns the next tuple and also advances the iterator
|
||||
datatuple *getnext();
|
||||
|
||||
//advance the iterator by count tuples, i.e. skip over count tuples
|
||||
// void advance(int xid, int count=1);
|
||||
|
||||
off_t read_offset_;
|
||||
DataPage *dp;
|
||||
|
||||
};
|
||||
private:
|
||||
off_t read_offset_;
|
||||
DataPage *dp;
|
||||
};
|
||||
|
||||
public:
|
||||
|
||||
/**
|
||||
* if alloc is non-null, then reads will be optimized for sequential access
|
||||
*/
|
||||
DataPage( int xid, RegionAllocator* alloc, pageid_t pid );
|
||||
/**
|
||||
* if alloc is non-null, then reads will be optimized for sequential access
|
||||
*/
|
||||
DataPage( int xid, RegionAllocator* alloc, pageid_t pid );
|
||||
|
||||
//to be used to create new data pages
|
||||
DataPage( int xid, pageid_t page_count, RegionAllocator* alloc);
|
||||
//to be used to create new data pages
|
||||
DataPage( int xid, pageid_t page_count, RegionAllocator* alloc);
|
||||
|
||||
~DataPage() {
|
||||
assert(write_offset_ == -1);
|
||||
~DataPage() {
|
||||
assert(write_offset_ == -1);
|
||||
}
|
||||
|
||||
void writes_done() {
|
||||
if(write_offset_ != -1) {
|
||||
len_t dat_len = 0; // write terminating zero.
|
||||
|
||||
write_data((const byte*)&dat_len, sizeof(dat_len), false);
|
||||
|
||||
// if writing the zero fails, later reads will fail as well, and assume EOF.
|
||||
|
||||
write_offset_ = -1;
|
||||
}
|
||||
|
||||
void writes_done() {
|
||||
if(write_offset_ != -1) {
|
||||
len_t dat_len = 0; // write terminating zero.
|
||||
}
|
||||
|
||||
write_data((const byte*)&dat_len, sizeof(dat_len), false);
|
||||
bool append(datatuple const * dat);
|
||||
bool recordRead(const typename datatuple::key_t key, size_t keySize, datatuple ** buf);
|
||||
|
||||
// if writing the zero fails, later reads will fail as well, and assume EOF.
|
||||
inline uint16_t recordCount();
|
||||
|
||||
write_offset_ = -1;
|
||||
}
|
||||
iterator begin(){return iterator(this);}
|
||||
|
||||
}
|
||||
pageid_t get_start_pid(){return first_page_;}
|
||||
int get_page_count(){return page_count_;}
|
||||
|
||||
bool append(TUPLE const * dat);
|
||||
bool recordRead(const typename TUPLE::key_t key, size_t keySize, TUPLE ** buf);
|
||||
|
||||
inline uint16_t recordCount();
|
||||
|
||||
|
||||
iterator begin(){return iterator(this);}
|
||||
|
||||
pageid_t get_start_pid(){return first_page_;}
|
||||
int get_page_count(){return page_count_;}
|
||||
|
||||
static void register_stasis_page_impl();
|
||||
static void register_stasis_page_impl();
|
||||
|
||||
private:
|
||||
|
||||
// static pageid_t dp_alloc_region(int xid, void *conf, pageid_t count);
|
||||
void initialize();
|
||||
|
||||
void initialize();
|
||||
static const uint16_t DATA_PAGE_HEADER_SIZE = sizeof(int32_t);
|
||||
static const uint16_t DATA_PAGE_SIZE = USABLE_SIZE_OF_PAGE - DATA_PAGE_HEADER_SIZE;
|
||||
typedef uint32_t len_t;
|
||||
|
||||
private:
|
||||
static const uint16_t DATA_PAGE_HEADER_SIZE = sizeof(int32_t);
|
||||
static const uint16_t DATA_PAGE_SIZE = USABLE_SIZE_OF_PAGE - DATA_PAGE_HEADER_SIZE;
|
||||
typedef uint32_t len_t;
|
||||
static inline int32_t* is_another_page_ptr(Page *p) {
|
||||
return stasis_page_int32_ptr_from_start(p,0);
|
||||
}
|
||||
static inline byte * data_at_offset_ptr(Page *p, slotid_t offset) {
|
||||
return ((byte*)(is_another_page_ptr(p)+1))+offset;
|
||||
}
|
||||
static inline len_t * length_at_offset_ptr(Page *p, slotid_t offset) {
|
||||
return (len_t*)data_at_offset_ptr(p,offset);
|
||||
}
|
||||
|
||||
static inline int32_t* is_another_page_ptr(Page *p) {
|
||||
return stasis_page_int32_ptr_from_start(p,0);
|
||||
}
|
||||
static inline byte * data_at_offset_ptr(Page *p, slotid_t offset) {
|
||||
return ((byte*)(is_another_page_ptr(p)+1))+offset;
|
||||
}
|
||||
static inline len_t * length_at_offset_ptr(Page *p, slotid_t offset) {
|
||||
return (len_t*)data_at_offset_ptr(p,offset);
|
||||
}
|
||||
inline recordid calc_chunk_from_offset(off_t offset) {
|
||||
recordid ret;
|
||||
ret.page = first_page_ + offset / DATA_PAGE_SIZE;
|
||||
ret.slot = offset % DATA_PAGE_SIZE;
|
||||
ret.size = DATA_PAGE_SIZE - ret.slot;
|
||||
assert(ret.size);
|
||||
return ret;
|
||||
}
|
||||
|
||||
inline recordid calc_chunk_from_offset(off_t offset) {
|
||||
recordid ret;
|
||||
ret.page = first_page_ + offset / DATA_PAGE_SIZE;
|
||||
ret.slot = offset % DATA_PAGE_SIZE;
|
||||
ret.size = DATA_PAGE_SIZE - ret.slot;
|
||||
assert(ret.size);
|
||||
return ret;
|
||||
}
|
||||
size_t write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p = NULL);
|
||||
size_t read_bytes(byte * buf, off_t offset, ssize_t remaining);
|
||||
Page * write_data_and_latch(const byte * buf, size_t len, bool init_next = true, bool latch = true);
|
||||
bool write_data(const byte * buf, size_t len, bool init_next = true);
|
||||
bool read_data(byte * buf, off_t offset, size_t len);
|
||||
bool initialize_next_page();
|
||||
void initialize_page(pageid_t pageid);
|
||||
size_t write_bytes(const byte * buf, ssize_t remaining, Page ** latch_p = NULL);
|
||||
size_t read_bytes(byte * buf, off_t offset, ssize_t remaining);
|
||||
Page * write_data_and_latch(const byte * buf, size_t len, bool init_next = true, bool latch = true);
|
||||
bool write_data(const byte * buf, size_t len, bool init_next = true);
|
||||
bool read_data(byte * buf, off_t offset, size_t len);
|
||||
bool initialize_next_page();
|
||||
void initialize_page(pageid_t pageid);
|
||||
|
||||
int xid_;
|
||||
pageid_t page_count_;
|
||||
const pageid_t initial_page_count_;
|
||||
RegionAllocator *alloc_;
|
||||
const pageid_t first_page_;
|
||||
off_t write_offset_; // points to the next free byte (ignoring page boundaries)
|
||||
int xid_;
|
||||
pageid_t page_count_;
|
||||
const pageid_t initial_page_count_;
|
||||
RegionAllocator *alloc_;
|
||||
const pageid_t first_page_;
|
||||
off_t write_offset_; // points to the next free byte (ignoring page boundaries)
|
||||
};
|
||||
#endif
|
||||
|
|
|
@ -82,14 +82,14 @@ int diskTreeComponent::insertTuple(int xid, datatuple *t)
|
|||
return ret;
|
||||
}
|
||||
|
||||
DataPage<datatuple>* diskTreeComponent::insertDataPage(int xid, datatuple *tuple) {
|
||||
DataPage* diskTreeComponent::insertDataPage(int xid, datatuple *tuple) {
|
||||
//create a new data page -- either the last region is full, or the last data page doesn't want our tuple. (or both)
|
||||
|
||||
DataPage<datatuple> * dp = 0;
|
||||
DataPage * dp = 0;
|
||||
int count = 0;
|
||||
while(dp==0)
|
||||
{
|
||||
dp = new DataPage<datatuple>(xid, datapage_size, ltree->get_datapage_alloc());
|
||||
dp = new DataPage(xid, datapage_size, ltree->get_datapage_alloc());
|
||||
|
||||
//insert the record into the data page
|
||||
if(!dp->append(tuple))
|
||||
|
@ -131,7 +131,7 @@ datatuple * diskTreeComponent::findTuple(int xid, datatuple::key_t key, size_t k
|
|||
|
||||
if(pid!=-1)
|
||||
{
|
||||
DataPage<datatuple> * dp = new DataPage<datatuple>(xid, 0, pid);
|
||||
DataPage * dp = new DataPage(xid, 0, pid);
|
||||
dp->recordRead(key, keySize, &tup);
|
||||
delete dp;
|
||||
}
|
||||
|
@ -974,7 +974,7 @@ void diskTreeComponent::iterator::init_helper(datatuple* key1)
|
|||
lsmIterator_->value((byte**)hack);
|
||||
|
||||
curr_pageid = *pid_tmp;
|
||||
curr_page = new DataPage<datatuple>(-1, ro_alloc_, curr_pageid);
|
||||
curr_page = new DataPage(-1, ro_alloc_, curr_pageid);
|
||||
|
||||
DEBUG("opening datapage iterator %lld at key %s\n.", curr_pageid, key1 ? (char*)key1->key() : "NULL");
|
||||
dp_itr = new DPITR_T(curr_page, key1);
|
||||
|
@ -1008,7 +1008,7 @@ datatuple * diskTreeComponent::iterator::next_callerFrees()
|
|||
size_t ret = lsmIterator_->value((byte**)hack);
|
||||
assert(ret == sizeof(pageid_t));
|
||||
curr_pageid = *pid_tmp;
|
||||
curr_page = new DataPage<datatuple>(-1, ro_alloc_, curr_pageid);
|
||||
curr_page = new DataPage(-1, ro_alloc_, curr_pageid);
|
||||
DEBUG("opening datapage iterator %lld at beginning\n.", curr_pageid);
|
||||
dp_itr = new DPITR_T(curr_page->begin());
|
||||
|
||||
|
|
|
@ -87,10 +87,10 @@ class diskTreeComponent {
|
|||
|
||||
|
||||
private:
|
||||
DataPage<datatuple>* insertDataPage(int xid, datatuple *tuple);
|
||||
DataPage* insertDataPage(int xid, datatuple *tuple);
|
||||
|
||||
internalNodes * ltree;
|
||||
DataPage<datatuple>* dp;
|
||||
DataPage* dp;
|
||||
pageid_t datapage_size;
|
||||
/*mergeManager::mergeStats*/ void *stats; // XXX hack to work around circular includes.
|
||||
|
||||
|
@ -226,8 +226,8 @@ class diskTreeComponent {
|
|||
diskTreeComponent::internalNodes::iterator* lsmIterator_;
|
||||
|
||||
pageid_t curr_pageid; //current page id
|
||||
DataPage<datatuple> *curr_page; //current page
|
||||
typedef DataPage<datatuple>::iterator DPITR_T;
|
||||
DataPage *curr_page; //current page
|
||||
typedef DataPage::iterator DPITR_T;
|
||||
DPITR_T *dp_itr;
|
||||
|
||||
};
|
||||
|
|
|
@ -97,7 +97,7 @@ logtable<TUPLE>::~logtable()
|
|||
template<class TUPLE>
|
||||
void logtable<TUPLE>::init_stasis() {
|
||||
|
||||
DataPage<datatuple>::register_stasis_page_impl();
|
||||
DataPage::register_stasis_page_impl();
|
||||
stasis_buffer_manager_hint_writes_are_sequential = 1;
|
||||
Tinit();
|
||||
|
||||
|
|
|
@ -174,7 +174,7 @@ class mergeStats {
|
|||
}
|
||||
void merged_tuples(datatuple * merged, datatuple * small, datatuple * large) {
|
||||
}
|
||||
void wrote_datapage(DataPage<datatuple> *dp) {
|
||||
void wrote_datapage(DataPage *dp) {
|
||||
#if EXTENDED_STATS
|
||||
stats_num_datapages_out++;
|
||||
stats_bytes_out_with_overhead += (PAGE_SIZE * dp->get_page_count());
|
||||
|
|
|
@ -24,6 +24,7 @@ int main(int argc, char *argv[])
|
|||
int64_t c0_size = 1024 * 1024 * 512 * 1;
|
||||
int log_mode = 0; // do not log by default.
|
||||
int64_t expiry_delta = 0; // do not gc by default
|
||||
int port = simpleServer::DEFAULT_PORT;
|
||||
stasis_buffer_manager_size = 1 * 1024 * 1024 * 1024 / PAGE_SIZE; // 1.5GB total
|
||||
|
||||
for(int i = 1; i < argc; i++) {
|
||||
|
@ -41,8 +42,11 @@ int main(int argc, char *argv[])
|
|||
} else if(!strcmp(argv[i], "--expiry-delta")) {
|
||||
i++;
|
||||
expiry_delta = atoi(argv[i]);
|
||||
} else if(!strcmp(argv[i], "--port")) {
|
||||
i++;
|
||||
port = atoi(argv[i]);
|
||||
} else {
|
||||
fprintf(stderr, "Usage: %s [--test|--benchmark] [--log-mode <int>] [--expiry-delta <int>]", argv[0]);
|
||||
fprintf(stderr, "Usage: %s [--test|--benchmark] [--log-mode <int>] [--expiry-delta <int>] [--port <int>]", argv[0]);
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
@ -73,7 +77,7 @@ int main(int argc, char *argv[])
|
|||
mscheduler->start();
|
||||
ltable.replayLog();
|
||||
|
||||
simpleServer *lserver = new simpleServer(&ltable);
|
||||
simpleServer *lserver = new simpleServer(&ltable, simpleServer::DEFAULT_THREADS, port);
|
||||
|
||||
lserver->acceptLoop();
|
||||
|
||||
|
|
|
@ -20,9 +20,6 @@
|
|||
#undef begin
|
||||
#undef end
|
||||
|
||||
template class DataPage<datatuple>;
|
||||
|
||||
|
||||
void insertWithConcurrentReads(size_t NUM_ENTRIES) {
|
||||
srand(1001);
|
||||
unlink("storefile.txt");
|
||||
|
@ -58,7 +55,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) {
|
|||
|
||||
int pcount = 1000;
|
||||
int dpages = 0;
|
||||
DataPage<datatuple> *dp=0;
|
||||
DataPage *dp=0;
|
||||
int64_t datasize = 0;
|
||||
std::vector<pageid_t> dsp;
|
||||
size_t last_i = 0;
|
||||
|
@ -84,7 +81,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) {
|
|||
xid = Tbegin();
|
||||
alloc = new RegionAllocator(xid, 10000);
|
||||
|
||||
dp = new DataPage<datatuple>(xid, pcount, alloc);
|
||||
dp = new DataPage(xid, pcount, alloc);
|
||||
// printf("%lld\n", dp->get_start_pid());
|
||||
bool succ = dp->append(newtuple);
|
||||
assert(succ);
|
||||
|
@ -95,7 +92,7 @@ void insertWithConcurrentReads(size_t NUM_ENTRIES) {
|
|||
if(j >= key_arr.size()) { j = key_arr.size()-1; }
|
||||
bool found = 0;
|
||||
{
|
||||
DataPage<datatuple>::iterator it = dp->begin();
|
||||
DataPage::iterator it = dp->begin();
|
||||
datatuple * dt;
|
||||
while((dt = it.getnext()) != NULL) {
|
||||
if(!strcmp((char*)dt->rawkey(), key_arr[j].c_str())) {
|
||||
|
@ -160,7 +157,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
|||
|
||||
int pcount = 1000;
|
||||
int dpages = 0;
|
||||
DataPage<datatuple> *dp=0;
|
||||
DataPage *dp=0;
|
||||
int64_t datasize = 0;
|
||||
std::vector<pageid_t> dsp;
|
||||
for(size_t i = 0; i < NUM_ENTRIES; i++)
|
||||
|
@ -176,7 +173,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
|||
dp->writes_done();
|
||||
delete dp;
|
||||
|
||||
dp = new DataPage<datatuple>(xid, pcount, alloc);
|
||||
dp = new DataPage(xid, pcount, alloc);
|
||||
|
||||
bool succ = dp->append(newtuple);
|
||||
assert(succ);
|
||||
|
@ -204,8 +201,8 @@ void insertProbeIter(size_t NUM_ENTRIES)
|
|||
int tuplenum = 0;
|
||||
for(int i = 0; i < dpages ; i++)
|
||||
{
|
||||
DataPage<datatuple> dp(xid, 0, dsp[i]);
|
||||
DataPage<datatuple>::iterator itr = dp.begin();
|
||||
DataPage dp(xid, 0, dsp[i]);
|
||||
DataPage::iterator itr = dp.begin();
|
||||
datatuple *dt=0;
|
||||
while( (dt=itr.getnext()) != NULL)
|
||||
{
|
||||
|
|
Loading…
Reference in a new issue