diff --git a/src/stasis/bufferManager/concurrentBufferManager.c b/src/stasis/bufferManager/concurrentBufferManager.c
index a93ee05..82fad89 100644
--- a/src/stasis/bufferManager/concurrentBufferManager.c
+++ b/src/stasis/bufferManager/concurrentBufferManager.c
@@ -16,6 +16,8 @@
 #endif // CONCURRENT_LRU
 #endif // NO_CONCURRENT_LRU
 
+//#define STRESS_TEST_WRITEBACK 1 // if defined, writeback as much as possible, as fast as possible.
+
 typedef struct {
   Page *p;
   stasis_buffer_manager_t *bm;
@@ -58,7 +60,11 @@ static inline int needFlush(stasis_buffer_manager_t * bm) {
   if(count > needed) {
     DEBUG("Need flush? Dirty: %lld Total: %lld ret = %d\n", count, needed, count > needed);
   }
+#ifdef STRESS_TEST_WRITEBACK
+  return count > 0;
+#else
   return count > needed;
+#endif
 }
 
 static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid, int is_hint) {
@@ -72,6 +78,7 @@ static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid,
   if(is_hint) {
     if(!trywritelock(p->loadlatch,0)) {
       ret = EBUSY;
+      p->needsFlush = 1; // Not atomic. Oh well.
     }
   } else {
     // Uggh. With the current design, it's possible that the trywritelock will block on the writeback thread.
@@ -85,6 +92,7 @@ static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid,
   if(ret) { return ret; }
   // write calls stasis_page_flushed(p);
   ch->page_handle->write(ch->page_handle, p);
+  p->needsFlush = 0;
   unlock(p->loadlatch);
   return 0;
 }
@@ -260,7 +268,13 @@ static Page * chLoadUninitPageImpl(stasis_buffer_manager_t *bm, int xid, const p
 static void chReleasePage(stasis_buffer_manager_t * bm, Page * p) {
   stasis_buffer_concurrent_hash_t * ch = bm->impl;
   ch->lru->insert(ch->lru, p);
+  int doFlush = p->needsFlush;
+  pageid_t pid = p->id;
   unlock(p->loadlatch);
+  if(doFlush) {
+    DEBUG(__FILE__ "releasePage: Page writeback is hungry. Pageid = %lld Do flush = %d\n", (long long)pid, doFlush);
+    bm->tryToWriteBackPage(bm, pid);
+  }
 }
 static void chForcePages(stasis_buffer_manager_t* bm, stasis_buffer_manager_handle_t *h) {
   stasis_buffer_concurrent_hash_t * ch = bm->impl;
diff --git a/src/stasis/bufferPool.c b/src/stasis/bufferPool.c
index 840618f..7676c34 100644
--- a/src/stasis/bufferPool.c
+++ b/src/stasis/bufferPool.c
@@ -94,6 +94,7 @@ stasis_buffer_pool_t* stasis_buffer_pool_init() {
     ret->pool[i].memAddr = calloc(1, PAGE_SIZE);
 #endif
     ret->pool[i].dirty = 0;
+    ret->pool[i].needsFlush = 0;
   }
   return ret;
 }
diff --git a/stasis/page.h b/stasis/page.h
index e03e049..bde9006 100644
--- a/stasis/page.h
+++ b/stasis/page.h
@@ -136,6 +136,7 @@ struct Page_s {
   lsn_t LSN;
   byte *memAddr;
   byte dirty;
+  byte needsFlush;
   /** The next item in the replacement policy's queue */
   struct Page_s *next;
   /** The previous item in the replacement policy's queue. */
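
Note for reviewers: the sketch below is a minimal, self-contained illustration of the hand-off this patch introduces, not Stasis code. A hinted writeback that cannot take the page latch records needsFlush instead of blocking, and the thread that later releases the page notices the flag and retries the flush. The names page_t, try_write_lock, write_back, hinted_writeback and release_page are hypothetical stand-ins for Page, trywritelock(), the page handle's write(), chWriteBackPage_helper() and chReleasePage(); only the control flow mirrors the diff.

/* Sketch (not Stasis code): the needsFlush hand-off in miniature. */
#include <stdio.h>

typedef struct {
  long long id;
  int latch_held;   /* 1 if some thread currently pins the page */
  int needs_flush;  /* set when a hinted writeback had to give up */
} page_t;

static int try_write_lock(page_t *p) { return !p->latch_held; }
static void write_back(page_t *p)    { printf("wrote page %lld\n", p->id); }

/* Hinted writeback: if the page is pinned, don't block; leave a note instead. */
static int hinted_writeback(page_t *p) {
  if(!try_write_lock(p)) {
    p->needs_flush = 1;   /* like the EBUSY path in chWriteBackPage_helper */
    return -1;            /* EBUSY in the real code */
  }
  write_back(p);
  p->needs_flush = 0;     /* page is clean as far as writeback is concerned */
  return 0;
}

/* Release: the pinning thread notices the note and retries the flush. */
static void release_page(page_t *p) {
  int do_flush   = p->needs_flush;  /* copy before dropping the latch */
  long long pid  = p->id;
  p->latch_held  = 0;               /* like unlock(p->loadlatch) */
  if(do_flush) {
    printf("writeback is hungry; retrying page %lld\n", pid);
    hinted_writeback(p);            /* like bm->tryToWriteBackPage(bm, pid) */
  }
}

int main(void) {
  page_t p = { 42, 1, 0 };
  hinted_writeback(&p);   /* fails: page is pinned, so needs_flush is set */
  release_page(&p);       /* unpins the page and performs the deferred writeback */
  return 0;
}

One detail worth calling out: as in chReleasePage, the flag and page id are copied before the latch is dropped, presumably because the page may be re-latched or recycled the moment it is unlocked; the flag itself is written without atomics, a race the patch's own comment accepts as benign.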