From 85ca7a935c244c4ed32b974e346e78db8e55f5ee Mon Sep 17 00:00:00 2001 From: sears Date: Mon, 8 Mar 2010 23:58:13 +0000 Subject: [PATCH] remove most of the read/write record wrappers from diskTreeComponent git-svn-id: svn+ssh://svn.corp.yahoo.com/yahoo/yrl/labs/pnuts/code/logstore@670 8dad8b1f-cf64-0410-95b6-bcf113ffbcfe --- diskTreeComponent.cpp | 219 +++++++++++++++++++++--------------------- diskTreeComponent.h | 11 --- server.cpp | 2 +- 3 files changed, 113 insertions(+), 119 deletions(-) diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp index 7b5de0b..a8c8ad6 100644 --- a/diskTreeComponent.cpp +++ b/diskTreeComponent.cpp @@ -43,6 +43,7 @@ static lsn_t get_lsn(int xid) { return ret; } +// TODO move init_stasis to a more appropriate module void diskTreeComponent::init_stasis() { @@ -183,7 +184,8 @@ recordid diskTreeComponent::create(int xid) { && tmp.size == root_rec_size); int64_t zero = 0; - writeRecord(xid, p, tmp, (byte*)&zero, root_rec_size); + assert(sizeof(zero) == root_rec_size); + stasis_record_write(xid, p, tmp, (byte*)&zero); tmp = stasis_record_alloc_begin(xid, p, root_rec_size); stasis_record_alloc_done(xid,p,tmp); @@ -192,7 +194,9 @@ recordid diskTreeComponent::create(int xid) { && tmp.slot == COMPARATOR && tmp.size == root_rec_size); - writeRecord(xid, p, tmp, (byte*)&COMPARATOR, root_rec_size); + stasis_record_write(xid, p, tmp, (byte*)&zero); + + stasis_page_lsn_write(xid, p, get_lsn(xid)); unlock(p->rwlatch); releasePage(p); @@ -202,20 +206,6 @@ recordid diskTreeComponent::create(int xid) { return ret; } -// XXX remove the next N records, which are completely redundant. - -/** - * TODO: what happen if there is already such a record with a different size? - * I guess this should never happen in rose, but what if? - **/ -void diskTreeComponent::writeRecord(int xid, Page *p, recordid &rid, - const byte *data, size_t datalen) { - byte *byte_arr = stasis_record_write_begin(xid, p, rid); - memcpy(byte_arr, data, datalen); //TODO: stasis write call - stasis_record_write_done(xid, p, rid, byte_arr); - stasis_page_lsn_write(xid, p, get_lsn(xid)); -} - void diskTreeComponent::writeNodeRecord(int xid, Page * p, recordid & rid, const byte *key, size_t keylen, pageid_t ptr) { DEBUG("writenoderecord:\tp->id\t%lld\tkey:\t%s\tkeylen: %d\tval_page\t%lld\n", @@ -227,40 +217,6 @@ void diskTreeComponent::writeNodeRecord(int xid, Page * p, recordid & rid, stasis_page_lsn_write(xid, p, get_lsn(xid)); } -void diskTreeComponent::writeRecord(int xid, Page *p, slotid_t slot, - const byte *data, size_t datalen) { - recordid rid; - rid.page = p->id; - rid.slot = slot; - rid.size = datalen; - byte *byte_arr = stasis_record_write_begin(xid, p, rid); - memcpy(byte_arr, data, datalen); //TODO: stasis write call - stasis_record_write_done(xid, p, rid, byte_arr); - stasis_page_lsn_write(xid, p, get_lsn(xid)); -} - -const byte* diskTreeComponent::readRecord(int xid, Page * p, recordid &rid) { - - const byte *nr = stasis_record_read_begin(xid,p,rid); // XXX API violation? - return nr; -} - -const byte* diskTreeComponent::readRecord(int xid, Page * p, slotid_t slot, int64_t size) { - - recordid rid; - rid.page = p->id; - rid.slot = slot; - rid.size = size; - const byte *nr = stasis_record_read_begin(xid,p,rid); - return nr; -} - -int32_t diskTreeComponent::readRecordLength(int xid, Page *p, slotid_t slot) { - recordid rec = {p->id, slot, 0}; - int32_t reclen = stasis_record_length_read(xid, p, rec); - return reclen; -} - void diskTreeComponent::initializeNodePage(int xid, Page *p) { stasis_page_slotted_initialize_page(p); recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)); @@ -277,10 +233,12 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf Page *p = loadPage(xid, tree.page); writelock(p->rwlatch, 0); - tree.slot = 0; + tree.slot = DEPTH; + tree.size = 0; - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0); + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, tree); int64_t depth = *((int64_t*)nr); + stasis_record_read_done(xid, p, tree, (const byte*)nr); if(rmLeafID == -1) { rmLeafID = findLastLeaf(xid, p, depth); @@ -327,19 +285,21 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf //creates a copy of the root page records in the //newly allocated child page slotid_t numslots = stasis_record_last(xid, p).slot+1; - for(int i = FIRST_SLOT; i < numslots; i++) { + recordid rid; + rid.page = p->id; + for(rid.slot = FIRST_SLOT; rid.slot < numslots; rid.slot++) { //read the record from the root page - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,p,i,0); - int reclen = readRecordLength(xid, p, i); + rid.size = stasis_record_length_read(xid, p, rid); + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, rid); - recordid cnext = stasis_record_alloc_begin(xid, lc,reclen); + recordid cnext = stasis_record_alloc_begin(xid, lc,rid.size); - assert(i == cnext.slot); + assert(rid.slot == cnext.slot); assert(cnext.size != INVALID_SLOT); stasis_record_alloc_done(xid, lc, cnext); - - writeRecord(xid,lc,i,(byte*)(nr),reclen); + stasis_record_write(xid, lc, cnext, (byte*)nr); + stasis_record_read_done(xid, p, rid, (const byte*)nr); } // deallocate old entries, and update pointer on parent node. @@ -366,21 +326,24 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf // don't overwrite key... nr->ptr = child; stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr); - stasis_page_lsn_write(xid, p, get_lsn(xid)); if(!depth) { rmLeafID = lc->id; pageid_t tmpid = -1; - writeRecord(xid,lc,PREV_LEAF,(byte*)(&tmpid), root_rec_size); - writeRecord(xid,lc,NEXT_LEAF,(byte*)(&tmpid), root_rec_size); + recordid rid = { lc->id, PREV_LEAF, root_rec_size }; + stasis_record_write(xid, lc, rid, (byte*)&tmpid); + rid.slot = NEXT_LEAF; + stasis_record_write(xid, lc, rid, (byte*)&tmpid); } + stasis_page_lsn_write(xid, lc, get_lsn(xid)); unlock(lc->rwlatch); releasePage(lc); //update the depth info at the root depth ++; - writeRecord(xid,p,DEPTH,(byte*)(&depth), root_rec_size); + recordid depth_rid = { p->id, DEPTH, root_rec_size }; + stasis_record_write(xid, p, depth_rid, (byte*)(&depth)); assert(tree.page == p->id); ret = appendInternalNode(xid, p, depth, key, keySize, val_page, @@ -397,7 +360,6 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf rmLeafID = ret.page; DEBUG("lastleaf is %lld\n", rmLeafID); - } else { // write the new value to an existing page DEBUG("Writing %s\t%d to existing page# %lld\n", datatuple::key_to_str(key).c_str(), @@ -414,6 +376,8 @@ recordid diskTreeComponent::appendPage(int xid, recordid tree, pageid_t & rmLeaf } } + stasis_page_lsn_write(xid, p, get_lsn(xid)); + unlock(p->rwlatch); releasePage(p); @@ -455,11 +419,13 @@ recordid diskTreeComponent::appendInternalNode(int xid, Page *p, } else { // recurse - slotid_t slot = stasis_record_last(xid, p).slot; + recordid last_rid = stasis_record_last(xid, p); + + assert(last_rid.slot >= FIRST_SLOT); // there should be no empty nodes + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, p, last_rid); - assert(slot >= FIRST_SLOT); // there should be no empty nodes - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid, p, slot, 0); pageid_t child_id = nr->ptr; + stasis_record_read_done(xid, p, last_rid, (const byte*)nr); nr = 0; { Page *child_page = loadPage(xid, child_id); @@ -532,13 +498,14 @@ recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p } else { // set leaf - // backward link.//these writes do not need alloc_begin as it is done in page initialization - writeRecord(xid, child_p, PREV_LEAF, (byte*)(&lastLeaf), root_rec_size); + // backward link. records were alloced by page initialization + recordid prev_leaf_rid = { child_p->id, PREV_LEAF, root_rec_size }; + stasis_record_write(xid, child_p, prev_leaf_rid, (byte*)&lastLeaf); // forward link (initialize to -1) - pageid_t tmp_pid = -1; - writeRecord(xid, child_p, NEXT_LEAF, (byte*)(&tmp_pid), root_rec_size); + recordid next_leaf_rid = { child_p->id, NEXT_LEAF, root_rec_size }; + stasis_record_write(xid, child_p, next_leaf_rid, (byte*)&tmp_pid); recordid leaf_rec = stasis_record_alloc_begin(xid, child_p, sizeof(indexnode_rec)+key_len); @@ -550,13 +517,16 @@ recordid diskTreeComponent::buildPathToLeaf(int xid, recordid root, Page *root_p ret = leaf_rec; + stasis_page_lsn_write(xid, child_p, get_lsn(xid)); unlock(child_p->rwlatch); releasePage(child_p); if(lastLeaf != -1) { // install forward link in previous page Page *lastLeafP = loadPage(xid, lastLeaf); writelock(lastLeafP->rwlatch,0); - writeRecord(xid,lastLeafP,NEXT_LEAF,(byte*)(&child),root_rec_size); + recordid last_next_leaf_rid = {lastLeaf, NEXT_LEAF, root_rec_size }; + stasis_record_write(xid,lastLeafP,last_next_leaf_rid,(byte*)&child); + stasis_page_lsn_write(xid, lastLeafP, get_lsn(xid)); unlock(lastLeafP->rwlatch); releasePage(lastLeafP); } @@ -579,12 +549,14 @@ pageid_t diskTreeComponent::findLastLeaf(int xid, Page *root, int64_t depth) { DEBUG("Found last leaf = %lld\n", root->id); return root->id; } else { - const indexnode_rec *nr = (indexnode_rec*) readRecord(xid, root, stasis_record_last(xid, root).slot, 0); - pageid_t ret; + recordid last_rid = stasis_record_last(xid, root); + const indexnode_rec *nr = (const indexnode_rec*) stasis_record_read_begin(xid, root, last_rid); Page *p = loadPage(xid, nr->ptr); + stasis_record_read_done(xid, root, last_rid, (const byte*)nr); + readlock(p->rwlatch,0); - ret = findLastLeaf(xid,p,depth-1); + pageid_t ret = findLastLeaf(xid,p,depth-1); unlock(p->rwlatch); releasePage(p); @@ -601,8 +573,10 @@ pageid_t diskTreeComponent::findFirstLeaf(int xid, Page *root, int64_t depth) { if(!depth) { //if depth is 0, then returns the id of the page return root->id; } else { - const indexnode_rec *nr = (indexnode_rec*)readRecord(xid,root,FIRST_SLOT,0); + recordid rid = {root->id, FIRST_SLOT, 0}; + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid, root, rid); Page *p = loadPage(xid, nr->ptr); + stasis_record_read_done(xid, root, rid, (const byte*)nr); readlock(p->rwlatch,0); pageid_t ret = findFirstLeaf(xid,p,depth-1); unlock(p->rwlatch); @@ -617,9 +591,12 @@ pageid_t diskTreeComponent::findPage(int xid, recordid tree, const byte *key, si Page *p = loadPage(xid, tree.page); readlock(p->rwlatch,0); - int64_t depth = *(int64_t*)readRecord(xid, p , DEPTH, 0); + recordid depth_rid = {p->id, DEPTH, 0}; + int64_t * depth = (int64_t*)stasis_record_read_begin(xid, p, depth_rid); + + recordid rid = lookup(xid, p, *depth, key, keySize); + stasis_record_read_done(xid, p, depth_rid, (const byte*)depth); - recordid rid = lookup(xid, p, depth, key, keySize); pageid_t ret = lookupLeafPageFromRid(xid,rid); unlock(p->rwlatch); releasePage(p); @@ -634,7 +611,9 @@ pageid_t diskTreeComponent::lookupLeafPageFromRid(int xid, recordid rid) { if(rid.page != NULLRID.page || rid.slot != NULLRID.slot) { Page * p2 = loadPage(xid, rid.page); readlock(p2->rwlatch,0); - pid = ((const indexnode_rec*)(readRecord(xid,p2,rid.slot,0)))->ptr; + const indexnode_rec * nr = (const indexnode_rec*)stasis_record_read_begin(xid, p2, rid); + pid = nr->ptr; + stasis_record_read_done(xid, p2, rid, (const byte*)nr); unlock(p2->rwlatch); releasePage(p2); } @@ -657,20 +636,30 @@ recordid diskTreeComponent::lookup(int xid, // don't need to compare w/ first item in tree, since we need to position ourselves at the the max tree value <= key. // positioning at FIRST_SLOT puts us "before" the first value int match = FIRST_SLOT; // (so match is now < key) + recordid rid; + rid.page = node->id; + rid.size = 0; + for(rid.slot = FIRST_SLOT+1; rid.slot < numslots; rid.slot++) { + rid.size = stasis_record_length_read(xid, node, rid); - for(int i = FIRST_SLOT+1; i < numslots; i++) { - const indexnode_rec *rec = (const indexnode_rec*)readRecord(xid,node,i,0); - int cmpval = datatuple::compare((datatuple::key_t) (rec+1), *stasis_page_slotted_slot_length_ptr(node, i)-sizeof(*rec), + const indexnode_rec *rec = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid); + int cmpval = datatuple::compare((datatuple::key_t) (rec+1), rid.size-sizeof(*rec), (datatuple::key_t) key, keySize); + stasis_record_read_done(xid,node,rid,(const byte*)rec); // key of current node is too big; there can be no matches under it. if(cmpval>0) break; - match = i; // only increment match after comparing with the current node. + match = rid.slot; // only increment match after comparing with the current node. } + rid.slot = match; + rid.size = 0; if(depth) { - pageid_t child_id = ((const indexnode_rec*)readRecord(xid,node,match,0))->ptr; + const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid, node, rid); + pageid_t child_id = nr->ptr; + stasis_record_read_done(xid, node, rid, (const byte*)nr); + Page* child_page = loadPage(xid, child_id); readlock(child_page->rwlatch,0); recordid ret = lookup(xid,child_page,depth-1,key,keySize); @@ -686,10 +675,11 @@ recordid diskTreeComponent::lookup(int xid, void diskTreeComponent::print_tree(int xid) { Page *p = loadPage(xid, root_rec.page); readlock(p->rwlatch,0); + recordid depth_rid = {p->id, DEPTH, 0}; + const indexnode_rec *depth_nr = (const indexnode_rec*)stasis_record_read_begin(xid, p , depth_rid); - const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0); - - int64_t depth = *((int64_t*)depth_nr); + int64_t depth = depth_nr->ptr; + stasis_record_read_done(xid,p,depth_rid,(const byte*)depth_nr); print_tree(xid, root_rec.page, depth); @@ -713,33 +703,43 @@ void diskTreeComponent::print_tree(int xid, pageid_t pid, int64_t depth) { assert(numslots > FIRST_SLOT); + recordid rid = { node->id, 0, 0 }; + if(depth) { printf("\tnot_leaf\n"); for(int i = FIRST_SLOT; i < numslots; i++) { - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0); + rid.slot = i; + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid); printf("\tchild_page_id:%lld\tkey:%s\n", nr->ptr, datatuple::key_to_str((byte*)(nr+1)).c_str()); + stasis_record_read_done(xid, node, rid, (const byte*)nr); } for(int i = FIRST_SLOT; i < numslots; i++) { - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0); + rid.slot = i; + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid); print_tree(xid, nr->ptr, depth-1); + stasis_record_read_done(xid, node, rid, (const byte*)nr); } } else { printf("\tis_leaf\t\n"); - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,FIRST_SLOT,0); + rid.slot = FIRST_SLOT; + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid); printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr, datatuple::key_to_str((byte*)(nr+1)).c_str()); + stasis_record_read_done(xid, node, rid, (const byte*)nr); printf("\t...\n"); - nr = (const indexnode_rec*)readRecord(xid,node,numslots-1,0); + rid.slot= numslots - 1; + nr = (const indexnode_rec*)stasis_record_read_begin(xid,node,rid); printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr, datatuple::key_to_str((byte*)(nr+1)).c_str()); - } + stasis_record_read_done(xid, node, rid, (const byte*)nr); +} unlock(node->rwlatch); releasePage(node); } @@ -754,11 +754,12 @@ diskTreeComponentIterator::diskTreeComponentIterator(int xid, recordid root) { readlock(p->rwlatch,0); DEBUG("ROOT_REC_SIZE %d\n", diskTreeComponent::root_rec_size); - const byte * nr = diskTreeComponent::readRecord(xid,p, - diskTreeComponent::DEPTH, - diskTreeComponent::root_rec_size); - int64_t depth = *((int64_t*)nr); + recordid rid = {p->id, diskTreeComponent::DEPTH, diskTreeComponent::root_rec_size}; + const indexnode_rec* nr = (const indexnode_rec*)stasis_record_read_begin(xid,p, rid); + + int64_t depth = nr->ptr; DEBUG("DEPTH = %lld\n", depth); + stasis_record_read_done(xid,p,rid,(const byte*)nr); pageid_t leafid = diskTreeComponent::findFirstLeaf(xid, p, depth); if(leafid != root.page) { @@ -791,10 +792,11 @@ diskTreeComponentIterator::diskTreeComponentIterator(int xid, recordid root, con p = loadPage(xid,root.page); readlock(p->rwlatch,0); + recordid rid = {p->id, diskTreeComponent::DEPTH, diskTreeComponent::root_rec_size}; - const byte *nr = diskTreeComponent::readRecord(xid,p,diskTreeComponent::DEPTH, diskTreeComponent::root_rec_size); - - int64_t depth = *((int64_t*)nr); + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid,p,rid); + int64_t depth = nr->ptr; + stasis_record_read_done(xid,p,rid,(const byte*)nr); recordid lsm_entry_rid = diskTreeComponent::lookup(xid,p,depth,key,keylen); @@ -837,19 +839,20 @@ int diskTreeComponentIterator::next() if(current.size == INVALID_SLOT) { - const indexnode_rec next_rec = *(const indexnode_rec*)diskTreeComponent::readRecord(xid_,p, - diskTreeComponent::NEXT_LEAF, - 0); + recordid next_leaf_rid = {p->id, diskTreeComponent::NEXT_LEAF,0}; + const indexnode_rec *nr = (const indexnode_rec*)stasis_record_read_begin(xid_, p, next_leaf_rid); + pageid_t next_rec = nr->ptr; + stasis_record_read_done(xid_,p,next_leaf_rid,(const byte*)nr); + unlock(p->rwlatch); releasePage(p); DEBUG("done with page %lld next = %lld\n", p->id, next_rec.ptr); - - if(next_rec.ptr != -1 && ! justOnePage) { - p = loadPage(xid_, next_rec.ptr); + if(next_rec != -1 && ! justOnePage) { + p = loadPage(xid_, next_rec); readlock(p->rwlatch,0); - current.page = next_rec.ptr; + current.page = next_rec; current.slot = 2; current.size = stasis_record_length_read(xid_, p, current); } else { @@ -863,7 +866,9 @@ int diskTreeComponentIterator::next() if(t != NULL) { free(t); t = NULL; } t = (indexnode_rec*)malloc(current.size); - memcpy(t, diskTreeComponent::readRecord(xid_,p,current), current.size); + const byte * buf = stasis_record_read_begin(xid_, p, current); + memcpy(t, buf, current.size); + stasis_record_read_done(xid_, p, current, buf); return 1; } else { diff --git a/diskTreeComponent.h b/diskTreeComponent.h index 202584b..dd8c564 100644 --- a/diskTreeComponent.h +++ b/diskTreeComponent.h @@ -68,17 +68,6 @@ public: static void writeNodeRecord(int xid, Page *p, recordid &rid, const byte *key, size_t keylen, pageid_t ptr); - static void writeRecord(int xid, Page *p, recordid &rid, - const byte *data, size_t datalen); - - static void writeRecord(int xid, Page *p, slotid_t slot, - const byte *data, size_t datalen); - - static const byte* readRecord(int xid, Page * p, recordid &rid); - static const byte* readRecord(int xid, Page * p, slotid_t slot, int64_t size); - - static int32_t readRecordLength(int xid, Page *p, slotid_t slot); - //return the left-most leaf, these are not data pages, although referred to as leaf static pageid_t findFirstLeaf(int xid, Page *root, int64_t depth); //return the right-most leaf diff --git a/server.cpp b/server.cpp index 1c2eec4..5817db5 100644 --- a/server.cpp +++ b/server.cpp @@ -79,7 +79,7 @@ void initialize_server() ltable.setMergeData(mscheduler->getMergeData(lindex)); int64_t c0_size = 1024 * 1024 * 10; - printf("warning: running w/ tiny c0 for testing"); // XXX + printf("warning: running w/ tiny c0 for testing"); // XXX build a separate test server and deployment server? mscheduler->startlogtable(lindex, c0_size); unlock(ltable.header_lock);