New (more thorough, less frequent) tests for slotted pages. Renamed slotted methods, added microbenchmark for slotted pages.

This commit is contained in:
Sears Russell 2009-07-20 17:42:43 +00:00
parent 9ef3adf40c
commit 7f2edd880d
12 changed files with 295 additions and 179 deletions

View file

@ -30,3 +30,4 @@ CREATE_EXECUTABLE(distributedLsnFree)
IF(CHECK_LIBRARY) IF(CHECK_LIBRARY)
ADD_TEST(rose rose) ADD_TEST(rose rose)
ENDIF(CHECK_LIBRARY) ENDIF(CHECK_LIBRARY)
CREATE_EXECUTABLE(slotted)

115
benchmarks/slotted.c Normal file
View file

@ -0,0 +1,115 @@
/*
* slotted.c
*
* Created on: Jul 15, 2009
* Author: sears
*/
#include <string.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <stasis/transactional.h>
int main(int argc, char* argv[]) {
unlink("storefile.txt");
unlink("logfile.txt");
int i = 1;
// Should we pin the page once per run?
int mode = 0;
// How many operations?
unsigned long long count = 1000000;
while(i != argc) {
if(!strcmp(argv[i], "--mode")) {
i++;
assert(i != argc);
char * endptr;
mode = strtoull(argv[i], &endptr, 10);
assert(!*endptr);
i++;
} else if(!strcmp(argv[i], "--count")) {
i++;
assert(i != argc);
char * endptr;
count = strtoull(argv[i], &endptr, 10);
assert(!*endptr);
i++;
} else {
abort();
}
}
printf("%d %lld\n", mode, count);
Tinit();
int xid = Tbegin();
pageid_t page = TpageAlloc(xid);
if(mode == 0) {
Page * p = loadPage(xid, page);
writelock(p->rwlatch, 0);
stasis_page_slotted_initialize_page(p);
recordid rid = stasis_record_alloc_begin(xid, p, sizeof(uint64_t));
stasis_record_alloc_done(xid, p, rid);
for(unsigned long long i = 0; i < count; i++) {
uint64_t val = i;
stasis_record_write(xid, p, rid, (const byte*)&val);
}
unlock(p->rwlatch);
releasePage(p);
} else if(mode == 1) {
Page * p = loadPage(xid, page);
writelock(p->rwlatch, 0);
stasis_page_slotted_initialize_page(p);
recordid rid = stasis_record_alloc_begin(xid, p, sizeof(uint64_t));
stasis_record_alloc_done(xid, p, rid);
unlock(p->rwlatch);
for(unsigned long long i = 0; i < count; i++) {
writelock(p->rwlatch, 0);
uint64_t val = i;
stasis_record_write(xid, p, rid, (const byte*)&val);
unlock(p->rwlatch);
}
releasePage(p);
} else if(mode == 2) {
Page * p = loadPage(xid, page);
writelock(p->rwlatch, 0);
stasis_page_slotted_initialize_page(p);
recordid rid = stasis_record_alloc_begin(xid, p, sizeof(uint64_t));
stasis_record_alloc_done(xid, p, rid);
unlock(p->rwlatch);
releasePage(p);
for(unsigned long long i = 0; i < count; i++) {
Page * p = loadPage(xid, page);
writelock(p->rwlatch, 0);
uint64_t val = i;
stasis_record_write(xid, p, rid, (const byte*)&val);
unlock(p->rwlatch);
releasePage(p);
}
} else if(mode == 3) {
recordid rid = Talloc(xid, sizeof(uint64_t));
for(unsigned long long i = 0; i < count; i++) {
uint64_t val = i;
Tset(xid, rid, &val);
}
} else {
abort();
}
Tcommit(xid);
Tdeinit();
return 0;
}

View file

