diff --git a/src/stasis/bufferManager.c b/src/stasis/bufferManager.c index 4645dca..bbdc5e7 100644 --- a/src/stasis/bufferManager.c +++ b/src/stasis/bufferManager.c @@ -161,7 +161,7 @@ Page * (*loadPageImpl)(int xid, pageid_t pageid, pagetype_t type) = 0; Page * (*loadUninitPageImpl)(int xid, pageid_t pageid) = 0; Page * (*getCachedPageImpl)(int xid, pageid_t pageid) = 0; void (*releasePageImpl)(Page * p) = 0; -void (*writeBackPage)(Page * p) = 0; +int (*writeBackPage)(Page * p) = 0; void (*forcePages)() = 0; void (*forcePageRange)(pageid_t start, pageid_t stop) = 0; void (*stasis_buffer_manager_close)() = 0; diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index 9c49cd2..deef124 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -9,7 +9,6 @@ #include #include #include - #include #include @@ -47,7 +46,6 @@ static node_t * pageGetNode(void * page, void * ignore) { static void pageSetNode(void * page, node_t * n, void * ignore) { Page * p = page; p->prev = (Page *) n; - } static inline struct Page_s ** pagePendingPtr(Page * p) { @@ -92,6 +90,22 @@ inline static void checkPageState(Page * p) { } #endif +inline static int tryToWriteBackPage(Page * p) { + + if(*pagePendingPtr(p) || *pagePinCountPtr(p)) { + return 0; + } + + DEBUG("Write(%ld)\n", (long)victim->id); + page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight. + stasis_page_cleanup(p); + + assert(!p->dirty); + // Make sure that no one mistakenly thinks this is still a live copy. + p->id = -1; + + return 1; +} /** You need to hold mut before calling this. @return the page that was just written back. It will not be in @@ -115,11 +129,8 @@ inline static Page * writeBackOnePage() { Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(victim->id)); assert(old == victim); - // printf("Write(%ld)\n", (long)victim->id); - page_handle->write(page_handle,victim); /// XXX pageCleanup and pageFlushed might be heavyweight. - stasis_page_cleanup(victim); - // Make sure that no one mistakenly thinks this is still a live copy. - victim->id = -1; + int couldWriteBackPage = tryToWriteBackPage(victim); + assert(couldWriteBackPage); #ifdef LATCH_SANITY_CHECKING // We can release the lock since we just grabbed it to see if @@ -127,6 +138,7 @@ inline static Page * writeBackOnePage() { // no-one will touch the page for now. unlock(victim->loadlatch); #endif + return victim; } @@ -160,6 +172,7 @@ inline static Page * getFreePage() { assert(!*pagePinCountPtr(ret)); assert(!*pagePendingPtr(ret)); assert(!pageGetNode(ret,0)); + assert(!ret->dirty); return ret; } @@ -176,6 +189,7 @@ static void * writeBackWorker(void * ignored) { assert(freeCount < freeListLength); freeList[freeCount] = victim; freeCount++; + assert(!pageGetNode(victim, 0)); checkPageState(victim); } else { static int warned = 0; @@ -196,13 +210,18 @@ static Page * bhGetCachedPage(int xid, const pageid_t pageid) { Page * ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(pageid)); if(ret) { checkPageState(ret); +#ifdef LATCH_SANITY_CHECKING + int locked = tryreadlock(ret->loadlatch,0); + assert(locked); +#endif if(!*pagePendingPtr(ret)) { - // good if(!*pagePinCountPtr(ret) ) { // Then ret is in lru (otherwise it would be pending, or not cached); remove it. lru->remove(lru, ret); } (*pagePinCountPtr(ret))++; + checkPageState(ret); + assert(ret->id == pageid); } else { ret = 0; } @@ -277,6 +296,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia } freeList[freeCount] = ret2; + assert(!pageGetNode(ret2, 0)); freeCount++; } // try again. @@ -303,7 +323,8 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia } else { memset(ret->memAddr,0,PAGE_SIZE); *stasis_page_lsn_ptr(ret) = ret->LSN; - ret->dirty = 0; + assert(!ret->dirty); +// ret->dirty = 0; stasis_page_loaded(ret, type); } *pagePendingPtr(ret) = 0; @@ -348,8 +369,8 @@ static void bhReleasePage(Page * p) { #endif pthread_mutex_unlock(&mut); } -static void bhWriteBackPage(Page * p) { - page_handle->write(page_handle, p); +static int bhWriteBackPage(Page * p) { + return tryToWriteBackPage(p); } static void bhForcePages() { page_handle->force_file(page_handle); diff --git a/src/stasis/bufferManager/legacy/legacyBufferManager.c b/src/stasis/bufferManager/legacy/legacyBufferManager.c index d202e04..bd810e7 100644 --- a/src/stasis/bufferManager/legacy/legacyBufferManager.c +++ b/src/stasis/bufferManager/legacy/legacyBufferManager.c @@ -31,8 +31,9 @@ static stasis_page_handle_t * page_handle; static stasis_buffer_pool_t * stasis_buffer_pool; -static void pageWrite_legacyWrapper(Page * p) { +static int pageWrite_legacyWrapper(Page * p) { page_handle->write(page_handle,p); + return 1; // XXX probably unsafe. } static void forcePageFile_legacyWrapper() { page_handle->force_file(page_handle); diff --git a/src/stasis/bufferManager/legacy/pageFile.c b/src/stasis/bufferManager/legacy/pageFile.c index 12c5dbb..e991bb4 100644 --- a/src/stasis/bufferManager/legacy/pageFile.c +++ b/src/stasis/bufferManager/legacy/pageFile.c @@ -67,8 +67,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret, pagetype_t type) { abort(); } } - - ret->dirty = 0; + assert(ret->dirty == 0); stasis_page_loaded(ret, type); pthread_mutex_unlock(&stable_mutex); @@ -78,9 +77,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret, pagetype_t type) { dirty page table can be kept up to date. */ static void pfPageWrite(stasis_page_handle_t * h, Page * ret) { /** If the page is clean, there's no reason to write it out. */ - assert(ret->dirty == stasis_dirty_page_table_is_dirty(h->dirtyPages, ret)); - if(!ret->dirty) { - // if(!stasis_dirty_page_table_is_dirty(ret)) { + if(!stasis_dirty_page_table_is_dirty(h->dirtyPages, ret)) { DEBUG(" =^)~ "); return; } diff --git a/src/stasis/bufferManager/pageArray.c b/src/stasis/bufferManager/pageArray.c index c538bf7..75e63d0 100644 --- a/src/stasis/bufferManager/pageArray.c +++ b/src/stasis/bufferManager/pageArray.c @@ -47,7 +47,7 @@ static void paReleasePage(Page * p) { stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p); } -static void paWriteBackPage(Page * p) { /* no-op */ } +static int paWriteBackPage(Page * p) { return 1; /* no-op */ } static void paForcePages() { /* no-op */ } static void paForcePageRange(pageid_t start, pageid_t stop) { /* no-op */ } diff --git a/src/stasis/bufferPool.c b/src/stasis/bufferPool.c index f907ba4..98e54dd 100644 --- a/src/stasis/bufferPool.c +++ b/src/stasis/bufferPool.c @@ -104,7 +104,7 @@ Page* stasis_buffer_pool_malloc_page(stasis_buffer_pool_t * ret) { pthread_mutex_lock(&ret->mut); page = &(ret->pool[ret->nextPage]); - + assert(!page->dirty); (ret->nextPage)++; /* There's a dummy page that we need to keep around, thus the +1 */ assert(ret->nextPage <= MAX_BUFFER_SIZE + 1); @@ -119,6 +119,7 @@ void stasis_buffer_pool_free_page(stasis_buffer_pool_t * ret, Page *p, pageid_t writelock(p->rwlatch, 10); p->id = id; p->LSN = 0; - p->dirty = 0; + assert(!p->dirty); +// p->dirty = 0; writeunlock(p->rwlatch); } diff --git a/src/stasis/dirtyPageTable.c b/src/stasis/dirtyPageTable.c index 6892ad5..b6f85c1 100644 --- a/src/stasis/dirtyPageTable.c +++ b/src/stasis/dirtyPageTable.c @@ -5,14 +5,28 @@ * Author: sears */ -#include +#include #include #include #include #include +#include + +typedef struct { + pageid_t p; + lsn_t lsn; +} dpt_entry; + +static int dpt_cmp(const void *ap, const void * bp, const void * ignored) { + const dpt_entry * a = ap; + const dpt_entry * b = bp; + + return a->p < b->p ? -1 : (a->p == b->p ? 0 : 1); +} + struct stasis_dirty_page_table_t { - pblHashTable_t * table; + struct rbtree * table; pthread_mutex_t mutex; }; @@ -20,30 +34,31 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P pthread_mutex_lock(&dirtyPages->mutex); if(!p->dirty) { p->dirty = 1; - //assert(p->LSN); - void* ret = pblHtLookup(dirtyPages->table, &(p->id), sizeof(p->id)); - assert(!ret); - lsn_t * insert = malloc(sizeof(lsn_t)); - *insert = p->LSN; - pblHtInsert(dirtyPages->table, &(p->id), sizeof(p->id), insert); //(void*)p->LSN); + dpt_entry * e = malloc(sizeof(*e)); + e->p = p->id; + e->lsn = p->LSN; + const void * ret = rbsearch(e, dirtyPages->table); + assert(ret == e); // otherwise, the entry was already in the table. + } else { + dpt_entry e = { p->id, 0}; + assert(rbfind(&e, dirtyPages->table)); } pthread_mutex_unlock(&dirtyPages->mutex); } void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p) { pthread_mutex_lock(&dirtyPages->mutex); - // printf("Removing page %d\n", p->id); - //assert(pblHtLookup(dirtyPages, &(p->id), sizeof(int))); - // printf("With lsn = %d\n", (lsn_t)pblHtCurrent(dirtyPages)); - p->dirty = 0; - lsn_t * old = pblHtLookup(dirtyPages->table, &(p->id),sizeof(p->id)); - pblHtRemove(dirtyPages->table, &(p->id), sizeof(p->id)); - if(old) { - free(old); + dpt_entry dummy = {p->id, 0}; + const dpt_entry * e = rbdelete(&dummy, dirtyPages->table); + + if(e) { + assert(e->p == p->id); + assert(p->dirty); + p->dirty = 0; + free((void*)e); + } else { + assert(!p->dirty); } - //assert(!ret); <--- Due to a bug in the PBL compatibility mode, - //there is no way to tell whether the value didn't exist, or if it - //was null. pthread_mutex_unlock(&dirtyPages->mutex); } @@ -51,105 +66,83 @@ int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Pag int ret; pthread_mutex_lock(&dirtyPages->mutex); ret = p->dirty; + dpt_entry e = { p->id, 0}; + const void* found = rbfind(&e, dirtyPages->table); + assert((found && ret) || !(found||ret)); pthread_mutex_unlock(&dirtyPages->mutex); return ret; } lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages) { - lsn_t lsn = LSN_T_MAX; // LogFlushedLSN (); - pageid_t* pageid; + lsn_t lsn = LSN_T_MAX; pthread_mutex_lock(&dirtyPages->mutex); - - for( pageid = (pageid_t*)pblHtFirst (dirtyPages->table); pageid; pageid = (pageid_t*)pblHtNext(dirtyPages->table)) { - lsn_t * thisLSN = (lsn_t*) pblHtCurrent(dirtyPages->table); - // printf("lsn = %d\n", thisLSN); - if(*thisLSN < lsn) { - lsn = *thisLSN; + for(const dpt_entry * e = rbmin(dirtyPages->table); + e; + e = rblookup(RB_LUGREAT, e, dirtyPages->table)) { + if(e->lsn < lsn) { + lsn = e->lsn; } } pthread_mutex_unlock(&dirtyPages->mutex); - return lsn; } void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) { - pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE)); - int i; - for(i = 0; i < MAX_BUFFER_SIZE; i++) { - staleDirtyPages[i] = -1; - } - Page* p = 0; - pthread_mutex_lock(&dirtyPages->mutex); - void* tmp; - i = 0; - - for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) { - staleDirtyPages[i] = *((pageid_t*) pblHtCurrentKey(dirtyPages->table)); - i++; - } - assert(i < MAX_BUFFER_SIZE); - pthread_mutex_unlock(&dirtyPages->mutex); - - for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) { - p = getCachedPage(-1, staleDirtyPages[i]); - if(p) { - writeBackPage(p); - releasePage(p); - } - } - free(staleDirtyPages); + stasis_dirty_page_table_flush_range(dirtyPages, 0, 0); // pageid_t = 0 means flush to EOF. } void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) { - pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE)); - int i; - Page * p = 0; pthread_mutex_lock(&dirtyPages->mutex); - - void *tmp; - i = 0; - for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) { - pageid_t num = *((pageid_t*) pblHtCurrentKey(dirtyPages->table)); - if(num <= start && num < stop) { - staleDirtyPages[i] = num; - i++; - } + pageid_t * staleDirtyPages = 0; + pageid_t n = 0; + dpt_entry dummy = { start, 0 }; + for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table); + e && (stop == 0 || e->p < stop); + e = rblookup(RB_LUGREAT, e, dirtyPages->table)) { + n++; + staleDirtyPages = realloc(staleDirtyPages, sizeof(pageid_t) * n); + staleDirtyPages[n-1] = e->p; } - staleDirtyPages[i] = -1; pthread_mutex_unlock(&dirtyPages->mutex); - for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) { - p = getCachedPage(-1, staleDirtyPages[i]); + for(pageid_t i = 0; i < n; i++) { + Page * p = getCachedPage(-1, staleDirtyPages[i]); if(p) { - writeBackPage(p); + int succ = writeBackPage(p); + if(stop && (!succ)) { abort(); /*api violation!*/ } releasePage(p); } } free(staleDirtyPages); forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE); - } + stasis_dirty_page_table_t * stasis_dirty_page_table_init() { stasis_dirty_page_table_t * ret = malloc(sizeof(*ret)); - ret->table = pblHtCreate(); + ret->table = rbinit(dpt_cmp, 0); pthread_mutex_init(&ret->mutex, 0); return ret; } - void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages) { - void * tmp; int areDirty = 0; - for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) { - free(pblHtCurrent(dirtyPages->table)); + dpt_entry dummy = {0, 0}; + for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table); + e; + e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) { + if((!areDirty) && (!stasis_suppress_unclean_shutdown_warnings)) { printf("Warning: dirtyPagesDeinit detected dirty, unwritten pages. " "Updates lost?\n"); areDirty = 1; } + dummy = *e; + rbdelete(e, dirtyPages->table); + free((void*)e); } - pblHtDelete(dirtyPages->table); + + rbdestroy(dirtyPages->table); pthread_mutex_destroy(&dirtyPages->mutex); free(dirtyPages); } diff --git a/src/stasis/pageHandle.c b/src/stasis/pageHandle.c index a468a36..b81bce6 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -9,11 +9,14 @@ out, or forcing the log too early? */ static void phWrite(stasis_page_handle_t * ph, Page * ret) { - if(!ret->dirty) { return; } + DEBUG("%lld\n", ret->id); // This lock is only held to make the page implementation happy. We should // implicitly have exclusive access to the page before this function is called, // or we'll deadlock. + + /// TODO Turn into trywritelock + test for trouble. writelock(ret->rwlatch,0); + if(!ret->dirty) { unlock(ret->rwlatch); return; } stasis_page_flushed(ret); if(ph->log) { stasis_log_force(ph->log, ret->LSN, LOG_FORCE_WAL); } int err = ((stasis_handle_t*)ph->impl)->write(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); @@ -38,7 +41,7 @@ static void phRead(stasis_page_handle_t * ph, Page * ret, pagetype_t type) { abort(); } } - ret->dirty = 0; + assert(!ret->dirty); stasis_page_loaded(ret, type); unlock(ret->rwlatch); } diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index adc9f41..4c1648c 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -139,7 +139,7 @@ extern void (*releasePageImpl)(Page * p); storage. For compatibility, such buffer managers should ignore this call.) */ -extern void (*writeBackPage)(Page * p); +extern int (*writeBackPage)(Page * p); /** Force any written back pages to disk.