diff --git a/benchmarks/CMakeLists.txt b/benchmarks/CMakeLists.txt index b48a782..193692a 100644 --- a/benchmarks/CMakeLists.txt +++ b/benchmarks/CMakeLists.txt @@ -30,3 +30,4 @@ CREATE_EXECUTABLE(distributedLsnFree) IF(CHECK_LIBRARY) ADD_TEST(rose rose) ENDIF(CHECK_LIBRARY) +CREATE_EXECUTABLE(slotted) \ No newline at end of file diff --git a/benchmarks/slotted.c b/benchmarks/slotted.c new file mode 100644 index 0000000..02b8c11 --- /dev/null +++ b/benchmarks/slotted.c @@ -0,0 +1,115 @@ +/* + * slotted.c + * + * Created on: Jul 15, 2009 + * Author: sears + */ +#include +#include +#include +#include + +#include + +int main(int argc, char* argv[]) { + unlink("storefile.txt"); + unlink("logfile.txt"); + int i = 1; + // Should we pin the page once per run? + int mode = 0; + // How many operations? + unsigned long long count = 1000000; + while(i != argc) { + if(!strcmp(argv[i], "--mode")) { + i++; + assert(i != argc); + char * endptr; + mode = strtoull(argv[i], &endptr, 10); + assert(!*endptr); + i++; + } else if(!strcmp(argv[i], "--count")) { + i++; + assert(i != argc); + char * endptr; + count = strtoull(argv[i], &endptr, 10); + assert(!*endptr); + i++; + } else { + abort(); + } + } + printf("%d %lld\n", mode, count); + + Tinit(); + + int xid = Tbegin(); + + pageid_t page = TpageAlloc(xid); + if(mode == 0) { + Page * p = loadPage(xid, page); + writelock(p->rwlatch, 0); + stasis_page_slotted_initialize_page(p); + + recordid rid = stasis_record_alloc_begin(xid, p, sizeof(uint64_t)); + stasis_record_alloc_done(xid, p, rid); + for(unsigned long long i = 0; i < count; i++) { + uint64_t val = i; + stasis_record_write(xid, p, rid, (const byte*)&val); + } + unlock(p->rwlatch); + releasePage(p); + } else if(mode == 1) { + Page * p = loadPage(xid, page); + writelock(p->rwlatch, 0); + + stasis_page_slotted_initialize_page(p); + + recordid rid = stasis_record_alloc_begin(xid, p, sizeof(uint64_t)); + stasis_record_alloc_done(xid, p, rid); + + unlock(p->rwlatch); + + for(unsigned long long i = 0; i < count; i++) { + writelock(p->rwlatch, 0); + uint64_t val = i; + stasis_record_write(xid, p, rid, (const byte*)&val); + unlock(p->rwlatch); + } + releasePage(p); + } else if(mode == 2) { + Page * p = loadPage(xid, page); + writelock(p->rwlatch, 0); + + stasis_page_slotted_initialize_page(p); + + recordid rid = stasis_record_alloc_begin(xid, p, sizeof(uint64_t)); + stasis_record_alloc_done(xid, p, rid); + + unlock(p->rwlatch); + releasePage(p); + + for(unsigned long long i = 0; i < count; i++) { + Page * p = loadPage(xid, page); + writelock(p->rwlatch, 0); + uint64_t val = i; + stasis_record_write(xid, p, rid, (const byte*)&val); + unlock(p->rwlatch); + releasePage(p); + } + } else if(mode == 3) { + + recordid rid = Talloc(xid, sizeof(uint64_t)); + + for(unsigned long long i = 0; i < count; i++) { + uint64_t val = i; + Tset(xid, rid, &val); + } + } else { + abort(); + } + Tcommit(xid); + + Tdeinit(); + + return 0; +} diff --git a/src/stasis/operations/pageOperations.c b/src/stasis/operations/pageOperations.c index ae289ee..4bdf8d1 100644 --- a/src/stasis/operations/pageOperations.c +++ b/src/stasis/operations/pageOperations.c @@ -206,7 +206,7 @@ static int op_initialize_page(const LogEntry* e, Page* p) { switch(arg->slot) { case SLOTTED_PAGE: - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); break; case FIXED_PAGE: stasis_fixed_initialize_page(p, arg->type, diff --git a/src/stasis/operations/regions.c b/src/stasis/operations/regions.c index 0fe9a62..54e420c 100644 --- a/src/stasis/operations/regions.c +++ b/src/stasis/operations/regions.c @@ -23,7 +23,7 @@ static void TdeallocBoundaryTag(int xid, pageid_t page); top action's logical undo grabs the necessary latches. */ static int op_alloc_boundary_tag(const LogEntry* e, Page* p) { - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); recordid rid = {p->id, 0, sizeof(boundary_tag)}; assert(e->update.arg_size == sizeof(boundary_tag)); p->pageType = BOUNDARY_TAG_PAGE; diff --git a/src/stasis/page.c b/src/stasis/page.c index 6dccca6..034a499 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -116,12 +116,12 @@ lsn_t stasis_page_lsn_read(const Page * page) { */ void stasis_page_init(stasis_dirty_page_table_t * dpt) { dirtyPages = dpt; - slottedPageInit(); + stasis_page_slotted_init(); fixedPageInit(); - stasis_page_impl_register(slottedImpl()); + stasis_page_impl_register(stasis_page_slotted_impl()); stasis_page_impl_register(fixedImpl()); - stasis_page_impl_register(boundaryTagImpl()); + stasis_page_impl_register(stasis_page_boundary_tag_impl()); stasis_page_impl_register(arrayListImpl()); stasis_page_impl_register(stasis_page_blob_impl()); stasis_page_impl_register(lsmRootImpl()); @@ -137,7 +137,7 @@ void stasis_page_deinit() { } fixedPageDeinit(); - slottedPageDeinit(); + stasis_page_slotted_deinit(); } int stasis_page_impl_register(page_impl p) { diff --git a/src/stasis/page/lsnFree.c b/src/stasis/page/lsnFree.c index 155f33f..68aeb95 100644 --- a/src/stasis/page/lsnFree.c +++ b/src/stasis/page/lsnFree.c @@ -3,7 +3,7 @@ #include void stasis_slotted_lsn_free_initialize_page(Page * p) { - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); p->pageType = SLOTTED_LSN_FREE_PAGE; } // XXX still not correct; need to have an "LSN_FREE" constant. @@ -13,7 +13,7 @@ static void lsnFreeLoaded(Page * p) { static void lsnFreeFlushed(Page * p) { } page_impl slottedLsnFreeImpl() { - page_impl pi = slottedImpl(); + page_impl pi = stasis_page_slotted_impl(); pi.page_type = SLOTTED_LSN_FREE_PAGE; pi.pageLoaded = lsnFreeLoaded; pi.pageLoaded = lsnFreeFlushed; diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c index abec433..0d1b22a 100644 --- a/src/stasis/page/slotted.c +++ b/src/stasis/page/slotted.c @@ -3,12 +3,23 @@ #include #include +#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short)) +#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short)) + +#ifdef LONG_TEST +#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 +#endif + // plan: slotted fsck on read / write. Make it more thorough so that the other methods only check for record existence. // intermediate ops assume that slotted.c is correctly implemented (ie: fsck passes iff page is OK, page always "stays" OK) -// stop calloing slotted fsck on each operation. This should speed things up a *lot*. // benchmark page ops (real time) + hash table (user time) - +//#define SLOTTED_PAGE_OLD_CHECKS +#define SLOTTED_PAGE_NEW_CHECKS +#ifdef SLOTTED_PAGE_NEW_CHECKS +#define SLOTTED_PAGE_SANITY_CHECKS +#define SLOTTED_PAGE_CHECK_FOR_OVERLAP +#endif #include /** @@ -28,11 +39,11 @@ static inline void slottedFsck(const Page const * page) { dummy.memAddr = 0; const short page_type = page->pageType; - const short numslots = *numslots_cptr(page); - const short freespace = *freespace_cptr(page); - const short freelist = *freelist_cptr(page); + const short numslots = *stasis_page_slotted_numslots_cptr(page); + const short freespace = *stasis_page_slotted_freespace_cptr(page); + const short freelist = *stasis_page_slotted_freelist_cptr(page); - const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1); + const long slotListStart = (long)stasis_page_slotted_slot_length_ptr(&dummy, numslots-1); assert(slotListStart < PAGE_SIZE && slotListStart >= 0); assert(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE || @@ -52,8 +63,8 @@ static inline void slottedFsck(const Page const * page) { short * slot_offsets = alloca(numslots * sizeof(short)); short * slot_lengths = alloca(numslots * sizeof(short)); for(int i = 0; i < numslots; i++) { - slot_offsets[i] = *slot_ptr(page, i); - slot_lengths[i] = *slot_length_ptr(page, i); + slot_offsets[i] = *stasis_page_slotted_slot_cptr(page, i); + slot_lengths[i] = *stasis_page_slotted_slot_length_cptr(page, i); } short foundEndOfList = 0; @@ -102,21 +113,21 @@ static inline void slottedFsck(const Page const * page) { for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) { image[i] = SLOTTED_HEADER; } - for(short i = *freespace_ptr(page); i < slotListStart; i++) { + for(short i = *stasis_page_slotted_freespace_cptr(page); i < slotListStart; i++) { image[i] = FREE_SPACE; } dummy.memAddr = image; - for(short i = 0; i < *numslots_ptr(page); i++) { - *slot_ptr(&dummy, i) = S_SLOT_LIST; - *slot_length_ptr(&dummy, i) = S_SLOT_LIST; + for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) { + *stasis_page_slotted_slot_ptr(&dummy, i) = S_SLOT_LIST; + *stasis_page_slotted_slot_length_ptr(&dummy, i) = S_SLOT_LIST; } - for(short i = 0; i < *numslots_ptr(page); i++) { - short slot_offset = *slot_ptr(page, i); + for(short i = 0; i < *stasis_page_slotted_numslots_cptr(page); i++) { + short slot_offset = *stasis_page_slotted_slot_cptr(page, i); if(slot_offset != INVALID_SLOT) { const unsigned char ci = i % 0xFF; - short slot_len = stasis_record_type_to_size(*slot_length_ptr(page, i)); + short slot_len = stasis_record_type_to_size(*stasis_page_slotted_slot_length_cptr(page, i)); for(short j = 0; j < slot_len; j++) { assert(image[slot_offset + j] == 0xFF); @@ -149,11 +160,11 @@ static void slottedCompact(Page * page) { // Now, build new slotted page in the bufPage struct. - *freespace_ptr(&bufPage) = 0; + *stasis_page_slotted_freespace_ptr(&bufPage) = 0; // numslots_ptr will be set later. - *freelist_ptr(&bufPage) = INVALID_SLOT; + *stasis_page_slotted_freelist_ptr(&bufPage) = INVALID_SLOT; - const short numSlots = *numslots_ptr(page); + const short numSlots = *stasis_page_slotted_numslots_ptr(page); short lastFreeSlot = INVALID_SLOT; short lastFreeSlotBeforeUsedSlot = INVALID_SLOT; short lastUsedSlot = -1; @@ -161,58 +172,39 @@ static void slottedCompact(Page * page) { // Rebuild free list. for(short i = 0; i < numSlots; i++) { - if(*slot_ptr(page, i) == INVALID_SLOT) { + if(*stasis_page_slotted_slot_ptr(page, i) == INVALID_SLOT) { if(lastFreeSlot == INVALID_SLOT) { - *freelist_ptr(&bufPage) = i; + *stasis_page_slotted_freelist_ptr(&bufPage) = i; } else { - *slot_length_ptr(&bufPage, lastFreeSlot) = i; + *stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlot) = i; } - *slot_ptr(&bufPage, i) = INVALID_SLOT; + *stasis_page_slotted_slot_ptr(&bufPage, i) = INVALID_SLOT; lastFreeSlot = i; } else { lastUsedSlot = i; lastFreeSlotBeforeUsedSlot = lastFreeSlot; - short logicalSize = *slot_length_ptr(page, i); + short logicalSize = *stasis_page_slotted_slot_length_ptr(page, i); short physicalSize = stasis_record_type_to_size(logicalSize); - memcpy(&(buffer[*freespace_ptr(&bufPage)]), record_ptr(page, i), physicalSize); + memcpy(&(buffer[*stasis_page_slotted_freespace_ptr(&bufPage)]), stasis_page_slotted_record_ptr(page, i), physicalSize); - *slot_ptr(&bufPage, i) = *freespace_ptr(&bufPage); - *slot_length_ptr(&bufPage, i) = logicalSize; + *stasis_page_slotted_slot_ptr(&bufPage, i) = *stasis_page_slotted_freespace_ptr(&bufPage); + *stasis_page_slotted_slot_length_ptr(&bufPage, i) = logicalSize; - (*freespace_ptr(&bufPage)) += physicalSize; + (*stasis_page_slotted_freespace_ptr(&bufPage)) += physicalSize; } } // Truncate linked list, and update numslots_ptr. - *slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT; - *numslots_ptr(&bufPage) = lastUsedSlot+1; + *stasis_page_slotted_slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT; + *stasis_page_slotted_numslots_ptr(&bufPage) = lastUsedSlot+1; memcpy(page->memAddr, buffer, PAGE_SIZE); - +#ifdef SLOTTED_PAGE_OLD_CHECKS slottedFsck(page); - -} - -void slottedPageInit() { -#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP - printf("slotted.c: Using expensive page sanity checking.\n"); -#endif -} - -void slottedPageDeinit() { -} - - -void stasis_slotted_initialize_page(Page * page) { - assertlocked(page->rwlatch); - stasis_page_cleanup(page); - page->pageType = SLOTTED_PAGE; - *freespace_ptr(page) = 0; - *numslots_ptr(page) = 0; - *freelist_ptr(page) = INVALID_SLOT; +#endif // SLOTTED_PAGE_OLD_CHECKS } /** @@ -225,19 +217,19 @@ static size_t slottedFreespaceForSlot(Page * page, int slot) { size_t slotOverhead; if(slot == INVALID_SLOT) { - slotOverhead = (*freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0; - } else if(slot < *numslots_ptr(page)) { + slotOverhead = (*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) ? SLOTTED_PAGE_OVERHEAD_PER_RECORD : 0; + } else if(slot < *stasis_page_slotted_numslots_ptr(page)) { slotOverhead = 0; } else { // slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * (*numslots_ptr(page) - slot); - slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *numslots_ptr(page)); + slotOverhead = SLOTTED_PAGE_OVERHEAD_PER_RECORD * ((slot+1) - *stasis_page_slotted_numslots_ptr(page)); } // end_of_free_space points to the beginning of the slot header at the bottom of the page header. - byte* end_of_free_space = (byte*)slot_length_ptr(page, (*numslots_ptr(page))-1); + byte* end_of_free_space = (byte*)stasis_page_slotted_slot_length_ptr(page, (*stasis_page_slotted_numslots_ptr(page))-1); // start_of_free_space points to the first unallocated byte in the page // (ignoring space that could be reclaimed by compaction) - byte* start_of_free_space = (byte*)(page->memAddr + *freespace_ptr(page)); + byte* start_of_free_space = (byte*)(page->memAddr + *stasis_page_slotted_freespace_ptr(page)); assert(end_of_free_space >= start_of_free_space); @@ -282,33 +274,33 @@ static void really_do_ralloc(Page * page, recordid rid) { assert (slottedFreespaceForSlot(page, rid.slot) >= stasis_record_type_to_size(rid.size)); } - freeSpace = *freespace_ptr(page); + freeSpace = *stasis_page_slotted_freespace_ptr(page); // Remove this entry from the freelist (if necessary) slottedCompact // assumes that this does not change the order of items in the list. // If it did, then slottedCompact could leaks slot id's (or worse!) - if(rid.slot < *numslots_ptr(page) && *slot_ptr(page,rid.slot) == INVALID_SLOT) { - short next = *freelist_ptr(page); + if(rid.slot < *stasis_page_slotted_numslots_ptr(page) && *stasis_page_slotted_slot_ptr(page,rid.slot) == INVALID_SLOT) { + short next = *stasis_page_slotted_freelist_ptr(page); short last = INVALID_SLOT; // special case: is the slot physically before us the predecessor? if(rid.slot > 0) { - if(*slot_length_ptr(page, rid.slot-1) == rid.slot && *slot_ptr(page, rid.slot-1) == INVALID_SLOT) { + if(*stasis_page_slotted_slot_length_ptr(page, rid.slot-1) == rid.slot && *stasis_page_slotted_slot_ptr(page, rid.slot-1) == INVALID_SLOT) { next = rid.slot; last = rid.slot-1; } } while(next != INVALID_SLOT && next != rid.slot) { last = next; - assert(next < *numslots_ptr(page)); - short next_slot_ptr = *slot_ptr(page, next); + assert(next < *stasis_page_slotted_numslots_ptr(page)); + short next_slot_ptr = *stasis_page_slotted_slot_ptr(page, next); assert(next_slot_ptr == INVALID_SLOT); - next = *slot_length_ptr(page, next); + next = *stasis_page_slotted_slot_length_ptr(page, next); } if(next == rid.slot) { if(last == INVALID_SLOT) { - *freelist_ptr(page) = *slot_length_ptr(page, rid.slot); + *stasis_page_slotted_freelist_ptr(page) = *stasis_page_slotted_slot_length_ptr(page, rid.slot); } else { - *slot_length_ptr(page, last) = *slot_length_ptr(page, rid.slot); + *stasis_page_slotted_slot_length_ptr(page, last) = *stasis_page_slotted_slot_length_ptr(page, rid.slot); } } } @@ -320,63 +312,63 @@ static void really_do_ralloc(Page * page, recordid rid) { // correctness depends on this behavior!) - if(rid.slot > *numslots_ptr(page)) { + if(rid.slot > *stasis_page_slotted_numslots_ptr(page)) { short lastSlot; - short numSlots = *numslots_ptr(page); - if(*freelist_ptr(page) == INVALID_SLOT) { + short numSlots = *stasis_page_slotted_numslots_ptr(page); + if(*stasis_page_slotted_freelist_ptr(page) == INVALID_SLOT) { - *freelist_ptr(page) = numSlots; + *stasis_page_slotted_freelist_ptr(page) = numSlots; lastSlot = numSlots; - *slot_ptr(page, lastSlot) = INVALID_SLOT; + *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT; // will set slot_length_ptr on next iteration. - (*numslots_ptr(page))++; + (*stasis_page_slotted_numslots_ptr(page))++; } else { lastSlot = INVALID_SLOT; - short next = *freelist_ptr(page); + short next = *stasis_page_slotted_freelist_ptr(page); while(next != INVALID_SLOT) { lastSlot = next; - next = *slot_length_ptr(page, lastSlot); - assert(lastSlot < *numslots_ptr(page)); - assert(*slot_ptr(page, lastSlot) == INVALID_SLOT); + next = *stasis_page_slotted_slot_length_ptr(page, lastSlot); + assert(lastSlot < *stasis_page_slotted_numslots_ptr(page)); + assert(*stasis_page_slotted_slot_ptr(page, lastSlot) == INVALID_SLOT); } - *slot_ptr(page, lastSlot) = INVALID_SLOT; + *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT; } // lastSlot now contains the tail of the free list. We can start adding slots to the list starting at *numslots_ptr. - while(*numslots_ptr(page) < rid.slot) { - *slot_length_ptr(page, lastSlot) = *numslots_ptr(page); - lastSlot = *numslots_ptr(page); - *slot_ptr(page, lastSlot) = INVALID_SLOT; - (*numslots_ptr(page))++; + while(*stasis_page_slotted_numslots_ptr(page) < rid.slot) { + *stasis_page_slotted_slot_length_ptr(page, lastSlot) = *stasis_page_slotted_numslots_ptr(page); + lastSlot = *stasis_page_slotted_numslots_ptr(page); + *stasis_page_slotted_slot_ptr(page, lastSlot) = INVALID_SLOT; + (*stasis_page_slotted_numslots_ptr(page))++; } // Terminate the end of the list. - assert(lastSlot < *numslots_ptr(page)); - *slot_length_ptr(page, lastSlot) = INVALID_SLOT; + assert(lastSlot < *stasis_page_slotted_numslots_ptr(page)); + *stasis_page_slotted_slot_length_ptr(page, lastSlot) = INVALID_SLOT; } - if(*numslots_ptr(page) == rid.slot) { - *numslots_ptr(page) = rid.slot+1; + if(*stasis_page_slotted_numslots_ptr(page) == rid.slot) { + *stasis_page_slotted_numslots_ptr(page) = rid.slot+1; } - assert(*numslots_ptr(page) > rid.slot); + assert(*stasis_page_slotted_numslots_ptr(page) > rid.slot); - DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot); + DEBUG("Num slots %d\trid.slot %d\n", *stasis_page_slotted_numslots_ptr(page), rid.slot); // Reserve space for this record and record the space's offset in // the slot header. - assert(rid.slot < *numslots_ptr(page)); - *freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size); - *slot_ptr(page, rid.slot) = freeSpace; + assert(rid.slot < *stasis_page_slotted_numslots_ptr(page)); + *stasis_page_slotted_freespace_ptr(page) = freeSpace + stasis_record_type_to_size(rid.size); + *stasis_page_slotted_slot_ptr(page, rid.slot) = freeSpace; - *slot_length_ptr(page, rid.slot) = rid.size; + *stasis_page_slotted_slot_length_ptr(page, rid.slot) = rid.size; } @@ -385,68 +377,74 @@ static void really_do_ralloc(Page * page, recordid rid) { // -------------------------------------------------------------------------- static inline void sanityCheck(Page * p, recordid rid) { +#ifdef SLOTTED_PAGE_OLD_CHECKS assert(p->id == rid.page); assert(rid.size < BLOB_THRESHOLD_SIZE); // Caller deals with this now! slottedFsck(p); +#endif } static const byte* slottedRead (int xid, Page *p, recordid rid) { sanityCheck(p, rid); - return record_ptr(p, rid.slot); + return stasis_page_slotted_record_ptr(p, rid.slot); } static byte* slottedWrite(int xid, Page *p, recordid rid) { sanityCheck(p, rid); - return record_ptr(p, rid.slot); + return stasis_page_slotted_record_ptr(p, rid.slot); } static int slottedGetType(int xid, Page *p, recordid rid) { +#ifdef SLOTTED_PAGE_OLD_CHECKS //sanityCheck(p, rid); <-- Would fail if rid.size is a blob assert(p->id == rid.page); slottedFsck(p); - if(rid.slot >= *numslots_ptr(p)) { return INVALID_SLOT; } - if(*slot_ptr(p, rid.slot) == INVALID_SLOT) { return INVALID_SLOT; } - int ret = *slot_length_ptr(p, rid.slot); +#endif + if(rid.slot >= *stasis_page_slotted_numslots_ptr(p)) { return INVALID_SLOT; } + if(*stasis_page_slotted_slot_ptr(p, rid.slot) == INVALID_SLOT) { return INVALID_SLOT; } + int ret = *stasis_page_slotted_slot_length_ptr(p, rid.slot); return ret >= 0 ? NORMAL_SLOT : ret; } static void slottedSetType(int xid, Page *p, recordid rid, int type) { sanityCheck(p, rid); - int old_type = *slot_length_ptr(p, rid.slot); - assert(rid.slot < *numslots_ptr(p)); + int old_type = *stasis_page_slotted_slot_length_ptr(p, rid.slot); + assert(rid.slot < *stasis_page_slotted_numslots_ptr(p)); assert(old_type != INVALID_SLOT); if(type == NORMAL_SLOT) { // set slot_length_ptr to the physical length. - *slot_length_ptr(p, rid.slot) = stasis_record_type_to_size(old_type); + *stasis_page_slotted_slot_length_ptr(p, rid.slot) = stasis_record_type_to_size(old_type); } else { // Changing to a special slot type; make sure doing so doesn't change // the record size. assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(old_type)); - *slot_length_ptr(p, rid.slot) = type; + *stasis_page_slotted_slot_length_ptr(p, rid.slot) = type; } } static int slottedGetLength(int xid, Page *p, recordid rid) { +#ifdef SLOTTED_PAGE_OLD_CHECKS assert(p->id == rid.page); slottedFsck(p); +#endif if( slottedGetType(xid, p, rid) == INVALID_SLOT) return INVALID_SLOT; else - return stasis_record_type_to_size(*slot_length_ptr(p, rid.slot)); + return stasis_record_type_to_size(*stasis_page_slotted_slot_length_ptr(p, rid.slot)); } static recordid slottedNext(int xid, Page *p, recordid rid) { sanityCheck(p, rid); - short n = *numslots_ptr(p); + short n = *stasis_page_slotted_numslots_ptr(p); rid.slot ++; while(rid.slot < n && slottedGetType(xid,p,rid)==INVALID_SLOT) { rid.slot++; } if(rid.slot != n) { - rid.size = *slot_length_ptr(p, rid.slot); + rid.size = *stasis_page_slotted_slot_length_ptr(p, rid.slot); return rid; } else { return NULLRID; @@ -474,11 +472,11 @@ static recordid slottedPreRalloc(int xid, Page * p, int type) { recordid rid; rid.page = p->id; - rid.slot = *numslots_ptr(p); + rid.slot = *stasis_page_slotted_numslots_ptr(p); rid.size = type; - if(*freelist_ptr(p) != INVALID_SLOT) { - rid.slot = *freelist_ptr(p); + if(*stasis_page_slotted_freelist_ptr(p) != INVALID_SLOT) { + rid.slot = *stasis_page_slotted_freelist_ptr(p); } if(slottedFreespaceForSlot(p, rid.slot) < stasis_record_type_to_size(type)) { @@ -497,18 +495,18 @@ static void slottedPostRalloc(int xid, Page * p, recordid rid) { static void slottedFree(int xid, Page * p, recordid rid) { sanityCheck(p, rid); - if(*freespace_ptr(p) == *slot_ptr(p, rid.slot) + stasis_record_type_to_size(rid.size)) { - (*freespace_ptr(p)) -= stasis_record_type_to_size(rid.size); + if(*stasis_page_slotted_freespace_ptr(p) == *stasis_page_slotted_slot_ptr(p, rid.slot) + stasis_record_type_to_size(rid.size)) { + (*stasis_page_slotted_freespace_ptr(p)) -= stasis_record_type_to_size(rid.size); } - assert(rid.slot < *numslots_ptr(p)); - if(rid.slot == *numslots_ptr(p)-1) { - (*numslots_ptr(p))--; + assert(rid.slot < *stasis_page_slotted_numslots_ptr(p)); + if(rid.slot == *stasis_page_slotted_numslots_ptr(p)-1) { + (*stasis_page_slotted_numslots_ptr(p))--; assert(slottedGetType(xid,p,rid)==INVALID_SLOT); } else { - *slot_ptr(p, rid.slot) = INVALID_SLOT; - *slot_length_ptr(p, rid.slot) = *freelist_ptr(p); - *freelist_ptr(p) = rid.slot; + *stasis_page_slotted_slot_ptr(p, rid.slot) = INVALID_SLOT; + *stasis_page_slotted_slot_length_ptr(p, rid.slot) = *stasis_page_slotted_freelist_ptr(p); + *stasis_page_slotted_freelist_ptr(p) = rid.slot; assert(slottedGetType(xid,p,rid)==INVALID_SLOT); } @@ -520,7 +518,6 @@ static void slottedFree(int xid, Page * p, recordid rid) { static void slottedLoaded(Page *p) { p->LSN = *stasis_page_lsn_ptr(p); - // @todo arrange for pagefsck to run on load/flush, but nowhere else. slottedFsck(p); } static void slottedFlushed(Page *p) { @@ -529,7 +526,25 @@ static void slottedFlushed(Page *p) { } static void slottedCleanup(Page *p) { } -page_impl slottedImpl() { +void stasis_page_slotted_init() { +#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP + printf("slotted.c: Using expensive page sanity checking.\n"); +#endif +} + +void stasis_page_slotted_deinit() { +} + +void stasis_page_slotted_initialize_page(Page * page) { + assertlocked(page->rwlatch); + stasis_page_cleanup(page); + page->pageType = SLOTTED_PAGE; + *stasis_page_slotted_freespace_ptr(page) = 0; + *stasis_page_slotted_numslots_ptr(page) = 0; + *stasis_page_slotted_freelist_ptr(page) = INVALID_SLOT; +} + +page_impl stasis_page_slotted_impl() { static page_impl pi = { SLOTTED_PAGE, 1, @@ -559,8 +574,8 @@ static page_impl pi = { return pi; } -page_impl boundaryTagImpl() { - page_impl p = slottedImpl(); +page_impl stasis_page_boundary_tag_impl() { + page_impl p = stasis_page_slotted_impl(); p.page_type = BOUNDARY_TAG_PAGE; return p; } diff --git a/stasis/page.h b/stasis/page.h index e05fa5d..7aada11 100644 --- a/stasis/page.h +++ b/stasis/page.h @@ -893,7 +893,7 @@ page_impl* stasis_page_impl_get(int id); Its contents will be overwritten. It was probably returned by loadPage() */ -void stasis_slotted_initialize_page(Page * p); +void stasis_page_slotted_initialize_page(Page * p); void stasis_slotted_lsn_free_initialize_page(Page * p); void stasis_fixed_initialize_page(Page * page, size_t size, int count); void stasis_indirect_initialize_page(Page * p, int height); diff --git a/stasis/page/slotted.h b/stasis/page/slotted.h index 1bb8c77..42be326 100644 --- a/stasis/page/slotted.h +++ b/stasis/page/slotted.h @@ -60,36 +60,21 @@ Slotted page layout: ************************************************************************/ -#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short)) -#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short)) +static inline int16_t* stasis_page_slotted_freespace_ptr(Page * p) { return stasis_page_int16_ptr_from_end((p), 1); } +static inline int16_t* stasis_page_slotted_numslots_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 2); } +static inline int16_t* stasis_page_slotted_freelist_ptr(Page * p) { return stasis_page_int16_ptr_from_end(p, 3); } +static inline int16_t* stasis_page_slotted_slot_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end(p, (2*(n))+4); } +static inline int16_t* stasis_page_slotted_slot_length_ptr(Page * p, slotid_t n) { return stasis_page_int16_ptr_from_end((p), (2*(n))+5); } +static inline byte* stasis_page_slotted_record_ptr(Page * p, slotid_t n) { return stasis_page_byte_ptr_from_start((p), *stasis_page_slotted_slot_ptr((p), (n))); } -//#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 -//#define SLOTTED_PAGE_SKIP_SANITY_CHECKS 1 +static inline const int16_t* stasis_page_slotted_freespace_cptr(const Page * p) { return stasis_page_slotted_freespace_ptr((Page*)p); } +static inline const int16_t* stasis_page_slotted_numslots_cptr(const Page * p) { return stasis_page_slotted_numslots_ptr((Page*)p); } +static inline const int16_t* stasis_page_slotted_freelist_cptr(const Page * p) { return stasis_page_slotted_freelist_ptr((Page*)p); } +static inline const int16_t* stasis_page_slotted_slot_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_ptr((Page*)p, n); } +static inline const int16_t* stasis_page_slotted_slot_length_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_slot_length_ptr((Page*)p, n); } +static inline const byte* stasis_page_slotted_record_cptr(const Page * p, slotid_t n) { return stasis_page_slotted_record_ptr((Page*)p, n); } -#ifdef LONG_TEST -#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 -#endif - -/** - @todo rename and rewrite slotted.h macros as static inline functions. - */ -#define freespace_ptr(page) stasis_page_int16_ptr_from_end((page), 1) -#define numslots_ptr(page) stasis_page_int16_ptr_from_end((page), 2) -#define freelist_ptr(page) stasis_page_int16_ptr_from_end((page), 3) -#define slot_ptr(page, n) stasis_page_int16_ptr_from_end((page), (2*(n))+4) -#define slot_length_ptr(page, n) stasis_page_int16_ptr_from_end((page), (2*(n))+5) -#define record_ptr(page, n) stasis_page_byte_ptr_from_start((page), \ - *slot_ptr((page), (n))) - -#define freespace_cptr(page) stasis_page_int16_cptr_from_end((page), 1) -#define numslots_cptr(page) stasis_page_int16_cptr_from_end((page), 2) -#define freelist_cptr(page) stasis_page_int16_cptr_from_end((page), 3) -#define slot_cptr(page, n) stasis_page_int16_cptr_from_end((page), (2*(n))+4) -#define slot_length_cptr(page, n) stasis_page_int16_cptr_from_end((page), (2*(n))+5) -#define record_cptr(page, n) stasis_page_byte_cptr_from_start((page), \ - *slot_cptr((page), (n))) - -void slottedPageInit(); -void slottedPageDeinit(); -page_impl slottedImpl(); -page_impl boundaryTagImpl(); +void stasis_page_slotted_init(); +void stasis_page_slotted_deinit(); +page_impl stasis_page_slotted_impl(); +page_impl stasis_page_boundary_tag_impl(); diff --git a/test/stasis/check_bufferManager.c b/test/stasis/check_bufferManager.c index 1d88da9..ae877c0 100644 --- a/test/stasis/check_bufferManager.c +++ b/test/stasis/check_bufferManager.c @@ -47,7 +47,7 @@ void initializePages() { assert(p->id != -1); writelock(p->rwlatch,0); - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); stasis_record_alloc_done(-1, p, rid); int * buf = (int*)stasis_record_write_begin(-1, p, rid); *buf = i; diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index 3de3231..955f687 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -71,7 +71,7 @@ START_TEST(operation_physical_do_undo) { Page * p = loadPage(xid, pnum); writelock(p->rwlatch, 0); - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); rid = stasis_record_alloc_begin(xid, p, sizeof(int)); stasis_record_alloc_done(xid, p, rid); unlock(p->rwlatch); diff --git a/test/stasis/check_page.c b/test/stasis/check_page.c index 1cda697..3c536cb 100644 --- a/test/stasis/check_page.c +++ b/test/stasis/check_page.c @@ -220,7 +220,7 @@ START_TEST(pageNoThreadTest) p = loadPage(-1, 0); writelock(p->rwlatch,0); memset(p->memAddr, 0, PAGE_SIZE); - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); unlock(p->rwlatch); worker_thread(p); @@ -251,14 +251,14 @@ START_TEST(pageCheckMacros) { *stasis_page_lsn_ptr(&p) = lsn; *stasis_page_type_ptr(&p) = 10; - *freespace_ptr(&p) = 15; - *numslots_ptr(&p) = 20; - *slot_ptr(&p, 0) = 30; - *slot_ptr(&p, 1) = 35; - *slot_ptr(&p, 40) = 40; - *slot_length_ptr(&p, 0) = 31; - *slot_length_ptr(&p, 1) = 36; - *slot_length_ptr(&p, 40) = 41; + *stasis_page_slotted_freespace_ptr(&p) = 15; + *stasis_page_slotted_numslots_ptr(&p) = 20; + *stasis_page_slotted_slot_ptr(&p, 0) = 30; + *stasis_page_slotted_slot_ptr(&p, 1) = 35; + *stasis_page_slotted_slot_ptr(&p, 40) = 40; + *stasis_page_slotted_slot_length_ptr(&p, 0) = 31; + *stasis_page_slotted_slot_length_ptr(&p, 1) = 36; + *stasis_page_slotted_slot_length_ptr(&p, 40) = 41; *stasis_page_byte_ptr_from_start(&p, 0) = 50; *stasis_page_byte_ptr_from_start(&p, 1) = 51; @@ -269,14 +269,14 @@ START_TEST(pageCheckMacros) { assert(*stasis_page_lsn_ptr(&p) == lsn); assert(*stasis_page_type_ptr(&p) == 10); //assert(end_of_usable_space_ptr(&p) == stasis_page_type_ptr(&p)); - assert(*freespace_ptr(&p) == 15); - assert(*numslots_ptr(&p) == 20); - assert(*slot_ptr(&p, 0) == 30); - assert(*slot_ptr(&p, 1) == 35); - assert(*slot_ptr(&p, 40) == 40); - assert(*slot_length_ptr(&p, 0) == 31); - assert(*slot_length_ptr(&p, 1) == 36); - assert(*slot_length_ptr(&p, 40) == 41); + assert(*stasis_page_slotted_freespace_ptr(&p) == 15); + assert(*stasis_page_slotted_numslots_ptr(&p) == 20); + assert(*stasis_page_slotted_slot_ptr(&p, 0) == 30); + assert(*stasis_page_slotted_slot_ptr(&p, 1) == 35); + assert(*stasis_page_slotted_slot_ptr(&p, 40) == 40); + assert(*stasis_page_slotted_slot_length_ptr(&p, 0) == 31); + assert(*stasis_page_slotted_slot_length_ptr(&p, 1) == 36); + assert(*stasis_page_slotted_slot_length_ptr(&p, 40) == 41); assert(*stasis_page_byte_ptr_from_start(&p, 0) == 50); assert(*stasis_page_byte_ptr_from_start(&p, 1) == 51); @@ -343,7 +343,7 @@ START_TEST(pageRecordSizeTypeIteratorTest) { Page * p = loadPage(xid,pid); writelock(p->rwlatch,0); memset(p->memAddr, 0, PAGE_SIZE); - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); checkPageIterators(xid,p,10); @@ -386,7 +386,7 @@ START_TEST(pageNoThreadMultPageTest) *stasis_page_lsn_ptr(p) = p->LSN; writelock(p->rwlatch,0); - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); unlock(p->rwlatch); multiple_simultaneous_pages(p); // Normally, you would call pageWriteLSN() to update the LSN. This @@ -426,7 +426,7 @@ START_TEST(pageThreadTest) { Page * p = loadPage(-1, 2); writelock(p->rwlatch,0); memset(p->memAddr, 0, PAGE_SIZE); - stasis_slotted_initialize_page(p); + stasis_page_slotted_initialize_page(p); unlock(p->rwlatch); p->LSN = 0; *stasis_page_lsn_ptr(p) = p->LSN;