eliminate "lastLeaf" parameter from all functions in the diskTreeComponent API

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@686 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-03-09 23:48:42 +00:00
parent 5f0d5c4f97
commit dc8185a236
4 changed files with 30 additions and 40 deletions

View file

@ -19,14 +19,6 @@ static void dataPageFlushed(Page* p) {
}
static int notSupported(int xid, Page * p) { return 0; }
static lsn_t get_lsn(int xid) {
lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN;
lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log());
lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;
assert(ret != INVALID_LSN);
return ret;
}
END_C_DECLS
template <class TUPLE>
@ -128,7 +120,7 @@ void DataPage<TUPLE>::initialize_page(pageid_t pageid) {
*length_at_offset_ptr(p, calc_chunk_from_offset(write_offset_).slot) = 0;
//set the page dirty
stasis_page_lsn_write(xid_, p, get_lsn(xid_));
stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_));
//release the page
unlock(p->rwlatch);
@ -146,7 +138,7 @@ size_t DataPage<TUPLE>::write_bytes(const byte * buf, size_t remaining) {
Page *p = loadPage(xid_, chunk.page);
memcpy(data_at_offset_ptr(p, chunk.slot), buf, chunk.size);
writelock(p->rwlatch,0);
stasis_page_lsn_write(xid_, p, get_lsn(xid_));
stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_));
unlock(p->rwlatch);
releasePage(p);
write_offset_ += chunk.size;
@ -194,7 +186,7 @@ bool DataPage<TUPLE>::initialize_next_page() {
Page *p = loadPage(xid_, rid.page-1);
*is_another_page_ptr(p) = (rid.page-1 == first_page_) ? 2 : 1;
writelock(p->rwlatch, 0);
stasis_page_lsn_write(xid_, p, get_lsn(xid_));
stasis_page_lsn_write(xid_, p, alloc_->get_lsn(xid_));
unlock(p->rwlatch);
releasePage(p);

View file

@ -32,15 +32,6 @@ const size_t diskTreeComponent::internalNodes::root_rec_size = sizeof(int64_t);
const int64_t diskTreeComponent::internalNodes::PREV_LEAF = 0; //pointer to prev leaf page
const int64_t diskTreeComponent::internalNodes::NEXT_LEAF = 1; //pointer to next leaf page
// XXX hack, and cut and pasted from datapage.cpp.
static lsn_t get_lsn(int xid) {
lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN;
lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log());
lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;
assert(ret != INVALID_LSN);
return ret;
}
// TODO move init_stasis to a more appropriate module
void diskTreeComponent::internalNodes::init_stasis() {
@ -87,7 +78,7 @@ recordid diskTreeComponent::internalNodes::create(int xid) {
stasis_record_write(xid, p, tmp, (byte*)&zero);
stasis_page_lsn_write(xid, p, get_lsn(xid));
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
unlock(p->rwlatch);
releasePage(p);
@ -105,7 +96,7 @@ void diskTreeComponent::internalNodes::writeNodeRecord(int xid, Page * p, record
nr->ptr = ptr;
memcpy(nr+1, key, keylen);
stasis_record_write_done(xid, p, rid, (byte*)nr);
stasis_page_lsn_write(xid, p, get_lsn(xid));
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
}
void diskTreeComponent::internalNodes::initializeNodePage(int xid, Page *p) {
@ -160,8 +151,7 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
assert(tree.page == p->id);
ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
lastLeaf == tree.page ? -1 : lastLeaf);
ret = appendInternalNode(xid, p, depth, key, keySize, val_page);
if(ret.size == INVALID_SLOT) {
DEBUG("Need to split root; depth = %d\n", depth);
@ -226,7 +216,7 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
stasis_record_write(xid, lc, rid, (byte*)&tmpid);
}
stasis_page_lsn_write(xid, lc, get_lsn(xid));
stasis_page_lsn_write(xid, lc, internal_node_alloc->get_lsn(xid));
unlock(lc->rwlatch);
releasePage(lc);
@ -236,8 +226,7 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
stasis_record_write(xid, p, depth_rid, (byte*)(&depth));
assert(tree.page == p->id);
ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
lastLeaf == tree.page ? -1 : lastLeaf);
ret = appendInternalNode(xid, p, depth, key, keySize, val_page);
assert(ret.size != INVALID_SLOT);
@ -265,7 +254,7 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
}
}
stasis_page_lsn_write(xid, p, get_lsn(xid));
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
unlock(p->rwlatch);
releasePage(p);
@ -290,7 +279,7 @@ recordid diskTreeComponent::internalNodes::appendPage(int xid,
recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
int64_t depth,
const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf) {
pageid_t val_page) {
assert(p->pageType == SLOTTED_PAGE);
@ -302,6 +291,7 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
if(ret.size != INVALID_SLOT) {
stasis_record_alloc_done(xid, p, ret);
writeNodeRecord(xid,p,ret,key,key_len,val_page);
stasis_page_lsn_write(xid, p, internal_node_alloc->get_lsn(xid));
}
} else {
@ -318,7 +308,7 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
Page *child_page = loadPage(xid, child_id);
writelock(child_page->rwlatch,0);
ret = appendInternalNode(xid, child_page, depth-1, key, key_len,
val_page, lastLeaf);
val_page);
unlock(child_page->rwlatch);
releasePage(child_page);
@ -334,8 +324,7 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
readRecordLength(xid, p, slot));
if(ret.size != INVALID_SLOT) {
stasis_record_alloc_done(xid, p, ret);
ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page,
lastLeaf);
ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page);
DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n",
p->id, ret.page, ret.slot, ret.size);
@ -352,7 +341,7 @@ recordid diskTreeComponent::internalNodes::appendInternalNode(int xid, Page *p,
recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid root, Page *root_p,
int64_t depth, const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf) {
pageid_t val_page) {
// root is the recordid on the root page that should point to the
// new subtree.
@ -375,7 +364,7 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
stasis_record_alloc_done(xid, child_p, child_rec);
ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
val_page,lastLeaf);
val_page);
unlock(child_p->rwlatch);
releasePage(child_p);
@ -402,16 +391,16 @@ recordid diskTreeComponent::internalNodes::buildPathToLeaf(int xid, recordid roo
ret = leaf_rec;
stasis_page_lsn_write(xid, child_p, get_lsn(xid));
stasis_page_lsn_write(xid, child_p, internal_node_alloc->get_lsn(xid));
unlock(child_p->rwlatch);
releasePage(child_p);
if(lastLeaf != -1) {
if(lastLeaf != -1 && lastLeaf != root_rec.page) {
// install forward link in previous page
Page *lastLeafP = loadPage(xid, lastLeaf);
writelock(lastLeafP->rwlatch,0);
recordid last_next_leaf_rid = {lastLeaf, NEXT_LEAF, root_rec_size };
stasis_record_write(xid,lastLeafP,last_next_leaf_rid,(byte*)&child);
stasis_page_lsn_write(xid, lastLeafP, get_lsn(xid));
stasis_page_lsn_write(xid, lastLeafP, internal_node_alloc->get_lsn(xid));
unlock(lastLeafP->rwlatch);
releasePage(lastLeafP);
}

View file

@ -52,7 +52,7 @@ public:
static void deinit_stasis();
private:
static void writeNodeRecord(int xid, Page *p, recordid &rid,
void writeNodeRecord(int xid, Page *p, recordid &rid,
const byte *key, size_t keylen, pageid_t ptr);
//reads the given record and returns the page id stored in it
static pageid_t lookupLeafPageFromRid(int xid, recordid rid);
@ -60,11 +60,11 @@ public:
recordid appendInternalNode(int xid, Page *p,
int64_t depth,
const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf);
pageid_t val_page);
recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
int64_t depth, const byte *key, size_t key_len,
pageid_t val_page, pageid_t lastLeaf);
pageid_t val_page);
/**
Initialize a page for use as an internal node of the tree.

View file

@ -97,6 +97,15 @@ public:
endOfRegion_ = INVALID_PAGE;
}
recordid header_rid() { return rid_; }
lsn_t get_lsn(int xid) {
lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN;
lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log());
lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;
assert(ret != INVALID_LSN);
return ret;
}
private:
typedef struct {
recordid region_list;