@ -206,7 +206,7 @@ static int op_initialize_page(const LogEntry* e, Page* p) {
switch(arg->slot) { switch(arg->slot) {
case SLOTTED_PAGE: case SLOTTED_PAGE:
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
break; break;
case FIXED_PAGE: case FIXED_PAGE:
stasis_fixed_initialize_page(p, arg->type, stasis_fixed_initialize_page(p, arg->type,

View file

@ -23,7 +23,7 @@ static void TdeallocBoundaryTag(int xid, pageid_t page);
top action's logical undo grabs the necessary latches. top action's logical undo grabs the necessary latches.
*/ */
static int op_alloc_boundary_tag(const LogEntry* e, Page* p) { static int op_alloc_boundary_tag(const LogEntry* e, Page* p) {
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
recordid rid = {p->id, 0, sizeof(boundary_tag)}; recordid rid = {p->id, 0, sizeof(boundary_tag)};
assert(e->update.arg_size == sizeof(boundary_tag)); assert(e->update.arg_size == sizeof(boundary_tag));
p->pageType = BOUNDARY_TAG_PAGE; p->pageType = BOUNDARY_TAG_PAGE;

View file

@ -116,12 +116,12 @@ lsn_t stasis_page_lsn_read(const Page * page) {
*/ */
void stasis_page_init(stasis_dirty_page_table_t * dpt) { void stasis_page_init(stasis_dirty_page_table_t * dpt) {
dirtyPages = dpt; dirtyPages = dpt;
slottedPageInit(); stasis_page_slotted_init();
fixedPageInit(); fixedPageInit();
stasis_page_impl_register(slottedImpl()); stasis_page_impl_register(stasis_page_slotted_impl());
stasis_page_impl_register(fixedImpl()); stasis_page_impl_register(fixedImpl());
stasis_page_impl_register(boundaryTagImpl()); stasis_page_impl_register(stasis_page_boundary_tag_impl());
stasis_page_impl_register(arrayListImpl()); stasis_page_impl_register(arrayListImpl());
stasis_page_impl_register(stasis_page_blob_impl()); stasis_page_impl_register(stasis_page_blob_impl());
stasis_page_impl_register(lsmRootImpl()); stasis_page_impl_register(lsmRootImpl());
@ -137,7 +137,7 @@ void stasis_page_deinit() {
} }
fixedPageDeinit(); fixedPageDeinit();
slottedPageDeinit(); stasis_page_slotted_deinit();
} }
int stasis_page_impl_register(page_impl p) { int stasis_page_impl_register(page_impl p) {

View file

@ -3,7 +3,7 @@
#include <stasis/logger/logger2.h> #include <stasis/logger/logger2.h>
void stasis_slotted_lsn_free_initialize_page(Page * p) { void stasis_slotted_lsn_free_initialize_page(Page * p) {
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
p->pageType = SLOTTED_LSN_FREE_PAGE; p->pageType = SLOTTED_LSN_FREE_PAGE;
} }
// XXX still not correct; need to have an "LSN_FREE" constant. // XXX still not correct; need to have an "LSN_FREE" constant.
@ -13,7 +13,7 @@ static void lsnFreeLoaded(Page * p) {
static void lsnFreeFlushed(Page * p) { } static void lsnFreeFlushed(Page * p) { }
page_impl slottedLsnFreeImpl() { page_impl slottedLsnFreeImpl() {
page_impl pi = slottedImpl(); page_impl pi = stasis_page_slotted_impl();
pi.page_type = SLOTTED_LSN_FREE_PAGE; pi.page_type = SLOTTED_LSN_FREE_PAGE;
pi.pageLoaded = lsnFreeLoaded; pi.pageLoaded = lsnFreeLoaded;
pi.pageLoaded = lsnFreeFlushed; pi.pageLoaded = lsnFreeFlushed;

View file

@ -3,12 +3,23 @@
#include <stasis/page/slotted.h> #include <stasis/page/slotted.h>
#include <assert.h> #include <assert.h>
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short))
#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short))
#ifdef LONG_TEST
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1
#endif
// plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence. // plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence.
// intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK) // intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK)
// stop calloing slotted fsck on each operation. This should speed things up a *lot*.
// benchmark page ops (real time) + hash table (user time) // benchmark page ops (real time) + hash table (user time)
//#define SLOTTED_PAGE_OLD_CHECKS
#define SLOTTED_PAGE_NEW_CHECKS
#ifdef SLOTTED_PAGE_NEW_CHECKS
#define SLOTTED_PAGE_SANITY_CHECKS
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP
#endif
#include <stasis/truncation.h> #include <stasis/truncation.h>
/** /**
@ -28,11 +39,11 @@ static inline void slottedFsck(const Page const * page) {
dummy.memAddr = 0; dummy.memAddr = 0;
const short page_type = page->pageType; const short page_type = page->pageType;
const short numslots = *numslots_cptr(page); const short numslots = *stasis_page_slotted_numslots_cptr(page);
const short freespace = *freespace_cptr(page); const short freespace = *stasis_page_slotted_freespace_cptr(page);
const short freelist = *freelist_cptr(page); const short freelist = *stasis_page_slotted_freelist_cptr(page);
const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1); const long slotListStart = (long)stasis_page_slotted_slot_length_ptr(&dummy, numslots-1);
assert(slotListStart < PAGE_SIZE && slotListStart >= 0); assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
assert(page_type == SLOTTED_PAGE || assert(page_type == SLOTTED_PAGE ||
page_type == BOUNDARY_TAG_PAGE || page_type == BOUNDARY_TAG_PAGE ||
@ -52,8 +63,8 @@ static inline void slottedFsck(const Page const * page) {
short * slot_offsets = alloca(numslots * sizeof(short)); short * slot_offsets = alloca(numslots * sizeof(short));
short * slot_lengths = alloca(numslots * sizeof(short)); short * slot_lengths = alloca(numslots * sizeof(short));
for(int i = 0; i < numslots; i++) { for(int i = 0; i < numslots; i++) {
slot_offsets[i] = *slot_ptr(page, i); slot_offsets[i] = *stasis_page_slotted_slot_cptr(page, i);
slot_lengths[i] = *slot_length_ptr(page, i); slot_lengths[i] = *stasis_page_slotted_slot_length_cptr(page, i);
} }
short foundEndOfList = 0; short foundEndOfList = 0;
@ -102,21 +113,21 @@ static inline void slottedFsck(const Page const * page) {
for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) { for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) {
image[i] = SLOTTED_HEADER; image[i] = SLOTTED_HEADER;
} }
for(short i = *freespace_ptr(page); i < slotListStart; i++) { for(short i = *stasis_page_slotted_freespace_cptr(page); i < slotListStart; i++) {
image[i] = FREE_SPACE; image[i] = FREE_SPACE;
} }
dummy.memAddr = image; dummy.memAddr = image;
for(short i = 0; i < *numslots_ptr(page); i++) { for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) {
*slot_ptr(&dummy, i) = S_SLOT_LIST; *stasis_page_slotted_slot_ptr(&dummy, i) = S_SLOT_LIST;
*slot_length_ptr(&dummy, i) = S_SLOT_LIST; *stasis_page_slotted_slot_length_ptr(&dummy, i) = S_SLOT_LIST;
} }
for(short i = 0; i < *numslots_ptr(page); i++) { for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) {
short slot_offset = *slot_ptr(page, i); short slot_offset = *stasis_page_slotted_slot_cptr(page, i);
if(slot_offset != INVALID_SLOT) { if(slot_offset != INVALID_SLOT) {
const unsigned char ci = i % 0xFF; const unsigned char ci = i % 0xFF;
short slot_len = stasis_record_type_to_size(*slot_length_ptr(page, i)); short slot_len = stasis_record_type_to_size(*stasis_page_slotted_slot_length_cptr(page, i));
for(short j = 0; j < slot_len; j++) { for(short j = 0; j < slot_len; j++) {
assert(image[slot_offset + j] == 0xFF); assert(image[slot_offset + j] == 0xFF);
@ -149,11 +160,11 @@ static void slottedCompact(Page * page) {
// Now, build new slotted page in the bufPage struct. // Now, build new slotted page in the bufPage struct.
*freespace_ptr(&bufPage) = 0; *stasis_page_slotted_freespace_ptr(&bufPage) = 0;
// numslots_ptr will be set later. // numslots_ptr will be set later.
*freelist_ptr(&bufPage) = INVALID_SLOT; *stasis_page_slotted_freelist_ptr(&bufPage) = INVALID_SLOT;
const short numSlots = *numslots_ptr(page); const short numSlots = *stasis_page_slotted_numslots_ptr(page);
short lastFreeSlot = INVALID_SLOT; short lastFreeSlot = INVALID_SLOT;
short lastFreeSlotBeforeUsedSlot = INVALID_SLOT; short lastFreeSlotBeforeUsedSlot = INVALID_SLOT;
short lastUsedSlot = -1; short lastUsedSlot = -1;
@ -161,58 +172,39 @@ static void slottedCompact(Page * page) {
// Rebuild free list. // Rebuild free list.
for(short i = 0; i < numSlots; i++) { for(short i = 0; i < numSlots; i++) {
if(*slot_ptr(page, i) == INVALID_SLOT) { if(*stasis_page_slotted_slot_ptr(page, i) == INVALID_SLOT) {
if(lastFreeSlot == INVALID_SLOT) { if(lastFreeSlot == INVALID_SLOT) {
*freelist_ptr(&bufPage) = i; *stasis_page_slotted_freelist_ptr(&bufPage) = i;
} else { } else {
*slot_length_ptr(&bufPage, lastFreeSlot) = i; *stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlot) = i;
} }
*slot_ptr(&bufPage, i) = INVALID_SLOT; *stasis_page_slotted_slot_ptr(&bufPage, i) = INVALID_SLOT;
lastFreeSlot = i; lastFreeSlot = i;
} else { } else {
lastUsedSlot = i; lastUsedSlot = i;
lastFreeSlotBeforeUsedSlot = lastFreeSlot; lastFreeSlotBeforeUsedSlot = lastFreeSlot;
short logicalSize = *slot_length_ptr(page, i); short logicalSize = *stasis_page_slotted_slot_length_ptr(page, i);
short physicalSize = stasis_record_type_to_size(logicalSize); short physicalSize = stasis_record_type_to_size(logicalSize);
memcpy(&(buffer[*freespace_ptr(&bufPage)]), record_ptr(page, i), physicalSize); memcpy(&(buffer[*stasis_page_slotted_freespace_ptr(&bufPage)]), stasis_page_slotted_record_ptr(page, i), physicalSize);
*slot_ptr(&bufPage, i) = *freespace_ptr(&bufPage); *stasis_page_slotted_slot_ptr(&bufPage, i) = *stasis_page_slotted_freespace_ptr(&bufPage);
*slot_length_ptr(&bufPage, i) = logicalSize; *stasis_page_slotted_slot_length_ptr(&bufPage, i) = logicalSize;
(*freespace_ptr(&bufPage)) += physicalSize; (*stasis_page_slotted_freespace_ptr(&bufPage)) += physicalSize;
} }
} }
// Truncate linked list, and update numslots_ptr. // Truncate linked list, and update numslots_ptr.
*slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT; *stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT;
*numslots_ptr(&bufPage) = lastUsedSlot+1; *stasis_page_slotted_numslots_ptr(&bufPage) = lastUsedSlot+1;
memcpy(page->memAddr, buffer, PAGE_SIZE); memcpy(page->memAddr, buffer, PAGE_SIZE);
#ifdef SLOTTED_PAGE_OLD_CHECKS
slottedFsck(page); slottedFsck(page);
#endif // SLOTTED_PAGE_OLD_CHECKS
}
void slottedPageInit() {
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
printf("slotted.c: Using expensive page sanity checking.\n");
#endif
}
void slottedPageDeinit() {
}
void stasis_slotted_initialize_page(Page * page) {
assertlocked(page->rwlatch);
stasis_page_cleanup(page);
page->pageType = SLOTTED_PAGE;
*freespace_ptr(page) = 0;
*numslots_ptr(page) = 0;
*freelist_ptr(page) = INVALID_SLOT;
} }
/** /**
@ -225,19 +217,19 @@ static size_t slottedFreespaceForSlot(Page * page, int slot) {
size_t slotOverhead; size_t slotOverhead;
if(slot == INVALID_SLOT) { if(slot == INVALID_SLOT) {
slotOverhead = (*freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0; slotOverhead = (*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0;
} else if(slot < *numslots_ptr(page)) { } else if(slot < *stasis_page_slotted_numslots_ptr(page)) {
slotOverhead = 0; slotOverhead = 0;
} else { } else {
// slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot); // slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot);
slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *numslots_ptr(page)); slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *stasis_page_slotted_numslots_ptr(page));
} }
// end_of_free_space points to the beginning of the slot header at the bottom of the page header. // end_of_free_space points to the beginning of the slot header at the bottom of the page header.
byte* end_of_free_space = (byte*)slot_length_ptr(page, (*numslots_ptr(page))-1); byte* end_of_free_space = (byte*)stasis_page_slotted_slot_length_ptr(page, (*stasis_page_slotted_numslots_ptr(page))-1);
// start_of_free_space points to the first unallocated byte in the page // start_of_free_space points to the first unallocated byte in the page
// (ignoring space that could be reclaimed by compaction) // (ignoring space that could be reclaimed by compaction)
byte* start_of_free_space = (byte*)(page->memAddr + *freespace_ptr(page)); byte* start_of_free_space = (byte*)(page->memAddr + *stasis_page_slotted_freespace_ptr(page));
assert(end_of_free_space >= start_of_free_space); assert(end_of_free_space >= start_of_free_space);
@ -282,33 +274,33 @@ static void really_do_ralloc(Page * page, recordid rid) {
assert (slottedFreespaceForSlot(page, rid.slot) >= stasis_record_type_to_size(rid.size)); assert (slottedFreespaceForSlot(page, rid.slot) >= stasis_record_type_to_size(rid.size));
} }
freeSpace = *freespace_ptr(page); freeSpace = *stasis_page_slotted_freespace_ptr(page);
// Remove this entry from the freelist (if necessary) slottedCompact // Remove this entry from the freelist (if necessary) slottedCompact
// assumes that this does not change the order of items in the list. // assumes that this does not change the order of items in the list.
// If it did, then slottedCompact could leaks slot id's (or worse!) // If it did, then slottedCompact could leaks slot id's (or worse!)
if(rid.slot < *numslots_ptr(page) && *slot_ptr(page,rid.slot) == INVALID_SLOT) { if(rid.slot < *stasis_page_slotted_numslots_ptr(page) && *stasis_page_slotted_slot_ptr(page,rid.slot) == INVALID_SLOT) {
short next = *freelist_ptr(page); short next = *stasis_page_slotted_freelist_ptr(page);
short last = INVALID_SLOT; short last = INVALID_SLOT;
// special case: is the slot physically before us the predecessor? // special case: is the slot physically before us the predecessor?
if(rid.slot > 0) { if(rid.slot > 0) {
if(*slot_length_ptr(page, rid.slot-1) == rid.slot && *slot_ptr(page, rid.slot-1) == INVALID_SLOT) { if(*stasis_page_slotted_slot_length_ptr(page, rid.slot-1) == rid.slot && *stasis_page_slotted_slot_ptr(page, rid.slot-1) == INVALID_SLOT) {
next = rid.slot; next = rid.slot;
last = rid.slot-1; last = rid.slot-1;
} }
} }
while(next != INVALID_SLOT && next != rid.slot) { while(next != INVALID_SLOT && next != rid.slot) {
last = next; last = next;
assert(next < *numslots_ptr(page)); assert(next < *stasis_page_slotted_numslots_ptr(page));
short next_slot_ptr = *slot_ptr(page, next); short next_slot_ptr = *stasis_page_slotted_slot_ptr(page, next);
assert(next_slot_ptr == INVALID_SLOT); assert(next_slot_ptr == INVALID_SLOT);
next = *slot_length_ptr(page, next); next = *stasis_page_slotted_slot_length_ptr(page, next);
} }
if(next == rid.slot) { if(next == rid.slot) {
if(last == INVALID_SLOT) { if(last == INVALID_SLOT) {
*freelist_ptr(page) = *slot_length_ptr(page, rid.slot); *stasis_page_slotted_freelist_ptr(page) = *stasis_page_slotted_slot_length_ptr(page, rid.slot);
} else { } else {
*slot_length_ptr(page, last) = *slot_length_ptr(page, rid.slot); *stasis_page_slotted_slot_length_ptr(page, last) = *stasis_page_slotted_slot_length_ptr(page, rid.slot);
} }
} }
} }
@ -320,63 +312,63 @@ static void really_do_ralloc(Page * page, recordid rid) {
// correctness depends on this behavior!) // correctness depends on this behavior!)
if(rid.slot > *numslots_ptr(page)) { if(rid.slot > *stasis_page_slotted_numslots_ptr(page)) {
short lastSlot; short lastSlot;
short numSlots = *numslots_ptr(page); short numSlots = *stasis_page_slotted_numslots_ptr(page);
if(*freelist_ptr(page) == INVALID_SLOT) { if(*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) {
*freelist_ptr(page) = numSlots; *stasis_page_slotted_freelist_ptr(page) = numSlots;
lastSlot = numSlots; lastSlot = numSlots;
*slot_ptr(page, lastSlot) = INVALID_SLOT; *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT;
// will set slot_length_ptr on next iteration. // will set slot_length_ptr on next iteration.
(*numslots_ptr(page))++; (*stasis_page_slotted_numslots_ptr(page))++;
} else { } else {
lastSlot = INVALID_SLOT; lastSlot = INVALID_SLOT;
short next = *freelist_ptr(page); short next = *stasis_page_slotted_freelist_ptr(page);
while(next != INVALID_SLOT) { while(next != INVALID_SLOT) {
lastSlot = next; lastSlot = next;
next = *slot_length_ptr(page, lastSlot); next = *stasis_page_slotted_slot_length_ptr(page, lastSlot);
assert(lastSlot < *numslots_ptr(page)); assert(lastSlot < *stasis_page_slotted_numslots_ptr(page));
assert(*slot_ptr(page, lastSlot) == INVALID_SLOT); assert(*stasis_page_slotted_slot_ptr(page, lastSlot) == INVALID_SLOT);
} }
*slot_ptr(page, lastSlot) = INVALID_SLOT; *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT;
} }
// lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr. // lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr.
while(*numslots_ptr(page) < rid.slot) { while(*stasis_page_slotted_numslots_ptr(page) < rid.slot) {
*slot_length_ptr(page, lastSlot) = *numslots_ptr(page); *stasis_page_slotted_slot_length_ptr(page, lastSlot) = *stasis_page_slotted_numslots_ptr(page);
lastSlot = *numslots_ptr(page); lastSlot = *stasis_page_slotted_numslots_ptr(page);
*slot_ptr(page, lastSlot) = INVALID_SLOT; *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT;
(*numslots_ptr(page))++; (*stasis_page_slotted_numslots_ptr(page))++;
} }
// Terminate the end of the list. // Terminate the end of the list.
assert(lastSlot < *numslots_ptr(page)); assert(lastSlot < *stasis_page_slotted_numslots_ptr(page));
*slot_length_ptr(page, lastSlot) = INVALID_SLOT; *stasis_page_slotted_slot_length_ptr(page, lastSlot) = INVALID_SLOT;
} }
if(*numslots_ptr(page) == rid.slot) { if(*stasis_page_slotted_numslots_ptr(page) == rid.slot) {
*numslots_ptr(page) = rid.slot+1; *stasis_page_slotted_numslots_ptr(page) = rid.slot+1;
} }
assert(*numslots_ptr(page) > rid.slot); assert(*stasis_page_slotted_numslots_ptr(page) > rid.slot);
DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot); DEBUG("Num slots %d\trid.slot %d\n", *stasis_page_slotted_numslots_ptr(page), rid.slot);
// Reserve space for this record and record the space's offset in // Reserve space for this record and record the space's offset in
// the slot header. // the slot header.
assert(rid.slot < *numslots_ptr(page)); assert(rid.slot < *stasis_page_slotted_numslots_ptr(page));
*freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size); *stasis_page_slotted_freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size);
*slot_ptr(page, rid.slot) = freeSpace; *stasis_page_slotted_slot_ptr(page, rid.slot) = freeSpace;
*slot_length_ptr(page, rid.slot) = rid.size; *stasis_page_slotted_slot_length_ptr(page, rid.slot) = rid.size;
} }
@ -385,68 +377,74 @@ static void really_do_ralloc(Page * page, recordid rid) {
// -------------------------------------------------------------------------- // --------------------------------------------------------------------------
static inline void sanityCheck(Page * p, recordid rid) { static inline void sanityCheck(Page * p, recordid rid) {
#ifdef SLOTTED_PAGE_OLD_CHECKS
assert(p->id == rid.page); assert(p->id == rid.page);
assert(rid.size < BLOB_THRESHOLD_SIZE); // Caller deals with this now! assert(rid.size < BLOB_THRESHOLD_SIZE); // Caller deals with this now!
slottedFsck(p); slottedFsck(p);
#endif
} }
static const byte* slottedRead (int xid, Page *p, recordid rid) { static const byte* slottedRead (int xid, Page *p, recordid rid) {
sanityCheck(p, rid); sanityCheck(p, rid);
return record_ptr(p, rid.slot); return stasis_page_slotted_record_ptr(p, rid.slot);
} }
static byte* slottedWrite(int xid, Page *p, recordid rid) { static byte* slottedWrite(int xid, Page *p, recordid rid) {
sanityCheck(p, rid); sanityCheck(p, rid);
return record_ptr(p, rid.slot); return stasis_page_slotted_record_ptr(p, rid.slot);
} }
static int slottedGetType(int xid, Page *p, recordid rid) { static int slottedGetType(int xid, Page *p, recordid rid) {
#ifdef SLOTTED_PAGE_OLD_CHECKS
//sanityCheck(p, rid); <-- Would fail if rid.size is a blob //sanityCheck(p, rid); <-- Would fail if rid.size is a blob
assert(p->id == rid.page); assert(p->id == rid.page);
slottedFsck(p); slottedFsck(p);
if(rid.slot >= *numslots_ptr(p)) { return INVALID_SLOT; } #endif
if(*slot_ptr(p, rid.slot) == INVALID_SLOT) { return INVALID_SLOT; } if(rid.slot >= *stasis_page_slotted_numslots_ptr(p)) { return INVALID_SLOT; }
int ret = *slot_length_ptr(p, rid.slot); if(*stasis_page_slotted_slot_ptr(p, rid.slot) == INVALID_SLOT) { return INVALID_SLOT; }
int ret = *stasis_page_slotted_slot_length_ptr(p, rid.slot);
return ret >= 0 ? NORMAL_SLOT : ret; return ret >= 0 ? NORMAL_SLOT : ret;
} }
static void slottedSetType(int xid, Page *p, recordid rid, int type) { static void slottedSetType(int xid, Page *p, recordid rid, int type) {
sanityCheck(p, rid); sanityCheck(p, rid);
int old_type = *slot_length_ptr(p, rid.slot); int old_type = *stasis_page_slotted_slot_length_ptr(p, rid.slot);
assert(rid.slot < *numslots_ptr(p)); assert(rid.slot < *stasis_page_slotted_numslots_ptr(p));
assert(old_type != INVALID_SLOT); assert(old_type != INVALID_SLOT);
if(type == NORMAL_SLOT) { if(type == NORMAL_SLOT) {
// set slot_length_ptr to the physical length. // set slot_length_ptr to the physical length.
*slot_length_ptr(p, rid.slot) = stasis_record_type_to_size(old_type); *stasis_page_slotted_slot_length_ptr(p, rid.slot) = stasis_record_type_to_size(old_type);
} else { } else {
// Changing to a special slot type; make sure doing so doesn't change // Changing to a special slot type; make sure doing so doesn't change
// the record size. // the record size.
assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(old_type)); assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(old_type));
*slot_length_ptr(p, rid.slot) = type; *stasis_page_slotted_slot_length_ptr(p, rid.slot) = type;
} }
} }
static int slottedGetLength(int xid, Page *p, recordid rid) { static int slottedGetLength(int xid, Page *p, recordid rid) {
#ifdef SLOTTED_PAGE_OLD_CHECKS
assert(p->id == rid.page); assert(p->id == rid.page);
slottedFsck(p); slottedFsck(p);
#endif
if( slottedGetType(xid, p, rid) == INVALID_SLOT) if( slottedGetType(xid, p, rid) == INVALID_SLOT)
return INVALID_SLOT; return INVALID_SLOT;
else else
return stasis_record_type_to_size(*slot_length_ptr(p, rid.slot)); return stasis_record_type_to_size(*stasis_page_slotted_slot_length_ptr(p, rid.slot));
} }
static recordid slottedNext(int xid, Page *p, recordid rid) { static recordid slottedNext(int xid, Page *p, recordid rid) {
sanityCheck(p, rid); sanityCheck(p, rid);
short n = *numslots_ptr(p); short n = *stasis_page_slotted_numslots_ptr(p);
rid.slot ++; rid.slot ++;
while(rid.slot < n && slottedGetType(xid,p,rid)==INVALID_SLOT) { while(rid.slot < n && slottedGetType(xid,p,rid)==INVALID_SLOT) {
rid.slot++; rid.slot++;
} }
if(rid.slot != n) { if(rid.slot != n) {
rid.size = *slot_length_ptr(p, rid.slot); rid.size = *stasis_page_slotted_slot_length_ptr(p, rid.slot);
return rid; return rid;
} else { } else {
return NULLRID; return NULLRID;
@ -474,11 +472,11 @@ static recordid slottedPreRalloc(int xid, Page * p, int type) {
recordid rid; recordid rid;
rid.page = p->id; rid.page = p->id;
rid.slot = *numslots_ptr(p); rid.slot = *stasis_page_slotted_numslots_ptr(p);
rid.size = type; rid.size = type;
if(*freelist_ptr(p) != INVALID_SLOT) { if(*stasis_page_slotted_freelist_ptr(p) != INVALID_SLOT) {
rid.slot = *freelist_ptr(p); rid.slot = *stasis_page_slotted_freelist_ptr(p);
} }
if(slottedFreespaceForSlot(p, rid.slot) < stasis_record_type_to_size(type)) { if(slottedFreespaceForSlot(p, rid.slot) < stasis_record_type_to_size(type)) {
@ -497,18 +495,18 @@ static void slottedPostRalloc(int xid, Page * p, recordid rid) {
static void slottedFree(int xid, Page * p, recordid rid) { static void slottedFree(int xid, Page * p, recordid rid) {
sanityCheck(p, rid); sanityCheck(p, rid);
if(*freespace_ptr(p) == *slot_ptr(p, rid.slot) + stasis_record_type_to_size(rid.size)) { if(*stasis_page_slotted_freespace_ptr(p) == *stasis_page_slotted_slot_ptr(p, rid.slot) + stasis_record_type_to_size(rid.size)) {
(*freespace_ptr(p)) -= stasis_record_type_to_size(rid.size); (*stasis_page_slotted_freespace_ptr(p)) -= stasis_record_type_to_size(rid.size);
} }
assert(rid.slot < *numslots_ptr(p)); assert(rid.slot < *stasis_page_slotted_numslots_ptr(p));
if(rid.slot == *numslots_ptr(p)-1) { if(rid.slot == *stasis_page_slotted_numslots_ptr(p)-1) {
(*numslots_ptr(p))--; (*stasis_page_slotted_numslots_ptr(p))--;
assert(slottedGetType(xid,p,rid)==INVALID_SLOT); assert(slottedGetType(xid,p,rid)==INVALID_SLOT);
} else { } else {
*slot_ptr(p, rid.slot) = INVALID_SLOT; *stasis_page_slotted_slot_ptr(p, rid.slot) = INVALID_SLOT;
*slot_length_ptr(p, rid.slot) = *freelist_ptr(p); *stasis_page_slotted_slot_length_ptr(p, rid.slot) = *stasis_page_slotted_freelist_ptr(p);
*freelist_ptr(p) = rid.slot; *stasis_page_slotted_freelist_ptr(p) = rid.slot;
assert(slottedGetType(xid,p,rid)==INVALID_SLOT); assert(slottedGetType(xid,p,rid)==INVALID_SLOT);
} }
@ -520,7 +518,6 @@ static void slottedFree(int xid, Page * p, recordid rid) {
static void slottedLoaded(Page *p) { static void slottedLoaded(Page *p) {
p->LSN = *stasis_page_lsn_ptr(p); p->LSN = *stasis_page_lsn_ptr(p);
// @todo arrange for pagefsck to run on load/flush, but nowhere else.
slottedFsck(p); slottedFsck(p);
} }
static void slottedFlushed(Page *p) { static void slottedFlushed(Page *p) {
@ -529,7 +526,25 @@ static void slottedFlushed(Page *p) {
} }
static void slottedCleanup(Page *p) { } static void slottedCleanup(Page *p) { }
page_impl slottedImpl() { void stasis_page_slotted_init() {
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
printf("slotted.c: Using expensive page sanity checking.\n");
#endif
}
void stasis_page_slotted_deinit() {
}
void stasis_page_slotted_initialize_page(Page * page) {
assertlocked(page->rwlatch);
stasis_page_cleanup(page);
page->pageType = SLOTTED_PAGE;
*stasis_page_slotted_freespace_ptr(page) = 0;
*stasis_page_slotted_numslots_ptr(page) = 0;
*stasis_page_slotted_freelist_ptr(page) = INVALID_SLOT;
}
page_impl stasis_page_slotted_impl() {
static page_impl pi = { static page_impl pi = {
SLOTTED_PAGE, SLOTTED_PAGE,
1, 1,
@ -559,8 +574,8 @@ static page_impl pi = {
return pi; return pi;
} }
page_impl boundaryTagImpl() { page_impl stasis_page_boundary_tag_impl() {
page_impl p = slottedImpl(); page_impl p = stasis_page_slotted_impl();
p.page_type = BOUNDARY_TAG_PAGE; p.page_type = BOUNDARY_TAG_PAGE;
return p; return p;
} }

View file

@ -893,7 +893,7 @@ page_impl* stasis_page_impl_get(int id);
Its contents will be overwritten. It was probably Its contents will be overwritten. It was probably
returned by loadPage() returned by loadPage()
*/ */
void stasis_slotted_initialize_page(Page * p); void stasis_page_slotted_initialize_page(Page * p);
void stasis_slotted_lsn_free_initialize_page(Page * p); void stasis_slotted_lsn_free_initialize_page(Page * p);
void stasis_fixed_initialize_page(Page * page, size_t size, int count); void stasis_fixed_initialize_page(Page * page, size_t size, int count);
void stasis_indirect_initialize_page(Page * p, int height); void stasis_indirect_initialize_page(Page * p, int height);

View file

@ -60,36 +60,21 @@ Slotted page layout:
************************************************************************/ ************************************************************************/
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short)) static inline int16_t* stasis_page_slotted_freespace_ptr(Page * p) { return stasis_page_int16_ptr_from_end((p), 1); }
#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short)) static inline int16_t* stasis_page_slotted_numslots_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 2); }
static inline int16_t* stasis_page_slotted_freelist_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 3); }
static inline int16_t* stasis_page_slotted_slot_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end(p, (2*(n))+4); }
static inline int16_t* stasis_page_slotted_slot_length_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end((p), (2*(n))+5); }
static inline byte* stasis_page_slotted_record_ptr(Page * p, slotid_t n) { return stasis_page_byte_ptr_from_start((p), *stasis_page_slotted_slot_ptr((p), (n))); }
//#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 static inline const int16_t* stasis_page_slotted_freespace_cptr(const Page * p) { return stasis_page_slotted_freespace_ptr((Page*)p); }
//#define SLOTTED_PAGE_SKIP_SANITY_CHECKS 1 static inline const int16_t* stasis_page_slotted_numslots_cptr(const Page * p) { return stasis_page_slotted_numslots_ptr((Page*)p); }
static inline const int16_t* stasis_page_slotted_freelist_cptr(const Page * p) { return stasis_page_slotted_freelist_ptr((Page*)p); }
static inline const int16_t* stasis_page_slotted_slot_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_ptr((Page*)p, n); }
static inline const int16_t* stasis_page_slotted_slot_length_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_length_ptr((Page*)p, n); }
static inline const byte* stasis_page_slotted_record_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_record_ptr((Page*)p, n); }
#ifdef LONG_TEST void stasis_page_slotted_init();
#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 void stasis_page_slotted_deinit();
#endif page_impl stasis_page_slotted_impl();
page_impl stasis_page_boundary_tag_impl();
/**
@todo rename and rewrite slotted.h macros as static inline functions.
*/
#define freespace_ptr(page) stasis_page_int16_ptr_from_end((page), 1)
#define numslots_ptr(page) stasis_page_int16_ptr_from_end((page), 2)
#define freelist_ptr(page) stasis_page_int16_ptr_from_end((page), 3)
#define slot_ptr(page, n) stasis_page_int16_ptr_from_end((page), (2*(n))+4)
#define slot_length_ptr(page, n) stasis_page_int16_ptr_from_end((page), (2*(n))+5)
#define record_ptr(page, n) stasis_page_byte_ptr_from_start((page), \
*slot_ptr((page), (n)))
#define freespace_cptr(page) stasis_page_int16_cptr_from_end((page), 1)
#define numslots_cptr(page) stasis_page_int16_cptr_from_end((page), 2)
#define freelist_cptr(page) stasis_page_int16_cptr_from_end((page), 3)
#define slot_cptr(page, n) stasis_page_int16_cptr_from_end((page), (2*(n))+4)
#define slot_length_cptr(page, n) stasis_page_int16_cptr_from_end((page), (2*(n))+5)
#define record_cptr(page, n) stasis_page_byte_cptr_from_start((page), \
*slot_cptr((page), (n)))
void slottedPageInit();
void slottedPageDeinit();
page_impl slottedImpl();
page_impl boundaryTagImpl();

