added fsckRegions(), and call it from the unit tests.

This commit is contained in:
Sears Russell 2006-07-25 22:23:15 +00:00
parent aec9148b83
commit 62abe94bf5
4 changed files with 83 additions and 16 deletions

View file

@ -37,4 +37,12 @@ Operation getAllocBoundaryTag();
Operation getAllocRegion(); Operation getAllocRegion();
Operation getDeallocRegion(); Operation getDeallocRegion();
/** This function checks the regions in the page file for consistency.
It makes sure that the doublly linked list is consistent (eg
this->next->prev == this), and it makes sure that all boundary
tags pages (that are marked REGION_ZONED) occur somewhere in the
linked list. */
void fsckRegions(int xid);
// XXX need callbacks to handle transaction commit/abort. // XXX need callbacks to handle transaction commit/abort.

View file

@ -11,11 +11,8 @@ typedef struct regionAllocLogArg{
int allocationManager; int allocationManager;
} regionAllocArg; } regionAllocArg;
#define boundary_tag_ptr(p) (((byte*)end_of_usable_space_ptr((p)))-sizeof(boundary_tag_t))
pthread_mutex_t region_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t region_mutex = PTHREAD_MUTEX_INITIALIZER;
static void TregionAllocHelper(int xid, unsigned int pageid, unsigned int pageCount, int allocationManager); static void TregionAllocHelper(int xid, unsigned int pageid, unsigned int pageCount, int allocationManager);
static int operate_alloc_boundary_tag(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { static int operate_alloc_boundary_tag(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
@ -42,24 +39,29 @@ static int operate_dealloc_region(int xid, Page * p, lsn_t lsn, recordid rid, co
// TODO: Implement these four functions. // TODO: Implement these four functions.
static void TallocBoundaryTag(int xid, unsigned int page, boundary_tag* tag) { static void TallocBoundaryTag(int xid, unsigned int page, boundary_tag* tag) {
// printf("Alloc boundary tag at %d\n", page); // printf("Alloc boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status);
recordid rid = {page, 0, sizeof(boundary_tag)}; recordid rid = {page, 0, sizeof(boundary_tag)};
Tupdate(xid, rid, tag, OPERATION_ALLOC_BOUNDARY_TAG); Tupdate(xid, rid, tag, OPERATION_ALLOC_BOUNDARY_TAG);
} }
static void TdeallocBoundaryTag(int xid, unsigned int page) {
//no-op
}
static void TreadBoundaryTag(int xid, unsigned int page, boundary_tag* tag) { static void TreadBoundaryTag(int xid, unsigned int page, boundary_tag* tag) {
recordid rid = { page, 0, sizeof(boundary_tag) }; recordid rid = { page, 0, sizeof(boundary_tag) };
Tread(xid, rid, tag); Tread(xid, rid, tag);
// printf("Read boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status);
} }
static void TsetBoundaryTag(int xid, unsigned int page, boundary_tag* tag) { static void TsetBoundaryTag(int xid, unsigned int page, boundary_tag* tag) {
// printf("Writing boundary tag at %d\n", page); // printf("Write boundary tag at %d = { %d, %d, %d }\n", page, tag->size, tag->prev_size, tag->status);
recordid rid = { page, 0, sizeof(boundary_tag) }; recordid rid = { page, 0, sizeof(boundary_tag) };
Tset(xid, rid, tag); Tset(xid, rid, tag);
} }
static void TdeallocBoundaryTag(int xid, unsigned int page) {
boundary_tag t;
TreadBoundaryTag(xid, page, &t);
t.status = REGION_CONDEMNED;
TsetBoundaryTag(xid, page, &t);
}
void regionsInit() { void regionsInit() {
Page * p = loadPage(-1, 0); Page * p = loadPage(-1, 0);
int pageType = *page_type_ptr(p); int pageType = *page_type_ptr(p);
@ -84,6 +86,51 @@ void regionsInit() {
releasePage(p); releasePage(p);
} }
void fsckRegions(int xid) {
// Ignore region_xid, allocation_manager for now.
int pageType;
boundary_tag tag;
boundary_tag prev_tag;
prev_tag.size = UINT32_MAX;
int tagPage = 0;
pageType = TpageGetType(xid, tagPage);
assert(pageType == BOUNDARY_TAG_PAGE);
TreadBoundaryTag(xid, tagPage, &tag);
assert(tag.prev_size == UINT32_MAX);
while(tag.size != UINT32_MAX) {
// Ignore region_xid, allocation_manager for now.
assert(tag.status == REGION_VACANT || tag.status == REGION_ZONED);
assert(prev_tag.size == tag.prev_size);
for(int i = 0; i < tag.size; i++) {
int thisPage = tagPage + 1 + i;
pageType = TpageGetType(xid, thisPage);
if(pageType == BOUNDARY_TAG_PAGE) {
boundary_tag orphan;
TreadBoundaryTag(xid, thisPage, &orphan);
assert(orphan.status == REGION_CONDEMNED);
Page * p = loadPage(xid, thisPage);
fsckSlottedPage(p);
releasePage(p);
} else if (pageType == SLOTTED_PAGE) {
Page * p = loadPage(xid, thisPage);
fsckSlottedPage(p);
releasePage(p);
}
}
prev_tag = tag;
tagPage = tagPage + 1 + prev_tag.size;
TreadBoundaryTag(xid, tagPage, &tag);
}
assert(tag.status == REGION_VACANT); // space at EOF better be vacant!
}
static void TregionAllocHelper(int xid, unsigned int pageid, unsigned int pageCount, int allocationManager) { static void TregionAllocHelper(int xid, unsigned int pageid, unsigned int pageCount, int allocationManager) {
boundary_tag t; boundary_tag t;
TreadBoundaryTag(xid, pageid, &t); TreadBoundaryTag(xid, pageid, &t);
@ -191,12 +238,12 @@ void TregionDealloc(int xid, unsigned int firstPage) {
TreadBoundaryTag(xid, succ_page, &succ_tag); TreadBoundaryTag(xid, succ_page, &succ_tag);
// TODO: Check page_type_ptr()... // TODO: Check page_type_ptr()...
if(succ_tag.size == UINT32_MAX) { if(succ_tag.size == UINT32_MAX) {
t.size = UINT32_MAX; t.size = UINT32_MAX;
assert(succ_tag.status == REGION_VACANT);
// TODO: Truncate page file. // TODO: Truncate page file.
TdeallocBoundaryTag(xid, succ_page); TdeallocBoundaryTag(xid, succ_page);
} else if(succ_tag.status == REGION_VACANT) { } else if(succ_tag.status == REGION_VACANT) {
t.size = t.size + succ_tag.size + 1; t.size = t.size + succ_tag.size + 1;
@ -208,8 +255,7 @@ void TregionDealloc(int xid, unsigned int firstPage) {
succ_succ_tag.prev_size = t.size; succ_succ_tag.prev_size = t.size;
TsetBoundaryTag(xid, succ_succ_page, &succ_succ_tag); TsetBoundaryTag(xid, succ_succ_page, &succ_succ_tag);
TsetBoundaryTag(xid, succ_page, &succ_tag); TdeallocBoundaryTag(xid, succ_page);
} }
} }
@ -226,6 +272,9 @@ void TregionDealloc(int xid, unsigned int firstPage) {
TreadBoundaryTag(xid, pred_page, &pred_tag); TreadBoundaryTag(xid, pred_page, &pred_tag);
if(pred_tag.status == REGION_VACANT) { if(pred_tag.status == REGION_VACANT) {
TdeallocBoundaryTag(xid, firstPage -1);
if(t.size == UINT32_MAX) { if(t.size == UINT32_MAX) {
pred_tag.size = UINT32_MAX; pred_tag.size = UINT32_MAX;
@ -248,7 +297,6 @@ void TregionDealloc(int xid, unsigned int firstPage) {
} }
TsetBoundaryTag(xid, pred_page, &pred_tag); TsetBoundaryTag(xid, pred_page, &pred_tag);
TdeallocBoundaryTag(xid, firstPage -1);
} else { } else {
TsetBoundaryTag(xid, firstPage - 1, &t); TsetBoundaryTag(xid, firstPage - 1, &t);

View file

@ -213,3 +213,5 @@ int slottedGetType(Page * p, int slot);
void slottedSetType(Page * p, int slot, int type); void slottedSetType(Page * p, int slot, int type);
/** The caller of this function must have a write lock on the page. */ /** The caller of this function must have a write lock on the page. */
void slottedCompact(Page * page); void slottedCompact(Page * page);
void fsckSlottedPage(const Page const * page);

View file

@ -106,6 +106,9 @@ START_TEST(regions_smokeTest) {
max_page = new_page + 2; max_page = new_page + 2;
} }
} }
fsckRegions(xid);
Tcommit(xid); Tcommit(xid);
printf("\nMaximum space usage = %d, best possible = %d\n", max_page, 104); // peak storage usage = 100 pages + 1 page + 3 boundary pages. printf("\nMaximum space usage = %d, best possible = %d\n", max_page, 104); // peak storage usage = 100 pages + 1 page + 3 boundary pages.
@ -128,6 +131,11 @@ START_TEST(regions_randomizedTest) {
unsigned int max_size = 0; unsigned int max_size = 0;
unsigned int max_ideal_size = 0; unsigned int max_ideal_size = 0;
for(int i = 0; i < 10000; i++) { for(int i = 0; i < 10000; i++) {
if(!(i % 100)) {
fsckRegions(xid);
}
if(myrandom(2)) { if(myrandom(2)) {
unsigned int size = myrandom(100); unsigned int size = myrandom(100);
TregionAlloc(xid, size, 1); TregionAlloc(xid, size, 1);
@ -175,6 +183,7 @@ START_TEST(regions_randomizedTest) {
} }
} }
} }
fsckRegions(xid);
Tcommit(xid); Tcommit(xid);
Tdeinit(); Tdeinit();
if((double)max_size/(double)max_ideal_size > 5) { if((double)max_size/(double)max_ideal_size > 5) {