add writeback starvation avoidance logic to concurrentBufferManager

This commit is contained in:
Sears Russell 2010-07-09 01:55:54 +00:00
parent f376d46090
commit ee7aaff9ec
3 changed files with 16 additions and 0 deletions

View file

@ -16,6 +16,8 @@
#endif // CONCURRENT_LRU #endif // CONCURRENT_LRU
#endif // NO_CONCURRENT_LRU #endif // NO_CONCURRENT_LRU
//#define STRESS_TEST_WRITEBACK 1 // if defined, writeback as much as possible, as fast as possible.
typedef struct { typedef struct {
Page *p; Page *p;
stasis_buffer_manager_t *bm; stasis_buffer_manager_t *bm;
@ -58,7 +60,11 @@ static inline int needFlush(stasis_buffer_manager_t * bm) {
if(count > needed) { if(count > needed) {
DEBUG("Need flush? Dirty: %lld Total: %lld ret = %d\n", count, needed, count > needed); DEBUG("Need flush? Dirty: %lld Total: %lld ret = %d\n", count, needed, count > needed);
} }
#ifdef STRESS_TEST_WRITEBACK
return count > 0;
#else
return count > needed; return count > needed;
#endif
} }
static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid, int is_hint) { static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid, int is_hint) {
@ -72,6 +78,7 @@ static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid,
if(is_hint) { if(is_hint) {
if(!trywritelock(p->loadlatch,0)) { if(!trywritelock(p->loadlatch,0)) {
ret = EBUSY; ret = EBUSY;
p->needsFlush = 1; // Not atomic. Oh well.
} }
} else { } else {
// Uggh. With the current design, it's possible that the trywritelock will block on the writeback thread. // Uggh. With the current design, it's possible that the trywritelock will block on the writeback thread.
@ -85,6 +92,7 @@ static int chWriteBackPage_helper(stasis_buffer_manager_t* bm, pageid_t pageid,
if(ret) { return ret; } if(ret) { return ret; }
// write calls stasis_page_flushed(p); // write calls stasis_page_flushed(p);
ch->page_handle->write(ch->page_handle, p); ch->page_handle->write(ch->page_handle, p);
p->needsFlush = 0;
unlock(p->loadlatch); unlock(p->loadlatch);
return 0; return 0;
} }
@ -260,7 +268,13 @@ static Page * chLoadUninitPageImpl(stasis_buffer_manager_t *bm, int xid, const p
static void chReleasePage(stasis_buffer_manager_t * bm, Page * p) { static void chReleasePage(stasis_buffer_manager_t * bm, Page * p) {
stasis_buffer_concurrent_hash_t * ch = bm->impl; stasis_buffer_concurrent_hash_t * ch = bm->impl;
ch->lru->insert(ch->lru, p); ch->lru->insert(ch->lru, p);
int doFlush = p->needsFlush;
pageid_t pid = p->id;
unlock(p->loadlatch); unlock(p->loadlatch);
if(doFlush) {
DEBUG(__FILE__ "releasePage: Page writeback is hungry. Pageid = %lld Do flush = %d\n", (long long)pid, doFlush);
bm->tryToWriteBackPage(bm, pid);
}
} }
static void chForcePages(stasis_buffer_manager_t* bm, stasis_buffer_manager_handle_t *h) { static void chForcePages(stasis_buffer_manager_t* bm, stasis_buffer_manager_handle_t *h) {
stasis_buffer_concurrent_hash_t * ch = bm->impl; stasis_buffer_concurrent_hash_t * ch = bm->impl;

View file

@ -94,6 +94,7 @@ stasis_buffer_pool_t* stasis_buffer_pool_init() {
ret->pool[i].memAddr = calloc(1, PAGE_SIZE); ret->pool[i].memAddr = calloc(1, PAGE_SIZE);
#endif #endif
ret->pool[i].dirty = 0; ret->pool[i].dirty = 0;
ret->pool[i].needsFlush = 0;
} }
return ret; return ret;
} }

View file

@ -136,6 +136,7 @@ struct Page_s {
lsn_t LSN; lsn_t LSN;
byte *memAddr; byte *memAddr;
byte dirty; byte dirty;
byte needsFlush;
/** The next item in the replacement policy's queue */ /** The next item in the replacement policy's queue */
struct Page_s *next; struct Page_s *next;
/** The previous item in the replacement policy's queue. */ /** The previous item in the replacement policy's queue. */