removed some slotted page dispatch stuff from diskTreeComponent

git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@669 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe
This commit is contained in:
sears 2010-03-08 20:16:03 +00:00
parent a79c7d1c46
commit 0a356875b2

View file

@ -25,8 +25,6 @@
const RegionAllocConf_t diskTreeComponent::REGION_ALLOC_STATIC_INITIALIZER = { {0,0,-1}, 0, -1, -1, 1000 };
#define LOGTREE_ROOT_PAGE SLOTTED_PAGE
//LSM_ROOT_PAGE
const int64_t diskTreeComponent::DEPTH = 0; //in root this is the slot num where the DEPTH (of tree) is stored
@ -139,7 +137,6 @@ pageid_t diskTreeComponent::alloc_region_rid(int xid, void * ridp) {
RegionAllocConf_t conf;
Tread(xid,rid,&conf);
pageid_t ret = alloc_region(xid,&conf);
//DEBUG("{%lld <- alloc region extend}\n", conf.regionList.page);
// XXX get rid of Tset by storing next page in memory, and losing it
// on crash.
Tset(xid,rid,&conf);
@ -329,7 +326,8 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf
//creates a copy of the root page records in the
//newly allocated child page
for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(p); i++) {
slotid_t numslots = stasis_record_last(xid, p).slot+1;
for(int i = FIRST_SLOT; i < numslots; i++) {
//read the record from the root page
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,p,i,0);
int reclen = readRecordLength(xid, p, i);
@ -348,23 +346,19 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf
// NOTE: stasis_record_free call goes to slottedFree in slotted.c
// this function only reduces the numslots when you call it
// with the last slot. so thats why i go backwards here.
DEBUG("slots %d (%d) keysize=%lld\n", (int)*stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1, (long long int)keySize);
assert(*stasis_page_slotted_numslots_ptr(p) >= FIRST_SLOT+1);
for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--) {
assert(*stasis_page_slotted_numslots_ptr(p) > FIRST_SLOT+1);
DEBUG("slots %d (%d) keysize=%lld\n", (int)last_slot+1, (int)FIRST_SLOT+1, (long long int)keySize);
assert(numslots >= FIRST_SLOT+1);
// Note that we leave the first slot in place.
for(int i = numslots-1; i>FIRST_SLOT; i--) {
recordid tmp_rec= {p->id, i, INVALID_SIZE};
stasis_record_free(xid, p, tmp_rec);
}
recordid pFirstSlot = stasis_record_last(xid, p);
assert(pFirstSlot.slot == FIRST_SLOT);
//TODO: could change with stasis_slotted_page_initialize(...);
// TODO: fsck?
// reinsert first.
recordid pFirstSlot = { p->id, FIRST_SLOT, readRecordLength(xid, p, FIRST_SLOT)};
if(*stasis_page_slotted_numslots_ptr(p) != FIRST_SLOT+1) {
DEBUG("slots %d (%d)\n", *stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1);
assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1);
}
indexnode_rec *nr
= (indexnode_rec*)stasis_record_write_begin(xid, p, pFirstSlot);
@ -447,10 +441,7 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p,
diskTreeComponent_page_allocator_t allocator,
void *allocator_state) {
assert(p->pageType == LOGTREE_ROOT_PAGE ||
p->pageType == SLOTTED_PAGE);
DEBUG("appendInternalNode\tdepth %lldkeylen%d\tnumslots %d\n", depth, key_len, *stasis_page_slotted_numslots_ptr(p));
assert(p->pageType == SLOTTED_PAGE);
recordid ret;
@ -462,8 +453,9 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p,
writeNodeRecord(xid,p,ret,key,key_len,val_page);
}
} else {
// recurse
int slot = *stasis_page_slotted_numslots_ptr(p)-1;//*recordcount_ptr(p)-1;
slotid_t slot = stasis_record_last(xid, p).slot;
assert(slot >= FIRST_SLOT); // there should be no empty nodes
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid, p, slot, 0);
@ -483,7 +475,7 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p,
ret = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)+key_len);
DEBUG("keylen %d\tnumslots %d for page id %lld ret.size %lld prv rec len %d\n",
key_len,
*stasis_page_slotted_numslots_ptr(p),
stasis_record_last(xid, p).slot+1,
p->id,
ret.size,
readRecordLength(xid, p, slot));
@ -547,7 +539,6 @@ recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p
pageid_t tmp_pid = -1;
writeRecord(xid, child_p, NEXT_LEAF, (byte*)(&tmp_pid), root_rec_size);
//writeNodeRecord(xid,child_p,NEXT_LEAF,dummy,key_len,-1);
recordid leaf_rec = stasis_record_alloc_begin(xid, child_p,
sizeof(indexnode_rec)+key_len);
@ -588,8 +579,7 @@ pageid_t diskTreeComponent::findLastLeaf(int xid, Page *root, int64_t depth) {
DEBUG("Found last leaf = %lld\n", root->id);
return root->id;
} else {
const indexnode_rec *nr = (indexnode_rec*) readRecord(xid, root,
(*stasis_page_slotted_numslots_ptr(root))-1, 0);
const indexnode_rec *nr = (indexnode_rec*) readRecord(xid, root, stasis_record_last(xid, root).slot, 0);
pageid_t ret;
Page *p = loadPage(xid, nr->ptr);
@ -602,7 +592,6 @@ pageid_t diskTreeComponent::findLastLeaf(int xid, Page *root, int64_t depth) {
}
}
/**
* Traverse from the root of the tree to the left most (lowest valued
* key) leaf.
@ -658,16 +647,18 @@ recordid diskTreeComponent::lookup(int xid,
const byte *key, size_t keySize ) {
//DEBUG("lookup: pid %lld\t depth %lld\n", node->id, depth);
if(*stasis_page_slotted_numslots_ptr(node) == FIRST_SLOT) {
slotid_t numslots = stasis_record_last(xid, node).slot + 1;
if(numslots == FIRST_SLOT) {
return NULLRID;
}
assert(*stasis_page_slotted_numslots_ptr(node) > FIRST_SLOT);
assert(numslots > FIRST_SLOT);
// don't need to compare w/ first item in tree, since we need to position ourselves at the the max tree value <= key.
// positioning at FIRST_SLOT puts us "before" the first value
int match = FIRST_SLOT; // (so match is now < key)
for(int i = FIRST_SLOT+1; i < *stasis_page_slotted_numslots_ptr(node); i++) {
for(int i = FIRST_SLOT+1; i < numslots; i++) {
const indexnode_rec *rec = (const indexnode_rec*)readRecord(xid,node,i,0);
int cmpval = datatuple::compare((datatuple::key_t) (rec+1), *stasis_page_slotted_slot_length_ptr(node, i)-sizeof(*rec),
(datatuple::key_t) key, keySize);
@ -712,24 +703,26 @@ void diskTreeComponent::print_tree(int xid, pageid_t pid, int64_t depth) {
Page *node = loadPage(xid, pid);
readlock(node->rwlatch,0);
printf("page_id:%lld\tnum_slots:%d\t\n", node->id, *stasis_page_slotted_numslots_ptr(node));
slotid_t numslots = stasis_record_last(xid,node).slot + 1;
if(*stasis_page_slotted_numslots_ptr(node) == FIRST_SLOT) {
printf("page_id:%lld\tnum_slots:%d\t\n", node->id, numslots);
if(numslots == FIRST_SLOT) {
return;
}
assert(*stasis_page_slotted_numslots_ptr(node) > FIRST_SLOT);
assert(numslots > FIRST_SLOT);
if(depth) {
printf("\tnot_leaf\n");
for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(node); i++) {
for(int i = FIRST_SLOT; i < numslots; i++) {
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0);
printf("\tchild_page_id:%lld\tkey:%s\n", nr->ptr,
datatuple::key_to_str((byte*)(nr+1)).c_str());
}
for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(node); i++) {
for(int i = FIRST_SLOT; i < numslots; i++) {
const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0);
print_tree(xid, nr->ptr, depth-1);
}
@ -743,7 +736,7 @@ void diskTreeComponent::print_tree(int xid, pageid_t pid, int64_t depth) {
printf("\t...\n");
nr = (const indexnode_rec*)readRecord(xid,node,(*stasis_page_slotted_numslots_ptr(node))-1,0);
nr = (const indexnode_rec*)readRecord(xid,node,numslots-1,0);
printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr,
datatuple::key_to_str((byte*)(nr+1)).c_str());
}
@ -780,10 +773,10 @@ diskTreeComponentIterator::diskTreeComponentIterator(int xid, recordid root) {
}
{
// Position just before the first slot.
// The first call to next() will increment us to the first slot, or return NULL.
recordid rid = { p->id, diskTreeComponent::FIRST_SLOT-1, 0};
current = rid;
// Position just before the first slot.
// The first call to next() will increment us to the first slot, or return NULL.
recordid rid = { p->id, diskTreeComponent::FIRST_SLOT-1, 0};
current = rid;
}
DEBUG("keysize = %d, slot = %d\n", keySize, current.slot);