From 7660467f8e9ec5f3acdeafbc256f11765015f590 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Wed, 9 Nov 2011 21:37:38 +0000 Subject: [PATCH] Refactor page implementation to use multiple include trick (so the backing representation can be changed). Also, fix apparent bug in stasis_page_fixed_set_record_type(). Otherwise, this change should not make any semantic changes. --- benchmarks/bufferManager.c | 4 +- benchmarks/lsn_bench_common.h | 2 +- benchmarks/roseTable.cpp | 1 + benchmarks/roseTableTpcCH.h | 4 +- src/stasis/experimental/lsmTree.c | 34 +- src/stasis/operations/arrayList.c | 9 +- src/stasis/operations/pageOperations.c | 9 +- src/stasis/page.c | 8 +- src/stasis/page/fixed.c | 209 +++++------ src/stasis/page/lsnFree.c | 2 +- src/stasis/page/slotted.c | 495 ++----------------------- src/stasis/util/stlredblack.cpp | 4 - stasis/page.h | 292 +-------------- stasis/page/fixed-impl.h | 93 +++++ stasis/page/fixed.h | 39 +- stasis/page/page-impl.h | 296 +++++++++++++++ stasis/page/slotted-impl.h | 461 +++++++++++++++++++++++ stasis/page/slotted.h | 13 - test/stasis/check_operations.c | 9 +- test/stasis/check_page.c | 5 +- 20 files changed, 1053 insertions(+), 936 deletions(-) create mode 100644 stasis/page/fixed-impl.h create mode 100644 stasis/page/page-impl.h create mode 100644 stasis/page/slotted-impl.h diff --git a/benchmarks/bufferManager.c b/benchmarks/bufferManager.c index 190a81b..37bdd7c 100644 --- a/benchmarks/bufferManager.c +++ b/benchmarks/bufferManager.c @@ -8,6 +8,8 @@ #include #include #include +#include + #include #include #include @@ -29,7 +31,7 @@ struct thread_arg { int do_load(pageid_t page_count) { for(int i = 1; i < page_count; i++) { Page * p = loadUninitializedPage(-1, i); - stasis_fixed_initialize_page(p, sizeof(i), 1); + stasis_page_fixed_initialize_page(p, sizeof(i), 1); recordid rid = {i, 0, sizeof(i)}; stasis_record_write(-1, p, rid,(byte*) &i); stasis_page_lsn_write(-1, p, p->LSN + 1); diff --git a/benchmarks/lsn_bench_common.h b/benchmarks/lsn_bench_common.h index 8415b6f..805122f 100644 --- a/benchmarks/lsn_bench_common.h +++ b/benchmarks/lsn_bench_common.h @@ -17,7 +17,7 @@ void alloc_rids(long long num_rids, recordid ** slow, recordid ** fast) { Page * p = loadPage(xid, pid); writelock(p->rwlatch,0); memcpy(old, p->memAddr, PAGE_SIZE); - stasis_slotted_lsn_free_initialize_page(p); + stasis_page_slotted_lsn_free_initialize_page(p); while(i < num_rids && ( ((*slow)[i] = stasis_record_alloc_begin(xid, p, sizeof(int))).size diff --git a/benchmarks/roseTable.cpp b/benchmarks/roseTable.cpp index 6a2afce..1fc0b50 100644 --- a/benchmarks/roseTable.cpp +++ b/benchmarks/roseTable.cpp @@ -1,5 +1,6 @@ //#define LEAK_TEST +#include #include "roseTable.h" #include "stasis/experimental/compression/compression.h" diff --git a/benchmarks/roseTableTpcCH.h b/benchmarks/roseTableTpcCH.h index da01041..42b5ef8 100644 --- a/benchmarks/roseTableTpcCH.h +++ b/benchmarks/roseTableTpcCH.h @@ -1,10 +1,10 @@ #include #include #include -#include "stasis/experimental/lsmTable.h" - #include "stasis/transactional.h" +#include "stasis/experimental/lsmTable.h" + #include "stasis/experimental/compression/nop.h" #include "stasis/experimental/compression/multicolumn-impl.h" #include "stasis/experimental/compression/staticMulticolumn.h" diff --git a/src/stasis/experimental/lsmTree.c b/src/stasis/experimental/lsmTree.c index 89b8b0d..d936f34 100644 --- a/src/stasis/experimental/lsmTree.c +++ b/src/stasis/experimental/lsmTree.c @@ -162,7 +162,7 @@ typedef struct lsmTreeState { * header in the first two lsmTreeNodeRecords on the page. */ static void initializeNodePage(int xid, Page *p, size_t keylen) { - stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord)+keylen, 0); + stasis_page_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord)+keylen, 0); recordid reserved1 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen); stasis_record_alloc_done(xid, p, reserved1); recordid reserved2 = stasis_record_alloc_begin(xid, p, sizeof(lsmTreeNodeRecord)+keylen); @@ -194,7 +194,7 @@ static void initializeNodePage(int xid, Page *p, size_t keylen) { */ static inline size_t getKeySizeFixed(int xid, Page const *p) { - return (*recordsize_cptr(p)) - sizeof(lsmTreeNodeRecord); + return (*stasis_page_fixed_recordsize_cptr(p)) - sizeof(lsmTreeNodeRecord); } static inline size_t getKeySizeVirtualMethods(int xid, Page *p) { @@ -207,7 +207,7 @@ static inline size_t getKeySizeVirtualMethods(int xid, Page *p) { static inline const lsmTreeNodeRecord* readNodeRecordFixed(int xid, Page *const p, int slot, int keylen) { - return (const lsmTreeNodeRecord*)fixed_record_ptr(p, slot); + return (const lsmTreeNodeRecord*)stasis_page_fixed_record_ptr(p, slot); } /** * Read a record from the page node, using stasis' general-purpose @@ -237,7 +237,7 @@ lsmTreeNodeRecord* readNodeRecordVirtualMethods(int xid, Page * p, static inline void writeNodeRecordFixed(int xid, Page *p, int slot, const byte *key, size_t keylen, pageid_t ptr) { - lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)fixed_record_ptr(p,slot); + lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)stasis_page_fixed_record_ptr(p,slot); nr->ptr = ptr; memcpy(nr+1, key, keylen); stasis_page_lsn_write(xid, p, 0); // XXX need real LSN? @@ -274,7 +274,7 @@ recordid TlsmCreate(int xid, int comparator, Page *p = loadPage(xid, ret.page); writelock(p->rwlatch,0); - stasis_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0); + stasis_page_fixed_initialize_page(p, sizeof(lsmTreeNodeRecord) + keySize, 0); p->pageType = LSM_ROOT_PAGE; lsmTreeState *state = malloc(sizeof(lsmTreeState)); @@ -411,7 +411,7 @@ static recordid appendInternalNode(int xid, Page *p, return ret; } else { // recurse - int slot = *recordcount_ptr(p)-1; + int slot = *stasis_page_fixed_recordcount_ptr(p)-1; assert(slot >= FIRST_SLOT); // there should be no empty nodes const lsmTreeNodeRecord *nr = readNodeRecord(xid, p, slot, key_len); pageid_t child_id = nr->ptr; @@ -457,7 +457,7 @@ static pageid_t findLastLeaf(int xid, Page *root, int depth) { } else { // passing zero as length is OK, as long as we don't try to access the key. const lsmTreeNodeRecord *nr = readNodeRecord(xid, root, - (*recordcount_ptr(root))-1,0); + (*stasis_page_fixed_recordcount_ptr(root))-1,0); pageid_t ret; Page *p = loadPage(xid, nr->ptr); @@ -545,7 +545,7 @@ recordid TlsmAppendPage(int xid, recordid tree, initializeNodePage(xid, lc,keySize); - for(int i = FIRST_SLOT; i < *recordcount_ptr(p); i++) { + for(int i = FIRST_SLOT; i < *stasis_page_fixed_recordcount_ptr(p); i++) { recordid cnext = stasis_record_alloc_begin(xid, lc, sizeof(lsmTreeNodeRecord)+keySize); @@ -565,7 +565,7 @@ recordid TlsmAppendPage(int xid, recordid tree, sizeof(lsmTreeNodeRecord)+keySize }; // @todo should fixed.h support bulk deallocation directly? - *recordcount_ptr(p) = FIRST_SLOT+1; + *stasis_page_fixed_recordcount_ptr(p) = FIRST_SLOT+1; lsmTreeNodeRecord *nr = (lsmTreeNodeRecord*)stasis_record_write_begin(xid, p, pFirstSlot); @@ -639,14 +639,14 @@ void TlsmFree(int xid, recordid tree, lsm_page_deallocator_t dealloc, static recordid lsmLookup(int xid, Page *node, int depth, const byte *key, size_t keySize, lsm_comparator_t cmp) { - if(*recordcount_ptr(node) == FIRST_SLOT) { + if(*stasis_page_fixed_recordcount_ptr(node) == FIRST_SLOT) { return NULLRID; } - assert(*recordcount_ptr(node) > FIRST_SLOT); + assert(*stasis_page_fixed_recordcount_ptr(node) > FIRST_SLOT); int match = FIRST_SLOT; // don't need to compare w/ first item in tree. const lsmTreeNodeRecord *rec = readNodeRecord(xid,node,FIRST_SLOT,keySize); - for(int i = FIRST_SLOT+1; i < *recordcount_ptr(node); i++) { + for(int i = FIRST_SLOT+1; i < *stasis_page_fixed_recordcount_ptr(node); i++) { rec = readNodeRecord(xid,node,i,keySize); int cmpval = cmp(rec+1,key); if(cmpval > 0) { @@ -690,7 +690,7 @@ pageid_t TlsmFindPage(int xid, recordid tree, const byte *key) { readlock(p->rwlatch,0); tree.slot = 0; - tree.size = *recordsize_ptr(p); + tree.size = *stasis_page_fixed_recordsize_ptr(p); size_t keySize = getKeySize(xid,p); @@ -733,10 +733,10 @@ pageid_t TlsmLastPage(int xid, recordid tree) { Page * p = loadPage(xid, ret); readlock(p->rwlatch,0); - if(*recordcount_ptr(p) == 2) { + if(*stasis_page_fixed_recordcount_ptr(p) == 2) { ret = -1; } else { - const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,(*recordcount_ptr(p))-1,keySize); + const lsmTreeNodeRecord *nr = readNodeRecord(xid,p,(*stasis_page_fixed_recordcount_ptr(p))-1,keySize); ret = nr->ptr; } unlock(p->rwlatch); @@ -769,7 +769,7 @@ static void lsmPageCleanup(Page *p) { A page_impl for the root of an lsmTree. */ page_impl lsmRootImpl() { - page_impl pi = fixedImpl(); + page_impl pi = stasis_page_fixed_impl(); pi.pageLoaded = lsmPageLoaded; pi.pageFlushed = lsmPageFlushed; pi.pageCleanup = lsmPageCleanup; @@ -888,7 +888,7 @@ void lsmTreeIterator_close(int xid, lladdIterator_t *it) { int lsmTreeIterator_next(int xid, lladdIterator_t *it) { lsmIteratorImpl *impl = it->impl; size_t keySize = impl->current.size; - impl->current = fixedNext(xid, impl->p, impl->current); + impl->current = stasis_page_fixed_next_record(xid, impl->p, impl->current); if(impl->current.size == INVALID_SLOT) { const lsmTreeNodeRecord next_rec = *readNodeRecord(xid,impl->p,NEXT_LEAF, keySize); diff --git a/src/stasis/operations/arrayList.c b/src/stasis/operations/arrayList.c index 9f39a10..cf5857c 100644 --- a/src/stasis/operations/arrayList.c +++ b/src/stasis/operations/arrayList.c @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -33,7 +34,7 @@ static array_list_parameter_t array_list_read_parameter(int xid, Page * p) { } static int array_list_get_block_containing_offset(array_list_parameter_t alp, int offset, pageid_t * firstSlotInBlock) { - int rec_per_page = stasis_fixed_records_per_page((size_t)alp.size); + int rec_per_page = stasis_page_fixed_records_per_page((size_t)alp.size); long thisHigh = rec_per_page * alp.initialSize; int lastHigh = 0; int pageRidSlot = 0; @@ -58,8 +59,8 @@ static int array_list_op_init_header(const LogEntry* e, Page* p) { const array_list_parameter_t * alp = stasis_log_entry_update_args_cptr(e); - stasis_fixed_initialize_page(p, sizeof(pageid_t), - stasis_fixed_records_per_page(sizeof(pageid_t))); + stasis_page_fixed_initialize_page(p, sizeof(pageid_t), + stasis_page_fixed_records_per_page(sizeof(pageid_t))); recordid initialSizeRid, multiplierRid, slotSizeRid, maxOffsetRid, firstDataPageRid; @@ -113,7 +114,7 @@ recordid stasis_array_list_dereference_recordid(int xid, Page * p, int offset) { readlock(p->rwlatch,0); array_list_parameter_t tlp = array_list_read_parameter(xid, p); - int rec_per_page = stasis_fixed_records_per_page((size_t)tlp.size); + int rec_per_page = stasis_page_fixed_records_per_page((size_t)tlp.size); pageid_t lastHigh = 0; int pageRidSlot = 0; /* The slot on the root arrayList page that contains the first page of the block of interest */ diff --git a/src/stasis/operations/pageOperations.c b/src/stasis/operations/pageOperations.c index 0dbda4e..c2f8f9d 100644 --- a/src/stasis/operations/pageOperations.c +++ b/src/stasis/operations/pageOperations.c @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -195,8 +196,8 @@ static int op_initialize_page(const LogEntry* e, Page* p) { stasis_page_slotted_initialize_page(p); break; case FIXED_PAGE: - stasis_fixed_initialize_page(p, arg->type, - stasis_fixed_records_per_page + stasis_page_fixed_initialize_page(p, arg->type, + stasis_page_fixed_records_per_page (stasis_record_type_to_size(arg->type))); break; default: @@ -223,8 +224,8 @@ static int op_init_multipage_impl(const LogEntry *e, Page *ignored) { } else if(arg->recordSize == BLOB_SLOT) { stasis_page_blob_initialize_page(p); } else { - stasis_fixed_initialize_page(p, arg->recordSize, - stasis_fixed_records_per_page + stasis_page_fixed_initialize_page(p, arg->recordSize, + stasis_page_fixed_records_per_page (stasis_record_type_to_size(arg->recordSize))); } stasis_page_lsn_write(e->xid, p, e->LSN); diff --git a/src/stasis/page.c b/src/stasis/page.c index a3f4f3f..912a0b1 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -109,13 +109,13 @@ lsn_t stasis_page_lsn_read(const Page * page) { void stasis_page_init(stasis_dirty_page_table_t * dpt) { dirtyPages = dpt; stasis_page_slotted_init(); - fixedPageInit(); + stasis_page_fixed_init(); stasis_page_impl_register(stasis_page_uninitialized_impl()); stasis_page_impl_register(stasis_page_slotted_impl()); - stasis_page_impl_register(fixedImpl()); + stasis_page_impl_register(stasis_page_fixed_impl()); stasis_page_impl_register(stasis_page_boundary_tag_impl()); - stasis_page_impl_register(arrayListImpl()); + stasis_page_impl_register(stasis_page_array_list_impl()); stasis_page_impl_register(stasis_page_blob_impl()); stasis_page_impl_register(slottedLsnFreeImpl()); stasis_page_impl_register(segmentImpl()); @@ -129,7 +129,7 @@ void stasis_page_deinit() { page_impls[i] = p; } - fixedPageDeinit(); + stasis_page_fixed_deinit(); stasis_page_slotted_deinit(); } diff --git a/src/stasis/page/fixed.c b/src/stasis/page/fixed.c index c5a5f45..3c8eca5 100644 --- a/src/stasis/page/fixed.c +++ b/src/stasis/page/fixed.c @@ -1,155 +1,148 @@ #include #include #include -/** @todo should page implementations provide readLSN / writeLSN??? */ #include #include - - -int stasis_fixed_records_per_page(size_t size) { - return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size; -} -/** @todo CORRECTNESS Locking for stasis_fixed_initialize_page? (should hold writelock)*/ -void stasis_fixed_initialize_page(Page * page, size_t size, int count) { - stasis_page_cleanup(page); - // Zero out the page contents, since callers often do so anyway. - // blows away LSN, but the copy that's in p->LSN will be put back on page at flush. - memset(page->memAddr, 0, PAGE_SIZE); - page->pageType = FIXED_PAGE; - *recordsize_ptr(page) = size; - assert(count <= stasis_fixed_records_per_page(size)); - *recordcount_ptr(page)= count; -} - -static void checkRid(Page * page, recordid rid) { - assert(page->pageType); // any more specific breaks pages based on this one - assert(page->id == rid.page); - assert(*recordsize_ptr(page) == rid.size); - assert(stasis_fixed_records_per_page(rid.size) > rid.slot); -} - //-------------- New API below this line -static const byte* fixedRead(int xid, Page *p, recordid rid) { - checkRid(p, rid); - assert(rid.slot < *recordcount_ptr(p)); - return fixed_record_ptr(p, rid.slot); +static inline void stasis_page_fixed_checkRid(Page * page, recordid rid) { + assert(page->pageType); // any more specific breaks pages based on this one + assert(page->id == rid.page); + assert(*stasis_page_fixed_recordsize_cptr(page) == rid.size); + assert(stasis_page_fixed_records_per_page(rid.size) > rid.slot); +} +void stasis_page_fixed_initialize_page(Page * page, size_t size, int count) { + stasis_page_cleanup(page); + page->pageType = FIXED_PAGE; + stasis_page_fixed_initialize_page_raw(page, size, count); } -static byte* fixedWrite(int xid, Page *p, recordid rid) { - checkRid(p, rid); - assert(rid.slot < *recordcount_ptr(p)); - return fixed_record_ptr(p, rid.slot); +static int stasis_page_fixed_get_record_type(int xid, Page *p, recordid rid) { + return stasis_page_fixed_get_type(p, rid.slot); +} +static void stasis_page_fixed_set_record_type(int xid, Page *p, recordid rid, int type) { + stasis_page_fixed_set_type(p, rid.slot, type); } -static int fixedGetType(int xid, Page *p, recordid rid) { - // checkRid(p, rid); - if(rid.slot < *recordcount_ptr(p)) { - int type = *recordsize_ptr(p); - if(type > 0) { - type = NORMAL_SLOT; - } - return type; - } else { - return INVALID_SLOT; - } +static int stasis_page_fixed_not_supported(int xid, Page * p) { return 0; } + +static const byte* stasis_page_fixed_read(int xid, Page *p, recordid rid) { + stasis_page_fixed_checkRid(p, rid); + assert(rid.slot < *stasis_page_fixed_recordcount_cptr(p)); + return stasis_page_fixed_record_cptr(p, rid.slot); } -static void fixedSetType(int xid, Page *p, recordid rid, int type) { - checkRid(p,rid); - assert(rid.slot < *recordcount_ptr(p)); - assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(*recordsize_ptr(p))); - *recordsize_ptr(p) = rid.size; + +static inline byte* stasis_page_fixed_write(int xid, Page *p, recordid rid) { + stasis_page_fixed_checkRid(p, rid); + assert(rid.slot < *stasis_page_fixed_recordcount_cptr(p)); + return stasis_page_fixed_record_ptr(p, rid.slot); } -static int fixedGetLength(int xid, Page *p, recordid rid) { + + +static int stasis_page_fixed_get_length_record(int xid, Page *p, recordid rid) { assert(p->pageType); - return rid.slot > *recordcount_ptr(p) ? - INVALID_SLOT : stasis_record_type_to_size(*recordsize_ptr(p)); + return stasis_page_fixed_get_length(p, rid.slot); } -static int notSupported(int xid, Page * p) { return 0; } - -static int fixedFreespace(int xid, Page * p) { - if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { - // Return the size of a slot; that's the biggest record we can take. - return stasis_record_type_to_size(*recordsize_ptr(p)); +static recordid stasis_page_fixed_last_record(int xid, Page *p) { + recordid rid = { p->id, -1, 0 }; + rid.size = *stasis_page_fixed_recordsize_cptr(p); + rid.slot = -stasis_page_fixed_last_slot(p); + return rid; +} +recordid stasis_page_fixed_next_record(int xid, Page *p, recordid rid) { + slotid_t slot = stasis_page_fixed_next_slot(p, rid.slot); + if(slot == INVALID_SLOT) { + return NULLRID; } else { - // Page full; return zero. - return 0; + assert(rid.page == p->id); + rid.size = *stasis_page_fixed_recordsize_cptr(p); + rid.slot = slot; + return rid; } } -static void fixedCompact(Page * p) { +static recordid stasis_page_fixed_first_record(int xid, Page *p) { + recordid rid = { + p->id, + INVALID_SLOT, + *stasis_page_fixed_recordsize_cptr(p) + }; + return stasis_page_fixed_next_record(xid, p, rid); +} + +static int stasis_page_fixed_freespace(int xid, Page * p) { + return stasis_page_fixed_freespace_raw(p); +} + +static inline void stasis_page_fixed_compact(Page * p) { // no-op } -static void fixedCompactSlotIds(int xid, Page * p) { +static inline void stasis_page_fixed_compact_slot_ids(int xid, Page * p) { abort(); } -static recordid fixedPreAlloc(int xid, Page *p, int size) { - if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { - recordid rid; - rid.page = p->id; - rid.slot = *recordcount_ptr(p); - rid.size = *recordsize_ptr(p); - return rid; - } else { + +static recordid stasis_page_fixed_pre_alloc_record(int xid, Page *p, int size) { + int slot = stasis_page_fixed_pre_alloc(p, size); + if(slot ==-1) { return NULLRID; } + recordid rid = { p->id, slot, *stasis_page_fixed_recordsize_cptr(p) }; + return rid; } -static void fixedPostAlloc(int xid, Page *p, recordid rid) { - assert(*recordcount_ptr(p) == rid.slot); - assert(*recordsize_ptr(p) == rid.size); - (*recordcount_ptr(p))++; +static void stasis_page_fixed_post_alloc_record(int xid, Page *p, recordid rid) { + assert(*stasis_page_fixed_recordsize_cptr(p) == rid.size); + stasis_page_fixed_post_alloc(p, rid.slot); } -static void fixedSplice(int xid, Page *p, slotid_t first, slotid_t second) { + +static inline void stasis_page_fixed_splice(int xid, Page *p, slotid_t first, slotid_t second) { abort(); } -static void fixedFree(int xid, Page *p, recordid rid) { - if(*recordsize_ptr(p) == rid.slot+1) { - (*recordsize_ptr(p))--; - } else { - // leak space; there's no way to track it with this page format. - } + +static void stasis_page_fixed_free_record(int xid, Page *p, recordid rid) { + stasis_page_fixed_free(p, rid.slot); } // XXX dereferenceRID -void fixedLoaded(Page *p) { +static void stasis_page_fixed_loaded(Page *p) { p->LSN = *stasis_page_lsn_ptr(p); } -void fixedFlushed(Page *p) { +static void stasis_page_fixed_flushed(Page *p) { *stasis_page_lsn_ptr(p) = p->LSN; } -void fixedCleanup(Page *p) { } -page_impl fixedImpl() { +static void stasis_page_fixed_cleanup(Page *p) { } + +page_impl stasis_page_fixed_impl() { static page_impl pi = { FIXED_PAGE, 1, - fixedRead, - fixedWrite, + stasis_page_fixed_read, + stasis_page_fixed_write, 0,// readDone 0,// writeDone - fixedGetType, - fixedSetType, - fixedGetLength, - fixedFirst, - fixedNext, - fixedLast, - notSupported, // notSupported, + stasis_page_fixed_get_record_type, + stasis_page_fixed_set_record_type, + stasis_page_fixed_get_length_record, + stasis_page_fixed_first_record, + stasis_page_fixed_next_record, + stasis_page_fixed_last_record, + stasis_page_fixed_not_supported, stasis_block_first_default_impl, stasis_block_next_default_impl, stasis_block_done_default_impl, - fixedFreespace, - fixedCompact, - fixedCompactSlotIds, - fixedPreAlloc, - fixedPostAlloc, - fixedSplice, - fixedFree, + stasis_page_fixed_freespace, + stasis_page_fixed_compact, + stasis_page_fixed_compact_slot_ids, + stasis_page_fixed_pre_alloc_record, + stasis_page_fixed_post_alloc_record, + stasis_page_fixed_splice, + stasis_page_fixed_free_record, 0, // XXX dereference - fixedLoaded, // loaded - fixedFlushed, // flushed - fixedCleanup + stasis_page_fixed_loaded, + stasis_page_fixed_flushed, + stasis_page_fixed_cleanup }; return pi; } @@ -157,11 +150,11 @@ page_impl fixedImpl() { /** @todo arrayListImpl belongs in arrayList.c */ -page_impl arrayListImpl() { - page_impl pi = fixedImpl(); +page_impl stasis_page_array_list_impl() { + page_impl pi = stasis_page_fixed_impl(); pi.page_type = ARRAY_LIST_PAGE; return pi; } -void fixedPageInit() { } -void fixedPageDeinit() { } +void stasis_page_fixed_init() { } +void stasis_page_fixed_deinit() { } diff --git a/src/stasis/page/lsnFree.c b/src/stasis/page/lsnFree.c index 0616da2..6cb13c2 100644 --- a/src/stasis/page/lsnFree.c +++ b/src/stasis/page/lsnFree.c @@ -2,7 +2,7 @@ #include #include -void stasis_slotted_lsn_free_initialize_page(Page * p) { +void stasis_page_slotted_lsn_free_initialize_page(Page * p) { stasis_page_slotted_initialize_page(p); p->pageType = SLOTTED_LSN_FREE_PAGE; } diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c index 89024ae..33f6c83 100644 --- a/src/stasis/page/slotted.c +++ b/src/stasis/page/slotted.c @@ -8,25 +8,7 @@ #include #include -//#include -#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short)) -#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short)) - -#ifdef LONG_TEST -#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 -#endif - -// plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence. -// intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK) -// benchmark page ops (real time) + hash table (user time) - -//#define SLOTTED_PAGE_OLD_CHECKS -#define SLOTTED_PAGE_NEW_CHECKS -#ifdef SLOTTED_PAGE_NEW_CHECKS -#define SLOTTED_PAGE_SANITY_CHECKS -#define SLOTTED_PAGE_CHECK_FOR_OVERLAP -#endif #include /** Run sanity checks to make sure page is in a consistent state. @@ -36,353 +18,79 @@ and more expensive. */ static inline void slottedFsck(const Page const * page) { - - Page dummy; - - dummy.id = -1; - dummy.memAddr = 0; - const short page_type = page->pageType; - const short numslots = *stasis_page_slotted_numslots_cptr(page); - const short freespace = *stasis_page_slotted_freespace_cptr(page); - const short freelist = *stasis_page_slotted_freelist_cptr(page); - - const long slotListStart = (long)stasis_page_slotted_slot_length_ptr(&dummy, numslots-1); - assert(slotListStart < PAGE_SIZE && slotListStart >= 0); assert(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE || page_type == SLOTTED_LSN_FREE_PAGE || page_type == SLOTTED_LATCH_FREE_PAGE); - assert(numslots >= 0); - assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE); - assert(freespace >= 0); - assert(freespace <= slotListStart); - assert(freelist >= INVALID_SLOT); - assert(freelist < numslots); - -#ifdef SLOTTED_PAGE_SANITY_CHECKS - - // Check integrity of freelist. All free slots less than - // numslots should be on it, in order. - - short * slot_offsets = alloca(numslots * sizeof(short)); - short * slot_lengths = alloca(numslots * sizeof(short)); - for(int i = 0; i < numslots; i++) { - slot_offsets[i] = *stasis_page_slotted_slot_cptr(page, i); - slot_lengths[i] = *stasis_page_slotted_slot_length_cptr(page, i); - } - - short foundEndOfList = 0; - - if(freelist != INVALID_SLOT) { - assert(slot_offsets[freelist] == INVALID_SLOT); - } else { - foundEndOfList = 1; - } - - for(short i = 0; i < numslots; i++) { - const short slot_length = slot_lengths[i]; - const short slot_offset = slot_offsets[i]; - if(slot_offset == INVALID_SLOT) { - if(slot_length == INVALID_SLOT) { - assert(!foundEndOfList); - foundEndOfList = 1; - } else { - assert (slot_offsets[slot_length] == INVALID_SLOT); - } - } else { - assert(slot_offset + slot_length <= freespace); - } - } - - // Is the free list terminated? - assert(foundEndOfList); - -#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP - - const byte UNUSED = 0xFF; - const byte PAGE_HEADER = 0xFE; - const byte SLOTTED_HEADER = 0xFD; - // const byte SLOT_LIST = 0xFC; - const byte FREE_SPACE = 0xFB; - - const unsigned short S_SLOT_LIST = 0xFCFC; - - byte image[PAGE_SIZE]; - for(short i = 0; i < PAGE_SIZE; i++) { - image[i] = UNUSED; - } - for(short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) { - image[i] = PAGE_HEADER; - } - for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) { - image[i] = SLOTTED_HEADER; - } - for(short i = *stasis_page_slotted_freespace_cptr(page); i < slotListStart; i++) { - image[i] = FREE_SPACE; - } - - dummy.memAddr = image; - - for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) { - *stasis_page_slotted_slot_ptr(&dummy, i) = S_SLOT_LIST; - *stasis_page_slotted_slot_length_ptr(&dummy, i) = S_SLOT_LIST; - } - for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) { - short slot_offset = *stasis_page_slotted_slot_cptr(page, i); - if(slot_offset != INVALID_SLOT) { - const unsigned char ci = i % 0xFF; - short slot_len = stasis_record_type_to_size(*stasis_page_slotted_slot_length_cptr(page, i)); - - for(short j = 0; j < slot_len; j++) { - assert(image[slot_offset + j] == 0xFF); - image[slot_offset + j] = ci; - } - } - } -#endif // SLOTTED_PAGE_CHECK_FOR_OVERLAP -#endif // SLOTTED_PAGE_SANITY_CHECKS - + stasis_page_slotted_check(page); } /** Move all of the records to the beginning of the page in order to increase the available free space. */ - static void slottedCompact(Page * page) { - Page bufPage; - byte buffer[PAGE_SIZE]; - bufPage.memAddr = buffer; - - // Copy external headers into bufPage. - - memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(page->memAddr[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE); - - // Now, build new slotted page in the bufPage struct. - - *stasis_page_slotted_freespace_ptr(&bufPage) = 0; - // numslots_ptr will be set later. - *stasis_page_slotted_freelist_ptr(&bufPage) = INVALID_SLOT; - - const short numSlots = *stasis_page_slotted_numslots_ptr(page); - short lastFreeSlot = INVALID_SLOT; - short lastFreeSlotBeforeUsedSlot = INVALID_SLOT; - short lastUsedSlot = -1; - - // Rebuild free list. - - for(short i = 0; i < numSlots; i++) { - if(*stasis_page_slotted_slot_ptr(page, i) == INVALID_SLOT) { - if(lastFreeSlot == INVALID_SLOT) { - *stasis_page_slotted_freelist_ptr(&bufPage) = i; - } else { - *stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlot) = i; - } - *stasis_page_slotted_slot_ptr(&bufPage, i) = INVALID_SLOT; - lastFreeSlot = i; - } else { - lastUsedSlot = i; - lastFreeSlotBeforeUsedSlot = lastFreeSlot; - - short logicalSize = *stasis_page_slotted_slot_length_ptr(page, i); - short physicalSize = stasis_record_type_to_size(logicalSize); - - memcpy(&(buffer[*stasis_page_slotted_freespace_ptr(&bufPage)]), stasis_page_slotted_record_ptr(page, i), physicalSize); - - *stasis_page_slotted_slot_ptr(&bufPage, i) = *stasis_page_slotted_freespace_ptr(&bufPage); - *stasis_page_slotted_slot_length_ptr(&bufPage, i) = logicalSize; - - (*stasis_page_slotted_freespace_ptr(&bufPage)) += physicalSize; - - } - } - - // Truncate linked list, and update numslots_ptr. - *stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT; - *stasis_page_slotted_numslots_ptr(&bufPage) = lastUsedSlot+1; - - memcpy(page->memAddr, buffer, PAGE_SIZE); -#ifdef SLOTTED_PAGE_OLD_CHECKS - slottedFsck(page); -#endif // SLOTTED_PAGE_OLD_CHECKS + stasis_page_slotted_compact(page); } - static void slottedCompactSlotIDs(int xid, Page * p) { - int16_t numSlots = *stasis_page_slotted_numslots_ptr(p); - int16_t out = 0; - for(int16_t in = 0; in < numSlots; in++) { - if(*stasis_page_slotted_slot_ptr(p, in) == INVALID_SLOT) { - // nop - } else { - *stasis_page_slotted_slot_ptr(p, out) = *stasis_page_slotted_slot_cptr(p, in); - *stasis_page_slotted_slot_length_ptr(p, out) = *stasis_page_slotted_slot_length_cptr(p, in); - out++; - } - } - *stasis_page_slotted_numslots_ptr(p) = out; - *stasis_page_slotted_freelist_ptr(p) = INVALID_SLOT; + stasis_page_slotted_compact_slot_ids(p); } - -/** - Check to see how many bytes can fit in a given slot. This - makes it possible for callers to guarantee the safety - of a subsequent call to really_do_ralloc(). -*/ -static ssize_t slottedFreespaceForSlot(Page * page, int slot) { - ssize_t slotOverhead; - - if(slot == INVALID_SLOT) { - slotOverhead = (*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0; - } else if(slot < *stasis_page_slotted_numslots_ptr(page)) { - slotOverhead = 0; - } else { - // slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot); - slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *stasis_page_slotted_numslots_ptr(page)); - } - // end_of_free_space points to the beginning of the slot header at the bottom of the page header. - byte* end_of_free_space = (byte*)stasis_page_slotted_slot_length_ptr(page, (*stasis_page_slotted_numslots_ptr(page))-1); - - // start_of_free_space points to the first unallocated byte in the page - // (ignoring space that could be reclaimed by compaction) - byte* start_of_free_space = (byte*)(page->memAddr + *stasis_page_slotted_freespace_ptr(page)); - - assert(end_of_free_space >= start_of_free_space); - - if(end_of_free_space < start_of_free_space + slotOverhead) { - // The regions would overlap after allocation. There is no free space. - return 0; - } else { - // The regions would not overlap. There might be free space. - return (ssize_t) (end_of_free_space - start_of_free_space - slotOverhead); - } -} - -static inline void slottedSanityCheck(Page * p, recordid rid) { -#ifdef SLOTTED_PAGE_OLD_CHECKS - assert(p->id == rid.page); - assert(rid.size == INVALID_SIZE || rid.size < BLOB_THRESHOLD_SIZE); // Caller deals with this now! - slottedFsck(p); -#endif -} - // -------------------------------------------------------------------------- // PUBLIC API IS BELOW THIS LINE // -------------------------------------------------------------------------- static const byte* slottedRead (int xid, Page *p, recordid rid) { - slottedSanityCheck(p, rid); - return stasis_page_slotted_record_ptr(p, rid.slot); } - static byte* slottedWrite(int xid, Page *p, recordid rid) { - slottedSanityCheck(p, rid); - return stasis_page_slotted_record_ptr(p, rid.slot); } static int slottedGetType(int xid, Page *p, recordid rid) { -#ifdef SLOTTED_PAGE_OLD_CHECKS - //sanityCheck(p, rid); <-- Would fail if rid.size is a blob - assert(p->id == rid.page); - slottedFsck(p); -#endif - if(rid.slot >= *stasis_page_slotted_numslots_ptr(p)) { return INVALID_SLOT; } - if(*stasis_page_slotted_slot_ptr(p, rid.slot) == INVALID_SLOT) { return INVALID_SLOT; } - int ret = *stasis_page_slotted_slot_length_ptr(p, rid.slot); - return ret >= 0 ? NORMAL_SLOT : ret; + return stasis_page_slotted_get_type(p, rid.slot); } static void slottedSetType(int xid, Page *p, recordid rid, int type) { - slottedSanityCheck(p, rid); - - int old_type = *stasis_page_slotted_slot_length_ptr(p, rid.slot); - assert(rid.slot < *stasis_page_slotted_numslots_ptr(p)); - assert(old_type != INVALID_SLOT); - - if(type == NORMAL_SLOT) { - // set slot_length_ptr to the physical length. - *stasis_page_slotted_slot_length_ptr(p, rid.slot) = stasis_record_type_to_size(old_type); - } else { - // Changing to a special slot type; make sure doing so doesn't change - // the record size. - assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(old_type)); - *stasis_page_slotted_slot_length_ptr(p, rid.slot) = type; - } + stasis_page_slotted_set_type(p, rid.slot, type); } - static int slottedGetLength(int xid, Page *p, recordid rid) { -#ifdef SLOTTED_PAGE_OLD_CHECKS - assert(p->id == rid.page); - slottedFsck(p); -#endif - if( slottedGetType(xid, p, rid) == INVALID_SLOT) - return INVALID_SLOT; - else - return stasis_record_type_to_size(*stasis_page_slotted_slot_length_ptr(p, rid.slot)); + return stasis_page_slotted_get_length(p, rid.slot); } - static recordid slottedNext(int xid, Page *p, recordid rid) { - slottedSanityCheck(p, rid); - short n = *stasis_page_slotted_numslots_ptr(p); - rid.slot ++; - while(rid.slot < n && slottedGetType(xid,p,rid)==INVALID_SLOT) { - rid.slot++; - } - if(rid.slot != n) { + rid.slot = stasis_page_slotted_next_record(p, rid.slot); + + if(rid.slot != INVALID_SLOT) { rid.size = *stasis_page_slotted_slot_length_ptr(p, rid.slot); return rid; } else { return NULLRID; } } - static recordid slottedFirst(int xid, Page *p) { -#ifdef SLOTTED_PAGE_OLD_CHECKS - slottedFsck(p); -#endif recordid rid = { p->id, -1, 0 }; return slottedNext(xid, p, rid); } static recordid slottedLast(int xid, Page *p) { recordid rid = {p->id, -1, 0 }; - rid.slot = (*stasis_page_slotted_numslots_cptr(p)) - 1; + rid.slot = stasis_page_slotted_last_record(p); rid.size = *stasis_page_slotted_slot_length_cptr(p, rid.slot); return rid; } - static int notSupported(int xid, Page * p) { return 0; } static int slottedFreespace(int xid, Page * p) { -#ifdef SLOTTED_PAGE_OLD_CHECKS - slottedFsck(p); -#endif - - return slottedFreespaceForSlot(p, INVALID_SLOT); + return stasis_page_slotted_freespace_for_slot(p, INVALID_SLOT); } static recordid slottedPreRalloc(int xid, Page * p, int type) { - assert(type != INVALID_SLOT); -#ifdef SLOTTED_PAGE_OLD_CHECKS - slottedFsck(p); -#endif - - recordid rid; - rid.page = p->id; - rid.slot = *stasis_page_slotted_numslots_ptr(p); - rid.size = type; - - if(*stasis_page_slotted_freelist_ptr(p) != INVALID_SLOT) { - rid.slot = *stasis_page_slotted_freelist_ptr(p); + slotid_t slot = stasis_page_slotted_pre_alloc(p, type); + if(slot == INVALID_SLOT) { + return NULLRID; + } else { + recordid rid = { p->id, slot, type }; + return rid; } - - if(slottedFreespaceForSlot(p, rid.slot) < stasis_record_type_to_size(type)) { - rid = NULLRID; - } - - return rid; } - /** Allocate data on a page after deciding which recordid to allocate, and making sure there is enough freespace. @@ -390,7 +98,7 @@ static recordid slottedPreRalloc(int xid, Page * p, int type) { Allocation is complicated without locking. Consider this situation: (1) *numslot_ptr(page) is 10 - (2) An aborting transcation calls really_do_ralloc(page) with rid.slot = 12 + (2) An aborting transaction calls slottedPostRalloc(page) with rid.slot = 12 (3) *numslot_ptr(page) must be incremented to 12. Now, what happens to 11? - If 11 was also deleted by a transaction that could abort, we should lock it so that it won't be reused. (4) This function adds it to the freelist to avoid leaking space. (Therefore, Talloc() can return recordids that will @@ -401,159 +109,22 @@ static recordid slottedPreRalloc(int xid, Page * p, int type) { @param xid The transaction allocating the record. @param page A pointer to the page. - @param rid Recordid with 'internal' size. The size should have already been translated to a type if necessary. + @param rid rid.size should be a size or (for special records) a type that + stasis_record_type_to_size() can interpret. This allows callers + to store record type information in the page's size field. */ -static void slottedPostRalloc(int xid, Page * page, recordid rid) { - slottedSanityCheck(page, rid); - - short freeSpace; - - // Compact the page if we don't have enough room. - if(slottedFreespaceForSlot(page, rid.slot) < stasis_record_type_to_size(rid.size)) { - slottedCompact(page); - - // Make sure we have enough enough free space for the new record - assert (slottedFreespaceForSlot(page, rid.slot) >= stasis_record_type_to_size(rid.size)); - } - - freeSpace = *stasis_page_slotted_freespace_ptr(page); - - // Remove this entry from the freelist (if necessary) slottedCompact - // assumes that this does not change the order of items in the list. - // If it did, then slottedCompact could leaks slot id's (or worse!) - if(rid.slot < *stasis_page_slotted_numslots_ptr(page) && *stasis_page_slotted_slot_ptr(page,rid.slot) == INVALID_SLOT) { - short next = *stasis_page_slotted_freelist_ptr(page); - short last = INVALID_SLOT; - // special case: is the slot physically before us the predecessor? - if(rid.slot > 0) { - if(*stasis_page_slotted_slot_length_ptr(page, rid.slot-1) == rid.slot && *stasis_page_slotted_slot_ptr(page, rid.slot-1) == INVALID_SLOT) { - next = rid.slot; - last = rid.slot-1; - } - } - while(next != INVALID_SLOT && next != rid.slot) { - last = next; - assert(next < *stasis_page_slotted_numslots_ptr(page)); - short next_slot_ptr = *stasis_page_slotted_slot_ptr(page, next); - assert(next_slot_ptr == INVALID_SLOT); - next = *stasis_page_slotted_slot_length_ptr(page, next); - } - if(next == rid.slot) { - if(last == INVALID_SLOT) { - *stasis_page_slotted_freelist_ptr(page) = *stasis_page_slotted_slot_length_ptr(page, rid.slot); - } else { - *stasis_page_slotted_slot_length_ptr(page, last) = *stasis_page_slotted_slot_length_ptr(page, rid.slot); - } - } - } - - // Insert any slots that come between the previous numslots_ptr() - // and the slot we're allocating onto the freelist. In order to - // promote the reuse of free slot numbers, we go out of our way to make sure - // that we put them in the list in increasing order. (Note: slottedCompact's - // correctness depends on this behavior!) - - - if(rid.slot > *stasis_page_slotted_numslots_ptr(page)) { - short lastSlot; - short numSlots = *stasis_page_slotted_numslots_ptr(page); - if(*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) { - - *stasis_page_slotted_freelist_ptr(page) = numSlots; - lastSlot = numSlots; - - *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT; - // will set slot_length_ptr on next iteration. - - - (*stasis_page_slotted_numslots_ptr(page))++; - } else { - lastSlot = INVALID_SLOT; - short next = *stasis_page_slotted_freelist_ptr(page); - while(next != INVALID_SLOT) { - lastSlot = next; - next = *stasis_page_slotted_slot_length_ptr(page, lastSlot); - assert(lastSlot < *stasis_page_slotted_numslots_ptr(page)); - assert(*stasis_page_slotted_slot_ptr(page, lastSlot) == INVALID_SLOT); - } - *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT; - - } - - // lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr. - - while(*stasis_page_slotted_numslots_ptr(page) < rid.slot) { - *stasis_page_slotted_slot_length_ptr(page, lastSlot) = *stasis_page_slotted_numslots_ptr(page); - lastSlot = *stasis_page_slotted_numslots_ptr(page); - *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT; - (*stasis_page_slotted_numslots_ptr(page))++; - } - - // Terminate the end of the list. - assert(lastSlot < *stasis_page_slotted_numslots_ptr(page)); - *stasis_page_slotted_slot_length_ptr(page, lastSlot) = INVALID_SLOT; - - } - - if(*stasis_page_slotted_numslots_ptr(page) == rid.slot) { - *stasis_page_slotted_numslots_ptr(page) = rid.slot+1; - } - - assert(*stasis_page_slotted_numslots_ptr(page) > rid.slot); - - DEBUG("Num slots %d\trid.slot %d\n", *stasis_page_slotted_numslots_ptr(page), rid.slot); - - // Reserve space for this record and record the space's offset in - // the slot header. - - assert(rid.slot < *stasis_page_slotted_numslots_ptr(page)); - *stasis_page_slotted_freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size); - *stasis_page_slotted_slot_ptr(page, rid.slot) = freeSpace; - - *stasis_page_slotted_slot_length_ptr(page, rid.slot) = rid.size; - +static void slottedPostRalloc(int xid, Page * p, recordid rid) { + assert(((short)rid.size) == rid.size); + stasis_page_slotted_post_alloc(p, rid.slot, rid.size); } static void slottedSpliceSlot(int xid, Page *p, slotid_t a, slotid_t b) { - if(a==b) { return; } // no-op - assert(a < b); - int16_t b_slot = *stasis_page_slotted_slot_cptr(p, b); - int16_t b_slot_len = *stasis_page_slotted_slot_length_cptr(p, b); - for(int16_t i = b-1; i >= a; i--) { - *stasis_page_slotted_slot_ptr(p, i+1) = *stasis_page_slotted_slot_cptr(p, i); - *stasis_page_slotted_slot_length_ptr(p, i+1) = *stasis_page_slotted_slot_length_cptr(p, i); - } - *stasis_page_slotted_slot_ptr(p, a) = b_slot; - *stasis_page_slotted_slot_length_ptr(p, a) = b_slot_len; + stasis_page_slotted_splice_slot(p, a, b); } - static void slottedFree(int xid, Page * p, recordid rid) { - slottedSanityCheck(p, rid); - - ssize_t rec_size = stasis_record_type_to_size(*stasis_page_slotted_slot_length_ptr(p, rid.slot)); - - if(*stasis_page_slotted_freespace_ptr(p) == *stasis_page_slotted_slot_ptr(p, rid.slot) + rec_size) { - (*stasis_page_slotted_freespace_ptr(p)) -= rec_size; - } - - assert(rid.slot < *stasis_page_slotted_numslots_ptr(p)); - if(rid.slot == *stasis_page_slotted_numslots_ptr(p)-1) { - (*stasis_page_slotted_numslots_ptr(p))--; - assert(slottedGetType(xid,p,rid)==INVALID_SLOT); - } else { - *stasis_page_slotted_slot_ptr(p, rid.slot) = INVALID_SLOT; - *stasis_page_slotted_slot_length_ptr(p, rid.slot) = *stasis_page_slotted_freelist_ptr(p); - *stasis_page_slotted_freelist_ptr(p) = rid.slot; - assert(slottedGetType(xid,p,rid)==INVALID_SLOT); - } -#ifdef SLOTTED_PAGE_OLD_CHECKS - slottedFsck(p); -#endif + stasis_page_slotted_free(p, rid.slot); } - - // XXX dereferenceRID - static void slottedLoaded(Page *p) { p->LSN = *stasis_page_lsn_ptr(p); slottedFsck(p); @@ -564,6 +135,12 @@ static void slottedFlushed(Page *p) { } static void slottedCleanup(Page *p) { } +void stasis_page_slotted_initialize_page(Page * page) { + stasis_page_cleanup(page); + page->pageType = SLOTTED_PAGE; + stasis_page_slotted_initialize_page_raw(page); +} + void stasis_page_slotted_init() { #ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP #ifdef SLOTTED_PAGE_OLD_CHECKS @@ -575,14 +152,6 @@ void stasis_page_slotted_init() { void stasis_page_slotted_deinit() { } -void stasis_page_slotted_initialize_page(Page * page) { - stasis_page_cleanup(page); - page->pageType = SLOTTED_PAGE; - *stasis_page_slotted_freespace_ptr(page) = 0; - *stasis_page_slotted_numslots_ptr(page) = 0; - *stasis_page_slotted_freelist_ptr(page) = INVALID_SLOT; -} - page_impl stasis_page_slotted_impl() { static page_impl pi = { SLOTTED_PAGE, diff --git a/src/stasis/util/stlredblack.cpp b/src/stasis/util/stlredblack.cpp index c8fde9b..e793c5d 100644 --- a/src/stasis/util/stlredblack.cpp +++ b/src/stasis/util/stlredblack.cpp @@ -10,10 +10,6 @@ #include #include #include -#undef end -#undef try -#undef catch -#include extern "C" { typedef int (*c_cmp_t)(const void*, const void*, const void*); diff --git a/stasis/page.h b/stasis/page.h index e4bde7d..291daa2 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -40,64 +40,6 @@ permission to use and distribute the software in accordance with the terms specified in this license. ---*/ -/** - * @file - * - * interface for dealing with generic, LSN based pages - * - * This file provides a re-entrant interface for pages that are labeled - * with an LSN and a page type. - * - * @ingroup PAGE_FORMATS - * - * $Id$ - */ - -/** - @defgroup PAGE_FORMATS Page layouts - - Stasis allows developers to define their own on-disk page formats. - Currently, each page format must end with a hard-coded header - containing an LSN and a page type. (This restriction will be - removed in the future.) - - This section explains how new page formats can be implemented in - Stasis, and documents the currently available page types. - - A number of callbacks are invoked on existing pages as they are read - from disk, flushed back, and ultimately, evicted from cache: - - -# stasis_page_loaded() is invoked when the page is read from disk. It - should set the Page::LSN field appropriately, and - perhaps allocate any data structures that will be stored in the - Page::impl field. - -# stasis_page_flushed() is invoked when a dirty page is written back to - disk. It should make sure that all updates are applied to the - physical representation of the page. (Implementations of this - callback usually copy the Page::LSN field into the page header.) - -# stasis_page_cleanup() is invoked before a page is evicted from cache. - It should free any memory associated with the page, such as - that allocated by stasis_page_loaded(), or pointed to by Page::impl. - - When an uninitialized page is read from disk, Stasis has no way of - knowing which stasis_page_loaded() callback should be invoked. Therefore, - when a new page is initialized the page initialization code should - perform the work that would normally be performed by stasis_page_loaded(). - Similarly, before a page is freed (and therefore, will be treated as - uninitialized data) stasis_page_cleanup() should be called. - - Page implementations are free to define their own access methods - and APIs. However, Stasis's record oriented page interface - provides a default set of methods for page access. - - @see PAGE_RECORD_INTERFACE - - @todo Page deallocators should call stasis_page_cleanup() - @todo Create variant of loadPage() that takes a page type - @todo Add support for LSN free pages. - - */ -/*@{*/ #ifndef __PAGE_H__ #define __PAGE_H__ @@ -121,6 +63,7 @@ BEGIN_C_DECLS @todo Remove next and prev from Page_s */ + struct Page_s { pageid_t id; /** @@ -241,205 +184,6 @@ struct Page_s { +----------------------------------------------------+-----------+-----+ */ -/*@{*/ -static inline lsn_t* stasis_page_lsn_ptr(Page *p) { - return ((lsn_t*)(&(p->memAddr[PAGE_SIZE])))-1; -} -static inline const lsn_t* stasis_page_lsn_cptr(const Page *p) { - return ((const lsn_t*)(&(p->memAddr[PAGE_SIZE])))-1; -} - -/** - Returns a pointer to the page's type. This information is stored with the LSN. - Stasis uses it to determine which page implementation should handle each - page. - - @param p Any page that contains an LSN header. - @see stasis_page_impl_register - @todo Need to typedef page_type_t - */ -static inline int* stasis_page_type_ptr(Page *p) { - return ((int*)stasis_page_lsn_ptr(p))-1; -} -static inline const int* stasis_page_type_cptr(const Page *p) { - return ((const int*)stasis_page_lsn_cptr(p))-1; -} - -/** - * assumes that the page is already loaded in memory. It takes as a - * parameter a Page. The Page struct contains the new LSN and the - * page number to which the new LSN must be written to. Furthermore, - * this function updates the dirtyPages table, if necessary. The - * dirtyPages table is needed for log truncation. (If the page->id is - * null, this function assumes the page is not in the buffer pool, and - * does not update dirtyPages. Similarly, if the page is already - * dirty, there is no need to update dirtyPages. - * - * @param xid The transaction that is writing to the page, or -1 if - * outside of a transaction. - * - * @param page You must have a writelock on page before calling this - * function. - * - * @param lsn The new lsn of the page. If the new lsn is less than - * the page's current LSN, then the page's LSN will not be changed. - * If the page is clean, the new LSN must be greater than the old LSN. - */ -void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn); - -/** - * assumes that the page is already loaded in memory. It takes - * as a parameter a Page and returns the LSN that is currently written on that - * page in memory. - */ -lsn_t stasis_page_lsn_read(const Page * page); - -/*@}*/ - -/** - @defgroup PAGE_UTIL Byte-level page manipulation - - These methods make it easy to manipulate pages that use a standard - Stasis header (one with an LSN and page type). - - Each one counts bytes from the beginning or end of the page's - usable space. Methods with "_cptr_" in their names return const - pointers (and can accept const Page pointers as arguments). - Methods with "_ptr_" in their names take non-const pages, and - return non-const pointers. - - @par Implementing new pointer arithmetic macros - - Stasis page type implementations typically do little more than - pointer arithmetic. However, implementing page types cleanly and - portably is a bit tricky. Stasis has settled upon a compromise in - this matter. Its page file formats are compatible within a single - architecture, but not across systems with varying lengths of - primitive types, or that vary in endianness. - - Over time, types that vary in length such as "int", "long", etc - will be removed from Stasis, but their usage still exists in a few - places. Once they have been removed, file compatibility problems - should be limited to endianness (though application code will still - be free to serialize objects in a non-portable manner). - - Most page implementations leverage C's pointer manipulation - semantics to lay out pages. Rather than casting pointers to - char*'s and then manually calculating byte offsets using sizeof(), - the existing page types prefer to cast pointers to appropriate - types, and then add or subtract the appropriate number of values. - - For example, instead of doing this: - - @code - // p points to an int, followed by a two bars, then the foo whose address - // we want to calculate - - int * p; - foo* f = (foo*)( ((char*)p) + sizeof(int) + 2 * sizeof(bar)) - @endcode - - the implementations would do this: - - @code - int * p; - foo * f = (foo*)( ((bar*)(p+1)) + 2 ) - @endcode - - The main disadvantage of this approach is the large number of ()'s - involved. However, it lets the compiler deal with the underlying - multiplications, and often reduces the number of casts, leading to - slightly more readable code. Take this implementation of - stasis_page_type_ptr(), for example: - - @code - int * stasis_page_type_ptr(Page *p) { - return ( (int*)stasis_page_lsn_ptr(Page *p) ) - 1; - } - @endcode - - Here, the page type is stored as an integer immediately before the - LSN pointer. Using arithmetic over char*'s would require an extra - cast to char*, and a multiplication by sizeof(int). - -*/ -/*@{*/ -static inline byte* -stasis_page_byte_ptr_from_start(Page *p, int count) { - return ((byte*)(p->memAddr))+count; -} -static inline byte* -stasis_page_byte_ptr_from_end(Page *p, int count) { - return ((byte*)stasis_page_type_ptr(p))-count; -} - -static inline int16_t* -stasis_page_int16_ptr_from_start(Page *p, int count) { - return ((int16_t*)(p->memAddr))+count; -} - -static inline int16_t* -stasis_page_int16_ptr_from_end(Page *p, int count) { - return ((int16_t*)stasis_page_type_ptr(p))-count; -} -static inline int32_t* -stasis_page_int32_ptr_from_start(Page *p, int count) { - return ((int32_t*)(p->memAddr))+count; -} - -static inline int32_t* -stasis_page_int32_ptr_from_end(Page *p, int count) { - return ((int32_t*)stasis_page_type_ptr(p))-count; -} -static inline pageid_t* -stasis_page_pageid_t_ptr_from_start(Page *p, int count) { - return ((pageid_t*)(p->memAddr))+count; -} - -static inline pageid_t* -stasis_page_pageid_t_ptr_from_end(Page *p, int count) { - return ((pageid_t*)stasis_page_type_ptr(p))-count; -} -// Const methods -static inline const byte* -stasis_page_byte_cptr_from_start(const Page *p, int count) { - return (const byte*)stasis_page_byte_ptr_from_start((Page*)p, count); -} -static inline const byte* -stasis_page_byte_cptr_from_end(const Page *p, int count) { - return (const byte*)stasis_page_byte_ptr_from_end((Page*)p, count); -} - -static inline const int16_t* -stasis_page_int16_cptr_from_start(const Page *p, int count) { - return (const int16_t*)stasis_page_int16_ptr_from_start((Page*)p,count); -} - -static inline const int16_t* -stasis_page_int16_cptr_from_end(const Page *p, int count) { - return ((int16_t*)stasis_page_type_cptr(p))-count; -} -static inline const int32_t* -stasis_page_int32_cptr_from_start(const Page *p, int count) { - return ((const int32_t*)(p->memAddr))+count; -} - -static inline const int32_t* -stasis_page_int32_cptr_from_end(const Page *p, int count) { - return (const int32_t*)stasis_page_int32_ptr_from_end((Page*)p,count); -} -static inline const pageid_t* -stasis_page_pageid_t_cptr_from_start(const Page *p, int count) { - return ((const pageid_t*)(p->memAddr))+count; -} - -static inline const pageid_t* -stasis_page_pageid_t_cptr_from_end(const Page *p, int count) { - return (const pageid_t*)stasis_page_pageid_t_cptr_from_end(p,count); -} - - -/*@}*/ /** * initializes all the global variables needed by the functions @@ -785,7 +529,7 @@ typedef struct page_impl { @param p the page whose freespace will be estimated. @return The number of bytes of free space on the page, or (for - efficiency's sake) an underestimate. + efficiency's sake) an underestimate. */ int (*pageFreespace)(int xid, Page * p); @@ -909,26 +653,22 @@ int stasis_page_impl_register(page_impl impl); */ page_impl* stasis_page_impl_get(int id); - -// -------------------- Page specific, general purpose methods - -/** - Initialize a new page - - @param p The page that will be turned into a new slotted page. - Its contents will be overwritten. It was probably - returned by loadPage() - */ -void stasis_page_slotted_initialize_page(Page * p); -void stasis_page_slotted_latch_free_initialize_page(Page * page); -void stasis_slotted_lsn_free_initialize_page(Page * p); -void stasis_fixed_initialize_page(Page * page, size_t size, int count); -void stasis_indirect_initialize_page(Page * p, int height); -int stasis_fixed_records_per_page(size_t size); -void stasis_page_blob_initialize_page(Page * p); - page_impl slottedLsnFreeImpl(); page_impl segmentImpl(); + +#define stasis_mempage_memaddr(p) (p) +#define stasis_page(x) stasis_mempage_##x +#define PAGE byte + +#include "page/page-impl.h" + +#undef stasis_mempage_memaddr +#undef stasis_page +#undef PAGE + +#include "page/page-impl.h" + + END_C_DECLS #endif diff --git a/stasis/page/fixed-impl.h b/stasis/page/fixed-impl.h new file mode 100644 index 0000000..333fbe5 --- /dev/null +++ b/stasis/page/fixed-impl.h @@ -0,0 +1,93 @@ +/* + * fixed-impl.h + * + * Created on: Nov 7, 2011 + * Author: sears + */ + +static inline int16_t* stasis_page(fixed_recordsize_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 1); } +static inline int16_t* stasis_page(fixed_recordcount_ptr)(PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 2); } +static inline byte* stasis_page(fixed_record_ptr) (PAGE * p, slotid_t n) { return stasis_page(byte_ptr_from_start)(p, *stasis_page(fixed_recordsize_ptr)(p) * n); } + +static inline const int16_t* stasis_page(fixed_recordsize_cptr) (const PAGE * p) { return stasis_page(fixed_recordsize_ptr) ((PAGE*)p); } +static inline const int16_t* stasis_page(fixed_recordcount_cptr)(const PAGE * p) { return stasis_page(fixed_recordcount_ptr)((PAGE*)p); } +static inline const byte* stasis_page(fixed_record_cptr) (const PAGE * p, slotid_t n) { return stasis_page(fixed_record_ptr) ((PAGE*)p, n); } + +static inline slotid_t stasis_page(fixed_next_slot)(PAGE *p, slotid_t slot) { + slot++; + if(*stasis_page(fixed_recordcount_cptr)(p) > slot) { + return slot; + } else { + return -1; + } +} +static inline slotid_t stasis_page(fixed_first_slot)(PAGE *p) { + return stasis_page(fixed_next_slot)(p, -1); +} +static inline slotid_t stasis_page(fixed_last_slot)(PAGE *p) { + return -(*stasis_page(fixed_recordcount_cptr)(p)) - 1; +} + +static inline int stasis_page(fixed_records_per_page)(size_t size) { + return (USABLE_SIZE_OF_PAGE - 2*sizeof(short)) / size; +} +static inline void stasis_page(fixed_initialize_page_raw)(PAGE * page, size_t size, int count) { + // Zero out the page contents, since callers often do so anyway. + // blows away LSN, but the copy that's in p->LSN will be put back on page at flush. + memset(stasis_page(memaddr)(page), 0, PAGE_SIZE); + *stasis_page(fixed_recordsize_ptr)(page) = size; + assert(count <= stasis_page(fixed_records_per_page)(size)); + *stasis_page(fixed_recordcount_ptr)(page)= count; +} + +static inline int stasis_page(fixed_get_type)(PAGE *p, slotid_t slot) { + // checkRid(p, rid); + if(slot < *stasis_page(fixed_recordcount_cptr)(p)) { + int type = *stasis_page(fixed_recordsize_cptr)(p); + if(type > 0) { + type = NORMAL_SLOT; + } + return type; + } else { + return INVALID_SLOT; + } +} +static inline void stasis_page(fixed_set_type)(PAGE *p, slotid_t slot, int type) { +//XXX stasis_page(checkRid)(p,rid); + assert(slot < *stasis_page(fixed_recordcount_cptr)(p)); + assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(*stasis_page(fixed_recordsize_cptr)(p))); +} +static inline int stasis_page(fixed_get_length)(PAGE *p, slotid_t slot) { + return slot > *stasis_page(fixed_recordcount_cptr)(p) ? + INVALID_SLOT : stasis_record_type_to_size(*stasis_page(fixed_recordsize_cptr)(p)); +} + +static inline int stasis_page(fixed_freespace_raw)(PAGE * p) { + if(stasis_page(fixed_records_per_page)(*stasis_page(fixed_recordsize_cptr)(p)) > *stasis_page(fixed_recordcount_cptr)(p)) { + // Return the size of a slot; that's the biggest record we can take. + return stasis_record_type_to_size(*stasis_page(fixed_recordsize_cptr)(p)); + } else { + // Page full; return zero. + return 0; + } +} +static inline int16_t stasis_page(fixed_pre_alloc)(PAGE *p, int size) { + if(stasis_page(fixed_records_per_page)(*stasis_page(fixed_recordsize_cptr)(p)) > *stasis_page(fixed_recordcount_cptr)(p)) { + return *stasis_page(fixed_recordcount_cptr)(p); + } else { + return -1; + } +} +static inline void stasis_page(fixed_post_alloc)(PAGE *p, slotid_t n) { + assert(*stasis_page(fixed_recordcount_cptr)(p) == n); + (*stasis_page(fixed_recordcount_ptr)(p))++; +} + +static inline void stasis_page(fixed_free)(PAGE *p, slotid_t n) { + if(*stasis_page(fixed_recordsize_cptr)(p) == n+1) { + (*stasis_page(fixed_recordsize_ptr)(p))--; + } else { + // leak space; there's no way to track it with this page format. + } +} + diff --git a/stasis/page/fixed.h b/stasis/page/fixed.h index 57aee22..5cfa4f3 100644 --- a/stasis/page/fixed.h +++ b/stasis/page/fixed.h @@ -2,38 +2,13 @@ #ifndef __FIXED_H #define __FIXED_H -/** - @todo rename fixed.h macros turn them into static inline functions. -*/ -#define recordsize_ptr(page) stasis_page_int16_ptr_from_end((page), 1) -#define recordsize_cptr(page) stasis_page_int16_cptr_from_end((page), 1) -#define recordcount_ptr(page) stasis_page_int16_ptr_from_end((page), 2) -#define fixed_record_ptr(page, n) stasis_page_byte_ptr_from_start((page), *recordsize_ptr((page)) * (n)) -static inline recordid fixedNext(int xid, Page *p, recordid rid) { - short n = *recordcount_ptr(p); - rid.slot++; - rid.size = *recordsize_ptr(p); - if(rid.slot >= n) { - return NULLRID; - } else { - return rid; - } -} -static inline recordid fixedFirst(int xid, Page *p) { - recordid rid = { p->id, -1, 0 }; - rid.size = *recordsize_ptr(p); - return fixedNext(xid, p, rid); -} -static inline recordid fixedLast(int xid, Page *p) { - recordid rid = { p->id, -1, 0 }; - rid.size = *recordsize_ptr(p); - rid.slot = -1+*recordcount_ptr(p); - return rid; -} +void stasis_page_fixed_init(); +void stasis_page_fixed_deinit(); -void fixedPageInit(); -void fixedPageDeinit(); -page_impl fixedImpl(); -page_impl arrayListImpl(); +void stasis_page_fixed_initialize_page(Page * page, size_t size, int count); +recordid stasis_page_fixed_next_record(int xid, Page *p, recordid rid); + +page_impl stasis_page_fixed_impl(); +page_impl stasis_page_array_list_impl(); #endif diff --git a/stasis/page/page-impl.h b/stasis/page/page-impl.h new file mode 100644 index 0000000..ba27183 --- /dev/null +++ b/stasis/page/page-impl.h @@ -0,0 +1,296 @@ +/* + * page-impl.h + * + * Created on: Nov 7, 2011 + * Author: sears + */ + +/** + * @file + * + * interface for dealing with generic, LSN based pages + * + * This file provides a re-entrant interface for pages that are labeled + * with an LSN and a page type. + * + * @ingroup PAGE_FORMATS + * + * $Id: page.h 1526 2011-06-13 11:26:25Z sears.russell@gmail.com $ + */ + +#ifndef stasis_page +#define stasis_page_memaddr(p) (p->memAddr) +#define stasis_page(x) stasis_page_##x +#define PAGE Page +#define STASIS_PAGE_IMPL_NEED_UNDEF +#endif + +/** + @defgroup PAGE_FORMATS Page layouts + + Stasis allows developers to define their own on-disk page formats. + Currently, each page format must end with a hard-coded header + containing an LSN and a page type. (This restriction will be + removed in the future.) + + This section explains how new page formats can be implemented in + Stasis, and documents the currently available page types. + + A number of callbacks are invoked on existing pages as they are read + from disk, flushed back, and ultimately, evicted from cache: + + -# stasis_page_loaded() is invoked when the page is read from disk. It + should set the Page::LSN field appropriately, and + perhaps allocate any data structures that will be stored in the + Page::impl field. + -# stasis_page_flushed() is invoked when a dirty page is written back to + disk. It should make sure that all updates are applied to the + physical representation of the page. (Implementations of this + callback usually copy the Page::LSN field into the page header.) + -# stasis_page_cleanup() is invoked before a page is evicted from cache. + It should free any memory associated with the page, such as + that allocated by stasis_page_loaded(), or pointed to by Page::impl. + + When an uninitialized page is read from disk, Stasis has no way of + knowing which stasis_page_loaded() callback should be invoked. Therefore, + when a new page is initialized the page initialization code should + perform the work that would normally be performed by stasis_page_loaded(). + Similarly, before a page is freed (and therefore, will be treated as + uninitialized data) stasis_page_cleanup() should be called. + + Page implementations are free to define their own access methods + and APIs. However, Stasis's record oriented page interface + provides a default set of methods for page access. + + @see PAGE_RECORD_INTERFACE + + @todo Page deallocators should call stasis_page_cleanup() + @todo Create variant of loadPage() that takes a page type + @todo Add support for LSN free pages. + + */ +/*@{*/ + +/*@{*/ +static inline lsn_t* stasis_page(lsn_ptr)(PAGE *p) { + return ((lsn_t*)(&(stasis_page(memaddr)(p)[PAGE_SIZE])))-1; +} +static inline const lsn_t* stasis_page(lsn_cptr)(const PAGE *p) { + return ((const lsn_t*)(&(stasis_page(memaddr)(p)[PAGE_SIZE])))-1; +} + +/** + Returns a pointer to the page's type. This information is stored with the LSN. + Stasis uses it to determine which page implementation should handle each + page. + + @param p Any page that contains an LSN header. + @see stasis_page_impl_register + @todo Need to typedef page_type_t + */ +static inline int* stasis_page(type_ptr)(PAGE *p) { + return ((int*)stasis_page(lsn_ptr)(p))-1; +} +static inline const int* stasis_page(type_cptr)(const PAGE *p) { + return ((const int*)stasis_page(lsn_cptr)(p))-1; +} + +/** + * assumes that the page is already loaded in memory. It takes as a + * parameter a Page. The Page struct contains the new LSN and the + * page number to which the new LSN must be written to. Furthermore, + * this function updates the dirtyPages table, if necessary. The + * dirtyPages table is needed for log truncation. (If the page->id is + * null, this function assumes the page is not in the buffer pool, and + * does not update dirtyPages. Similarly, if the page is already + * dirty, there is no need to update dirtyPages. + * + * @param xid The transaction that is writing to the page, or -1 if + * outside of a transaction. + * + * @param page You must have a writelock on page before calling this + * function. + * + * @param lsn The new lsn of the page. If the new lsn is less than + * the page's current LSN, then the page's LSN will not be changed. + * If the page is clean, the new LSN must be greater than the old LSN. + */ +void stasis_page(lsn_write)(int xid, PAGE * page, lsn_t lsn); + +/** + * assumes that the page is already loaded in memory. It takes + * as a parameter a Page and returns the LSN that is currently written on that + * page in memory. + */ +lsn_t stasis_page(lsn_read)(const PAGE * page); + +/*@}*/ + +/** + @defgroup PAGE_UTIL Byte-level page manipulation + + These methods make it easy to manipulate pages that use a standard + Stasis header (one with an LSN and page type). + + Each one counts bytes from the beginning or end of the page's + usable space. Methods with "_cptr_" in their names return const + pointers (and can accept const Page pointers as arguments). + Methods with "_ptr_" in their names take non-const pages, and + return non-const pointers. + + @par Implementing new pointer arithmetic macros + + Stasis page type implementations typically do little more than + pointer arithmetic. However, implementing page types cleanly and + portably is a bit tricky. Stasis has settled upon a compromise in + this matter. Its page file formats are compatible within a single + architecture, but not across systems with varying lengths of + primitive types, or that vary in endianness. + + Over time, types that vary in length such as "int", "long", etc + will be removed from Stasis, but their usage still exists in a few + places. Once they have been removed, file compatibility problems + should be limited to endianness (though application code will still + be free to serialize objects in a non-portable manner). + + Most page implementations leverage C's pointer manipulation + semantics to lay out pages. Rather than casting pointers to + char*'s and then manually calculating byte offsets using sizeof(), + the existing page types prefer to cast pointers to appropriate + types, and then add or subtract the appropriate number of values. + + For example, instead of doing this: + + @code + // p points to an int, followed by a two bars, then the foo whose address + // we want to calculate + + int * p; + foo* f = (foo*)( ((char*)p) + sizeof(int) + 2 * sizeof(bar)) + @endcode + + the implementations would do this: + + @code + int * p; + foo * f = (foo*)( ((bar*)(p+1)) + 2 ) + @endcode + + The main disadvantage of this approach is the large number of ()'s + involved. However, it lets the compiler deal with the underlying + multiplications, and often reduces the number of casts, leading to + slightly more readable code. Take this implementation of + stasis_page_type_ptr(), for example: + + @code + int * stasis_page_type_ptr(Page *p) { + return ( (int*)stasis_page_lsn_ptr(Page *p) ) - 1; + } + @endcode + + Here, the page type is stored as an integer immediately before the + LSN pointer. Using arithmetic over char*'s would require an extra + cast to char*, and a multiplication by sizeof(int). + +*/ +/*@{*/ +static inline byte* +stasis_page(byte_ptr_from_start)(PAGE *p, int count) { + return ((byte*)(stasis_page(memaddr)(p)))+count; +} +static inline byte* +stasis_page(byte_ptr_from_end)(PAGE *p, int count) { + return ((byte*)stasis_page(type_ptr)(p))-count; +} + +static inline int16_t* +stasis_page(int16_ptr_from_start)(PAGE *p, int count) { + return ((int16_t*)(stasis_page(memaddr)(p)))+count; +} + +static inline int16_t* +stasis_page(int16_ptr_from_end)(PAGE *p, int count) { + return ((int16_t*)stasis_page(type_ptr)(p))-count; +} +static inline int32_t* +stasis_page(int32_ptr_from_start)(PAGE *p, int count) { + return ((int32_t*)(stasis_page(memaddr)(p)))+count; +} + +static inline int32_t* +stasis_page(int32_ptr_from_end)(PAGE *p, int count) { + return ((int32_t*)stasis_page(type_ptr)(p))-count; +} +static inline pageid_t* +stasis_page(pageid_t_ptr_from_start)(PAGE *p, int count) { + return ((pageid_t*)(stasis_page(memaddr)(p)))+count; +} + +static inline pageid_t* +stasis_page(pageid_t_ptr_from_end)(PAGE *p, int count) { + return ((pageid_t*)stasis_page(type_ptr)(p))-count; +} +// Const methods +static inline const byte* +stasis_page(byte_cptr_from_start)(const PAGE *p, int count) { + return (const byte*)stasis_page(byte_ptr_from_start)((PAGE*)p, count); +} +static inline const byte* +stasis_page(byte_cptr_from_end)(const PAGE *p, int count) { + return (const byte*)stasis_page(byte_ptr_from_end)((PAGE*)p, count); +} + +static inline const int16_t* +stasis_page(int16_cptr_from_start)(const PAGE *p, int count) { + return (const int16_t*)stasis_page(int16_ptr_from_start)((PAGE*)p,count); +} + +static inline const int16_t* +stasis_page(int16_cptr_from_end)(const PAGE *p, int count) { + return ((int16_t*)stasis_page(type_cptr)(p))-count; +} +static inline const int32_t* +stasis_page(int32_cptr_from_start)(const PAGE *p, int count) { + return ((const int32_t*)(stasis_page(memaddr)(p)))+count; +} + +static inline const int32_t* +stasis_page(int32_cptr_from_end)(const PAGE *p, int count) { + return (const int32_t*)stasis_page(int32_ptr_from_end)((PAGE*)p,count); +} +static inline const pageid_t* +stasis_page(pageid_t_cptr_from_start)(const PAGE *p, int count) { + return ((const pageid_t*)(stasis_page(memaddr)(p)))+count; +} + +static inline const pageid_t* +stasis_page(pageid_t_cptr_from_end)(const PAGE *p, int count) { + return (const pageid_t*)stasis_page(pageid_t_cptr_from_end)(p,count); +} +/*@}*/ + +// -------------------- Page specific, general purpose methods + + +/** + Initialize a new page + + @param p The page that will be turned into a new slotted page. + Its contents will be overwritten. It was probably + returned by loadPage() + */ +void stasis_page(slotted_initialize_page)(PAGE * p); +void stasis_page(slotted_latch_free_initialize_page)(PAGE * page); +void stasis_page(slotted_lsn_free_initialize_page)(Page * p); +void stasis_page(indirect_initialize_page)(Page * p, int height); +void stasis_page(blob_initialize_page)(PAGE * p); + +#include "fixed-impl.h" +#include "slotted-impl.h" + +#ifdef STASIS_PAGE_IMPL_NEED_UNDEF +#undef stasis_page_memaddr +#undef stasis_page +#undef PAGE +#undef STASIS_PAGE_IMPL_NEED_UNDEF +#endif diff --git a/stasis/page/slotted-impl.h b/stasis/page/slotted-impl.h new file mode 100644 index 0000000..a0737a1 --- /dev/null +++ b/stasis/page/slotted-impl.h @@ -0,0 +1,461 @@ +/* + * slotted-impl.h + * + * Created on: Nov 7, 2011 + * Author: sears + */ + +#ifdef LONG_TEST +#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 +#endif + +// plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence. +// intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK) +// benchmark page ops (real time) + hash table (user time) + +//#define SLOTTED_PAGE_OLD_CHECKS +#define SLOTTED_PAGE_NEW_CHECKS +#ifdef SLOTTED_PAGE_NEW_CHECKS +#define SLOTTED_PAGE_SANITY_CHECKS +#define SLOTTED_PAGE_CHECK_FOR_OVERLAP +#endif + +#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short)) +#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short)) + +static inline int16_t* stasis_page(slotted_freespace_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 1); } +static inline int16_t* stasis_page(slotted_numslots_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 2); } +static inline int16_t* stasis_page(slotted_freelist_ptr) (PAGE * p) { return stasis_page(int16_ptr_from_end) (p, 3); } +static inline int16_t* stasis_page(slotted_slot_ptr) (PAGE * p, slotid_t n) { return stasis_page(int16_ptr_from_end) (p, (2*(n))+4); } +static inline int16_t* stasis_page(slotted_slot_length_ptr)(PAGE * p, slotid_t n) { return stasis_page(int16_ptr_from_end) (p, (2*(n))+5); } +static inline byte* stasis_page(slotted_record_ptr) (PAGE * p, slotid_t n) { return stasis_page(byte_ptr_from_start)(p, *stasis_page(slotted_slot_ptr)(p, n)); } + +static inline const int16_t* stasis_page(slotted_freespace_cptr) (const PAGE * p) { return stasis_page(slotted_freespace_ptr) ((PAGE*)p); } +static inline const int16_t* stasis_page(slotted_numslots_cptr) (const PAGE * p) { return stasis_page(slotted_numslots_ptr) ((PAGE*)p); } +static inline const int16_t* stasis_page(slotted_freelist_cptr) (const PAGE * p) { return stasis_page(slotted_freelist_ptr) ((PAGE*)p); } +static inline const int16_t* stasis_page(slotted_slot_cptr) (const PAGE * p, slotid_t n) { return stasis_page(slotted_slot_ptr) ((PAGE*)p, n); } +static inline const int16_t* stasis_page(slotted_slot_length_cptr)(const PAGE * p, slotid_t n) { return stasis_page(slotted_slot_length_ptr)((PAGE*)p, n); } +static inline const byte* stasis_page(slotted_record_cptr) (const PAGE * p, slotid_t n) { return stasis_page(slotted_record_ptr) ((PAGE*)p, n); } + +static inline void stasis_page(slotted_initialize_page_raw)(PAGE * page) { + *stasis_page(slotted_freespace_ptr)(page) = 0; + *stasis_page(slotted_numslots_ptr)(page) = 0; + *stasis_page(slotted_freelist_ptr)(page) = INVALID_SLOT; +} + +static inline void stasis_page(slotted_check)(const PAGE * page) { + + const short numslots = *stasis_page(slotted_numslots_cptr)(page); + const short freespace = *stasis_page(slotted_freespace_cptr)(page); + const short freelist = *stasis_page(slotted_freelist_cptr)(page); + + const long slotListStart = (const byte*)stasis_page(slotted_slot_length_cptr)(page, numslots-1) + - (const byte*)stasis_page(memaddr)(page); + assert(slotListStart < PAGE_SIZE && slotListStart >= 0); + assert(numslots >= 0); + assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE); + assert(freespace >= 0); + assert(freespace <= slotListStart); + assert(freelist >= INVALID_SLOT); + assert(freelist < numslots); + +#ifdef SLOTTED_PAGE_SANITY_CHECKS + + // Check integrity of freelist. All free slots less than + // numslots should be on it, in order. + + short * slot_offsets = (short*)alloca(numslots * sizeof(short)); + short * slot_lengths = (short*)alloca(numslots * sizeof(short)); + for(int i = 0; i < numslots; i++) { + slot_offsets[i] = *stasis_page(slotted_slot_cptr)(page, i); + slot_lengths[i] = *stasis_page(slotted_slot_length_cptr)(page, i); + } + + short foundEndOfList = 0; + + if(freelist != INVALID_SLOT) { + assert(slot_offsets[freelist] == INVALID_SLOT); + } else { + foundEndOfList = 1; + } + + for(short i = 0; i < numslots; i++) { + const short slot_length = slot_lengths[i]; + const short slot_offset = slot_offsets[i]; + if(slot_offset == INVALID_SLOT) { + if(slot_length == INVALID_SLOT) { + assert(!foundEndOfList); + foundEndOfList = 1; + } else { + assert (slot_offsets[slot_length] == INVALID_SLOT); + } + } else { + assert(slot_offset + slot_length <= freespace); + } + } + + // Is the free list terminated? + assert(foundEndOfList); + +#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP + + // Page dummy; + + //dummy.id = -1; + //dummy.memAddr = 0; + + const byte UNUSED = 0xFF; + const byte PAGE_HEADER = 0xFE; + const byte SLOTTED_HEADER = 0xFD; + // const byte SLOT_LIST = 0xFC; + const byte FREE_SPACE = 0xFB; + + const unsigned short S_SLOT_LIST = 0xFCFC; + + byte image[PAGE_SIZE]; + for(unsigned short i = 0; i < PAGE_SIZE; i++) { + image[i] = UNUSED; + } + for(unsigned short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) { + image[i] = PAGE_HEADER; + } + for(unsigned short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) { + image[i] = SLOTTED_HEADER; + } + for(unsigned short i = *stasis_page(slotted_freespace_cptr)(page); i < slotListStart; i++) { + image[i] = FREE_SPACE; + } + + for(unsigned short i = 0; i < *stasis_page(slotted_numslots_cptr)(page); i++) { + *stasis_mempage_slotted_slot_ptr(image, i) = S_SLOT_LIST; + *stasis_mempage_slotted_slot_length_ptr(image, i) = S_SLOT_LIST; + } + for(unsigned short i = 0; i < *stasis_page(slotted_numslots_cptr)(page); i++) { + short slot_offset = *stasis_page(slotted_slot_cptr)(page, i); + if(slot_offset != INVALID_SLOT) { + const unsigned char ci = i % 0xFF; + short slot_len = stasis_record_type_to_size(*stasis_page(slotted_slot_length_cptr)(page, i)); + + for(unsigned short j = 0; j < slot_len; j++) { + assert(image[slot_offset + j] == 0xFF); + image[slot_offset + j] = ci; + } + } + } +#endif // SLOTTED_PAGE_CHECK_FOR_OVERLAP +#endif // SLOTTED_PAGE_SANITY_CHECKS + +} + +/** + Check to see how many bytes can fit in a given slot. This + makes it possible for callers to guarantee the safety + of a subsequent call to really_do_ralloc(). + + This call can return negative numbers. This lets it differentiate between + situations in which a zero-byte record would fit, and when it would not. +*/ +static inline ssize_t stasis_page(slotted_freespace_for_slot)(PAGE * page, slotid_t slot) { + ssize_t slotOverhead; + + if(slot == INVALID_SLOT) { + slotOverhead = (*stasis_page(slotted_freelist_cptr)(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0; + } else if(slot < *stasis_page(slotted_numslots_cptr)(page)) { + slotOverhead = 0; + } else { + // slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot); + slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *stasis_page(slotted_numslots_cptr)(page)); + } + // end_of_free_space points to the beginning of the slot header at the bottom of the page header. + byte* end_of_free_space = (byte*)stasis_page(slotted_slot_length_cptr)(page, (*stasis_page(slotted_numslots_cptr)(page))-1); + + // start_of_free_space points to the first unallocated byte in the page + // (ignoring space that could be reclaimed by compaction) + const byte* start_of_free_space = (stasis_page(byte_cptr_from_start)(page, 0) + *stasis_page(slotted_freespace_cptr)(page)); + + assert(end_of_free_space >= start_of_free_space); + + if(end_of_free_space < start_of_free_space + slotOverhead) { + // The regions would overlap after allocation. There is no free space. + return 0; + } else { + // The regions would not overlap. There might be free space. + return (ssize_t) (end_of_free_space - start_of_free_space - slotOverhead); + } +} + +static inline void stasis_page(slotted_compact)(PAGE * page) { + byte buffer[PAGE_SIZE]; + + // Copy external headers into bufPage. + + memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(stasis_page(memaddr)(page)[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE); + + // Now, build new slotted page in the bufPage struct. + + *stasis_mempage_slotted_freespace_ptr(buffer) = 0; + // numslots_ptr will be set later. + *stasis_mempage_slotted_freelist_ptr(buffer) = INVALID_SLOT; + + const short numSlots = *stasis_page(slotted_numslots_cptr)(page); + short lastFreeSlot = INVALID_SLOT; + short lastFreeSlotBeforeUsedSlot = INVALID_SLOT; + short lastUsedSlot = -1; + + // Rebuild free list. + + for(short i = 0; i < numSlots; i++) { + if(*stasis_page(slotted_slot_cptr)(page, i) == INVALID_SLOT) { + if(lastFreeSlot == INVALID_SLOT) { + *stasis_mempage_slotted_freelist_ptr(buffer) = i; + } else { + *stasis_mempage_slotted_slot_length_ptr(buffer, lastFreeSlot) = i; + } + *stasis_mempage_slotted_slot_ptr(buffer, i) = INVALID_SLOT; + lastFreeSlot = i; + } else { + lastUsedSlot = i; + lastFreeSlotBeforeUsedSlot = lastFreeSlot; + + short logicalSize = *stasis_page(slotted_slot_length_cptr)(page, i); + short physicalSize = stasis_record_type_to_size(logicalSize); + + memcpy(&(buffer[*stasis_mempage_slotted_freespace_ptr(buffer)]), stasis_page(slotted_record_cptr)(page, i), physicalSize); + + *stasis_mempage_slotted_slot_ptr(buffer, i) = *stasis_mempage_slotted_freespace_cptr(buffer); + *stasis_mempage_slotted_slot_length_ptr(buffer, i) = logicalSize; + + (*stasis_mempage_slotted_freespace_ptr(buffer)) += physicalSize; + + } + } + + // Truncate linked list, and update numslots_ptr. + *stasis_mempage_slotted_slot_length_ptr(buffer, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT; + *stasis_mempage_slotted_numslots_ptr(buffer) = lastUsedSlot+1; + + memcpy(stasis_page(memaddr)(page), buffer, PAGE_SIZE); +#ifdef SLOTTED_PAGE_OLD_CHECKS + stasis_page(slotted_fsck)(page); +#endif // SLOTTED_PAGE_OLD_CHECKS +} + + +static inline void stasis_page(slotted_compact_slot_ids)(PAGE * p) { + int16_t numSlots = *stasis_page(slotted_numslots_cptr)(p); + int16_t out = 0; + for(int16_t in = 0; in < numSlots; in++) { + if(*stasis_page(slotted_slot_cptr)(p, in) == INVALID_SLOT) { + // nop + } else { + *stasis_page(slotted_slot_ptr)(p, out) = *stasis_page(slotted_slot_cptr)(p, in); + *stasis_page(slotted_slot_length_ptr)(p, out) = *stasis_page(slotted_slot_length_cptr)(p, in); + out++; + } + } + *stasis_page(slotted_numslots_ptr)(p) = out; + *stasis_page(slotted_freelist_ptr)(p) = INVALID_SLOT; +} + +static inline int stasis_page(slotted_get_type)(PAGE *p, slotid_t slot) { + if(slot >= *stasis_page(slotted_numslots_cptr)(p)) { return INVALID_SLOT; } + if(*stasis_page(slotted_slot_cptr)(p, slot) == INVALID_SLOT) { return INVALID_SLOT; } + int ret = *stasis_page(slotted_slot_length_cptr)(p, slot); + return ret >= 0 ? NORMAL_SLOT : ret; +} +static inline void stasis_page(slotted_set_type)(PAGE *p, slotid_t slot, int type) { + int old_type = *stasis_page(slotted_slot_length_cptr)(p, slot); + assert(slot < *stasis_page(slotted_numslots_cptr)(p)); + assert(old_type != INVALID_SLOT); + + if(type == NORMAL_SLOT) { + // set slot_length_ptr to the physical length. + *stasis_page(slotted_slot_length_ptr)(p, slot) = stasis_record_type_to_size(old_type); + } else { + // Changing to a special slot type; make sure doing so doesn't change + // the record size. + assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(old_type)); + *stasis_page(slotted_slot_length_ptr)(p, slot) = type; + } +} +static inline int stasis_page(slotted_get_length)(PAGE *p, slotid_t slot) { + if( stasis_page(slotted_get_type)(p, slot) == INVALID_SLOT) + return INVALID_SLOT; + else + return stasis_record_type_to_size(*stasis_page(slotted_slot_length_cptr)(p, slot)); +} +static inline slotid_t stasis_page(slotted_next_record)(PAGE *p, slotid_t slot) { + short n = *stasis_page(slotted_numslots_cptr)(p); + + do { + slot++; + } while(slot < n && stasis_page(slotted_get_type)(p,slot)==INVALID_SLOT); + + return n > slot ? slot : INVALID_SLOT; +} +static inline slotid_t stasis_page(slotted_last_record)(PAGE *p) { + return (*stasis_page(slotted_numslots_cptr)(p)) - 1; +} +static inline slotid_t stasis_page(slotted_pre_alloc)(PAGE * p, int type) { + assert(type != INVALID_SLOT); + + slotid_t slot = *stasis_page(slotted_numslots_cptr)(p); + + if(*stasis_page(slotted_freelist_cptr)(p) != INVALID_SLOT) { + slot = *stasis_page(slotted_freelist_cptr)(p); + } + + // int casts are there to deal with sign vs. unsigned types + if((int)stasis_page(slotted_freespace_for_slot)(p, slot) < (int)stasis_record_type_to_size(type)) { + return INVALID_SLOT; + } else { + return slot; + } +} +static inline void stasis_page(slotted_post_alloc)(PAGE * page, slotid_t slot, short type) { + // Compact the page if we don't have enough room. + if((int)stasis_page(slotted_freespace_for_slot)(page, slot) < (int)stasis_record_type_to_size(type)) { + stasis_page(slotted_compact)(page); + + // Make sure we have enough enough free space for the new record + assert ((int)stasis_page(slotted_freespace_for_slot)(page, slot) >= (int)stasis_record_type_to_size(type)); + } + + short freeSpace = *stasis_page(slotted_freespace_cptr)(page); + + // Remove this entry from the freelist (if necessary) slottedCompact + // assumes that this does not change the order of items in the list. + // If it did, then slottedCompact could leak slot id's (or worse!) + if(slot < *stasis_page(slotted_numslots_cptr)(page) && *stasis_page(slotted_slot_cptr)(page,slot) == INVALID_SLOT) { + short next = *stasis_page(slotted_freelist_cptr)(page); + short last = INVALID_SLOT; + // special case: is the slot physically before us the predecessor? + if(slot > 0) { + if(*stasis_page(slotted_slot_length_cptr)(page, slot-1) == slot && + *stasis_page(slotted_slot_cptr)(page, slot-1) == INVALID_SLOT) { + next = slot; + last = slot-1; + } + } + while(next != INVALID_SLOT && next != slot) { + last = next; + assert(next < *stasis_page(slotted_numslots_cptr)(page)); + short next_slot_ptr = *stasis_page(slotted_slot_cptr)(page, next); + assert(next_slot_ptr == INVALID_SLOT); + next = *stasis_page(slotted_slot_length_cptr)(page, next); + } + if(next == slot) { + if(last == INVALID_SLOT) { + *stasis_page(slotted_freelist_ptr)(page) + = *stasis_page(slotted_slot_length_cptr)(page, slot); + } else { + *stasis_page(slotted_slot_length_ptr)(page, last) + = *stasis_page(slotted_slot_length_cptr)(page, slot); + } + } + } + + // Insert any slots that come between the previous numslots_ptr() + // and the slot we're allocating onto the freelist. In order to + // promote the reuse of free slot numbers, we go out of our way to make sure + // that we put them in the list in increasing order. (Note: slottedCompact's + // correctness depends on this behavior!) + + if(slot > *stasis_page(slotted_numslots_cptr)(page)) { + short lastSlot; + if(*stasis_page(slotted_freelist_cptr)(page) == INVALID_SLOT) { + short numSlots = *stasis_page(slotted_numslots_cptr)(page); + *stasis_page(slotted_freelist_ptr)(page) = numSlots; + lastSlot = numSlots; + + *stasis_page(slotted_slot_ptr)(page, lastSlot) = INVALID_SLOT; + // will set slot_length_ptr on next iteration. + + + (*stasis_page(slotted_numslots_ptr)(page))++; + } else { + lastSlot = INVALID_SLOT; + short next = *stasis_page(slotted_freelist_cptr)(page); + while(next != INVALID_SLOT) { + lastSlot = next; + next = *stasis_page(slotted_slot_length_cptr)(page, lastSlot); + assert(lastSlot < *stasis_page(slotted_numslots_cptr)(page)); + assert(*stasis_page(slotted_slot_ptr)(page, lastSlot) == INVALID_SLOT); + } + *stasis_page(slotted_slot_ptr)(page, lastSlot) = INVALID_SLOT; + + } + + // lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr. + + while(*stasis_page(slotted_numslots_cptr)(page) < slot) { + *stasis_page(slotted_slot_length_ptr)(page, lastSlot) = *stasis_page(slotted_numslots_cptr)(page); + lastSlot = *stasis_page(slotted_numslots_cptr)(page); + *stasis_page(slotted_slot_ptr)(page, lastSlot) = INVALID_SLOT; + (*stasis_page(slotted_numslots_ptr)(page))++; + } + + // Terminate the end of the list. + assert(lastSlot < *stasis_page(slotted_numslots_cptr)(page)); + *stasis_page(slotted_slot_length_ptr)(page, lastSlot) = INVALID_SLOT; + + } + + if(*stasis_page(slotted_numslots_cptr)(page) == slot) { + *stasis_page(slotted_numslots_ptr)(page) = slot+1; + } + + assert(*stasis_page(slotted_numslots_cptr)(page) > slot); + + DEBUG("Num slots %d\trid.slot %d\n", *stasis_page(slotted_numslots_cptr)(page), slot); + + // Reserve space for this record and record the space's offset in + // the slot header. + + assert(slot < *stasis_page(slotted_numslots_cptr)(page)); + assert(freeSpace == *stasis_page(slotted_freespace_cptr)(page)); // XXX not sure how this could not be the case. + *stasis_page(slotted_freespace_ptr)(page) = freeSpace + stasis_record_type_to_size(type); + *stasis_page(slotted_slot_ptr)(page, slot) = freeSpace; + + *stasis_page(slotted_slot_length_ptr)(page, slot) = type; + +} +static inline void stasis_page(slotted_splice_slot)(PAGE *p, slotid_t a, slotid_t b) { + if(a==b) { return; } // no-op + + if(a > b) { + slotid_t c = a; + a = b; + b = c; + } + + int16_t b_slot = *stasis_page(slotted_slot_cptr)(p, b); + int16_t b_slot_len = *stasis_page(slotted_slot_length_cptr)(p, b); + for(int16_t i = b-1; i >= a; i--) { + *stasis_page(slotted_slot_ptr)(p, i+1) = *stasis_page(slotted_slot_cptr)(p, i); + *stasis_page(slotted_slot_length_ptr)(p, i+1) = *stasis_page(slotted_slot_length_cptr)(p, i); + } + *stasis_page(slotted_slot_ptr)(p, a) = b_slot; + *stasis_page(slotted_slot_length_ptr)(p, a) = b_slot_len; +} +static inline void stasis_page(slotted_free)(PAGE * p, slotid_t slot) { + ssize_t rec_size = stasis_record_type_to_size(*stasis_page(slotted_slot_length_cptr)(p, slot)); + + if(*stasis_page(slotted_freespace_cptr)(p) == *stasis_page(slotted_slot_cptr)(p, slot) + rec_size) { + (*stasis_page(slotted_freespace_ptr)(p)) -= rec_size; + } + + assert(slot < *stasis_page(slotted_numslots_cptr)(p)); + if(slot == *stasis_page(slotted_numslots_cptr)(p)-1) { + (*stasis_page(slotted_numslots_ptr)(p))--; + assert(stasis_page(slotted_get_type)(p,slot)==INVALID_SLOT); + } else { + *stasis_page(slotted_slot_ptr)(p, slot) = INVALID_SLOT; + *stasis_page(slotted_slot_length_ptr)(p, slot) = *stasis_page(slotted_freelist_cptr)(p); + *stasis_page(slotted_freelist_ptr)(p) = slot; + assert(stasis_page(slotted_get_type)(p,slot)==INVALID_SLOT); + } +#ifdef SLOTTED_PAGE_OLD_CHECKS + stasis_page(slotted_fsck)(p); +#endif +} diff --git a/stasis/page/slotted.h b/stasis/page/slotted.h index 52e68a2..d55fdce 100644 --- a/stasis/page/slotted.h +++ b/stasis/page/slotted.h @@ -61,19 +61,6 @@ Slotted page layout: ************************************************************************/ #ifndef STASIS_PAGE_SLOTTED_H #define STASIS_PAGE_SLOTTED_H -static inline int16_t* stasis_page_slotted_freespace_ptr(Page * p) { return stasis_page_int16_ptr_from_end((p), 1); } -static inline int16_t* stasis_page_slotted_numslots_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 2); } -static inline int16_t* stasis_page_slotted_freelist_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 3); } -static inline int16_t* stasis_page_slotted_slot_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end(p, (2*(n))+4); } -static inline int16_t* stasis_page_slotted_slot_length_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end((p), (2*(n))+5); } -static inline byte* stasis_page_slotted_record_ptr(Page * p, slotid_t n) { return stasis_page_byte_ptr_from_start((p), *stasis_page_slotted_slot_ptr((p), (n))); } - -static inline const int16_t* stasis_page_slotted_freespace_cptr(const Page * p) { return stasis_page_slotted_freespace_ptr((Page*)p); } -static inline const int16_t* stasis_page_slotted_numslots_cptr(const Page * p) { return stasis_page_slotted_numslots_ptr((Page*)p); } -static inline const int16_t* stasis_page_slotted_freelist_cptr(const Page * p) { return stasis_page_slotted_freelist_ptr((Page*)p); } -static inline const int16_t* stasis_page_slotted_slot_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_ptr((Page*)p, n); } -static inline const int16_t* stasis_page_slotted_slot_length_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_length_ptr((Page*)p, n); } -static inline const byte* stasis_page_slotted_record_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_record_ptr((Page*)p, n); } void stasis_page_slotted_init(); void stasis_page_slotted_deinit(); diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 39d0ff3..b9b3882 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -41,6 +41,7 @@ terms specified in this license. #include "../check_includes.h" #include +#include #include #include #include @@ -497,7 +498,7 @@ START_TEST(operation_alloc_small) { } Tcommit(xid); Tdeinit(); -} END_TEST; +} END_TEST #define ARRAY_LIST_CHECK_ITER 10000 START_TEST(operation_array_list) { @@ -571,7 +572,7 @@ START_TEST(operation_lsn_free) { int xid = Tbegin(); pageid_t pid = TpageAlloc(xid); Page * p = loadPage(xid,pid); - stasis_slotted_lsn_free_initialize_page(p); + stasis_page_slotted_lsn_free_initialize_page(p); // XXX hack! byte * old = malloc(PAGE_SIZE); memcpy(old, p->memAddr, PAGE_SIZE); @@ -635,7 +636,7 @@ START_TEST(operation_reorderable) { int xid = Tbegin(); pageid_t pid = TpageAlloc(xid); Page * p = loadPage(xid,pid); - stasis_slotted_lsn_free_initialize_page(p); + stasis_page_slotted_lsn_free_initialize_page(p); // XXX hack! byte * old = malloc(PAGE_SIZE); memcpy(old, p->memAddr, PAGE_SIZE); @@ -772,7 +773,7 @@ static int op_test_redo_impl(const LogEntry * e, Page * p) { Page * p = loadPage(e->xid, a->start + i); if(stasis_operation_multi_should_apply(e, p)) { writelock(p->rwlatch, 0); - stasis_fixed_initialize_page(p, sizeof(i), 1); + stasis_page_fixed_initialize_page(p, sizeof(i), 1); recordid rid = { p->id, 0, sizeof(i) }; stasis_record_write(e->xid, p, rid, (byte*)&i); stasis_page_lsn_write(e->xid, p, e->LSN); diff --git a/test/stasis/check_page.c b/test/stasis/check_page.c index 06e3854..ce900ad 100644 --- a/test/stasis/check_page.c +++ b/test/stasis/check_page.c @@ -48,6 +48,7 @@ terms specified in this license. #include #include +#include #include #include #include @@ -429,7 +430,7 @@ START_TEST(pageRecordSizeTypeIteratorTest) { p = loadPage(xid,pid); memset(p->memAddr, 0, PAGE_SIZE); - stasis_fixed_initialize_page(p,sizeof(int64_t),0); + stasis_page_fixed_initialize_page(p,sizeof(int64_t),0); checkPageIterators(xid,p,10); @@ -542,7 +543,7 @@ START_TEST(fixedPageThreadTest) { Tinit(); Page * p = loadPage(-1, 2); memset(p->memAddr, 0, PAGE_SIZE); - stasis_fixed_initialize_page(p, sizeof(int), 0); + stasis_page_fixed_initialize_page(p, sizeof(int), 0); p->LSN = 0; *stasis_page_lsn_ptr(p) = p->LSN;