Remove LRU-ordered writeback. Instead, write back pages sequentially once some fraction of the page file is dirty. This also further simplifies the writeback code.
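In outline: the old code evicted and wrote back one page at a time in LRU order (via writeBackOnePage(), deleted below); the new code just watches the dirty page count and, past a threshold, flushes the whole page file in one sequential pass. A minimal sketch of the new policy, built only from identifiers visible in this diff (the 1/5 fraction is the hard-coded heuristic that appears in bufferHash.c below):

    /* Trigger, checked after a page load: if more than a fifth of a
     * buffer pool's worth of pages are dirty, wake the writeback worker. */
    if(stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table())
       > MAX_BUFFER_SIZE / 5) {
      pthread_cond_signal(&needFree);
    }

    /* Worker side, with the buffer-pool mutex released around the I/O:
     * flush the dirty page table sequentially; start = stop = 0 means
     * "flush to EOF" per the comment added in this commit. */
    stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);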
parent 0f2b2ff200
commit 0cc65aefaf
5 changed files with 47 additions and 95 deletions
@@ -21,13 +21,8 @@ static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t readComplete = PTHREAD_COND_INITIALIZER;
static pthread_cond_t needFree = PTHREAD_COND_INITIALIZER;

static pageid_t freeLowWater;
static pageid_t freeListLength;
static pageid_t freeCount;
static pageid_t pageCount;

static Page ** freeList;

// A page is in LRU iff !pending, !pinned
static replacementPolicy * lru;
@@ -109,47 +104,6 @@ inline static int tryToWriteBackPage(pageid_t page) {
  return 0;
}
/** You need to hold mut before calling this.

    @return the page that was just written back. It will not be in
    lru or cachedPages after the call returns.
*/
inline static Page * writeBackOnePage() {
  Page * victim = lru->getStale(lru);
  // Make sure we have an exclusive lock on victim.
  if(!victim) return 0;

  assert(! *pagePendingPtr(victim));
  assert(! *pagePinCountPtr(victim));
#ifdef LATCH_SANITY_CHECKING
  int latched = trywritelock(victim->loadlatch,0);
  assert(latched);
#endif

  checkPageState(victim);

  lru->remove(lru, victim);

  int err = tryToWriteBackPage(victim->id);
  assert(!err);

  Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(victim->id));
  assert(old == victim);

  stasis_page_cleanup(victim);
  // Make sure that no one mistakenly thinks this is still a live copy.
  victim->id = -1;

#ifdef LATCH_SANITY_CHECKING
  // We can release the lock since we just grabbed it to see if
  // anyone else has pinned the page... the caller holds mut, so
  // no-one will touch the page for now.
  unlock(victim->loadlatch);
#endif

  return victim;
}

/** Returns a free page. The page will not be in freeList,
    cachedPages or lru. */
@@ -163,20 +117,26 @@ inline static Page * getFreePage() {
    pageSetNode(ret,0,0);
    pageCount++;
  } else {
    if(!freeCount) {
      ret = writeBackOnePage();
      while((ret = lru->getStale(lru))) {
        // Make sure we have an exclusive lock on victim.
        if(!ret) {
          printf("bufferHash.c: Cannot find free page for application request.\nbufferHash.c: This should not happen unless all pages have been pinned.\nbufferHash.c: Crashing.");
          abort();
        }
        assert(!*pagePinCountPtr(ret));
        assert(!*pagePendingPtr(ret));
        if(ret->dirty) {
          pthread_mutex_unlock(&mut);
          stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);
          pthread_mutex_lock(&mut);
        } else {
          ret = freeList[freeCount-1];
          freeList[freeCount-1] = 0;
          freeCount--;
          break;
        }
        if(freeCount < freeLowWater) {
          pthread_cond_signal(&needFree);
        }

        lru->remove(lru, ret);
        Page * check = LH_ENTRY(remove)(cachedPages, &ret->id, sizeof(ret->id));
        assert(check == ret);
      }
      assert(!*pagePinCountPtr(ret));
      assert(!*pagePendingPtr(ret));
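The net effect of the getFreePage() changes above: instead of calling writeBackOnePage(), the function scans the LRU for a page that is already clean, and if the stale page is dirty it drops mut, flushes everything sequentially, and rescans. Roughly the following; this is a reconstruction from the context lines above (the diff view interleaves old and new lines), so treat it as approximate:

    while((ret = lru->getStale(lru))) {
      assert(!*pagePinCountPtr(ret));
      assert(!*pagePendingPtr(ret));
      if(ret->dirty) {
        /* Dirty pages are no longer written back one at a time; flush the
         * whole dirty page table sequentially, then rescan for a clean victim. */
        pthread_mutex_unlock(&mut);
        stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);
        pthread_mutex_lock(&mut);
      } else {
        break;  /* found a clean, unpinned victim */
      }
    }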
@@ -188,26 +148,13 @@ inline static Page * getFreePage() {
static void * writeBackWorker(void * ignored) {
  pthread_mutex_lock(&mut);
  while(1) {
    while(running && (freeCount == freeListLength || pageCount < MAX_BUFFER_SIZE)) {
    while(running && pageCount < MAX_BUFFER_SIZE) {
      pthread_cond_wait(&needFree, &mut);
    }
    if(!running) { break; }
    Page * victim = writeBackOnePage();

    if(victim) {
      assert(freeCount < freeListLength);
      freeList[freeCount] = victim;
      freeCount++;
      assert(!pageGetNode(victim, 0));
      checkPageState(victim);
    } else {
      static int warned = 0;
      if(!warned) {
        printf("bufferHash.c: writeBackWorker() could not find a page to write back.\nbufferHash.c:\tThis means a significant fraction of the buffer pool is pinned.\n");
        warned = 1;
      }
      pthread_cond_wait(&needFree, &mut);
    }
    pthread_mutex_unlock(&mut);
    stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);
    pthread_mutex_lock(&mut);
  }
  pthread_mutex_unlock(&mut);
  return 0;
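With LRU-ordered writeback gone, writeBackWorker() collapses to a wait-then-flush loop. A sketch of the shape the function takes once the removed lines above are gone (reconstructed, not verbatim from the repository):

    static void * writeBackWorker(void * ignored) {
      pthread_mutex_lock(&mut);
      while(1) {
        while(running && pageCount < MAX_BUFFER_SIZE) {
          pthread_cond_wait(&needFree, &mut);  /* sleep until the dirty-count trigger fires */
        }
        if(!running) { break; }
        pthread_mutex_unlock(&mut);            /* don't hold mut across disk I/O */
        stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);
        pthread_mutex_lock(&mut);
      }
      pthread_mutex_unlock(&mut);
      return 0;
    }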
@@ -290,23 +237,20 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
    ret = LH_ENTRY(find)(cachedPages, &pageid,sizeof(pageid));

    if(!ret) {

      stasis_page_cleanup(ret2);
      // Make sure that no one mistakenly thinks this is still a live copy.
      ret2->id = -1;

      // No, so we're ready to add it.
      ret = ret2;
      // Escape from this loop.
      break;
    } else {
      // Put the page back on the free list.

      // It's possible that we wrote this page back even though the
      // freeList didn't have any free space; extend the free list if necessary.
      if(freeListLength == freeCount) {
        freeList = realloc(freeList, (freeListLength+1) * sizeof(Page*));
        freeListLength++;
      }

      freeList[freeCount] = ret2;
      assert(!pageGetNode(ret2, 0));
      freeCount++;
      // Put the page we were about to evict back in cachedPages.
      LH_ENTRY(insert)(cachedPages, &ret2->id, sizeof(ret2->id), ret2);
      lru->insert(lru, ret2);
      // On the next loop iteration, we'll probably return the page the other thread inserted for us.
    }
    // try again.
  } while(1);
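One subtlety in the else branch above: realloc sizes are in bytes, so the free list must grow by sizeof(Page*) per slot, not by one byte. A hypothetical helper (freeListPush is not part of this commit; it is shown only to capture the corrected growth logic):

    /* Hypothetical helper, for illustration only: append a page to
     * freeList, growing the array by one slot when it is full.
     * The caller must hold mut. */
    static void freeListPush(Page * p) {
      if(freeListLength == freeCount) {
        freeList = realloc(freeList, (freeListLength + 1) * sizeof(Page *));
        assert(freeList);
        freeListLength++;
      }
      assert(!pageGetNode(p, 0));
      freeList[freeCount++] = p;
    }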
@@ -352,6 +296,10 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
  pthread_mutex_unlock(&mut);
  pthread_cond_broadcast(&readComplete);

  // TODO Improve writeback policy
  if(stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()) > MAX_BUFFER_SIZE / 5) {
    pthread_cond_signal(&needFree);
  }
  assert(ret->id == pageid);
  checkPageState(ret);
  return ret;
@@ -414,8 +362,6 @@ static void bhBufDeinit() {
  LH_ENTRY(closelist)(&iter);
  LH_ENTRY(destroy)(cachedPages);

  free(freeList);

  lru->deinit(lru);
  stasis_buffer_pool_deinit(stasis_buffer_pool);
  page_handle->close(page_handle);
@@ -439,8 +385,6 @@ static void bhSimulateBufferManagerCrash() {
  LH_ENTRY(closelist)(&iter);
  LH_ENTRY(destroy)(cachedPages);

  free(freeList);

  lru->deinit(lru);
  stasis_buffer_pool_deinit(stasis_buffer_pool);
  page_handle->close(page_handle);
@@ -470,13 +414,8 @@ void stasis_buffer_manager_hash_open(stasis_page_handle_t * h) {

  cachedPages = LH_ENTRY(create)(MAX_BUFFER_SIZE);

  freeListLength = 6 + MAX_BUFFER_SIZE / 100;
  freeLowWater = freeListLength - 5;
  freeCount = 0;
  pageCount = 0;

  freeList = calloc(freeListLength, sizeof(Page*));

  running = 1;

  pthread_create(&worker, 0, writeBackWorker, 0);
@@ -119,6 +119,7 @@ void stasis_buffer_pool_free_page(stasis_buffer_pool_t * ret, Page *p, pageid_t
  writelock(p->rwlatch, 10);
  p->id = id;
  p->LSN = 0;
  p->pageType = UNINITIALIZED_PAGE;
  assert(!p->dirty);
  // p->dirty = 0;
  writeunlock(p->rwlatch);
@@ -27,6 +27,7 @@ static int dpt_cmp(const void *ap, const void * bp, const void * ignored) {

struct stasis_dirty_page_table_t {
  struct rbtree * table;
  pageid_t count;
  pthread_mutex_t mutex;
};
@@ -40,6 +41,7 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P
    e->lsn = p->LSN;
    const void * ret = rbsearch(e, dirtyPages->table);
    assert(ret == e); // otherwise, the entry was already in the table.
    dirtyPages->count++;
  } else {
    dpt_entry e = { p->id, 0};
    assert(rbfind(&e, dirtyPages->table));
@@ -57,6 +59,7 @@ void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, P
    assert(p->dirty);
    p->dirty = 0;
    free((void*)e);
    dirtyPages->count--;
  } else {
    assert(!p->dirty);
  }
@@ -88,6 +91,13 @@ lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages)
  return lsn;
}

pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPages) {
  pthread_mutex_lock(&dirtyPages->mutex);
  pageid_t ret = dirtyPages->count;
  pthread_mutex_unlock(&dirtyPages->mutex);
  return ret;
}

void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
  stasis_dirty_page_table_flush_range(dirtyPages, 0, 0); // pageid_t = 0 means flush to EOF.
}
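The new stasis_dirty_page_table_dirty_count() accessor takes the table's mutex only to read a snapshot of count; the value can be stale by the time the caller acts on it, which is fine for a writeback heuristic but would not be for correctness decisions. A hedged example of using the two entry points shown in this hunk (the caller and its logic are hypothetical):

    stasis_dirty_page_table_t * dpt = stasis_runtime_dirty_page_table();

    /* Advisory snapshot: the count may change as soon as the internal
     * mutex is released, so treat the result as a hint. */
    pageid_t n = stasis_dirty_page_table_dirty_count(dpt);

    /* Flush everything: the wrapper just calls flush_range(dpt, 0, 0),
     * where start = stop = 0 means "flush to end of file". */
    if(n > 0) {
      stasis_dirty_page_table_flush(dpt);
    }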
@@ -9,7 +9,7 @@
    out, or forcing the log too early?
*/
static void phWrite(stasis_page_handle_t * ph, Page * ret) {
  DEBUG("%lld\n", ret->id);
  DEBUG("\nPAGEWRITE %lld\n", ret->id);
  // This lock is only held to make the page implementation happy. We should
  // implicitly have exclusive access to the page before this function is called,
  // or we'll deadlock.
@@ -19,6 +19,8 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P
void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p);
int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p);

pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPages);

void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages);
lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t* dirtyPages);