dirty page table flush was pinning pages, then asserting they were not pinned; updated page writeback api to allow correct usage

This commit is contained in:
Sears Russell 2009-08-08 01:43:03 +00:00
parent c92ee87c32
commit 907a24b68e
7 changed files with 32 additions and 22 deletions

View file

@ -161,7 +161,7 @@ Page * (*loadPageImpl)(int xid, pageid_t pageid, pagetype_t type) = 0;
Page * (*loadUninitPageImpl)(int xid, pageid_t pageid) = 0; Page * (*loadUninitPageImpl)(int xid, pageid_t pageid) = 0;
Page * (*getCachedPageImpl)(int xid, pageid_t pageid) = 0; Page * (*getCachedPageImpl)(int xid, pageid_t pageid) = 0;
void (*releasePageImpl)(Page * p) = 0; void (*releasePageImpl)(Page * p) = 0;
int (*writeBackPage)(Page * p) = 0; int (*writeBackPage)(pageid_t page) = 0;
void (*forcePages)() = 0; void (*forcePages)() = 0;
void (*forcePageRange)(pageid_t start, pageid_t stop) = 0; void (*forcePageRange)(pageid_t start, pageid_t stop) = 0;
void (*stasis_buffer_manager_close)() = 0; void (*stasis_buffer_manager_close)() = 0;

View file

@ -90,21 +90,24 @@ inline static void checkPageState(Page * p) { }
#endif #endif
inline static int tryToWriteBackPage(Page * p) { inline static int tryToWriteBackPage(pageid_t page) {
Page * p = lhfind(cachedPages, &page, sizeof(page));
if(!p) { return ENOENT; }
assert(p->id == page);
if(*pagePendingPtr(p) || *pagePinCountPtr(p)) { if(*pagePendingPtr(p) || *pagePinCountPtr(p)) {
return 0; return EBUSY;
} }
DEBUG("Write(%ld)\n", (long)victim->id); DEBUG("Write(%ld)\n", (long)victim->id);
page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight. page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight.
stasis_page_cleanup(p);
assert(!p->dirty); assert(!p->dirty);
// Make sure that no one mistakenly thinks this is still a live copy.
p->id = -1;
return 1; return 0;
} }
/** You need to hold mut before calling this. /** You need to hold mut before calling this.
@ -126,11 +129,17 @@ inline static Page * writeBackOnePage() {
checkPageState(victim); checkPageState(victim);
lru->remove(lru, victim); lru->remove(lru, victim);
int err= tryToWriteBackPage(victim->id);
assert(!err);
Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(victim->id)); Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(victim->id));
assert(old == victim); assert(old == victim);
int couldWriteBackPage = tryToWriteBackPage(victim); stasis_page_cleanup(victim);
assert(couldWriteBackPage); // Make sure that no one mistakenly thinks this is still a live copy.
victim->id = -1;
#ifdef LATCH_SANITY_CHECKING #ifdef LATCH_SANITY_CHECKING
// We can release the lock since we just grabbed it to see if // We can release the lock since we just grabbed it to see if
@ -369,8 +378,8 @@ static void bhReleasePage(Page * p) {
#endif #endif
pthread_mutex_unlock(&mut); pthread_mutex_unlock(&mut);
} }
static int bhWriteBackPage(Page * p) { static int bhWriteBackPage(pageid_t pageid) {
return tryToWriteBackPage(p); return tryToWriteBackPage(pageid);
} }
static void bhForcePages() { static void bhForcePages() {
page_handle->force_file(page_handle); page_handle->force_file(page_handle);

View file

@ -31,9 +31,12 @@ static stasis_page_handle_t * page_handle;
static stasis_buffer_pool_t * stasis_buffer_pool; static stasis_buffer_pool_t * stasis_buffer_pool;
static int pageWrite_legacyWrapper(Page * p) { static int pageWrite_legacyWrapper(pageid_t pageid) {
Page * p = loadPage(-1, pageid);
// XXX this is unsafe; the page could be pinned!
page_handle->write(page_handle,p); page_handle->write(page_handle,p);
return 1; // XXX probably unsafe. releasePage(p);
return 0;
} }
static void forcePageFile_legacyWrapper() { static void forcePageFile_legacyWrapper() {
page_handle->force_file(page_handle); page_handle->force_file(page_handle);

View file

@ -47,7 +47,7 @@ static void paReleasePage(Page * p) {
stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p); stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p);
} }
static int paWriteBackPage(Page * p) { return 1; /* no-op */ } static int paWriteBackPage(pageid_t p) { return 0; /* no-op */ }
static void paForcePages() { /* no-op */ } static void paForcePages() { /* no-op */ }
static void paForcePageRange(pageid_t start, pageid_t stop) { /* no-op */ } static void paForcePageRange(pageid_t start, pageid_t stop) { /* no-op */ }

View file

@ -106,12 +106,8 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages,
pthread_mutex_unlock(&dirtyPages->mutex); pthread_mutex_unlock(&dirtyPages->mutex);
for(pageid_t i = 0; i < n; i++) { for(pageid_t i = 0; i < n; i++) {
Page * p = getCachedPage(-1, staleDirtyPages[i]); int err = writeBackPage(staleDirtyPages[i]);
if(p) { if(stop && (err == EBUSY)) { abort(); /*api violation!*/ }
int succ = writeBackPage(p);
if(stop && (!succ)) { abort(); /*api violation!*/ }
releasePage(p);
}
} }
free(staleDirtyPages); free(staleDirtyPages);
forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE); forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE);

View file

@ -138,8 +138,10 @@ extern void (*releasePageImpl)(Page * p);
(Not all buffer managers support synchronous writes to stable (Not all buffer managers support synchronous writes to stable
storage. For compatibility, such buffer managers should ignore storage. For compatibility, such buffer managers should ignore
this call.) this call.)
@return 0 on success, ENOENT if the page is not in cache, and EBUSY if the page is pinned.
*/ */
extern int (*writeBackPage)(Page * p); extern int (*writeBackPage)(pageid_t p);
/** /**
Force any written back pages to disk. Force any written back pages to disk.

View file

@ -130,7 +130,7 @@ Suite * check_suite(void) {
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
tcase_add_test(tc, dirtyPageTable_randomTest); tcase_add_test(tc, dirtyPageTable_randomTest);
// tcase_add_test(tc, dirtyPageTable_threadTest); tcase_add_test(tc, dirtyPageTable_threadTest);
/* --------------------------------------------- */ /* --------------------------------------------- */