stasis-aries-wal/src/stasis/operations/lsmTree.c

#include <string.h>
#include <stasis/operations/lsmTree.h>
#include <stasis/truncation.h>
#include <stasis/constants.h>
// XXX including fixed.h breaks page api encapsulation; we need a "last slot"
// call.
#include "../page/fixed.h"
#include <pthread.h>
static lsm_comparator_t comparators[MAX_LSM_COMPARATORS];
static lsm_page_initializer_t initializers[MAX_LSM_PAGE_INITIALIZERS];
TlsmRegionAllocConf_t LSM_REGION_ALLOC_STATIC_INITIALIZER =
  { {0,0,-1}, 0, -1, -1, 1000 };

pageid_t TlsmRegionAlloc(int xid, void *conf) {
  TlsmRegionAllocConf_t* a = (TlsmRegionAllocConf_t*)conf;
  if(a->nextPage == a->endOfRegion) {
    if(a->regionList.size == -1) {
      a->regionList = TarrayListAlloc(xid, 1, 4, sizeof(pageid_t));
      a->regionCount = 0;
    }
    TarrayListExtend(xid,a->regionList,1);
    a->regionList.slot = a->regionCount;
    DEBUG("region list slot %lld\n",a->regionList.slot);
    a->regionCount++;
    DEBUG("region count %lld\n",a->regionCount);
    a->nextPage = TregionAlloc(xid, a->regionSize,0);
    DEBUG("next page %lld\n",a->nextPage);
    a->endOfRegion = a->nextPage + a->regionSize;
    Tset(xid,a->regionList,&a->nextPage);
    DEBUG("next page %lld\n",a->nextPage);
  }
  DEBUG("%lld ?= %lld\n", a->nextPage,a->endOfRegion);
  pageid_t ret = a->nextPage;
  DEBUG("ret %lld\n",ret);
  (a->nextPage)++;
  return ret;
}
void TlsmRegionForceRid(int xid, void *conf) {
  recordid rid = *(recordid*)conf;
  TlsmRegionAllocConf_t a;
  Tread(xid,rid,&a);
  // TlsmRegionAllocConf_t* a = (TlsmRegionAllocConf_t*)conf;
  for(int i = 0; i < a.regionCount; i++) {
    a.regionList.slot = i;
    pageid_t pid;
    Tread(xid,a.regionList,&pid);
    dirtyPages_flushRange(pid, pid+a.regionSize);
    // TregionDealloc(xid,pid);
  }
}
void TlsmRegionDeallocRid(int xid, void *conf) {
  recordid rid = *(recordid*)conf;
  TlsmRegionAllocConf_t a;
  Tread(xid,rid,&a);
  // TlsmRegionAllocConf_t* a = (TlsmRegionAllocConf_t*)conf;
  for(int i = 0; i < a.regionCount; i++) {
    a.regionList.slot = i;
    pageid_t pid;
    Tread(xid,a.regionList,&pid);
    TregionDealloc(xid,pid);
  }
}
pageid_t TlsmRegionAllocRid(int xid, void * ridp) {
  recordid rid = *(recordid*)ridp;
  TlsmRegionAllocConf_t conf;
  Tread(xid,rid,&conf);
  pageid_t ret = TlsmRegionAlloc(xid,&conf);
  // XXX get rid of Tset by storing next page in memory, and losing it
  // on crash.
  Tset(xid,rid,&conf);
  return ret;
}
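/**
   Usage sketch (illustrative; not part of the original file): the
   allocator configuration can be persisted in a record so that the
   rid-based entry points above reload and re-save it on each call.
   This assumes an open transaction xid and Talloc(), stasis' record
   allocator:

     TlsmRegionAllocConf_t conf = LSM_REGION_ALLOC_STATIC_INITIALIZER;
     recordid conf_rid = Talloc(xid, sizeof(conf));
     Tset(xid, conf_rid, &conf);
     // TlsmRegionAllocRid(xid, &conf_rid) now hands out pages one at
     // a time, allocating a fresh regionSize-page region whenever the
     // current one is exhausted.
*/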
void lsmTreeRegisterComparator(int id, lsm_comparator_t i) {
  // XXX need to de-init this somewhere... assert(!comparators[id]);
  comparators[id] = i;
}
void lsmTreeRegisterPageInitializer(int id, lsm_page_initializer_t i) {
  initializers[id] = i;
}
#define HEADER_SIZE (2 * sizeof(lsmTreeNodeRecord))
// These two constants only apply to the root page.
#define DEPTH 0
#define COMPARATOR 1
// These two apply to all other pages.
#define PREV_LEAF 0
#define NEXT_LEAF 1
// This one applies to all pages.
#define FIRST_SLOT 2
/**
   The implementation strategy used here is a bit of an experiment.

   The LSM tree is updated using a FORCE/STEAL strategy.  In order to
   do this efficiently, its root node overrides fixedPage, adding
   pageLoaded and pageFlushed callbacks.  Those callbacks maintain an
   impl pointer, which tracks dirty pages, a mutex, and other
   information on behalf of the tree.  (Note that the dirtyPage list
   must be stored somewhere in memory if the root is evicted with
   outstanding dirty tree pages...)

   Note that this has a particularly nice, general-purpose property
   that may be useful to other data structure implementations: by
   using a mutex associated with the root of the data structure, we
   can get rid of the static locks used by existing implementations.

   @todo Need an easy way for operations to store things in p->impl,
   even if the underlying page implementation wants to store something
   there too (a second pointer?)

   Page layout information for lsm trees:

   root page layout
   ----------------
   uses fixedPage (for now)
   slot 0: the depth of the tree.
   slot 1: the integer id of the comparator used by this tree.
   the remainder of the slots contain lsmTreeNodeRecords

   internal node page layout
   -------------------------
   uses fixedPage (for now)
   slot 0: prev page
   slot 1: next page
   the remainder of the slots contain lsmTreeNodeRecords

   leaf page layout
   ----------------
   Defined by client.
*/
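/**
   A concrete example of the layout above (illustrative; not from the
   original source).  A tree of depth 1 with two leaves, A and B:

     root:   slot 0: depth = 1      slot 1: comparator id
             slot 2: key ka -> A    slot 3: key kb -> B

     leaf A: slot 0: prev = -1      slot 1: next = B
             slots 2..n: (key, val_page) pairs appended by the client

     leaf B: slot 0: prev = A       slot 1: next = -1
             slots 2..n: (key, val_page) pairs

   ka is the smallest key in A, and kb the smallest key in B; lookups
   descend to the child with the greatest key <= the search key.
*/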
typedef struct lsmTreeState {
  pageid_t lastLeaf;
} lsmTreeState;
/** Initialize a page for use as an internal node of the tree.
 *  lsmTree nodes are based on fixed.h.  This function formats the page
 *  so that it can hold fixed length records, and then sets up a tree
 *  node header in the first two lsmTreeNodeRecords on the page.
 */
static void initializeNodePage(int xid, Page *p, size_t keylen) {
  stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord)+keylen, 0);
  recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
  stasis_record_alloc_done(xid, p, reserved1);
  recordid reserved2 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
  stasis_record_alloc_done(xid, p, reserved2);
}
/**
 * A macro that hardcodes the page implementation to use fixed.h's
 * page implementation.
 */
#define readNodeRecord(xid,p,slot,keylen) readNodeRecordFixed(xid,p,slot,keylen)
/**
 * @see readNodeRecord
 */
#define writeNodeRecord(xid,p,slot,key,keylen,ptr) \
  writeNodeRecordFixed(xid,p,slot,key,keylen,ptr)
/**
 * @see readNodeRecord
 */
#define getKeySize(xid,p) getKeySizeFixed(xid,p)
/*
#define getKeySize(xid,p) getKeySizeVirtualMethods(xid,p)
#define readNodeRecord(xid,p,slot,keylen) \
  readNodeRecordVirtualMethods(xid,p,slot,keylen)
#define writeNodeRecord(xid,p,slot,key,keylen,ptr) \
  writeNodeRecordVirtualMethods(xid,p,slot,key,keylen,ptr)
*/
static inline size_t getKeySizeFixed(int xid, Page const *p) {
  return (*recordsize_cptr(p)) - sizeof(lsmTreeNodeRecord);
}
static inline size_t getKeySizeVirtualMethods(int xid, Page *p) {
  recordid rid = { p->id, 0, 0 };
  return stasis_record_length_read(xid, p, rid) - sizeof(lsmTreeNodeRecord);
}
/**
 * Read a record from the page node, assuming the nodes are fixed pages.
 */
static inline
const lsmTreeNodeRecord* readNodeRecordFixed(int xid, Page *const p, int slot,
                                             int keylen) {
  return (const lsmTreeNodeRecord*)fixed_record_ptr(p, slot);
}
/**
 * Read a record from the page node, using stasis' general-purpose
 * page access API.  The caller must free() the returned copy.
 */
static inline
lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p,
                                                int slot, int keylen) {
  // Copy the record header and key out of the page, so the copy
  // remains valid after the read latch is dropped.
  lsmTreeNodeRecord *ret = malloc(sizeof(lsmTreeNodeRecord) + keylen);
  recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)};
  const lsmTreeNodeRecord *nr
    = (const lsmTreeNodeRecord*)stasis_record_read_begin(xid,p,rid);
  memcpy(ret, nr, sizeof(lsmTreeNodeRecord) + keylen);
  stasis_record_read_done(xid,p,rid,(const byte*)nr);
  DEBUG("reading {%lld, %d, %d} = %lld\n",
        p->id, slot, (int)sizeof(lsmTreeNodeRecord), ret->ptr);
  return ret;
}
/**
   @see readNodeRecordFixed
*/
static inline
void writeNodeRecordFixed(int xid, Page *p, int slot,
                          const byte *key, size_t keylen, pageid_t ptr) {
  lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)fixed_record_ptr(p,slot);
  nr->ptr = ptr;
  memcpy(nr+1, key, keylen);
  stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
}
/**
   @see readNodeRecordVirtualMethods
*/
static inline
void writeNodeRecordVirtualMethods(int xid, Page *p, int slot,
                                   const byte *key, size_t keylen,
                                   pageid_t ptr) {
  recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord)};
  lsmTreeNodeRecord *target = (lsmTreeNodeRecord*)stasis_record_write_begin(xid,p,rid);
  target->ptr = ptr;
  memcpy(target+1,key,keylen);
  DEBUG("Writing to record {%d %d %lld}\n", rid.page, rid.slot, rid.size);
  stasis_record_write_done(xid,p,rid,(byte*)target);
  stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
}
recordid TlsmCreate(int xid, int comparator,
                    lsm_page_allocator_t allocator, void *allocator_state,
                    int keySize) {
  // can the pages hold at least two keys?
  assert(HEADER_SIZE + 2 * (sizeof(lsmTreeNodeRecord) + keySize) <
         USABLE_SIZE_OF_PAGE - 2 * sizeof(short));

  pageid_t root = allocator(xid, allocator_state); //pageAllocatorConfig);
  DEBUG("Root = %lld\n", root);
  recordid ret = { root, 0, 0 };

  Page *p = loadPage(xid, ret.page);
  writelock(p->rwlatch,0);

  stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0);
  *stasis_page_type_ptr(p) = LSM_ROOT_PAGE;

  lsmTreeState *state = malloc(sizeof(lsmTreeState));
  state->lastLeaf = -1; /// XXX define something in constants.h?
  p->impl = state;

  recordid tmp
    = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
  stasis_record_alloc_done(xid,p,tmp);
  assert(tmp.page == ret.page
         && tmp.slot == DEPTH
         && tmp.size == sizeof(lsmTreeNodeRecord) + keySize);

  tmp = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
  stasis_record_alloc_done(xid,p,tmp);
  assert(tmp.page == ret.page
         && tmp.slot == COMPARATOR
         && tmp.size == sizeof(lsmTreeNodeRecord) + keySize);

  byte *dummy = calloc(1,keySize);
  writeNodeRecord(xid, p, DEPTH, dummy, keySize, 0);
  writeNodeRecord(xid, p, COMPARATOR, dummy, keySize, comparator);
  free(dummy);

  unlock(p->rwlatch);
  releasePage(p);
  return ret;
}
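/**
   A minimal usage sketch (illustrative; my_cmp, my_data_page and xid
   are placeholders, and comparator id 0 is assumed to be free):

     lsmTreeRegisterComparator(0, my_cmp);
     TlsmRegionAllocConf_t conf = LSM_REGION_ALLOC_STATIC_INITIALIZER;
     recordid tree = TlsmCreate(xid, 0, TlsmRegionAlloc, &conf,
                                sizeof(int));
     int key = 42;
     TlsmAppendPage(xid, tree, (const byte*)&key,
                    TlsmRegionAlloc, &conf, my_data_page);
     pageid_t hit = TlsmFindPage(xid, tree, (const byte*)&key);

   Keys must be appended in comparator order; TlsmAppendPage only ever
   writes to the last leaf.
*/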
static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
                                int depth, const byte *key, size_t key_len,
                                pageid_t val_page, pageid_t lastLeaf,
                                lsm_page_allocator_t allocator,
                                void *allocator_state) {
  // root is the recordid on the root page that should point to the
  // new subtree.
  assert(depth);
  DEBUG("buildPathToLeaf(depth=%d) (lastleaf=%lld) called\n",depth, lastLeaf);

  pageid_t child = allocator(xid,allocator_state);
  DEBUG("new child = %lld internal? %d\n", child, depth-1);

  Page *child_p = loadPage(xid, child);
  writelock(child_p->rwlatch,0);
  initializeNodePage(xid, child_p, key_len);

  recordid ret;
  if(depth-1) {
    // recurse: the page we just allocated is not a leaf.
    recordid child_rec = stasis_record_alloc_begin(xid, child_p, sizeof(lsmTreeNodeRecord)+key_len);
    assert(child_rec.size != INVALID_SLOT);
    stasis_record_alloc_done(xid, child_p, child_rec);

    ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
                          val_page, lastLeaf, allocator, allocator_state);

    unlock(child_p->rwlatch);
    releasePage(child_p);
  } else {
    // set leaf
    byte *dummy = calloc(1, key_len);

    // backward link.
    writeNodeRecord(xid,child_p,PREV_LEAF,dummy,key_len,lastLeaf);
    // forward link (initialize to -1)
    writeNodeRecord(xid,child_p,NEXT_LEAF,dummy,key_len,-1);

    recordid leaf_rec = stasis_record_alloc_begin(xid, child_p,
                                                  sizeof(lsmTreeNodeRecord)+key_len);
    assert(leaf_rec.slot == FIRST_SLOT);
    stasis_record_alloc_done(xid, child_p, leaf_rec);
    writeNodeRecord(xid,child_p,leaf_rec.slot,key,key_len,val_page);
    ret = leaf_rec;

    unlock(child_p->rwlatch);
    releasePage(child_p);

    if(lastLeaf != -1) {
      // install forward link in previous page
      Page *lastLeafP = loadPage(xid, lastLeaf);
      writelock(lastLeafP->rwlatch,0);
      writeNodeRecord(xid,lastLeafP,NEXT_LEAF,dummy,key_len,child);
      unlock(lastLeafP->rwlatch);
      releasePage(lastLeafP);
    }
    DEBUG("%lld <-> %lld\n", lastLeaf, child);
    free(dummy);
  }
  writeNodeRecord(xid, root_p, root.slot, key, key_len, child);
  return ret;
}
/* adding pages:

   1) Try to append the value to lsmTreeState->lastLeaf.

   2) If that fails, traverse down from the root of the tree, splitting
      pages while traversing back up.

   3) Split is done by adding a new page at the end of the row (no key
      redistribution), except at the root, where the root's contents
      are pushed into the first page of the next row, and a new path
      from root to leaf is created, starting with the root's immediate
      second child.
*/
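/*
  A worked example of case (3) (illustrative sizes; not from the
  original source): suppose internal pages hold two keys, and the tree
  has a full root pointing at two full leaves.  Appending a new key
  falls through case (1) (the last leaf is full), then case (2) fails
  as well (the root has no free slot), so the root split copies the
  root's two records into a freshly allocated child, truncates the
  root down to a single record pointing at that child, increments the
  DEPTH slot, and retries the append, which now builds a new
  root-to-leaf path for the key.
*/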
static recordid appendInternalNode(int xid, Page *p,
                                   int depth,
                                   const byte *key, size_t key_len,
                                   pageid_t val_page, pageid_t lastLeaf,
                                   lsm_page_allocator_t allocator,
                                   void *allocator_state) {
  assert(*stasis_page_type_ptr(p) == LSM_ROOT_PAGE ||
         *stasis_page_type_ptr(p) == FIXED_PAGE);
  if(!depth) {
    // leaf node.
    recordid ret = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
    if(ret.size != INVALID_SLOT) {
      stasis_record_alloc_done(xid, p, ret);
      writeNodeRecord(xid,p,ret.slot,key,key_len,val_page);
    }
    return ret;
  } else {
    // recurse
    int slot = *recordcount_ptr(p)-1;
    assert(slot >= FIRST_SLOT); // there should be no empty nodes
    const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len);
    pageid_t child_id = nr->ptr;
    recordid ret;
    {
      Page *child_page = loadPage(xid, child_id);
      writelock(child_page->rwlatch,0);
      ret = appendInternalNode(xid, child_page, depth-1, key, key_len,
                               val_page, lastLeaf, allocator, allocator_state);
      unlock(child_page->rwlatch);
      releasePage(child_page);
    }
    if(ret.size == INVALID_SLOT) { // subtree is full; split
      ret = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
      if(ret.size != INVALID_SLOT) {
        stasis_record_alloc_done(xid, p, ret);
        ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page,
                              lastLeaf, allocator, allocator_state);
        DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n",
              p->id, ret.page, ret.slot, ret.size);
      } else {
        // ret is NULLRID; this is the root of a full tree.  Return
        // NULLRID to the caller.
      }
    } else {
      // we inserted the value into a subtree rooted here.
    }
    return ret;
  }
}
/**
 * Traverse from the root of the tree to the rightmost leaf (the one
 * with the highest base key value).
 */
static pageid_t findLastLeaf(int xid, Page *root, int depth) {
  if(!depth) {
    DEBUG("Found last leaf = %lld\n", root->id);
    return root->id;
  } else {
    // passing zero as length is OK, as long as we don't try to access the key.
    const lsmTreeNodeRecord *nr = readNodeRecord(xid, root,
                                                 (*recordcount_ptr(root))-1,0);
    pageid_t ret;

    Page *p = loadPage(xid, nr->ptr);
    readlock(p->rwlatch,0);
    ret = findLastLeaf(xid,p,depth-1);
    unlock(p->rwlatch);
    releasePage(p);

    return ret;
  }
}
/**
 * Traverse from the root of the tree to the leftmost (lowest valued
 * key) leaf.
 */
static pageid_t findFirstLeaf(int xid, Page *root, int depth) {
  if(!depth) {
    return root->id;
  } else {
    const lsmTreeNodeRecord *nr = readNodeRecord(xid,root,FIRST_SLOT,0);
    Page *p = loadPage(xid, nr->ptr);
    readlock(p->rwlatch,0);
    pageid_t ret = findFirstLeaf(xid,p,depth-1);
    unlock(p->rwlatch);
    releasePage(p);
    return ret;
  }
}
recordid TlsmAppendPage(int xid, recordid tree,
                        const byte *key,
                        lsm_page_allocator_t allocator, void *allocator_state,
                        long val_page) {
  Page *p = loadPage(xid, tree.page);
  writelock(p->rwlatch, 0);
  lsmTreeState *s = p->impl;

  size_t keySize = getKeySize(xid,p);

  tree.slot = 0;
  tree.size = sizeof(lsmTreeNodeRecord)+keySize;

  const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, DEPTH, keySize);
  int depth = nr->ptr;

  if(s->lastLeaf == -1) {
    s->lastLeaf = findLastLeaf(xid, p, depth);
  }

  Page *lastLeaf;

  if(s->lastLeaf != tree.page) {
    lastLeaf = loadPage(xid, s->lastLeaf);
    writelock(lastLeaf->rwlatch, 0);
  } else {
    lastLeaf = p;
  }

  recordid ret = stasis_record_alloc_begin(xid, lastLeaf,
                                           sizeof(lsmTreeNodeRecord)+keySize);

  if(ret.size == INVALID_SLOT) {
    if(lastLeaf->id != p->id) {
      assert(s->lastLeaf != tree.page);
      unlock(lastLeaf->rwlatch);
      releasePage(lastLeaf); // don't need that page anymore...
      lastLeaf = 0;
    }
    // traverse down the root of the tree.
    tree.slot = 0;
    assert(tree.page == p->id);
    ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
                             s->lastLeaf == tree.page ? -1 : s->lastLeaf,
                             allocator, allocator_state);
    if(ret.size == INVALID_SLOT) {
      DEBUG("Need to split root; depth = %d\n", depth);

      pageid_t child = allocator(xid, allocator_state);
      Page *lc = loadPage(xid, child);
      writelock(lc->rwlatch,0);

      initializeNodePage(xid, lc, keySize);

      for(int i = FIRST_SLOT; i < *recordcount_ptr(p); i++) {
        recordid cnext = stasis_record_alloc_begin(xid, lc,
                                                   sizeof(lsmTreeNodeRecord)+keySize);
        assert(i == cnext.slot);
        assert(cnext.size != INVALID_SLOT);
        stasis_record_alloc_done(xid, lc, cnext);

        const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,i,keySize);
        writeNodeRecord(xid,lc,i,(byte*)(nr+1),keySize,nr->ptr);
      }

      // deallocate old entries, and update pointer on parent node.
      recordid pFirstSlot = { p->id, FIRST_SLOT,
                              sizeof(lsmTreeNodeRecord)+keySize };
      // @todo should fixed.h support bulk deallocation directly?
      *recordcount_ptr(p) = FIRST_SLOT+1;

      lsmTreeNodeRecord *nr
        = (lsmTreeNodeRecord*)stasis_record_write_begin(xid, p, pFirstSlot);
      // don't overwrite key...
      nr->ptr = child;
      stasis_record_write_done(xid,p,pFirstSlot,(byte*)nr);
      stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?

      byte *dummy = calloc(1,keySize);
      if(!depth) {
        s->lastLeaf = lc->id;
        writeNodeRecord(xid,lc,PREV_LEAF,dummy,keySize,-1);
        writeNodeRecord(xid,lc,NEXT_LEAF,dummy,keySize,-1);
      }

      unlock(lc->rwlatch);
      releasePage(lc);

      depth++;
      writeNodeRecord(xid,p,DEPTH,dummy,keySize,depth);
      free(dummy);

      assert(tree.page == p->id);
      ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
                               s->lastLeaf == tree.page ? -1 : s->lastLeaf,
                               allocator, allocator_state);
      assert(ret.size != INVALID_SLOT);
    } else {
      DEBUG("Appended new internal node tree depth = %d key = %d\n",
            depth, *(int*)key);
    }

    s->lastLeaf = ret.page;
    DEBUG("lastleaf is %lld\n", s->lastLeaf);
  } else {
    // write the new value to an existing page
    DEBUG("Writing %d to existing page# %lld\n", *(int*)key, lastLeaf->id);

    stasis_record_alloc_done(xid, lastLeaf, ret);
    writeNodeRecord(xid, lastLeaf, ret.slot, key, keySize, val_page);

    if(lastLeaf->id != p->id) {
      assert(s->lastLeaf != tree.page);
      unlock(lastLeaf->rwlatch);
      releasePage(lastLeaf);
    }
  }

  unlock(p->rwlatch);
  releasePage(p);
  return ret;
}
void TlsmForce(int xid, recordid tree, lsm_page_forcer_t force,
               void *allocator_state) {
  force(xid, allocator_state);
}

void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc,
              void *allocator_state) {
  //  Tdealloc(xid,tree);
  dealloc(xid,allocator_state);
  // XXX fishy; shouldn't the caller do this?
  Tdealloc(xid, *(recordid*)allocator_state);
}
static pageid_t lsmLookup(int xid, Page *node, int depth,
                          const byte *key, size_t keySize, lsm_comparator_t cmp) {
  if(*recordcount_ptr(node) == FIRST_SLOT) { return -1; }
  assert(*recordcount_ptr(node) > FIRST_SLOT);

  const lsmTreeNodeRecord *prev = readNodeRecord(xid,node,FIRST_SLOT,keySize);
  int prev_cmp_key = cmp(prev+1,key);

  // @todo binary search within each page
  for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) {
    const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,i,keySize);
    int rec_cmp_key = cmp(rec+1,key);

    if(depth) {
      if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
        pageid_t child_id = prev->ptr;
        Page *child_page = loadPage(xid, child_id);
        readlock(child_page->rwlatch,0);
        pageid_t ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
        unlock(child_page->rwlatch);
        releasePage(child_page);
        return ret;
      }
    } else {
      // XXX Doesn't handle runs of duplicates.
      if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
        return prev->ptr;
      }
    }
    prev = rec;
    prev_cmp_key = rec_cmp_key;
    if(rec_cmp_key > 0) { break; }
  }
  if(depth) {
    // this handles the rhs of the tree.
    if(prev_cmp_key <= 0) {
      pageid_t child_id = prev->ptr;
      Page *child_page = loadPage(xid, child_id);
      readlock(child_page->rwlatch,0);
      pageid_t ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
      unlock(child_page->rwlatch);
      releasePage(child_page);
      return ret;
    }
  } else {
    if(prev_cmp_key <= 0) {
      return prev->ptr;
    }
  }
  return -1;
}
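/*
  Lookup semantics, by example (illustrative): if a leaf holds entries
  {10 -> A, 20 -> B, 30 -> C} under an integer comparator, a lookup of
  25 returns B (the entry with the greatest key <= 25), a lookup of 30
  returns C, and a lookup of 5 returns -1, since 5 falls before the
  first key.
*/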
/**
   Look up the value associated with key.

   @return -1 if key isn't in the tree.
*/
pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) {
  Page *p = loadPage(xid, tree.page);
  readlock(p->rwlatch,0);

  tree.slot = 0;
  tree.size = *recordsize_ptr(p);

  size_t keySize = getKeySize(xid,p);

  const lsmTreeNodeRecord *depth_nr = readNodeRecord(xid, p, DEPTH, keySize);
  const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p, COMPARATOR, keySize);

  int depth = depth_nr->ptr;
  lsm_comparator_t cmp = comparators[cmp_nr->ptr];

  pageid_t ret = lsmLookup(xid, p, depth, key, keySize, cmp);

  unlock(p->rwlatch);
  releasePage(p);

  return ret;
}
pageid_t TlsmLastPage(int xid, recordid tree) {
  Page * root = loadPage(xid, tree.page);
  readlock(root->rwlatch,0);
  lsmTreeState *state = root->impl;
  int keySize = getKeySize(xid,root);
  if(state->lastLeaf == -1) {
    const lsmTreeNodeRecord *nr = readNodeRecord(xid,root,DEPTH,
                                                 keySize);
    int depth = nr->ptr;
    state->lastLeaf = findLastLeaf(xid,root,depth);
  }
  pageid_t ret = state->lastLeaf;
  unlock(root->rwlatch);
  // ret points to the last tree node (not the client's page) at this point.
  releasePage(root);

  Page * p = loadPage(xid, ret);
  readlock(p->rwlatch,0);
  if(*recordcount_ptr(p) == FIRST_SLOT) {
    // the leaf holds only its two header records; the tree is empty.
    ret = -1;
  } else {
    const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,(*recordcount_ptr(p))-1,keySize);
    ret = nr->ptr;
  }
  unlock(p->rwlatch);
  releasePage(p);

  return ret;
}
/**
   The buffer manager calls this when the lsmTree's root page is
   loaded.  This function allocates some storage for cached values
   associated with the tree.
*/
static void lsmPageLoaded(Page *p) {
  /// XXX should call fixedLoaded, or something...
  lsmTreeState *state = malloc(sizeof(lsmTreeState));
  state->lastLeaf = -1;
  p->impl = state;
}
static void lsmPageFlushed(Page *p) { }
/**
   Free any soft state associated with the tree rooted at page p.
   This is called by the buffer manager.
*/
static void lsmPageCleanup(Page *p) {
  lsmTreeState *state = p->impl;
  free(state);
}
/**
   A page_impl for the root of an lsmTree.
*/
page_impl lsmRootImpl() {
  page_impl pi = fixedImpl();
  pi.pageLoaded = lsmPageLoaded;
  pi.pageFlushed = lsmPageFlushed;
  pi.pageCleanup = lsmPageCleanup;
  pi.page_type = LSM_ROOT_PAGE;
  return pi;
}
///--------------------- Iterator implementation

lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) {
  Page *p = loadPage(xid,root.page);
  readlock(p->rwlatch,0);
  size_t keySize = getKeySize(xid,p);
  const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,DEPTH,keySize);
  int depth = nr->ptr;
  pageid_t leafid = findFirstLeaf(xid, p, depth);
  if(leafid != root.page) {
    unlock(p->rwlatch);
    releasePage(p);
    p = loadPage(xid,leafid);
    readlock(p->rwlatch,0);
    assert(depth != 0);
  } else {
    assert(depth == 0);
  }
  lsmIteratorImpl *impl = malloc(sizeof(lsmIteratorImpl));
  impl->p = p;
  {
    // Start one slot before FIRST_SLOT; the first call to next()
    // advances past the page header onto the first data record.
    recordid rid = { p->id, FIRST_SLOT-1, keySize };
    impl->current = rid;
  }
  DEBUG("keysize = %d, slot = %d\n", keySize, impl->current.slot);
  impl->t = 0;
  impl->justOnePage = (depth == 0);

  lladdIterator_t *it = malloc(sizeof(lladdIterator_t));
  it->type = -1; // XXX LSM_TREE_ITERATOR;
  it->impl = impl;
  /* itdef = { <-- @todo register lsmTree iterators with stasis someday...
    lsmTreeIterator_close;
    lsmTreeIterator_next;
    lsmTreeIterator_next;
    lsmTreeIterator_key;
    lsmTreeIterator_value;
    lsmTreeIterator_tupleDone;
    lsmTreeIterator_releaseLock;
  } */
  return it;
}
lladdIterator_t *lsmTreeIterator_copy(int xid, lladdIterator_t* i) {
  lsmIteratorImpl *it = i->impl;
  lsmIteratorImpl *mine = malloc(sizeof(lsmIteratorImpl));

  if(it->p) {
    mine->p = loadPage(xid, it->p->id);
    readlock(mine->p->rwlatch,0);
  } else {
    mine->p = 0;
  }
  memcpy(&mine->current, &it->current, sizeof(recordid));
  mine->t = it->t;
  mine->justOnePage = it->justOnePage;

  lladdIterator_t * ret = malloc(sizeof(lladdIterator_t));
  ret->type = -1; // XXX LSM_TREE_ITERATOR
  ret->impl = mine;
  return ret;
}

void lsmTreeIterator_close(int xid, lladdIterator_t *it) {
  lsmIteratorImpl *impl = it->impl;
  if(impl->p) {
    unlock(impl->p->rwlatch);
    releasePage(impl->p);
  }
  free(impl);
  free(it);
}
int lsmTreeIterator_next(int xid, lladdIterator_t *it) {
  lsmIteratorImpl *impl = it->impl;
  size_t keySize = impl->current.size;
  impl->current = fixedNext(xid, impl->p, impl->current);
  if(impl->current.size == INVALID_SLOT) {
    const lsmTreeNodeRecord *next_rec = readNodeRecord(xid,impl->p,NEXT_LEAF,
                                                       keySize);
    // Copy the link out of the record before releasing the page;
    // next_rec points into the page's memory.
    pageid_t next_page = next_rec->ptr;
    DEBUG("done with page %lld next = %lld\n", impl->p->id, next_page);

    unlock(impl->p->rwlatch);
    releasePage(impl->p);

    if(next_page != -1 && !impl->justOnePage) {
      impl->p = loadPage(xid, next_page);
      readlock(impl->p->rwlatch,0);
      impl->current.page = next_page;
      impl->current.slot = FIRST_SLOT;
      impl->current.size = keySize;
    } else {
      impl->p = 0;
      impl->current.size = INVALID_SLOT;
    }
  } else {
    assert(impl->current.size == keySize + sizeof(lsmTreeNodeRecord));
    impl->current.size = keySize;
  }
  if(impl->current.size != INVALID_SLOT) {
    impl->t = readNodeRecord(xid,impl->p,impl->current.slot,impl->current.size);
    return 1;
  } else {
    impl->t = 0;
    return 0;
  }
}
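/**
   Iteration sketch (illustrative; assumes the lsmIteratorImpl fields
   declared in lsmTree.h).  The iterator starts before the first data
   record and walks the leaves via their NEXT_LEAF links:

     lladdIterator_t *it = lsmTreeIterator_open(xid, tree);
     while(lsmTreeIterator_next(xid, it)) {
       const lsmTreeNodeRecord *r = ((lsmIteratorImpl*)it->impl)->t;
       // r+1 points at the current key; r->ptr is its page.
     }
     lsmTreeIterator_close(xid, it);
*/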