remove most of the read/write record wrappers from diskTreeComponent

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@670 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-03-08 23:58:13 +00:00
parent 0a356875b2
commit 85ca7a935c
3 changed files with 113 additions and 119 deletions

View file

@ -43,6 +43,7 @@ static lsn_t get_lsn(int xid) {
return ret;
}
// TODO move init_stasis to a more appropriate module
void diskTreeComponent::init_stasis() {
@ -183,7 +184,8 @@ recordid diskTreeComponent::create(int xid) {
&& tmp.size == root_rec_size);
int64_t zero = 0;
writeRecord(xid, p, tmp, (byte*)&zero, root_rec_size);
assert(sizeof(zero) == root_rec_size);
stasis_record_write(xid, p, tmp, (byte*)&zero);
tmp = stasis_record_alloc_begin(xid, p, root_rec_size);
stasis_record_alloc_done(xid,p,tmp);
@ -192,7 +194,9 @@ recordid diskTreeComponent::create(int xid) {
&& tmp.slot == COMPARATOR
&& tmp.size == root_rec_size);
writeRecord(xid, p, tmp, (byte*)&COMPARATOR, root_rec_size);
stasis_record_write(xid, p, tmp, (byte*)&zero);
stasis_page_lsn_write(xid, p, get_lsn(xid));
unlock(p->rwlatch);
releasePage(p);
@ -202,20 +206,6 @@ recordid diskTreeComponent::create(int xid) {
return ret;
}
// XXX remove the next N records, which are completely redundant.
/**
* TODO: what happen if there is already such a record with a different size?
* I guess this should never happen in rose, but what if?
**/
void diskTreeComponent::writeRecord(int xid, Page *p, recordid &rid,
const byte *data, size_t datalen) {
byte *byte_arr = stasis_record_write_begin(xid, p, rid);
memcpy(byte_arr, data, datalen); //TODO: stasis write call
stasis_record_write_done(xid, p, rid, byte_arr);
stasis_page_lsn_write(xid, p, get_lsn(xid));
}
void diskTreeComponent::writeNodeRecord(int xid, Page * p, recordid & rid,
const byte *key, size_t keylen, pageid_t ptr) {
DEBUG("writenoderecord:\tp->id\t%lld\tkey:\t%s\tkeylen: %d\tval_page\t%lld\n",
@ -227,40 +217,6 @@ void diskTreeComponent::writeNodeRecord(int xid, Page * p, recordid & rid,
stasis_page_lsn_write(xid, p, get_lsn(xid));
}
void diskTreeComponent::writeRecord(int xid, Page *p, slotid_t slot,
const byte *data, size_t datalen) {
recordid rid;
rid.page = p->id;
rid.slot = slot;
rid.size = datalen;
byte *byte_arr = stasis_record_write_begin(xid, p, rid);
memcpy(byte_arr, data, datalen); //TODO: stasis write call
stasis_record_write_done(xid, p, rid, byte_arr);
stasis_page_lsn_write(xid, p, get_lsn(xid));
}
const byte* diskTreeComponent::readRecord(int xid, Page * p, recordid &rid) {
const byte *nr = stasis_record_read_begin(xid,p,rid); // XXX API violation?
return nr;
}
const byte* diskTreeComponent::readRecord(int xid, Page * p, slotid_t slot, int64_t size) {
recordid rid;
rid.page = p->id;
rid.slot = slot;
rid.size = size;
const byte *nr = stasis_record_read_begin(xid,p,rid);
return nr;
}
int32_t diskTreeComponent::readRecordLength(int xid, Page *p, slotid_t slot) {
recordid rec = {p->id, slot, 0};
int32_t reclen = stasis_record_length_read(xid, p, rec);
return reclen;
}
void diskTreeComponent::initializeNodePage(int xid, Page *p) {
stasis_page_slotted_initialize_page(p);
recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec));
@ -277,10 +233,12 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf
Page *p = loadPage(xid, tree.page);
writelock(p->rwlatch, 0);
tree.slot = 0;
tree.slot = DEPTH;
tree.size = 0;
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0);
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, tree);
int64_t depth = *((int64_t*)nr);
stasis_record_read_done(xid, p, tree, (const byte*)nr);
if(rmLeafID == -1) {
rmLeafID = findLastLeaf(xid, p, depth);
@ -327,19 +285,21 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf
//creates a copy of the root page records in the
//newly allocated child page
slotid_t numslots = stasis_record_last(xid, p).slot+1;
for(int i = FIRST_SLOT; i < numslots; i++) {
recordid rid;
rid.page = p->id;
for(rid.slot = FIRST_SLOT; rid.slot < numslots; rid.slot++) {
//read the record from the root page
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,p,i,0);
int reclen = readRecordLength(xid, p, i);
rid.size = stasis_record_length_read(xid, p, rid);
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, rid);
recordid cnext = stasis_record_alloc_begin(xid, lc,reclen);
recordid cnext = stasis_record_alloc_begin(xid, lc,rid.size);
assert(i == cnext.slot);
assert(rid.slot == cnext.slot);
assert(cnext.size != INVALID_SLOT);
stasis_record_alloc_done(xid, lc, cnext);
writeRecord(xid,lc,i,(byte*)(nr),reclen);
stasis_record_write(xid, lc, cnext, (byte*)nr);
stasis_record_read_done(xid, p, rid, (const byte*)nr);
}
// deallocate old entries, and update pointer on parent node.
@ -366,21 +326,24 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf
// don't overwrite key...
nr->ptr = child;
stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr);
stasis_page_lsn_write(xid, p, get_lsn(xid));
if(!depth) {
rmLeafID = lc->id;
pageid_t tmpid = -1;
writeRecord(xid,lc,PREV_LEAF,(byte*)(&tmpid), root_rec_size);
writeRecord(xid,lc,NEXT_LEAF,(byte*)(&tmpid), root_rec_size);
recordid rid = { lc->id, PREV_LEAF, root_rec_size };
stasis_record_write(xid, lc, rid, (byte*)&tmpid);
rid.slot = NEXT_LEAF;
stasis_record_write(xid, lc, rid, (byte*)&tmpid);
}
stasis_page_lsn_write(xid, lc, get_lsn(xid));
unlock(lc->rwlatch);
releasePage(lc);
//update the depth info at the root
depth ++;
writeRecord(xid,p,DEPTH,(byte*)(&depth), root_rec_size);
recordid depth_rid = { p->id, DEPTH, root_rec_size };
stasis_record_write(xid, p, depth_rid, (byte*)(&depth));
assert(tree.page == p->id);
ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
@ -397,7 +360,6 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf
rmLeafID = ret.page;
DEBUG("lastleaf is %lld\n", rmLeafID);
} else {
// write the new value to an existing page
DEBUG("Writing %s\t%d to existing page# %lld\n", datatuple::key_to_str(key).c_str(),
@ -414,6 +376,8 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf
}
}
stasis_page_lsn_write(xid, p, get_lsn(xid));
unlock(p->rwlatch);
releasePage(p);
@ -455,11 +419,13 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p,
} else {
// recurse
slotid_t slot = stasis_record_last(xid, p).slot;
recordid last_rid = stasis_record_last(xid, p);
assert(last_rid.slot >= FIRST_SLOT); // there should be no empty nodes
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, last_rid);
assert(slot >= FIRST_SLOT); // there should be no empty nodes
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid, p, slot, 0);
pageid_t child_id = nr->ptr;
stasis_record_read_done(xid, p, last_rid, (const byte*)nr);
nr = 0;
{
Page *child_page = loadPage(xid, child_id);
@ -532,13 +498,14 @@ recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p
} else {
// set leaf
// backward link.//these writes do not need alloc_begin as it is done in page initialization
writeRecord(xid, child_p, PREV_LEAF, (byte*)(&lastLeaf), root_rec_size);
// backward link. records were alloced by page initialization
recordid prev_leaf_rid = { child_p->id, PREV_LEAF, root_rec_size };
stasis_record_write(xid, child_p, prev_leaf_rid, (byte*)&lastLeaf);
// forward link (initialize to -1)
pageid_t tmp_pid = -1;
writeRecord(xid, child_p, NEXT_LEAF, (byte*)(&tmp_pid), root_rec_size);
recordid next_leaf_rid = { child_p->id, NEXT_LEAF, root_rec_size };
stasis_record_write(xid, child_p, next_leaf_rid, (byte*)&tmp_pid);
recordid leaf_rec = stasis_record_alloc_begin(xid, child_p,
sizeof(indexnode_rec)+key_len);
@ -550,13 +517,16 @@ recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p
ret = leaf_rec;
stasis_page_lsn_write(xid, child_p, get_lsn(xid));
unlock(child_p->rwlatch);
releasePage(child_p);
if(lastLeaf != -1) {
// install forward link in previous page
Page *lastLeafP = loadPage(xid, lastLeaf);
writelock(lastLeafP->rwlatch,0);
writeRecord(xid,lastLeafP,NEXT_LEAF,(byte*)(&child),root_rec_size);
recordid last_next_leaf_rid = {lastLeaf, NEXT_LEAF, root_rec_size };
stasis_record_write(xid,lastLeafP,last_next_leaf_rid,(byte*)&child);
stasis_page_lsn_write(xid, lastLeafP, get_lsn(xid));
unlock(lastLeafP->rwlatch);
releasePage(lastLeafP);
}
@ -579,12 +549,14 @@ pageid_t diskTreeComponent::findLastLeaf(int xid, Page *root, int64_t depth) {
DEBUG("Found last leaf = %lld\n", root->id);
return root->id;
} else {
const indexnode_rec *nr = (indexnode_rec*) readRecord(xid, root, stasis_record_last(xid, root).slot, 0);
pageid_t ret;
recordid last_rid = stasis_record_last(xid, root);
const indexnode_rec *nr = (const indexnode_rec*) stasis_record_read_begin(xid, root, last_rid);
Page *p = loadPage(xid, nr->ptr);
stasis_record_read_done(xid, root, last_rid, (const byte*)nr);
readlock(p->rwlatch,0);
ret = findLastLeaf(xid,p,depth-1);
pageid_t ret = findLastLeaf(xid,p,depth-1);
unlock(p->rwlatch);
releasePage(p);
@ -601,8 +573,10 @@ pageid_t diskTreeComponent::findFirstLeaf(int xid, Page *root, int64_t depth) {
if(!depth) { //if depth is 0, then returns the id of the page
return root->id;
} else {
const indexnode_rec *nr = (indexnode_rec*)readRecord(xid,root,FIRST_SLOT,0);
recordid rid = {root->id, FIRST_SLOT, 0};
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, root, rid);
Page *p = loadPage(xid, nr->ptr);
stasis_record_read_done(xid, root, rid, (const byte*)nr);
readlock(p->rwlatch,0);
pageid_t ret = findFirstLeaf(xid,p,depth-1);
unlock(p->rwlatch);
@ -617,9 +591,12 @@ pageid_t diskTreeComponent::findPage(int xid, recordid tree, const byte *key, si
Page *p = loadPage(xid, tree.page);
readlock(p->rwlatch,0);
int64_t depth = *(int64_t*)readRecord(xid, p , DEPTH, 0);
recordid depth_rid = {p->id, DEPTH, 0};
int64_t * depth = (int64_t*)stasis_record_read_begin(xid, p, depth_rid);
recordid rid = lookup(xid, p, *depth, key, keySize);
stasis_record_read_done(xid, p, depth_rid, (const byte*)depth);
recordid rid = lookup(xid, p, depth, key, keySize);
pageid_t ret = lookupLeafPageFromRid(xid,rid);
unlock(p->rwlatch);
releasePage(p);
@ -634,7 +611,9 @@ pageid_t diskTreeComponent::lookupLeafPageFromRid(int xid, recordid rid) {
if(rid.page != NULLRID.page || rid.slot != NULLRID.slot) {
Page * p2 = loadPage(xid, rid.page);
readlock(p2->rwlatch,0);
pid = ((const indexnode_rec*)(readRecord(xid,p2,rid.slot,0)))->ptr;
const indexnode_rec * nr = (const indexnode_rec*)stasis_record_read_begin(xid, p2, rid);
pid = nr->ptr;
stasis_record_read_done(xid, p2, rid, (const byte*)nr);
unlock(p2->rwlatch);
releasePage(p2);
}
@ -657,20 +636,30 @@ recordid diskTreeComponent::lookup(int xid,
// don't need to compare w/ first item in tree, since we need to position ourselves at the the max tree value <= key.
// positioning at FIRST_SLOT puts us "before" the first value
int match = FIRST_SLOT; // (so match is now < key)
recordid rid;
rid.page = node->id;
rid.size = 0;
for(rid.slot = FIRST_SLOT+1; rid.slot < numslots; rid.slot++) {
rid.size = stasis_record_length_read(xid, node, rid);
for(int i = FIRST_SLOT+1; i < numslots; i++) {
const indexnode_rec *rec = (const indexnode_rec*)readRecord(xid,node,i,0);
int cmpval = datatuple::compare((datatuple::key_t) (rec+1), *stasis_page_slotted_slot_length_ptr(node, i)-sizeof(*rec),
const indexnode_rec *rec = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid);
int cmpval = datatuple::compare((datatuple::key_t) (rec+1), rid.size-sizeof(*rec),
(datatuple::key_t) key, keySize);
stasis_record_read_done(xid,node,rid,(const byte*)rec);
// key of current node is too big; there can be no matches under it.
if(cmpval>0) break;
match = i; // only increment match after comparing with the current node.
match = rid.slot; // only increment match after comparing with the current node.
}
rid.slot = match;
rid.size = 0;
if(depth) {
pageid_t child_id = ((const indexnode_rec*)readRecord(xid,node,match,0))->ptr;
const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid, node, rid);
pageid_t child_id = nr->ptr;
stasis_record_read_done(xid, node, rid, (const byte*)nr);
Page* child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0);
recordid ret = lookup(xid,child_page,depth-1,key,keySize);
@ -686,10 +675,11 @@ recordid diskTreeComponent::lookup(int xid,
void diskTreeComponent::print_tree(int xid) {
Page *p = loadPage(xid, root_rec.page);
readlock(p->rwlatch,0);
recordid depth_rid = {p->id, DEPTH, 0};
const indexnode_rec *depth_nr = (const indexnode_rec*)stasis_record_read_begin(xid, p , depth_rid);
const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0);
int64_t depth = *((int64_t*)depth_nr);
int64_t depth = depth_nr->ptr;
stasis_record_read_done(xid,p,depth_rid,(const byte*)depth_nr);
print_tree(xid, root_rec.page, depth);
@ -713,33 +703,43 @@ void diskTreeComponent::print_tree(int xid, pageid_t pid, int64_t depth) {
assert(numslots > FIRST_SLOT);
recordid rid = { node->id, 0, 0 };
if(depth) {
printf("\tnot_leaf\n");
for(int i = FIRST_SLOT; i < numslots; i++) {
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0);
rid.slot = i;
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid);
printf("\tchild_page_id:%lld\tkey:%s\n", nr->ptr,
datatuple::key_to_str((byte*)(nr+1)).c_str());
stasis_record_read_done(xid, node, rid, (const byte*)nr);
}
for(int i = FIRST_SLOT; i < numslots; i++) {
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0);
rid.slot = i;
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid);
print_tree(xid, nr->ptr, depth-1);
stasis_record_read_done(xid, node, rid, (const byte*)nr);
}
} else {
printf("\tis_leaf\t\n");
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,FIRST_SLOT,0);
rid.slot = FIRST_SLOT;
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid);
printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr,
datatuple::key_to_str((byte*)(nr+1)).c_str());
stasis_record_read_done(xid, node, rid, (const byte*)nr);
printf("\t...\n");
nr = (const indexnode_rec*)readRecord(xid,node,numslots-1,0);
rid.slot= numslots - 1;
nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid);
printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr,
datatuple::key_to_str((byte*)(nr+1)).c_str());
}
stasis_record_read_done(xid, node, rid, (const byte*)nr);
}
unlock(node->rwlatch);
releasePage(node);
}
@ -754,11 +754,12 @@ diskTreeComponentIterator::diskTreeComponentIterator(int xid, recordid root) {
readlock(p->rwlatch,0);
DEBUG("ROOT_REC_SIZE %d\n", diskTreeComponent::root_rec_size);
const byte * nr = diskTreeComponent::readRecord(xid,p,
diskTreeComponent::DEPTH,
diskTreeComponent::root_rec_size);
int64_t depth = *((int64_t*)nr);
recordid rid = {p->id, diskTreeComponent::DEPTH, diskTreeComponent::root_rec_size};
const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p, rid);
int64_t depth = nr->ptr;
DEBUG("DEPTH = %lld\n", depth);
stasis_record_read_done(xid,p,rid,(const byte*)nr);
pageid_t leafid = diskTreeComponent::findFirstLeaf(xid, p, depth);
if(leafid != root.page) {
@ -791,10 +792,11 @@ diskTreeComponentIterator::diskTreeComponentIterator(int xid, recordid root, con
p = loadPage(xid,root.page);
readlock(p->rwlatch,0);
recordid rid = {p->id, diskTreeComponent::DEPTH, diskTreeComponent::root_rec_size};
const byte *nr = diskTreeComponent::readRecord(xid,p,diskTreeComponent::DEPTH, diskTreeComponent::root_rec_size);
int64_t depth = *((int64_t*)nr);
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid);
int64_t depth = nr->ptr;
stasis_record_read_done(xid,p,rid,(const byte*)nr);
recordid lsm_entry_rid = diskTreeComponent::lookup(xid,p,depth,key,keylen);
@ -837,19 +839,20 @@ int diskTreeComponentIterator::next()
if(current.size == INVALID_SLOT) {
const indexnode_rec next_rec = *(const indexnode_rec*)diskTreeComponent::readRecord(xid_,p,
diskTreeComponent::NEXT_LEAF,
0);
recordid next_leaf_rid = {p->id, diskTreeComponent::NEXT_LEAF,0};
const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid_, p, next_leaf_rid);
pageid_t next_rec = nr->ptr;
stasis_record_read_done(xid_,p,next_leaf_rid,(const byte*)nr);
unlock(p->rwlatch);
releasePage(p);
DEBUG("done with page %lld next = %lld\n", p->id, next_rec.ptr);
if(next_rec.ptr != -1 && ! justOnePage) {
p = loadPage(xid_, next_rec.ptr);
if(next_rec != -1 && ! justOnePage) {
p = loadPage(xid_, next_rec);
readlock(p->rwlatch,0);
current.page = next_rec.ptr;
current.page = next_rec;
current.slot = 2;
current.size = stasis_record_length_read(xid_, p, current);
} else {
@ -863,7 +866,9 @@ int diskTreeComponentIterator::next()
if(t != NULL) { free(t); t = NULL; }
t = (indexnode_rec*)malloc(current.size);
memcpy(t, diskTreeComponent::readRecord(xid_,p,current), current.size);
const byte * buf = stasis_record_read_begin(xid_, p, current);
memcpy(t, buf, current.size);
stasis_record_read_done(xid_, p, current, buf);
return 1;
} else {

View file

@ -68,17 +68,6 @@ public:
static void writeNodeRecord(int xid, Page *p, recordid &rid,
const byte *key, size_t keylen, pageid_t ptr);
static void writeRecord(int xid, Page *p, recordid &rid,
const byte *data, size_t datalen);
static void writeRecord(int xid, Page *p, slotid_t slot,
const byte *data, size_t datalen);
static const byte* readRecord(int xid, Page * p, recordid &rid);
static const byte* readRecord(int xid, Page * p, slotid_t slot, int64_t size);
static int32_t readRecordLength(int xid, Page *p, slotid_t slot);
//return the left-most leaf, these are not data pages, although referred to as leaf
static pageid_t findFirstLeaf(int xid, Page *root, int64_t depth);
//return the right-most leaf

View file

@ -79,7 +79,7 @@ void initialize_server()
ltable.setMergeData(mscheduler->getMergeData(lindex));
int64_t c0_size = 1024 * 1024 * 10;
printf("warning: running w/ tiny c0 for testing"); // XXX
printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server?
mscheduler->startlogtable(lindex, c0_size);
unlock(ltable.header_lock);