View file

@ -47,7 +47,7 @@ void initializePages() {
assert(p->id != -1); assert(p->id != -1);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
stasis_record_alloc_done(-1, p, rid); stasis_record_alloc_done(-1, p, rid);
int * buf = (int*)stasis_record_write_begin(-1, p, rid); int * buf = (int*)stasis_record_write_begin(-1, p, rid);
*buf = i; *buf = i;

View file

@ -71,7 +71,7 @@ START_TEST(operation_physical_do_undo) {
Page * p = loadPage(xid, pnum); Page * p = loadPage(xid, pnum);
writelock(p->rwlatch, 0); writelock(p->rwlatch, 0);
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
rid = stasis_record_alloc_begin(xid, p, sizeof(int)); rid = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid); stasis_record_alloc_done(xid, p, rid);
unlock(p->rwlatch); unlock(p->rwlatch);

View file

@ -220,7 +220,7 @@ START_TEST(pageNoThreadTest)
p = loadPage(-1, 0); p = loadPage(-1, 0);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
unlock(p->rwlatch); unlock(p->rwlatch);
worker_thread(p); worker_thread(p);
@ -251,14 +251,14 @@ START_TEST(pageCheckMacros) {
*stasis_page_lsn_ptr(&p) = lsn; *stasis_page_lsn_ptr(&p) = lsn;
*stasis_page_type_ptr(&p) = 10; *stasis_page_type_ptr(&p) = 10;
*freespace_ptr(&p) = 15; *stasis_page_slotted_freespace_ptr(&p) = 15;
*numslots_ptr(&p) = 20; *stasis_page_slotted_numslots_ptr(&p) = 20;
*slot_ptr(&p, 0) = 30; *stasis_page_slotted_slot_ptr(&p, 0) = 30;
*slot_ptr(&p, 1) = 35; *stasis_page_slotted_slot_ptr(&p, 1) = 35;
*slot_ptr(&p, 40) = 40; *stasis_page_slotted_slot_ptr(&p, 40) = 40;
*slot_length_ptr(&p, 0) = 31; *stasis_page_slotted_slot_length_ptr(&p, 0) = 31;
*slot_length_ptr(&p, 1) = 36; *stasis_page_slotted_slot_length_ptr(&p, 1) = 36;
*slot_length_ptr(&p, 40) = 41; *stasis_page_slotted_slot_length_ptr(&p, 40) = 41;
*stasis_page_byte_ptr_from_start(&p, 0) = 50; *stasis_page_byte_ptr_from_start(&p, 0) = 50;
*stasis_page_byte_ptr_from_start(&p, 1) = 51; *stasis_page_byte_ptr_from_start(&p, 1) = 51;
@ -269,14 +269,14 @@ START_TEST(pageCheckMacros) {
assert(*stasis_page_lsn_ptr(&p) == lsn); assert(*stasis_page_lsn_ptr(&p) == lsn);
assert(*stasis_page_type_ptr(&p) == 10); assert(*stasis_page_type_ptr(&p) == 10);
//assert(end_of_usable_space_ptr(&p) == stasis_page_type_ptr(&p)); //assert(end_of_usable_space_ptr(&p) == stasis_page_type_ptr(&p));
assert(*freespace_ptr(&p) == 15); assert(*stasis_page_slotted_freespace_ptr(&p) == 15);
assert(*numslots_ptr(&p) == 20); assert(*stasis_page_slotted_numslots_ptr(&p) == 20);
assert(*slot_ptr(&p, 0) == 30); assert(*stasis_page_slotted_slot_ptr(&p, 0) == 30);
assert(*slot_ptr(&p, 1) == 35); assert(*stasis_page_slotted_slot_ptr(&p, 1) == 35);
assert(*slot_ptr(&p, 40) == 40); assert(*stasis_page_slotted_slot_ptr(&p, 40) == 40);
assert(*slot_length_ptr(&p, 0) == 31); assert(*stasis_page_slotted_slot_length_ptr(&p, 0) == 31);
assert(*slot_length_ptr(&p, 1) == 36); assert(*stasis_page_slotted_slot_length_ptr(&p, 1) == 36);
assert(*slot_length_ptr(&p, 40) == 41); assert(*stasis_page_slotted_slot_length_ptr(&p, 40) == 41);
assert(*stasis_page_byte_ptr_from_start(&p, 0) == 50); assert(*stasis_page_byte_ptr_from_start(&p, 0) == 50);
assert(*stasis_page_byte_ptr_from_start(&p, 1) == 51); assert(*stasis_page_byte_ptr_from_start(&p, 1) == 51);
@ -343,7 +343,7 @@ START_TEST(pageRecordSizeTypeIteratorTest) {
Page * p = loadPage(xid,pid); Page * p = loadPage(xid,pid);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
checkPageIterators(xid,p,10); checkPageIterators(xid,p,10);
@ -386,7 +386,7 @@ START_TEST(pageNoThreadMultPageTest)
*stasis_page_lsn_ptr(p) = p->LSN; *stasis_page_lsn_ptr(p) = p->LSN;
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
unlock(p->rwlatch); unlock(p->rwlatch);
multiple_simultaneous_pages(p); multiple_simultaneous_pages(p);
// Normally, you would call pageWriteLSN() to update the LSN. This // Normally, you would call pageWriteLSN() to update the LSN. This
@ -426,7 +426,7 @@ START_TEST(pageThreadTest) {
Page * p = loadPage(-1, 2); Page * p = loadPage(-1, 2);
writelock(p->rwlatch,0); writelock(p->rwlatch,0);
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
stasis_slotted_initialize_page(p); stasis_page_slotted_initialize_page(p);
unlock(p->rwlatch); unlock(p->rwlatch);
p->LSN = 0; p->LSN = 0;
*stasis_page_lsn_ptr(p) = p->LSN; *stasis_page_lsn_ptr(p) = p->LSN;