diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index 0c9525a..8a3f4eb 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -228,7 +228,7 @@ static Page * bhLoadPageImpl_helper(stasis_buffer_manager_t* bm, stasis_buffer_m int xid, const pageid_t pageid, int uninitialized, pagetype_t type) { stasis_buffer_hash_t * bh = bm->impl; - DEBUG("load %lld (%d)\n", pageid, uninitialized); + DEBUG("loadPage(%lld) (uninitialized = %d)\n", pageid, uninitialized); // Note: Calls to loadlatch in this function violate lock order, but // should be safe, since we make sure no one can have a writelock @@ -403,6 +403,7 @@ void bhPrefetchPagesImpl(stasis_buffer_manager_t *bm, pageid_t pageid, pageid_t } static void bhReleasePage(stasis_buffer_manager_t * bm, Page * p) { + DEBUG("releasePage(%lld) (rwlatch = %llx)\n", p->id, (long long)p->rwlatch); stasis_buffer_hash_t * bh = bm->impl; pthread_mutex_lock(&bh->mut); checkPageState(p); diff --git a/src/stasis/dirtyPageTable.c b/src/stasis/dirtyPageTable.c index fc4f519..9e296be 100644 --- a/src/stasis/dirtyPageTable.c +++ b/src/stasis/dirtyPageTable.c @@ -35,16 +35,17 @@ struct stasis_dirty_page_table_t { }; void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) { - assert(!tryreadlock(p->rwlatch,0)); if(!p->dirty) { - p->dirty = 1; - dpt_entry * e = malloc(sizeof(*e)); - e->p = p->id; - e->lsn = p->LSN; pthread_mutex_lock(&dirtyPages->mutex); - const void * ret = rbsearch(e, dirtyPages->table); - assert(ret == e); // otherwise, the entry was already in the table. - dirtyPages->count++; + if(!p->dirty) { + p->dirty = 1; + dpt_entry * e = malloc(sizeof(*e)); + e->p = p->id; + e->lsn = p->LSN; + const void * ret = rbsearch(e, dirtyPages->table); + assert(ret == e); // otherwise, the entry was already in the table. + dirtyPages->count++; + } pthread_mutex_unlock(&dirtyPages->mutex); #ifdef SANITY_CHECKS } else { diff --git a/src/stasis/page.c b/src/stasis/page.c index 0d2a2fc..adfe727 100644 --- a/src/stasis/page.c +++ b/src/stasis/page.c @@ -87,8 +87,6 @@ static page_impl page_impls[MAX_PAGE_TYPE]; static stasis_dirty_page_table_t * dirtyPages; void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) { - assertlocked(page->rwlatch); - if(page->LSN < lsn) { page->LSN = lsn; } @@ -99,7 +97,6 @@ void stasis_page_lsn_write(int xid, Page * page, lsn_t lsn) { } lsn_t stasis_page_lsn_read(const Page * page) { - assertlocked(page->rwlatch); return page->LSN; } @@ -146,7 +143,6 @@ page_impl * stasis_page_impl_get(int id) { return & page_impls[id]; } void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) { - assertlocked(p->rwlatch); assert( (p->id == rid.page) && (p->memAddr != NULL) ); assert(rid.size <= BLOB_THRESHOLD_SIZE); @@ -156,7 +152,6 @@ void stasis_record_write(int xid, Page * p, recordid rid, const byte *dat) { assert( (p->id == rid.page) && (p->memAddr != NULL) ); } int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) { - assertlocked(p->rwlatch); assert(rid.page == p->id); assert(rid.size <= BLOB_THRESHOLD_SIZE); @@ -170,7 +165,6 @@ int stasis_record_read(int xid, Page * p, recordid rid, byte *buf) { @todo stasis_record_dereference should dispatch via page_impl... */ recordid stasis_record_dereference(int xid, Page * p, recordid rid) { - assertlocked(p->rwlatch); int page_type = p->pageType; if(page_type == ARRAY_LIST_PAGE) { @@ -182,14 +176,12 @@ recordid stasis_record_dereference(int xid, Page * p, recordid rid) { /// -------------- Dispatch functions const byte * stasis_record_read_begin(int xid, Page * p, recordid rid) { - assertlocked(p->rwlatch); int page_type = p->pageType; assert(page_type); return page_impls[page_type].recordRead(xid, p, rid); } byte * stasis_record_write_begin(int xid, Page * p, recordid rid) { - assertlocked(p->rwlatch); int page_type = p->pageType; assert(page_type); @@ -209,19 +201,16 @@ void stasis_record_write_done(int xid, Page *p, recordid rid, byte *b) { } } int stasis_record_type_read(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); if(page_impls[p->pageType].recordGetType) return page_impls[p->pageType].recordGetType(xid, p, rid); else return INVALID_SLOT; } void stasis_record_type_write(int xid, Page *p, recordid rid, int type) { - assertlocked(p->rwlatch); page_impls[p->pageType] .recordSetType(xid, p, rid, type); } int stasis_record_length_read(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); return page_impls[p->pageType] .recordGetLength(xid,p,rid); } diff --git a/src/stasis/page/fixed.c b/src/stasis/page/fixed.c index b8d6df2..c5a5f45 100644 --- a/src/stasis/page/fixed.c +++ b/src/stasis/page/fixed.c @@ -13,7 +13,6 @@ int stasis_fixed_records_per_page(size_t size) { } /** @todo CORRECTNESS Locking for stasis_fixed_initialize_page? (should hold writelock)*/ void stasis_fixed_initialize_page(Page * page, size_t size, int count) { - assertlocked(page->rwlatch); stasis_page_cleanup(page); // Zero out the page contents, since callers often do so anyway. // blows away LSN, but the copy that's in p->LSN will be put back on page at flush. @@ -25,7 +24,6 @@ void stasis_fixed_initialize_page(Page * page, size_t size, int count) { } static void checkRid(Page * page, recordid rid) { - assertlocked(page->rwlatch); assert(page->pageType); // any more specific breaks pages based on this one assert(page->id == rid.page); assert(*recordsize_ptr(page) == rid.size); @@ -35,21 +33,18 @@ static void checkRid(Page * page, recordid rid) { //-------------- New API below this line static const byte* fixedRead(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); checkRid(p, rid); assert(rid.slot < *recordcount_ptr(p)); return fixed_record_ptr(p, rid.slot); } static byte* fixedWrite(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); checkRid(p, rid); assert(rid.slot < *recordcount_ptr(p)); return fixed_record_ptr(p, rid.slot); } static int fixedGetType(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); // checkRid(p, rid); if(rid.slot < *recordcount_ptr(p)) { int type = *recordsize_ptr(p); @@ -62,14 +57,12 @@ static int fixedGetType(int xid, Page *p, recordid rid) { } } static void fixedSetType(int xid, Page *p, recordid rid, int type) { - assertlocked(p->rwlatch); checkRid(p,rid); assert(rid.slot < *recordcount_ptr(p)); assert(stasis_record_type_to_size(type) == stasis_record_type_to_size(*recordsize_ptr(p))); *recordsize_ptr(p) = rid.size; } static int fixedGetLength(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); assert(p->pageType); return rid.slot > *recordcount_ptr(p) ? INVALID_SLOT : stasis_record_type_to_size(*recordsize_ptr(p)); @@ -78,7 +71,6 @@ static int fixedGetLength(int xid, Page *p, recordid rid) { static int notSupported(int xid, Page * p) { return 0; } static int fixedFreespace(int xid, Page * p) { - assertlocked(p->rwlatch); if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { // Return the size of a slot; that's the biggest record we can take. return stasis_record_type_to_size(*recordsize_ptr(p)); @@ -94,7 +86,6 @@ static void fixedCompactSlotIds(int xid, Page * p) { abort(); } static recordid fixedPreAlloc(int xid, Page *p, int size) { - assertlocked(p->rwlatch); if(stasis_fixed_records_per_page(*recordsize_ptr(p)) > *recordcount_ptr(p)) { recordid rid; rid.page = p->id; @@ -106,7 +97,6 @@ static recordid fixedPreAlloc(int xid, Page *p, int size) { } } static void fixedPostAlloc(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); assert(*recordcount_ptr(p) == rid.slot); assert(*recordsize_ptr(p) == rid.size); (*recordcount_ptr(p))++; @@ -115,7 +105,6 @@ static void fixedSplice(int xid, Page *p, slotid_t first, slotid_t second) { abort(); } static void fixedFree(int xid, Page *p, recordid rid) { - assertlocked(p->rwlatch); if(*recordsize_ptr(p) == rid.slot+1) { (*recordsize_ptr(p))--; } else { diff --git a/src/stasis/page/slotted.c b/src/stasis/page/slotted.c index 2f1c0c8..942c266 100644 --- a/src/stasis/page/slotted.c +++ b/src/stasis/page/slotted.c @@ -36,8 +36,6 @@ */ static inline void slottedFsck(const Page const * page) { - assertlocked(page->rwlatch); - Page dummy; dummy.id = -1; @@ -146,15 +144,11 @@ static inline void slottedFsck(const Page const * page) { } /** - Move all of the records to the beginning of the page in order to increase the available free space. - -The caller of this function must have a writelock on the page. */ static void slottedCompact(Page * page) { - assertlocked(page->rwlatch); Page bufPage; byte buffer[PAGE_SIZE]; bufPage.memAddr = buffer; @@ -234,7 +228,6 @@ static void slottedCompactSlotIDs(int xid, Page * p) { of a subsequent call to really_do_ralloc(). */ static size_t slottedFreespaceForSlot(Page * page, int slot) { - assertlocked(page->rwlatch); size_t slotOverhead; if(slot == INVALID_SLOT) { @@ -581,7 +574,6 @@ void stasis_page_slotted_deinit() { } void stasis_page_slotted_initialize_page(Page * page) { - assertlocked(page->rwlatch); stasis_page_cleanup(page); page->pageType = SLOTTED_PAGE; *stasis_page_slotted_freespace_ptr(page) = 0; diff --git a/test/stasis/check_operations.c b/test/stasis/check_operations.c index de13c02..bc08cd8 100644 --- a/test/stasis/check_operations.c +++ b/test/stasis/check_operations.c @@ -70,11 +70,9 @@ START_TEST(operation_physical_do_undo) { long long pnum = TpageAlloc(xid); Page * p = loadPage(xid, pnum); - writelock(p->rwlatch, 0); stasis_page_slotted_initialize_page(p); rid = stasis_record_alloc_begin(xid, p, sizeof(int)); stasis_record_alloc_done(xid, p, rid); - unlock(p->rwlatch); releasePage(p); DEBUG("A\n"); @@ -99,11 +97,9 @@ START_TEST(operation_physical_do_undo) { DEBUG("B\n"); p = loadPage(xid, rid.page); - writelock(p->rwlatch,0); // manually fill in UNDO field stasis_record_write(xid, p, rid, (byte*)&buf); stasis_page_lsn_write(xid, p, lsn); - unlock(p->rwlatch); releasePage(p); setToTwo->LSN = 10; @@ -115,9 +111,7 @@ START_TEST(operation_physical_do_undo) { releasePage(p); p = loadPage(xid, rid.page); - writelock(p->rwlatch,0); stasis_record_read(xid, p, rid, (byte*)&buf); - unlock(p->rwlatch); releasePage(p); assert(buf == 2); @@ -137,6 +131,7 @@ START_TEST(operation_physical_do_undo) { releasePage(p); p = loadPage(xid, rid.page); + readlock(p->rwlatch,0); stasis_record_read(xid, p, rid, (byte*)&buf); @@ -165,10 +160,8 @@ START_TEST(operation_physical_do_undo) { buf = 1; p = loadPage(xid, rid.page); - writelock(p->rwlatch,0); stasis_record_write(xid, p, rid, (byte*)&buf); stasis_page_lsn_write(xid, p, lsn); - unlock(p->rwlatch); releasePage(p); /* Trace of test: @@ -558,7 +551,6 @@ START_TEST(operation_lsn_free) { int xid = Tbegin(); pageid_t pid = TpageAlloc(xid); Page * p = loadPage(xid,pid); - writelock(p->rwlatch,0); stasis_slotted_lsn_free_initialize_page(p); // XXX hack! byte * old = malloc(PAGE_SIZE); @@ -573,7 +565,6 @@ START_TEST(operation_lsn_free) { byte * new = malloc(PAGE_SIZE); memcpy(new, p->memAddr, PAGE_SIZE); memcpy(p->memAddr, old, PAGE_SIZE); - unlock(p->rwlatch); releasePage(p); TpageSet(xid, pid, new); free(old); @@ -624,7 +615,6 @@ START_TEST(operation_reorderable) { int xid = Tbegin(); pageid_t pid = TpageAlloc(xid); Page * p = loadPage(xid,pid); - writelock(p->rwlatch,0); stasis_slotted_lsn_free_initialize_page(p); // XXX hack! byte * old = malloc(PAGE_SIZE); @@ -639,7 +629,6 @@ START_TEST(operation_reorderable) { byte * new = malloc(PAGE_SIZE); memcpy(new, p->memAddr, PAGE_SIZE); memcpy(p->memAddr, old, PAGE_SIZE); - unlock(p->rwlatch); releasePage(p); TpageSet(xid, pid, new); free(old); diff --git a/test/stasis/check_page.c b/test/stasis/check_page.c index 1698f1e..6f87262 100644 --- a/test/stasis/check_page.c +++ b/test/stasis/check_page.c @@ -341,24 +341,21 @@ START_TEST(pageRecordSizeTypeIteratorTest) { pageid_t pid = TpageAlloc(xid); Page * p = loadPage(xid,pid); - writelock(p->rwlatch,0); + memset(p->memAddr, 0, PAGE_SIZE); stasis_page_slotted_initialize_page(p); checkPageIterators(xid,p,10); - unlock(p->rwlatch); - pid = TpageAlloc(xid); releasePage(p); p = loadPage(xid,pid); - writelock(p->rwlatch,0); + memset(p->memAddr, 0, PAGE_SIZE); stasis_fixed_initialize_page(p,sizeof(int64_t),0); checkPageIterators(xid,p,10); - unlock(p->rwlatch); releasePage(p); Tcommit(xid); @@ -385,9 +382,8 @@ START_TEST(pageNoThreadMultPageTest) p->LSN = 0; *stasis_page_lsn_ptr(p) = p->LSN; - writelock(p->rwlatch,0); stasis_page_slotted_initialize_page(p); - unlock(p->rwlatch); + multiple_simultaneous_pages(p); // Normally, you would call pageWriteLSN() to update the LSN. This // is a hack, since Tdeinit() will crash if it detects page updates @@ -424,10 +420,8 @@ START_TEST(pageThreadTest) { fail_unless(1, NULL); Page * p = loadPage(-1, 2); - writelock(p->rwlatch,0); memset(p->memAddr, 0, PAGE_SIZE); stasis_page_slotted_initialize_page(p); - unlock(p->rwlatch); p->LSN = 0; *stasis_page_lsn_ptr(p) = p->LSN; @@ -465,10 +459,8 @@ START_TEST(fixedPageThreadTest) { pthread_mutex_init(&lsn_mutex, NULL); Tinit(); Page * p = loadPage(-1, 2); - writelock(p->rwlatch,0); memset(p->memAddr, 0, PAGE_SIZE); stasis_fixed_initialize_page(p, sizeof(int), 0); - unlock(p->rwlatch); p->LSN = 0; *stasis_page_lsn_ptr(p) = p->LSN; @@ -499,19 +491,14 @@ START_TEST(pageCheckSlotTypeTest) { recordid blob = Talloc(xid, PAGE_SIZE * 2); Page * p = loadPage(-1, slot.page); - readlock(p->rwlatch, 0); assert(stasis_record_type_read(xid, p, slot) == NORMAL_SLOT); assert(stasis_record_length_read(xid, p, slot) == sizeof(int)); - unlock(p->rwlatch); releasePage(p); /** @todo the use of the fixedRoot recordid to check getRecordType is a bit questionable, but should work. */ p = loadPage(-1, fixedRoot.page); - - readlock(p->rwlatch, 0); assert(stasis_record_type_read(xid, p, fixedRoot) == NORMAL_SLOT); - unlock(p->rwlatch); releasePage(p); fixedRoot.slot = 1; @@ -519,9 +506,7 @@ START_TEST(pageCheckSlotTypeTest) { assert(TrecordType(xid, fixedRoot) == NORMAL_SLOT); p = loadPage(-1, blob.page); - readlock(p->rwlatch, 0); int type = stasis_record_type_read(xid, p, blob); - unlock(p->rwlatch); assert(type == BLOB_SLOT); releasePage(p); @@ -531,7 +516,6 @@ START_TEST(pageCheckSlotTypeTest) { bad.size = 4; p = loadPage(xid, bad.page); - readlock(p->rwlatch, 0); assert(stasis_record_type_read(xid, p, bad) == INVALID_SLOT); bad.size = 100000; assert(stasis_record_type_read(xid, p, bad) == INVALID_SLOT); @@ -540,7 +524,6 @@ START_TEST(pageCheckSlotTypeTest) { assert(stasis_record_type_read(xid, p, bad) == NORMAL_SLOT); p->LSN = 0; *stasis_page_lsn_ptr(p) = p->LSN; - unlock(p->rwlatch); releasePage(p); Tcommit(xid); @@ -596,8 +579,6 @@ START_TEST(pageTreeOpTest) { // run a sanity check on a page pinned in ram; don't bother logging (since we don't care about recovery for this test) Page *p = loadPage(xid, page); - writelock(p->rwlatch, 0); - stasis_page_slotted_initialize_page(p); recordid rids[5]; @@ -642,8 +623,6 @@ START_TEST(pageTreeOpTest) { stasis_record_read(xid, p, rids[i], (byte*)(&j)); assert(i == j); } - // (tdeinit till fsck it at shutdown) - unlock(p->rwlatch); releasePage(p);