Implemented range scans; lsmTree now supports keys that are stored as fixed length byte arrays.

This commit is contained in:
Sears Russell 2007-08-20 16:53:16 +00:00
parent 519bd515f4
commit 9c1c284406
6 changed files with 538 additions and 294 deletions

View file

@ -1,3 +1,5 @@
#include <string.h>
#include <stasis/operations/lsmTree.h> #include <stasis/operations/lsmTree.h>
#include <stasis/constants.h> #include <stasis/constants.h>
// XXX including fixed.h breaks page api encapsulation; we need a "last slot" // XXX including fixed.h breaks page api encapsulation; we need a "last slot"
@ -5,96 +7,26 @@
#include "../page/fixed.h" #include "../page/fixed.h"
#include <pthread.h> #include <pthread.h>
const int MAX_LSM_COMPARATORS = 256; static lsm_comparator_t comparators[MAX_LSM_COMPARATORS];
typedef struct nodeRecord { void lsmTreeRegisterComparator(int id, lsm_comparator_t i) {
pageid_t ptr; // XXX need to de-init this somewhere... assert(!comparators[id]);
int key; comparators[id] = i;
// char funk[1000];
} nodeRecord;
#define HEADER_SIZE (2 * sizeof(nodeRecord))
typedef struct lsmTreeState {
// pthread_mutex_t mut;
// pageid_t * dirtyPages;
pageid_t lastLeaf;
} lsmTreeState;
/** Initialize a page for use as an internal node of the tree.
* lsmTree nodes are based on fixed.h. This function allocates a page
* that can hold fixed length records, and then sets up a tree node
* header in the first two nodeRecords on the page.
*/
static void initializeNodePage(int xid, Page * p) {
fixedPageInitialize(p, sizeof(nodeRecord), 0);
recordid reserved1 = recordPreAlloc(xid, p, sizeof(nodeRecord));
recordPostAlloc(xid, p, reserved1);
recordid reserved2 = recordPreAlloc(xid, p, sizeof(nodeRecord));
recordPostAlloc(xid, p, reserved2);
} }
/**
* A macro that hardcodes the page implementation to use fixed.h's page implementation.
*/
#define readNodeRecord(xid,p,slot) readNodeRecordFixed(xid,p,slot) #define HEADER_SIZE (2 * sizeof(lsmTreeNodeRecord))
/**
* @see readNodeRecord
*/
#define writeNodeRecord(xid,p,slot,key,ptr) writeNodeRecordFixed(xid,p,slot,key,ptr)
//#define readNodeRecord(xid,p,slot) readNodeRecordVirtualMethods(xid,p,slot)
//#define writeNodeRecord(xid,p,slot,key,ptr) writeNodeRecordVirtualMethods(xid,p,slot,key,ptr)
/** // These two constants only apply to the root page.
* Read a record from the page node, assuming the nodes are fixed pages. #define DEPTH 0
*/ #define COMPARATOR 1
static inline nodeRecord readNodeRecordFixed(int xid, Page * const p, int slot) {
return *(nodeRecord*)fixed_record_ptr(p, slot);
}
/**
* Read a record from the page node, using stasis' general-purpose page access API.
*/
static inline nodeRecord readNodeRecordVirtualMethods(int xid, Page * const p, int slot) {
nodeRecord ret;
recordid rid = {p->id, slot, sizeof(nodeRecord)}; // These two apply to all other pages.
const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid,p,rid); #define PREV_LEAF 0
ret = *nr; #define NEXT_LEAF 1
assert(ret.ptr > 1 || slot < 2);
recordReadDone(xid,p,rid,(const byte*)nr);
DEBUG("reading {%lld, %d, %d} = %d, %lld\n", p->id, slot, sizeof(nodeRecord), ret.key, ret.ptr); // This one applies to all pages.
#define FIRST_SLOT 2
return ret;
}
/**
@see readNodeFixed
*/
static inline void writeNodeRecordFixed(int xid, Page * const p, int slot, int key, pageid_t ptr) {
nodeRecord * nr = (nodeRecord*)fixed_record_ptr(p,slot);
nr->key = key;
nr->ptr = ptr;
pageWriteLSN(xid, p, 0); // XXX need real LSN?
}
/**
@see readNodeVirtualMethods
*/
static inline void writeNodeRecordVirtualMethods(int xid, Page * const p, int slot, int key, pageid_t ptr) {
nodeRecord src;
src.key = key;
src.ptr = ptr;
assert(src.ptr > 1 || slot < 2);
recordid rid = {p->id, slot, sizeof(nodeRecord)};
nodeRecord * target = (nodeRecord*)recordWriteNew(xid,p,rid);
*target = src;
DEBUG("Writing to record {%d %d %lld}\n", rid.page, rid.slot, rid.size);
recordWriteDone(xid,p,rid,(byte*)target);
pageWriteLSN(xid, p, 0); // XXX need real LSN?
}
/** /**
@ -105,7 +37,7 @@ static inline void writeNodeRecordVirtualMethods(int xid, Page * const p, int sl
pageLoaded and pageFlushed callbacks. Those callbacks maintain an pageLoaded and pageFlushed callbacks. Those callbacks maintain an
impl pointer, which tracks dirty pages, a mutex, and other impl pointer, which tracks dirty pages, a mutex, and other
information on behalf of the tree. (Note that the dirtyPage list information on behalf of the tree. (Note that the dirtyPage list
must be stored in a global hash tree if the root is evicted with must be stored somewhere in memory if the root is evicted with
outstanding dirty tree pages...) outstanding dirty tree pages...)
Note that this has a particularly nice, general purpose property Note that this has a particularly nice, general purpose property
@ -124,116 +56,244 @@ static inline void writeNodeRecordVirtualMethods(int xid, Page * const p, int sl
uses fixedPage (for now) uses fixedPage (for now)
slot 0: depth of tree. slot 0: the integer id of the comparator used by this tree.
slot 1: slot id of first key in leaf records. [unimplemented] slot 1: depth of tree.
the remainder of the slots contain nodeRecords the remainder of the slots contain lsmTreeNodeRecords
internal node page layout internal node page layout
------------------------- -------------------------
uses fixedPage (for now) uses fixedPage (for now)
slot 0: prev page [unimplemented] slot 0: prev page
slot 1: next page [unimplemented] slot 1: next page
the remainder of the slots contain nodeRecords the remainder of the slots contain lsmTreeNodeRecords
leaf page layout leaf page layout
---------------- ----------------
Defined by client, but calling readRecord() on the slot id must Defined by client.
return the first key stored on the page.
*/ */
recordid TlsmCreate(int xid, int leafFirstSlot, int keySize) {
// XXX generalize later
assert(keySize == sizeof(int));
// XXX hardcoded to fixed.h's current page layout, and node records
// that contain the key...
// can the pages hold at least two keys? typedef struct lsmTreeState {
assert(HEADER_SIZE + 2 * (sizeof(nodeRecord) /*XXX +keySize*/) < pageid_t lastLeaf;
} lsmTreeState;
/**
 * Set up a freshly allocated page as an lsmTree node.
 *
 * The page is formatted as a fixed-length-record page whose slots are
 * each large enough for one lsmTreeNodeRecord followed by a key of
 * keylen bytes.  The first two slots are then allocated up front; they
 * form the node header, so ordinary entries start at the third slot.
 *
 * @param xid    transaction id, forwarded to the record allocator.
 * @param p      the page to initialize.
 * @param keylen length, in bytes, of the keys this tree stores.
 */
static void initializeNodePage(int xid, Page *p, size_t keylen) {
  const size_t slot_len = sizeof(lsmTreeNodeRecord) + keylen;

  fixedPageInitialize(p, slot_len, 0);

  // Reserve the two header slots, one after the other.
  for(int n = 0; n < 2; n++) {
    recordid reserved = recordPreAlloc(xid, p, slot_len);
    recordPostAlloc(xid, p, reserved);
  }
}
/**
* A macro that hardcodes the page implementation to use fixed.h's
* page implementation.
*/
#define readNodeRecord(xid,p,slot,keylen) readNodeRecordFixed(xid,p,slot,keylen)
/**
* @see readNodeRecord
*/
#define writeNodeRecord(xid,p,slot,key,keylen,ptr) \
writeNodeRecordFixed(xid,p,slot,key,keylen,ptr)
/**
* @see readNodeRecord
*/
#define getKeySize(xid,p) getKeySizeFixed(xid,p)
/*
#define getKeySize(xid,p) getKeySizeVirtualMethods(xid,p)
#define readNodeRecord(xid,p,slot,keylen) \
readNodeRecordVirtualMethods(xid,p,slot,keylen)
#define writeNodeRecord(xid,p,slot,key,keylen,ptr) \
writeNodeRecordVirtualMethods(xid,p,slot,key,keylen,ptr)
*/
/**
 * Compute the key length used by a node page, reading the page's
 * fixed record size directly.
 *
 * Every slot holds an lsmTreeNodeRecord followed by the key, so the key
 * length is the slot size minus the size of the record header.
 *
 * @param xid unused.
 * @param p   a node page laid out by fixed.h.
 * @return the number of key bytes stored in each slot.
 */
static inline size_t getKeySizeFixed(int xid, Page const *p) {
  const size_t slot_len = *recordsize_ptr(p);
  return slot_len - sizeof(lsmTreeNodeRecord);
}
/**
 * Compute the key length used by a node page via the general-purpose
 * page API instead of peeking at the fixed-page layout.
 *
 * Asks the page for the length of slot 0 and subtracts the size of the
 * lsmTreeNodeRecord header that precedes each key.
 *
 * @param xid transaction id, forwarded to recordGetLength.
 * @param p   a node page.
 * @return the number of key bytes stored in each slot.
 */
static inline size_t getKeySizeVirtualMethods(int xid, Page *p) {
  recordid first_slot = { p->id, 0, 0 };
  const size_t slot_len = recordGetLength(xid, p, first_slot);
  return slot_len - sizeof(lsmTreeNodeRecord);
}
/**
 * Fetch a node record from a page that uses the fixed.h layout.
 *
 * No copy is made: the result points at the slot inside the page, with
 * the key bytes immediately following the returned header.
 *
 * @param xid    unused.
 * @param p      the node page.
 * @param slot   index of the slot to read.
 * @param keylen unused here; kept so the signature matches the
 *               virtual-method variant.
 * @return read-only pointer to the record stored in the slot.
 */
static inline
const lsmTreeNodeRecord* readNodeRecordFixed(int xid, Page *const p, int slot,
                                             int keylen) {
  const void *in_page = fixed_record_ptr(p, slot);
  return (const lsmTreeNodeRecord*)in_page;
}
/**
 * Read a record from the page node, using stasis' general-purpose
 * page access API.
 *
 * The record header and the keylen key bytes that follow it are copied
 * out of the page into a freshly malloc()ed buffer, so the result stays
 * valid after the read is finished with recordReadDone().
 *
 * @param xid    transaction id, forwarded to the page API.
 * @param p      the node page.
 * @param slot   index of the slot to read.
 * @param keylen number of key bytes stored after the record header.
 * @return a heap-allocated copy of the record (header + key) that the
 *         caller must free(), or NULL if the allocation failed.
 */
static inline
lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p,
                                                int slot, int keylen) {
  // A slot holds the header plus the key, so both the record id and the
  // copy must cover header + key.
  const size_t len = sizeof(lsmTreeNodeRecord) + keylen;

  // The original copied through an uninitialized pointer; give the copy
  // real storage before touching the page.
  lsmTreeNodeRecord *ret = malloc(len);
  if(!ret) { return NULL; }

  recordid rid = {p->id, slot, len};
  const lsmTreeNodeRecord *nr
    = (const lsmTreeNodeRecord*)recordReadNew(xid,p,rid);
  memcpy(ret, nr, len);
  recordReadDone(xid,p,rid,(const byte*)nr);

  DEBUG("reading {%lld, %d, %d} = %lld\n",
        (long long)p->id, slot, (int)len, (long long)ret->ptr);
  return ret;
}
/**
 * Store a (key, ptr) pair in a slot of a page that uses the fixed.h
 * layout.
 *
 * The slot is written in place: the page id goes in the record header
 * and the key bytes are copied to the space directly after it.  The
 * page LSN is then set to 0.
 *
 * @param xid    transaction id, forwarded to pageWriteLSN.
 * @param p      the node page.
 * @param slot   index of the slot to overwrite.
 * @param key    key bytes to store.
 * @param keylen number of bytes in key.
 * @param ptr    page id to store in the record.
 *
 * @see readNodeRecordFixed
 */
