improved behavior for sequential I/O performance, fixed races during writeback

This commit is contained in:
Sears Russell 2009-08-07 18:27:52 +00:00
parent 68e947fb6d
commit c92ee87c32
9 changed files with 116 additions and 100 deletions

View file

@ -161,7 +161,7 @@ Page * (*loadPageImpl)(int xid, pageid_t pageid, pagetype_t type) = 0;
Page * (*loadUninitPageImpl)(int xid, pageid_t pageid) = 0;
Page * (*getCachedPageImpl)(int xid, pageid_t pageid) = 0;
void (*releasePageImpl)(Page * p) = 0;
void (*writeBackPage)(Page * p) = 0;
int (*writeBackPage)(Page * p) = 0;
void (*forcePages)() = 0;
void (*forcePageRange)(pageid_t start, pageid_t stop) = 0;
void (*stasis_buffer_manager_close)() = 0;

View file

@ -9,7 +9,6 @@
#include <stasis/replacementPolicy.h>
#include <stasis/bufferManager.h>
#include <stasis/page.h>
#include <assert.h>
#include <stdio.h>
@ -47,7 +46,6 @@ static node_t * pageGetNode(void * page, void * ignore) {
static void pageSetNode(void * page, node_t * n, void * ignore) {
Page * p = page;
p->prev = (Page *) n;
}
static inline struct Page_s ** pagePendingPtr(Page * p) {
@ -92,6 +90,22 @@ inline static void checkPageState(Page * p) { }
#endif
inline static int tryToWriteBackPage(Page * p) {
if(*pagePendingPtr(p) || *pagePinCountPtr(p)) {
return 0;
}
DEBUG("Write(%ld)\n", (long)victim->id);
page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight.
stasis_page_cleanup(p);
assert(!p->dirty);
// Make sure that no one mistakenly thinks this is still a live copy.
p->id = -1;
return 1;
}
/** You need to hold mut before calling this.
@return the page that was just written back. It will not be in
@ -115,11 +129,8 @@ inline static Page * writeBackOnePage() {
Page * old = LH_ENTRY(remove)(cachedPages, &(victim->id), sizeof(victim->id));
assert(old == victim);
// printf("Write(%ld)\n", (long)victim->id);
page_handle->write(page_handle,victim); /// XXX pageCleanup and pageFlushed might be heavyweight.
stasis_page_cleanup(victim);
// Make sure that no one mistakenly thinks this is still a live copy.
victim->id = -1;
int couldWriteBackPage = tryToWriteBackPage(victim);
assert(couldWriteBackPage);
#ifdef LATCH_SANITY_CHECKING
// We can release the lock since we just grabbed it to see if
@ -127,6 +138,7 @@ inline static Page * writeBackOnePage() {
// no-one will touch the page for now.
unlock(victim->loadlatch);
#endif
return victim;
}
@ -160,6 +172,7 @@ inline static Page * getFreePage() {
assert(!*pagePinCountPtr(ret));
assert(!*pagePendingPtr(ret));
assert(!pageGetNode(ret,0));
assert(!ret->dirty);
return ret;
}
@ -176,6 +189,7 @@ static void * writeBackWorker(void * ignored) {
assert(freeCount < freeListLength);
freeList[freeCount] = victim;
freeCount++;
assert(!pageGetNode(victim, 0));
checkPageState(victim);
} else {
static int warned = 0;
@ -196,13 +210,18 @@ static Page * bhGetCachedPage(int xid, const pageid_t pageid) {
Page * ret = LH_ENTRY(find)(cachedPages, &pageid, sizeof(pageid));
if(ret) {
checkPageState(ret);
#ifdef LATCH_SANITY_CHECKING
int locked = tryreadlock(ret->loadlatch,0);
assert(locked);
#endif
if(!*pagePendingPtr(ret)) {
// good
if(!*pagePinCountPtr(ret) ) {
// Then ret is in lru (otherwise it would be pending, or not cached); remove it.
lru->remove(lru, ret);
}
(*pagePinCountPtr(ret))++;
checkPageState(ret);
assert(ret->id == pageid);
} else {
ret = 0;
}
@ -277,6 +296,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
}
freeList[freeCount] = ret2;
assert(!pageGetNode(ret2, 0));
freeCount++;
}
// try again.
@ -303,7 +323,8 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
} else {
memset(ret->memAddr,0,PAGE_SIZE);
*stasis_page_lsn_ptr(ret) = ret->LSN;
ret->dirty = 0;
assert(!ret->dirty);
// ret->dirty = 0;
stasis_page_loaded(ret, type);
}
*pagePendingPtr(ret) = 0;
@ -348,8 +369,8 @@ static void bhReleasePage(Page * p) {
#endif
pthread_mutex_unlock(&mut);
}
static void bhWriteBackPage(Page * p) {
page_handle->write(page_handle, p);
static int bhWriteBackPage(Page * p) {
return tryToWriteBackPage(p);
}
static void bhForcePages() {
page_handle->force_file(page_handle);

View file

@ -31,8 +31,9 @@ static stasis_page_handle_t * page_handle;
static stasis_buffer_pool_t * stasis_buffer_pool;
static void pageWrite_legacyWrapper(Page * p) {
static int pageWrite_legacyWrapper(Page * p) {
page_handle->write(page_handle,p);
return 1; // XXX probably unsafe.
}
static void forcePageFile_legacyWrapper() {
page_handle->force_file(page_handle);

View file

@ -67,8 +67,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret, pagetype_t type) {
abort();
}
}
ret->dirty = 0;
assert(ret->dirty == 0);
stasis_page_loaded(ret, type);
pthread_mutex_unlock(&stable_mutex);
@ -78,9 +77,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret, pagetype_t type) {
dirty page table can be kept up to date. */
static void pfPageWrite(stasis_page_handle_t * h, Page * ret) {
/** If the page is clean, there's no reason to write it out. */
assert(ret->dirty == stasis_dirty_page_table_is_dirty(h->dirtyPages, ret));
if(!ret->dirty) {
// if(!stasis_dirty_page_table_is_dirty(ret)) {
if(!stasis_dirty_page_table_is_dirty(h->dirtyPages, ret)) {
DEBUG(" =^)~ ");
return;
}

View file

@ -47,7 +47,7 @@ static void paReleasePage(Page * p) {
stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p);
}
static void paWriteBackPage(Page * p) { /* no-op */ }
static int paWriteBackPage(Page * p) { return 1; /* no-op */ }
static void paForcePages() { /* no-op */ }
static void paForcePageRange(pageid_t start, pageid_t stop) { /* no-op */ }

View file

@ -104,7 +104,7 @@ Page* stasis_buffer_pool_malloc_page(stasis_buffer_pool_t * ret) {
pthread_mutex_lock(&ret->mut);
page = &(ret->pool[ret->nextPage]);
assert(!page->dirty);
(ret->nextPage)++;
/* There's a dummy page that we need to keep around, thus the +1 */
assert(ret->nextPage <= MAX_BUFFER_SIZE + 1);
@ -119,6 +119,7 @@ void stasis_buffer_pool_free_page(stasis_buffer_pool_t * ret, Page *p, pageid_t
writelock(p->rwlatch, 10);
p->id = id;
p->LSN = 0;
p->dirty = 0;
assert(!p->dirty);
// p->dirty = 0;
writeunlock(p->rwlatch);
}

View file

@ -5,14 +5,28 @@
* Author: sears
*/
#include <pbl/pbl.h>
#include <stasis/redblack.h>
#include <stasis/common.h>
#include <stasis/dirtyPageTable.h>
#include <stasis/page.h>
#include <stasis/bufferManager.h>
#include <stdio.h>
typedef struct {
pageid_t p;
lsn_t lsn;
} dpt_entry;
static int dpt_cmp(const void *ap, const void * bp, const void * ignored) {
const dpt_entry * a = ap;
const dpt_entry * b = bp;
return a->p < b->p ? -1 : (a->p == b->p ? 0 : 1);
}
struct stasis_dirty_page_table_t {
pblHashTable_t * table;
struct rbtree * table;
pthread_mutex_t mutex;
};
@ -20,30 +34,31 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P
pthread_mutex_lock(&dirtyPages->mutex);
if(!p->dirty) {
p->dirty = 1;
//assert(p->LSN);
void* ret = pblHtLookup(dirtyPages->table, &(p->id), sizeof(p->id));
assert(!ret);
lsn_t * insert = malloc(sizeof(lsn_t));
*insert = p->LSN;
pblHtInsert(dirtyPages->table, &(p->id), sizeof(p->id), insert); //(void*)p->LSN);
dpt_entry * e = malloc(sizeof(*e));
e->p = p->id;
e->lsn = p->LSN;
const void * ret = rbsearch(e, dirtyPages->table);
assert(ret == e); // otherwise, the entry was already in the table.
} else {
dpt_entry e = { p->id, 0};
assert(rbfind(&e, dirtyPages->table));
}
pthread_mutex_unlock(&dirtyPages->mutex);
}
void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p) {
pthread_mutex_lock(&dirtyPages->mutex);
// printf("Removing page %d\n", p->id);
//assert(pblHtLookup(dirtyPages, &(p->id), sizeof(int)));
// printf("With lsn = %d\n", (lsn_t)pblHtCurrent(dirtyPages));
p->dirty = 0;
lsn_t * old = pblHtLookup(dirtyPages->table, &(p->id),sizeof(p->id));
pblHtRemove(dirtyPages->table, &(p->id), sizeof(p->id));
if(old) {
free(old);
dpt_entry dummy = {p->id, 0};
const dpt_entry * e = rbdelete(&dummy, dirtyPages->table);
if(e) {
assert(e->p == p->id);
assert(p->dirty);
p->dirty = 0;
free((void*)e);
} else {
assert(!p->dirty);
}
//assert(!ret); <--- Due to a bug in the PBL compatibility mode,
//there is no way to tell whether the value didn't exist, or if it
//was null.
pthread_mutex_unlock(&dirtyPages->mutex);
}
@ -51,105 +66,83 @@ int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Pag
int ret;
pthread_mutex_lock(&dirtyPages->mutex);
ret = p->dirty;
dpt_entry e = { p->id, 0};
const void* found = rbfind(&e, dirtyPages->table);
assert((found && ret) || !(found||ret));
pthread_mutex_unlock(&dirtyPages->mutex);
return ret;
}
lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages) {
lsn_t lsn = LSN_T_MAX; // LogFlushedLSN ();
pageid_t* pageid;
lsn_t lsn = LSN_T_MAX;
pthread_mutex_lock(&dirtyPages->mutex);
for( pageid = (pageid_t*)pblHtFirst (dirtyPages->table); pageid; pageid = (pageid_t*)pblHtNext(dirtyPages->table)) {
lsn_t * thisLSN = (lsn_t*) pblHtCurrent(dirtyPages->table);
// printf("lsn = %d\n", thisLSN);
if(*thisLSN < lsn) {
lsn = *thisLSN;
for(const dpt_entry * e = rbmin(dirtyPages->table);
e;
e = rblookup(RB_LUGREAT, e, dirtyPages->table)) {
if(e->lsn < lsn) {
lsn = e->lsn;
}
}
pthread_mutex_unlock(&dirtyPages->mutex);
return lsn;
}
void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
int i;
for(i = 0; i < MAX_BUFFER_SIZE; i++) {
staleDirtyPages[i] = -1;
}
Page* p = 0;
pthread_mutex_lock(&dirtyPages->mutex);
void* tmp;
i = 0;
for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) {
staleDirtyPages[i] = *((pageid_t*) pblHtCurrentKey(dirtyPages->table));
i++;
}
assert(i < MAX_BUFFER_SIZE);
pthread_mutex_unlock(&dirtyPages->mutex);
for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
p = getCachedPage(-1, staleDirtyPages[i]);
if(p) {
writeBackPage(p);
releasePage(p);
}
}
free(staleDirtyPages);
stasis_dirty_page_table_flush_range(dirtyPages, 0, 0); // pageid_t = 0 means flush to EOF.
}
void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) {
pageid_t * staleDirtyPages = malloc(sizeof(pageid_t) * (MAX_BUFFER_SIZE));
int i;
Page * p = 0;
pthread_mutex_lock(&dirtyPages->mutex);
void *tmp;
i = 0;
for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) {
pageid_t num = *((pageid_t*) pblHtCurrentKey(dirtyPages->table));
if(num <= start && num < stop) {
staleDirtyPages[i] = num;
i++;
}
pageid_t * staleDirtyPages = 0;
pageid_t n = 0;
dpt_entry dummy = { start, 0 };
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table);
e && (stop == 0 || e->p < stop);
e = rblookup(RB_LUGREAT, e, dirtyPages->table)) {
n++;
staleDirtyPages = realloc(staleDirtyPages, sizeof(pageid_t) * n);
staleDirtyPages[n-1] = e->p;
}
staleDirtyPages[i] = -1;
pthread_mutex_unlock(&dirtyPages->mutex);
for(i = 0; i < MAX_BUFFER_SIZE && staleDirtyPages[i] != -1; i++) {
p = getCachedPage(-1, staleDirtyPages[i]);
for(pageid_t i = 0; i < n; i++) {
Page * p = getCachedPage(-1, staleDirtyPages[i]);
if(p) {
writeBackPage(p);
int succ = writeBackPage(p);
if(stop && (!succ)) { abort(); /*api violation!*/ }
releasePage(p);
}
}
free(staleDirtyPages);
forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE);
}
stasis_dirty_page_table_t * stasis_dirty_page_table_init() {
stasis_dirty_page_table_t * ret = malloc(sizeof(*ret));
ret->table = pblHtCreate();
ret->table = rbinit(dpt_cmp, 0);
pthread_mutex_init(&ret->mutex, 0);
return ret;
}
void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages) {
void * tmp;
int areDirty = 0;
for(tmp = pblHtFirst(dirtyPages->table); tmp; tmp = pblHtNext(dirtyPages->table)) {
free(pblHtCurrent(dirtyPages->table));
dpt_entry dummy = {0, 0};
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table);
e;
e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) {
if((!areDirty) &&
(!stasis_suppress_unclean_shutdown_warnings)) {
printf("Warning: dirtyPagesDeinit detected dirty, unwritten pages. "
"Updates lost?\n");
areDirty = 1;
}
dummy = *e;
rbdelete(e, dirtyPages->table);
free((void*)e);
}
pblHtDelete(dirtyPages->table);
rbdestroy(dirtyPages->table);
pthread_mutex_destroy(&dirtyPages->mutex);
free(dirtyPages);
}

