Added assertlock() and call it when page implementations assume a page has already been latched. (Fixed

associated bugs as well)
This commit is contained in:
Sears Russell 2007-06-01 21:32:33 +00:00
parent d1aeba8a82
commit dd7c6b857d
14 changed files with 75 additions and 28 deletions

View file

@ -35,6 +35,8 @@ void readlock (rwl *lock, int d);
int tryreadlock(rwl *lock, int d); int tryreadlock(rwl *lock, int d);
void writelock (rwl *lock, int d); void writelock (rwl *lock, int d);
int trywritelock(rwl *lock, int d); int trywritelock(rwl *lock, int d);
/** aborts if called when no thread holds this latch. */
void assertlocked(rwl *lock);
void downgradelock(rwl * lock); void downgradelock(rwl * lock);
void unlock(rwl * lock); void unlock(rwl * lock);
/** @deprecated in favor of unlock() */ /** @deprecated in favor of unlock() */

View file

@ -36,6 +36,10 @@ rwl *initlock (void)
writelock(lock, d); writelock(lock, d);
}*/ }*/
void assertlocked(rwl * lock) {
assert(lock->writers || lock->readers);
}
void readlock (rwl *lock, int d) void readlock (rwl *lock, int d)
{ {
/* printf("reader %d\n", d); /* printf("reader %d\n", d);

View file

@ -202,7 +202,9 @@ compensated_function recordid Talloc(int xid, unsigned long size) {
p = loadPage(xid, lastFreepage); p = loadPage(xid, lastFreepage);
while(slottedFreespace(p) < physical_slot_length(type)) { while(slottedFreespace(p) < physical_slot_length(type)) {
writelock(p->rwlatch,0);
slottedCompact(p); slottedCompact(p);
unlock(p->rwlatch);
int newFreespace = slottedFreespace(p); int newFreespace = slottedFreespace(p);
if(newFreespace >= physical_slot_length(type)) { if(newFreespace >= physical_slot_length(type)) {
break; break;

View file

@ -75,6 +75,9 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
int multiplier = tlp->multiplier; int multiplier = tlp->multiplier;
int size = tlp->size; int size = tlp->size;
/* Allocing this page -> implicit lock, but latch to conformt to
fixedPage's interface. */
writelock(p->rwlatch, 0);
fixedPageInitialize(p, sizeof(int), recordsPerPage(sizeof(int))); fixedPageInitialize(p, sizeof(int), recordsPerPage(sizeof(int)));
#define MAX_OFFSET_POSITION 3 #define MAX_OFFSET_POSITION 3
@ -91,7 +94,6 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
firstDataPageRid.slot = 4; firstDataPageRid.slot = 4;
int firstDataPage = firstPage + 1; int firstDataPage = firstPage + 1;
/* Allocing this page -> implicit lock. */
fixedWriteUnlocked(p, countRid, (byte*)&count); fixedWriteUnlocked(p, countRid, (byte*)&count);
fixedWriteUnlocked(p, multiplierRid, (byte*)&multiplier); fixedWriteUnlocked(p, multiplierRid, (byte*)&multiplier);
fixedWriteUnlocked(p, firstDataPageRid, (byte*)&firstDataPage); fixedWriteUnlocked(p, firstDataPageRid, (byte*)&firstDataPage);
@ -109,6 +111,7 @@ static int operateAlloc(int xid, Page * p, lsn_t lsn, recordid rid, const void *
ret.page = firstPage; ret.page = firstPage;
ret.slot = 0; /* slot = # of slots in array... */ ret.slot = 0; /* slot = # of slots in array... */
ret.size = size; ret.size = size;
unlock(p->rwlatch);
return 0; return 0;
} }
@ -196,7 +199,7 @@ compensated_function int TarrayListLength(int xid, recordid rid) {
/*----------------------------------------------------------------------------*/ /*----------------------------------------------------------------------------*/
recordid dereferenceArrayListRid(Page * p, int offset) { recordid dereferenceArrayListRid(Page * p, int offset) {
readlock(p->rwlatch,0);
TarrayListParameters tlp = pageToTLP(p); TarrayListParameters tlp = pageToTLP(p);
int rec_per_page = recordsPerPage((size_t)tlp.size); int rec_per_page = recordsPerPage((size_t)tlp.size);
@ -215,6 +218,7 @@ recordid dereferenceArrayListRid(Page * p, int offset) {
assert(pageRidSlot + FIRST_DATA_PAGE_OFFSET < fixedPageCount(p)); assert(pageRidSlot + FIRST_DATA_PAGE_OFFSET < fixedPageCount(p));
thePage = *(int*)fixed_record_ptr(p, pageRidSlot + FIRST_DATA_PAGE_OFFSET); /*reading immutable record; don't need latch.*/ thePage = *(int*)fixed_record_ptr(p, pageRidSlot + FIRST_DATA_PAGE_OFFSET); /*reading immutable record; don't need latch.*/
unlock(p->rwlatch);
recordid rid; recordid rid;
rid.page = thePage + blockPage; rid.page = thePage + blockPage;

View file

@ -54,7 +54,6 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
recordRead(xid, p, r, (byte*)&i); recordRead(xid, p, r, (byte*)&i);
i--; i--;
recordWrite(xid, p, lsn, r, (byte*)&i); recordWrite(xid, p, lsn, r, (byte*)&i);
return 0; return 0;
} }

View file

@ -54,7 +54,6 @@ static int operate(int xid, Page * p, lsn_t lsn, recordid r, const void *d) {
recordRead(xid, p, r, (byte*)&i); recordRead(xid, p, r, (byte*)&i);
i++; i++;
recordWrite(xid, p, lsn, r, (byte*)&i); recordWrite(xid, p, lsn, r, (byte*)&i);
return 0; return 0;
} }

View file

@ -154,7 +154,7 @@ void recordWrite(int xid, Page * p, lsn_t lsn, recordid rid, const void *dat) {
assert( (p->id == rid.page) && (p->memAddr != NULL) ); assert( (p->id == rid.page) && (p->memAddr != NULL) );
} }
static int recordReadWarnedAboutPageTypeKludge = 0;
int recordRead(int xid, Page * p, recordid rid, void *buf) { int recordRead(int xid, Page * p, recordid rid, void *buf) {
assert(rid.page == p->id); assert(rid.page == p->id);
@ -164,8 +164,12 @@ int recordRead(int xid, Page * p, recordid rid, void *buf) {
readBlob(xid, p, rid, buf); readBlob(xid, p, rid, buf);
} else if(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE) { } else if(page_type == SLOTTED_PAGE || page_type == BOUNDARY_TAG_PAGE) {
slottedRead(p, rid, buf); slottedRead(p, rid, buf);
/* @TODO !page_type can never be required if this code is correct... arraylist is broken!!*/ /* @TODO !page_type can never be required if this code is correct... arraylist is broken!! XXX */
} else if(page_type == FIXED_PAGE || page_type==ARRAY_LIST_PAGE || !page_type) { } else if(page_type == FIXED_PAGE || page_type==ARRAY_LIST_PAGE || !page_type) {
if(!page_type && ! recordReadWarnedAboutPageTypeKludge) {
recordReadWarnedAboutPageTypeKludge = 1;
printf("page.c: MAKING USE OF TERRIBLE KLUDGE AND IGNORING ASSERT FAILURE! FIX ARRAY LIST ASAP!!!\n");
}
fixedRead(p, rid, buf); fixedRead(p, rid, buf);
} else { } else {
abort(); abort();

View file

@ -18,11 +18,13 @@ void fixedPageInitialize(Page * page, size_t size, int count) {
} }
short fixedPageCount(Page * page) { short fixedPageCount(Page * page) {
assertlocked(page->rwlatch);
assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
return *recordcount_ptr(page); return *recordcount_ptr(page);
} }
short fixedPageRecordSize(Page * page) { short fixedPageRecordSize(Page * page) {
assertlocked(page->rwlatch);
assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
return *recordsize_ptr(page); return *recordsize_ptr(page);
} }
@ -53,7 +55,9 @@ recordid fixedRawRalloc(Page *page) {
return fixedRawRallocMany(page, 1); return fixedRawRallocMany(page, 1);
} }
static int checkRidWarnedAboutUninitializedKludge = 0;
static void checkRid(Page * page, recordid rid) { static void checkRid(Page * page, recordid rid) {
assertlocked(page->rwlatch);
if(*page_type_ptr(page)) { if(*page_type_ptr(page)) {
assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE); assert(*page_type_ptr(page) == FIXED_PAGE || *page_type_ptr(page) == ARRAY_LIST_PAGE);
assert(page->id == rid.page); assert(page->id == rid.page);
@ -62,11 +66,16 @@ static void checkRid(Page * page, recordid rid) {
int recCount = *recordcount_ptr(page); int recCount = *recordcount_ptr(page);
assert(recCount > rid.slot); assert(recCount > rid.slot);
} else { } else {
if(!checkRidWarnedAboutUninitializedKludge) {
checkRidWarnedAboutUninitializedKludge = 1;
printf("KLUDGE detected in checkRid. Fix it ASAP\n");
}
fixedPageInitialize(page, rid.size, recordsPerPage(rid.size)); fixedPageInitialize(page, rid.size, recordsPerPage(rid.size));
} }
} }
void fixedReadUnlocked(Page * page, recordid rid, byte * buf) { void fixedReadUnlocked(Page * page, recordid rid, byte * buf) {
assertlocked(page->rwlatch);
if(!memcpy(buf, fixed_record_ptr(page, rid.slot), rid.size)) { if(!memcpy(buf, fixed_record_ptr(page, rid.slot), rid.size)) {
perror("memcpy"); perror("memcpy");
abort(); abort();
@ -87,6 +96,7 @@ void fixedRead(Page * page, recordid rid, byte * buf) {
} }
void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) { void fixedWriteUnlocked(Page * page, recordid rid, const byte *dat) {
assertlocked(page->rwlatch);
checkRid(page,rid); checkRid(page,rid);
if(!memcpy(fixed_record_ptr(page, rid.slot), dat, rid.size)) { if(!memcpy(fixed_record_ptr(page, rid.slot), dat, rid.size)) {
perror("memcpy"); perror("memcpy");

View file

@ -1,3 +1,9 @@
/**
@file
@todo Is header.[ch] used for anything any more? No one seems to call
into them. Delete them?
*/
//void headerPageInitialize(Page * p); //void headerPageInitialize(Page * p);
int headerPageInitialize(); int headerPageInitialize();

View file

@ -13,39 +13,43 @@ void indirectInitialize(Page * p, int height) {
*page_type_ptr(p) = INDIRECT_PAGE; *page_type_ptr(p) = INDIRECT_PAGE;
memset(p->memAddr, INVALID_SLOT, ((size_t)level_ptr(p)) - ((size_t)p->memAddr)); memset(p->memAddr, INVALID_SLOT, ((size_t)level_ptr(p)) - ((size_t)p->memAddr));
} }
/** @todo locking for dereferenceRID? */ /** @todo Is locking for dereferenceRID really necessary? */
compensated_function recordid dereferenceRID(int xid, recordid rid) { compensated_function recordid dereferenceRID(int xid, recordid rid) {
Page * this; Page * page;
try_ret(NULLRID) { try_ret(NULLRID) {
this = loadPage(xid, rid.page); page = loadPage(xid, rid.page);
readlock(page->rwlatch, 0);
} end_ret(NULLRID); } end_ret(NULLRID);
// printf("a"); fflush(stdout); // printf("a"); fflush(stdout);
int offset = 0; int offset = 0;
int max_slot; int max_slot;
while(*page_type_ptr(this) == INDIRECT_PAGE) { while(*page_type_ptr(page) == INDIRECT_PAGE) {
int i = 0; int i = 0;
for(max_slot = *maxslot_ptr(this, i); ( max_slot + offset ) <= rid.slot; max_slot = *maxslot_ptr(this, i)) { for(max_slot = *maxslot_ptr(page, i); ( max_slot + offset ) <= rid.slot; max_slot = *maxslot_ptr(page, i)) {
i++; i++;
assert(max_slot != INVALID_SLOT); assert(max_slot != INVALID_SLOT);
} }
if(i) { if(i) {
offset += *maxslot_ptr(this, i - 1); offset += *maxslot_ptr(page, i - 1);
} /** else, the adjustment to the offset is zero */ } /** else, the adjustment to the offset is zero */
int nextPage = *page_ptr(this, i); int nextPage = *page_ptr(page, i);
releasePage(this); unlock(page->rwlatch);
releasePage(page);
try_ret(NULLRID) { try_ret(NULLRID) {
this = loadPage(xid, nextPage); page = loadPage(xid, nextPage);
readlock(page->rwlatch, 0);
} end_ret(NULLRID); } end_ret(NULLRID);
} }
// printf("b"); fflush(stdout); // printf("b"); fflush(stdout);
rid.page = this->id; rid.page = page->id;
rid.slot -= offset; rid.slot -= offset;
releasePage(this); unlock(page->rwlatch);
releasePage(page);
// printf("c"); fflush(stdout); // printf("c"); fflush(stdout);
return rid; return rid;
@ -195,6 +199,7 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
try_ret(-1){ try_ret(-1){
p = loadPage(xid, rid.page); p = loadPage(xid, rid.page);
}end_ret(-1); }end_ret(-1);
readlock(p->rwlatch, 0);
int i = 0; int i = 0;
unsigned int ret; unsigned int ret;
if(*page_type_ptr(p) == INDIRECT_PAGE) { if(*page_type_ptr(p) == INDIRECT_PAGE) {
@ -221,6 +226,8 @@ compensated_function int indirectPageRecordCount(int xid, recordid rid) {
printf("Unknown page type in indirectPageRecordCount\n"); printf("Unknown page type in indirectPageRecordCount\n");
abort(); abort();
} }
unlock(p->rwlatch);
releasePage(p); releasePage(p);
return ret; return ret;
} }

View file

@ -10,18 +10,21 @@ void rawPageInferMetadata(Page * p) {
} }
byte* rawPageGetData(int xid, Page * p) { byte* rawPageGetData(int xid, Page * p) {
assertlocked(p->rwlatch);
return units_from_start_raw(byte, p, 0); return units_from_start_raw(byte, p, 0);
} }
void rawPageSetData(int xid, lsn_t lsn, Page * p) { void rawPageSetData(int xid, lsn_t lsn, Page * p) {
writelock(p->rwlatch, 255); assertlocked(p->rwlatch);
// writelock(p->rwlatch, 255);
rawPageWriteLSN(xid, p, lsn); rawPageWriteLSN(xid, p, lsn);
dirtyPages_add(p); dirtyPages_add(p);
unlock(p->rwlatch); // unlock(p->rwlatch);
return; return;
} }
lsn_t rawPageReadLSN(const Page * p) { lsn_t rawPageReadLSN(const Page * p) {
assertlocked(p->rwlatch);
// There are some potential optimizations here since the page // There are some potential optimizations here since the page
// doesn't "really" have an LSN at all, but we need to be careful // doesn't "really" have an LSN at all, but we need to be careful
// about log truncation... // about log truncation...
@ -29,6 +32,7 @@ lsn_t rawPageReadLSN(const Page * p) {
} }
void rawPageWriteLSN(int xid, Page * p, lsn_t lsn) { void rawPageWriteLSN(int xid, Page * p, lsn_t lsn) {
assertlocked(p->rwlatch);
if(p->LSN < lsn) { p->LSN = lsn; } if(p->LSN < lsn) { p->LSN = lsn; }
} }

View file

@ -7,7 +7,7 @@
static void really_do_ralloc(Page * page, recordid rid) ; static void really_do_ralloc(Page * page, recordid rid) ;
size_t slottedFreespaceForSlot(Page * page, int slot); size_t slottedFreespaceForSlot(Page * page, int slot);
void fsckSlottedPage(const Page const * page) { void fsckSlottedPage(const Page const * page) {
assertlocked(page->rwlatch);
#ifdef SLOTTED_PAGE_SANITY_CHECKS #ifdef SLOTTED_PAGE_SANITY_CHECKS
Page dummy; Page dummy;
@ -126,6 +126,7 @@ The caller of this function must have a writelock on the page.
*/ */
void slottedCompact(Page * page) { void slottedCompact(Page * page) {
assertlocked(page->rwlatch);
Page bufPage; Page bufPage;
byte buffer[PAGE_SIZE]; byte buffer[PAGE_SIZE];
bufPage.memAddr = buffer; bufPage.memAddr = buffer;
@ -198,8 +199,6 @@ void slottedPageInitialize(Page * page) {
*freespace_ptr(page) = 0; *freespace_ptr(page) = 0;
*numslots_ptr(page) = 0; *numslots_ptr(page) = 0;
*freelist_ptr(page) = INVALID_SLOT; *freelist_ptr(page) = INVALID_SLOT;
fsckSlottedPage(page);
} }
size_t slottedFreespaceUnlocked(Page * page); size_t slottedFreespaceUnlocked(Page * page);
@ -208,6 +207,7 @@ size_t slottedFreespaceUnlocked(Page * page);
it takes the position of the new slot's header into account. it takes the position of the new slot's header into account.
*/ */
size_t slottedFreespaceForSlot(Page * page, int slot) { size_t slottedFreespaceForSlot(Page * page, int slot) {
assertlocked(page->rwlatch);
size_t slotOverhead; size_t slotOverhead;
if(slot == INVALID_SLOT) { if(slot == INVALID_SLOT) {
@ -305,6 +305,7 @@ recordid slottedRawRalloc(Page * page, int size) {
@param rid Recordid with 'internal' size. The size should have already been translated to a type if necessary. @param rid Recordid with 'internal' size. The size should have already been translated to a type if necessary.
*/ */
static void really_do_ralloc(Page * page, recordid rid) { static void really_do_ralloc(Page * page, recordid rid) {
assertlocked(page->rwlatch);
short freeSpace; short freeSpace;
@ -508,6 +509,7 @@ void slottedDeRalloc(int xid, Page * page, lsn_t lsn, recordid rid) {
} }
void slottedReadUnlocked(Page * page, recordid rid, byte *buff) { void slottedReadUnlocked(Page * page, recordid rid, byte *buff) {
assertlocked(page->rwlatch);
int slot_length; int slot_length;
fsckSlottedPage(page); fsckSlottedPage(page);
@ -556,6 +558,7 @@ void slottedWrite(Page * page, recordid rid, const byte *data) {
} }
void slottedWriteUnlocked(Page * page, recordid rid, const byte *data) { void slottedWriteUnlocked(Page * page, recordid rid, const byte *data) {
assertlocked(page->rwlatch);
int slot_length; int slot_length;
fsckSlottedPage(page); fsckSlottedPage(page);

View file

@ -146,6 +146,7 @@ int Tinit() {
NULL, 20, PAGE_SIZE * 1024, 1024); NULL, 20, PAGE_SIZE * 1024, 1024);
pageHandleOpen(pageFile); pageHandleOpen(pageFile);
#else #else
printf("\nWarning: Using old I/O routines.\n");
openPageFile(); openPageFile();
#endif // USE_PAGEFILE #endif // USE_PAGEFILE
bufInit(bufferManagerType); bufInit(bufferManagerType);

View file

@ -206,6 +206,7 @@ void testFunctions(){
// cleaning up // cleaning up
releasePage(p1); releasePage(p1);
Tcommit(xid); Tcommit(xid);
} }
@ -233,7 +234,7 @@ int SimpleExample(){
int pageid1 = rid1.page; int pageid1 = rid1.page;
Page * p1 = loadPage(xid, pageid1); Page * p1 = loadPage(xid, pageid1);
writelock(p1->rwlatch, 0);
if(DEBUGP) { printf("\n**** page->id = %lld\n", p1->id);} if(DEBUGP) { printf("\n**** page->id = %lld\n", p1->id);}
/* check consistency between rid & page's values /* check consistency between rid & page's values
@ -272,14 +273,15 @@ int SimpleExample(){
recordid rid2 = rid1; recordid rid2 = rid1;
rid2.slot = 0; rid2.slot = 0;
// @todo This is a messy way to do this...
unlock(p1->rwlatch);
fixedWrite(p1, rid2, b1); fixedWrite(p1, rid2, b1);
fixedRead(p1, rid2, b2); fixedRead(p1, rid2, b2);
if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));} if (DEBUGP) { printf("\nb2** = %d\n",*((int *) b2));}
// initializeNewBTreeNode(p1, rid1); // initializeNewBTreeNode(p1, rid1);
releasePage(p1); releasePage(p1);
Tcommit(xid); Tcommit(xid);