static inline
void writeNodeRecordFixed(int xid, Page *p, int slot,
                          const byte *key, size_t keylen, pageid_t ptr) {
  lsmTreeNodeRecord *dst = (lsmTreeNodeRecord*)fixed_record_ptr(p,slot);
  byte *dst_key = (byte*)(dst + 1);  // key lives right behind the header

  dst->ptr = ptr;
  memcpy(dst_key, key, keylen);

  pageWriteLSN(xid, p, 0); // XXX need real LSN?
}
/**
 * Store a (key, ptr) pair in a slot using stasis' general-purpose page
 * access API.
 *
 * @param xid    transaction id, forwarded to the page API.
 * @param p      the node page.
 * @param slot   index of the slot to overwrite.
 * @param key    key bytes to store.
 * @param keylen number of bytes in key.
 * @param ptr    page id to store in the record.
 *
 * @see readNodeRecordVirtualMethods
 */
static inline
void writeNodeRecordVirtualMethods(int xid, Page *p, int slot,
                                   const byte *key, size_t keylen,
                                   pageid_t ptr) {
  // The slot holds the header followed by keylen key bytes, and both are
  // written below, so the record id must describe the whole slot (this
  // matches how TlsmAppendPage builds its record ids).
  recordid rid = {p->id, slot, sizeof(lsmTreeNodeRecord) + keylen};
  lsmTreeNodeRecord *target = (lsmTreeNodeRecord*)recordWriteNew(xid,p,rid);
  target->ptr = ptr;
  memcpy(target+1,key,keylen);

  DEBUG("Writing to record {%lld %d %lld}\n",
        (long long)rid.page, (int)rid.slot, (long long)rid.size);
  recordWriteDone(xid,p,rid,(byte*)target);
  pageWriteLSN(xid, p, 0); // XXX need real LSN?
}
recordid TlsmCreate(int xid, int comparator, int keySize) {
// can the pages hold at least two keys?
assert(HEADER_SIZE + 2 * (sizeof(lsmTreeNodeRecord) +keySize) <
USABLE_SIZE_OF_PAGE - 2 * sizeof(short)); USABLE_SIZE_OF_PAGE - 2 * sizeof(short));
pageid_t root = TpageAlloc(xid); pageid_t root = TpageAlloc(xid);
DEBUG("Root = %lld\n", root);
recordid ret = { root, 0, 0 }; recordid ret = { root, 0, 0 };
Page * const p = loadPage(xid, ret.page); Page *p = loadPage(xid, ret.page);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
fixedPageInitialize(p, sizeof(nodeRecord), 0); fixedPageInitialize(p, sizeof(lsmTreeNodeRecord) + keySize, 0);
*page_type_ptr(p) = LSM_ROOT_PAGE; *page_type_ptr(p) = LSM_ROOT_PAGE;
lsmTreeState * state = malloc(sizeof(lsmTreeState)); lsmTreeState *state = malloc(sizeof(lsmTreeState));
state->lastLeaf = -1; /// constants.h state->lastLeaf = -1; /// XXX define something in constants.h?
// pthread_mutex_init(&(state->mut),0);
// state->dirtyPages = malloc(sizeof(Page*)*2);
// state->dirtyPages[0] = ret.page;
// state->dirtyPages[1] = -1; // XXX this should be defined in constants.h
p->impl = state; p->impl = state;
recordid treeDepth = recordPreAlloc(xid, p, sizeof(nodeRecord)); recordid tmp
recordPostAlloc(xid,p,treeDepth); = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
recordPostAlloc(xid,p,tmp);
assert(treeDepth.page == ret.page assert(tmp.page == ret.page
&& treeDepth.slot == 0 && tmp.slot == DEPTH
&& treeDepth.size == sizeof(nodeRecord)); && tmp.size == sizeof(lsmTreeNodeRecord) + keySize);
recordid slotOff = recordPreAlloc(xid, p, sizeof(nodeRecord)); tmp = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord) + keySize);
recordPostAlloc(xid,p,slotOff); recordPostAlloc(xid,p,tmp);
assert(slotOff.page == ret.page assert(tmp.page == ret.page
&& slotOff.slot == 1 && tmp.slot == COMPARATOR
&& slotOff.size == sizeof(nodeRecord)); && tmp.size == sizeof(lsmTreeNodeRecord) + keySize);
// ptr is zero because tree depth starts out as zero. byte *dummy = calloc(1,keySize);
writeNodeRecord(xid, p, 0, 0, 0);
// ptr = slotOff (which isn't used, for now...) writeNodeRecord(xid, p, DEPTH, dummy, keySize, 0);
writeNodeRecord(xid, p, 1, 0, leafFirstSlot); writeNodeRecord(xid, p, COMPARATOR, dummy, keySize, comparator);
unlock(p->rwlatch); unlock(p->rwlatch);
releasePage(p); releasePage(p);
return ret; return ret;
} }
static recordid buildPathToLeaf(int xid, recordid root, Page * const root_p, static recordid buildPathToLeaf(int xid, recordid root, Page *root_p,
int depth, const byte * key, size_t key_len, int depth, const byte *key, size_t key_len,
pageid_t val_page) { pageid_t val_page, pageid_t lastLeaf) {
// root is the recordid on the root page that should point to the // root is the recordid on the root page that should point to the
// new subtree. // new subtree.
assert(depth); assert(depth);
DEBUG("buildPathToLeaf(depth=%d) called\n",depth); DEBUG("buildPathToLeaf(depth=%d) (lastleaf=%lld) called\n",depth, lastLeaf);
pageid_t child = TpageAlloc(xid); // XXX Use some other function... pageid_t child = TpageAlloc(xid); // XXX Use some other function...
DEBUG("new child = %lld internal? %d\n", child, depth-1);
Page * const child_p = loadPage(xid, child); Page *child_p = loadPage(xid, child);
writelock(child_p->rwlatch,0); writelock(child_p->rwlatch,0);
initializeNodePage(xid, child_p); initializeNodePage(xid, child_p, key_len);
recordid ret; recordid ret;
if(depth-1) { if(depth-1) {
// recurse: the page we just allocated is not a leaf. // recurse: the page we just allocated is not a leaf.
recordid child_rec = recordPreAlloc(xid, child_p, sizeof(nodeRecord)); recordid child_rec = recordPreAlloc(xid, child_p, sizeof(lsmTreeNodeRecord)+key_len);
assert(child_rec.size != INVALID_SLOT); assert(child_rec.size != INVALID_SLOT);
recordPostAlloc(xid, child_p, child_rec); recordPostAlloc(xid, child_p, child_rec);
ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len, ret = buildPathToLeaf(xid, child_rec, child_p, depth-1, key, key_len,
val_page); val_page,lastLeaf);
unlock(child_p->rwlatch);
releasePage(child_p);
} else { } else {
// set leaf // set leaf
recordid leaf_rec = recordPreAlloc(xid, child_p, sizeof(nodeRecord));
assert(leaf_rec.slot == 2); // XXX byte *dummy = calloc(1, key_len);
// backward link.
writeNodeRecord(xid,child_p,PREV_LEAF,dummy,key_len,lastLeaf);
// forward link (initialize to -1)
writeNodeRecord(xid,child_p,NEXT_LEAF,dummy,key_len,-1);
recordid leaf_rec = recordPreAlloc(xid, child_p,
sizeof(lsmTreeNodeRecord)+key_len);
assert(leaf_rec.slot == FIRST_SLOT);
recordPostAlloc(xid, child_p, leaf_rec); recordPostAlloc(xid, child_p, leaf_rec);
writeNodeRecord(xid,child_p,leaf_rec.slot,*(int*)key,val_page); writeNodeRecord(xid,child_p,leaf_rec.slot,key,key_len,val_page);
ret = leaf_rec; ret = leaf_rec;
}
unlock(child_p->rwlatch);
releasePage(child_p);
writeNodeRecord(xid, root_p, root.slot, *(int*)key, child); unlock(child_p->rwlatch);
releasePage(child_p);
if(lastLeaf != -1) {
// install forward link in previous page
Page *lastLeafP = loadPage(xid, lastLeaf);
writelock(lastLeafP->rwlatch,0);
writeNodeRecord(xid,lastLeafP,NEXT_LEAF,dummy,key_len,child);
unlock(lastLeafP->rwlatch);
releasePage(lastLeafP);
}
DEBUG("%lld <-> %lld\n", lastLeaf, child);
free(dummy);
}
writeNodeRecord(xid, root_p, root.slot, key, key_len, child);
return ret; return ret;
} }
@ -252,47 +312,46 @@ static recordid buildPathToLeaf(int xid, recordid root, Page * const root_p,
*/ */
static recordid appendInternalNode(int xid, Page * const p, static recordid appendInternalNode(int xid, Page *p,
int depth, int depth,
const byte *key, size_t key_len, const byte *key, size_t key_len,
pageid_t val_page) { pageid_t val_page, pageid_t lastLeaf) {
if(!depth) { if(!depth) {
// leaf node. // leaf node.
recordid ret = recordPreAlloc(xid, p, sizeof(nodeRecord)); recordid ret = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
if(ret.size != INVALID_SLOT) { if(ret.size != INVALID_SLOT) {
recordPostAlloc(xid, p, ret); recordPostAlloc(xid, p, ret);
writeNodeRecord(xid,p,ret.slot,*(int*)key,val_page); writeNodeRecord(xid,p,ret.slot,key,key_len,val_page);
assert(val_page); // XXX
} }
return ret; return ret;
} else { } else {
// recurse // recurse
int slot = *recordcount_ptr(p)-1; int slot = *recordcount_ptr(p)-1;
assert(slot >= 2); // XXX assert(slot >= FIRST_SLOT); // there should be no empty nodes
nodeRecord nr = readNodeRecord(xid, p, slot); const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len);
pageid_t child_id = nr.ptr; pageid_t child_id = nr->ptr;
recordid ret; recordid ret;
{ {
Page * const child_page = loadPage(xid, child_id); Page *child_page = loadPage(xid, child_id);
writelock(child_page->rwlatch,0); writelock(child_page->rwlatch,0);
ret = appendInternalNode(xid, child_page, depth-1, ret = appendInternalNode(xid, child_page, depth-1, key, key_len,
key, key_len, val_page); val_page, lastLeaf);
unlock(child_page->rwlatch); unlock(child_page->rwlatch);
releasePage(child_page); releasePage(child_page);
} }
if(ret.size == INVALID_SLOT) { // subtree is full; split if(ret.size == INVALID_SLOT) { // subtree is full; split
if(depth > 1) { ret = recordPreAlloc(xid, p, sizeof(lsmTreeNodeRecord)+key_len);
DEBUG("subtree is full at depth %d\n", depth);
}
ret = recordPreAlloc(xid, p, sizeof(nodeRecord));
if(ret.size != INVALID_SLOT) { if(ret.size != INVALID_SLOT) {
recordPostAlloc(xid, p, ret); recordPostAlloc(xid, p, ret);
ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page); ret = buildPathToLeaf(xid, ret, p, depth, key, key_len, val_page,
lastLeaf);
DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n", p->id, ret.page, ret.slot, ret.size); DEBUG("split tree rooted at %lld, wrote value to {%d %d %lld}\n",
p->id, ret.page, ret.slot, ret.size);
} else { } else {
// ret is NULLRID; this is the root of a full tree. Return NULLRID to the caller. // ret is NULLRID; this is the root of a full tree. Return
// NULLRID to the caller.
} }
} else { } else {
// we inserted the value in to a subtree rooted here. // we inserted the value in to a subtree rooted here.
@ -305,53 +364,74 @@ static recordid appendInternalNode(int xid, Page * const p,
* Traverse from the root of the page to the right most leaf (the one * Traverse from the root of the page to the right most leaf (the one
* with the highest base key value). * with the highest base key value).
*/ */
static pageid_t findLastLeaf(int xid, Page * const root, int depth) { static pageid_t findLastLeaf(int xid, Page *root, int depth) {
if(!depth) { if(!depth) {
DEBUG("Found last leaf = %lld\n", root->id); DEBUG("Found last leaf = %lld\n", root->id);
return root->id; return root->id;
} else { } else {
nodeRecord nr = readNodeRecord(xid, root, (*recordcount_ptr(root))-1); // passing zero as length is OK, as long as we don't try to access the key.
const lsmTreeNodeRecord *nr = readNodeRecord(xid, root,
(*recordcount_ptr(root))-1,0);
pageid_t ret; pageid_t ret;
{
Page * const p = loadPage(xid, nr.ptr); Page *p = loadPage(xid, nr->ptr);
writelock(p->rwlatch,0); readlock(p->rwlatch,0);
ret = findLastLeaf(xid,p,depth-1); ret = findLastLeaf(xid,p,depth-1);
unlock(p->rwlatch); unlock(p->rwlatch);
releasePage(p); releasePage(p);
}
return ret; return ret;
} }
} }
/**
 * Walk down the left edge of the tree and report the page id of the
 * left most (lowest valued key) leaf.
 *
 * At each level the child referenced by the first non-header slot is
 * loaded and read-latched for the duration of the recursive call, then
 * unlatched and released.  The caller is expected to hold root.
 *
 * @param xid   transaction id, forwarded to loadPage.
 * @param root  page at which to start descending.
 * @param depth number of levels between root and the leaves; 0 means
 *              root is itself the leaf.
 * @return page id of the left most leaf below (or equal to) root.
 */
static pageid_t findFirstLeaf(int xid, Page *root, int depth) {
  if(depth == 0) {
    return root->id;
  }

  // Key length 0 is fine here: only the child pointer is read.
  const lsmTreeNodeRecord *leftmost = readNodeRecord(xid,root,FIRST_SLOT,0);

  Page *child = loadPage(xid, leftmost->ptr);
  readlock(child->rwlatch,0);
  pageid_t leaf = findFirstLeaf(xid,child,depth-1);
  unlock(child->rwlatch);
  releasePage(child);

  return leaf;
}
recordid TlsmAppendPage(int xid, recordid tree, recordid TlsmAppendPage(int xid, recordid tree,
const byte *key, size_t keySize, const byte *key,
long val_page) { long val_page) {
Page * const p = loadPage(xid, tree.page); Page *p = loadPage(xid, tree.page);
writelock(p->rwlatch, 0); writelock(p->rwlatch, 0);
lsmTreeState * s = p->impl; lsmTreeState *s = p->impl;
// pthread_mutex_lock(&(s->mut));
size_t keySize = getKeySize(xid,p);
tree.slot = 0; tree.slot = 0;
tree.size = sizeof(nodeRecord); tree.size = sizeof(lsmTreeNodeRecord)+keySize;
nodeRecord nr = readNodeRecord(xid,p,0);
int depth = nr.ptr; const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, DEPTH, keySize);
// const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid,p,tree); int depth = nr->ptr;
// int depth = nr->ptr;
// recordReadDone(xid,p,tree,(const byte*)nr);
if(s->lastLeaf == -1) { if(s->lastLeaf == -1) {
s->lastLeaf = findLastLeaf(xid, p, depth); s->lastLeaf = findLastLeaf(xid, p, depth);
} }
Page * lastLeaf;
Page *lastLeaf;
if(s->lastLeaf != tree.page) { if(s->lastLeaf != tree.page) {
lastLeaf= loadPage(xid, s->lastLeaf); lastLeaf= loadPage(xid, s->lastLeaf);
writelock(lastLeaf->rwlatch, 0); // tree depth is in slot zero of root writelock(lastLeaf->rwlatch, 0);
} else { } else {
lastLeaf = p; lastLeaf = p;
} }
recordid ret = recordPreAlloc(xid, lastLeaf, sizeof(nodeRecord)); recordid ret = recordPreAlloc(xid, lastLeaf,
sizeof(lsmTreeNodeRecord)+keySize);
if(ret.size == INVALID_SLOT) { if(ret.size == INVALID_SLOT) {
if(lastLeaf->id != p->id) { if(lastLeaf->id != p->id) {
@ -363,58 +443,72 @@ recordid TlsmAppendPage(int xid, recordid tree,
tree.slot = 0; tree.slot = 0;
assert(tree.page == p->id); assert(tree.page == p->id);
ret = appendInternalNode(xid, p, depth, key, keySize, ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
val_page); s->lastLeaf == tree.page ? -1 : s->lastLeaf);
if(ret.size == INVALID_SLOT) { if(ret.size == INVALID_SLOT) {
DEBUG("Need to split root; depth = %d\n", depth); DEBUG("Need to split root; depth = %d\n", depth);
pageid_t child = TpageAlloc(xid); pageid_t child = TpageAlloc(xid);
Page *lc = loadPage(xid, child);
Page * lc = loadPage(xid, child);
writelock(lc->rwlatch,0); writelock(lc->rwlatch,0);
initializeNodePage(xid, lc); initializeNodePage(xid, lc,keySize);
for(int i = 2; i < *recordcount_ptr(p); i++) { for(int i = FIRST_SLOT; i < *recordcount_ptr(p); i++) {
recordid cnext = recordPreAlloc(xid, lc, sizeof(nodeRecord)); recordid cnext = recordPreAlloc(xid, lc,
sizeof(lsmTreeNodeRecord)+keySize);
assert(i == cnext.slot); // XXX hardcoded to current node format... assert(i == cnext.slot);
assert(cnext.size != INVALID_SLOT); assert(cnext.size != INVALID_SLOT);
recordPostAlloc(xid, lc, cnext); recordPostAlloc(xid, lc, cnext);
nodeRecord nr = readNodeRecord(xid,p,i); const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,i,keySize);
writeNodeRecord(xid,lc,i,nr.key,nr.ptr); writeNodeRecord(xid,lc,i,(byte*)(nr+1),keySize,nr->ptr);
} }
// deallocate old entries, and update pointer on parent node. // deallocate old entries, and update pointer on parent node.
// XXX this is a terrible way to do this. recordid pFirstSlot = { p->id, FIRST_SLOT,
recordid pFirstSlot = {p->id, 2, sizeof(nodeRecord)}; sizeof(lsmTreeNodeRecord)+keySize };
*recordcount_ptr(p) = 3;
nodeRecord * nr = (nodeRecord*)recordWriteNew(xid, p, pFirstSlot); // @todo should fixed.h support bulk deallocation directly?
*recordcount_ptr(p) = FIRST_SLOT+1;
lsmTreeNodeRecord *nr
= (lsmTreeNodeRecord*)recordWriteNew(xid, p, pFirstSlot);
// don't overwrite key... // don't overwrite key...
nr->ptr = child; nr->ptr = child;
assert(nr->ptr > 1);///XXX
recordWriteDone(xid,p,pFirstSlot,(byte*)nr); recordWriteDone(xid,p,pFirstSlot,(byte*)nr);
pageWriteLSN(xid, p, 0); // XXX need real LSN? pageWriteLSN(xid, p, 0); // XXX need real LSN?
byte *dummy = calloc(1,keySize);
if(!depth) {
s->lastLeaf = lc->id;
writeNodeRecord(xid,lc,PREV_LEAF,dummy,keySize,-1);
writeNodeRecord(xid,lc,NEXT_LEAF,dummy,keySize,-1);
}
unlock(lc->rwlatch); unlock(lc->rwlatch);
releasePage(lc); releasePage(lc);
depth ++; depth ++;
writeNodeRecord(xid,p,0,0,depth); writeNodeRecord(xid,p,DEPTH,dummy,keySize,depth);
free(dummy);
assert(tree.page == p->id); assert(tree.page == p->id);
ret = appendInternalNode(xid, p, depth, key, keySize, ret = appendInternalNode(xid, p, depth, key, keySize, val_page,
val_page); s->lastLeaf == tree.page ? -1 : s->lastLeaf);
assert(ret.size != INVALID_SLOT); assert(ret.size != INVALID_SLOT);
} else { } else {
DEBUG("Appended new internal node tree depth = %d key = %d\n", depth, *(int*)key); DEBUG("Appended new internal node tree depth = %d key = %d\n",
depth, *(int*)key);
} }
s->lastLeaf = ret.page; s->lastLeaf = ret.page;
DEBUG("lastleaf is %lld\n", s->lastLeaf); DEBUG("lastleaf is %lld\n", s->lastLeaf);
@ -425,7 +519,7 @@ recordid TlsmAppendPage(int xid, recordid tree,
recordPostAlloc(xid, lastLeaf, ret); recordPostAlloc(xid, lastLeaf, ret);
writeNodeRecord(xid, lastLeaf, ret.slot, *(int*)key, val_page); writeNodeRecord(xid, lastLeaf, ret.slot, key, keySize, val_page);
if(lastLeaf->id != p->id) { if(lastLeaf->id != p->id) {
unlock(lastLeaf->rwlatch); unlock(lastLeaf->rwlatch);
@ -433,32 +527,34 @@ recordid TlsmAppendPage(int xid, recordid tree,
} }
} }
// XXX do something to make this transactional...
// pthread_mutex_unlock(&(s->mut));
unlock(p->rwlatch); unlock(p->rwlatch);
releasePage(p); releasePage(p);
return ret; return ret;
} }
static pageid_t lsmLookup(int xid, Page * const node, int depth, static pageid_t lsmLookup(int xid, Page *node, int depth,
const byte *key, size_t keySize) { const byte *key, size_t keySize, lsm_comparator_t cmp) {
// Start at slot 2 to skip reserved slots on page...
if(*recordcount_ptr(node) == 2) { return -1; }
assert(*recordcount_ptr(node) > 2);
nodeRecord prev = readNodeRecord(xid,node,2);
// should do binary search instead. if(*recordcount_ptr(node) == FIRST_SLOT) { return -1; }
for(int i = 3; i < *recordcount_ptr(node); i++) { assert(*recordcount_ptr(node) > FIRST_SLOT);
nodeRecord rec = readNodeRecord(xid,node,i);
const lsmTreeNodeRecord *prev = readNodeRecord(xid,node,FIRST_SLOT,keySize);
int prev_cmp_key = cmp(prev+1,key);
// @todo binary search within each page
for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) {
const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,i,keySize);
int rec_cmp_key = cmp(rec+1,key);
if(depth) { if(depth) {
if(prev.key <= *(int*)key && rec.key > *(int*)key) { if(prev_cmp_key <= 0 && rec_cmp_key > 0) {
pageid_t child_id = prev.ptr; pageid_t child_id = prev->ptr;
Page * const child_page = loadPage(xid, child_id); Page *child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0); readlock(child_page->rwlatch,0);
long ret = lsmLookup(xid,child_page,depth-1,key,keySize); long ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
unlock(child_page->rwlatch); unlock(child_page->rwlatch);
releasePage(child_page); releasePage(child_page);
return ret; return ret;
@ -466,22 +562,22 @@ static pageid_t lsmLookup(int xid, Page * const node, int depth,
} else { } else {
if(prev.key == *(int*)key) { if(prev_cmp_key == 0) {
return prev.ptr; return prev->ptr;
} }
} }
prev = rec; prev = rec;
prev_cmp_key = rec_cmp_key;
if(prev.key > *(int*)key) { break; } if(rec_cmp_key > 0) { break; }
} }
if(depth) { if(depth) {
if(prev.key <= *(int*)key) { if(prev_cmp_key <= 0) {
pageid_t child_id = prev.ptr; pageid_t child_id = prev->ptr;
Page * const child_page = loadPage(xid, child_id); Page *child_page = loadPage(xid, child_id);
readlock(child_page->rwlatch,0); readlock(child_page->rwlatch,0);
long ret = lsmLookup(xid,child_page,depth-1,key,keySize); long ret = lsmLookup(xid,child_page,depth-1,key,keySize,cmp);
unlock(child_page->rwlatch); unlock(child_page->rwlatch);
releasePage(child_page); releasePage(child_page);
return ret; return ret;
@ -489,32 +585,37 @@ static pageid_t lsmLookup(int xid, Page * const node, int depth,
} else { } else {
if(prev.key == *(int*)key) { if(prev_cmp_key == 0) {
return prev.ptr; return prev->ptr;
} }
} }
return -1; return -1;
} }
pageid_t TlsmFindPage(int xid, recordid tree, const byte * key, size_t keySize) { /**
Page * const p = loadPage(xid, tree.page); Look up the value associated with key.
@return -1 if key isn't in the tree.
*/
pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) {
Page *p = loadPage(xid, tree.page);
readlock(p->rwlatch,0); readlock(p->rwlatch,0);
//lsmTreeState * s = p->impl;
// pthread_mutex_lock(&(s->mut));
tree.slot = 0; tree.slot = 0;
tree.size = *recordsize_ptr(p); tree.size = *recordsize_ptr(p);
nodeRecord nr = readNodeRecord(xid, p , 0); size_t keySize = getKeySize(xid,p);
// const nodeRecord * nr = (const nodeRecord*)recordReadNew(xid, p, tree);
int depth = nr.ptr; const lsmTreeNodeRecord *depth_nr = readNodeRecord(xid, p , 0, keySize);
const lsmTreeNodeRecord *cmp_nr = readNodeRecord(xid, p , 1, keySize);
pageid_t ret = lsmLookup(xid, p, depth, key, keySize); int depth = depth_nr->ptr;
lsm_comparator_t cmp = comparators[cmp_nr->ptr];
pageid_t ret = lsmLookup(xid, p, depth, key, keySize, cmp);
// recordReadDone(xid, p, tree, (const byte*)nr);
//pthread_mutex_unlock(&(s->mut));
unlock(p->rwlatch); unlock(p->rwlatch);
releasePage(p); releasePage(p);
@ -528,9 +629,8 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte * key, size_t keySize)
associated with the tree. associated with the tree.
*/ */
static void lsmPageLoaded(Page *p) { static void lsmPageLoaded(Page *p) {
lsmTreeState * state = malloc(sizeof(lsmTreeState)); lsmTreeState *state = malloc(sizeof(lsmTreeState));
state->lastLeaf = -1; state->lastLeaf = -1;
//pthread_mutex_init(&(state->mut),0);
p->impl = state; p->impl = state;
} }
/** /**
@ -538,8 +638,7 @@ static void lsmPageLoaded(Page *p) {
This is called by the buffer manager. This is called by the buffer manager.
*/ */
static void lsmPageFlushed(Page *p) { static void lsmPageFlushed(Page *p) {
lsmTreeState * state = p->impl; lsmTreeState *state = p->impl;
//pthread_mutex_destroy(&(state->mut));
free(state); free(state);
} }
/** /**
@ -552,3 +651,83 @@ page_impl lsmRootImpl() {
pi.page_type = LSM_ROOT_PAGE; pi.page_type = LSM_ROOT_PAGE;
return pi; return pi;
} }
///--------------------- Iterator implementation
/**
 * Open an iterator over the tree rooted at root.
 *
 * The iterator is left holding a read latch on the left most leaf and
 * is positioned on slot 1 of that page, i.e. just before the first
 * entry; call lsmTreeIterator_next() to advance to the first entry.
 * Release it with lsmTreeIterator_close().
 *
 * @param xid  transaction id, forwarded to the page API.
 * @param root record id whose page field names the tree's root page.
 * @return a newly allocated iterator.
 */
lladdIterator_t *lsmTreeIterator_open(int xid, recordid root) {
  Page *p = loadPage(xid,root.page);
  readlock(p->rwlatch,0);

  size_t keySize = getKeySize(xid,p);
  const lsmTreeNodeRecord *depth_rec = readNodeRecord(xid,p,DEPTH,keySize);
  int depth = depth_rec->ptr;

  pageid_t first_leaf = findFirstLeaf(xid, p, depth);
  if(first_leaf != root.page) {
    // Trade the latch on the root for one on the first leaf.
    unlock(p->rwlatch);
    releasePage(p);
    p = loadPage(xid,first_leaf);
    readlock(p->rwlatch,0);
  }

  lsmIteratorImpl *impl = malloc(sizeof(lsmIteratorImpl));
  impl->p = p;

  recordid before_first = { p->id, 1, keySize };
  impl->current = before_first;
  DEBUG("keysize = %d, slot = %d\n", keySize, impl->current.slot);

  impl->t = 0;
  // A tree of depth zero is just the root page.
  impl->justOnePage = (depth == 0);

  lladdIterator_t *it = malloc(sizeof(lladdIterator_t));
  it->type = -1; // XXX LSM_TREE_ITERATOR;
  it->impl = impl;

  /* @todo register lsmTree iterators with stasis someday; the iterator
     definition would list lsmTreeIterator_close, lsmTreeIterator_next
     (twice), lsmTreeIterator_key, lsmTreeIterator_value,
     lsmTreeIterator_tupleDone and lsmTreeIterator_releaseLock. */
  return it;
}
/**
 * Dispose of an iterator created by lsmTreeIterator_open().
 *
 * If the iterator still holds a page, that page is unlatched and
 * released.  The iterator and its private state are then freed.
 *
 * @param xid unused.
 * @param it  the iterator to destroy; must not be used afterwards.
 */
void lsmTreeIterator_close(int xid, lladdIterator_t *it) {
  lsmIteratorImpl *state = it->impl;

  if(state->p != 0) {
    unlock(state->p->rwlatch);
    releasePage(state->p);
  }

  free(state);
  free(it);
}
/**
 * Advance the iterator to the next entry of the tree.
 *
 * Entries on the current leaf are visited in slot order.  When the leaf
 * is exhausted the iterator follows the leaf's NEXT_LEAF link, unless
 * the tree consists of a single page (justOnePage) or the link is -1,
 * in which case iteration ends and the iterator no longer holds a page.
 *
 * @param xid transaction id, forwarded to the page API.
 * @param it  iterator obtained from lsmTreeIterator_open().
 * @return 1 if the iterator now rests on an entry (impl->t points at
 *         it), 0 if there are no more entries (impl->t is set to 0).
 *         Calling this again after it has returned 0 keeps returning 0.
 */
int lsmTreeIterator_next(int xid, lladdIterator_t *it) {
  lsmIteratorImpl *impl = it->impl;

  // An exhausted iterator has dropped its page; there is nothing left
  // to scan, and fixedNext() must not be handed a null page.
  if(!impl->p) {
    impl->t = 0;
    return 0;
  }

  size_t keySize = impl->current.size;
  impl->current = fixedNext(xid, impl->p, impl->current);

  if(impl->current.size == INVALID_SLOT) {
    // Ran off the end of this leaf.  The link record lives inside the
    // page, so everything that is needed from it must be read before
    // the page is unlatched and released.
    const lsmTreeNodeRecord *next_rec = readNodeRecord(xid,impl->p,NEXT_LEAF,
                                                       keySize);
    int has_next = !impl->justOnePage && next_rec->ptr != -1;

    DEBUG("done with page %lld next = %lld\n", impl->p->id, next_rec->ptr);

    if(has_next) {
      impl->current.page = next_rec->ptr;
      impl->current.slot = FIRST_SLOT;
      impl->current.size = keySize;
    }

    unlock(impl->p->rwlatch);
    releasePage(impl->p);

    if(has_next) {
      impl->p = loadPage(xid, impl->current.page);
      readlock(impl->p->rwlatch,0);
    } else {
      impl->p = 0;
      impl->current.size = -1;
    }
  }

  if(impl->current.size != INVALID_SLOT) {
    impl->t = readNodeRecord(xid,impl->p,impl->current.slot,impl->current.size);
    return 1;
  } else {
    impl->t = 0;
    return 0;
  }
}

View file

@ -79,22 +79,6 @@ static int fixedGetLength(int xid, Page *p, recordid rid) {
return rid.slot > *recordcount_ptr(p) ? return rid.slot > *recordcount_ptr(p) ?
INVALID_SLOT : physical_slot_length(*recordsize_ptr(p)); INVALID_SLOT : physical_slot_length(*recordsize_ptr(p));
} }
static recordid fixedNext(int xid, Page *p, recordid rid) {
short n = *recordcount_ptr(p);
rid.slot++;
rid.size = *recordsize_ptr(p);
if(rid.slot >= n) {
return NULLRID;
} else {
return rid;
}
}
/**
 * Return the id of the first record on a fixed-length-record page.
 *
 * Builds a position one slot before the start and lets fixedNext() advance
 * it, so the result is whatever fixedNext() yields for that position
 * (NULLRID when the page holds no records).
 *
 * @param xid transaction id, forwarded to fixedNext().
 * @param p   page to scan.
 */
static recordid fixedFirst(int xid, Page *p) {
  // Second initializer is -1 so that the increment in fixedNext() lands on 0
  // (assumes recordid's fields are ordered page, slot, size -- confirm).
  recordid beforeFirst = { p->id, -1, 0 };
  beforeFirst.size = *recordsize_ptr(p);
  return fixedNext(xid, p, beforeFirst);
}
static int notSupported(int xid, Page * p) { return 0; } static int notSupported(int xid, Page * p) { return 0; }
static int fixedFreespace(int xid, Page * p) { static int fixedFreespace(int xid, Page * p) {

View file

@ -7,6 +7,22 @@
#define recordcount_ptr(page) shorts_from_end((page), 2) #define recordcount_ptr(page) shorts_from_end((page), 2)
#define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n)) #define fixed_record_ptr(page, n) bytes_from_start((page), *recordsize_ptr((page)) * (n))
/**
 * Step a record id forward by one slot on a fixed-length-record page.
 *
 * The returned id's size is set to the page's record size.
 *
 * @param xid transaction id (not used by this function).
 * @param p   page being scanned.
 * @param rid current position; passed by value, the caller's copy is not
 *            modified.
 * @return the advanced record id, or NULLRID once the incremented slot
 *         reaches the page's record count.
 */
static inline recordid fixedNext(int xid, Page *p, recordid rid) {
  short slotCount = *recordcount_ptr(p);
  rid.size = *recordsize_ptr(p);
  rid.slot++;
  if(rid.slot < slotCount) {
    return rid;
  }
  return NULLRID;
}
/**
 * Return the id of the first record on a fixed-length-record page.
 *
 * Builds a position one slot before the start and lets fixedNext() advance
 * it, so the result is whatever fixedNext() yields for that position
 * (NULLRID when the page holds no records).
 *
 * @param xid transaction id, forwarded to fixedNext().
 * @param p   page to scan.
 */
static inline recordid fixedFirst(int xid, Page *p) {
  // Second initializer is -1 so that the increment in fixedNext() lands on 0
  // (assumes recordid's fields are ordered page, slot, size -- confirm).
  recordid beforeFirst = { p->id, -1, 0 };
  beforeFirst.size = *recordsize_ptr(p);
  return fixedNext(xid, p, beforeFirst);
}
void fixedPageInit(); void fixedPageInit();
void fixedPageDeinit(); void fixedPageDeinit();
page_impl fixedImpl(); page_impl fixedImpl();

View file

@ -241,4 +241,7 @@ extern const short SLOT_TYPE_LENGTHS[];
#define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH) #define FILE_PERM (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH)
#define LOG_MODE (O_CREAT | O_RDWR | O_SYNC) #define LOG_MODE (O_CREAT | O_RDWR | O_SYNC)
#define MAX_LSM_COMPARATORS 256
#endif #endif

View file

@ -22,13 +22,9 @@ typedef struct {
recordid pos; recordid pos;
} lladd_lsm_iterator; } lladd_lsm_iterator;
typedef struct { typedef int(*lsm_comparator_t)(const void* a, const void* b);
int id;
// fcn pointer...
} comparator_impl;
void lsmTreeRegisterComparator(comparator_impl i); void lsmTreeRegisterComparator(int id, lsm_comparator_t i);
extern const int MAX_LSM_COMPARATORS;
/** /**
Initialize a new LSM tree. Initialize a new LSM tree.
@ -47,7 +43,7 @@ recordid TlsmDealloc(int xid, recordid tree);
ascending order; LSM trees do not support update in place. ascending order; LSM trees do not support update in place.
*/ */
recordid TlsmAppendPage(int xid, recordid tree, recordid TlsmAppendPage(int xid, recordid tree,
const byte *key, size_t keySize, const byte *key,
long pageid); long pageid);
/** /**
Lookup a leaf page. Lookup a leaf page.
@ -60,25 +56,51 @@ recordid TlsmAppendPage(int xid, recordid tree,
Currently unused. Currently unused.
*/ */
pageid_t TlsmFindPage(int xid, recordid tree, pageid_t TlsmFindPage(int xid, recordid tree,
const byte *key, size_t keySize); const byte *key);
/// --------------- Iterator implementation
/**
 * Fixed-size prefix of a tree node record.  lsmTreeIterator_key() exposes
 * the bytes that directly follow this struct (t + 1) as the record's key.
 */
typedef struct lsmTreeNodeRecord {
// Page id carried by the record.  lsmTreeIterator_next() treats -1 in the
// NEXT_LEAF record as "no following leaf".
pageid_t ptr;
} lsmTreeNodeRecord;
/**
 * Private state of an lsmTree iterator; stored in lladdIterator_t's impl
 * field.
 */
typedef struct lsmIteratorImpl {
// Page the iterator is currently on; lsmTreeIterator_next() read-latches each
// page it loads and sets this to 0 once the iterator is exhausted.
Page * p;
// Record id of the current position within p.
recordid current;
// Record at the current position, or 0 when there is none.
const lsmTreeNodeRecord *t;
// Non-zero: stop at the end of the current page instead of following the
// NEXT_LEAF pointer.
int justOnePage;
} lsmIteratorImpl;
/** /**
Return a forward iterator over the tree's leaf pages (*not* their Return a forward iterator over the tree's leaf pages (*not* their
contents). contents). The iterator starts before the first leaf page.
*/
lladdIterator_t * TlsmIterator(int xid, recordid hash);
/** @see iterator.h for documentation of lsmTree's iterator interface.
*/
lladdIterator_t * lsmTreeIterator_open(int xid, recordid tree);
/*
These are the functions that implement lsmTree's iterator. These are the functions that implement lsmTree's iterator.
They're public so that performance critical code can call them They're public so that performance critical code can call them
without paying for a virtual method invocation. without paying for a virtual method invocation.
XXX should they be public?
*/ */
void lsmTreeIterator_close(int xid, void * it); void lsmTreeIterator_close(int xid, lladdIterator_t * it);
int lsmTreeIterator_next (int xid, void * it); int lsmTreeIterator_next (int xid, lladdIterator_t * it);
int lsmTreeIterator_key (int xid, void * it, byte **key);
int lsmTreeIterator_value(int xid, void * it, byte **value);
/**
 * Expose the key of the record the iterator currently refers to.
 *
 * Nothing is copied: *key is pointed at the bytes that immediately follow
 * the current lsmTreeNodeRecord (impl->t + 1).
 *
 * @param xid transaction id (not used by this function).
 * @param it  iterator whose impl field is an lsmIteratorImpl.
 * @param key out parameter; receives a pointer to the key bytes.
 * @return sizeof(impl->current.size), see the review note below.
 */
static inline int lsmTreeIterator_key (int xid, lladdIterator_t *it,
byte **key) {
lsmIteratorImpl * impl = (lsmIteratorImpl*)it->impl;
*key = (byte*)(impl->t+1);
// NOTE(review): this is the width of the `size` field's type, not a length
// stored in it, so it equals the real key length only when the two happen to
// coincide.  Confirm which value callers are meant to receive.
return sizeof(impl->current.size);
}
/**
 * Expose the value (the page id) of the record the iterator currently
 * refers to.
 *
 * Nothing is copied: *value is pointed at the ptr field of the current
 * record.
 *
 * @param xid   transaction id (not used by this function).
 * @param it    iterator whose impl field is an lsmIteratorImpl.
 * @param value out parameter; receives a pointer to the record's ptr field.
 * @return the size in bytes of the ptr field.
 */
static inline int lsmTreeIterator_value(int xid, lladdIterator_t *it,
                                        byte **value) {
  lsmIteratorImpl *state = (lsmIteratorImpl*)it->impl;
  const int ptrBytes = sizeof(state->t->ptr);
  *value = (byte*)&(state->t->ptr);
  return ptrBytes;
}
/** Intentionally empty: this implementation does nothing when the caller is done with a tuple. */
static inline void lsmTreeIterator_tupleDone(int xid, void *it) { }
/** Intentionally empty: this implementation releases nothing here. */
static inline void lsmTreeIterator_releaseLock(int xid, void *it) { }
#endif // _LSMTREE_H__ #endif // _LSMTREE_H__

View file

@ -13,34 +13,74 @@
#include <time.h> #include <time.h>
#define LOG_NAME "check_lsmTree.log" #define LOG_NAME "check_lsmTree.log"
#define NUM_ENTRIES 100000 #define NUM_ENTRIES_A 100000
#define NUM_ENTRIES_B 10
#define NUM_ENTRIES_C 0
#define OFFSET (NUM_ENTRIES * 10) #define OFFSET (NUM_ENTRIES * 10)
#define DEBUG(...) typedef int64_t lsmkey_t;
/**
 * Three-way comparison of two lsmkey_t values.
 *
 * @param ap pointer to the first key.
 * @param bp pointer to the second key.
 * @return -1 if *ap < *bp, 0 if they are equal, 1 otherwise.
 */
int cmp(const void *ap, const void *bp) {
  const lsmkey_t lhs = *(const lsmkey_t*)ap;
  const lsmkey_t rhs = *(const lsmkey_t*)bp;
  // Each comparison yields 0 or 1, so the difference is -1, 0 or 1.
  return (lhs > rhs) - (lhs < rhs);
}
/**
 * Build a tree with NUM_ENTRIES keys, then verify it three ways: a lookup
 * right after each append, a second pass of lookups over every key, and a
 * full scan with the iterator that checks order, key/value pairing and the
 * total count.
 *
 * Key i is mapped to page id i + OFFSET.
 *
 * @param NUM_ENTRIES number of keys to insert.  The parameter must keep this
 *                    name: the OFFSET macro expands to an expression that
 *                    mentions NUM_ENTRIES.
 */
void insertProbeIter(lsmkey_t NUM_ENTRIES) {
  int intcmp = 0;
  lsmTreeRegisterComparator(intcmp,cmp);

  Tinit();
  int xid = Tbegin();
  recordid tree = TlsmCreate(xid, intcmp, sizeof(lsmkey_t));

  for(lsmkey_t i = 0; i < NUM_ENTRIES; i++) {
    // The key must be absent before it is appended...
    pageid_t pagenum = TlsmFindPage(xid, tree, (byte*)&i);
    assert(pagenum == -1);
    DEBUG("TlsmAppendPage %lld\n",(long long)i);
    TlsmAppendPage(xid, tree, (const byte*)&i, i + OFFSET);
    // ...and visible immediately afterwards.
    pagenum = TlsmFindPage(xid, tree, (byte*)&i);
    assert(pagenum == i + OFFSET);
  }

  for(lsmkey_t i = 0; i < NUM_ENTRIES; i++) {
    pageid_t pagenum = TlsmFindPage(xid, tree, (byte*)&i);
    assert(pagenum == i + OFFSET);
  }

  int64_t count = 0;
  lladdIterator_t * it = lsmTreeIterator_open(xid, tree);
  while(lsmTreeIterator_next(xid, it)) {
    lsmkey_t * key;
    lsmkey_t **key_ptr = &key;
    int size = lsmTreeIterator_key(xid, it, (byte**)key_ptr);
    assert(size == sizeof(lsmkey_t));

    // Read the value through pageid_t, the type whose size is asserted
    // below, rather than through long.
    pageid_t *value;
    pageid_t **value_ptr = &value;
    size = lsmTreeIterator_value(xid, it, (byte**)value_ptr);
    assert(size == sizeof(pageid_t));

    assert(*key + OFFSET == *value);
    // Keys must come back in insertion (ascending) order.
    assert(*key == count);
    count++;
  }
  assert(count == NUM_ENTRIES);
  lsmTreeIterator_close(xid, it);

  Tcommit(xid);
  Tdeinit();
}
/** @test /** @test
*/ */
START_TEST(lsmTreeTest) START_TEST(lsmTreeTest)
{ {
Tinit(); insertProbeIter(NUM_ENTRIES_A);
int xid = Tbegin(); insertProbeIter(NUM_ENTRIES_B);
recordid tree = TlsmCreate(xid, 0, sizeof(int)); // xxx comparator not set. insertProbeIter(NUM_ENTRIES_C);
for(int i = 0; i < NUM_ENTRIES; i++) {
long pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
assert(pagenum == -1);
DEBUG("TlsmAppendPage %d\n",i);
TlsmAppendPage(xid, tree, (const byte*)&i, sizeof(int), i + OFFSET);
// fflush(NULL);
pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
assert(pagenum == i + OFFSET);
}
for(int i = 0; i < NUM_ENTRIES; i++) {
long pagenum = TlsmFindPage(xid, tree, (byte*)&i, sizeof(int));
assert(pagenum == i + OFFSET);
}
Tcommit(xid);
Tdeinit();
} END_TEST } END_TEST
Suite * check_suite(void) { Suite * check_suite(void) {