pass sequential I/O hints to stasis

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@773 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-04-12 20:56:54 +00:00
parent 21238f1961
commit 2f15673c80
12 changed files with 101 additions and 72 deletions

View file

@ -59,21 +59,22 @@ void DataPage<TUPLE>::register_stasis_page_impl() {
}
template <class TUPLE>
DataPage<TUPLE>::DataPage(int xid, pageid_t pid):
DataPage<TUPLE>::DataPage(int xid, RegionAllocator * alloc, pageid_t pid): // XXX Hack!! The read-only constructor signature is too close to the other's
xid_(xid),
page_count_(1), // will be opportunistically incremented as we scan the datapage.
initial_page_count_(-1), // used by append.
alloc_(0), // read-only, and we don't free data pages one at a time.
alloc_(alloc), // read-only, and we don't free data pages one at a time.
first_page_(pid),
write_offset_(-1) {
assert(pid!=0);
Page *p = loadPage(xid_, first_page_);
if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) {
printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout);
abort();
}
assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage
releasePage(p);
write_offset_(-1)
{
assert(pid!=0);
Page *p = alloc_ ? alloc_->load_page(xid, first_page_) : loadPage(xid, first_page_);
if(!(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2)) {
printf("Page %lld is not the start of a datapage\n", first_page_); fflush(stdout);
abort();
}
assert(*is_another_page_ptr(p) == 0 || *is_another_page_ptr(p) == 2); // would be 1 for page in the middle of a datapage
releasePage(p);
}
template <class TUPLE>
@ -99,7 +100,7 @@ void DataPage<TUPLE>::initialize_page(pageid_t pageid) {
//load the first page
Page *p;
#ifdef CHECK_FOR_SCRIBBLING
p = loadPage(xid_, pageid);
p = alloc_ ? alloc->load_page(xid_, pageid) : loadPage(xid_, pageid);
if(*stasis_page_type_ptr(p) == DATA_PAGE) {
printf("Collision on page %lld\n", (long long)pageid); fflush(stdout);
assert(*stasis_page_type_ptr(p) != DATA_PAGE);
@ -137,7 +138,7 @@ size_t DataPage<TUPLE>::write_bytes(const byte * buf, size_t remaining) {
if(chunk.page >= first_page_ + page_count_) {
chunk.size = 0; // no space (should not happen)
} else {
Page *p = loadPage(xid_, chunk.page);
Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page);
memcpy(data_at_offset_ptr(p, chunk.slot), buf, chunk.size);
writelock(p->rwlatch,0);
stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_));
@ -156,7 +157,7 @@ size_t DataPage<TUPLE>::read_bytes(byte * buf, off_t offset, size_t remaining) {
if(chunk.page >= first_page_ + page_count_) {
chunk.size = 0; // eof
} else {
Page *p = loadPage(xid_, chunk.page);
Page *p = alloc_ ? alloc_->load_page(xid_, chunk.page) : loadPage(xid_, chunk.page);
assert(p->pageType == DATA_PAGE);
if((chunk.page + 1 == page_count_ + first_page_)
&& (*is_another_page_ptr(p))) {
@ -185,7 +186,7 @@ bool DataPage<TUPLE>::initialize_next_page() {
abort();
}
Page *p = loadPage(xid_, rid.page-1);
Page *p = alloc_ ? alloc_->load_page(xid_, rid.page-1) : loadPage(xid_, rid.page-1);
*is_another_page_ptr(p) = (rid.page-1 == first_page_) ? 2 : 1;
writelock(p->rwlatch, 0);
stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_));

View file

@ -61,8 +61,10 @@ public:
public:
//to be used when reading an existing data page from disk
DataPage( int xid, pageid_t pid );
/**
* if alloc is non-null, then reads will be optimized for sequential access
*/
DataPage( int xid, RegionAllocator* alloc, pageid_t pid );
//to be used to create new data pages
DataPage( int xid, pageid_t page_count, RegionAllocator* alloc);

View file

@ -134,7 +134,7 @@ datatuple * diskTreeComponent::findTuple(int xid, datatuple::key_t key, size_t k
if(pid!=-1)
{
DataPage<datatuple> * dp = new DataPage<datatuple>(xid, pid);
DataPage<datatuple> * dp = new DataPage<datatuple>(xid, 0, pid);
dp->recordRead(key, keySize, &tup);
delete dp;
}
@ -731,9 +731,10 @@ void diskTreeComponent::internalNodes::print_tree(int xid, pageid_t pid, int64_t
//diskTreeComponentIterator implementation
/////////////////////////////////////////////////
diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root) {
diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root) {
ro_alloc_ = ro_alloc;
if(root.page == 0 && root.slot == 0 && root.size == -1) abort();
p = loadPage(xid,root.page);
p = ro_alloc_->load_page(xid,root.page);
readlock(p->rwlatch,0);
DEBUG("ROOT_REC_SIZE %d\n", diskTreeComponent::internalNodes::root_rec_size);
@ -749,7 +750,7 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root) {
unlock(p->rwlatch);
releasePage(p);
p = loadPage(xid,leafid);
p = ro_alloc_->load_page(xid,leafid);
readlock(p->rwlatch,0);
assert(depth != 0);
} else {
@ -770,10 +771,10 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root) {
justOnePage = (depth == 0);
}
diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root, const byte* key, len_t keylen) {
diskTreeComponent::internalNodes::iterator::iterator(int xid, RegionAllocator* ro_alloc, recordid root, const byte* key, len_t keylen) {
if(root.page == NULLRID.page && root.slot == NULLRID.slot) abort();
p = loadPage(xid,root.page);
ro_alloc_ = ro_alloc;
p = ro_alloc_->load_page(xid,root.page);
readlock(p->rwlatch,0);
recordid rid = {p->id, diskTreeComponent::internalNodes::DEPTH, diskTreeComponent::internalNodes::root_rec_size};
@ -795,7 +796,7 @@ diskTreeComponent::internalNodes::iterator::iterator(int xid, recordid root, con
{
unlock(p->rwlatch);
releasePage(p);
p = loadPage(xid,lsm_entry_rid.page);
p = ro_alloc->load_page(xid,lsm_entry_rid.page);
readlock(p->rwlatch,0);
}
@ -835,7 +836,7 @@ int diskTreeComponent::internalNodes::iterator::next()
DEBUG("done with page %lld next = %lld\n", p->id, next_rec.ptr);
if(next_rec != -1 && ! justOnePage) {
p = loadPage(xid_, next_rec);
p = ro_alloc_->load_page(xid_, next_rec);
readlock(p->rwlatch,0);
current.page = next_rec;
current.slot = 2;
@ -887,14 +888,15 @@ void diskTreeComponent::iterator::init_iterators(datatuple * key1, datatuple * k
lsmIterator_ = NULL;
} else {
if(key1) {
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, tree_, key1->key(), key1->keylen());
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_, key1->key(), key1->keylen());
} else {
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, tree_);
lsmIterator_ = new diskTreeComponent::internalNodes::iterator(-1, ro_alloc_, tree_);
}
}
}
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) :
ro_alloc_(new RegionAllocator()),
tree_(tree ? tree->get_root_rec() : NULLRID)
{
init_iterators(NULL, NULL);
@ -902,6 +904,7 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree) :
}
diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, datatuple* key) :
ro_alloc_(new RegionAllocator()),
tree_(tree ? tree->get_root_rec() : NULLRID)
{
init_iterators(key,NULL);
@ -909,20 +912,16 @@ diskTreeComponent::iterator::iterator(diskTreeComponent::internalNodes *tree, da
}
diskTreeComponent::iterator::~iterator()
{
if(lsmIterator_) {
lsmIterator_->close();
delete lsmIterator_;
}
if(curr_page!=NULL)
{
delete curr_page;
curr_page = 0;
}
diskTreeComponent::iterator::~iterator() {
if(lsmIterator_) {
lsmIterator_->close();
delete lsmIterator_;
}
delete curr_page;
curr_page = 0;
delete ro_alloc_;
}
void diskTreeComponent::iterator::init_helper(datatuple* key1)
@ -948,7 +947,7 @@ void diskTreeComponent::iterator::init_helper(datatuple* key1)
lsmIterator_->value((byte**)hack);
curr_pageid = *pid_tmp;
curr_page = new DataPage<datatuple>(-1, curr_pageid);
curr_page = new DataPage<datatuple>(-1, ro_alloc_, curr_pageid);
DEBUG("opening datapage iterator %lld at key %s\n.", curr_pageid, key1 ? (char*)key1->key() : "NULL");
dp_itr = new DPITR_T(curr_page, key1);
@ -982,7 +981,7 @@ datatuple * diskTreeComponent::iterator::next_callerFrees()
size_t ret = lsmIterator_->value((byte**)hack);
assert(ret == sizeof(pageid_t));
curr_pageid = *pid_tmp;
curr_page = new DataPage<datatuple>(-1, curr_pageid);
curr_page = new DataPage<datatuple>(-1, ro_alloc_, curr_pageid);
DEBUG("opening datapage iterator %lld at beginning\n.", curr_pageid);
dp_itr = new DPITR_T(curr_page->begin());

View file

@ -143,8 +143,8 @@ class diskTreeComponent {
public:
class iterator {
public:
iterator(int xid, recordid root);
iterator(int xid, recordid root, const byte* key, len_t keylen);
iterator(int xid, RegionAllocator *ro_alloc, recordid root);
iterator(int xid, RegionAllocator *ro_alloc, recordid root, const byte* key, len_t keylen);
int next();
void close();
@ -162,7 +162,7 @@ class diskTreeComponent {
inline void releaseLock() { }
private:
RegionAllocator * ro_alloc_;
Page * p;
int xid_;
bool done;
@ -193,14 +193,15 @@ class diskTreeComponent {
int operator-(iterator & t) { abort(); }
private:
recordid tree_; //root of the tree
RegionAllocator * ro_alloc_; // has a filehandle that we use to optimize sequential scans.
recordid tree_; //root of the tree
diskTreeComponent::internalNodes::iterator* lsmIterator_;
diskTreeComponent::internalNodes::iterator* lsmIterator_;
pageid_t curr_pageid; //current page id
DataPage<datatuple> *curr_page; //current page
typedef DataPage<datatuple>::iterator DPITR_T;
DPITR_T *dp_itr;
pageid_t curr_pageid; //current page id
DataPage<datatuple> *curr_page; //current page
typedef DataPage<datatuple>::iterator DPITR_T;
DPITR_T *dp_itr;
};
};
#endif /* DISKTREECOMPONENT_H_ */

View file

@ -5,6 +5,7 @@
#include "merger.h"
#include "logstore.h"
#include "regionAllocator.h"
#include "network.h"
@ -617,7 +618,8 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
}
int xid = Tbegin();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rid());
RegionAllocator * ro_alloc = new RegionAllocator();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid());
size_t count = 0;
int err = 0;
@ -643,7 +645,7 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
size_t cur_stride = 0;
size_t i = 0;
it = new diskTreeComponent::internalNodes::iterator(xid, data->ltable->get_tree_c2()->get_root_rid()); // TODO make this method private?
it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, data->ltable->get_tree_c2()->get_root_rid()); // TODO make this method private?
while(it->next()) {
i++;
if(i == count || !cur_stride) { // do we want to send this key? (this matches the first, last and interior keys)
@ -661,6 +663,7 @@ int op_stat_histogram(pthread_data* data, size_t limit) {
it->close();
delete(it);
delete(ro_alloc);
if(!err){ err = writeendofiteratortosocket(*(data->workitem)); }
Tcommit(xid);
return err;

View file

@ -20,22 +20,36 @@ public:
// Open an existing region allocator.
RegionAllocator(int xid, recordid rid) :
nextPage_(INVALID_PAGE),
endOfRegion_(INVALID_PAGE) {
endOfRegion_(INVALID_PAGE),
bm_((stasis_buffer_manager_t*)stasis_runtime_buffer_manager()),
bmh_(bm_->openHandleImpl(bm_, 1)) {
rid_ = rid;
Tread(xid, rid_, &header_);
regionCount_ = TarrayListLength(xid, header_.region_list);
}
// Create a new region allocator.
RegionAllocator(int xid, pageid_t region_length) :
nextPage_(0),
endOfRegion_(0),
regionCount_(0)
{
rid_ = Talloc(xid, sizeof(header_));
header_.region_list = TarrayListAlloc(xid, 1, 2, sizeof(pageid_t));
header_.region_length = region_length;
Tset(xid, rid_, &header_);
}
RegionAllocator(int xid, pageid_t region_length) :
nextPage_(0),
endOfRegion_(0),
regionCount_(0),
bm_((stasis_buffer_manager_t*)stasis_runtime_buffer_manager()),
bmh_(bm_->openHandleImpl(bm_, 1))
{
rid_ = Talloc(xid, sizeof(header_));
header_.region_list = TarrayListAlloc(xid, 1, 2, sizeof(pageid_t));
header_.region_length = region_length;
Tset(xid, rid_, &header_);
}
explicit RegionAllocator() :
nextPage_(INVALID_PAGE),
endOfRegion_(INVALID_PAGE),
bm_((stasis_buffer_manager_t*)stasis_runtime_buffer_manager()),
bmh_(bm_->openHandleImpl(bm_, 1)){
rid_.page = INVALID_PAGE;
regionCount_ = -1;
}
Page * load_page(int xid, pageid_t p) { return bm_->loadPageImpl(bm_, bmh_, xid, p, UNKNOWN_TYPE_PAGE); }
// XXX handle disk full?
pageid_t alloc_extent(int xid, pageid_t extent_length) {
assert(nextPage_ != INVALID_PAGE);
@ -67,8 +81,9 @@ public:
list_entry.slot < regionCount; list_entry.slot++) {
pageid_t pid;
Tread(xid, list_entry, &pid);
TregionForce(xid, pid);
TregionForce(xid, bm_, bmh_, pid);
}
bm_->closeHandleImpl(bm_, bmh_);
}
void dealloc_regions(int xid) {
pageid_t regionCount = TarrayListLength(xid, header_.region_list);
@ -121,6 +136,8 @@ private:
pageid_t nextPage_;
pageid_t endOfRegion_;
pageid_t regionCount_;
stasis_buffer_manager_t * bm_;
stasis_buffer_manager_handle_t *bmh_;
persistent_state header_;
public:
static const size_t header_size = sizeof(persistent_state);

View file

@ -100,7 +100,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int tuplenum = 0;
for(int i = 0; i < dpages ; i++)
{
DataPage<datatuple> dp(xid, dsp[i]);
DataPage<datatuple> dp(xid, 0, dsp[i]);
DataPage<datatuple>::iterator itr = dp.begin();
datatuple *dt=0;
while( (dt=itr.getnext()) != NULL)

View file

@ -5,6 +5,7 @@
#include "logstore.h"
#include "regionAllocator.h"
int main(int argc, char **argv)
{
@ -24,9 +25,12 @@ int main(int argc, char **argv)
Tcommit(xid);
xid = Tbegin();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ltable.get_tree_c2()->get_root_rid() );
RegionAllocator * ro_alloc = new RegionAllocator();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid,ro_alloc, ltable.get_tree_c2()->get_root_rid() );
it->close();
delete it;
delete ro_alloc;
Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis();

View file

@ -4,6 +4,7 @@
#include <iostream>
#include <sstream>
#include "logstore.h"
#include "regionAllocator.h"
#include "diskTreeComponent.h"
#include <assert.h>
@ -114,7 +115,8 @@ void insertProbeIter_str(int NUM_ENTRIES)
int64_t count = 0;
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, lt->get_root_rec());
RegionAllocator * ro_alloc = new RegionAllocator();
diskTreeComponent::internalNodes::iterator * it = new diskTreeComponent::internalNodes::iterator(xid, ro_alloc, lt->get_root_rec());
while(it->next()) {
byte * key;
@ -133,7 +135,7 @@ void insertProbeIter_str(int NUM_ENTRIES)
it->close();
delete it;
delete ro_alloc;
Tcommit(xid);
diskTreeComponent::internalNodes::deinit_stasis();
}

View file

@ -57,7 +57,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int lindex = mscheduler.addlogtable(&ltable);
ltable.setMergeData(mscheduler.getMergeData(lindex));
mscheduler.startlogtable(lindex);
mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
unlock(ltable.header_lock);
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);

View file

@ -56,7 +56,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int lindex = mscheduler.addlogtable(&ltable);
ltable.setMergeData(mscheduler.getMergeData(lindex));
mscheduler.startlogtable(lindex);
mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
unlock(ltable.header_lock);
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);

View file

@ -111,7 +111,7 @@ void insertProbeIter(size_t NUM_ENTRIES)
int lindex = mscheduler.addlogtable(&ltable);
ltable.setMergeData(mscheduler.getMergeData(lindex));
mscheduler.startlogtable(lindex);
mscheduler.startlogtable(lindex, 10 * 1024 * 1024);
unlock(ltable.header_lock);
printf("Stage 1: Writing %d keys\n", NUM_ENTRIES);