diff --git a/src/lladd/page/slotted.c b/src/lladd/page/slotted.c index bb982a6..3afb4dc 100644 --- a/src/lladd/page/slotted.c +++ b/src/lladd/page/slotted.c @@ -2,12 +2,126 @@ #include "../page.h" -//#include "../blobManager.h" /** So that we can call sizeof(blob_record_t) */ #include "slotted.h" #include <assert.h> static void really_do_ralloc(Page * page, recordid rid) ; size_t slottedFreespaceForSlot(Page * page, int slot); + +void fsckSlottedPage(const Page const * page) { + +#ifndef SLOTTED_PAGE_SKIP_SANITY_CHECKS + Page dummy; + + dummy.id = -1; + dummy.memAddr = 0; + + const short page_type = *page_type_ptr(page); + const short numslots = *numslots_ptr(page); + const short freespace = *freespace_ptr(page); + const short freelist = *freelist_ptr(page); + + const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1); + assert(slotListStart < PAGE_SIZE && slotListStart >= 0); + assert(page_type == SLOTTED_PAGE || + page_type == BOUNDARY_TAG_PAGE); + assert(numslots >= 0); + assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE); + assert(freespace >= 0); + assert(freespace <= slotListStart); + assert(freelist >= INVALID_SLOT); + assert(freelist < numslots); + + // Now, check integrity of freelist. All free slots less than numslots should be on it, in order. 
+ + short * slot_offsets = alloca(numslots * sizeof(short)); + short * slot_lengths = alloca(numslots * sizeof(short)); + for(int i = 0; i < numslots; i++) { + slot_offsets[i] = *slot_ptr(page, i); + slot_lengths[i] = *slot_length_ptr(page, i); + } + + short foundEndOfList = 0; + + if(freelist != INVALID_SLOT) { + assert(slot_offsets[freelist] == INVALID_SLOT); + } else { + foundEndOfList = 1; + } + + for(short i = 0; i < numslots; i++) { + const short slot_length = slot_lengths[i]; + const short slot_offset = slot_offsets[i]; + if(slot_offset == INVALID_SLOT) { + /* if(last_freeslot == INVALID_SLOT) { + assert(freelist == i); + } else { + assert(last_freeslot_length == i); + } + last_freeslot = i; + last_freeslot_length = slot_length; */ + if(slot_length == INVALID_SLOT) { + assert(!foundEndOfList); + foundEndOfList = 1; + } else { + assert (slot_offsets[slot_length] == INVALID_SLOT); + } + } else { + assert(slot_offset + slot_length <= freespace); + } + } + + // Is the free list terminated? 
+ assert(foundEndOfList); + +#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP + + const byte UNUSED = 0xFF; + const byte PAGE_HEADER = 0xFE; + const byte SLOTTED_HEADER = 0xFD; + // const byte SLOT_LIST = 0xFC; + const byte FREE_SPACE = 0xFB; + + const unsigned short S_SLOT_LIST = 0xFCFC; + + byte image[PAGE_SIZE]; + for(short i = 0; i < PAGE_SIZE; i++) { + image[i] = UNUSED; + } + for(short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) { + image[i] = PAGE_HEADER; + } + for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) { + image[i] = SLOTTED_HEADER; + } + for(short i = *freespace_ptr(page); i < slotListStart; i++) { + image[i] = FREE_SPACE; + } + + dummy.memAddr = image; + + for(short i = 0; i < *numslots_ptr(page); i++) { + *slot_ptr(&dummy, i) = S_SLOT_LIST; + *slot_length_ptr(&dummy, i) = S_SLOT_LIST; + } + for(short i = 0; i < *numslots_ptr(page); i++) { + short slot_offset = *slot_ptr(page, i); + if(slot_offset != INVALID_SLOT) { + const unsigned char ci = i % 0xFF; + short slot_len = physical_slot_length(*slot_length_ptr(page, i)); + + for(short j = 0; j < slot_len; j++) { + assert(image[slot_offset + j] == 0xFF); + image[slot_offset + j] = ci; + } + } + } +#endif // SLOTTED_PAGE_CHECK_FOR_OVERLAP +#endif // SLOTTED_PAGE_SKIP_SANITY_CHECKS + +} + + /** Move all of the records to the beginning of the page in order to @@ -16,7 +130,67 @@ increase the available free space. The caller of this function must have a writelock on the page. */ -void slottedCompact(Page * page) { +void slottedCompact(Page * page) { + Page bufPage; + byte buffer[PAGE_SIZE]; + bufPage.memAddr = buffer; + + // Copy external headers into bufPage. + + memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(page->memAddr[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE); + + // Now, build new slotted page in the bufPage struct. + + *freespace_ptr(&bufPage) = 0; + // numslots_ptr will be set later. 
+ *freelist_ptr(&bufPage) = INVALID_SLOT; + + const short numSlots = *numslots_ptr(page); + short lastFreeSlot = INVALID_SLOT; + short lastFreeSlotBeforeUsedSlot = INVALID_SLOT; + short lastUsedSlot = -1; + + // Rebuild free list. + + for(short i = 0; i < numSlots; i++) { + if(*slot_ptr(page, i) == INVALID_SLOT) { + if(lastFreeSlot == INVALID_SLOT) { + *freelist_ptr(&bufPage) = i; + } else { + *slot_length_ptr(&bufPage, lastFreeSlot) = i; + } + *slot_ptr(&bufPage, i) = INVALID_SLOT; + lastFreeSlot = i; + } else { + lastUsedSlot = i; + lastFreeSlotBeforeUsedSlot = lastFreeSlot; + + short logicalSize = *slot_length_ptr(page, i); + short physicalSize = physical_slot_length(logicalSize); + + memcpy(&(buffer[*freespace_ptr(&bufPage)]), record_ptr(page, i), physicalSize); + + *slot_ptr(&bufPage, i) = *freespace_ptr(&bufPage); + *slot_length_ptr(&bufPage, i) = logicalSize; + + (*freespace_ptr(&bufPage)) += physicalSize; + + } + } + + // Truncate linked list, and update numslots_ptr. + *slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT; + *numslots_ptr(&bufPage) = lastUsedSlot+1; + + memcpy(page->memAddr, buffer, PAGE_SIZE); + + fsckSlottedPage(page); + +} + +void slottedCompactOld(Page * page) { + + fsckSlottedPage(page); short i; Page bufPage; @@ -50,9 +224,6 @@ void slottedCompact(Page * page) { for(i = numSlots-1; i >= 0; i--) { if (isValidSlot(page, i)) { - /* printf("copying %d\n", i); - fflush(NULL); */ - /* DEBUG("Buffer offset: %d\n", freeSpace); */ recordid rid; rid.page = -1; @@ -93,7 +264,7 @@ void slottedCompact(Page * page) { memcpy(page->memAddr, buffer, PAGE_SIZE); - assert(slottedFreespaceForSlot(page, -1) || 1); + fsckSlottedPage(page); } @@ -119,6 +290,9 @@ void slottedCompact(Page * page) { // static uint64_t lastFreepage = -10; void slottedPageInit() { +#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP + printf("slotted.c: Using expensive page sanity checking.\n"); +#endif /*pthread_mutex_init(&lastFreepage_mutex , NULL); */ //lastFreepage = 
UINT64_MAX; } @@ -136,6 +310,7 @@ void slottedPageInitialize(Page * page) { *freespace_ptr(page) = 0; *numslots_ptr(page) = 0; *freelist_ptr(page) = INVALID_SLOT; + fsckSlottedPage(page); } size_t slottedFreespaceUnlocked(Page * page); @@ -191,36 +366,37 @@ size_t slottedFreespace(Page * page) { recordid slottedRawRalloc(Page * page, int size) { - int type = size; size = physical_slot_length(type); assert(type != INVALID_SLOT); writelock(page->rwlatch, 342); - assert(*page_type_ptr(page) == SLOTTED_PAGE || *page_type_ptr(page) == BOUNDARY_TAG_PAGE); + fsckSlottedPage(page); recordid rid; rid.page = page->id; rid.slot = *numslots_ptr(page); rid.size = type; // The rid should reflect the fact that this is a special slot. - + /* The freelist_ptr points to the first free slot number, which is the head of a linked list of free slot numbers.*/ if(*freelist_ptr(page) != INVALID_SLOT) { - rid.slot = *freelist_ptr(page); - *freelist_ptr(page) = *slot_length_ptr(page, rid.slot); - *slot_length_ptr(page, rid.slot) = INVALID_SLOT; - } + rid.slot = *freelist_ptr(page); + // really_do_ralloc will look this slot up in the freelist (which + // is O(1), since rid.slot is the head), and then remove it from + // the list. + } really_do_ralloc(page, rid); + assert(*numslots_ptr(page) > rid.slot); assert(type == *slot_length_ptr(page, rid.slot)); assert(size == physical_slot_length(*slot_length_ptr(page, rid.slot))); /* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */ - assert(slottedFreespaceUnlocked(page) >= 0); + fsckSlottedPage(page); writeunlock(page->rwlatch); @@ -268,6 +444,7 @@ static void really_do_ralloc(Page * page, recordid rid) { } while(next != INVALID_SLOT && next != rid.slot) { last = next; + assert(next < *numslots_ptr(page)); short next_slot_ptr = *slot_ptr(page, next); assert(next_slot_ptr == INVALID_SLOT); next = *slot_length_ptr(page, next); @@ -324,6 +501,7 @@ static void really_do_ralloc(Page * page, recordid rid) { } // Terminate the end of the list. 
+ assert(lastSlot < *numslots_ptr(page)); *slot_length_ptr(page, lastSlot) = INVALID_SLOT; } @@ -332,19 +510,19 @@ static void really_do_ralloc(Page * page, recordid rid) { *numslots_ptr(page) = rid.slot+1; } + assert(*numslots_ptr(page) > rid.slot); + DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot); // Reserve space for this record and record the space's offset in // the slot header. + + assert(rid.slot < *numslots_ptr(page)); *freespace_ptr(page) = freeSpace + physical_slot_length(rid.size); *slot_ptr(page, rid.slot) = freeSpace; - // Remember how long this record is - // if(isBlob) { - //*slot_length_ptr(page, rid.slot = BLOB_SLOT); - //} else { *slot_length_ptr(page, rid.slot) = rid.size; - //} + assert(slottedFreespaceForSlot(page, -1) || 1); } @@ -354,6 +532,7 @@ static void really_do_ralloc(Page * page, recordid rid) { recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) { writelock(page->rwlatch, 376); + if(rid.size >= BLOB_THRESHOLD_SIZE) { rid.size = BLOB_SLOT; } @@ -384,26 +563,18 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) { slottedPageInitialize(page); } + fsckSlottedPage(page); - // Make sure the slot is invalid. If the slot has not been used yet, then - // slot_length_ptr will still be zero, so we allow that too. - if((*slot_length_ptr(page, rid.slot) == 0) || (*slot_ptr(page, rid.slot) == INVALID_SLOT)) { - + if(*slot_ptr(page, rid.slot) == INVALID_SLOT || rid.slot >= *numslots_ptr(page)) { really_do_ralloc(page, rid); } else { // Check to see that the slot happens to be the right size, // so we are (hopefully) just overwriting a slot with - // itself, or that the slot is a blob slot. This can happen - // under normal operation, since really_do_ralloc() must - // be called before and after the log entry is generated. - // (See comment above...) - // @todo Check to see that the blob is the right size? 
- - /* assert((rid.size == *slot_length_ptr(page, rid.slot)) || - (*slot_length_ptr(page, rid.slot) >= PAGE_SIZE) || - (rid.size >= BLOB_THRESHOLD_SIZE)); */ + // itself. This can happen under normal operation, since + // really_do_ralloc() must be called before and after the + // log entry is generated. (See comment above...) assert(rid.size == *slot_length_ptr(page, rid.slot)); @@ -413,6 +584,7 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) { assert(slottedFreespaceForSlot(page, -1) || 1); + fsckSlottedPage(page); writeunlock(page->rwlatch); @@ -421,23 +593,30 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) { void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) { writelock(page->rwlatch, 443); - // readlock(page->rwlatch, 443); - size_t oldFreeLen = slottedFreespaceUnlocked(page); - *slot_ptr(page, rid.slot) = INVALID_SLOT; - assert(*freelist_ptr(page) < *numslots_ptr(page)); - *slot_length_ptr(page, rid.slot) = *freelist_ptr(page); - *freelist_ptr(page) = rid.slot; - /* *slot_length_ptr(page, rid.slot) = 0; */ - + fsckSlottedPage(page); + + if(*freespace_ptr(page) == *slot_ptr(page, rid.slot) + physical_slot_length(rid.size)) { + (*freespace_ptr(page)) -= physical_slot_length(rid.size); + } + + assert(rid.slot < *numslots_ptr(page)); + if(rid.slot == *numslots_ptr(page)-1) { + (*numslots_ptr(page))--; + } else { + *slot_ptr(page, rid.slot) = INVALID_SLOT; + *slot_length_ptr(page, rid.slot) = *freelist_ptr(page); + *freelist_ptr(page) = rid.slot; + } + pageWriteLSN(xid, page, lsn); - size_t newFreeLen = slottedFreespaceUnlocked(page); - assert(oldFreeLen <= newFreeLen); + fsckSlottedPage(page); unlock(page->rwlatch); } void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) { int slot_length; + fsckSlottedPage(page); assert(page->id == rid.page); slot_length = *slot_length_ptr(page, rid.slot); assert((rid.size == slot_length)); // || (rid.size == BLOB_SLOT && slot_length == 
sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE)); @@ -446,6 +625,7 @@ void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) { perror("memcpy"); abort(); } + fsckSlottedPage(page); } @@ -460,6 +640,7 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) { int slot_length; readlock(page->rwlatch, 519); + fsckSlottedPage(page); // printf("Reading from rid = {%d,%d,%d (%d)}\n", rid.page, rid.slot, rid.size, physical_slot_length(rid.size)); @@ -481,15 +662,21 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) { abort(); } + fsckSlottedPage(page); unlock(page->rwlatch); } void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) { - int slot_length; readlock(page->rwlatch, 529); + slottedWriteUnlocked(xid, page, lsn, rid, data); + + unlock(page->rwlatch); + + + /* fsckSlottedPage(page); // printf("Writing to rid = {%d,%d,%d}\n", rid.page, rid.slot, rid.size); @@ -504,11 +691,12 @@ void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat abort(); } - unlock(page->rwlatch); + fsckSlottedPage(page); */ } void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) { int slot_length; + fsckSlottedPage(page); // assert(rid.size < PAGE_SIZE); assert(page->id == rid.page); @@ -520,4 +708,5 @@ void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const b perror("memcpy"); abort(); } + fsckSlottedPage(page); } diff --git a/src/lladd/page/slotted.h b/src/lladd/page/slotted.h index 8c23e90..fb385c7 100644 --- a/src/lladd/page/slotted.h +++ b/src/lladd/page/slotted.h @@ -64,6 +64,9 @@ Slotted page layout: #define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short)) #define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short)) +//#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1 +#define SLOTTED_PAGE_SKIP_SANITY_CHECKS 1 + void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data); void slottedRead(int xid, Page * page, 
recordid rid, byte *buff); void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);