Rewrote slottedCompact; added fsckSlottedPage

This commit is contained in:
Sears Russell 2006-07-22 00:01:42 +00:00
parent 35b0978859
commit 2af892709f
2 changed files with 236 additions and 44 deletions

View file

@ -2,12 +2,126 @@
#include "../page.h"
//#include "../blobManager.h" /** So that we can call sizeof(blob_record_t) */
#include "slotted.h"
#include <assert.h>
static void really_do_ralloc(Page * page, recordid rid) ;
size_t slottedFreespaceForSlot(Page * page, int slot);
void fsckSlottedPage(const Page const * page) {
#ifndef SLOTTED_PAGE_SKIP_SANITY_CHECKS
Page dummy;
dummy.id = -1;
dummy.memAddr = 0;
const short page_type = *page_type_ptr(page);
const short numslots = *numslots_ptr(page);
const short freespace = *freespace_ptr(page);
const short freelist = *freelist_ptr(page);
const long slotListStart = (long)slot_length_ptr(&dummy, numslots-1);
assert(slotListStart < PAGE_SIZE && slotListStart >= 0);
assert(page_type == SLOTTED_PAGE ||
page_type == BOUNDARY_TAG_PAGE);
assert(numslots >= 0);
assert(numslots * SLOTTED_PAGE_OVERHEAD_PER_RECORD < PAGE_SIZE);
assert(freespace >= 0);
assert(freespace <= slotListStart);
assert(freelist >= INVALID_SLOT);
assert(freelist < numslots);
// Now, check integrity of freelist. All free slots less than numslots should be on it, in order.
short * slot_offsets = alloca(numslots * sizeof(short));
short * slot_lengths = alloca(numslots * sizeof(short));
for(int i = 0; i < numslots; i++) {
slot_offsets[i] = *slot_ptr(page, i);
slot_lengths[i] = *slot_length_ptr(page, i);
}
short foundEndOfList = 0;
if(freelist != INVALID_SLOT) {
assert(slot_offsets[freelist] == INVALID_SLOT);
} else {
foundEndOfList = 1;
}
for(short i = 0; i < numslots; i++) {
const short slot_length = slot_lengths[i];
const short slot_offset = slot_offsets[i];
if(slot_offset == INVALID_SLOT) {
/* if(last_freeslot == INVALID_SLOT) {
assert(freelist == i);
} else {
assert(last_freeslot_length == i);
}
last_freeslot = i;
last_freeslot_length = slot_length; */
if(slot_length == INVALID_SLOT) {
assert(!foundEndOfList);
foundEndOfList = 1;
} else {
assert (slot_offsets[slot_length] == INVALID_SLOT);
}
} else {
assert(slot_offset + slot_length <= freespace);
}
}
// Is the free list terminated?
assert(foundEndOfList);
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
const byte UNUSED = 0xFF;
const byte PAGE_HEADER = 0xFE;
const byte SLOTTED_HEADER = 0xFD;
// const byte SLOT_LIST = 0xFC;
const byte FREE_SPACE = 0xFB;
const unsigned short S_SLOT_LIST = 0xFCFC;
byte image[PAGE_SIZE];
for(short i = 0; i < PAGE_SIZE; i++) {
image[i] = UNUSED;
}
for(short i = USABLE_SIZE_OF_PAGE; i < PAGE_SIZE; i++) {
image[i] = PAGE_HEADER;
}
for(short i = USABLE_SIZE_OF_PAGE - SLOTTED_PAGE_HEADER_OVERHEAD; i < USABLE_SIZE_OF_PAGE; i++) {
image[i] = SLOTTED_HEADER;
}
for(short i = *freespace_ptr(page); i < slotListStart; i++) {
image[i] = FREE_SPACE;
}
dummy.memAddr = image;
for(short i = 0; i < *numslots_ptr(page); i++) {
*slot_ptr(&dummy, i) = S_SLOT_LIST;
*slot_length_ptr(&dummy, i) = S_SLOT_LIST;
}
for(short i = 0; i < *numslots_ptr(page); i++) {
short slot_offset = *slot_ptr(page, i);
if(slot_offset != INVALID_SLOT) {
const unsigned char ci = i % 0xFF;
short slot_len = physical_slot_length(*slot_length_ptr(page, i));
for(short j = 0; j < slot_len; j++) {
assert(image[slot_offset + j] == 0xFF);
image[slot_offset + j] = ci;
}
}
}
#endif // SLOTTED_PAGE_CHECK_FOR_OVERLAP
#endif // SLOTTED_PAGE_SKIP_SANITY_CHECKS
}
/**
Move all of the records to the beginning of the page in order to
@ -17,6 +131,66 @@ The caller of this function must have a writelock on the page.
*/
void slottedCompact(Page * page) {
Page bufPage;
byte buffer[PAGE_SIZE];
bufPage.memAddr = buffer;
// Copy external headers into bufPage.
memcpy(&buffer[USABLE_SIZE_OF_PAGE], &(page->memAddr[USABLE_SIZE_OF_PAGE]), PAGE_SIZE - USABLE_SIZE_OF_PAGE);
// Now, build new slotted page in the bufPage struct.
*freespace_ptr(&bufPage) = 0;
// numslots_ptr will be set later.
*freelist_ptr(&bufPage) = INVALID_SLOT;
const short numSlots = *numslots_ptr(page);
short lastFreeSlot = INVALID_SLOT;
short lastFreeSlotBeforeUsedSlot = INVALID_SLOT;
short lastUsedSlot = -1;
// Rebuild free list.
for(short i = 0; i < numSlots; i++) {
if(*slot_ptr(page, i) == INVALID_SLOT) {
if(lastFreeSlot == INVALID_SLOT) {
*freelist_ptr(&bufPage) = i;
} else {
*slot_length_ptr(&bufPage, lastFreeSlot) = i;
}
*slot_ptr(&bufPage, i) = INVALID_SLOT;
lastFreeSlot = i;
} else {
lastUsedSlot = i;
lastFreeSlotBeforeUsedSlot = lastFreeSlot;
short logicalSize = *slot_length_ptr(page, i);
short physicalSize = physical_slot_length(logicalSize);
memcpy(&(buffer[*freespace_ptr(&bufPage)]), record_ptr(page, i), physicalSize);
*slot_ptr(&bufPage, i) = *freespace_ptr(&bufPage);
*slot_length_ptr(&bufPage, i) = logicalSize;
(*freespace_ptr(&bufPage)) += physicalSize;
}
}
// Truncate linked list, and update numslots_ptr.
*slot_length_ptr(&bufPage, lastFreeSlotBeforeUsedSlot) = INVALID_SLOT;
*numslots_ptr(&bufPage) = lastUsedSlot+1;
memcpy(page->memAddr, buffer, PAGE_SIZE);
fsckSlottedPage(page);
}
void slottedCompactOld(Page * page) {
fsckSlottedPage(page);
short i;
Page bufPage;
@ -50,9 +224,6 @@ void slottedCompact(Page * page) {
for(i = numSlots-1; i >= 0; i--) {
if (isValidSlot(page, i)) {
/* printf("copying %d\n", i);
fflush(NULL); */
/* DEBUG("Buffer offset: %d\n", freeSpace); */
recordid rid;
rid.page = -1;
@ -93,7 +264,7 @@ void slottedCompact(Page * page) {
memcpy(page->memAddr, buffer, PAGE_SIZE);
assert(slottedFreespaceForSlot(page, -1) || 1);
fsckSlottedPage(page);
}
@ -119,6 +290,9 @@ void slottedCompact(Page * page) {
// static uint64_t lastFreepage = -10;
void slottedPageInit() {
#ifdef SLOTTED_PAGE_CHECK_FOR_OVERLAP
printf("slotted.c: Using expensive page sanity checking.\n");
#endif
/*pthread_mutex_init(&lastFreepage_mutex , NULL); */
//lastFreepage = UINT64_MAX;
}
@ -136,6 +310,7 @@ void slottedPageInitialize(Page * page) {
*freespace_ptr(page) = 0;
*numslots_ptr(page) = 0;
*freelist_ptr(page) = INVALID_SLOT;
fsckSlottedPage(page);
}
size_t slottedFreespaceUnlocked(Page * page);
@ -191,13 +366,12 @@ size_t slottedFreespace(Page * page) {
recordid slottedRawRalloc(Page * page, int size) {
int type = size;
size = physical_slot_length(type);
assert(type != INVALID_SLOT);
writelock(page->rwlatch, 342);
assert(*page_type_ptr(page) == SLOTTED_PAGE || *page_type_ptr(page) == BOUNDARY_TAG_PAGE);
fsckSlottedPage(page);
recordid rid;
@ -209,18 +383,20 @@ recordid slottedRawRalloc(Page * page, int size) {
is the head of a linked list of free slot numbers.*/
if(*freelist_ptr(page) != INVALID_SLOT) {
rid.slot = *freelist_ptr(page);
*freelist_ptr(page) = *slot_length_ptr(page, rid.slot);
*slot_length_ptr(page, rid.slot) = INVALID_SLOT;
// really_do_ralloc will look this slot up in the freelist (which
// is O(1), since rid.slot is the head), and then remove it from
// the list.
}
really_do_ralloc(page, rid);
assert(*numslots_ptr(page) > rid.slot);
assert(type == *slot_length_ptr(page, rid.slot));
assert(size == physical_slot_length(*slot_length_ptr(page, rid.slot)));
/* DEBUG("slot: %d freespace: %d\n", rid.slot, freeSpace); */
assert(slottedFreespaceUnlocked(page) >= 0);
fsckSlottedPage(page);
writeunlock(page->rwlatch);
@ -268,6 +444,7 @@ static void really_do_ralloc(Page * page, recordid rid) {
}
while(next != INVALID_SLOT && next != rid.slot) {
last = next;
assert(next < *numslots_ptr(page));
short next_slot_ptr = *slot_ptr(page, next);
assert(next_slot_ptr == INVALID_SLOT);
next = *slot_length_ptr(page, next);
@ -324,6 +501,7 @@ static void really_do_ralloc(Page * page, recordid rid) {
}
// Terminate the end of the list.
assert(lastSlot < *numslots_ptr(page));
*slot_length_ptr(page, lastSlot) = INVALID_SLOT;
}
@ -332,19 +510,19 @@ static void really_do_ralloc(Page * page, recordid rid) {
*numslots_ptr(page) = rid.slot+1;
}
assert(*numslots_ptr(page) > rid.slot);
DEBUG("Num slots %d\trid.slot %d\n", *numslots_ptr(page), rid.slot);
// Reserve space for this record and record the space's offset in
// the slot header.
assert(rid.slot < *numslots_ptr(page));
*freespace_ptr(page) = freeSpace + physical_slot_length(rid.size);
*slot_ptr(page, rid.slot) = freeSpace;
// Remember how long this record is
// if(isBlob) {
//*slot_length_ptr(page, rid.slot = BLOB_SLOT);
//} else {
*slot_length_ptr(page, rid.slot) = rid.size;
//}
assert(slottedFreespaceForSlot(page, -1) || 1);
}
@ -354,6 +532,7 @@ static void really_do_ralloc(Page * page, recordid rid) {
recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
writelock(page->rwlatch, 376);
if(rid.size >= BLOB_THRESHOLD_SIZE) {
rid.size = BLOB_SLOT;
}
@ -384,26 +563,18 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
slottedPageInitialize(page);
}
fsckSlottedPage(page);
// Make sure the slot is invalid. If the slot has not been used yet, then
// slot_length_ptr will still be zero, so we allow that too.
if((*slot_length_ptr(page, rid.slot) == 0) || (*slot_ptr(page, rid.slot) == INVALID_SLOT)) {
if(*slot_ptr(page, rid.slot) == INVALID_SLOT || rid.slot >= *numslots_ptr(page)) {
really_do_ralloc(page, rid);
} else {
// Check to see that the slot happens to be the right size,
// so we are (hopefully) just overwriting a slot with
// itself, or that the slot is a blob slot. This can happen
// under normal operation, since really_do_ralloc() must
// be called before and after the log entry is generated.
// (See comment above...)
// @todo Check to see that the blob is the right size?
/* assert((rid.size == *slot_length_ptr(page, rid.slot)) ||
(*slot_length_ptr(page, rid.slot) >= PAGE_SIZE) ||
(rid.size >= BLOB_THRESHOLD_SIZE)); */
// itself. This can happen under normal operation, since
// really_do_ralloc() must be called before and after the
// log entry is generated. (See comment above...)
assert(rid.size == *slot_length_ptr(page, rid.slot));
@ -413,6 +584,7 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
assert(slottedFreespaceForSlot(page, -1) || 1);
fsckSlottedPage(page);
writeunlock(page->rwlatch);
@ -421,23 +593,30 @@ recordid slottedPostRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
writelock(page->rwlatch, 443);
// readlock(page->rwlatch, 443);
size_t oldFreeLen = slottedFreespaceUnlocked(page);
*slot_ptr(page, rid.slot) = INVALID_SLOT;
assert(*freelist_ptr(page) < *numslots_ptr(page));
*slot_length_ptr(page, rid.slot) = *freelist_ptr(page);
*freelist_ptr(page) = rid.slot;
/* *slot_length_ptr(page, rid.slot) = 0; */
fsckSlottedPage(page);
if(*freespace_ptr(page) == *slot_ptr(page, rid.slot) + physical_slot_length(rid.size)) {
(*freespace_ptr(page)) -= physical_slot_length(rid.size);
}
assert(rid.slot < *numslots_ptr(page));
if(rid.slot == *numslots_ptr(page)-1) {
(*numslots_ptr(page))--;
} else {
*slot_ptr(page, rid.slot) = INVALID_SLOT;
*slot_length_ptr(page, rid.slot) = *freelist_ptr(page);
*freelist_ptr(page) = rid.slot;
}
pageWriteLSN(xid, page, lsn);
size_t newFreeLen = slottedFreespaceUnlocked(page);
assert(oldFreeLen <= newFreeLen);
fsckSlottedPage(page);
unlock(page->rwlatch);
}
void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) {
int slot_length;
fsckSlottedPage(page);
assert(page->id == rid.page);
slot_length = *slot_length_ptr(page, rid.slot);
assert((rid.size == slot_length)); // || (rid.size == BLOB_SLOT && slot_length == sizeof(blob_record_t))|| (slot_length >= PAGE_SIZE));
@ -446,6 +625,7 @@ void slottedReadUnlocked(int xid, Page * page, recordid rid, byte *buff) {
perror("memcpy");
abort();
}
fsckSlottedPage(page);
}
@ -460,6 +640,7 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) {
int slot_length;
readlock(page->rwlatch, 519);
fsckSlottedPage(page);
// printf("Reading from rid = {%d,%d,%d (%d)}\n", rid.page, rid.slot, rid.size, physical_slot_length(rid.size));
@ -481,15 +662,21 @@ void slottedRead(int xid, Page * page, recordid rid, byte *buff) {
abort();
}
fsckSlottedPage(page);
unlock(page->rwlatch);
}
void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) {
int slot_length;
readlock(page->rwlatch, 529);
slottedWriteUnlocked(xid, page, lsn, rid, data);
unlock(page->rwlatch);
/* fsckSlottedPage(page);
// printf("Writing to rid = {%d,%d,%d}\n", rid.page, rid.slot, rid.size);
@ -504,11 +691,12 @@ void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *dat
abort();
}
unlock(page->rwlatch);
fsckSlottedPage(page); */
}
void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data) {
int slot_length;
fsckSlottedPage(page);
// assert(rid.size < PAGE_SIZE);
assert(page->id == rid.page);
@ -520,4 +708,5 @@ void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const b
perror("memcpy");
abort();
}
fsckSlottedPage(page);
}

View file

@ -64,6 +64,9 @@ Slotted page layout:
#define SLOTTED_PAGE_OVERHEAD_PER_RECORD (2 * sizeof(short))
#define SLOTTED_PAGE_HEADER_OVERHEAD (3 * sizeof(short))
//#define SLOTTED_PAGE_CHECK_FOR_OVERLAP 1
#define SLOTTED_PAGE_SKIP_SANITY_CHECKS 1
void slottedWrite(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);
void slottedRead(int xid, Page * page, recordid rid, byte *buff);
void slottedWriteUnlocked(int xid, Page * page, lsn_t lsn, recordid rid, const byte *data);