relaxed page latching requirements; page.c, and dirtyPageTable no longer check / rely on holding rwlatch; removed (now) unnecessary latching from unit tests

This commit is contained in:
Sears Russell 2010-08-06 00:19:19 +00:00
parent b722adff8a
commit e5528ce218
7 changed files with 15 additions and 75 deletions

View file

@ -228,7 +228,7 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m
int xid, const pageid_t pageid, int uninitialized, pagetype_t type) {
stasis_buffer_hash_t * bh = bm->impl;
DEBUG("load %lld (%d)\n", pageid, uninitialized);
DEBUG("loadPage(%lld) (uninitialized = %d)\n", pageid, uninitialized);
// Note: Calls to loadlatch in this function violate lock order, but
// should be safe, since we make sure no one can have a writelock
@ -403,6 +403,7 @@ void bhPrefetchPagesImpl(stasis_buffer_manager_t *bm, pageid_t pageid, pageid_t
}
static void bhReleasePage(stasis_buffer_manager_t * bm, Page * p) {
DEBUG("releasePage(%lld) (rwlatch = %llx)\n", p->id, (long long)p->rwlatch);
stasis_buffer_hash_t * bh = bm->impl;
pthread_mutex_lock(&bh->mut);
checkPageState(p);

View file

@ -35,16 +35,17 @@ struct stasis_dirty_page_table_t {
};
void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) {
assert(!tryreadlock(p->rwlatch,0));
if(!p->dirty) {
p->dirty = 1;
dpt_entry * e = malloc(sizeof(*e));
e->p = p->id;
e->lsn = p->LSN;
pthread_mutex_lock(&dirtyPages->mutex);
const void * ret = rbsearch(e, dirtyPages->table);
assert(ret == e); // otherwise, the entry was already in the table.
dirtyPages->count++;
if(!p->dirty) {
p->dirty = 1;
dpt_entry * e = malloc(sizeof(*e));
e->p = p->id;
e->lsn = p->LSN;
const void * ret = rbsearch(e, dirtyPages->table);
assert(ret == e); // otherwise, the entry was already in the table.
dirtyPages->count++;
}
pthread_mutex_unlock(&dirtyPages->mutex);
#ifdef SANITY_CHECKS
} else {

View file

@ -87,8 +87,6 @@ static page_impl page_impls[MAX_PAGE_TYPE];
static stasis_dirty_page_table_t * dirtyPages;
void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
assertlocked(page->rwlatch);
if(page->LSN < lsn) {
page->LSN = lsn;
}
@ -99,7 +97,6 @@ void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) {
}
lsn_t stasis_page_lsn_read(const Page * page) {
assertlocked(page->rwlatch);
return page->LSN;
}
@ -146,7 +143,6 @@ page_impl * stasis_page_impl_get(int id) {
return & page_impls[id];
}
void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) {
assertlocked(p->rwlatch);
assert( (p->id == rid.page) && (p->memAddr != NULL) );
assert(rid.size <= BLOB_THRESHOLD_SIZE);
@ -156,7 +152,6 @@ void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) {
assert( (p->id == rid.page) && (p->memAddr != NULL) );
}
int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
assertlocked(p->rwlatch);
assert(rid.page == p->id);
assert(rid.size <= BLOB_THRESHOLD_SIZE);
@ -170,7 +165,6 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) {
@todo stasis_record_dereference should dispatch via page_impl...
*/
recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
assertlocked(p->rwlatch);
int page_type = p->pageType;
if(page_type == ARRAY_LIST_PAGE) {
@ -182,14 +176,12 @@ recordid stasis_record_dereference(int xid, Page * p, recordid rid) {
/// -------------- Dispatch functions
const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) {
assertlocked(p->rwlatch);
int page_type = p->pageType;
assert(page_type);
return page_impls[page_type].recordRead(xid, p, rid);
}
byte * stasis_record_write_begin(int xid, Page * p, recordid rid) {
assertlocked(p->rwlatch);
int page_type = p->pageType;
assert(page_type);
@ -209,19 +201,16 @@ void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) {
}
}
int stasis_record_type_read(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
if(page_impls[p->pageType].recordGetType)
return page_impls[p->pageType].recordGetType(xid, p, rid);
else
return INVALID_SLOT;
}
void stasis_record_type_write(int xid, Page *p, recordid rid, int type) {
assertlocked(p->rwlatch);
page_impls[p->pageType]
.recordSetType(xid, p, rid, type);
}
int stasis_record_length_read(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
return page_impls[p->pageType]
.recordGetLength(xid,p,rid);
}

View file

@ -13,7 +13,6 @@ int stasis_fixed_records_per_page(size_t size) {
}
/** @todo CORRECTNESS Locking for stasis_fixed_initialize_page? (should hold writelock)*/
void stasis_fixed_initialize_page(Page * page, size_t size, int count) {
assertlocked(page->rwlatch);
stasis_page_cleanup(page);
// Zero out the page contents, since callers often do so anyway.
// blows away LSN, but the copy that's in p->LSN will be put back on page at flush.
@ -25,7 +24,6 @@ void stasis_fixed_initialize_page(Page * page, size_t size, int count) {
}
static void checkRid(Page * page, recordid rid) {
assertlocked(page->rwlatch);
assert(page->pageType); // any more specific breaks pages based on this one
assert(page->id == rid.page);
assert(*recordsize_ptr(page) == rid.size);
@ -35,21 +33,18 @@ static void checkRid(Page * page, recordid rid) {
//-------------- New API below this line
static const byte* fixedRead(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
checkRid(p, rid);
assert(rid.slot < *recordcount_ptr(p));
return fixed_record_ptr(p, rid.slot);
}
static byte* fixedWrite(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
checkRid(p, rid);
assert(rid.slot < *recordcount_ptr(p));
return fixed_record_ptr(p, rid.slot);
}
static int fixedGetType(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
// checkRid(p, rid);
if(rid.slot < *recordcount_ptr(p)) {
int type = *recordsize_ptr(p);
@ -62,14 +57,12 @@ static int fixedGetType(int xid, Page *p, recordid rid) {
}
}
static void fixedSetType(int xid, Page *p, recordid rid, int type) {
assertlocked(p->rwlatch);
checkRid(p,rid);
assert(rid.slot < *recordcount_ptr(p));
assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(*recordsize_ptr(p)));
*recordsize_ptr(p) = rid.size;
}
static int fixedGetLength(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
assert(p->pageType);
return rid.slot > *recordcount_ptr(p) ?
INVALID_SLOT : stasis_record_type_to_size(*recordsize_ptr(p));
@ -78,7 +71,6 @@ static int fixedGetLength(int xid, Page *p, recordid rid) {
static int notSupported(int xid, Page * p) { return 0; }
static int fixedFreespace(int xid, Page * p) {
assertlocked(p->rwlatch);
if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
// Return the size of a slot; that's the biggest record we can take.
return stasis_record_type_to_size(*recordsize_ptr(p));
@ -94,7 +86,6 @@ static void fixedCompactSlotIds(int xid, Page * p) {
abort();
}
static recordid fixedPreAlloc(int xid, Page *p, int size) {
assertlocked(p->rwlatch);
if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) {
recordid rid;
rid.page = p->id;
@ -106,7 +97,6 @@ static recordid fixedPreAlloc(int xid, Page *p, int size) {
}
}
static void fixedPostAlloc(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
assert(*recordcount_ptr(p) == rid.slot);
assert(*recordsize_ptr(p) == rid.size);
(*recordcount_ptr(p))++;
@ -115,7 +105,6 @@ static void fixedSplice(int xid, Page *p, slotid_t first, slotid_t second) {
abort();
}
static void fixedFree(int xid, Page *p, recordid rid) {
assertlocked(p->rwlatch);
if(*recordsize_ptr(p) == rid.slot+1) {
(*recordsize_ptr(p))--;
} else {

View file

@ -36,8 +36,6 @@
*/
static inline void slottedFsck(const Page const * page) {
assertlocked(page->rwlatch);
Page dummy;
dummy.id = -1;
@ -146,15 +144,11 @@ static inline void slottedFsck(const Page const * page) {
}
/**
Move all of the records to the beginning of the page in order to
increase the available free space.
The caller of this function must have a writelock on the page.
*/
static void slottedCompact(Page * page) {
assertlocked(page->rwlatch);
Page bufPage;
byte buffer[PAGE_SIZE];
bufPage.memAddr = buffer;
@ -234,7 +228,6 @@ static void slottedCompactSlotIDs(int xid, Page * p) {
of a subsequent call to really_do_ralloc().
*/
static size_t slottedFreespaceForSlot(Page * page, int slot) {
assertlocked(page->rwlatch);
size_t slotOverhead;
if(slot == INVALID_SLOT) {
@ -581,7 +574,6 @@ void stasis_page_slotted_deinit() {
}
void stasis_page_slotted_initialize_page(Page * page) {
assertlocked(page->rwlatch);
stasis_page_cleanup(page);
page->pageType = SLOTTED_PAGE;
*stasis_page_slotted_freespace_ptr(page) = 0;

View file

@ -70,11 +70,9 @@ START_TEST(operation_physical_do_undo) {
long long pnum = TpageAlloc(xid);
Page * p = loadPage(xid, pnum);
writelock(p->rwlatch, 0);
stasis_page_slotted_initialize_page(p);
rid = stasis_record_alloc_begin(xid, p, sizeof(int));
stasis_record_alloc_done(xid, p, rid);
unlock(p->rwlatch);
releasePage(p);
DEBUG("A\n");
@ -99,11 +97,9 @@ START_TEST(operation_physical_do_undo) {
DEBUG("B\n");
p = loadPage(xid, rid.page);
writelock(p->rwlatch,0);
// manually fill in UNDO field
stasis_record_write(xid, p, rid, (byte*)&buf);
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
releasePage(p);
setToTwo->LSN = 10;
@ -115,9 +111,7 @@ START_TEST(operation_physical_do_undo) {
releasePage(p);
p = loadPage(xid, rid.page);
writelock(p->rwlatch,0);
stasis_record_read(xid, p, rid, (byte*)&buf);
unlock(p->rwlatch);
releasePage(p);
assert(buf == 2);
@ -137,6 +131,7 @@ START_TEST(operation_physical_do_undo) {
releasePage(p);
p = loadPage(xid, rid.page);
readlock(p->rwlatch,0);
stasis_record_read(xid, p, rid, (byte*)&buf);
@ -165,10 +160,8 @@ START_TEST(operation_physical_do_undo) {
buf = 1;
p = loadPage(xid, rid.page);
writelock(p->rwlatch,0);
stasis_record_write(xid, p, rid, (byte*)&buf);
stasis_page_lsn_write(xid, p, lsn);
unlock(p->rwlatch);
releasePage(p);
/* Trace of test:
@ -558,7 +551,6 @@ START_TEST(operation_lsn_free) {
int xid = Tbegin();
pageid_t pid = TpageAlloc(xid);
Page * p = loadPage(xid,pid);
writelock(p->rwlatch,0);
stasis_slotted_lsn_free_initialize_page(p);
// XXX hack!
byte * old = malloc(PAGE_SIZE);
@ -573,7 +565,6 @@ START_TEST(operation_lsn_free) {
byte * new = malloc(PAGE_SIZE);
memcpy(new, p->memAddr, PAGE_SIZE);
memcpy(p->memAddr, old, PAGE_SIZE);
unlock(p->rwlatch);
releasePage(p);
TpageSet(xid, pid, new);
free(old);
@ -624,7 +615,6 @@ START_TEST(operation_reorderable) {
int xid = Tbegin();
pageid_t pid = TpageAlloc(xid);
Page * p = loadPage(xid,pid);
writelock(p->rwlatch,0);
stasis_slotted_lsn_free_initialize_page(p);
// XXX hack!
byte * old = malloc(PAGE_SIZE);
@ -639,7 +629,6 @@ START_TEST(operation_reorderable) {
byte * new = malloc(PAGE_SIZE);
memcpy(new, p->memAddr, PAGE_SIZE);
memcpy(p->memAddr, old, PAGE_SIZE);
unlock(p->rwlatch);
releasePage(p);
TpageSet(xid, pid, new);
free(old);

View file

@ -341,24 +341,21 @@ START_TEST(pageRecordSizeTypeIteratorTest) {
pageid_t pid = TpageAlloc(xid);
Page * p = loadPage(xid,pid);
writelock(p->rwlatch,0);
memset(p->memAddr, 0, PAGE_SIZE);
stasis_page_slotted_initialize_page(p);
checkPageIterators(xid,p,10);
unlock(p->rwlatch);
pid = TpageAlloc(xid);
releasePage(p);
p = loadPage(xid,pid);
writelock(p->rwlatch,0);
memset(p->memAddr, 0, PAGE_SIZE);
stasis_fixed_initialize_page(p,sizeof(int64_t),0);
checkPageIterators(xid,p,10);
unlock(p->rwlatch);
releasePage(p);
Tcommit(xid);
@ -385,9 +382,8 @@ START_TEST(pageNoThreadMultPageTest)
p->LSN = 0;
*stasis_page_lsn_ptr(p) = p->LSN;
writelock(p->rwlatch,0);
stasis_page_slotted_initialize_page(p);
unlock(p->rwlatch);
multiple_simultaneous_pages(p);
// Normally, you would call pageWriteLSN() to update the LSN. This
// is a hack, since Tdeinit() will crash if it detects page updates
@ -424,10 +420,8 @@ START_TEST(pageThreadTest) {
fail_unless(1, NULL);
Page * p = loadPage(-1, 2);
writelock(p->rwlatch,0);
memset(p->memAddr, 0, PAGE_SIZE);
stasis_page_slotted_initialize_page(p);
unlock(p->rwlatch);
p->LSN = 0;
*stasis_page_lsn_ptr(p) = p->LSN;
@ -465,10 +459,8 @@ START_TEST(fixedPageThreadTest) {
pthread_mutex_init(&lsn_mutex, NULL);
Tinit();
Page * p = loadPage(-1, 2);
writelock(p->rwlatch,0);
memset(p->memAddr, 0, PAGE_SIZE);
stasis_fixed_initialize_page(p, sizeof(int), 0);
unlock(p->rwlatch);
p->LSN = 0;
*stasis_page_lsn_ptr(p) = p->LSN;
@ -499,19 +491,14 @@ START_TEST(pageCheckSlotTypeTest) {
recordid blob = Talloc(xid, PAGE_SIZE * 2);
Page * p = loadPage(-1, slot.page);
readlock(p->rwlatch, 0);
assert(stasis_record_type_read(xid, p, slot) == NORMAL_SLOT);
assert(stasis_record_length_read(xid, p, slot) == sizeof(int));
unlock(p->rwlatch);
releasePage(p);
/** @todo the use of the fixedRoot recordid to check getRecordType is
a bit questionable, but should work. */
p = loadPage(-1, fixedRoot.page);
readlock(p->rwlatch, 0);
assert(stasis_record_type_read(xid, p, fixedRoot) == NORMAL_SLOT);
unlock(p->rwlatch);
releasePage(p);
fixedRoot.slot = 1;
@ -519,9 +506,7 @@ START_TEST(pageCheckSlotTypeTest) {
assert(TrecordType(xid, fixedRoot) == NORMAL_SLOT);
p = loadPage(-1, blob.page);
readlock(p->rwlatch, 0);
int type = stasis_record_type_read(xid, p, blob);
unlock(p->rwlatch);
assert(type == BLOB_SLOT);
releasePage(p);
@ -531,7 +516,6 @@ START_TEST(pageCheckSlotTypeTest) {
bad.size = 4;
p = loadPage(xid, bad.page);
readlock(p->rwlatch, 0);
assert(stasis_record_type_read(xid, p, bad) == INVALID_SLOT);
bad.size = 100000;
assert(stasis_record_type_read(xid, p, bad) == INVALID_SLOT);
@ -540,7 +524,6 @@ START_TEST(pageCheckSlotTypeTest) {
assert(stasis_record_type_read(xid, p, bad) == NORMAL_SLOT);
p->LSN = 0;
*stasis_page_lsn_ptr(p) = p->LSN;
unlock(p->rwlatch);
releasePage(p);
Tcommit(xid);
@ -596,8 +579,6 @@ START_TEST(pageTreeOpTest) {
// run a sanity check on a page pinned in ram; don't bother logging (since we don't care about recovery for this test)
Page *p = loadPage(xid, page);
writelock(p->rwlatch, 0);
stasis_page_slotted_initialize_page(p);
recordid rids[5];
@ -642,8 +623,6 @@ START_TEST(pageTreeOpTest) {
stasis_record_read(xid, p, rids[i], (byte*)(&j));
assert(i == j);
}
// (tdeinit till fsck it at shutdown)
unlock(p->rwlatch);
releasePage(p);