Refactor page implementation to use multiple include trick (so the backing representation can be changed).

Also, fix apparent bug in stasis_page_fixed_set_record_type().

Otherwise, this change should not make any semantic changes.
This commit is contained in:
Sears Russell 2011-11-09 21:37:38 +00:00
parent 913f0a4401
commit 7660467f8e
20 changed files with 1053 additions and 936 deletions

View file

@ -8,6 +8,8 @@
#include <stasis/io/handle.h> #include <stasis/io/handle.h>
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/util/histogram.h> #include <stasis/util/histogram.h>
#include <stasis/page/fixed.h>
#include <string.h> #include <string.h>
#include <assert.h> #include <assert.h>
#include <pthread.h> #include <pthread.h>
@ -29,7 +31,7 @@ struct thread_arg {
int do_load(pageid_t page_count) { int do_load(pageid_t page_count) {
for(int i = 1; i < page_count; i++) { for(int i = 1; i < page_count; i++) {
Page * p = loadUninitializedPage(-1, i); Page * p = loadUninitializedPage(-1, i);
stasis_fixed_initialize_page(p, sizeof(i), 1); stasis_page_fixed_initialize_page(p, sizeof(i), 1);
recordid rid = {i, 0, sizeof(i)}; recordid rid = {i, 0, sizeof(i)};
stasis_record_write(-1, p, rid,(byte*) &i); stasis_record_write(-1, p, rid,(byte*) &i);
stasis_page_lsn_write(-1, p, p->LSN + 1); stasis_page_lsn_write(-1, p, p->LSN + 1);

View file

@ -17,7 +17,7 @@ void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) {
Page * p = loadPage(xid, pid); Page * p = loadPage(xid, pid);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
memcpy(old, p->memAddr, PAGE_SIZE); memcpy(old, p->memAddr, PAGE_SIZE);
stasis_slotted_lsn_free_initialize_page(p); stasis_page_slotted_lsn_free_initialize_page(p);
while(i < num_rids && while(i < num_rids &&
( (
((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size ((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size

View file

@ -1,5 +1,6 @@
//#define LEAK_TEST //#define LEAK_TEST
#include <stasis/transactional.h>
#include "roseTable.h" #include "roseTable.h"
#include "stasis/experimental/compression/compression.h" #include "stasis/experimental/compression/compression.h"

View file

@ -1,10 +1,10 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <errno.h> #include <errno.h>
#include "stasis/experimental/lsmTable.h"
#include "stasis/transactional.h" #include "stasis/transactional.h"
#include "stasis/experimental/lsmTable.h"
#include "stasis/experimental/compression/nop.h" #include "stasis/experimental/compression/nop.h"
#include "stasis/experimental/compression/multicolumn-impl.h" #include "stasis/experimental/compression/multicolumn-impl.h"
#include "stasis/experimental/compression/staticMulticolumn.h" #include "stasis/experimental/compression/staticMulticolumn.h"

View file

@ -162,7 +162,7 @@ typedef struct lsmTreeState {
* header in the first two lsmTreeNodeRecords on the page. * header in the first two lsmTreeNodeRecords on the page.
*/ */
static void initializeNodePage(int xid, Page *p, size_t keylen) { static void initializeNodePage(int xid, Page *p, size_t keylen) {
stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord)+keylen, 0); stasis_page_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord)+keylen, 0);
recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen); recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
stasis_record_alloc_done(xid, p, reserved1); stasis_record_alloc_done(xid, p, reserved1);
recordid reserved2 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen); recordid reserved2 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen);
@ -194,7 +194,7 @@ static void initializeNodePage(int xid, Page *p, size_t keylen) {
*/ */
static inline size_t getKeySizeFixed(int xid, Page const *p) { static inline size_t getKeySizeFixed(int xid, Page const *p) {
return (*recordsize_cptr(p)) - sizeof(lsmTreeNodeRecord); return (*stasis_page_fixed_recordsize_cptr(p)) - sizeof(lsmTreeNodeRecord);
} }
static inline size_t getKeySizeVirtualMethods(int xid, Page *p) { static inline size_t getKeySizeVirtualMethods(int xid, Page *p) {
@ -207,7 +207,7 @@ static inline size_t getKeySizeVirtualMethods(int xid, Page *p) {
static inline static inline
const lsmTreeNodeRecord* readNodeRecordFixed(int xid, Page *const p, int slot, const lsmTreeNodeRecord* readNodeRecordFixed(int xid, Page *const p, int slot,
int keylen) { int keylen) {
return (const lsmTreeNodeRecord*)fixed_record_ptr(p, slot); return (const lsmTreeNodeRecord*)stasis_page_fixed_record_ptr(p, slot);
} }
/** /**
* Read a record from the page node, using stasis' general-purpose * Read a record from the page node, using stasis' general-purpose
@ -237,7 +237,7 @@ lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p,
static inline static inline
void writeNodeRecordFixed(int xid, Page *p, int slot, void writeNodeRecordFixed(int xid, Page *p, int slot,
const byte *key, size_t keylen, pageid_t ptr) { const byte *key, size_t keylen, pageid_t ptr) {
lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)fixed_record_ptr(p,slot); lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)stasis_page_fixed_record_ptr(p,slot);
nr->ptr = ptr; nr->ptr = ptr;
memcpy(nr+1, key, keylen); memcpy(nr+1, key, keylen);
stasis_page_lsn_write(xid, p, 0); // XXX need real LSN? stasis_page_lsn_write(xid, p, 0); // XXX need real LSN?
@ -274,7 +274,7 @@ recordid TlsmCreate(int xid, int comparator,
Page *p = loadPage(xid, ret.page); Page *p = loadPage(xid, ret.page);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0); stasis_page_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0);
p->pageType = LSM_ROOT_PAGE; p->pageType = LSM_ROOT_PAGE;
lsmTreeState *state = malloc(sizeof(lsmTreeState)); lsmTreeState *state = malloc(sizeof(lsmTreeState));
@ -411,7 +411,7 @@ static recordid appendInternalNode(int xid, Page *p,
return ret; return ret;
} else { } else {
// recurse // recurse
int slot = *recordcount_ptr(p)-1; int slot = *stasis_page_fixed_recordcount_ptr(p)-1;
assert(slot >= FIRST_SLOT); // there should be no empty nodes assert(slot >= FIRST_SLOT); // there should be no empty nodes
const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len); const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len);
pageid_t child_id = nr->ptr; pageid_t child_id = nr->ptr;
@ -457,7 +457,7 @@ static pageid_t findLastLeaf(int xid, Page *root, int depth) {
} else { } else {
// passing zero as length is OK, as long as we don't try to access the key. // passing zero as length is OK, as long as we don't try to access the key.
const lsmTreeNodeRecord *nr = readNodeRecord(xid, root, const lsmTreeNodeRecord *nr = readNodeRecord(xid, root,
(*recordcount_ptr(root))-1,0); (*stasis_page_fixed_recordcount_ptr(root))-1,0);
pageid_t ret; pageid_t ret;
Page *p = loadPage(xid, nr->ptr); Page *p = loadPage(xid, nr->ptr);
@ -545,7 +545,7 @@ recordid TlsmAppendPage(int xid, recordid tree,
initializeNodePage(xid, lc,keySize); initializeNodePage(xid, lc,keySize);
for(int i = FIRST_SLOT; i < *recordcount_ptr(p); i++) { for(int i = FIRST_SLOT; i < *stasis_page_fixed_recordcount_ptr(p); i++) {
recordid cnext = stasis_record_alloc_begin(xid, lc, recordid cnext = stasis_record_alloc_begin(xid, lc,
sizeof(lsmTreeNodeRecord)+keySize); sizeof(lsmTreeNodeRecord)+keySize);
@ -565,7 +565,7 @@ recordid TlsmAppendPage(int xid, recordid tree,
sizeof(lsmTreeNodeRecord)+keySize }; sizeof(lsmTreeNodeRecord)+keySize };
// @todo should fixed.h support bulk deallocation directly? // @todo should fixed.h support bulk deallocation directly?
*recordcount_ptr(p) = FIRST_SLOT+1; *stasis_page_fixed_recordcount_ptr(p) = FIRST_SLOT+1;
lsmTreeNodeRecord *nr lsmTreeNodeRecord *nr
= (lsmTreeNodeRecord*)stasis_record_write_begin(xid, p, pFirstSlot); = (lsmTreeNodeRecord*)stasis_record_write_begin(xid, p, pFirstSlot);
@ -639,14 +639,14 @@ void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc,
static recordid lsmLookup(int xid, Page *node, int depth, const byte *key, static recordid lsmLookup(int xid, Page *node, int depth, const byte *key,
size_t keySize, lsm_comparator_t cmp) { size_t keySize, lsm_comparator_t cmp) {
if(*recordcount_ptr(node) == FIRST_SLOT) { if(*stasis_page_fixed_recordcount_ptr(node) == FIRST_SLOT) {
return NULLRID; return NULLRID;
} }
assert(*recordcount_ptr(node) > FIRST_SLOT); assert(*stasis_page_fixed_recordcount_ptr(node) > FIRST_SLOT);
int match = FIRST_SLOT; int match = FIRST_SLOT;
// don't need to compare w/ first item in tree. // don't need to compare w/ first item in tree.
const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,FIRST_SLOT,keySize); const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,FIRST_SLOT,keySize);
for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) { for(int i = FIRST_SLOT+1; i < *stasis_page_fixed_recordcount_ptr(node); i++) {
rec = readNodeRecord(xid,node,i,keySize); rec = readNodeRecord(xid,node,i,keySize);
int cmpval = cmp(rec+1,key); int cmpval = cmp(rec+1,key);
if(cmpval > 0) { if(cmpval > 0) {
@ -690,7 +690,7 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) {
readlock(p->rwlatch,0); readlock(p->rwlatch,0);
tree.slot = 0; tree.slot = 0;
tree.size = *recordsize_ptr(p); tree.size = *stasis_page_fixed_recordsize_ptr(p);
size_t keySize = getKeySize(xid,p); size_t keySize = getKeySize(xid,p);
@ -733,10 +733,10 @@ pageid_t TlsmLastPage(int xid, recordid tree) {
Page * p = loadPage(xid, ret); Page * p = loadPage(xid, ret);
readlock(p->rwlatch,0); readlock(p->rwlatch,0);
if(*recordcount_ptr(p) == 2) { if(*stasis_page_fixed_recordcount_ptr(p) == 2) {
ret = -1; ret = -1;
} else { } else {
const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,(*recordcount_ptr(p))-1,keySize); const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,(*stasis_page_fixed_recordcount_ptr(p))-1,keySize);
ret = nr->ptr; ret = nr->ptr;
} }
unlock(p->rwlatch); unlock(p->rwlatch);
@ -769,7 +769,7 @@ static void lsmPageCleanup(Page *p) {
A page_impl for the root of an lsmTree. A page_impl for the root of an lsmTree.
*/ */
page_impl lsmRootImpl() { page_impl lsmRootImpl() {
page_impl pi = fixedImpl(); page_impl pi = stasis_page_fixed_impl();
pi.pageLoaded = lsmPageLoaded; pi.pageLoaded = lsmPageLoaded;
pi.pageFlushed = lsmPageFlushed; pi.pageFlushed = lsmPageFlushed;
pi.pageCleanup = lsmPageCleanup; pi.pageCleanup = lsmPageCleanup;
@ -888,7 +888,7 @@ void lsmTreeIterator_close(int xid, lladdIterator_t *it) {
int lsmTreeIterator_next(int xid, lladdIterator_t *it) { int lsmTreeIterator_next(int xid, lladdIterator_t *it) {
lsmIteratorImpl *impl = it->impl; lsmIteratorImpl *impl = it->impl;
size_t keySize = impl->current.size; size_t keySize = impl->current.size;
impl->current = fixedNext(xid, impl->p, impl->current); impl->current = stasis_page_fixed_next_record(xid, impl->p, impl->current);
if(impl->current.size == INVALID_SLOT) { if(impl->current.size == INVALID_SLOT) {
const lsmTreeNodeRecord next_rec = *readNodeRecord(xid,impl->p,NEXT_LEAF, const lsmTreeNodeRecord next_rec = *readNodeRecord(xid,impl->p,NEXT_LEAF,
keySize); keySize);

View file

@ -1,6 +1,7 @@
#include <stasis/operations/arrayList.h> #include <stasis/operations/arrayList.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/page/fixed.h>
#include <assert.h> #include <assert.h>
#include <math.h> #include <math.h>
@ -33,7 +34,7 @@ static array_list_parameter_t array_list_read_parameter(int xid, Page * p) {
} }
static int array_list_get_block_containing_offset(array_list_parameter_t alp, int offset, pageid_t * firstSlotInBlock) { static int array_list_get_block_containing_offset(array_list_parameter_t alp, int offset, pageid_t * firstSlotInBlock) {
int rec_per_page = stasis_fixed_records_per_page((size_t)alp.size); int rec_per_page = stasis_page_fixed_records_per_page((size_t)alp.size);
long thisHigh = rec_per_page * alp.initialSize; long thisHigh = rec_per_page * alp.initialSize;
int lastHigh = 0; int lastHigh = 0;
int pageRidSlot = 0; int pageRidSlot = 0;
@ -58,8 +59,8 @@ static int array_list_op_init_header(const LogEntry* e, Page* p) {
const array_list_parameter_t * alp const array_list_parameter_t * alp
= stasis_log_entry_update_args_cptr(e); = stasis_log_entry_update_args_cptr(e);
stasis_fixed_initialize_page(p, sizeof(pageid_t), stasis_page_fixed_initialize_page(p, sizeof(pageid_t),
stasis_fixed_records_per_page(sizeof(pageid_t))); stasis_page_fixed_records_per_page(sizeof(pageid_t)));
recordid initialSizeRid, multiplierRid, slotSizeRid, maxOffsetRid, firstDataPageRid; recordid initialSizeRid, multiplierRid, slotSizeRid, maxOffsetRid, firstDataPageRid;
@ -113,7 +114,7 @@ recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) {
readlock(p->rwlatch,0); readlock(p->rwlatch,0);
array_list_parameter_t tlp = array_list_read_parameter(xid, p); array_list_parameter_t tlp = array_list_read_parameter(xid, p);
int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size); int rec_per_page = stasis_page_fixed_records_per_page((size_t)tlp.size);
pageid_t lastHigh = 0; pageid_t lastHigh = 0;
int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */ int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */

View file

@ -1,4 +1,5 @@
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/page/fixed.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
#include <stasis/logger/logEntry.h> #include <stasis/logger/logEntry.h>
#include <stasis/operations/pageOperations.h> #include <stasis/operations/pageOperations.h>
@ -195,8 +196,8 @@ static int op_initialize_page(const LogEntry* e, Page* p) {
stasis_page_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
break; break;
case FIXED_PAGE: case FIXED_PAGE:
stasis_fixed_initialize_page(p, arg->type, stasis_page_fixed_initialize_page(p, arg->type,
stasis_fixed_records_per_page stasis_page_fixed_records_per_page
(stasis_record_type_to_size(arg->type))); (stasis_record_type_to_size(arg->type)));
break; break;
default: default:
@ -223,8 +224,8 @@ static int op_init_multipage_impl(const LogEntry *e, Page *ignored) {
} else if(arg->recordSize == BLOB_SLOT) { } else if(arg->recordSize == BLOB_SLOT) {
stasis_page_blob_initialize_page(p); stasis_page_blob_initialize_page(p);
} else { } else {
stasis_fixed_initialize_page(p, arg->recordSize, stasis_page_fixed_initialize_page(p, arg->recordSize,
stasis_fixed_records_per_page stasis_page_fixed_records_per_page
(stasis_record_type_to_size(arg->recordSize))); (stasis_record_type_to_size(arg->recordSize)));
} }
stasis_page_lsn_write(e->xid, p, e->LSN); stasis_page_lsn_write(e->xid, p, e->LSN);

View file

@ -109,13 +109,13 @@ lsn_t stasis_page_lsn_read(const Page * page) {
void stasis_page_init(stasis_dirty_page_table_t * dpt) { void stasis_page_init(stasis_dirty_page_table_t * dpt) {
dirtyPages = dpt; dirtyPages = dpt;
stasis_page_slotted_init(); stasis_page_slotted_init();
fixedPageInit(); stasis_page_fixed_init();
stasis_page_impl_register(stasis_page_uninitialized_impl()); stasis_page_impl_register(stasis_page_uninitialized_impl());
stasis_page_impl_register(stasis_page_slotted_impl()); stasis_page_impl_register(stasis_page_slotted_impl());
stasis_page_impl_register(fixedImpl()); stasis_page_impl_register(stasis_page_fixed_impl());
stasis_page_impl_register(stasis_page_boundary_tag_impl()); stasis_page_impl_register(stasis_page_boundary_tag_impl());
stasis_page_impl_register(arrayListImpl()); stasis_page_impl_register(stasis_page_array_list_impl());
stasis_page_impl_register(stasis_page_blob_impl()); stasis_page_impl_register(stasis_page_blob_impl());
stasis_page_impl_register(slottedLsnFreeImpl()); stasis_page_impl_register(slottedLsnFreeImpl());
stasis_page_impl_register(segmentImpl()); stasis_page_impl_register(segmentImpl());
@ -129,7 +129,7 @@ void stasis_page_deinit() {
page_impls[i] = p; page_impls[i] = p;
} }
fixedPageDeinit(); stasis_page_fixed_deinit();
stasis_page_slotted_deinit(); stasis_page_slotted_deinit();
} }

View file

@ -1,155 +1,148 @@
#include <stasis/common.h> #include <stasis/common.h>
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/page/fixed.h> #include <stasis/page/fixed.h>
/** @todo should page implementations provide readLSN / writeLSN??? */
#include <stasis/truncation.h> #include <stasis/truncation.h>
#include <assert.h> #include <assert.h>
int stasis_fixed_records_per_page(size_t size) {
return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size;
}
/** @todo CORRECTNESS Locking for stasis_fixed_initialize_page? (should hold writelock)*/
void stasis_fixed_initialize_page(Page * page, size_t size, int count) {
stasis_page_cleanup(page);
// Zero out the page contents, since callers often do so anyway.
// blows away LSN, but the copy that's in p->LSN will be put back on page at flush.
memset(page->memAddr, 0, PAGE_SIZE);
page->pageType = FIXED_PAGE;
*recordsize_ptr(page) = size;
assert(count <= stasis_fixed_records_per_page(size));
*recordcount_ptr(page)= count;
}
static void checkRid(Page * page, recordid rid) {
assert(page->pageType); // any more specific breaks pages based on this one
assert(page->id == rid.page);
assert(*recordsize_ptr(page) == rid.size);
assert(stasis_fixed_records_per_page(rid.size) > rid.slot);
}
//-------------- New API below this line //-------------- New API below this line
static const byte* fixedRead(int xid, Page *p, recordid rid) { static inline void stasis_page_fixed_checkRid(Page * page, recordid rid) {
checkRid(p, rid); assert(page->pageType); // any more specific breaks pages based on this one
assert(rid.slot < *recordcount_ptr(p)); assert(page->id == rid.page);
return fixed_record_ptr(p, rid.slot); assert(*stasis_page_fixed_recordsize_cptr(page) == rid.size);
assert(stasis_page_fixed_records_per_page(rid.size) > rid.slot);
}
void stasis_page_fixed_initialize_page(Page * page, size_t size, int count) {
stasis_page_cleanup(page);
page->pageType = FIXED_PAGE;
stasis_page_fixed_initialize_page_raw(page, size, count);
} }
static byte* fixedWrite(int xid, Page *p, recordid rid) { static int stasis_page_fixed_get_record_type(int xid, Page *p, recordid rid) {
checkRid(p, rid); return stasis_page_fixed_get_type(p, rid.slot);
assert(rid.slot < *recordcount_ptr(p)); }
return fixed_record_ptr(p, rid.slot); static void stasis_page_fixed_set_record_type(int xid, Page *p, recordid rid, int type) {
stasis_page_fixed_set_type(p, rid.slot, type);
} }
static int fixedGetType(int xid, Page *p, recordid rid) { static int stasis_page_fixed_not_supported(int xid, Page * p) { return 0; }
// checkRid(p, rid);
if(rid.slot < *recordcount_ptr(p)) { static const byte* stasis_page_fixed_read(int xid, Page *p, recordid rid) {
int type = *recordsize_ptr(p); stasis_page_fixed_checkRid(p, rid);
if(type > 0) { assert(rid.slot < *stasis_page_fixed_recordcount_cptr(p));
type = NORMAL_SLOT; return stasis_page_fixed_record_cptr(p, rid.slot);
}
return type;
} else {
return INVALID_SLOT;
}
} }
static void fixedSetType(int xid, Page *p, recordid rid, int type) {
checkRid(p,rid); static inline byte* stasis_page_fixed_write(int xid, Page *p, recordid rid) {
assert(rid.slot < *recordcount_ptr(p)); stasis_page_fixed_checkRid(p, rid);
assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(*recordsize_ptr(p))); assert(rid.slot < *stasis_page_fixed_recordcount_cptr(p));
*recordsize_ptr(p) = rid.size; return stasis_page_fixed_record_ptr(p, rid.slot);
} }
static int fixedGetLength(int xid, Page *p, recordid rid) {
static int stasis_page_fixed_get_length_record(int xid, Page *p, recordid rid) {
assert(p->pageType); assert(p->pageType);
return rid.slot > *recordcount_ptr(p) ? return stasis_page_fixed_get_length(p, rid.slot);
INVALID_SLOT : stasis_record_type_to_size(*recordsize_ptr(p));
} }
static int notSupported(int xid, Page * p) { return 0; } static recordid stasis_page_fixed_last_record(int xid, Page *p) {
recordid rid = { p->id, -1, 0 };
static int fixedFreespace(int xid, Page * p) { rid.size = *stasis_page_fixed_recordsize_cptr(p);
if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { rid.slot = -stasis_page_fixed_last_slot(p);
// Return the size of a slot; that's the biggest record we can take. return rid;
return stasis_record_type_to_size(*recordsize_ptr(p)); }
recordid stasis_page_fixed_next_record(int xid, Page *p, recordid rid) {
slotid_t slot = stasis_page_fixed_next_slot(p, rid.slot);
if(slot == INVALID_SLOT) {
return NULLRID;
} else { } else {
// Page full; return zero. assert(rid.page == p->id);
return 0; rid.size = *stasis_page_fixed_recordsize_cptr(p);
rid.slot = slot;
return rid;
} }
} }
static void fixedCompact(Page * p) { static recordid stasis_page_fixed_first_record(int xid, Page *p) {
recordid rid = {
p->id,
INVALID_SLOT,
*stasis_page_fixed_recordsize_cptr(p)
};
return stasis_page_fixed_next_record(xid, p, rid);
}
static int stasis_page_fixed_freespace(int xid, Page * p) {
return stasis_page_fixed_freespace_raw(p);
}
static inline void stasis_page_fixed_compact(Page * p) {
// no-op // no-op
} }
static void fixedCompactSlotIds(int xid, Page * p) { static inline void stasis_page_fixed_compact_slot_ids(int xid, Page * p) {
abort(); abort();
} }
static recordid fixedPreAlloc(int xid, Page *p, int size) {
if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { static recordid stasis_page_fixed_pre_alloc_record(int xid, Page *p, int size) {
recordid rid; int slot = stasis_page_fixed_pre_alloc(p, size);
rid.page = p->id; if(slot ==-1) {
rid.slot = *recordcount_ptr(p);
rid.size = *recordsize_ptr(p);
return rid;
} else {
return NULLRID; return NULLRID;
} }
recordid rid = { p->id, slot, *stasis_page_fixed_recordsize_cptr(p) };
return rid;
} }
static void fixedPostAlloc(int xid, Page *p, recordid rid) { static void stasis_page_fixed_post_alloc_record(int xid, Page *p, recordid rid) {
assert(*recordcount_ptr(p) == rid.slot); assert(*stasis_page_fixed_recordsize_cptr(p) == rid.size);
assert(*recordsize_ptr(p) == rid.size); stasis_page_fixed_post_alloc(p, rid.slot);
(*recordcount_ptr(p))++;
} }
static void fixedSplice(int xid, Page *p, slotid_t first, slotid_t second) {
static inline void stasis_page_fixed_splice(int xid, Page *p, slotid_t first, slotid_t second) {
abort(); abort();
} }
static void fixedFree(int xid, Page *p, recordid rid) {
if(*recordsize_ptr(p) == rid.slot+1) { static void stasis_page_fixed_free_record(int xid, Page *p, recordid rid) {
(*recordsize_ptr(p))--; stasis_page_fixed_free(p, rid.slot);
} else {
// leak space; there's no way to track it with this page format.
}
} }
// XXX dereferenceRID // XXX dereferenceRID
void fixedLoaded(Page *p) { static void stasis_page_fixed_loaded(Page *p) {
p->LSN = *stasis_page_lsn_ptr(p); p->LSN = *stasis_page_lsn_ptr(p);
} }
void fixedFlushed(Page *p) { static void stasis_page_fixed_flushed(Page *p) {
*stasis_page_lsn_ptr(p) = p->LSN; *stasis_page_lsn_ptr(p) = p->LSN;
} }
void fixedCleanup(Page *p) { } static void stasis_page_fixed_cleanup(Page *p) { }
page_impl fixedImpl() {
page_impl stasis_page_fixed_impl() {
static page_impl pi = { static page_impl pi = {
FIXED_PAGE, FIXED_PAGE,
1, 1,
fixedRead, stasis_page_fixed_read,
fixedWrite, stasis_page_fixed_write,
0,// readDone 0,// readDone
0,// writeDone 0,// writeDone
fixedGetType, stasis_page_fixed_get_record_type,
fixedSetType, stasis_page_fixed_set_record_type,
fixedGetLength, stasis_page_fixed_get_length_record,
fixedFirst, stasis_page_fixed_first_record,
fixedNext, stasis_page_fixed_next_record,
fixedLast, stasis_page_fixed_last_record,
notSupported, // notSupported, stasis_page_fixed_not_supported,
stasis_block_first_default_impl, stasis_block_first_default_impl,
stasis_block_next_default_impl, stasis_block_next_default_impl,
stasis_block_done_default_impl, stasis_block_done_default_impl,
fixedFreespace, stasis_page_fixed_freespace,
fixedCompact, stasis_page_fixed_compact,
fixedCompactSlotIds, stasis_page_fixed_compact_slot_ids,
fixedPreAlloc, stasis_page_fixed_pre_alloc_record,
fixedPostAlloc, stasis_page_fixed_post_alloc_record,
fixedSplice, stasis_page_fixed_splice,
fixedFree, stasis_page_fixed_free_record,
0, // XXX dereference 0, // XXX dereference
fixedLoaded, // loaded stasis_page_fixed_loaded,
fixedFlushed, // flushed stasis_page_fixed_flushed,
fixedCleanup stasis_page_fixed_cleanup
}; };
return pi; return pi;
} }
@ -157,11 +150,11 @@ page_impl fixedImpl() {
/** /**
@todo arrayListImpl belongs in arrayList.c @todo arrayListImpl belongs in arrayList.c
*/ */
page_impl arrayListImpl() { page_impl stasis_page_array_list_impl() {
page_impl pi = fixedImpl(); page_impl pi = stasis_page_fixed_impl();
pi.page_type = ARRAY_LIST_PAGE; pi.page_type = ARRAY_LIST_PAGE;
return pi; return pi;
} }
void fixedPageInit() { } void stasis_page_fixed_init() { }
void fixedPageDeinit() { } void stasis_page_fixed_deinit() { }

View file

@ -2,7 +2,7 @@
#include <stasis/page/slotted.h> #include <stasis/page/slotted.h>
#include <stasis/logger/logger2.h> #include <stasis/logger/logger2.h>
void stasis_slotted_lsn_free_initialize_page(Page * p) { void stasis_page_slotted_lsn_free_initialize_page(Page * p) {
stasis_page_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
p->pageType = SLOTTED_LSN_FREE_PAGE; p->pageType = SLOTTED_LSN_FREE_PAGE;
} }

View file

@ -8,25 +8,7 @@
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/page/slotted.h> #include <stasis/page/slotted.h>
//#include <assert.h>
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short))
#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short))
#ifdef LONG_TEST
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1
#endif
// plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence.
// intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK)
// benchmark page ops (real time) + hash table (user time)
//#define SLOTTED_PAGE_OLD_CHECKS
#define SLOTTED_PAGE_NEW_CHECKS
#ifdef SLOTTED_PAGE_NEW_CHECKS
#define SLOTTED_PAGE_SANITY_CHECKS
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP
#endif
#include <stasis/truncation.h> #include <stasis/truncation.h>
/** /**
Run sanity checks to make sure page is in a consistent state. Run sanity checks to make sure page is in a consistent state.
@ -36,353 +18,79 @@
and more expensive. and more expensive.
*/ */
static inline void slottedFsck(const Page const * page) { static inline void slottedFsck(const Page const * page) {
Page dummy;
dummy.id = -1;
dummy.memAddr = 0;
const short page_type = page->pageType; const short page_type = page->pageType;
const short numslots = *stasis_page_slotted_numslots_cptr(page);
const short freespace = *stasis_page_slotted_freespace_cptr(page);
const short freelist = *stasis_page_slotted_freelist_cptr(page);
const long slotListStart = (long)stasis_page_slotted_slot_length_ptr(&dummy, numslots-1);
assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
assert(page_type == SLOTTED_PAGE || assert(page_type == SLOTTED_PAGE ||
page_type == BOUNDARY_TAG_PAGE || page_type == BOUNDARY_TAG_PAGE ||
page_type == SLOTTED_LSN_FREE_PAGE || page_type == SLOTTED_LSN_FREE_PAGE ||
page_type == SLOTTED_LATCH_FREE_PAGE); page_type == SLOTTED_LATCH_FREE_PAGE);
assert(numslots >= 0); stasis_page_slotted_check(page);
assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE);
assert(freespace >= 0);
assert(freespace <= slotListStart);
assert(freelist >= INVALID_SLOT);
assert(freelist < numslots);
#ifdef SLOTTED_PAGE_SANITY_CHECKS
// Check integrity of freelist. All free slots less than
// numslots should be on it, in order.
short * slot_offsets = alloca(numslots * sizeof(short));
short * slot_lengths = alloca(numslots * sizeof(short));
for(int i = 0; i < numslots; i++) {
slot_offsets[i] = *stasis_page_slotted_slot_cptr(page, i);
slot_lengths[i] = *stasis_page_slotted_slot_length_cptr(page, i);
}
short foundEndOfList = 0;
if(freelist != INVALID_SLOT) {
assert(slot_offsets[freelist] == INVALID_SLOT);
} else {
foundEndOfList = 1;
}
for(short i = 0; i < numslots; i++) {
const short slot_length = slot_lengths[i];
const short slot_offset = slot_offsets[i];
if(slot_offset == INVALID_SLOT) {
if(slot_length == INVALID_SLOT) {
assert(!foundEndOfList);
foundEndOfList = 1;
} else {
assert (slot_offsets[slot_length] == INVALID_SLOT);
}
} else {
assert(slot_offset + slot_length <= freespace);
}
}
// Is the free list terminated?
assert(foundEndOfList);
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
const byte UNUSED = 0xFF;
const byte PAGE_HEADER = 0xFE;
const byte SLOTTED_HEADER = 0xFD;
// const byte SLOT_LIST = 0xFC;
const byte FREE_SPACE = 0xFB;
const unsigned short S_SLOT_LIST = 0xFCFC;
byte image[PAGE_SIZE];
for(short i = 0; i < PAGE_SIZE; i++) {
image[i] = UNUSED;
}
for(short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) {
image[i] = PAGE_HEADER;
}
for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) {
image[i] = SLOTTED_HEADER;
}
for(short i = *stasis_page_slotted_freespace_cptr(page); i < slotListStart; i++) {
image[i] = FREE_SPACE;
}
dummy.memAddr = image;
for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) {
*stasis_page_slotted_slot_ptr(&dummy, i) = S_SLOT_LIST;
*stasis_page_slotted_slot_length_ptr(&dummy, i) = S_SLOT_LIST;
}
for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) {
short slot_offset = *stasis_page_slotted_slot_cptr(page, i);
if(slot_offset != INVALID_SLOT) {
const unsigned char ci = i % 0xFF;
short slot_len = stasis_record_type_to_size(*stasis_page_slotted_slot_length_cptr(page, i));
for(short j = 0; j < slot_len; j++) {
assert(image[slot_offset + j] == 0xFF);
image[slot_offset + j] = ci;
}
}
}
#endif // SLOTTED_PAGE_CHECK_FOR_OVERLAP
#endif // SLOTTED_PAGE_SANITY_CHECKS
} }
/** /**
Move all of the records to the beginning of the page in order to Move all of the records to the beginning of the page in order to
increase the available free space. increase the available free space.
*/ */
static void slottedCompact(Page * page) { static void slottedCompact(Page * page) {
Page bufPage; stasis_page_slotted_compact(page);
byte buffer[PAGE_SIZE];
bufPage.memAddr = buffer;
// Copy external headers into bufPage.
memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(page->memAddr[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE);
// Now, build new slotted page in the bufPage struct.
*stasis_page_slotted_freespace_ptr(&bufPage) = 0;
// numslots_ptr will be set later.
*stasis_page_slotted_freelist_ptr(&bufPage) = INVALID_SLOT;
const short numSlots = *stasis_page_slotted_numslots_ptr(page);
short lastFreeSlot = INVALID_SLOT;
short lastFreeSlotBeforeUsedSlot = INVALID_SLOT;
short lastUsedSlot = -1;
// Rebuild free list.
for(short i = 0; i < numSlots; i++) {
if(*stasis_page_slotted_slot_ptr(page, i) == INVALID_SLOT) {
if(lastFreeSlot == INVALID_SLOT) {
*stasis_page_slotted_freelist_ptr(&bufPage) = i;
} else {
*stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlot) = i;
}
*stasis_page_slotted_slot_ptr(&bufPage, i) = INVALID_SLOT;
lastFreeSlot = i;
} else {
lastUsedSlot = i;
lastFreeSlotBeforeUsedSlot = lastFreeSlot;
short logicalSize = *stasis_page_slotted_slot_length_ptr(page, i);
short physicalSize = stasis_record_type_to_size(logicalSize);
memcpy(&(buffer[*stasis_page_slotted_freespace_ptr(&bufPage)]), stasis_page_slotted_record_ptr(page, i), physicalSize);
*stasis_page_slotted_slot_ptr(&bufPage, i) = *stasis_page_slotted_freespace_ptr(&bufPage);
*stasis_page_slotted_slot_length_ptr(&bufPage, i) = logicalSize;
(*stasis_page_slotted_freespace_ptr(&bufPage)) += physicalSize;
}
}
// Truncate linked list, and update numslots_ptr.
*stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT;
*stasis_page_slotted_numslots_ptr(&bufPage) = lastUsedSlot+1;
memcpy(page->memAddr, buffer, PAGE_SIZE);
#ifdef SLOTTED_PAGE_OLD_CHECKS
slottedFsck(page);
#endif // SLOTTED_PAGE_OLD_CHECKS
} }
static void slottedCompactSlotIDs(int xid, Page * p) { static void slottedCompactSlotIDs(int xid, Page * p) {
int16_t numSlots = *stasis_page_slotted_numslots_ptr(p); stasis_page_slotted_compact_slot_ids(p);
int16_t out = 0;
for(int16_t in = 0; in < numSlots; in++) {
if(*stasis_page_slotted_slot_ptr(p, in) == INVALID_SLOT) {
// nop
} else {
*stasis_page_slotted_slot_ptr(p, out) = *stasis_page_slotted_slot_cptr(p, in);
*stasis_page_slotted_slot_length_ptr(p, out) = *stasis_page_slotted_slot_length_cptr(p, in);
out++;
}
}
*stasis_page_slotted_numslots_ptr(p) = out;
*stasis_page_slotted_freelist_ptr(p) = INVALID_SLOT;
} }
/**
Check to see how many bytes can fit in a given slot. This
makes it possible for callers to guarantee the safety
of a subsequent call to really_do_ralloc().
*/
static ssize_t slottedFreespaceForSlot(Page * page, int slot) {
ssize_t slotOverhead;
if(slot == INVALID_SLOT) {
slotOverhead = (*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0;
} else if(slot < *stasis_page_slotted_numslots_ptr(page)) {
slotOverhead = 0;
} else {
// slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot);
slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *stasis_page_slotted_numslots_ptr(page));
}
// end_of_free_space points to the beginning of the slot header at the bottom of the page header.
byte* end_of_free_space = (byte*)stasis_page_slotted_slot_length_ptr(page, (*stasis_page_slotted_numslots_ptr(page))-1);
// start_of_free_space points to the first unallocated byte in the page
// (ignoring space that could be reclaimed by compaction)
byte* start_of_free_space = (byte*)(page->memAddr + *stasis_page_slotted_freespace_ptr(page));
assert(end_of_free_space >= start_of_free_space);
if(end_of_free_space < start_of_free_space + slotOverhead) {
// The regions would overlap after allocation. There is no free space.
return 0;
} else {
// The regions would not overlap. There might be free space.
return (ssize_t) (end_of_free_space - start_of_free_space - slotOverhead);
}
}
static inline void slottedSanityCheck(Page * p, recordid rid) {
#ifdef SLOTTED_PAGE_OLD_CHECKS
assert(p->id == rid.page);
assert(rid.size == INVALID_SIZE || rid.size < BLOB_THRESHOLD_SIZE); // Caller deals with this now!
slottedFsck(p);
#endif
}
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
// PUBLIC API IS BELOW THIS LINE // PUBLIC API IS BELOW THIS LINE
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
static const byte* slottedRead (int xid, Page *p, recordid rid) { static const byte* slottedRead (int xid, Page *p, recordid rid) {
slottedSanityCheck(p, rid);
return stasis_page_slotted_record_ptr(p, rid.slot); return stasis_page_slotted_record_ptr(p, rid.slot);
} }
static byte* slottedWrite(int xid, Page *p, recordid rid) { static byte* slottedWrite(int xid, Page *p, recordid rid) {
slottedSanityCheck(p, rid);
return stasis_page_slotted_record_ptr(p, rid.slot); return stasis_page_slotted_record_ptr(p, rid.slot);
} }
static int slottedGetType(int xid, Page *p, recordid rid) { static int slottedGetType(int xid, Page *p, recordid rid) {
#ifdef SLOTTED_PAGE_OLD_CHECKS return stasis_page_slotted_get_type(p, rid.slot);
//sanityCheck(p, rid); <-- Would fail if rid.size is a blob
assert(p->id == rid.page);
slottedFsck(p);
#endif
if(rid.slot >= *stasis_page_slotted_numslots_ptr(p)) { return INVALID_SLOT; }
if(*stasis_page_slotted_slot_ptr(p, rid.slot) == INVALID_SLOT) { return INVALID_SLOT; }
int ret = *stasis_page_slotted_slot_length_ptr(p, rid.slot);
return ret >= 0 ? NORMAL_SLOT : ret;
} }
static void slottedSetType(int xid, Page *p, recordid rid, int type) { static void slottedSetType(int xid, Page *p, recordid rid, int type) {
slottedSanityCheck(p, rid); stasis_page_slotted_set_type(p, rid.slot, type);
int old_type = *stasis_page_slotted_slot_length_ptr(p, rid.slot);
assert(rid.slot < *stasis_page_slotted_numslots_ptr(p));
assert(old_type != INVALID_SLOT);
if(type == NORMAL_SLOT) {
// set slot_length_ptr to the physical length.
*stasis_page_slotted_slot_length_ptr(p, rid.slot) = stasis_record_type_to_size(old_type);
} else {
// Changing to a special slot type; make sure doing so doesn't change
// the record size.
assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(old_type));
*stasis_page_slotted_slot_length_ptr(p, rid.slot) = type;
}
} }
static int slottedGetLength(int xid, Page *p, recordid rid) { static int slottedGetLength(int xid, Page *p, recordid rid) {
#ifdef SLOTTED_PAGE_OLD_CHECKS return stasis_page_slotted_get_length(p, rid.slot);
assert(p->id == rid.page);
slottedFsck(p);
#endif
if( slottedGetType(xid, p, rid) == INVALID_SLOT)
return INVALID_SLOT;
else
return stasis_record_type_to_size(*stasis_page_slotted_slot_length_ptr(p, rid.slot));
} }
static recordid slottedNext(int xid, Page *p, recordid rid) { static recordid slottedNext(int xid, Page *p, recordid rid) {
slottedSanityCheck(p, rid);
short n = *stasis_page_slotted_numslots_ptr(p); rid.slot = stasis_page_slotted_next_record(p, rid.slot);
rid.slot ++;
while(rid.slot < n && slottedGetType(xid,p,rid)==INVALID_SLOT) { if(rid.slot != INVALID_SLOT) {
rid.slot++;
}
if(rid.slot != n) {
rid.size = *stasis_page_slotted_slot_length_ptr(p, rid.slot); rid.size = *stasis_page_slotted_slot_length_ptr(p, rid.slot);
return rid; return rid;
} else { } else {
return NULLRID; return NULLRID;
} }
} }
static recordid slottedFirst(int xid, Page *p) { static recordid slottedFirst(int xid, Page *p) {
#ifdef SLOTTED_PAGE_OLD_CHECKS
slottedFsck(p);
#endif
recordid rid = { p->id, -1, 0 }; recordid rid = { p->id, -1, 0 };
return slottedNext(xid, p, rid); return slottedNext(xid, p, rid);
} }
static recordid slottedLast(int xid, Page *p) { static recordid slottedLast(int xid, Page *p) {
recordid rid = {p->id, -1, 0 }; recordid rid = {p->id, -1, 0 };
rid.slot = (*stasis_page_slotted_numslots_cptr(p)) - 1; rid.slot = stasis_page_slotted_last_record(p);
rid.size = *stasis_page_slotted_slot_length_cptr(p, rid.slot); rid.size = *stasis_page_slotted_slot_length_cptr(p, rid.slot);
return rid; return rid;
} }
static int notSupported(int xid, Page * p) { return 0; } static int notSupported(int xid, Page * p) { return 0; }
static int slottedFreespace(int xid, Page * p) { static int slottedFreespace(int xid, Page * p) {
#ifdef SLOTTED_PAGE_OLD_CHECKS return stasis_page_slotted_freespace_for_slot(p, INVALID_SLOT);
slottedFsck(p);
#endif
return slottedFreespaceForSlot(p, INVALID_SLOT);
} }
static recordid slottedPreRalloc(int xid, Page * p, int type) { static recordid slottedPreRalloc(int xid, Page * p, int type) {
assert(type != INVALID_SLOT); slotid_t slot = stasis_page_slotted_pre_alloc(p, type);
#ifdef SLOTTED_PAGE_OLD_CHECKS if(slot == INVALID_SLOT) {
slottedFsck(p); return NULLRID;
#endif } else {
recordid rid = { p->id, slot, type };
recordid rid; return rid;
rid.page = p->id;
rid.slot = *stasis_page_slotted_numslots_ptr(p);
rid.size = type;
if(*stasis_page_slotted_freelist_ptr(p) != INVALID_SLOT) {
rid.slot = *stasis_page_slotted_freelist_ptr(p);
} }
if(slottedFreespaceForSlot(p, rid.slot) < stasis_record_type_to_size(type)) {
rid = NULLRID;
}
return rid;
} }
/** /**
Allocate data on a page after deciding which recordid to allocate, Allocate data on a page after deciding which recordid to allocate,
and making sure there is enough freespace. and making sure there is enough freespace.
@ -390,7 +98,7 @@ static recordid slottedPreRalloc(int xid, Page * p, int type) {
Allocation is complicated without locking. Consider this situation: Allocation is complicated without locking. Consider this situation:
(1) *numslot_ptr(page) is 10 (1) *numslot_ptr(page) is 10
(2) An aborting transcation calls really_do_ralloc(page) with rid.slot = 12 (2) An aborting transaction calls slottedPostRalloc(page) with rid.slot = 12
(3) *numslot_ptr(page) must be incremented to 12. Now, what happens to 11? (3) *numslot_ptr(page) must be incremented to 12. Now, what happens to 11?
- If 11 was also deleted by a transaction that could abort, we should lock it so that it won't be reused. - If 11 was also deleted by a transaction that could abort, we should lock it so that it won't be reused.
(4) This function adds it to the freelist to avoid leaking space. (Therefore, Talloc() can return recordids that will (4) This function adds it to the freelist to avoid leaking space. (Therefore, Talloc() can return recordids that will
@ -401,159 +109,22 @@ static recordid slottedPreRalloc(int xid, Page * p, int type) {
@param xid The transaction allocating the record. @param xid The transaction allocating the record.
@param page A pointer to the page. @param page A pointer to the page.
@param rid Recordid with 'internal' size. The size should have already been translated to a type if necessary. @param rid rid.size should be a size or (for special records) a type that
stasis_record_type_to_size() can interpret. This allows callers
to store record type information in the page's size field.
*/ */
static void slottedPostRalloc(int xid, Page * page, recordid rid) { static void slottedPostRalloc(int xid, Page * p, recordid rid) {
slottedSanityCheck(page, rid); assert(((short)rid.size) == rid.size);
stasis_page_slotted_post_alloc(p, rid.slot, rid.size);
short freeSpace;
// Compact the page if we don't have enough room.
if(slottedFreespaceForSlot(page, rid.slot) < stasis_record_type_to_size(rid.size)) {
slottedCompact(page);
// Make sure we have enough enough free space for the new record
assert (slottedFreespaceForSlot(page, rid.slot) >= stasis_record_type_to_size(rid.size));
}
freeSpace = *stasis_page_slotted_freespace_ptr(page);
// Remove this entry from the freelist (if necessary) slottedCompact
// assumes that this does not change the order of items in the list.
// If it did, then slottedCompact could leaks slot id's (or worse!)
if(rid.slot < *stasis_page_slotted_numslots_ptr(page) && *stasis_page_slotted_slot_ptr(page,rid.slot) == INVALID_SLOT) {
short next = *stasis_page_slotted_freelist_ptr(page);
short last = INVALID_SLOT;
// special case: is the slot physically before us the predecessor?
if(rid.slot > 0) {
if(*stasis_page_slotted_slot_length_ptr(page, rid.slot-1) == rid.slot && *stasis_page_slotted_slot_ptr(page, rid.slot-1) == INVALID_SLOT) {
next = rid.slot;
last = rid.slot-1;
}
}
while(next != INVALID_SLOT && next != rid.slot) {
last = next;
assert(next < *stasis_page_slotted_numslots_ptr(page));
short next_slot_ptr = *stasis_page_slotted_slot_ptr(page, next);
assert(next_slot_ptr == INVALID_SLOT);
next = *stasis_page_slotted_slot_length_ptr(page, next);
}
if(next == rid.slot) {
if(last == INVALID_SLOT) {
*stasis_page_slotted_freelist_ptr(page) = *stasis_page_slotted_slot_length_ptr(page, rid.slot);
} else {
*stasis_page_slotted_slot_length_ptr(page, last) = *stasis_page_slotted_slot_length_ptr(page, rid.slot);
}
}
}
// Insert any slots that come between the previous numslots_ptr()
// and the slot we're allocating onto the freelist. In order to
// promote the reuse of free slot numbers, we go out of our way to make sure
// that we put them in the list in increasing order. (Note: slottedCompact's
// correctness depends on this behavior!)
if(rid.slot > *stasis_page_slotted_numslots_ptr(page)) {
short lastSlot;
short numSlots = *stasis_page_slotted_numslots_ptr(page);
if(*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) {
*stasis_page_slotted_freelist_ptr(page) = numSlots;
lastSlot = numSlots;
*stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT;
// will set slot_length_ptr on next iteration.
(*stasis_page_slotted_numslots_ptr(page))++;
} else {
lastSlot = INVALID_SLOT;
short next = *stasis_page_slotted_freelist_ptr(page);
while(next != INVALID_SLOT) {
lastSlot = next;
next = *stasis_page_slotted_slot_length_ptr(page, lastSlot);
assert(lastSlot < *stasis_page_slotted_numslots_ptr(page));
assert(*stasis_page_slotted_slot_ptr(page, lastSlot) == INVALID_SLOT);
}
*stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT;
}
// lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr.
while(*stasis_page_slotted_numslots_ptr(page) < rid.slot) {
*stasis_page_slotted_slot_length_ptr(page, lastSlot) = *stasis_page_slotted_numslots_ptr(page);
lastSlot = *stasis_page_slotted_numslots_ptr(page);
*stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT;
(*stasis_page_slotted_numslots_ptr(page))++;
}
// Terminate the end of the list.
assert(lastSlot < *stasis_page_slotted_numslots_ptr(page));
*stasis_page_slotted_slot_length_ptr(page, lastSlot) = INVALID_SLOT;
}
if(*stasis_page_slotted_numslots_ptr(page) == rid.slot) {
*stasis_page_slotted_numslots_ptr(page) = rid.slot+1;
}
assert(*stasis_page_slotted_numslots_ptr(page) > rid.slot);
DEBUG("Num slots %d\trid.slot %d\n", *stasis_page_slotted_numslots_ptr(page), rid.slot);
// Reserve space for this record and record the space's offset in
// the slot header.
assert(rid.slot < *stasis_page_slotted_numslots_ptr(page));
*stasis_page_slotted_freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size);
*stasis_page_slotted_slot_ptr(page, rid.slot) = freeSpace;
*stasis_page_slotted_slot_length_ptr(page, rid.slot) = rid.size;
} }
static void slottedSpliceSlot(int xid, Page *p, slotid_t a, slotid_t b) { static void slottedSpliceSlot(int xid, Page *p, slotid_t a, slotid_t b) {
if(a==b) { return; } // no-op stasis_page_slotted_splice_slot(p, a, b);
assert(a < b);
int16_t b_slot = *stasis_page_slotted_slot_cptr(p, b);
int16_t b_slot_len = *stasis_page_slotted_slot_length_cptr(p, b);
for(int16_t i = b-1; i >= a; i--) {
*stasis_page_slotted_slot_ptr(p, i+1) = *stasis_page_slotted_slot_cptr(p, i);
*stasis_page_slotted_slot_length_ptr(p, i+1) = *stasis_page_slotted_slot_length_cptr(p, i);
}
*stasis_page_slotted_slot_ptr(p, a) = b_slot;
*stasis_page_slotted_slot_length_ptr(p, a) = b_slot_len;
} }
static void slottedFree(int xid, Page * p, recordid rid) { static void slottedFree(int xid, Page * p, recordid rid) {
slottedSanityCheck(p, rid); stasis_page_slotted_free(p, rid.slot);
ssize_t rec_size = stasis_record_type_to_size(*stasis_page_slotted_slot_length_ptr(p, rid.slot));
if(*stasis_page_slotted_freespace_ptr(p) == *stasis_page_slotted_slot_ptr(p, rid.slot) + rec_size) {
(*stasis_page_slotted_freespace_ptr(p)) -= rec_size;
}
assert(rid.slot < *stasis_page_slotted_numslots_ptr(p));
if(rid.slot == *stasis_page_slotted_numslots_ptr(p)-1) {
(*stasis_page_slotted_numslots_ptr(p))--;
assert(slottedGetType(xid,p,rid)==INVALID_SLOT);
} else {
*stasis_page_slotted_slot_ptr(p, rid.slot) = INVALID_SLOT;
*stasis_page_slotted_slot_length_ptr(p, rid.slot) = *stasis_page_slotted_freelist_ptr(p);
*stasis_page_slotted_freelist_ptr(p) = rid.slot;
assert(slottedGetType(xid,p,rid)==INVALID_SLOT);
}
#ifdef SLOTTED_PAGE_OLD_CHECKS
slottedFsck(p);
#endif
} }
// XXX dereferenceRID // XXX dereferenceRID
static void slottedLoaded(Page *p) { static void slottedLoaded(Page *p) {
p->LSN = *stasis_page_lsn_ptr(p); p->LSN = *stasis_page_lsn_ptr(p);
slottedFsck(p); slottedFsck(p);
@ -564,6 +135,12 @@ static void slottedFlushed(Page *p) {
} }
static void slottedCleanup(Page *p) { } static void slottedCleanup(Page *p) { }
void stasis_page_slotted_initialize_page(Page * page) {
stasis_page_cleanup(page);
page->pageType = SLOTTED_PAGE;
stasis_page_slotted_initialize_page_raw(page);
}
void stasis_page_slotted_init() { void stasis_page_slotted_init() {
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP #ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
#ifdef SLOTTED_PAGE_OLD_CHECKS #ifdef SLOTTED_PAGE_OLD_CHECKS
@ -575,14 +152,6 @@ void stasis_page_slotted_init() {
void stasis_page_slotted_deinit() { void stasis_page_slotted_deinit() {
} }
void stasis_page_slotted_initialize_page(Page * page) {
stasis_page_cleanup(page);
page->pageType = SLOTTED_PAGE;
*stasis_page_slotted_freespace_ptr(page) = 0;
*stasis_page_slotted_numslots_ptr(page) = 0;
*stasis_page_slotted_freelist_ptr(page) = INVALID_SLOT;
}
page_impl stasis_page_slotted_impl() { page_impl stasis_page_slotted_impl() {
static page_impl pi = { static page_impl pi = {
SLOTTED_PAGE, SLOTTED_PAGE,

View file

@ -10,10 +10,6 @@
#include <stasis/util/stlredblack.h> #include <stasis/util/stlredblack.h>
#include <stasis/util/redblack.h> #include <stasis/util/redblack.h>
#include <stdio.h> #include <stdio.h>
#undef end
#undef try
#undef catch
#include <stasis/util/stlslab.h>
extern "C" { extern "C" {
typedef int (*c_cmp_t)(const void*, const void*, const void*); typedef int (*c_cmp_t)(const void*, const void*, const void*);

View file

@ -40,64 +40,6 @@ permission to use and distribute the software in accordance with the
terms specified in this license. terms specified in this license.
---*/ ---*/
/**
* @file
*
* interface for dealing with generic, LSN based pages
*
* This file provides a re-entrant interface for pages that are labeled
* with an LSN and a page type.
*
* @ingroup PAGE_FORMATS
*
* $Id$
*/
/**
@defgroup PAGE_FORMATS Page layouts
Stasis allows developers to define their own on-disk page formats.
Currently, each page format must end with a hard-coded header
containing an LSN and a page type. (This restriction will be
removed in the future.)
This section explains how new page formats can be implemented in
Stasis, and documents the currently available page types.
A number of callbacks are invoked on existing pages as they are read
from disk, flushed back, and ultimately, evicted from cache:
-# stasis_page_loaded() is invoked when the page is read from disk. It
should set the Page::LSN field appropriately, and
perhaps allocate any data structures that will be stored in the
Page::impl field.
-# stasis_page_flushed() is invoked when a dirty page is written back to
disk. It should make sure that all updates are applied to the
physical representation of the page. (Implementations of this
callback usually copy the Page::LSN field into the page header.)
-# stasis_page_cleanup() is invoked before a page is evicted from cache.
It should free any memory associated with the page, such as
that allocated by stasis_page_loaded(), or pointed to by Page::impl.
When an uninitialized page is read from disk, Stasis has no way of
knowing which stasis_page_loaded() callback should be invoked. Therefore,
when a new page is initialized the page initialization code should
perform the work that would normally be performed by stasis_page_loaded().
Similarly, before a page is freed (and therefore, will be treated as
uninitialized data) stasis_page_cleanup() should be called.
Page implementations are free to define their own access methods
and APIs. However, Stasis's record oriented page interface
provides a default set of methods for page access.
@see PAGE_RECORD_INTERFACE
@todo Page deallocators should call stasis_page_cleanup()
@todo Create variant of loadPage() that takes a page type
@todo Add support for LSN free pages.
*/
/*@{*/
#ifndef __PAGE_H__ #ifndef __PAGE_H__
#define __PAGE_H__ #define __PAGE_H__
@ -121,6 +63,7 @@ BEGIN_C_DECLS
@todo Remove next and prev from Page_s @todo Remove next and prev from Page_s
*/ */
struct Page_s { struct Page_s {
pageid_t id; pageid_t id;
/** /**
@ -241,205 +184,6 @@ struct Page_s {
+----------------------------------------------------+-----------+-----+ +----------------------------------------------------+-----------+-----+
</pre> </pre>
*/ */
/*@{*/
static inline lsn_t* stasis_page_lsn_ptr(Page *p) {
return ((lsn_t*)(&(p->memAddr[PAGE_SIZE])))-1;
}
static inline const lsn_t* stasis_page_lsn_cptr(const Page *p) {
return ((const lsn_t*)(&(p->memAddr[PAGE_SIZE])))-1;
}
/**
Returns a pointer to the page's type. This information is stored with the LSN.
Stasis uses it to determine which page implementation should handle each
page.
@param p Any page that contains an LSN header.
@see stasis_page_impl_register
@todo Need to typedef page_type_t
*/
static inline int* stasis_page_type_ptr(Page *p) {
return ((int*)stasis_page_lsn_ptr(p))-1;
}
static inline const int* stasis_page_type_cptr(const Page *p) {
return ((const int*)stasis_page_lsn_cptr(p))-1;
}
/**
* assumes that the page is already loaded in memory. It takes as a
* parameter a Page. The Page struct contains the new LSN and the
* page number to which the new LSN must be written to. Furthermore,
* this function updates the dirtyPages table, if necessary. The
* dirtyPages table is needed for log truncation. (If the page->id is
* null, this function assumes the page is not in the buffer pool, and
* does not update dirtyPages. Similarly, if the page is already
* dirty, there is no need to update dirtyPages.
*
* @param xid The transaction that is writing to the page, or -1 if
* outside of a transaction.
*
* @param page You must have a writelock on page before calling this
* function.
*
* @param lsn The new lsn of the page. If the new lsn is less than
* the page's current LSN, then the page's LSN will not be changed.
* If the page is clean, the new LSN must be greater than the old LSN.
*/
void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn);
/**
* assumes that the page is already loaded in memory. It takes
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t stasis_page_lsn_read(const Page * page);
/*@}*/
/**
@defgroup PAGE_UTIL Byte-level page manipulation
These methods make it easy to manipulate pages that use a standard
Stasis header (one with an LSN and page type).
Each one counts bytes from the beginning or end of the page's
usable space. Methods with "_cptr_" in their names return const
pointers (and can accept const Page pointers as arguments).
Methods with "_ptr_" in their names take non-const pages, and
return non-const pointers.
@par Implementing new pointer arithmetic macros
Stasis page type implementations typically do little more than
pointer arithmetic. However, implementing page types cleanly and
portably is a bit tricky. Stasis has settled upon a compromise in
this matter. Its page file formats are compatible within a single
architecture, but not across systems with varying lengths of
primitive types, or that vary in endianness.
Over time, types that vary in length such as "int", "long", etc
will be removed from Stasis, but their usage still exists in a few
places. Once they have been removed, file compatibility problems
should be limited to endianness (though application code will still
be free to serialize objects in a non-portable manner).
Most page implementations leverage C's pointer manipulation
semantics to lay out pages. Rather than casting pointers to
char*'s and then manually calculating byte offsets using sizeof(),
the existing page types prefer to cast pointers to appropriate
types, and then add or subtract the appropriate number of values.
For example, instead of doing this:
@code
// p points to an int, followed by a two bars, then the foo whose address
// we want to calculate
int * p;
foo* f = (foo*)( ((char*)p) + sizeof(int) + 2 * sizeof(bar))
@endcode
the implementations would do this:
@code
int * p;
foo * f = (foo*)( ((bar*)(p+1)) + 2 )
@endcode
The main disadvantage of this approach is the large number of ()'s
involved. However, it lets the compiler deal with the underlying
multiplications, and often reduces the number of casts, leading to
slightly more readable code. Take this implementation of
stasis_page_type_ptr(), for example:
@code
int * stasis_page_type_ptr(Page *p) {
return ( (int*)stasis_page_lsn_ptr(Page *p) ) - 1;
}
@endcode
Here, the page type is stored as an integer immediately before the
LSN pointer. Using arithmetic over char*'s would require an extra
cast to char*, and a multiplication by sizeof(int).
*/
/*@{*/
static inline byte*
stasis_page_byte_ptr_from_start(Page *p, int count) {
return ((byte*)(p->memAddr))+count;
}
static inline byte*
stasis_page_byte_ptr_from_end(Page *p, int count) {
return ((byte*)stasis_page_type_ptr(p))-count;
}
static inline int16_t*
stasis_page_int16_ptr_from_start(Page *p, int count) {
return ((int16_t*)(p->memAddr))+count;
}
static inline int16_t*
stasis_page_int16_ptr_from_end(Page *p, int count) {
return ((int16_t*)stasis_page_type_ptr(p))-count;
}
static inline int32_t*
stasis_page_int32_ptr_from_start(Page *p, int count) {
return ((int32_t*)(p->memAddr))+count;
}
static inline int32_t*
stasis_page_int32_ptr_from_end(Page *p, int count) {
return ((int32_t*)stasis_page_type_ptr(p))-count;
}
static inline pageid_t*
stasis_page_pageid_t_ptr_from_start(Page *p, int count) {
return ((pageid_t*)(p->memAddr))+count;
}
static inline pageid_t*
stasis_page_pageid_t_ptr_from_end(Page *p, int count) {
return ((pageid_t*)stasis_page_type_ptr(p))-count;
}
// Const methods
static inline const byte*
stasis_page_byte_cptr_from_start(const Page *p, int count) {
return (const byte*)stasis_page_byte_ptr_from_start((Page*)p, count);
}
static inline const byte*
stasis_page_byte_cptr_from_end(const Page *p, int count) {
return (const byte*)stasis_page_byte_ptr_from_end((Page*)p, count);
}
static inline const int16_t*
stasis_page_int16_cptr_from_start(const Page *p, int count) {
return (const int16_t*)stasis_page_int16_ptr_from_start((Page*)p,count);
}
static inline const int16_t*
stasis_page_int16_cptr_from_end(const Page *p, int count) {
return ((int16_t*)stasis_page_type_cptr(p))-count;
}
static inline const int32_t*
stasis_page_int32_cptr_from_start(const Page *p, int count) {
return ((const int32_t*)(p->memAddr))+count;
}
static inline const int32_t*
stasis_page_int32_cptr_from_end(const Page *p, int count) {
return (const int32_t*)stasis_page_int32_ptr_from_end((Page*)p,count);
}
static inline const pageid_t*
stasis_page_pageid_t_cptr_from_start(const Page *p, int count) {
return ((const pageid_t*)(p->memAddr))+count;
}
static inline const pageid_t*
stasis_page_pageid_t_cptr_from_end(const Page *p, int count) {
return (const pageid_t*)stasis_page_pageid_t_cptr_from_end(p,count);
}
/*@}*/
/** /**
* initializes all the global variables needed by the functions * initializes all the global variables needed by the functions
@ -785,7 +529,7 @@ typedef struct page_impl {
@param p the page whose freespace will be estimated. @param p the page whose freespace will be estimated.
@return The number of bytes of free space on the page, or (for @return The number of bytes of free space on the page, or (for
efficiency's sake) an underestimate. efficiency's sake) an underestimate.
*/ */
int (*pageFreespace)(int xid, Page * p); int (*pageFreespace)(int xid, Page * p);
@ -909,26 +653,22 @@ int stasis_page_impl_register(page_impl impl);
*/ */
page_impl* stasis_page_impl_get(int id); page_impl* stasis_page_impl_get(int id);
// -------------------- Page specific, general purpose methods
/**
Initialize a new page
@param p The page that will be turned into a new slotted page.
Its contents will be overwritten. It was probably
returned by loadPage()
*/
void stasis_page_slotted_initialize_page(Page * p);
void stasis_page_slotted_latch_free_initialize_page(Page * page);
void stasis_slotted_lsn_free_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height);
int stasis_fixed_records_per_page(size_t size);
void stasis_page_blob_initialize_page(Page * p);
page_impl slottedLsnFreeImpl(); page_impl slottedLsnFreeImpl();
page_impl segmentImpl(); page_impl segmentImpl();
#define stasis_mempage_memaddr(p) (p)
#define stasis_page(x) stasis_mempage_##x
#define PAGE byte
#include "page/page-impl.h"
#undef stasis_mempage_memaddr
#undef stasis_page
#undef PAGE
#include "page/page-impl.h"
END_C_DECLS END_C_DECLS
#endif #endif

93
stasis/page/fixed-impl.h Normal file
View file

@ -0,0 +1,93 @@
/*
* fixed-impl.h
*
* Created on: Nov 7, 2011
* Author: sears
*/
static inline int16_t* stasis_page(fixed_recordsize_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 1); }
static inline int16_t* stasis_page(fixed_recordcount_ptr)(PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 2); }
static inline byte* stasis_page(fixed_record_ptr) (PAGE * p, slotid_t n) { return stasis_page(byte_ptr_from_start)(p, *stasis_page(fixed_recordsize_ptr)(p) * n); }
static inline const int16_t* stasis_page(fixed_recordsize_cptr) (const PAGE * p) { return stasis_page(fixed_recordsize_ptr) ((PAGE*)p); }
static inline const int16_t* stasis_page(fixed_recordcount_cptr)(const PAGE * p) { return stasis_page(fixed_recordcount_ptr)((PAGE*)p); }
static inline const byte* stasis_page(fixed_record_cptr) (const PAGE * p, slotid_t n) { return stasis_page(fixed_record_ptr) ((PAGE*)p, n); }
static inline slotid_t stasis_page(fixed_next_slot)(PAGE *p, slotid_t slot) {
slot++;
if(*stasis_page(fixed_recordcount_cptr)(p) > slot) {
return slot;
} else {
return -1;
}
}
static inline slotid_t stasis_page(fixed_first_slot)(PAGE *p) {
return stasis_page(fixed_next_slot)(p, -1);
}
static inline slotid_t stasis_page(fixed_last_slot)(PAGE *p) {
return -(*stasis_page(fixed_recordcount_cptr)(p)) - 1;
}
static inline int stasis_page(fixed_records_per_page)(size_t size) {
return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size;
}
static inline void stasis_page(fixed_initialize_page_raw)(PAGE * page, size_t size, int count) {
// Zero out the page contents, since callers often do so anyway.
// blows away LSN, but the copy that's in p->LSN will be put back on page at flush.
memset(stasis_page(memaddr)(page), 0, PAGE_SIZE);
*stasis_page(fixed_recordsize_ptr)(page) = size;
assert(count <= stasis_page(fixed_records_per_page)(size));
*stasis_page(fixed_recordcount_ptr)(page)= count;
}
static inline int stasis_page(fixed_get_type)(PAGE *p, slotid_t slot) {
// checkRid(p, rid);
if(slot < *stasis_page(fixed_recordcount_cptr)(p)) {
int type = *stasis_page(fixed_recordsize_cptr)(p);
if(type > 0) {
type = NORMAL_SLOT;
}
return type;
} else {
return INVALID_SLOT;
}
}
static inline void stasis_page(fixed_set_type)(PAGE *p, slotid_t slot, int type) {
//XXX stasis_page(checkRid)(p,rid);
assert(slot < *stasis_page(fixed_recordcount_cptr)(p));
assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(*stasis_page(fixed_recordsize_cptr)(p)));
}
static inline int stasis_page(fixed_get_length)(PAGE *p, slotid_t slot) {
return slot > *stasis_page(fixed_recordcount_cptr)(p) ?
INVALID_SLOT : stasis_record_type_to_size(*stasis_page(fixed_recordsize_cptr)(p));
}
static inline int stasis_page(fixed_freespace_raw)(PAGE * p) {
if(stasis_page(fixed_records_per_page)(*stasis_page(fixed_recordsize_cptr)(p)) > *stasis_page(fixed_recordcount_cptr)(p)) {
// Return the size of a slot; that's the biggest record we can take.
return stasis_record_type_to_size(*stasis_page(fixed_recordsize_cptr)(p));
} else {
// Page full; return zero.
return 0;
}
}
static inline int16_t stasis_page(fixed_pre_alloc)(PAGE *p, int size) {
if(stasis_page(fixed_records_per_page)(*stasis_page(fixed_recordsize_cptr)(p)) > *stasis_page(fixed_recordcount_cptr)(p)) {
return *stasis_page(fixed_recordcount_cptr)(p);
} else {
return -1;
}
}
static inline void stasis_page(fixed_post_alloc)(PAGE *p, slotid_t n) {
assert(*stasis_page(fixed_recordcount_cptr)(p) == n);
(*stasis_page(fixed_recordcount_ptr)(p))++;
}
static inline void stasis_page(fixed_free)(PAGE *p, slotid_t n) {
if(*stasis_page(fixed_recordsize_cptr)(p) == n+1) {
(*stasis_page(fixed_recordsize_ptr)(p))--;
} else {
// leak space; there's no way to track it with this page format.
}
}

View file

@ -2,38 +2,13 @@
#ifndef __FIXED_H #ifndef __FIXED_H
#define __FIXED_H #define __FIXED_H
/**
@todo rename fixed.h macros turn them into static inline functions.
*/
#define recordsize_ptr(page) stasis_page_int16_ptr_from_end((page), 1)
#define recordsize_cptr(page) stasis_page_int16_cptr_from_end((page), 1)
#define recordcount_ptr(page) stasis_page_int16_ptr_from_end((page), 2)
#define fixed_record_ptr(page, n) stasis_page_byte_ptr_from_start((page), *recordsize_ptr((page)) * (n))
static inline recordid fixedNext(int xid, Page *p, recordid rid) { void stasis_page_fixed_init();
short n = *recordcount_ptr(p); void stasis_page_fixed_deinit();
rid.slot++;
rid.size = *recordsize_ptr(p);
if(rid.slot >= n) {
return NULLRID;
} else {
return rid;
}
}
static inline recordid fixedFirst(int xid, Page *p) {
recordid rid = { p->id, -1, 0 };
rid.size = *recordsize_ptr(p);
return fixedNext(xid, p, rid);
}
static inline recordid fixedLast(int xid, Page *p) {
recordid rid = { p->id, -1, 0 };
rid.size = *recordsize_ptr(p);
rid.slot = -1+*recordcount_ptr(p);
return rid;
}
void fixedPageInit(); void stasis_page_fixed_initialize_page(Page * page, size_t size, int count);
void fixedPageDeinit(); recordid stasis_page_fixed_next_record(int xid, Page *p, recordid rid);
page_impl fixedImpl();
page_impl arrayListImpl(); page_impl stasis_page_fixed_impl();
page_impl stasis_page_array_list_impl();
#endif #endif

296
stasis/page/page-impl.h Normal file
View file

@ -0,0 +1,296 @@
/*
* page-impl.h
*
* Created on: Nov 7, 2011
* Author: sears
*/
/**
* @file
*
* interface for dealing with generic, LSN based pages
*
* This file provides a re-entrant interface for pages that are labeled
* with an LSN and a page type.
*
* @ingroup PAGE_FORMATS
*
* $Id: page.h 1526 2011-06-13 11:26:25Z sears.russell@gmail.com $
*/
#ifndef stasis_page
#define stasis_page_memaddr(p) (p->memAddr)
#define stasis_page(x) stasis_page_##x
#define PAGE Page
#define STASIS_PAGE_IMPL_NEED_UNDEF
#endif
/**
@defgroup PAGE_FORMATS Page layouts
Stasis allows developers to define their own on-disk page formats.
Currently, each page format must end with a hard-coded header
containing an LSN and a page type. (This restriction will be
removed in the future.)
This section explains how new page formats can be implemented in
Stasis, and documents the currently available page types.
A number of callbacks are invoked on existing pages as they are read
from disk, flushed back, and ultimately, evicted from cache:
-# stasis_page_loaded() is invoked when the page is read from disk. It
should set the Page::LSN field appropriately, and
perhaps allocate any data structures that will be stored in the
Page::impl field.
-# stasis_page_flushed() is invoked when a dirty page is written back to
disk. It should make sure that all updates are applied to the
physical representation of the page. (Implementations of this
callback usually copy the Page::LSN field into the page header.)
-# stasis_page_cleanup() is invoked before a page is evicted from cache.
It should free any memory associated with the page, such as
that allocated by stasis_page_loaded(), or pointed to by Page::impl.
When an uninitialized page is read from disk, Stasis has no way of
knowing which stasis_page_loaded() callback should be invoked. Therefore,
when a new page is initialized the page initialization code should
perform the work that would normally be performed by stasis_page_loaded().
Similarly, before a page is freed (and therefore, will be treated as
uninitialized data) stasis_page_cleanup() should be called.
Page implementations are free to define their own access methods
and APIs. However, Stasis's record oriented page interface
provides a default set of methods for page access.
@see PAGE_RECORD_INTERFACE
@todo Page deallocators should call stasis_page_cleanup()
@todo Create variant of loadPage() that takes a page type
@todo Add support for LSN free pages.
*/
/*@{*/
/*@{*/
static inline lsn_t* stasis_page(lsn_ptr)(PAGE *p) {
return ((lsn_t*)(&(stasis_page(memaddr)(p)[PAGE_SIZE])))-1;
}
static inline const lsn_t* stasis_page(lsn_cptr)(const PAGE *p) {
return ((const lsn_t*)(&(stasis_page(memaddr)(p)[PAGE_SIZE])))-1;
}
/**
Returns a pointer to the page's type. This information is stored with the LSN.
Stasis uses it to determine which page implementation should handle each
page.
@param p Any page that contains an LSN header.
@see stasis_page_impl_register
@todo Need to typedef page_type_t
*/
static inline int* stasis_page(type_ptr)(PAGE *p) {
return ((int*)stasis_page(lsn_ptr)(p))-1;
}
static inline const int* stasis_page(type_cptr)(const PAGE *p) {
return ((const int*)stasis_page(lsn_cptr)(p))-1;
}
/**
* assumes that the page is already loaded in memory. It takes as a
* parameter a Page. The Page struct contains the new LSN and the
* page number to which the new LSN must be written to. Furthermore,
* this function updates the dirtyPages table, if necessary. The
* dirtyPages table is needed for log truncation. (If the page->id is
* null, this function assumes the page is not in the buffer pool, and
* does not update dirtyPages. Similarly, if the page is already
* dirty, there is no need to update dirtyPages.
*
* @param xid The transaction that is writing to the page, or -1 if
* outside of a transaction.
*
* @param page You must have a writelock on page before calling this
* function.
*
* @param lsn The new lsn of the page. If the new lsn is less than
* the page's current LSN, then the page's LSN will not be changed.
* If the page is clean, the new LSN must be greater than the old LSN.
*/
void stasis_page(lsn_write)(int xid, PAGE * page, lsn_t lsn);
/**
* assumes that the page is already loaded in memory. It takes
* as a parameter a Page and returns the LSN that is currently written on that
* page in memory.
*/
lsn_t stasis_page(lsn_read)(const PAGE * page);
/*@}*/
/**
@defgroup PAGE_UTIL Byte-level page manipulation
These methods make it easy to manipulate pages that use a standard
Stasis header (one with an LSN and page type).
Each one counts bytes from the beginning or end of the page's
usable space. Methods with "_cptr_" in their names return const
pointers (and can accept const Page pointers as arguments).
Methods with "_ptr_" in their names take non-const pages, and
return non-const pointers.
@par Implementing new pointer arithmetic macros
Stasis page type implementations typically do little more than
pointer arithmetic. However, implementing page types cleanly and
portably is a bit tricky. Stasis has settled upon a compromise in
this matter. Its page file formats are compatible within a single
architecture, but not across systems with varying lengths of
primitive types, or that vary in endianness.
Over time, types that vary in length such as "int", "long", etc
will be removed from Stasis, but their usage still exists in a few
places. Once they have been removed, file compatibility problems
should be limited to endianness (though application code will still
be free to serialize objects in a non-portable manner).
Most page implementations leverage C's pointer manipulation
semantics to lay out pages. Rather than casting pointers to
char*'s and then manually calculating byte offsets using sizeof(),
the existing page types prefer to cast pointers to appropriate
types, and then add or subtract the appropriate number of values.
For example, instead of doing this:
@code
// p points to an int, followed by a two bars, then the foo whose address
// we want to calculate
int * p;
foo* f = (foo*)( ((char*)p) + sizeof(int) + 2 * sizeof(bar))
@endcode
the implementations would do this:
@code
int * p;
foo * f = (foo*)( ((bar*)(p+1)) + 2 )
@endcode
The main disadvantage of this approach is the large number of ()'s
involved. However, it lets the compiler deal with the underlying
multiplications, and often reduces the number of casts, leading to
slightly more readable code. Take this implementation of
stasis_page_type_ptr(), for example:
@code
int * stasis_page_type_ptr(Page *p) {
return ( (int*)stasis_page_lsn_ptr(Page *p) ) - 1;
}
@endcode
Here, the page type is stored as an integer immediately before the
LSN pointer. Using arithmetic over char*'s would require an extra
cast to char*, and a multiplication by sizeof(int).
*/
/*@{*/
static inline byte*
stasis_page(byte_ptr_from_start)(PAGE *p, int count) {
return ((byte*)(stasis_page(memaddr)(p)))+count;
}
static inline byte*
stasis_page(byte_ptr_from_end)(PAGE *p, int count) {
return ((byte*)stasis_page(type_ptr)(p))-count;
}
static inline int16_t*
stasis_page(int16_ptr_from_start)(PAGE *p, int count) {
return ((int16_t*)(stasis_page(memaddr)(p)))+count;
}
static inline int16_t*
stasis_page(int16_ptr_from_end)(PAGE *p, int count) {
return ((int16_t*)stasis_page(type_ptr)(p))-count;
}
static inline int32_t*
stasis_page(int32_ptr_from_start)(PAGE *p, int count) {
return ((int32_t*)(stasis_page(memaddr)(p)))+count;
}
static inline int32_t*
stasis_page(int32_ptr_from_end)(PAGE *p, int count) {
return ((int32_t*)stasis_page(type_ptr)(p))-count;
}
static inline pageid_t*
stasis_page(pageid_t_ptr_from_start)(PAGE *p, int count) {
return ((pageid_t*)(stasis_page(memaddr)(p)))+count;
}
static inline pageid_t*
stasis_page(pageid_t_ptr_from_end)(PAGE *p, int count) {
return ((pageid_t*)stasis_page(type_ptr)(p))-count;
}
// Const methods
static inline const byte*
stasis_page(byte_cptr_from_start)(const PAGE *p, int count) {
return (const byte*)stasis_page(byte_ptr_from_start)((PAGE*)p, count);
}
static inline const byte*
stasis_page(byte_cptr_from_end)(const PAGE *p, int count) {
return (const byte*)stasis_page(byte_ptr_from_end)((PAGE*)p, count);
}
static inline const int16_t*
stasis_page(int16_cptr_from_start)(const PAGE *p, int count) {
return (const int16_t*)stasis_page(int16_ptr_from_start)((PAGE*)p,count);
}
static inline const int16_t*
stasis_page(int16_cptr_from_end)(const PAGE *p, int count) {
return ((int16_t*)stasis_page(type_cptr)(p))-count;
}
static inline const int32_t*
stasis_page(int32_cptr_from_start)(const PAGE *p, int count) {
return ((const int32_t*)(stasis_page(memaddr)(p)))+count;
}
static inline const int32_t*
stasis_page(int32_cptr_from_end)(const PAGE *p, int count) {
return (const int32_t*)stasis_page(int32_ptr_from_end)((PAGE*)p,count);
}
static inline const pageid_t*
stasis_page(pageid_t_cptr_from_start)(const PAGE *p, int count) {
return ((const pageid_t*)(stasis_page(memaddr)(p)))+count;
}
static inline const pageid_t*
stasis_page(pageid_t_cptr_from_end)(const PAGE *p, int count) {
return (const pageid_t*)stasis_page(pageid_t_cptr_from_end)(p,count);
}
/*@}*/
// -------------------- Page specific, general purpose methods
/**
Initialize a new page
@param p The page that will be turned into a new slotted page.
Its contents will be overwritten. It was probably
returned by loadPage()
*/
void stasis_page(slotted_initialize_page)(PAGE * p);
void stasis_page(slotted_latch_free_initialize_page)(PAGE * page);
void stasis_page(slotted_lsn_free_initialize_page)(Page * p);
void stasis_page(indirect_initialize_page)(Page * p, int height);
void stasis_page(blob_initialize_page)(PAGE * p);
#include "fixed-impl.h"
#include "slotted-impl.h"
#ifdef STASIS_PAGE_IMPL_NEED_UNDEF
#undef stasis_page_memaddr
#undef stasis_page
#undef PAGE
#undef STASIS_PAGE_IMPL_NEED_UNDEF
#endif

461
stasis/page/slotted-impl.h Normal file
View file

@ -0,0 +1,461 @@
/*
* slotted-impl.h
*
* Created on: Nov 7, 2011
* Author: sears
*/
#ifdef LONG_TEST
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1
#endif
// plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence.
// intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK)
// benchmark page ops (real time) + hash table (user time)
//#define SLOTTED_PAGE_OLD_CHECKS
#define SLOTTED_PAGE_NEW_CHECKS
#ifdef SLOTTED_PAGE_NEW_CHECKS
#define SLOTTED_PAGE_SANITY_CHECKS
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP
#endif
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short))
#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short))
static inline int16_t* stasis_page(slotted_freespace_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 1); }
static inline int16_t* stasis_page(slotted_numslots_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 2); }
static inline int16_t* stasis_page(slotted_freelist_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 3); }
static inline int16_t* stasis_page(slotted_slot_ptr) (PAGE * p, slotid_t n) { return stasis_page(int16_ptr_from_end) (p, (2*(n))+4); }
static inline int16_t* stasis_page(slotted_slot_length_ptr)(PAGE * p, slotid_t n) { return stasis_page(int16_ptr_from_end) (p, (2*(n))+5); }
static inline byte* stasis_page(slotted_record_ptr) (PAGE * p, slotid_t n) { return stasis_page(byte_ptr_from_start)(p, *stasis_page(slotted_slot_ptr)(p, n)); }
static inline const int16_t* stasis_page(slotted_freespace_cptr) (const PAGE * p) { return stasis_page(slotted_freespace_ptr) ((PAGE*)p); }
static inline const int16_t* stasis_page(slotted_numslots_cptr) (const PAGE * p) { return stasis_page(slotted_numslots_ptr) ((PAGE*)p); }
static inline const int16_t* stasis_page(slotted_freelist_cptr) (const PAGE * p) { return stasis_page(slotted_freelist_ptr) ((PAGE*)p); }
static inline const int16_t* stasis_page(slotted_slot_cptr) (const PAGE * p, slotid_t n) { return stasis_page(slotted_slot_ptr) ((PAGE*)p, n); }
static inline const int16_t* stasis_page(slotted_slot_length_cptr)(const PAGE * p, slotid_t n) { return stasis_page(slotted_slot_length_ptr)((PAGE*)p, n); }
static inline const byte* stasis_page(slotted_record_cptr) (const PAGE * p, slotid_t n) { return stasis_page(slotted_record_ptr) ((PAGE*)p, n); }
static inline void stasis_page(slotted_initialize_page_raw)(PAGE * page) {
*stasis_page(slotted_freespace_ptr)(page) = 0;
*stasis_page(slotted_numslots_ptr)(page) = 0;
*stasis_page(slotted_freelist_ptr)(page) = INVALID_SLOT;
}
static inline void stasis_page(slotted_check)(const PAGE * page) {
const short numslots = *stasis_page(slotted_numslots_cptr)(page);
const short freespace = *stasis_page(slotted_freespace_cptr)(page);
const short freelist = *stasis_page(slotted_freelist_cptr)(page);
const long slotListStart = (const byte*)stasis_page(slotted_slot_length_cptr)(page, numslots-1)
- (const byte*)stasis_page(memaddr)(page);
assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
assert(numslots >= 0);
assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE);
assert(freespace >= 0);
assert(freespace <= slotListStart);
assert(freelist >= INVALID_SLOT);
assert(freelist < numslots);
#ifdef SLOTTED_PAGE_SANITY_CHECKS
// Check integrity of freelist. All free slots less than
// numslots should be on it, in order.
short * slot_offsets = (short*)alloca(numslots * sizeof(short));
short * slot_lengths = (short*)alloca(numslots * sizeof(short));
for(int i = 0; i < numslots; i++) {
slot_offsets[i] = *stasis_page(slotted_slot_cptr)(page, i);
slot_lengths[i] = *stasis_page(slotted_slot_length_cptr)(page, i);
}
short foundEndOfList = 0;
if(freelist != INVALID_SLOT) {
assert(slot_offsets[freelist] == INVALID_SLOT);
} else {
foundEndOfList = 1;
}
for(short i = 0; i < numslots; i++) {
const short slot_length = slot_lengths[i];
const short slot_offset = slot_offsets[i];
if(slot_offset == INVALID_SLOT) {
if(slot_length == INVALID_SLOT) {
assert(!foundEndOfList);
foundEndOfList = 1;
} else {
assert (slot_offsets[slot_length] == INVALID_SLOT);
}
} else {
assert(slot_offset + slot_length <= freespace);
}
}
// Is the free list terminated?
assert(foundEndOfList);
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
// Page dummy;
//dummy.id = -1;
//dummy.memAddr = 0;
const byte UNUSED = 0xFF;
const byte PAGE_HEADER = 0xFE;
const byte SLOTTED_HEADER = 0xFD;
// const byte SLOT_LIST = 0xFC;
const byte FREE_SPACE = 0xFB;
const unsigned short S_SLOT_LIST = 0xFCFC;
byte image[PAGE_SIZE];
for(unsigned short i = 0; i < PAGE_SIZE; i++) {
image[i] = UNUSED;
}
for(unsigned short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) {
image[i] = PAGE_HEADER;
}
for(unsigned short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) {
image[i] = SLOTTED_HEADER;
}
for(unsigned short i = *stasis_page(slotted_freespace_cptr)(page); i < slotListStart; i++) {
image[i] = FREE_SPACE;
}
for(unsigned short i = 0; i < *stasis_page(slotted_numslots_cptr)(page); i++) {
*stasis_mempage_slotted_slot_ptr(image, i) = S_SLOT_LIST;
*stasis_mempage_slotted_slot_length_ptr(image, i) = S_SLOT_LIST;
}
for(unsigned short i = 0; i < *stasis_page(slotted_numslots_cptr)(page); i++) {
short slot_offset = *stasis_page(slotted_slot_cptr)(page, i);
if(slot_offset != INVALID_SLOT) {
const unsigned char ci = i % 0xFF;
short slot_len = stasis_record_type_to_size(*stasis_page(slotted_slot_length_cptr)(page, i));
for(unsigned short j = 0; j < slot_len; j++) {
assert(image[slot_offset + j] == 0xFF);
image[slot_offset + j] = ci;
}
}
}
#endif // SLOTTED_PAGE_CHECK_FOR_OVERLAP
#endif // SLOTTED_PAGE_SANITY_CHECKS
}
/**
Check to see how many bytes can fit in a given slot. This
makes it possible for callers to guarantee the safety
of a subsequent call to really_do_ralloc().
This call can return negative numbers. This lets it differentiate between
situations in which a zero-byte record would fit, and when it would not.
*/
static inline ssize_t stasis_page(slotted_freespace_for_slot)(PAGE * page, slotid_t slot) {
ssize_t slotOverhead;
if(slot == INVALID_SLOT) {
slotOverhead = (*stasis_page(slotted_freelist_cptr)(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0;
} else if(slot < *stasis_page(slotted_numslots_cptr)(page)) {
slotOverhead = 0;
} else {
// slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot);
slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *stasis_page(slotted_numslots_cptr)(page));
}
// end_of_free_space points to the beginning of the slot header at the bottom of the page header.
byte* end_of_free_space = (byte*)stasis_page(slotted_slot_length_cptr)(page, (*stasis_page(slotted_numslots_cptr)(page))-1);
// start_of_free_space points to the first unallocated byte in the page
// (ignoring space that could be reclaimed by compaction)
const byte* start_of_free_space = (stasis_page(byte_cptr_from_start)(page, 0) + *stasis_page(slotted_freespace_cptr)(page));
assert(end_of_free_space >= start_of_free_space);
if(end_of_free_space < start_of_free_space + slotOverhead) {
// The regions would overlap after allocation. There is no free space.
return 0;
} else {
// The regions would not overlap. There might be free space.
return (ssize_t) (end_of_free_space - start_of_free_space - slotOverhead);
}
}
static inline void stasis_page(slotted_compact)(PAGE * page) {
byte buffer[PAGE_SIZE];
// Copy external headers into bufPage.
memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(stasis_page(memaddr)(page)[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE);
// Now, build new slotted page in the bufPage struct.
*stasis_mempage_slotted_freespace_ptr(buffer) = 0;
// numslots_ptr will be set later.
*stasis_mempage_slotted_freelist_ptr(buffer) = INVALID_SLOT;
const short numSlots = *stasis_page(slotted_numslots_cptr)(page);
short lastFreeSlot = INVALID_SLOT;
short lastFreeSlotBeforeUsedSlot = INVALID_SLOT;
short lastUsedSlot = -1;
// Rebuild free list.
for(short i = 0; i < numSlots; i++) {
if(*stasis_page(slotted_slot_cptr)(page, i) == INVALID_SLOT) {
if(lastFreeSlot == INVALID_SLOT) {
*stasis_mempage_slotted_freelist_ptr(buffer) = i;
} else {
*stasis_mempage_slotted_slot_length_ptr(buffer, lastFreeSlot) = i;
}
*stasis_mempage_slotted_slot_ptr(buffer, i) = INVALID_SLOT;
lastFreeSlot = i;
} else {
lastUsedSlot = i;
lastFreeSlotBeforeUsedSlot = lastFreeSlot;
short logicalSize = *stasis_page(slotted_slot_length_cptr)(page, i);
short physicalSize = stasis_record_type_to_size(logicalSize);
memcpy(&(buffer[*stasis_mempage_slotted_freespace_ptr(buffer)]), stasis_page(slotted_record_cptr)(page, i), physicalSize);
*stasis_mempage_slotted_slot_ptr(buffer, i) = *stasis_mempage_slotted_freespace_cptr(buffer);
*stasis_mempage_slotted_slot_length_ptr(buffer, i) = logicalSize;
(*stasis_mempage_slotted_freespace_ptr(buffer)) += physicalSize;
}
}
// Truncate linked list, and update numslots_ptr.
*stasis_mempage_slotted_slot_length_ptr(buffer, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT;
*stasis_mempage_slotted_numslots_ptr(buffer) = lastUsedSlot+1;
memcpy(stasis_page(memaddr)(page), buffer, PAGE_SIZE);
#ifdef SLOTTED_PAGE_OLD_CHECKS
stasis_page(slotted_fsck)(page);
#endif // SLOTTED_PAGE_OLD_CHECKS
}
static inline void stasis_page(slotted_compact_slot_ids)(PAGE * p) {
int16_t numSlots = *stasis_page(slotted_numslots_cptr)(p);
int16_t out = 0;
for(int16_t in = 0; in < numSlots; in++) {
if(*stasis_page(slotted_slot_cptr)(p, in) == INVALID_SLOT) {
// nop
} else {
*stasis_page(slotted_slot_ptr)(p, out) = *stasis_page(slotted_slot_cptr)(p, in);
*stasis_page(slotted_slot_length_ptr)(p, out) = *stasis_page(slotted_slot_length_cptr)(p, in);
out++;
}
}
*stasis_page(slotted_numslots_ptr)(p) = out;
*stasis_page(slotted_freelist_ptr)(p) = INVALID_SLOT;
}
static inline int stasis_page(slotted_get_type)(PAGE *p, slotid_t slot) {
if(slot >= *stasis_page(slotted_numslots_cptr)(p)) { return INVALID_SLOT; }
if(*stasis_page(slotted_slot_cptr)(p, slot) == INVALID_SLOT) { return INVALID_SLOT; }
int ret = *stasis_page(slotted_slot_length_cptr)(p, slot);
return ret >= 0 ? NORMAL_SLOT : ret;
}
static inline void stasis_page(slotted_set_type)(PAGE *p, slotid_t slot, int type) {
int old_type = *stasis_page(slotted_slot_length_cptr)(p, slot);
assert(slot < *stasis_page(slotted_numslots_cptr)(p));
assert(old_type != INVALID_SLOT);
if(type == NORMAL_SLOT) {
// set slot_length_ptr to the physical length.
*stasis_page(slotted_slot_length_ptr)(p, slot) = stasis_record_type_to_size(old_type);
} else {
// Changing to a special slot type; make sure doing so doesn't change
// the record size.
assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(old_type));
*stasis_page(slotted_slot_length_ptr)(p, slot) = type;
}
}
static inline int stasis_page(slotted_get_length)(PAGE *p, slotid_t slot) {
if( stasis_page(slotted_get_type)(p, slot) == INVALID_SLOT)
return INVALID_SLOT;
else
return stasis_record_type_to_size(*stasis_page(slotted_slot_length_cptr)(p, slot));
}
static inline slotid_t stasis_page(slotted_next_record)(PAGE *p, slotid_t slot) {
short n = *stasis_page(slotted_numslots_cptr)(p);
do {
slot++;
} while(slot < n && stasis_page(slotted_get_type)(p,slot)==INVALID_SLOT);
return n > slot ? slot : INVALID_SLOT;
}
static inline slotid_t stasis_page(slotted_last_record)(PAGE *p) {
return (*stasis_page(slotted_numslots_cptr)(p)) - 1;
}
static inline slotid_t stasis_page(slotted_pre_alloc)(PAGE * p, int type) {
assert(type != INVALID_SLOT);
slotid_t slot = *stasis_page(slotted_numslots_cptr)(p);
if(*stasis_page(slotted_freelist_cptr)(p) != INVALID_SLOT) {
slot = *stasis_page(slotted_freelist_cptr)(p);
}
// int casts are there to deal with sign vs. unsigned types
if((int)stasis_page(slotted_freespace_for_slot)(p, slot) < (int)stasis_record_type_to_size(type)) {
return INVALID_SLOT;
} else {
return slot;
}
}
static inline void stasis_page(slotted_post_alloc)(PAGE * page, slotid_t slot, short type) {
// Compact the page if we don't have enough room.
if((int)stasis_page(slotted_freespace_for_slot)(page, slot) < (int)stasis_record_type_to_size(type)) {
stasis_page(slotted_compact)(page);
// Make sure we have enough enough free space for the new record
assert ((int)stasis_page(slotted_freespace_for_slot)(page, slot) >= (int)stasis_record_type_to_size(type));
}
short freeSpace = *stasis_page(slotted_freespace_cptr)(page);
// Remove this entry from the freelist (if necessary) slottedCompact
// assumes that this does not change the order of items in the list.
// If it did, then slottedCompact could leak slot id's (or worse!)
if(slot < *stasis_page(slotted_numslots_cptr)(page) && *stasis_page(slotted_slot_cptr)(page,slot) == INVALID_SLOT) {
short next = *stasis_page(slotted_freelist_cptr)(page);
short last = INVALID_SLOT;
// special case: is the slot physically before us the predecessor?
if(slot > 0) {
if(*stasis_page(slotted_slot_length_cptr)(page, slot-1) == slot &&
*stasis_page(slotted_slot_cptr)(page, slot-1) == INVALID_SLOT) {
next = slot;
last = slot-1;
}
}
while(next != INVALID_SLOT && next != slot) {
last = next;
assert(next < *stasis_page(slotted_numslots_cptr)(page));
short next_slot_ptr = *stasis_page(slotted_slot_cptr)(page, next);
assert(next_slot_ptr == INVALID_SLOT);
next = *stasis_page(slotted_slot_length_cptr)(page, next);
}
if(next == slot) {
if(last == INVALID_SLOT) {
*stasis_page(slotted_freelist_ptr)(page)
= *stasis_page(slotted_slot_length_cptr)(page, slot);
} else {
*stasis_page(slotted_slot_length_ptr)(page, last)
= *stasis_page(slotted_slot_length_cptr)(page, slot);
}
}
}
// Insert any slots that come between the previous numslots_ptr()
// and the slot we're allocating onto the freelist. In order to
// promote the reuse of free slot numbers, we go out of our way to make sure
// that we put them in the list in increasing order. (Note: slottedCompact's
// correctness depends on this behavior!)
if(slot > *stasis_page(slotted_numslots_cptr)(page)) {
short lastSlot;
if(*stasis_page(slotted_freelist_cptr)(page) == INVALID_SLOT) {
short numSlots = *stasis_page(slotted_numslots_cptr)(page);
*stasis_page(slotted_freelist_ptr)(page) = numSlots;
lastSlot = numSlots;
*stasis_page(slotted_slot_ptr)(page, lastSlot) = INVALID_SLOT;
// will set slot_length_ptr on next iteration.
(*stasis_page(slotted_numslots_ptr)(page))++;
} else {
lastSlot = INVALID_SLOT;
short next = *stasis_page(slotted_freelist_cptr)(page);
while(next != INVALID_SLOT) {
lastSlot = next;
next = *stasis_page(slotted_slot_length_cptr)(page, lastSlot);
assert(lastSlot < *stasis_page(slotted_numslots_cptr)(page));
assert(*stasis_page(slotted_slot_ptr)(page, lastSlot) == INVALID_SLOT);
}
*stasis_page(slotted_slot_ptr)(page, lastSlot) = INVALID_SLOT;
}
// lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr.
while(*stasis_page(slotted_numslots_cptr)(page) < slot) {
*stasis_page(slotted_slot_length_ptr)(page, lastSlot) = *stasis_page(slotted_numslots_cptr)(page);
lastSlot = *stasis_page(slotted_numslots_cptr)(page);
*stasis_page(slotted_slot_ptr)(page, lastSlot) = INVALID_SLOT;
(*stasis_page(slotted_numslots_ptr)(page))++;
}
// Terminate the end of the list.
assert(lastSlot < *stasis_page(slotted_numslots_cptr)(page));
*stasis_page(slotted_slot_length_ptr)(page, lastSlot) = INVALID_SLOT;
}
if(*stasis_page(slotted_numslots_cptr)(page) == slot) {
*stasis_page(slotted_numslots_ptr)(page) = slot+1;
}
assert(*stasis_page(slotted_numslots_cptr)(page) > slot);
DEBUG("Num slots %d\trid.slot %d\n", *stasis_page(slotted_numslots_cptr)(page), slot);
// Reserve space for this record and record the space's offset in
// the slot header.
assert(slot < *stasis_page(slotted_numslots_cptr)(page));
assert(freeSpace == *stasis_page(slotted_freespace_cptr)(page)); // XXX not sure how this could not be the case.
*stasis_page(slotted_freespace_ptr)(page) = freeSpace + stasis_record_type_to_size(type);
*stasis_page(slotted_slot_ptr)(page, slot) = freeSpace;
*stasis_page(slotted_slot_length_ptr)(page, slot) = type;
}
static inline void stasis_page(slotted_splice_slot)(PAGE *p, slotid_t a, slotid_t b) {
if(a==b) { return; } // no-op
if(a > b) {
slotid_t c = a;
a = b;
b = c;
}
int16_t b_slot = *stasis_page(slotted_slot_cptr)(p, b);
int16_t b_slot_len = *stasis_page(slotted_slot_length_cptr)(p, b);
for(int16_t i = b-1; i >= a; i--) {
*stasis_page(slotted_slot_ptr)(p, i+1) = *stasis_page(slotted_slot_cptr)(p, i);
*stasis_page(slotted_slot_length_ptr)(p, i+1) = *stasis_page(slotted_slot_length_cptr)(p, i);
}
*stasis_page(slotted_slot_ptr)(p, a) = b_slot;
*stasis_page(slotted_slot_length_ptr)(p, a) = b_slot_len;
}
static inline void stasis_page(slotted_free)(PAGE * p, slotid_t slot) {
ssize_t rec_size = stasis_record_type_to_size(*stasis_page(slotted_slot_length_cptr)(p, slot));
if(*stasis_page(slotted_freespace_cptr)(p) == *stasis_page(slotted_slot_cptr)(p, slot) + rec_size) {
(*stasis_page(slotted_freespace_ptr)(p)) -= rec_size;
}
assert(slot < *stasis_page(slotted_numslots_cptr)(p));
if(slot == *stasis_page(slotted_numslots_cptr)(p)-1) {
(*stasis_page(slotted_numslots_ptr)(p))--;
assert(stasis_page(slotted_get_type)(p,slot)==INVALID_SLOT);
} else {
*stasis_page(slotted_slot_ptr)(p, slot) = INVALID_SLOT;
*stasis_page(slotted_slot_length_ptr)(p, slot) = *stasis_page(slotted_freelist_cptr)(p);
*stasis_page(slotted_freelist_ptr)(p) = slot;
assert(stasis_page(slotted_get_type)(p,slot)==INVALID_SLOT);
}
#ifdef SLOTTED_PAGE_OLD_CHECKS
stasis_page(slotted_fsck)(p);
#endif
}

View file

@ -61,19 +61,6 @@ Slotted page layout:
************************************************************************/ ************************************************************************/
#ifndef STASIS_PAGE_SLOTTED_H #ifndef STASIS_PAGE_SLOTTED_H
#define STASIS_PAGE_SLOTTED_H #define STASIS_PAGE_SLOTTED_H
static inline int16_t* stasis_page_slotted_freespace_ptr(Page * p) { return stasis_page_int16_ptr_from_end((p), 1); }
static inline int16_t* stasis_page_slotted_numslots_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 2); }
static inline int16_t* stasis_page_slotted_freelist_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 3); }
static inline int16_t* stasis_page_slotted_slot_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end(p, (2*(n))+4); }
static inline int16_t* stasis_page_slotted_slot_length_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end((p), (2*(n))+5); }
static inline byte* stasis_page_slotted_record_ptr(Page * p, slotid_t n) { return stasis_page_byte_ptr_from_start((p), *stasis_page_slotted_slot_ptr((p), (n))); }
static inline const int16_t* stasis_page_slotted_freespace_cptr(const Page * p) { return stasis_page_slotted_freespace_ptr((Page*)p); }
static inline const int16_t* stasis_page_slotted_numslots_cptr(const Page * p) { return stasis_page_slotted_numslots_ptr((Page*)p); }
static inline const int16_t* stasis_page_slotted_freelist_cptr(const Page * p) { return stasis_page_slotted_freelist_ptr((Page*)p); }
static inline const int16_t* stasis_page_slotted_slot_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_ptr((Page*)p, n); }
static inline const int16_t* stasis_page_slotted_slot_length_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_length_ptr((Page*)p, n); }
static inline const byte* stasis_page_slotted_record_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_record_ptr((Page*)p, n); }
void stasis_page_slotted_init(); void stasis_page_slotted_init();
void stasis_page_slotted_deinit(); void stasis_page_slotted_deinit();

View file

@ -41,6 +41,7 @@ terms specified in this license.
#include "../check_includes.h" #include "../check_includes.h"
#include <stasis/transactional.h> #include <stasis/transactional.h>
#include <stasis/page/fixed.h>
#include <stasis/logger/logger2.h> #include <stasis/logger/logger2.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
#include <stasis/truncation.h> #include <stasis/truncation.h>
@ -497,7 +498,7 @@ START_TEST(operation_alloc_small) {
} }
Tcommit(xid); Tcommit(xid);
Tdeinit(); Tdeinit();
} END_TEST; } END_TEST
#define ARRAY_LIST_CHECK_ITER 10000 #define ARRAY_LIST_CHECK_ITER 10000
START_TEST(operation_array_list) { START_TEST(operation_array_list) {
@ -571,7 +572,7 @@ START_TEST(operation_lsn_free) {
int xid = Tbegin(); int xid = Tbegin();
pageid_t pid = TpageAlloc(xid); pageid_t pid = TpageAlloc(xid);
Page * p = loadPage(xid,pid); Page * p = loadPage(xid,pid);
stasis_slotted_lsn_free_initialize_page(p); stasis_page_slotted_lsn_free_initialize_page(p);
// XXX hack! // XXX hack!
byte * old = malloc(PAGE_SIZE); byte * old = malloc(PAGE_SIZE);
memcpy(old, p->memAddr, PAGE_SIZE); memcpy(old, p->memAddr, PAGE_SIZE);
@ -635,7 +636,7 @@ START_TEST(operation_reorderable) {
int xid = Tbegin(); int xid = Tbegin();
pageid_t pid = TpageAlloc(xid); pageid_t pid = TpageAlloc(xid);
Page * p = loadPage(xid,pid); Page * p = loadPage(xid,pid);
stasis_slotted_lsn_free_initialize_page(p); stasis_page_slotted_lsn_free_initialize_page(p);
// XXX hack! // XXX hack!
byte * old = malloc(PAGE_SIZE); byte * old = malloc(PAGE_SIZE);
memcpy(old, p->memAddr, PAGE_SIZE); memcpy(old, p->memAddr, PAGE_SIZE);
@ -772,7 +773,7 @@ static int op_test_redo_impl(const LogEntry * e, Page * p) {
Page * p = loadPage(e->xid, a->start + i); Page * p = loadPage(e->xid, a->start + i);
if(stasis_operation_multi_should_apply(e, p)) { if(stasis_operation_multi_should_apply(e, p)) {
writelock(p->rwlatch, 0); writelock(p->rwlatch, 0);
stasis_fixed_initialize_page(p, sizeof(i), 1); stasis_page_fixed_initialize_page(p, sizeof(i), 1);
recordid rid = { p->id, 0, sizeof(i) }; recordid rid = { p->id, 0, sizeof(i) };
stasis_record_write(e->xid, p, rid, (byte*)&i); stasis_record_write(e->xid, p, rid, (byte*)&i);
stasis_page_lsn_write(e->xid, p, e->LSN); stasis_page_lsn_write(e->xid, p, e->LSN);

View file

@ -48,6 +48,7 @@ terms specified in this license.
#include <stasis/page.h> #include <stasis/page.h>
#include <stasis/page/slotted.h> #include <stasis/page/slotted.h>
#include <stasis/page/fixed.h>
#include <stasis/experimental/latchFree/lfSlotted.h> #include <stasis/experimental/latchFree/lfSlotted.h>
#include <stasis/operations/blobs.h> #include <stasis/operations/blobs.h>
#include <stasis/bufferManager.h> #include <stasis/bufferManager.h>
@ -429,7 +430,7 @@ START_TEST(pageRecordSizeTypeIteratorTest) {
p = loadPage(xid,pid); p = loadPage(xid,pid);
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
stasis_fixed_initialize_page(p,sizeof(int64_t),0); stasis_page_fixed_initialize_page(p,sizeof(int64_t),0);
checkPageIterators(xid,p,10); checkPageIterators(xid,p,10);
@ -542,7 +543,7 @@ START_TEST(fixedPageThreadTest) {
Tinit(); Tinit();
Page * p = loadPage(-1, 2); Page * p = loadPage(-1, 2);
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
stasis_fixed_initialize_page(p, sizeof(int), 0); stasis_page_fixed_initialize_page(p, sizeof(int), 0);
p->LSN = 0; p->LSN = 0;
*stasis_page_lsn_ptr(p) = p->LSN; *stasis_page_lsn_ptr(p) = p->LSN;