diff --git a/CMakeLists.txt b/CMakeLists.txt index e865631..4087211 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -69,6 +69,6 @@ ENDIF ( "${CMAKE_C_COMPILER_ID}" STREQUAL "GNU" ) #CONFIGURE_FILE(${CMAKE_CURRENT_SOURCE_DIR}/config.h.cmake ${CMAKE_CURRENT_BINARY_DIR}/config.h) -ADD_LIBRARY(logstore logserver.cpp logstore.cpp logiterators.cpp datapage.cpp merger.cpp tuplemerger.cpp) +ADD_LIBRARY(logstore logserver.cpp logstore.cpp diskTreeComponent.cpp logiterators.cpp datapage.cpp merger.cpp tuplemerger.cpp) ADD_LIBRARY(logstore_client tcpclient.cpp) CREATE_EXECUTABLE(server) diff --git a/diskTreeComponent.cpp b/diskTreeComponent.cpp new file mode 100644 index 0000000..bdbef1b --- /dev/null +++ b/diskTreeComponent.cpp @@ -0,0 +1,1004 @@ +/* + * diskTreeComponent.cpp + * + * Created on: Feb 18, 2010 + * Author: sears + */ + +#include +#include +#include +#include + +#include "merger.h" +#include "diskTreeComponent.h" + +#include +#include +#include +#include +#include + +///////////////////////////////////////////////////////////////// +// LOGTREE implementation +///////////////////////////////////////////////////////////////// + +const RegionAllocConf_t logtree::REGION_ALLOC_STATIC_INITIALIZER = { {0,0,-1}, 0, -1, -1, 1000 }; + +#define LOGTREE_ROOT_PAGE SLOTTED_PAGE + +//LSM_ROOT_PAGE + +const int64_t logtree::DEPTH = 0; //in root this is the slot num where the DEPTH (of tree) is stored +const int64_t logtree::COMPARATOR = 1; //in root this is the slot num where the COMPARATOR id is stored +const int64_t logtree::FIRST_SLOT = 2; //this is the first unused slot in all index pages +const size_t logtree::root_rec_size = sizeof(int64_t); +const int64_t logtree::PREV_LEAF = 0; //pointer to prev leaf page +const int64_t logtree::NEXT_LEAF = 1; //pointer to next leaf page + +// XXX hack, and cut and pasted from datapage.cpp. 
+/**
+ * Pick the LSN used to stamp pages modified on behalf of transaction xid.
+ *
+ * Uses the transaction's prevLSN when it has one; otherwise falls back to
+ * the LSN just before the log's next available LSN.
+ */
+static lsn_t get_lsn(int xid) {
+  lsn_t xid_lsn = stasis_transaction_table_get((stasis_transaction_table_t*)stasis_runtime_transaction_table(), xid)->prevLSN;
+  lsn_t log_lsn = ((stasis_log_t*)stasis_log())->next_available_lsn((stasis_log_t*)stasis_log());
+  lsn_t ret = xid_lsn == INVALID_LSN ? log_lsn-1 : xid_lsn;
+  assert(ret != INVALID_LSN);
+  return ret;
+}
+
+
+/**
+ * Configure and start Stasis: select the pfile file handle, register the
+ * DataPage page implementation, select the hash buffer manager, then Tinit().
+ */
+void logtree::init_stasis() {
+
+  bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE;
+
+  DataPage::register_stasis_page_impl();
+
+  stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; // XXX workaround stasis issue #22.
+
+  Tinit();
+
+}
+
+/** Shut Stasis down (Tdeinit()). */
+void logtree::deinit_stasis() { Tdeinit(); }
+
+/**
+ * Run the caller-supplied deallocator on allocator_state, then free the
+ * record that allocator_state points to.
+ *
+ * @param tree            unused (the Tdealloc of it is commented out).
+ * @param dealloc         callback invoked as dealloc(xid, allocator_state).
+ * @param allocator_state must point to a recordid; that record is Tdealloc'ed.
+ */
+void logtree::free_region_rid(int xid, recordid tree,
+          logtree_page_deallocator_t dealloc, void *allocator_state)
+{
+  // Tdealloc(xid,tree);
+  dealloc(xid,allocator_state);
+  // XXX fishy shouldn't caller do this?
+  Tdealloc(xid, *(recordid*)allocator_state);
+}
+
+
+/**
+ * Free every region recorded in the RegionAllocConf_t stored at rid, then
+ * free the array list that tracked them.
+ */
+void logtree::dealloc_region_rid(int xid, recordid rid)
+{
+  RegionAllocConf_t a;
+  Tread(xid,rid,&a);
+  DEBUG("{%lld <- dealloc region arraylist}\n", a.regionList.page);
+
+  // regionCount is a pageid_t; use the same type for the index (as
+  // list_region_rid does) rather than a narrower int.
+  for(pageid_t i = 0; i < a.regionCount; i++) {
+    a.regionList.slot = i;
+    pageid_t pid;
+    Tread(xid,a.regionList,&pid);
+    TregionDealloc(xid,pid);
+  }
+  a.regionList.slot = 0;
+//  printf("Warning: leaking arraylist %lld in logtree\n", (long long)a.regionList.page);
+  TarrayListDealloc(xid, a.regionList);
+}
+
+
+/**
+ * Flush and force to disk every page range [pid, pid+regionSize) recorded in
+ * the RegionAllocConf_t stored at rid.
+ */
+void logtree::force_region_rid(int xid, recordid rid)
+{
+  RegionAllocConf_t a;
+  Tread(xid,rid,&a);
+
+  // The buffer manager handle does not depend on the region; fetch it once.
+  stasis_buffer_manager_t *bm =
+    (stasis_buffer_manager_t*)stasis_runtime_buffer_manager();
+
+  for(pageid_t i = 0; i < a.regionCount; i++)
+  {
+    a.regionList.slot = i;
+    pageid_t pid;
+    Tread(xid,a.regionList,&pid);
+    stasis_dirty_page_table_flush_range((stasis_dirty_page_table_t*)stasis_runtime_dirty_page_table(), pid, pid+a.regionSize);
+    bm->forcePageRange(bm, pid, pid+a.regionSize);
+  }
+}
+
+
+/**
+ * Hand out the next page from the in-memory allocator state *conf,
+ * allocating a fresh region when the current one is used up.
+ *
+ * With REGION_ALLOC_STATIC_INITIALIZER, nextPage == endOfRegion == -1 and
+ * regionList.size == -1, so the first call creates the region list and the
+ * first region.
+ *
+ * @param conf points to a RegionAllocConf_t; updated in place.
+ * @return the id of the allocated page.
+ */
+pageid_t logtree::alloc_region(int xid, void *conf)
+{
+  RegionAllocConf_t* a = (RegionAllocConf_t*)conf;
+
+
+  if(a->nextPage == a->endOfRegion) {
+    if(a->regionList.size == -1) {
+      //DEBUG("nextPage: %lld\n", a->nextPage);
+      a->regionList = TarrayListAlloc(xid, 1, 4, sizeof(pageid_t));
+      DEBUG("regionList.page: %lld\n", a->regionList.page);
+      DEBUG("regionList.slot: %d\n", a->regionList.slot);
+      DEBUG("regionList.size: %lld\n", a->regionList.size);
+
+      a->regionCount = 0;
+    }
+    DEBUG("{%lld <- alloc region arraylist}\n", a->regionList.page);
+    TarrayListExtend(xid,a->regionList,1);
+    a->regionList.slot = a->regionCount;
+    DEBUG("region lst slot %d\n",a->regionList.slot);
+    a->regionCount++;
+    DEBUG("region count %lld\n",a->regionCount);
+    a->nextPage = TregionAlloc(xid, a->regionSize,12);
+    DEBUG("next page %lld\n",a->nextPage);
+    a->endOfRegion = a->nextPage + a->regionSize;
+    // Remember the first page of the new region in the region list.
+    Tset(xid,a->regionList,&a->nextPage);
+    DEBUG("next page %lld\n",a->nextPage);
+  }
+
+  DEBUG("%lld ?= %lld\n", a->nextPage,a->endOfRegion);
+  pageid_t ret = a->nextPage;
+  (a->nextPage)++;
+  DEBUG("tree %lld-%lld\n", (long long)ret, a->endOfRegion);
+  return ret;
+
+}
+
+/**
+ * Page allocator callback backed by a persistent RegionAllocConf_t.
+ *
+ * Reads the allocator state from the record *ridp, allocates one page with
+ * alloc_region(), and writes the updated state back.
+ *
+ * @param ridp points to the recordid holding the RegionAllocConf_t.
+ */
+pageid_t logtree::alloc_region_rid(int xid, void * ridp) {
+  recordid rid = *(recordid*)ridp;
+  RegionAllocConf_t conf;
+  Tread(xid,rid,&conf);
+  pageid_t ret = alloc_region(xid,&conf);
+  //DEBUG("{%lld <- alloc region extend}\n", conf.regionList.page);
+  // XXX get rid of Tset by storing next page in memory, and losing it
+  //     on crash.
+  Tset(xid,rid,&conf);
+  return ret;
+}
+
+/**
+ * List the first page of every region owned by the allocator stored at *ridp.
+ *
+ * @param region_len   out: pages per region (regionSize).
+ * @param region_count out: number of regions, i.e. length of the result.
+ * @return malloc()ed array of *region_count page ids; the caller frees it.
+ */
+pageid_t * logtree::list_region_rid(int xid, void *ridp, pageid_t * region_len, pageid_t * region_count) {
+  recordid header = *(recordid*)ridp;
+  RegionAllocConf_t conf;
+  Tread(xid,header,&conf);
+  recordid header_list = conf.regionList;
+  *region_len = conf.regionSize;
+  *region_count = conf.regionCount;
+  pageid_t * ret = (pageid_t*) malloc(sizeof(pageid_t) * *region_count);
+  for(pageid_t i = 0; i < *region_count; i++) {
+    header_list.slot = i;
+    Tread(xid,header_list,&ret[i]);
+  }
+  return ret;
+}
+
+
+
+/**
+ * Create an empty tree: allocate the region allocator state, allocate the
+ * root page, and initialize its DEPTH and COMPARATOR slots.
+ *
+ * Sets tree_state, root_rec and lastLeaf (-1) as side effects.
+ *
+ * @return the recordid {root page, 0, 0}, also stored in root_rec.
+ */
+recordid logtree::create(int xid)
+{
+
+  tree_state = Talloc(xid,sizeof(RegionAllocConf_t));
+
+  //int ptype = TpageGetType(xid, tree_state.page);
+  //DEBUG("page type %d\n", ptype); //returns a slotted page
+
+  Tset(xid,tree_state, &REGION_ALLOC_STATIC_INITIALIZER);
+
+  pageid_t root = alloc_region_rid(xid, &tree_state);
+  DEBUG("Root = %lld\n", root);
+  recordid ret = { root, 0, 0 };
+
+  Page *p = loadPage(xid, ret.page);
+  writelock(p->rwlatch,0);
+
+  lastLeaf = -1;
+
+  //initialize root node
+  stasis_page_slotted_initialize_page(p);
+  recordid tmp = stasis_record_alloc_begin(xid, p, root_rec_size);
+  stasis_record_alloc_done(xid,p,tmp);
+
+  assert(tmp.page == ret.page
+         && tmp.slot == DEPTH
+         && tmp.size == root_rec_size);
+
+  // The slot-number constant DEPTH (== 0) doubles as the initial depth value.
+  writeRecord(xid, p, tmp, (byte*)&DEPTH, root_rec_size);
+
+  tmp = stasis_record_alloc_begin(xid, p, root_rec_size);
+  stasis_record_alloc_done(xid,p,tmp);
+
+  assert(tmp.page == ret.page
+         && tmp.slot == COMPARATOR
+         && tmp.size == root_rec_size);
+
+  // Likewise the value of COMPARATOR (== 1) is what gets stored in that slot.
+  writeRecord(xid, p, tmp, (byte*)&COMPARATOR, root_rec_size);
+
+  unlock(p->rwlatch);
+  releasePage(p);
+
+  root_rec = ret;
+
+  return ret;
+}
+
+
+/**
+ * TODO: what happens if there is already such a record with a different size?
+ * I guess this should never happen in rose, but what if?
+ **/
+/**
+ * Overwrite record rid on page p with datalen bytes from data, then stamp
+ * the page with get_lsn(xid).
+ */
+void logtree::writeRecord(int xid, Page *p, recordid &rid,
+                          const byte *data, size_t datalen)
+{
+  byte *byte_arr = stasis_record_write_begin(xid, p, rid);
+  memcpy(byte_arr, data, datalen); //TODO: stasis write call
+  stasis_record_write_done(xid, p, rid, byte_arr);
+  stasis_page_lsn_write(xid, p, get_lsn(xid));
+
+}
+
+
+/**
+ * Write an index entry into record rid on page p: an indexnode_rec whose ptr
+ * is set to ptr, immediately followed by keylen bytes of key.
+ */
+void logtree::writeNodeRecord(int xid, Page * p, recordid & rid,
+                              const byte *key, size_t keylen, pageid_t ptr)
+{
+  DEBUG("writenoderecord:\tp->id\t%lld\tkey:\t%s\tkeylen: %d\tval_page\t%lld\n",
+        p->id, datatuple::key_to_str(key).c_str(), keylen, ptr);
+  indexnode_rec *nr = (indexnode_rec*)stasis_record_write_begin(xid, p, rid);
+  nr->ptr = ptr;
+  memcpy(nr+1, key, keylen);
+  stasis_record_write_done(xid, p, rid, (byte*)nr);
+  stasis_page_lsn_write(xid, p, get_lsn(xid));
+}
+
+/**
+ * Slot-addressed variant of writeRecord(): builds the recordid
+ * {p->id, slot, datalen} and delegates to the recordid overload.
+ */
+void logtree::writeRecord(int xid, Page *p, slotid_t slot,
+                          const byte *data, size_t datalen)
+{
+  recordid rid;
+  rid.page = p->id;
+  rid.slot = slot;
+  rid.size = datalen;
+  writeRecord(xid, p, rid, data, datalen);
+}
+
+/**
+ * Return a pointer to the bytes of record rid on page p.
+ *
+ * The pointer comes straight from stasis_record_read_begin(); no matching
+ * "read done" call is made here.
+ */
+const byte* logtree::readRecord(int xid, Page * p, recordid &rid)
+{
+  const byte *nr = stasis_record_read_begin(xid,p,rid); // XXX API violation?
+  return nr;
+}
+
+/**
+ * Slot-addressed variant of readRecord(): builds the recordid
+ * {p->id, slot, size} and delegates to the recordid overload.
+ */
+const byte* logtree::readRecord(int xid, Page * p, slotid_t slot, int64_t size)
+{
+  recordid rid;
+  rid.page = p->id;
+  rid.slot = slot;
+  rid.size = size;
+  //byte *ret = (byte*)malloc(rid.size);
+  //stasis_record_read(xid,p,rid,ret);
+  //return ret;
+  return readRecord(xid, p, rid);
+}
+
+/** Return the stored length of the record in the given slot of page p. */
+int32_t logtree::readRecordLength(int xid, Page *p, slotid_t slot)
+{
+  recordid rec = {p->id, slot, 0};
+  int32_t reclen = stasis_record_length_read(xid, p, rec);
+  return reclen;
+}
+
+/**
+ * Format p as a slotted page and allocate two indexnode_rec-sized records.
+ *
+ * On a freshly initialized page these are presumably slots 0 and 1, i.e.
+ * PREV_LEAF / NEXT_LEAF on leaves; buildPathToLeaf() asserts that the next
+ * allocation lands in FIRST_SLOT.
+ */
+void logtree::initializeNodePage(int xid, Page *p)
+{
+  stasis_page_slotted_initialize_page(p);
+  recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec));
+  stasis_record_alloc_done(xid, p, reserved1);
+  recordid reserved2 = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec));
+  stasis_record_alloc_done(xid, p, reserved2);
+}
+
+
+/**
+ * Append the index entry (key -> val_page) at the right edge of the tree.
+ *
+ * First tries the rightmost leaf. If that page is full, descends from the
+ * root via appendInternalNode(). If the whole tree is full, the root's
+ * entries are moved into a newly allocated child, the depth stored in the
+ * root is incremented, and the insert is retried.
+ *
+ * @param tree            recordid whose page is the tree root.
+ * @param rmLeafID        in/out: id of the rightmost leaf, or -1 to have it
+ *                        looked up; updated to the page that received the entry.
+ * @param key, keySize    key bytes stored after the indexnode_rec header.
+ * @param allocator       page allocator callback, called as
+ *                        allocator(xid, allocator_state).
+ * @param val_page        page id stored in the new entry.
+ * @return recordid of the newly written entry.
+ */
+recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID,
+                             const byte *key, size_t keySize,
+                             logtree_page_allocator_t allocator, void *allocator_state,
+                             long val_page)
+{
+  Page *p = loadPage(xid, tree.page);
+  writelock(p->rwlatch, 0);
+  //logtree_state *s = (logtree_state*)p->impl;
+
+  tree.slot = 0;
+  //tree.size = sizeof(lsmTreeNodeRecord)+keySize;
+
+  // The root's DEPTH slot holds a bare int64_t.
+  int64_t depth = *((const int64_t*)readRecord(xid, p , DEPTH, 0));
+
+  if(rmLeafID == -1) {
+    rmLeafID = findLastLeaf(xid, p, depth);
+  }
+
+  Page *lastLeaf;
+
+  if(rmLeafID != tree.page)
+  {
+    lastLeaf= loadPage(xid, rmLeafID);
+    writelock(lastLeaf->rwlatch, 0);
+  } else
+    lastLeaf = p;
+
+
+  recordid ret = stasis_record_alloc_begin(xid, lastLeaf,
+                                           sizeof(indexnode_rec)+keySize);
+
+  if(ret.size == INVALID_SLOT)
+  {
+    if(lastLeaf->id != p->id)
+    {
+      assert(rmLeafID != tree.page);
+      unlock(lastLeaf->rwlatch);
+      releasePage(lastLeaf); // don't need that page anymore...
+      lastLeaf = 0;
+    }
+    // traverse down the root of the tree.
+
+    tree.slot = 0;
+
+    assert(tree.page == p->id);
+
+    ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
+                             rmLeafID == tree.page ? -1 : rmLeafID,
+                             allocator, allocator_state);
+
+    if(ret.size == INVALID_SLOT)
+    {
+      DEBUG("Need to split root; depth = %d\n", depth);
+
+      pageid_t child = allocator(xid, allocator_state);
+      Page *lc = loadPage(xid, child);
+      writelock(lc->rwlatch,0);
+
+      initializeNodePage(xid, lc);
+
+      //creates a copy of the root page records in the
+      //newly allocated child page
+      for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(p); i++)
+      {
+        //read the record from the root page
+        const indexnode_rec *root_entry = (const indexnode_rec*)readRecord(xid,p,i,0);
+        int reclen = readRecordLength(xid, p, i);
+
+        recordid cnext = stasis_record_alloc_begin(xid, lc,reclen);
+
+        assert(i == cnext.slot);
+        assert(cnext.size != INVALID_SLOT);
+
+        stasis_record_alloc_done(xid, lc, cnext);
+
+        writeRecord(xid,lc,i,(byte*)(root_entry),reclen);
+      }
+
+      // deallocate old entries, and update pointer on parent node.
+      // NOTE: stasis_record_free call goes to slottedFree in slotted.c
+      // this function only reduces the numslots when you call it
+      // with the last slot. so thats why i go backwards here.
+      printf("slots %d (%d) keysize=%lld\n", (int)*stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1, (long long int)keySize);
+      assert(*stasis_page_slotted_numslots_ptr(p) >= FIRST_SLOT+1);
+      for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--)
+      {
+        assert(*stasis_page_slotted_numslots_ptr(p) > FIRST_SLOT+1);
+        recordid tmp_rec= {p->id, i, INVALID_SIZE};
+        stasis_record_free(xid, p, tmp_rec);
+      }
+
+      //TODO: could change with stasis_slotted_page_initialize(...);
+      // TODO: fsck?
+      //      stasis_page_slotted_initialize_page(p);
+
+      // reinsert first.
+      recordid pFirstSlot = { p->id, FIRST_SLOT, readRecordLength(xid, p, FIRST_SLOT)};
+      if(*stasis_page_slotted_numslots_ptr(p) != FIRST_SLOT+1) {
+        printf("slots %d (%d)\n", *stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1);
+        assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1);
+      }
+
+      indexnode_rec *first_entry
+        = (indexnode_rec*)stasis_record_write_begin(xid, p, pFirstSlot);
+
+      // don't overwrite key...
+      first_entry->ptr = child;
+      stasis_record_write_done(xid,p,pFirstSlot,(byte*)first_entry);
+      stasis_page_lsn_write(xid, p, get_lsn(xid));
+
+      if(!depth) {
+        // The root used to be the only leaf; the new child takes that role
+        // and starts with no neighbours.
+        rmLeafID = lc->id;
+        pageid_t tmpid = -1;
+        writeRecord(xid,lc,PREV_LEAF,(byte*)(&tmpid), root_rec_size);
+        writeRecord(xid,lc,NEXT_LEAF,(byte*)(&tmpid), root_rec_size);
+      }
+
+      unlock(lc->rwlatch);
+      releasePage(lc);
+
+      //update the depth info at the root
+      depth ++;
+      writeRecord(xid,p,DEPTH,(byte*)(&depth), root_rec_size);
+
+      assert(tree.page == p->id);
+      ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
+                               rmLeafID == tree.page ? -1 : rmLeafID,
+                               allocator, allocator_state);
+
+      assert(ret.size != INVALID_SLOT);
+
+    }
+    else {
+      DEBUG("Appended new internal node tree depth = %lld key = %s\n",
+            depth, datatuple::key_to_str(key).c_str());
+    }
+
+    rmLeafID = ret.page;
+    DEBUG("lastleaf is %lld\n", rmLeafID);
+
+
+  }
+  else
+  {
+    // write the new value to an existing page
+    DEBUG("Writing %s\t%d to existing page# %lld\n", datatuple::key_to_str(key).c_str(),
+          val_page, lastLeaf->id);
+
+    stasis_record_alloc_done(xid, lastLeaf, ret);
+
+    logtree::writeNodeRecord(xid, lastLeaf, ret, key, keySize, val_page);
+
+    if(lastLeaf->id != p->id) {
+      assert(rmLeafID != tree.page);
+      unlock(lastLeaf->rwlatch);
+      releasePage(lastLeaf);
+    }
+  }
+
+  unlock(p->rwlatch);
+  releasePage(p);
+
+  return ret;
+}
+
+/* adding pages:
+
+   1) Try to append value to lsmTreeState->lastLeaf
+
+   2) If that fails, traverses down the root of the tree, split pages while
+      traversing back up.
+
+   3) Split is done by adding new page at end of row (no key
+      redistribution), except at the root, where root contents are
+      pushed into the first page of the next row, and a new path from root to
+      leaf is created starting with the root's immediate second child.
+
+*/
+
+/**
+ * Insert (key -> val_page) into the subtree rooted at page p.
+ *
+ * At depth 0 the entry is written into p itself when it fits. Otherwise the
+ * insert recurses into the child referenced by p's last slot; if that
+ * subtree is full, a new entry is allocated on p and buildPathToLeaf()
+ * hangs a fresh path of pages off it.
+ *
+ * @param p        node page; the callers in this file hold its write latch.
+ * @param depth    levels below p (0 means p is a leaf).
+ * @param lastLeaf id of the current rightmost leaf, or -1; passed through to
+ *                 buildPathToLeaf() for leaf linking.
+ * @return recordid of the written entry, or a recordid whose size is
+ *         INVALID_SLOT when neither the subtree nor p has room.
+ */
+recordid logtree::appendInternalNode(int xid, Page *p,
+                                     int64_t depth,
+                                     const byte *key, size_t key_len,
+                                     pageid_t val_page, pageid_t lastLeaf,
+                                     logtree_page_allocator_t allocator,
+                                     void *allocator_state)
+{
+//  assert(*stasis_page_type_ptr(p) == LOGTREE_ROOT_PAGE ||
+//         *stasis_page_type_ptr(p) == SLOTTED_PAGE);
+  assert(p->pageType == LOGTREE_ROOT_PAGE ||
+         p->pageType == SLOTTED_PAGE);
+
+  DEBUG("appendInternalNode\tdepth %lldkeylen%d\tnumslots %d\n", depth, key_len, *stasis_page_slotted_numslots_ptr(p));
+
+  if(depth == 0)
+  {
+    // Leaf: write the entry here if there is room, otherwise report failure.
+    recordid leaf_entry = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)+key_len);
+    if(leaf_entry.size == INVALID_SLOT) {
+      return leaf_entry;
+    }
+    stasis_record_alloc_done(xid, p, leaf_entry);
+    writeNodeRecord(xid,p,leaf_entry,key,key_len,val_page);
+    return leaf_entry;
+  }
+
+  // Internal node: follow the pointer held in the last slot.
+  int slot = *stasis_page_slotted_numslots_ptr(p)-1;//*recordcount_ptr(p)-1;
+  assert(slot >= FIRST_SLOT); // there should be no empty nodes
+
+  pageid_t child_id = ((const indexnode_rec*)readRecord(xid, p, slot, 0))->ptr;
+
+  Page *child_page = loadPage(xid, child_id);
+  writelock(child_page->rwlatch,0);
+  recordid result = appendInternalNode(xid, child_page, depth-1, key, key_len,
+                                       val_page, lastLeaf, allocator, allocator_state);
+  unlock(child_page->rwlatch);
+  releasePage(child_page);
+
+  if(result.size != INVALID_SLOT)
+  {
+    // The value went into the subtree rooted at the child.
+    return result;
+  }
+
+  // Subtree is full; try to add a sibling entry on this page.
+  result = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)+key_len);
+  DEBUG("keylen %d\tnumslots %d for page id %lld ret.size %lld prv rec len %d\n",
+        key_len,
+        *stasis_page_slotted_numslots_ptr(p),
+        p->id,
+        result.size,
+        readRecordLength(xid, p, slot));
+
+  if(result.size == INVALID_SLOT)
+  {
+    // This page is full as well, so the tree rooted here is full; the
+    // INVALID_SLOT result is handed back to the caller.
+    return result;
+  }
+
+  stasis_record_alloc_done(xid, p, result);
+  result = buildPathToLeaf(xid, result, p, depth, key, key_len, val_page,
+                           lastLeaf, allocator, allocator_state);
+
+  DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n",
+        p->id, result.page, result.slot, result.size);
+  return result;
+}
+
+/**
+ * Allocate a chain of new pages below root_p, ending in a new leaf that
+ * holds (key -> val_page), and point the entry `root` at the first of them.
+ *
+ * @param root     already-allocated record on root_p that receives the key
+ *                 and the pointer to the new child page.
+ * @param root_p   page containing `root`.
+ * @param depth    levels below root_p; must be non-zero.
+ * @param lastLeaf previous rightmost leaf, or -1. When not -1 its NEXT_LEAF
+ *                 slot is updated to point at the new leaf.
+ * @return recordid of the entry written on the new leaf.
+ */
+recordid logtree::buildPathToLeaf(int xid, recordid root, Page *root_p,
+                                  int64_t depth, const byte *key, size_t key_len,
+                                  pageid_t val_page, pageid_t lastLeaf,
+                                  logtree_page_allocator_t allocator,
+                                  void *allocator_state)
+{
+
+  // root is the recordid on the root page that should point to the
+  // new subtree.
+  assert(depth);
+  DEBUG("buildPathToLeaf(depth=%lld) (lastleaf=%lld) called\n",depth, lastLeaf);
+
+  pageid_t child = allocator(xid,allocator_state);
+  DEBUG("new child = %lld internal? %lld\n", child, depth-1);
+
+  Page *child_p = loadPage(xid, child);
+  writelock(child_p->rwlatch,0);
+  initializeNodePage(xid, child_p);
+
+  recordid path_end;
+  const bool child_is_leaf = (depth == 1);
+
+  if(child_is_leaf) {
+    // Backward link. The two link slots were allocated by
+    // initializeNodePage(), so no alloc_begin is needed for them.
+    writeRecord(xid, child_p, PREV_LEAF, (byte*)(&lastLeaf), root_rec_size);
+    //writeNodeRecord(xid,child_p,PREV_LEAF,dummy,key_len,lastLeaf);
+
+    // Forward link starts out as -1 (no successor).
+    pageid_t no_next = -1;
+    writeRecord(xid, child_p, NEXT_LEAF, (byte*)(&no_next), root_rec_size);
+    //writeNodeRecord(xid,child_p,NEXT_LEAF,dummy,key_len,-1);
+
+    recordid leaf_rec = stasis_record_alloc_begin(xid, child_p,
+                                                  sizeof(indexnode_rec)+key_len);
+
+    assert(leaf_rec.slot == FIRST_SLOT);
+
+    stasis_record_alloc_done(xid, child_p, leaf_rec);
+    writeNodeRecord(xid,child_p,leaf_rec,key,key_len,val_page);
+
+    path_end = leaf_rec;
+
+    unlock(child_p->rwlatch);
+    releasePage(child_p);
+
+    if(lastLeaf != -1)
+    {
+      // install forward link in previous page
+      Page *prev_leaf_p = loadPage(xid, lastLeaf);
+      writelock(prev_leaf_p->rwlatch,0);
+      writeRecord(xid,prev_leaf_p,NEXT_LEAF,(byte*)(&child),root_rec_size);
+      unlock(prev_leaf_p->rwlatch);
+      releasePage(prev_leaf_p);
+    }
+
+    DEBUG("%lld <-> %lld\n", lastLeaf, child);
+  } else {
+    // The page just allocated is another internal node: give it one entry
+    // and keep building downwards from there.
+    recordid child_rec = stasis_record_alloc_begin(xid, child_p, sizeof(indexnode_rec)+key_len);
+    assert(child_rec.size != INVALID_SLOT);
+    stasis_record_alloc_done(xid, child_p, child_rec);
+
+    path_end = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
+                               val_page,lastLeaf, allocator, allocator_state);
+
+    unlock(child_p->rwlatch);
+    releasePage(child_p);
+  }
+
+  // Finally make the parent entry point at the new child.
+  writeNodeRecord(xid, root_p, root, key, key_len, child);
+
+  return path_end;
+
+}
+
+
+
+/**
+ * Traverse from the root of the page to the right most leaf (the one
+ * with the highest base key value).
+ **/
+pageid_t logtree::findLastLeaf(int xid, Page *root, int64_t depth)
+{
+  if(!depth)
+  {
+    DEBUG("Found last leaf = %lld\n", root->id);
+    return root->id;
+  }
+  else
+  {
+    // Follow the pointer in the last slot of this node.
+    const indexnode_rec *nr = (indexnode_rec*) readRecord(xid, root,
+         (*stasis_page_slotted_numslots_ptr(root))-1, 0);
+    pageid_t ret;
+
+    Page *p = loadPage(xid, nr->ptr);
+    readlock(p->rwlatch,0);
+    ret = findLastLeaf(xid,p,depth-1);
+    unlock(p->rwlatch);
+    releasePage(p);
+
+    return ret;
+  }
+}
+
+
+/**
+ * Traverse from the root of the tree to the left most (lowest valued
+ * key) leaf.
+ */
+pageid_t logtree::findFirstLeaf(int xid, Page *root, int64_t depth)
+{
+  if(!depth) //if depth is 0, then returns the id of the page
+    return root->id;
+  else
+  {
+    const indexnode_rec *nr = (indexnode_rec*)readRecord(xid,root,FIRST_SLOT,0);
+    Page *p = loadPage(xid, nr->ptr);
+    readlock(p->rwlatch,0);
+    pageid_t ret = findFirstLeaf(xid,p,depth-1);
+    unlock(p->rwlatch);
+    releasePage(p);
+    return ret;
+  }
+}
+
+
+/**
+ * Return the page id stored in the index entry that lookup() selects for
+ * key, or -1 when lookup() finds nothing.
+ *
+ * @param tree recordid whose page is the tree root.
+ */
+pageid_t logtree::findPage(int xid, recordid tree, const byte *key, size_t keySize)
+{
+  Page *p = loadPage(xid, tree.page);
+  readlock(p->rwlatch,0);
+
+  // The root's DEPTH slot holds a bare int64_t.
+  const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0);
+
+  int64_t depth = *((int64_t*)depth_nr);
+
+  recordid rid = lookup(xid, p, depth, key, keySize);
+  pageid_t ret = lookupLeafPageFromRid(xid,rid);//,keySize);
+  unlock(p->rwlatch);
+  releasePage(p);
+
+  return ret;
+
+}
+
+/**
+ * Read the index entry at rid and return the page id stored in it.
+ *
+ * @return the entry's ptr, or -1 when rid's page and slot both equal NULLRID's.
+ */
+pageid_t logtree::lookupLeafPageFromRid(int xid, recordid rid)
+{
+  pageid_t pid = -1;
+  if(rid.page != NULLRID.page || rid.slot != NULLRID.slot)
+  {
+    Page * p2 = loadPage(xid, rid.page);
+    readlock(p2->rwlatch,0);
+    pid = ((const indexnode_rec*)(readRecord(xid,p2,rid.slot,0)))->ptr;
+    unlock(p2->rwlatch);
+    releasePage(p2);
+  }
+  return pid;
+}
+
+
+/**
+ * Find the leaf-level index entry responsible for key.
+ *
+ * On each node, picks the last slot whose key does not compare greater than
+ * key (the first slot is taken without comparison), then recurses into that
+ * child until depth reaches 0.
+ *
+ * @return {leaf page id, matching slot, size}, or NULLRID when a visited
+ *         node has no entries.
+ *
+ * NOTE(review): the recursive call passes 0 as keySize, so the returned
+ * size is keySize only when the root itself is a leaf and 0 otherwise.
+ * Callers in this file only use page and slot. Confirm before relying on size.
+ */
+recordid logtree::lookup(int xid,
+                         Page *node,
+                         int64_t depth,
+                         const byte *key, size_t keySize )
+{
+  //DEBUG("lookup: pid %lld\t depth %lld\n", node->id, depth);
+  if(*stasis_page_slotted_numslots_ptr(node) == FIRST_SLOT)
+    return NULLRID;
+
+  assert(*stasis_page_slotted_numslots_ptr(node) > FIRST_SLOT);
+
+  int match = FIRST_SLOT;
+
+  // don't need to compare w/ first item in tree.
+  const indexnode_rec * rec = (indexnode_rec*)readRecord(xid,node,FIRST_SLOT,0); //TODO: why read it then?
+
+  for(int i = FIRST_SLOT+1; i < *stasis_page_slotted_numslots_ptr(node); i++)
+  {
+    rec = (const indexnode_rec*)readRecord(xid,node,i,0);
+    int cmpval = datatuple::compare((datatuple::key_t) (rec+1),(datatuple::key_t) key);
+    if(cmpval>0) //changed it from >
+      break;
+    match = i;
+  }
+
+
+  if(depth)
+  {
+    pageid_t child_id = ((const indexnode_rec*)readRecord(xid,node,match,0))->ptr;
+    Page* child_page = loadPage(xid, child_id);
+    readlock(child_page->rwlatch,0);
+    recordid ret = lookup(xid,child_page,depth-1,key,0);
+    unlock(child_page->rwlatch);
+    releasePage(child_page);
+    return ret;
+  }
+  else
+  {
+    recordid ret = {node->id, match, keySize};
+    return ret;
+  }
+}
+
+
+/** Print the whole tree, starting from root_rec, to stdout. */
+void logtree::print_tree(int xid)
+{
+  Page *p = loadPage(xid, root_rec.page);
+  readlock(p->rwlatch,0);
+
+  const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0);
+
+  int64_t depth = *((int64_t*)depth_nr);
+
+  print_tree(xid, root_rec.page, depth);
+
+  unlock(p->rwlatch);
+  releasePage(p);
+
+}
+
+/**
+ * Print the node on page pid and, for internal nodes, all of its children.
+ *
+ * Internal nodes list every (child page, key) entry and then recurse;
+ * leaves print only their first and last entries.
+ *
+ * @param depth levels below this node (0 means leaf).
+ */
+void logtree::print_tree(int xid, pageid_t pid, int64_t depth)
+{
+
+  Page *node = loadPage(xid, pid);
+  readlock(node->rwlatch,0);
+
+  //const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0);
+
+  printf("page_id:%lld\tnum_slots:%d\t\n", node->id, *stasis_page_slotted_numslots_ptr(node));
+
+  if(*stasis_page_slotted_numslots_ptr(node) == FIRST_SLOT)
+  {
+    // Empty node: drop the latch and the pin before bailing out.
+    unlock(node->rwlatch);
+    releasePage(node);
+    return;
+  }
+
+  assert(*stasis_page_slotted_numslots_ptr(node) > FIRST_SLOT);
+
+  if(depth)
+  {
+    printf("\tnot_leaf\n");
+
+    for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(node); i++)
+    {
+      const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0);
+      printf("\tchild_page_id:%lld\tkey:%s\n", nr->ptr,
+             datatuple::key_to_str((byte*)(nr+1)).c_str());
+
+    }
+
+    for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(node); i++)
+    {
+      const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0);
+      print_tree(xid, nr->ptr, depth-1);
+
+    }
+
+  }
+  else
+  {
+    printf("\tis_leaf\t\n");
+    const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,FIRST_SLOT,0);
+    printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr,
+           datatuple::key_to_str((byte*)(nr+1)).c_str());
+    printf("\t...\n");
+    nr = (const indexnode_rec*)readRecord(xid,node,(*stasis_page_slotted_numslots_ptr(node))-1,0);
+    printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr,
+           datatuple::key_to_str((byte*)(nr+1)).c_str());
+
+
+  }
+
+
+  unlock(node->rwlatch);
+  releasePage(node);
+
+
+}
+
+/////////////////////////////////////////////////
+//logtreeIterator implementation
+/////////////////////////////////////////////////
+
+/**
+ * Open an iterator positioned before the first entry of the leftmost leaf.
+ *
+ * The returned iterator keeps the leaf page pinned and read-latched until
+ * next() runs off the end or close() is called.
+ *
+ * @return a malloc()ed iterator, or 0 when root is {0, 0, -1}.
+ */
+lladdIterator_t* logtreeIterator::open(int xid, recordid root)
+{
+  if(root.page == 0 && root.slot == 0 && root.size == -1)
+    return 0;
+
+  Page *p = loadPage(xid,root.page);
+  readlock(p->rwlatch,0);
+
+  //size_t keySize = getKeySize(xid,p);
+  DEBUG("ROOT_REC_SIZE %d\n", logtree::root_rec_size);
+  const byte * nr = logtree::readRecord(xid,p,
+                                        logtree::DEPTH,
+                                        logtree::root_rec_size);
+  int64_t depth = *((int64_t*)nr);
+  DEBUG("DEPTH = %lld\n", depth);
+
+  pageid_t leafid = logtree::findFirstLeaf(xid, p, depth);
+  if(leafid != root.page)
+  {
+    unlock(p->rwlatch);
+    releasePage(p);
+    p = loadPage(xid,leafid);
+    readlock(p->rwlatch,0);
+    assert(depth != 0);
+  }
+  else
+    assert(depth == 0);
+
+
+  logtreeIterator_s *impl = (logtreeIterator_s*)malloc(sizeof(logtreeIterator_s));
+  impl->p = p;
+  {
+    // next() advances before it reads, so start one slot before the first
+    // index entry (same convention as openAt()).
+    recordid rid = { p->id, (slotid_t)(logtree::FIRST_SLOT - 1), 0};//keySize };
+    impl->current = rid;
+  }
+  //DEBUG("keysize = %d, slot = %d\n", keySize, impl->current.slot);
+  impl->t = 0;
+  impl->justOnePage = (depth == 0);
+
+  lladdIterator_t *it = (lladdIterator_t*) malloc(sizeof(lladdIterator_t));
+  it->type = -1; // XXX  LSM_TREE_ITERATOR;
+  it->impl = impl;
+  return it;
+}
+
+/**
+ * Open an iterator positioned just before the entry that lookup() selects
+ * for key, so that the first next() returns that entry.
+ *
+ * @return a malloc()ed iterator, or 0 when root is NULLRID or lookup() finds
+ *         no entry.
+ */
+lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key)
+{
+  if(root.page == NULLRID.page && root.slot == NULLRID.slot)
+    return 0;
+
+  Page *p = loadPage(xid,root.page);
+  readlock(p->rwlatch,0);
+  //size_t keySize = getKeySize(xid,p);
+  //assert(keySize);
+  const byte *nr = logtree::readRecord(xid,p,logtree::DEPTH, logtree::root_rec_size);
+  //const byte *cmp_nr = logtree::readRecord(xid, p , logtree::COMPARATOR, logtree::root_rec_size);
+
+  int64_t depth = *((int64_t*)nr);
+
+  recordid lsm_entry_rid = logtree::lookup(xid,p,depth,key,0);//keySize,comparators[cmp_nr->ptr]);
+
+  if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) {
+    unlock(p->rwlatch);
+    releasePage(p); // drop the pin taken by loadPage() above
+    return 0;
+  }
+  assert(lsm_entry_rid.size != INVALID_SLOT);
+
+  if(root.page != lsm_entry_rid.page)
+  {
+    unlock(p->rwlatch);
+    releasePage(p);
+    p = loadPage(xid,lsm_entry_rid.page);
+    readlock(p->rwlatch,0);
+  }
+
+  logtreeIterator_s *impl = (logtreeIterator_s*) malloc(sizeof(logtreeIterator_s));
+  impl->p = p;
+
+  impl->current.page = lsm_entry_rid.page;
+  impl->current.slot = lsm_entry_rid.slot - 1;  // slot before thing of interest
+  impl->current.size = lsm_entry_rid.size;
+
+  impl->t = 0; // must be zero so free() doesn't croak.
+  impl->justOnePage = (depth==0);
+
+  lladdIterator_t *it = (lladdIterator_t*) malloc(sizeof(lladdIterator_t));
+  it->type = -1; // XXX LSM_TREE_ITERATOR
+  it->impl = impl;
+  return it;
+}
+
+/**
+ * Advance to the next index entry, following the NEXT_LEAF link when the
+ * current page is exhausted (unless the iterator is limited to one page).
+ *
+ * On success impl->t holds a malloc()ed copy of the entry.
+ *
+ * @return 1 when an entry is available, 0 at the end of the iteration.
+ *         Once 0 has been returned the page is released and further calls
+ *         keep returning 0.
+ **/
+int logtreeIterator::next(int xid, lladdIterator_t *it)
+{
+  logtreeIterator_s *impl = (logtreeIterator_s*) it->impl;
+
+  if(!impl->p)
+  {
+    // Already ran off the end; there is no page left to read from.
+    return 0;
+  }
+
+  impl->current = stasis_record_next(xid, impl->p, impl->current);
+
+  if(impl->current.size == INVALID_SLOT)
+  {
+
+    const indexnode_rec next_rec = *(const indexnode_rec*)logtree::readRecord(xid,impl->p,
+                                                        logtree::NEXT_LEAF,
+                                                        0);
+    // Log before releasing: impl->p must not be touched afterwards.
+    DEBUG("done with page %lld next = %lld\n", impl->p->id, next_rec.ptr);
+
+    unlock(impl->p->rwlatch);
+    releasePage(impl->p);
+
+
+    if(next_rec.ptr != -1 && ! impl->justOnePage)
+    {
+      impl->p = loadPage(xid, next_rec.ptr);
+      readlock(impl->p->rwlatch,0);
+      impl->current.page = next_rec.ptr;
+      impl->current.slot = logtree::FIRST_SLOT;
+      impl->current.size = stasis_record_length_read(xid, impl->p, impl->current); //keySize;
+    } else {
+      impl->p = 0;
+      impl->current.size = INVALID_SLOT;
+    }
+
+  }
+  else
+  {
+    /*
+    assert(impl->current.size == keySize + sizeof(lsmTreeNodeRecord));
+    impl->current.size = keySize;
+    */
+  }
+
+
+  if(impl->current.size != INVALID_SLOT)
+  {
+    //size_t sz = sizeof(*impl->t) + impl->current.size;
+    if(impl->t != NULL)
+      free(impl->t);
+
+    impl->t = (indexnode_rec*)malloc(impl->current.size);
+    memcpy(impl->t, logtree::readRecord(xid,impl->p,impl->current), impl->current.size);
+
+    return 1;
+  }
+  else
+  {
+    assert(!impl->p);
+    if(impl->t != NULL)
+      free(impl->t);
+    impl->t = 0;
+    return 0;
+  }
+
+}
+
+/**
+ * Release the iterator: unlatch and unpin its page if it still holds one,
+ * free the cached entry, and free the iterator itself.
+ */
+void logtreeIterator::close(int xid, lladdIterator_t *it)
+{
+  logtreeIterator_s *impl = (logtreeIterator_s*)it->impl;
+  if(impl->p)
+  {
+    unlock(impl->p->rwlatch);
+    releasePage(impl->p);
+  }
+  if(impl->t)
+  {
+    free(impl->t);
+  }
+  free(impl);
+  free(it);
+}
diff --git a/diskTreeComponent.h b/diskTreeComponent.h new file mode 100644 index 0000000..5307737 --- /dev/null +++ 
b/diskTreeComponent.h @@ -0,0 +1,146 @@
+/*
+ * diskTreeComponent.h
+ *
+ *  Created on: Feb 18, 2010
+ *      Author: sears
+ */
+
+#ifndef DISKTREECOMPONENT_H_
+#define DISKTREECOMPONENT_H_
+
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+
+#include "datapage.h"
+#include "tuplemerger.h"
+#include "datatuple.h"
+
+
+/**
+ * Persistent state of the region-based page allocator used by logtree.
+ */
+typedef struct RegionAllocConf_t
+{
+  recordid regionList;   // array list holding the first page of each region
+  pageid_t regionCount;  // number of regions allocated so far
+  pageid_t nextPage;     // next page to hand out
+  pageid_t endOfRegion;  // one past the last page of the current region
+  pageid_t regionSize;   // pages per region
+} RegionAllocConf_t;
+
+
+/** Page allocator callback: (xid, allocator state) -> newly allocated page. */
+typedef pageid_t(*logtree_page_allocator_t)(int, void *);
+/** Page deallocator callback: (xid, allocator state). */
+typedef void(*logtree_page_deallocator_t)(int, void *);
+
+
+/**
+ * Append-only tree index. Entries are (key -> page id) records stored on
+ * slotted pages.
+ *
+ * NOTE(review): region_alloc is created with new in the constructor and no
+ * destructor is declared here; confirm who is responsible for freeing it.
+ */
+class logtree{
+public:
+  // ---- construction / lifecycle ----
+
+  logtree(int xid): region_alloc(new DataPage::RegionAllocator(xid, 10000)) {create(xid);} // XXX shouldn't hardcode region size.
+
+  static void init_stasis();
+  static void deinit_stasis();
+
+  void print_tree(int xid);
+
+  // ---- region allocator helpers (state lives in a RegionAllocConf_t record) ----
+
+  static pageid_t alloc_region_rid(int xid, void * ridp);
+  static void force_region_rid(int xid, recordid rid);
+  static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count);
+  static void dealloc_region_rid(int xid, recordid rid);
+  static void free_region_rid(int xid, recordid tree,
+                              logtree_page_deallocator_t dealloc,
+                              void *allocator_state);
+
+  // ---- raw record access ----
+
+  static void writeNodeRecord(int xid, Page *p, recordid &rid,
+                              const byte *key, size_t keylen, pageid_t ptr);
+
+  static void writeRecord(int xid, Page *p, recordid &rid,
+                          const byte *data, size_t datalen);
+
+  static void writeRecord(int xid, Page *p, slotid_t slot,
+                          const byte *data, size_t datalen);
+
+  static const byte* readRecord(int xid, Page * p, recordid &rid);
+  static const byte* readRecord(int xid, Page * p, slotid_t slot, int64_t size);
+
+  static int32_t readRecordLength(int xid, Page *p, slotid_t slot);
+
+  // ---- traversal ----
+
+  // Left-most leaf. "Leaf" here means the lowest index level, not a data page.
+  static pageid_t findFirstLeaf(int xid, Page *root, int64_t depth);
+  // Right-most leaf.
+  static pageid_t findLastLeaf(int xid, Page *root, int64_t depth) ;
+
+  // Reads the given record and returns the page id stored in it.
+  static pageid_t lookupLeafPageFromRid(int xid, recordid rid);
+
+  // Returns the record that stores the id of the page where the given key
+  // would be, if it exists.
+  static recordid lookup(int xid, Page *node, int64_t depth, const byte *key,
+                         size_t keySize);
+
+  // Returns the id of the data page that could contain the given key.
+  static pageid_t findPage(int xid, recordid tree, const byte *key, size_t keySize);
+
+  // ---- insertion ----
+
+  // Appends a leaf page entry; val_page is the id of the leaf page and
+  // rmLeafID is the id of the rightmost leaf.
+  static recordid appendPage(int xid, recordid tree, pageid_t & rmLeafID,
+                             const byte *key,size_t keySize,
+                             logtree_page_allocator_t allocator, void *allocator_state,
+                             long val_page);
+
+  static recordid appendInternalNode(int xid, Page *p,
+                                     int64_t depth,
+                                     const byte *key, size_t key_len,
+                                     pageid_t val_page, pageid_t lastLeaf,
+                                     logtree_page_allocator_t allocator,
+                                     void *allocator_state);
+
+  static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
+                                  int64_t depth, const byte *key, size_t key_len,
+                                  pageid_t val_page, pageid_t lastLeaf,
+                                  logtree_page_allocator_t allocator,
+                                  void *allocator_state);
+
+  /**
+     Initialize a page for use as an internal node of the tree.
+   */
+  inline static void initializeNodePage(int xid, Page *p);
+
+  // ---- accessors ----
+
+  inline DataPage::RegionAllocator* get_alloc() { return region_alloc; }
+
+  recordid &get_tree_state(){return tree_state;}
+  recordid &get_root_rec(){return root_rec;}
+
+  // ---- constants (values are defined in diskTreeComponent.cpp) ----
+
+  const static RegionAllocConf_t REGION_ALLOC_STATIC_INITIALIZER;
+  const static int64_t DEPTH;
+  const static int64_t COMPARATOR;
+  const static int64_t FIRST_SLOT;
+  const static size_t root_rec_size;
+  const static int64_t PREV_LEAF;
+  const static int64_t NEXT_LEAF;
+
+  pageid_t lastLeaf;
+
+private:
+  recordid create(int xid);
+
+  static pageid_t alloc_region(int xid, void *conf);
+
+  void print_tree(int xid, pageid_t pid, int64_t depth);
+
+  recordid tree_state;
+  recordid root_rec;
+
+  DataPage::RegionAllocator* region_alloc;
+
+
+};
+
+
+#endif /* DISKTREECOMPONENT_H_ */
diff --git a/logstore.cpp b/logstore.cpp index e7cfb19..e296421 100644 --- a/logstore.cpp +++ b/logstore.cpp @@ -14,811 +14,12 @@ #include #include - static inline double tv_to_double(struct timeval tv) { return static_cast(tv.tv_sec) + (static_cast(tv.tv_usec) / 1000000.0); } -///////////////////////////////////////////////////////////////// -// LOGTREE implementation -///////////////////////////////////////////////////////////////// - -const RegionAllocConf_t logtree::REGION_ALLOC_STATIC_INITIALIZER = { {0,0,-1}, 0, -1, -1, 1000 }; - -#define LOGTREE_ROOT_PAGE SLOTTED_PAGE - -//LSM_ROOT_PAGE - -const int64_t logtree::DEPTH = 0; //in root this is the slot num where the DEPTH (of tree) is stored -const int64_t logtree::COMPARATOR = 1; //in root this is the slot num where the COMPARATOR id is stored -const int64_t logtree::FIRST_SLOT = 2; //this is the first unused slot in all index pages -const size_t logtree::root_rec_size = sizeof(int64_t); -const int64_t logtree::PREV_LEAF = 0; //pointer to prev leaf page -const int64_t logtree::NEXT_LEAF = 1; //pointer to next leaf page - -void logtree::init_stasis() { - - bufferManagerFileHandleType = BUFFER_MANAGER_FILE_HANDLE_PFILE; - - DataPage::register_stasis_page_impl(); - - stasis_buffer_manager_factory = stasis_buffer_manager_hash_factory; // XXX 
workaround stasis issue #22. - - Tinit(); - -} - -void logtree::deinit_stasis() { Tdeinit(); } - -void logtree::free_region_rid(int xid, recordid tree, - logtree_page_deallocator_t dealloc, void *allocator_state) -{ - // Tdealloc(xid,tree); - dealloc(xid,allocator_state); - // XXX fishy shouldn't caller do this? - Tdealloc(xid, *(recordid*)allocator_state); -} - - -void logtree::dealloc_region_rid(int xid, recordid rid) -{ - RegionAllocConf_t a; - Tread(xid,rid,&a); - DEBUG("{%lld <- dealloc region arraylist}\n", a.regionList.page); - - for(int i = 0; i < a.regionCount; i++) { - a.regionList.slot = i; - pageid_t pid; - Tread(xid,a.regionList,&pid); - TregionDealloc(xid,pid); - } - a.regionList.slot = 0; -// printf("Warning: leaking arraylist %lld in logtree\n", (long long)a.regionList.page); - TarrayListDealloc(xid, a.regionList); -} - - -void logtree::force_region_rid(int xid, recordid rid) -{ - RegionAllocConf_t a; - Tread(xid,rid,&a); - - for(int i = 0; i < a.regionCount; i++) - { - a.regionList.slot = i; - pageid_t pid; - Tread(xid,a.regionList,&pid); - stasis_dirty_page_table_flush_range((stasis_dirty_page_table_t*)stasis_runtime_dirty_page_table(), pid, pid+a.regionSize); - stasis_buffer_manager_t *bm = - (stasis_buffer_manager_t*)stasis_runtime_buffer_manager(); - bm->forcePageRange(bm, pid, pid+a.regionSize); - } -} - - -pageid_t logtree::alloc_region(int xid, void *conf) -{ - RegionAllocConf_t* a = (RegionAllocConf_t*)conf; - - - if(a->nextPage == a->endOfRegion) { - if(a->regionList.size == -1) { - //DEBUG("nextPage: %lld\n", a->nextPage); - a->regionList = TarrayListAlloc(xid, 1, 4, sizeof(pageid_t)); - DEBUG("regionList.page: %lld\n", a->regionList.page); - DEBUG("regionList.slot: %d\n", a->regionList.slot); - DEBUG("regionList.size: %lld\n", a->regionList.size); - - a->regionCount = 0; - } - DEBUG("{%lld <- alloc region arraylist}\n", a->regionList.page); - TarrayListExtend(xid,a->regionList,1); - a->regionList.slot = a->regionCount; - DEBUG("region 
lst slot %d\n",a->regionList.slot); - a->regionCount++; - DEBUG("region count %lld\n",a->regionCount); - a->nextPage = TregionAlloc(xid, a->regionSize,12); - DEBUG("next page %lld\n",a->nextPage); - a->endOfRegion = a->nextPage + a->regionSize; - Tset(xid,a->regionList,&a->nextPage); - DEBUG("next page %lld\n",a->nextPage); - } - - DEBUG("%lld ?= %lld\n", a->nextPage,a->endOfRegion); - pageid_t ret = a->nextPage; - (a->nextPage)++; - DEBUG("tree %lld-%lld\n", (long long)ret, a->endOfRegion); - return ret; - -} - -pageid_t logtree::alloc_region_rid(int xid, void * ridp) { - recordid rid = *(recordid*)ridp; - RegionAllocConf_t conf; - Tread(xid,rid,&conf); - pageid_t ret = alloc_region(xid,&conf); - //DEBUG("{%lld <- alloc region extend}\n", conf.regionList.page); - // XXX get rid of Tset by storing next page in memory, and losing it - // on crash. - Tset(xid,rid,&conf); - return ret; -} - -pageid_t * logtree::list_region_rid(int xid, void *ridp, pageid_t * region_len, pageid_t * region_count) { - recordid header = *(recordid*)ridp; - RegionAllocConf_t conf; - Tread(xid,header,&conf); - recordid header_list = conf.regionList; - *region_len = conf.regionSize; - *region_count = conf.regionCount; - pageid_t * ret = (pageid_t*) malloc(sizeof(pageid_t) * *region_count); - for(pageid_t i = 0; i < *region_count; i++) { - header_list.slot = i; - Tread(xid,header_list,&ret[i]); - } - return ret; -} - - - -recordid logtree::create(int xid) -{ - - tree_state = Talloc(xid,sizeof(RegionAllocConf_t)); - - //int ptype = TpageGetType(xid, tree_state.page); - //DEBUG("page type %d\n", ptype); //returns a slotted page - - Tset(xid,tree_state, ®ION_ALLOC_STATIC_INITIALIZER); - - pageid_t root = alloc_region_rid(xid, &tree_state); - DEBUG("Root = %lld\n", root); - recordid ret = { root, 0, 0 }; - - Page *p = loadPage(xid, ret.page); - writelock(p->rwlatch,0); - - lastLeaf = -1; - - //initialize root node - stasis_page_slotted_initialize_page(p); - recordid tmp = 
stasis_record_alloc_begin(xid, p, root_rec_size); - stasis_record_alloc_done(xid,p,tmp); - - assert(tmp.page == ret.page - && tmp.slot == DEPTH - && tmp.size == root_rec_size); - - writeRecord(xid, p, tmp, (byte*)&DEPTH, root_rec_size); - - tmp = stasis_record_alloc_begin(xid, p, root_rec_size); - stasis_record_alloc_done(xid,p,tmp); - - assert(tmp.page == ret.page - && tmp.slot == COMPARATOR - && tmp.size == root_rec_size); - - writeRecord(xid, p, tmp, (byte*)&COMPARATOR, root_rec_size); - - unlock(p->rwlatch); - releasePage(p); - - root_rec = ret; - - return ret; -} - - -/** - * TODO: what happen if there is already such a record with a different size? - * I guess this should never happen in rose, but what if? - **/ -void logtree::writeRecord(int xid, Page *p, recordid &rid, - const byte *data, size_t datalen) -{ - byte *byte_arr = stasis_record_write_begin(xid, p, rid); - memcpy(byte_arr, data, datalen); //TODO: stasis write call - stasis_record_write_done(xid, p, rid, byte_arr); - stasis_page_lsn_write(xid, p, get_lsn(xid)); - -} - - -void logtree::writeNodeRecord(int xid, Page * p, recordid & rid, - const byte *key, size_t keylen, pageid_t ptr) -{ - DEBUG("writenoderecord:\tp->id\t%lld\tkey:\t%s\tkeylen: %d\tval_page\t%lld\n", - p->id, datatuple::key_to_str(key).c_str(), keylen, ptr); - indexnode_rec *nr = (indexnode_rec*)stasis_record_write_begin(xid, p, rid); - nr->ptr = ptr; - memcpy(nr+1, key, keylen); - stasis_record_write_done(xid, p, rid, (byte*)nr); - stasis_page_lsn_write(xid, p, get_lsn(xid)); -} - -void logtree::writeRecord(int xid, Page *p, slotid_t slot, - const byte *data, size_t datalen) -{ - recordid rid; - rid.page = p->id; - rid.slot = slot; - rid.size = datalen; - byte *byte_arr = stasis_record_write_begin(xid, p, rid); - memcpy(byte_arr, data, datalen); //TODO: stasis write call - stasis_record_write_done(xid, p, rid, byte_arr); - stasis_page_lsn_write(xid, p, get_lsn(xid)); - -} - -const byte* logtree::readRecord(int xid, Page * p, 
recordid &rid) -{ - const byte *nr = stasis_record_read_begin(xid,p,rid); // XXX API violation? - return nr; -} - -const byte* logtree::readRecord(int xid, Page * p, slotid_t slot, int64_t size) -{ - recordid rid; - rid.page = p->id; - rid.slot = slot; - rid.size = size; - //byte *ret = (byte*)malloc(rid.size); - //stasis_record_read(xid,p,rid,ret); - //return ret; - const byte *nr = stasis_record_read_begin(xid,p,rid); - return nr; -// return readRecord(xid, p, rid); - -} - -int32_t logtree::readRecordLength(int xid, Page *p, slotid_t slot) -{ - recordid rec = {p->id, slot, 0}; - int32_t reclen = stasis_record_length_read(xid, p, rec); - return reclen; -} - -void logtree::initializeNodePage(int xid, Page *p) -{ - stasis_page_slotted_initialize_page(p); - recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)); - stasis_record_alloc_done(xid, p, reserved1); - recordid reserved2 = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)); - stasis_record_alloc_done(xid, p, reserved2); -} - - -recordid logtree::appendPage(int xid, recordid tree, pageid_t & rmLeafID, - const byte *key, size_t keySize, - lsm_page_allocator_t allocator, void *allocator_state, - long val_page) -{ - Page *p = loadPage(xid, tree.page); - writelock(p->rwlatch, 0); - //logtree_state *s = (logtree_state*)p->impl; - - tree.slot = 0; - //tree.size = sizeof(lsmTreeNodeRecord)+keySize; - - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0); - int64_t depth = *((int64_t*)nr); - - if(rmLeafID == -1) { - rmLeafID = findLastLeaf(xid, p, depth); - } - - Page *lastLeaf; - - if(rmLeafID != tree.page) - { - lastLeaf= loadPage(xid, rmLeafID); - writelock(lastLeaf->rwlatch, 0); - } else - lastLeaf = p; - - - recordid ret = stasis_record_alloc_begin(xid, lastLeaf, - sizeof(indexnode_rec)+keySize); - - if(ret.size == INVALID_SLOT) - { - if(lastLeaf->id != p->id) - { - assert(rmLeafID != tree.page); - unlock(lastLeaf->rwlatch); - releasePage(lastLeaf); // don't 
need that page anymore... - lastLeaf = 0; - } - // traverse down the root of the tree. - - tree.slot = 0; - - assert(tree.page == p->id); - - ret = appendInternalNode(xid, p, depth, key, keySize, val_page, - rmLeafID == tree.page ? -1 : rmLeafID, - allocator, allocator_state); - - if(ret.size == INVALID_SLOT) - { - DEBUG("Need to split root; depth = %d\n", depth); - - pageid_t child = allocator(xid, allocator_state); - Page *lc = loadPage(xid, child); - writelock(lc->rwlatch,0); - - initializeNodePage(xid, lc); - - //creates a copy of the root page records in the - //newly allocated child page - for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(p); i++) - { - //read the record from the root page - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,p,i,0); - int reclen = readRecordLength(xid, p, i); - - recordid cnext = stasis_record_alloc_begin(xid, lc,reclen); - - assert(i == cnext.slot); - assert(cnext.size != INVALID_SLOT); - - stasis_record_alloc_done(xid, lc, cnext); - - writeRecord(xid,lc,i,(byte*)(nr),reclen); - } - - // deallocate old entries, and update pointer on parent node. - // NOTE: stasis_record_free call goes to slottedFree in slotted.c - // this function only reduces the numslots when you call it - // with the last slot. so thats why i go backwards here. - printf("slots %d (%d) keysize=%lld\n", (int)*stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1, (long long int)keySize); - assert(*stasis_page_slotted_numslots_ptr(p) >= FIRST_SLOT+1); - for(int i = *stasis_page_slotted_numslots_ptr(p)-1; i>FIRST_SLOT; i--) - { - assert(*stasis_page_slotted_numslots_ptr(p) > FIRST_SLOT+1); - recordid tmp_rec= {p->id, i, INVALID_SIZE}; - stasis_record_free(xid, p, tmp_rec); - } - - //TODO: could change with stasis_slotted_page_initialize(...); - // TODO: fsck? - // stasis_page_slotted_initialize_page(p); - - // reinsert first. 
- recordid pFirstSlot = { p->id, FIRST_SLOT, readRecordLength(xid, p, FIRST_SLOT)}; - if(*stasis_page_slotted_numslots_ptr(p) != FIRST_SLOT+1) { - printf("slots %d (%d)\n", *stasis_page_slotted_numslots_ptr(p), (int)FIRST_SLOT+1); - assert(*stasis_page_slotted_numslots_ptr(p) == FIRST_SLOT+1); - } - - indexnode_rec *nr - = (indexnode_rec*)stasis_record_write_begin(xid, p, pFirstSlot); - - // don't overwrite key... - nr->ptr = child; - stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr); - stasis_page_lsn_write(xid, p, get_lsn(xid)); - - if(!depth) { - rmLeafID = lc->id; - pageid_t tmpid = -1; - writeRecord(xid,lc,PREV_LEAF,(byte*)(&tmpid), root_rec_size); - writeRecord(xid,lc,NEXT_LEAF,(byte*)(&tmpid), root_rec_size); - } - - unlock(lc->rwlatch); - releasePage(lc); - - //update the depth info at the root - depth ++; - writeRecord(xid,p,DEPTH,(byte*)(&depth), root_rec_size); - - assert(tree.page == p->id); - ret = appendInternalNode(xid, p, depth, key, keySize, val_page, - rmLeafID == tree.page ? -1 : rmLeafID, - allocator, allocator_state); - - assert(ret.size != INVALID_SLOT); - - } - else { - DEBUG("Appended new internal node tree depth = %lld key = %s\n", - depth, datatuple::key_to_str(key).c_str()); - } - - rmLeafID = ret.page; - DEBUG("lastleaf is %lld\n", rmLeafID); - - - } - else - { - // write the new value to an existing page - DEBUG("Writing %s\t%d to existing page# %lld\n", datatuple::key_to_str(key).c_str(), - val_page, lastLeaf->id); - - stasis_record_alloc_done(xid, lastLeaf, ret); - - logtree::writeNodeRecord(xid, lastLeaf, ret, key, keySize, val_page); - - if(lastLeaf->id != p->id) { - assert(rmLeafID != tree.page); - unlock(lastLeaf->rwlatch); - releasePage(lastLeaf); - } - } - - unlock(p->rwlatch); - releasePage(p); - - return ret; -} - -/* adding pages: - - 1) Try to append value to lsmTreeState->lastLeaf - - 2) If that fails, traverses down the root of the tree, split pages while - traversing back up. 
- - 3) Split is done by adding new page at end of row (no key - redistribution), except at the root, where root contents are - pushed into the first page of the next row, and a new path from root to - leaf is created starting with the root's immediate second child. - -*/ - -recordid logtree::appendInternalNode(int xid, Page *p, - int64_t depth, - const byte *key, size_t key_len, - pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, - void *allocator_state) -{ -// assert(*stasis_page_type_ptr(p) == LOGTREE_ROOT_PAGE || -// *stasis_page_type_ptr(p) == SLOTTED_PAGE); - assert(p->pageType == LOGTREE_ROOT_PAGE || - p->pageType == SLOTTED_PAGE); - - DEBUG("appendInternalNode\tdepth %lldkeylen%d\tnumslots %d\n", depth, key_len, *stasis_page_slotted_numslots_ptr(p)); - - if(!depth) - { - // leaf node. - recordid ret = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)+key_len); - if(ret.size != INVALID_SLOT) { - stasis_record_alloc_done(xid, p, ret); - writeNodeRecord(xid,p,ret,key,key_len,val_page); - } - return ret; - } - else - { - // recurse - int slot = *stasis_page_slotted_numslots_ptr(p)-1;//*recordcount_ptr(p)-1; - - assert(slot >= FIRST_SLOT); // there should be no empty nodes - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid, p, slot, 0); - pageid_t child_id = nr->ptr; - nr = 0; - recordid ret; - { - Page *child_page = loadPage(xid, child_id); - writelock(child_page->rwlatch,0); - ret = appendInternalNode(xid, child_page, depth-1, key, key_len, - val_page, lastLeaf, allocator, allocator_state); - - unlock(child_page->rwlatch); - releasePage(child_page); - } - - if(ret.size == INVALID_SLOT) // subtree is full; split - { - ret = stasis_record_alloc_begin(xid, p, sizeof(indexnode_rec)+key_len); - DEBUG("keylen %d\tnumslots %d for page id %lld ret.size %lld prv rec len %d\n", - key_len, - *stasis_page_slotted_numslots_ptr(p), - p->id, - ret.size, - readRecordLength(xid, p, slot)); - if(ret.size != INVALID_SLOT) - { - 
stasis_record_alloc_done(xid, p, ret); - ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page, - lastLeaf, allocator, allocator_state); - - DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n", - p->id, ret.page, ret.slot, ret.size); - } else { - // ret is NULLRID; this is the root of a full tree. Return - // NULLRID to the caller. - } - } else { - // we inserted the value in to a subtree rooted here. - } - return ret; - } -} - -recordid logtree::buildPathToLeaf(int xid, recordid root, Page *root_p, - int64_t depth, const byte *key, size_t key_len, - pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, - void *allocator_state) -{ - - // root is the recordid on the root page that should point to the - // new subtree. - assert(depth); - DEBUG("buildPathToLeaf(depth=%lld) (lastleaf=%lld) called\n",depth, lastLeaf); - - pageid_t child = allocator(xid,allocator_state); - DEBUG("new child = %lld internal? %lld\n", child, depth-1); - - Page *child_p = loadPage(xid, child); - writelock(child_p->rwlatch,0); - initializeNodePage(xid, child_p); - - recordid ret; - - if(depth-1) { - // recurse: the page we just allocated is not a leaf. 
- recordid child_rec = stasis_record_alloc_begin(xid, child_p, sizeof(indexnode_rec)+key_len); - assert(child_rec.size != INVALID_SLOT); - stasis_record_alloc_done(xid, child_p, child_rec); - - ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len, - val_page,lastLeaf, allocator, allocator_state); - - unlock(child_p->rwlatch); - releasePage(child_p); - - } else { - // set leaf - - // backward link.//these writes do not need alloc_begin as it is done in page initialization - writeRecord(xid, child_p, PREV_LEAF, (byte*)(&lastLeaf), root_rec_size); - //writeNodeRecord(xid,child_p,PREV_LEAF,dummy,key_len,lastLeaf); - - // forward link (initialize to -1) - - pageid_t tmp_pid = -1; - writeRecord(xid, child_p, NEXT_LEAF, (byte*)(&tmp_pid), root_rec_size); - //writeNodeRecord(xid,child_p,NEXT_LEAF,dummy,key_len,-1); - - recordid leaf_rec = stasis_record_alloc_begin(xid, child_p, - sizeof(indexnode_rec)+key_len); - - assert(leaf_rec.slot == FIRST_SLOT); - - stasis_record_alloc_done(xid, child_p, leaf_rec); - writeNodeRecord(xid,child_p,leaf_rec,key,key_len,val_page); - - ret = leaf_rec; - - unlock(child_p->rwlatch); - releasePage(child_p); - if(lastLeaf != -1) - { - // install forward link in previous page - Page *lastLeafP = loadPage(xid, lastLeaf); - writelock(lastLeafP->rwlatch,0); - writeRecord(xid,lastLeafP,NEXT_LEAF,(byte*)(&child),root_rec_size); - unlock(lastLeafP->rwlatch); - releasePage(lastLeafP); - } - - DEBUG("%lld <-> %lld\n", lastLeaf, child); - } - - writeNodeRecord(xid, root_p, root, key, key_len, child); - - return ret; - -} - - - -/** - * Traverse from the root of the page to the right most leaf (the one - * with the higest base key value). 
- **/ -pageid_t logtree::findLastLeaf(int xid, Page *root, int64_t depth) -{ - if(!depth) - { - DEBUG("Found last leaf = %lld\n", root->id); - return root->id; - } - else - { - const indexnode_rec *nr = (indexnode_rec*) readRecord(xid, root, - (*stasis_page_slotted_numslots_ptr(root))-1, 0); - pageid_t ret; - - Page *p = loadPage(xid, nr->ptr); - readlock(p->rwlatch,0); - ret = findLastLeaf(xid,p,depth-1); - unlock(p->rwlatch); - releasePage(p); - - return ret; - } -} - - -/** - * Traverse from the root of the tree to the left most (lowest valued - * key) leaf. - */ -pageid_t logtree::findFirstLeaf(int xid, Page *root, int64_t depth) -{ - if(!depth) //if depth is 0, then returns the id of the page - return root->id; - else - { - const indexnode_rec *nr = (indexnode_rec*)readRecord(xid,root,FIRST_SLOT,0); - Page *p = loadPage(xid, nr->ptr); - readlock(p->rwlatch,0); - pageid_t ret = findFirstLeaf(xid,p,depth-1); - unlock(p->rwlatch); - releasePage(p); - return ret; - } -} - - -pageid_t logtree::findPage(int xid, recordid tree, const byte *key, size_t keySize) -{ - Page *p = loadPage(xid, tree.page); - readlock(p->rwlatch,0); - - const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0); - - int64_t depth = *((int64_t*)depth_nr); - - recordid rid = lookup(xid, p, depth, key, keySize); - pageid_t ret = lookupLeafPageFromRid(xid,rid);//,keySize); - unlock(p->rwlatch); - releasePage(p); - - return ret; - -} - -pageid_t logtree::lookupLeafPageFromRid(int xid, recordid rid) -{ - pageid_t pid = -1; - if(rid.page != NULLRID.page || rid.slot != NULLRID.slot) - { - Page * p2 = loadPage(xid, rid.page); - readlock(p2->rwlatch,0); - pid = ((const indexnode_rec*)(readRecord(xid,p2,rid.slot,0)))->ptr; - unlock(p2->rwlatch); - releasePage(p2); - } - return pid; -} - - -recordid logtree::lookup(int xid, - Page *node, - int64_t depth, - const byte *key, size_t keySize ) -{ - //DEBUG("lookup: pid %lld\t depth %lld\n", node->id, depth); - 
if(*stasis_page_slotted_numslots_ptr(node) == FIRST_SLOT) - return NULLRID; - - assert(*stasis_page_slotted_numslots_ptr(node) > FIRST_SLOT); - - int match = FIRST_SLOT; - - // don't need to compare w/ first item in tree. - const indexnode_rec * rec = (indexnode_rec*)readRecord(xid,node,FIRST_SLOT,0); //TODO: why read it then? - - for(int i = FIRST_SLOT+1; i < *stasis_page_slotted_numslots_ptr(node); i++) - { - rec = (const indexnode_rec*)readRecord(xid,node,i,0); - int cmpval = datatuple::compare((datatuple::key_t) (rec+1),(datatuple::key_t) key); - if(cmpval>0) //changed it from > - break; - match = i; - } - - - if(depth) - { - pageid_t child_id = ((const indexnode_rec*)readRecord(xid,node,match,0))->ptr; - Page* child_page = loadPage(xid, child_id); - readlock(child_page->rwlatch,0); - recordid ret = lookup(xid,child_page,depth-1,key,0); - unlock(child_page->rwlatch); - releasePage(child_page); - return ret; - } - else - { - recordid ret = {node->id, match, keySize}; - return ret; - } -} - - -void logtree::print_tree(int xid) -{ - Page *p = loadPage(xid, root_rec.page); - readlock(p->rwlatch,0); - - const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0); - - int64_t depth = *((int64_t*)depth_nr); - - print_tree(xid, root_rec.page, depth); - - unlock(p->rwlatch); - releasePage(p); - -} - -void logtree::print_tree(int xid, pageid_t pid, int64_t depth) -{ - - Page *node = loadPage(xid, pid); - readlock(node->rwlatch,0); - - //const indexnode_rec *depth_nr = (const indexnode_rec*)readRecord(xid, p , DEPTH, 0); - - printf("page_id:%lld\tnum_slots:%d\t\n", node->id, *stasis_page_slotted_numslots_ptr(node)); - - if(*stasis_page_slotted_numslots_ptr(node) == FIRST_SLOT) - return; - - assert(*stasis_page_slotted_numslots_ptr(node) > FIRST_SLOT); - - if(depth) - { - printf("\tnot_leaf\n"); - - for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(node); i++) - { - const indexnode_rec *nr = (const 
indexnode_rec*)readRecord(xid,node,i,0); - printf("\tchild_page_id:%lld\tkey:%s\n", nr->ptr, - datatuple::key_to_str((byte*)(nr+1)).c_str()); - - } - - for(int i = FIRST_SLOT; i < *stasis_page_slotted_numslots_ptr(node); i++) - { - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,i,0); - print_tree(xid, nr->ptr, depth-1); - - } - - } - else - { - printf("\tis_leaf\t\n"); - const indexnode_rec *nr = (const indexnode_rec*)readRecord(xid,node,FIRST_SLOT,0); - printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr, - datatuple::key_to_str((byte*)(nr+1)).c_str()); - printf("\t...\n"); - nr = (const indexnode_rec*)readRecord(xid,node,(*stasis_page_slotted_numslots_ptr(node))-1,0); - printf("\tdata_page_id:%lld\tkey:%s\n", nr->ptr, - datatuple::key_to_str((byte*)(nr+1)).c_str()); - - - } - - - unlock(node->rwlatch); - releasePage(node); - - -} - ///////////////////////////////////////////////////////////////// // LOG TABLE IMPLEMENTATION ///////////////////////////////////////////////////////////////// @@ -1087,9 +288,6 @@ datatuple * logtable::findTuple(int xid, const datatuple::key_t key, size_t keyS { use_copy = true; ret_tuple = tuple_oc1; - //byte *barr = (byte*)malloc(tuple_oc1->byte_length()); - //memcpy(barr, (byte*)tuple_oc1->keylen, tuple_oc1->byte_length()); - //ret_tuple = datatuple::from_bytes(barr); } if(!use_copy) @@ -1249,15 +447,6 @@ void logtable::insertTuple(datatuple *tuple) } //flushing logic - /* - bool go = false; - if(tree_bytes >= MAX_C0_SIZE) - { - go = *mergedata->input_needed; - DEBUG("go %d\n", go); - } - */ - if(tree_bytes >= max_c0_size ) { DEBUG("tree size before merge %d tuples %lld bytes.\n", tsize, tree_bytes); @@ -1336,180 +525,3 @@ datatuple * logtable::findTuple(int xid, datatuple::key_t key, size_t keySize, } return tup; } - - -///////////////////////////////////////////////// -//logtreeIterator implementation -///////////////////////////////////////////////// - -lladdIterator_t* logtreeIterator::open(int xid, recordid 
root) -{ - if(root.page == 0 && root.slot == 0 && root.size == -1) - return 0; - - Page *p = loadPage(xid,root.page); - readlock(p->rwlatch,0); - - //size_t keySize = getKeySize(xid,p); - DEBUG("ROOT_REC_SIZE %d\n", logtree::root_rec_size); - const byte * nr = logtree::readRecord(xid,p, - logtree::DEPTH, - logtree::root_rec_size); - int64_t depth = *((int64_t*)nr); - DEBUG("DEPTH = %lld\n", depth); - - pageid_t leafid = logtree::findFirstLeaf(xid, p, depth); - if(leafid != root.page) - { - unlock(p->rwlatch); - releasePage(p); - p = loadPage(xid,leafid); - readlock(p->rwlatch,0); - assert(depth != 0); - } - else - assert(depth == 0); - - - logtreeIterator_s *impl = (logtreeIterator_s*)malloc(sizeof(logtreeIterator_s)); - impl->p = p; - { - recordid rid = { p->id, 1, 0};//keySize }; //TODO: why does this start from 1? - impl->current = rid; - } - //DEBUG("keysize = %d, slot = %d\n", keySize, impl->current.slot); - impl->t = 0; - impl->justOnePage = (depth == 0); - - lladdIterator_t *it = (lladdIterator_t*) malloc(sizeof(lladdIterator_t)); - it->type = -1; // XXX LSM_TREE_ITERATOR; - it->impl = impl; - return it; -} - -lladdIterator_t* logtreeIterator::openAt(int xid, recordid root, const byte* key) -{ - if(root.page == NULLRID.page && root.slot == NULLRID.slot) - return 0; - - Page *p = loadPage(xid,root.page); - readlock(p->rwlatch,0); - //size_t keySize = getKeySize(xid,p); - //assert(keySize); - const byte *nr = logtree::readRecord(xid,p,logtree::DEPTH, logtree::root_rec_size); - //const byte *cmp_nr = logtree::readRecord(xid, p , logtree::COMPARATOR, logtree::root_rec_size); - - int64_t depth = *((int64_t*)nr); - - recordid lsm_entry_rid = logtree::lookup(xid,p,depth,key,0);//keySize,comparators[cmp_nr->ptr]); - - if(lsm_entry_rid.page == NULLRID.page && lsm_entry_rid.slot == NULLRID.slot) { - unlock(p->rwlatch); - return 0; - } - assert(lsm_entry_rid.size != INVALID_SLOT); - - if(root.page != lsm_entry_rid.page) - { - unlock(p->rwlatch); - releasePage(p); - p = 
loadPage(xid,lsm_entry_rid.page); - readlock(p->rwlatch,0); - } - - logtreeIterator_s *impl = (logtreeIterator_s*) malloc(sizeof(logtreeIterator_s)); - impl->p = p; - - impl->current.page = lsm_entry_rid.page; - impl->current.slot = lsm_entry_rid.slot - 1; // slot before thing of interest - impl->current.size = lsm_entry_rid.size; - - impl->t = 0; // must be zero so free() doesn't croak. - impl->justOnePage = (depth==0); - - lladdIterator_t *it = (lladdIterator_t*) malloc(sizeof(lladdIterator_t)); - it->type = -1; // XXX LSM_TREE_ITERATOR - it->impl = impl; - return it; -} - -/** - * move to the next page - **/ -int logtreeIterator::next(int xid, lladdIterator_t *it) -{ - logtreeIterator_s *impl = (logtreeIterator_s*) it->impl; - - impl->current = stasis_record_next(xid, impl->p, impl->current); - - if(impl->current.size == INVALID_SLOT) - { - - const indexnode_rec next_rec = *(const indexnode_rec*)logtree::readRecord(xid,impl->p, - logtree::NEXT_LEAF, - 0); - unlock(impl->p->rwlatch); - releasePage(impl->p); - - DEBUG("done with page %lld next = %lld\n", impl->p->id, next_rec.ptr); - - - if(next_rec.ptr != -1 && ! 
impl->justOnePage) - { - impl->p = loadPage(xid, next_rec.ptr); - readlock(impl->p->rwlatch,0); - impl->current.page = next_rec.ptr; - impl->current.slot = 2; - impl->current.size = stasis_record_length_read(xid, impl->p, impl->current); //keySize; - } else { - impl->p = 0; - impl->current.size = INVALID_SLOT; - } - - } - else - { - /* - assert(impl->current.size == keySize + sizeof(lsmTreeNodeRecord)); - impl->current.size = keySize; - */ - } - - - if(impl->current.size != INVALID_SLOT) - { - //size_t sz = sizeof(*impl->t) + impl->current.size; - if(impl->t != NULL) - free(impl->t); - - impl->t = (indexnode_rec*)malloc(impl->current.size); - memcpy(impl->t, logtree::readRecord(xid,impl->p,impl->current), impl->current.size); - - return 1; - } - else - { - assert(!impl->p); - if(impl->t != NULL) - free(impl->t); - impl->t = 0; - return 0; - } - -} - -void logtreeIterator::close(int xid, lladdIterator_t *it) -{ - logtreeIterator_s *impl = (logtreeIterator_s*)it->impl; - if(impl->p) - { - unlock(impl->p->rwlatch); - releasePage(impl->p); - } - if(impl->t) - { - free(impl->t); - } - free(impl); - free(it); -} diff --git a/logstore.h b/logstore.h index 7aeb1b4..c5beddf 100644 --- a/logstore.h +++ b/logstore.h @@ -30,6 +30,7 @@ #include #include +#include "diskTreeComponent.h" #include "datapage.h" #include "tuplemerger.h" @@ -40,129 +41,10 @@ struct logtable_mergedata; typedef std::set rbtree_t; typedef rbtree_t* rbtree_ptr_t; -typedef struct RegionAllocConf_t -{ - recordid regionList; - pageid_t regionCount; - pageid_t nextPage; - pageid_t endOfRegion; - pageid_t regionSize; -} RegionAllocConf_t; - struct indexnode_rec { pageid_t ptr; }; -typedef pageid_t(*logtree_page_allocator_t)(int, void *); -typedef void(*logtree_page_deallocator_t)(int, void *); - - -class logtree{ -public: - logtree(int xid): region_alloc(new DataPage::RegionAllocator(xid, 10000)) {create(xid);} // XXX shouldn't hardcode region size. 
-private: - recordid create(int xid); -public: - void print_tree(int xid); - - static void init_stasis(); - static void deinit_stasis(); -private: - static pageid_t alloc_region(int xid, void *conf); -public: - static pageid_t alloc_region_rid(int xid, void * ridp); - static void force_region_rid(int xid, recordid rid); - static pageid_t*list_region_rid(int xid, void * ridp, pageid_t * region_len, pageid_t * region_count); - static void dealloc_region_rid(int xid, recordid rid); - static void free_region_rid(int xid, recordid tree, - logtree_page_deallocator_t dealloc, - void *allocator_state); - - static void writeNodeRecord(int xid, Page *p, recordid &rid, - const byte *key, size_t keylen, pageid_t ptr); - - static void writeRecord(int xid, Page *p, recordid &rid, - const byte *data, size_t datalen); - - static void writeRecord(int xid, Page *p, slotid_t slot, - const byte *data, size_t datalen); - - static const byte* readRecord(int xid, Page * p, recordid &rid); - static const byte* readRecord(int xid, Page * p, slotid_t slot, int64_t size); - - static int32_t readRecordLength(int xid, Page *p, slotid_t slot); - - //return the left-most leaf, these are not data pages, although referred to as leaf - static pageid_t findFirstLeaf(int xid, Page *root, int64_t depth); - //return the right-most leaf - static pageid_t findLastLeaf(int xid, Page *root, int64_t depth) ; - - //reads the given record and returns the page id stored in it - static pageid_t lookupLeafPageFromRid(int xid, recordid rid); - - //returns a record that stores the pageid where the given key should be in, i.e. 
if it exists - static recordid lookup(int xid, Page *node, int64_t depth, const byte *key, - size_t keySize); - - //returns the id of the data page that could contain the given key - static pageid_t findPage(int xid, recordid tree, const byte *key, size_t keySize); - - - //appends a leaf page, val_page is the id of the leaf page - //rmLeafID --> rightmost leaf id - static recordid appendPage(int xid, recordid tree, pageid_t & rmLeafID, - const byte *key,size_t keySize, - logtree_page_allocator_t allocator, void *allocator_state, - long val_page); - - static recordid appendInternalNode(int xid, Page *p, - int64_t depth, - const byte *key, size_t key_len, - pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, - void *allocator_state); - - static recordid buildPathToLeaf(int xid, recordid root, Page *root_p, - int64_t depth, const byte *key, size_t key_len, - pageid_t val_page, pageid_t lastLeaf, - logtree_page_allocator_t allocator, - void *allocator_state); - - inline DataPage::RegionAllocator* get_alloc() { return region_alloc; } - - /** - Initialize a page for use as an internal node of the tree. - */ - inline static void initializeNodePage(int xid, Page *p); - - recordid &get_tree_state(){return tree_state;} - recordid &get_root_rec(){return root_rec;} - -public: - - const static RegionAllocConf_t REGION_ALLOC_STATIC_INITIALIZER; - const static int64_t DEPTH; - const static int64_t COMPARATOR; - const static int64_t FIRST_SLOT; - const static size_t root_rec_size; - const static int64_t PREV_LEAF; - const static int64_t NEXT_LEAF; - - pageid_t lastLeaf; -private: - - void print_tree(int xid, pageid_t pid, int64_t depth); - -private: - recordid tree_state; - recordid root_rec; - - DataPage::RegionAllocator* region_alloc; - - -}; - - class logtable { public: