Fixed nasty bug that was causing the LLADD header to be lost (!!)

This commit is contained in:
Sears Russell 2004-12-06 01:20:48 +00:00
parent 72070acb67
commit 55b0ddf1b6
6 changed files with 100 additions and 49 deletions

View file

@ -103,7 +103,7 @@ void bufDeinit() {
for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtNext(activePages)) { for( p = (Page*)pblHtFirst( activePages ); p; p = (Page*)pblHtNext(activePages)) {
pblHtRemove( activePages, 0, 0 ); // pblHtRemove( activePages, 0, 0 );
DEBUG("+"); DEBUG("+");
/** @todo No one seems to set the dirty flag... */ /** @todo No one seems to set the dirty flag... */
/*if(p->dirty && (ret = pageWrite(p)/ *flushPage(*p)* /)) { /*if(p->dirty && (ret = pageWrite(p)/ *flushPage(*p)* /)) {

View file

@ -35,12 +35,12 @@
static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) { static int operate(int xid, Page * p, lsn_t lsn, recordid rid, const void * dat) {
/* * @ todo Currently, Talloc() needs to clean up the page type (for recovery). Should this be elsewhere? */ /* * @ todo Currently, Talloc() needs to clean up the page type (for recovery). Should this be elsewhere? */
/* if(*page_type_ptr(p) == UNINITIALIZED_PAGE) { /* if(*page_type_ptr(p) == UNINITIALIZED_PAGE) {
*page_type_ptr(p) = SLOTTED_PAGE; *page_type_ptr(p) = SLOTTED_PAGE;
} */ }
/* assert(*page_type_ptr(p) == SLOTTED_PAGE); */
assert(*page_type_ptr(p) == SLOTTED_PAGE); */
if(rid.size >= BLOB_THRESHOLD_SIZE) { if(rid.size >= BLOB_THRESHOLD_SIZE) {
allocBlob(xid, p, lsn, rid); allocBlob(xid, p, lsn, rid);
} else { } else {

View file

@ -8,7 +8,9 @@
#include "../page/header.h" #include "../page/header.h"
#include "../pageFile.h" #include "../pageFile.h"
#ifdef REUSE_PAGES
static int freelist; static int freelist;
#endif
static int freepage; static int freepage;
static pthread_mutex_t pageAllocMutex; static pthread_mutex_t pageAllocMutex;
@ -41,11 +43,11 @@ typedef struct {
int after; int after;
} update_tuple; } update_tuple;
int __update_freespace(int xid, Page * p, lsn_t lsn, recordid r, const void * d) { int __update_freepage(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
assert(r.page == 0); assert(r.page == 0);
const update_tuple * t = d; const update_tuple * t = d;
/* printf("freespace %d -> %d\n", t->before, t->after); /* printf("freepage %d -> %d\n", t->before, t->after);
fflush(NULL); */ fflush(NULL); */
* headerFreepage_ptr(p) = t->after; * headerFreepage_ptr(p) = t->after;
freepage = t->after; freepage = t->after;
pageWriteLSN(p, lsn); pageWriteLSN(p, lsn);
@ -56,8 +58,8 @@ int __update_freespace_inverse(int xid, Page * p, lsn_t lsn, recordid r, const v
#ifdef REUSE_PAGES #ifdef REUSE_PAGES
assert(r.page == 0); assert(r.page == 0);
const update_tuple * t = d; const update_tuple * t = d;
/* printf("freespace %d <- %d\n", t->before, t->after); /* ("freespace %d <- %d\n", t->before, t->after);
fflush(NULL); */ fflush(NULL); */
* headerFreepage_ptr(p) = t->before; * headerFreepage_ptr(p) = t->before;
freepage = t->before; freepage = t->before;
@ -65,6 +67,7 @@ int __update_freespace_inverse(int xid, Page * p, lsn_t lsn, recordid r, const v
pageWriteLSN(p, lsn); pageWriteLSN(p, lsn);
return 0; return 0;
} }
#ifdef REUSE_PAGES
/** @todo need to hold mutex here... */ /** @todo need to hold mutex here... */
int __update_freelist(int xid, Page * p, lsn_t lsn, recordid r, const void * d) { int __update_freelist(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
assert(r.page == 0); assert(r.page == 0);
@ -72,7 +75,7 @@ int __update_freelist(int xid, Page * p, lsn_t lsn, recordid r, const void * d)
const update_tuple * t = d; const update_tuple * t = d;
/* printf("freelist %d -> %d\n", t->before, t->after); /* printf("freelist %d -> %d\n", t->before, t->after);
fflush(NULL); */ fflush(NULL); */
* headerFreepagelist_ptr(p) = t->after; * headerFreepagelist_ptr(p) = t->after;
freelist = t->after; freelist = t->after;
@ -83,19 +86,19 @@ int __update_freelist_inverse(int xid, Page * p, lsn_t lsn, recordid r, const vo
assert(r.page == 0); assert(r.page == 0);
const update_tuple * t = d; const update_tuple * t = d;
/* printf("freelist %d <- %d\n", t->before, t->after); /* printf("freelist %d <- %d\n", t->before, t->after);
fflush(NULL); */ fflush(NULL); */
* headerFreepagelist_ptr(p) = t->before; * headerFreepagelist_ptr(p) = t->before;
freelist = t->before; freelist = t->before;
pageWriteLSN(p, lsn); pageWriteLSN(p, lsn);
return 0; return 0;
} }
#endif
int __free_page(int xid, Page * p, lsn_t lsn, recordid r, const void * d) { int __free_page(int xid, Page * p, lsn_t lsn, recordid r, const void * d) {
const int * successor = d; const int * successor = d;
/* printf("Unallocing page %d\n", r.page); /*printf("Unallocing page %d\n", r.page);
fflush(NULL); */ fflush(NULL); */
memset(p->memAddr, 0, PAGE_SIZE); memset(p->memAddr, 0, PAGE_SIZE);
*page_type_ptr(p) = LLADD_FREE_PAGE; *page_type_ptr(p) = LLADD_FREE_PAGE;
*nextfreepage_ptr(p) = *successor; *nextfreepage_ptr(p) = *successor;
@ -130,28 +133,35 @@ int TpageSet(int xid, int pageid, byte * memAddr) {
since it needs to perform raw, synchronous I/O on the pagefile for since it needs to perform raw, synchronous I/O on the pagefile for
bootstrapping purposes. */ bootstrapping purposes. */
void pageOperationsInit() { void pageOperationsInit() {
Page p; /* Page p;
p.rwlatch = initlock(); p.rwlatch = initlock();
p.loadlatch = initlock(); p.loadlatch = initlock();
assert(!posix_memalign((void **)&(p.memAddr), PAGE_SIZE, PAGE_SIZE)); assert(!posix_memalign((void **)&(p.memAddr), PAGE_SIZE, PAGE_SIZE));
p.id = 0; p.id = 0;*/
pageRead(&p); Page * p = loadPage(0);
// pageRead(&p);
if(*page_type_ptr(&p) != LLADD_HEADER_PAGE) { if(*page_type_ptr(p) != LLADD_HEADER_PAGE) {
headerPageInitialize(&p); /*printf("Writing new LLADD header\n"); fflush(NULL); */
pageWrite(&p); headerPageInitialize(p);
// pageWrite(p);
} else {
/*printf("Found LLADD header.\n"); fflush(NULL);*/
} }
#ifdef REUSE_PAGES
freelist = *headerFreepagelist_ptr(&p); freelist = *headerFreepagelist_ptr(p);
freepage = *headerFreepage_ptr(&p); #endif
freepage = *headerFreepage_ptr(p);
assert(freepage); assert(freepage);
/* free(p.memAddr); */ /* free(p.memAddr); */
deletelock(p.loadlatch); //deletelock(p.loadlatch);
deletelock(p.rwlatch); //deletelock(p.rwlatch);
releasePage(p);
pthread_mutex_init(&pageAllocMutex, NULL); pthread_mutex_init(&pageAllocMutex, NULL);
@ -208,12 +218,12 @@ int TpageDealloc(int xid, int pageid) {
t.before = freelist; t.before = freelist;
#endif //#endif
Tupdate(xid, rid, &freelist, OPERATION_FREE_PAGE); Tupdate(xid, rid, &freelist, OPERATION_FREE_PAGE);
#ifdef REUSE_PAGES //#ifdef REUSE_PAGES
t.after = pageid; t.after = pageid;
freelist = pageid; freelist = pageid;
@ -238,6 +248,7 @@ int TpageAlloc(int xid /*, int type */) {
pthread_mutex_lock(&pageAllocMutex); pthread_mutex_lock(&pageAllocMutex);
int newpage; int newpage;
/*printf("TpageAlloc\n"); fflush(NULL); */
#ifdef REUSE_PAGES #ifdef REUSE_PAGES
if(freelist) { if(freelist) {
@ -267,14 +278,16 @@ int TpageAlloc(int xid /*, int type */) {
} else { } else {
#endif #endif
/* printf("Allocing new page: %d\n", freepage); /*printf("Allocing new page: %d\n", freepage);
fflush(NULL); */ fflush(NULL); */
t.before = freepage; t.before = freepage;
newpage = freepage; newpage = freepage;
freepage++; freepage++;
t.after = freepage; t.after = freepage;
/* Don't need to touch the new page. */
/*printf("next freepage: %d\n", freepage); */
/* Don't need to touch the new page. */
rid.page = 0; rid.page = 0;
Tupdate(xid, rid, &t, OPERATION_UPDATE_FREESPACE); Tupdate(xid, rid, &t, OPERATION_UPDATE_FREESPACE);
@ -285,7 +298,7 @@ int TpageAlloc(int xid /*, int type */) {
#endif #endif
pthread_mutex_unlock(&pageAllocMutex); pthread_mutex_unlock(&pageAllocMutex);
/*printf("TpageAlloc alloced page %d\n", newpage); fflush(NULL); */
return newpage; return newpage;
} }
@ -344,7 +357,7 @@ Operation getUpdateFreespace() {
OPERATION_UPDATE_FREESPACE, OPERATION_UPDATE_FREESPACE,
sizeof(update_tuple), sizeof(update_tuple),
/* OPERATION_UPDATE_FREESPACE_INVERSE, */ OPERATION_NOOP, /* OPERATION_UPDATE_FREESPACE_INVERSE, */ OPERATION_NOOP,
&__update_freespace &__update_freepage
}; };
return o; return o;
} }
@ -359,14 +372,17 @@ Operation getUpdateFreespaceInverse() {
return o; return o;
} }
Operation getUpdateFreelist() { Operation getUpdateFreelist() {
Operation o = { Operation o = {
OPERATION_UPDATE_FREELIST, OPERATION_UPDATE_FREELIST,
sizeof(update_tuple), sizeof(update_tuple),
OPERATION_NOOP, OPERATION_NOOP,
#ifdef REUSE_PAGES
&__update_freelist &__update_freelist
}; #else
NULL
#endif
};
return o; return o;
} }
@ -375,8 +391,12 @@ Operation getUpdateFreelistInverse() {
OPERATION_UPDATE_FREELIST_INVERSE, OPERATION_UPDATE_FREELIST_INVERSE,
sizeof(update_tuple), sizeof(update_tuple),
OPERATION_UPDATE_FREELIST, OPERATION_UPDATE_FREELIST,
#ifdef REUSE_PAGES
&__update_freelist_inverse &__update_freelist_inverse
}; #else
NULL
#endif
};
return o; return o;
} }
@ -447,5 +467,3 @@ Operation getPageSet() {
}; };
return o; return o;
} }

View file

@ -44,7 +44,7 @@ static void slottedCompact(Page * page) {
numSlots = *numslots_ptr(page); numSlots = *numslots_ptr(page);
for (i = 0; i < numSlots; i++) { for (i = 0; i < numSlots; i++) {
/* printf("i = %d\n", i); */ /* ("i = %d\n", i); */
if (isValidSlot(page, i)) { if (isValidSlot(page, i)) {
/* printf("copying %d\n", i); /* printf("copying %d\n", i);
fflush(NULL); */ fflush(NULL); */
@ -115,7 +115,7 @@ static void slottedCompact(Page * page) {
/*static pthread_mutex_t lastFreepage_mutex; */ /*static pthread_mutex_t lastFreepage_mutex; */
static unsigned int lastFreepage = -1; static unsigned int lastFreepage = -10;
void slottedPageInit() { void slottedPageInit() {
/*pthread_mutex_init(&lastFreepage_mutex , NULL); */ /*pthread_mutex_init(&lastFreepage_mutex , NULL); */
@ -128,8 +128,8 @@ void slottedPageDeinit() {
void slottedPageInitialize(Page * page) { void slottedPageInitialize(Page * page) {
/* printf("Initializing page %d\n", page->id); /*printf("Initializing page %d\n", page->id);
fflush(NULL); */ fflush(NULL); */
memset(page->memAddr, 0, PAGE_SIZE); memset(page->memAddr, 0, PAGE_SIZE);
*page_type_ptr(page) = SLOTTED_PAGE; *page_type_ptr(page) = SLOTTED_PAGE;
*freespace_ptr(page) = 0; *freespace_ptr(page) = 0;
@ -168,9 +168,11 @@ recordid slottedPreRalloc(int xid, long size, Page ** pp) {
/* pthread_mutex_lock(&lastFreepage_mutex); */ /* pthread_mutex_lock(&lastFreepage_mutex); */
/** @todo is ((unsigned int) foo) == -1 portable? Gotta love C.*/ /** @todo is ((unsigned int) foo) == -1 portable? Gotta love C.*/
/*printf("lastFreepage %d\n", lastFreepage); fflush(NULL); */
if(lastFreepage == -1) { if(lastFreepage == -1) {
lastFreepage = TpageAlloc(xid/*, SLOTTED_PAGE*/); lastFreepage = TpageAlloc(xid/*, SLOTTED_PAGE*/);
*pp = loadPage(lastFreepage); *pp = loadPage(lastFreepage);
assert(*page_type_ptr(*pp) == UNINITIALIZED_PAGE);
slottedPageInitialize(*pp); slottedPageInitialize(*pp);
} else { } else {
*pp = loadPage(lastFreepage); *pp = loadPage(lastFreepage);
@ -206,7 +208,8 @@ recordid slottedRawRalloc(Page * page, int size) {
rid.slot = *numslots_ptr(page); rid.slot = *numslots_ptr(page);
rid.size = size; rid.size = size;
/* new way */ /* new way - The freelist_ptr points to the first free slot number, which
is the head of a linked list of free slot numbers.*/
if(*freelist_ptr(page) != INVALID_SLOT) { if(*freelist_ptr(page) != INVALID_SLOT) {
rid.slot = *freelist_ptr(page); rid.slot = *freelist_ptr(page);
*freelist_ptr(page) = *slot_length_ptr(page, rid.slot); *freelist_ptr(page) = *slot_length_ptr(page, rid.slot);

View file

@ -1,7 +1,7 @@
INCLUDES = @CHECK_CFLAGS@ INCLUDES = @CHECK_CFLAGS@
if HAVE_CHECK if HAVE_CHECK
## Had to disable check_lht because lht needs to be rewritten. ## Had to disable check_lht because lht needs to be rewritten.
TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_lladdhash check_pageOperations check_linearHash check_logicalLinearHash TESTS = check_logEntry check_logWriter check_page check_operations check_transactional2 check_recovery check_blobRecovery check_bufferManager check_indirect check_lladdhash check_pageOperations check_linearHash check_logicalLinearHash check_header
else else
TESTS = TESTS =
endif endif

View file

@ -339,21 +339,30 @@ START_TEST(operation_nestedTopAction) {
int xid= Tbegin(); int xid= Tbegin();
int *dat; int *dat;
dat == malloc(sizeof(int)); //printf("W"); fflush(NULL);
dat = malloc(sizeof(int));
recordid rid1 = Talloc(xid, sizeof(int)); recordid rid1 = Talloc(xid, sizeof(int));
// printf("{%d,%d,%d}\n", rid1.page, rid1.slot, rid1.size); fflush(NULL);
recordid rid2 = Talloc(xid, sizeof(int)); recordid rid2 = Talloc(xid, sizeof(int));
// printf("{%d,%d,%d}\n", rid2.page, rid2.slot, rid2.size); fflush(NULL);
recordid rid3 = Talloc(xid, sizeof(int)); recordid rid3 = Talloc(xid, sizeof(int));
// printf("{%d,%d,%d}\n", rid3.page, rid3.slot, rid3.size); fflush(NULL);
recordid rid4 = Talloc(xid, sizeof(int)); recordid rid4 = Talloc(xid, sizeof(int));
//printf("{%d,%d,%d}\n", rid4.page, rid4.slot, rid4.size); fflush(NULL);
// printf("E"); fflush(NULL);
*dat = 1; *dat = 1;
Tset(xid, rid1, dat); Tset(xid, rid1, dat);
*dat = 2; *dat = 2;
Tset(xid, rid2, dat); Tset(xid, rid2, dat);
// printf("F"); fflush(NULL);
*dat = 3; *dat = 3;
Tset(xid, rid3, dat); Tset(xid, rid3, dat);
// printf("G"); fflush(NULL);
*dat = 4; *dat = 4;
Tset(xid, rid4, dat); Tset(xid, rid4, dat);
// printf("H"); fflush(NULL);
Tcommit(xid); Tcommit(xid);
// printf("A"); fflush(NULL);
xid = Tbegin(); // Loser xact. xid = Tbegin(); // Loser xact.
*dat = 10; *dat = 10;
@ -367,7 +376,7 @@ START_TEST(operation_nestedTopAction) {
Tset(xid, rid3, dat); Tset(xid, rid3, dat);
TendNestedTopAction(xid); TendNestedTopAction(xid);
// printf("B"); fflush(NULL);
*dat = 40; *dat = 40;
Tset(xid, rid4, dat); Tset(xid, rid4, dat);
@ -379,7 +388,7 @@ START_TEST(operation_nestedTopAction) {
int dat2; int dat2;
int dat3; int dat3;
int dat4; int dat4;
// printf("C"); fflush(NULL);
Tread(xid, rid1, &dat1); Tread(xid, rid1, &dat1);
Tread(xid, rid2, &dat2); Tread(xid, rid2, &dat2);
Tread(xid, rid3, &dat3); Tread(xid, rid3, &dat3);
@ -505,6 +514,26 @@ START_TEST(operation_set_range) {
Tcommit(xid); Tcommit(xid);
} END_TEST } END_TEST
START_TEST(operation_alloc_test) {
Tinit();
int xid = Tbegin();
recordid rid1 = Talloc(xid, 100);
Tcommit(xid);
xid = Tbegin();
recordid rid2 = Talloc(xid, 100);
Tcommit(xid);
printf("rid1={%d,%d,%d} rid2={%d,%d,%d}\n",
rid1.page, rid1.slot, rid1.size,
rid2.page, rid2.slot, rid2.size);
Tdeinit();
} END_TEST
#define ARRAY_LIST_CHECK_ITER 10000 #define ARRAY_LIST_CHECK_ITER 10000
START_TEST(operation_array_list) { START_TEST(operation_array_list) {
@ -602,6 +631,7 @@ Suite * check_suite(void) {
tcase_add_test(tc, operation_instant_set); tcase_add_test(tc, operation_instant_set);
tcase_add_test(tc, operation_set_range); tcase_add_test(tc, operation_set_range);
tcase_add_test(tc, operation_prepare); tcase_add_test(tc, operation_prepare);
tcase_add_test(tc, operation_alloc_test);
tcase_add_test(tc, operation_array_list); tcase_add_test(tc, operation_array_list);
/* --------------------------------------------- */ /* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown); tcase_add_checked_fixture(tc, setup, teardown);