View file

@ -9,11 +9,14 @@
out, or forcing the log too early?
*/
static void phWrite(stasis_page_handle_t * ph, Page * ret) {
if(!ret->dirty) { return; }
DEBUG("%lld\n", ret->id);
// This lock is only held to make the page implementation happy. We should
// implicitly have exclusive access to the page before this function is called,
// or we'll deadlock.
/// TODO Turn into trywritelock + test for trouble.
writelock(ret->rwlatch,0);
if(!ret->dirty) { unlock(ret->rwlatch); return; }
stasis_page_flushed(ret);
if(ph->log) { stasis_log_force(ph->log, ret->LSN, LOG_FORCE_WAL); }
int err = ((stasis_handle_t*)ph->impl)->write(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
@ -38,7 +41,7 @@ static void phRead(stasis_page_handle_t * ph, Page * ret, pagetype_t type) {
abort();
}
}
ret->dirty = 0;
assert(!ret->dirty);
stasis_page_loaded(ret, type);
unlock(ret->rwlatch);
}

View file

@ -139,7 +139,7 @@ extern void (*releasePageImpl)(Page * p);
storage. For compatibility, such buffer managers should ignore
this call.)
*/
extern void (*writeBackPage)(Page * p);
extern int (*writeBackPage)(Page * p);
/**
Force any written back pages to disk.