partial b-tree stuff

This commit is contained in:
Sears Russell 2009-09-10 21:02:56 +00:00
parent 6abdb3c1c1
commit e48bc7adf3

View file

@ -176,14 +176,55 @@ int TbtreeInsert(int xid, recordid rid, void *cmp_arg, byte *key, size_t keySize
memcpy(buf+1, key, keySize);
memcpy(((byte*)(buf+1))+keySize, value, valueSize);
recordid newrid = stasis_record_alloc_begin(xid, p, sz);
if(newrid.size == sz) {
if(newrid.size != sz) {
// split leaf into two halves (based on slot count)
pageid_t leftpage = p->id;
pageid_t rightpage = TpageAlloc(xid);
TinitializeSlottedPage(xid, rightpage);
Page * rightp = loadPage(xid, rightpage);
writelock(rightp->rwlatch,0);
const recordid lastrid = stasis_record_last(xid, p);
for(slotid_t i = lastrid.slot / 2; i <= lastrid.slot; i++) {
recordid leftrid = {p->id, i, 0};
leftrid.size = stasis_record_length_read(xid, p, leftrid);
recordid rightrid = stasis_record_alloc_begin(xid, rightp, leftrid.size);
stasis_record_alloc_done(xid, rightp, rightrid);
byte * buf = stasis_record_write_begin(xid, rightp, rightrid);
stasis_record_read(xid, p, leftrid, buf);
stasis_record_write_done(xid, rightp, rightrid, buf);
stasis_record_free(xid, p, leftrid);
}
stasis_record_compact(p);
stasis_record_compact_slotids(xid, p);
if(slotrid.slot < lastrid.slot / 2) {
// put into left page (p)
newrid = stasis_record_alloc_begin(xid, p, sz);
} else {
// put into right page(rightp)
slotrid.slot -= (lastrid.slot/2);
Page * swpp = p;
p = rightp;
rightp = swpp;
slotrid.slot = p->id;
newrid = stasis_record_alloc_begin(xid, p, sz);
}
unlock(rightp->rwlatch);
releasePage(rightp);
int next_to_split = h.height - 2; // h.height-1 is the offset of the leaf, which we just split.
while(next_to_split >= 0) {
// insert value into intermediate node. Break out of loop if the value fit...
}
// need new root.
h.height++;
h.root = TpageAlloc(xid);
TinitializeSlottedPage(xid, h.root);
TallocFromPage(xid, h.root, -1/*oldsz*/);
// TODO now what?!?
}
stasis_record_alloc_done(xid, p, newrid);
stasis_record_write(xid, p, newrid, (byte*)buf);
stasis_record_splice(xid, p, slotrid.slot, newrid.slot);
DEBUG("created new record: %lld %d -> %d %d\n", newrid.page, newrid.slot, slotrid.slot, newrid.size);
} else {
assert(newrid.size == sz); // XXX handle splits...
}
free(path);
unlock(p->rwlatch);
releasePage(p);