third-party dirtyPageTable patches 1-4, forward-ported to trunk. This commit may introduce a regression; dirty_page_table_flush is probably no longer reentrant.

This commit is contained in:
Sears Russell 2010-10-19 22:37:07 +00:00
parent 8ee684d98e
commit 6f0c50d861
2 changed files with 164 additions and 63 deletions
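
For context, this patch set splits the dirty page table into two red-black trees (one keyed by page id, one by LSN then page id) and adds a targeted flush entry point; the pre-existing stasis_dirty_page_table_flush() becomes a wrapper that passes LSN_T_MAX. A minimal sketch of a hypothetical caller of the new API follows (the include path and the helper names are assumptions for illustration, not part of this commit):

/* Hypothetical caller of the new targeted flush (illustration only). */
#include <stasis/dirtyPageTable.h>   /* assumed include path */

/* Write back every page whose recovery LSN is below rec_lsn, e.g.
 * before truncating the log up to that point. */
void flush_before(stasis_dirty_page_table_t *dpt, lsn_t rec_lsn) {
  stasis_dirty_page_table_flush_with_target(dpt, rec_lsn);
}

/* The old behaviour -- write back all dirty pages -- is now just the
 * LSN_T_MAX case. */
void flush_everything(stasis_dirty_page_table_t *dpt) {
  stasis_dirty_page_table_flush(dpt);   /* == flush_with_target(dpt, LSN_T_MAX) */
}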

View file

@@ -18,20 +18,30 @@ typedef struct {
lsn_t lsn;
} dpt_entry;
static int dpt_cmp(const void *ap, const void * bp, const void * ignored) {
static int dpt_cmp_page(const void *ap, const void * bp, const void * ignored) {
const dpt_entry * a = ap;
const dpt_entry * b = bp;
return (a->p < b->p) ? -1 : ((a->p == b->p) ? 0 : 1);
}
static int dpt_cmp_lsn_and_page(const void *ap, const void * bp, const void * ignored) {
const dpt_entry * a = ap;
const dpt_entry * b = bp;
return (a->p < b->p) ? -1 : ((a->p == b->p) ? 0 : 1);
return (a->lsn < b->lsn) ? -1 : ((a->lsn == b->lsn) ? dpt_cmp_page(ap, bp, 0) : 1);
}
struct stasis_dirty_page_table_t {
struct rbtree * table;
struct rbtree * tableByPage;
struct rbtree * tableByLsnAndPage;
stasis_buffer_manager_t * bufferManager;
pageid_t count;
pthread_mutex_t mutex;
pthread_cond_t flushDone;
int flushing;
pthread_cond_t writebackCond;
lsn_t targetLsn;
};
void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) {
@@ -42,7 +52,13 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P
dpt_entry * e = malloc(sizeof(*e));
e->p = p->id;
e->lsn = p->LSN;
const void * ret = rbsearch(e, dirtyPages->table);
const void * ret = rbsearch(e, dirtyPages->tableByPage);
assert(ret == e); // otherwise, the entry was already in the table.
e = malloc(sizeof(*e));
e->p = p->id;
e->lsn = p->LSN;
ret = rbsearch(e, dirtyPages->tableByLsnAndPage);
assert(ret == e); // otherwise, the entry was already in the table.
dirtyPages->count++;
}
@@ -51,37 +67,56 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P
} else {
pthread_mutex_lock(&dirtyPages->mutex);
dpt_entry e = { p->id, 0};
assert(rbfind(&e, dirtyPages->table));
assert(rbfind(&e, dirtyPages->tableByPage));
pthread_mutex_unlock(&dirtyPages->mutex);
#endif //SANITY_CHECKS
}
}
void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p) {
assert(!tryreadlock(p->rwlatch,0));
if(p->dirty) {
pthread_mutex_lock(&dirtyPages->mutex);
dpt_entry dummy = {p->id, 0};
const dpt_entry * e = rbdelete(&dummy, dirtyPages->table);
assert(e);
assert(e->p == p->id);
assert(p->dirty);
p->dirty = 0;
free((void*)e);
dirtyPages->count--;
if(p->dirty) {
dpt_entry dummy = {p->id, 0};
const dpt_entry * e = rbdelete(&dummy, dirtyPages->tableByPage);
assert(e);
assert(e->p == p->id);
dummy.lsn = e->lsn;
free((void*)e);
e = rbdelete(&dummy, dirtyPages->tableByLsnAndPage);
assert(e);
assert(e->p == p->id);
assert(e->lsn == dummy.lsn);
assert(p->dirty);
p->dirty = 0;
lsn_t targetLsn = dirtyPages->targetLsn;
if (e->lsn < targetLsn) {
const dpt_entry * e2 = rbmin(dirtyPages->tableByLsnAndPage);
if (!e2 || e2->lsn >= targetLsn) {
dirtyPages->targetLsn = 0;
pthread_cond_broadcast( &dirtyPages->writebackCond );
}
}
dirtyPages->count--;
}
pthread_mutex_unlock(&dirtyPages->mutex);
}
}
int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) {
int ret;
assert(!trywritelock(p->rwlatch,0));
ret = p->dirty;
#ifdef SANITY_CHECKS
pthread_mutex_lock(&dirtyPages->mutex);
dpt_entry e = { p->id, 0};
const void* found = rbfind(&e, dirtyPages->table);
const void* found = rbfind(&e, dirtyPages->tableByPage);
assert((found && ret) || !(found||ret));
pthread_mutex_unlock(&dirtyPages->mutex);
#endif
@@ -89,15 +124,9 @@ int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Pag
}
lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages) {
lsn_t lsn = LSN_T_MAX;
pthread_mutex_lock(&dirtyPages->mutex);
for(const dpt_entry * e = rbmin(dirtyPages->table);
e;
e = rblookup(RB_LUGREAT, e, dirtyPages->table)) {
if(e->lsn < lsn) {
lsn = e->lsn;
}
}
const dpt_entry * e = rbmin(dirtyPages->tableByLsnAndPage);
lsn_t lsn = e ? e->lsn : LSN_T_MAX;
pthread_mutex_unlock(&dirtyPages->mutex);
return lsn;
}
@@ -110,49 +139,103 @@ pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPa
return ret;
}
int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
dpt_entry dummy = { 0, 0 };
int stasis_dirty_page_table_flush_with_target(stasis_dirty_page_table_t * dirtyPages, lsn_t targetLsn ) {
const int stride = 200;
pageid_t vals[stride];
int off = 0;
int strides = 0;
int all_flushed;
pthread_mutex_lock(&dirtyPages->mutex);
if(dirtyPages->flushing) {
pthread_cond_wait(&dirtyPages->flushDone, &dirtyPages->mutex);
pthread_mutex_unlock(&dirtyPages->mutex);
return EAGAIN;
}
dirtyPages->flushing = 1;
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table) ;
e;
e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) {
dummy = *e;
vals[off] = dummy.p;
off++;
if(off == stride) {
if (targetLsn == LSN_T_MAX) {
if(dirtyPages->flushing) {
pthread_cond_wait(&dirtyPages->flushDone, &dirtyPages->mutex);
pthread_mutex_unlock(&dirtyPages->mutex);
for(pageid_t i = 0; i < off; i++) {
dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]);
}
off = 0;
strides++;
pthread_mutex_lock(&dirtyPages->mutex);
return EAGAIN;
}
dirtyPages->flushing = 1;
}
pthread_mutex_unlock(&dirtyPages->mutex);
for(int i = 0; i < off; i++) {
dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]);
do {
dpt_entry dummy = { 0, 0 };
pageid_t vals[stride];
int off = 0;
int strides = 0;
all_flushed = 1;
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByLsnAndPage) ;
e && e->lsn < targetLsn;
e = rblookup(RB_LUGREAT, &dummy, dirtyPages->tableByLsnAndPage)) {
dummy = *e;
vals[off] = dummy.p;
off++;
if(off == stride) {
pthread_mutex_unlock(&dirtyPages->mutex);
for(pageid_t i = 0; i < off; i++) {
if (dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]) == EBUSY) {
all_flushed = 0;
}
}
off = 0;
strides++;
pthread_mutex_lock(&dirtyPages->mutex);
}
}
pthread_mutex_unlock(&dirtyPages->mutex);
for(int i = 0; i < off; i++) {
if (dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]) == EBUSY) {
all_flushed = 0;
};
}
pthread_mutex_lock(&dirtyPages->mutex);
if (!all_flushed &&
targetLsn < LSN_T_MAX &&
dirtyPages->count > 0 &&
targetLsn > ((dpt_entry*)rbmin(dirtyPages->tableByLsnAndPage))->lsn ) {
struct timespec ts;
struct timeval tv;
dirtyPages->targetLsn = targetLsn;
all_flushed = 1;
int res = gettimeofday(&tv, 0);
assert(res == 0);
// We expect previously pinned pages to be unpinned and flushed within
// 100 milliseconds. If they aren't, then we hit a race condition: the
// pinning thread sampled p->needFlush before we set it to 1. This
// should be very rare.
tv.tv_usec += 100000;
if (tv.tv_usec >= 1000000 ) {
++tv.tv_sec;
tv.tv_usec -= 1000000;
}
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = 1000*tv.tv_usec;
while ( dirtyPages->targetLsn != 0 ) {
if (pthread_cond_timedwait(&dirtyPages->writebackCond, &dirtyPages->mutex, &ts) == ETIMEDOUT) {
all_flushed = 0;
break;
}
}
}
} while(!all_flushed);
if (targetLsn == LSN_T_MAX) {
pthread_cond_broadcast(&dirtyPages->flushDone);
dirtyPages->flushing = 0;
}
pthread_mutex_lock(&dirtyPages->mutex);
dirtyPages->flushing = 0;
pthread_cond_broadcast(&dirtyPages->flushDone);
pthread_mutex_unlock(&dirtyPages->mutex);
// if(strides < 5) { DEBUG("strides: %d dirtyCount = %lld\n", strides, stasis_dirty_page_table_dirty_count(dirtyPages)); }
return 0;
}
int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
return stasis_dirty_page_table_flush_with_target(dirtyPages, LSN_T_MAX);
}
int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop, int count, pageid_t* range_starts, pageid_t* range_ends) {
pthread_mutex_lock(&dirtyPages->mutex);
int n = 0;
@@ -161,9 +244,9 @@ int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dir
dummy.lsn = -1;
dummy.p = start;
for(const dpt_entry *e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table);
for(const dpt_entry *e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByPage);
e && (stop == 0 || e->p < stop) && n < count;
e = rblookup(RB_LUGREAT, e, dirtyPages->table)) {
e = rblookup(RB_LUGREAT, e, dirtyPages->tableByPage)) {
if(n == 0 || range_ends[b] != e->p) {
b++;
range_starts[b] = e->p;
@@ -193,9 +276,9 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages,
pageid_t * staleDirtyPages = 0;
pageid_t n = 0;
dpt_entry dummy = { start, 0 };
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table);
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByPage);
e && (stop == 0 || e->p < stop);
e = rblookup(RB_LUGREAT, e, dirtyPages->table)) {
e = rblookup(RB_LUGREAT, e, dirtyPages->tableByPage)) {
n++;
staleDirtyPages = realloc(staleDirtyPages, sizeof(pageid_t) * n);
staleDirtyPages[n-1] = e->p;
@@ -219,20 +302,23 @@ void stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table_t * dpt,
stasis_dirty_page_table_t * stasis_dirty_page_table_init() {
stasis_dirty_page_table_t * ret = malloc(sizeof(*ret));
ret->table = rbinit(dpt_cmp, 0);
ret->tableByPage = rbinit(dpt_cmp_page, 0);
ret->tableByLsnAndPage = rbinit(dpt_cmp_lsn_and_page, 0);
ret->count = 0;
pthread_mutex_init(&ret->mutex, 0);
pthread_cond_init(&ret->flushDone, 0);
ret->flushing = 0;
ret->targetLsn = 0;
pthread_cond_init(&ret->writebackCond, 0);
return ret;
}
void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages) {
int areDirty = 0;
dpt_entry dummy = {0, 0};
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table);
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByPage);
e;
e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) {
e = rblookup(RB_LUGREAT, &dummy, dirtyPages->tableByPage)) {
if((!areDirty) &&
(!stasis_suppress_unclean_shutdown_warnings)) {
@@ -241,11 +327,21 @@ void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages) {
areDirty = 1;
}
dummy = *e;
rbdelete(e, dirtyPages->table);
rbdelete(e, dirtyPages->tableByPage);
free((void*)e);
}
rbdestroy(dirtyPages->table);
dpt_entry dummy2 = {0, 0};
for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy2, dirtyPages->tableByLsnAndPage);
e;
e = rblookup(RB_LUGREAT, &dummy2, dirtyPages->tableByLsnAndPage)) {
dummy2 = *e;
rbdelete(e, dirtyPages->tableByLsnAndPage);
free((void*)e);
}
rbdestroy(dirtyPages->tableByPage);
rbdestroy(dirtyPages->tableByLsnAndPage);
pthread_mutex_destroy(&dirtyPages->mutex);
free(dirtyPages);
}
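
For readers new to the writeback handshake added above: stasis_dirty_page_table_flush_with_target() records the LSN it is waiting for in dirtyPages->targetLsn and sleeps on writebackCond (with a 100 ms timed wait and a retry loop), while stasis_dirty_page_table_set_clean() zeroes targetLsn and broadcasts once the smallest remaining dirty LSN reaches the target. A stripped-down, self-contained sketch of that pattern (hypothetical names, simplified from the code above; not Stasis code):

/* Simplified illustration of the targetLsn / writebackCond handshake. */
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

typedef int64_t lsn_t;

static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  writeback = PTHREAD_COND_INITIALIZER;
static lsn_t target = 0;      /* 0 means "no flusher is waiting" */
static lsn_t min_dirty = 10;  /* stand-in for rbmin(tableByLsnAndPage)->lsn */

/* Plays the role of the tail of flush_with_target(): block until every
 * page dirtied before t has been written back. (The real code uses a
 * 100 ms pthread_cond_timedwait and retries the flush on timeout.) */
static void wait_for_writeback(lsn_t t) {
  pthread_mutex_lock(&mut);
  if (min_dirty < t) {
    target = t;
    while (target != 0) {
      pthread_cond_wait(&writeback, &mut);
    }
  }
  pthread_mutex_unlock(&mut);
}

/* Plays the role of set_clean(): after a write-back, wake the flusher
 * once the oldest remaining dirty page has passed the target. */
static void page_written_back(lsn_t new_min_dirty) {
  pthread_mutex_lock(&mut);
  min_dirty = new_min_dirty;
  if (target != 0 && min_dirty >= target) {
    target = 0;
    pthread_cond_broadcast(&writeback);
  }
  pthread_mutex_unlock(&mut);
}

static void *writer(void *arg) {
  (void)arg;
  page_written_back(100);   /* pretend everything below LSN 100 is now clean */
  return 0;
}

int main(void) {
  pthread_t t;
  pthread_create(&t, 0, writer, 0);
  wait_for_writeback(50);   /* returns once min_dirty >= 50 */
  pthread_join(t, 0);
  printf("flushed up to target\n");
  return 0;
}

Note that in the committed code targetLsn is a single shared field and the flushing flag is only consulted when targetLsn == LSN_T_MAX, which is presumably why the commit message flags dirty_page_table_flush as probably no longer reentrant when concurrent targeted flushes are issued.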

View file

@@ -20,11 +20,16 @@ void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages);
void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p);
void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p);
/** Note: this has questionable semantics, since you do not have to
write-latch the page to call it. Of course, if this is called in a
race with threads that are cleaning / dirtying the page, then the
return value could be stale by the time the caller examines it. */
int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p);
pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPages);
int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages);
int stasis_dirty_page_table_flush_with_target(stasis_dirty_page_table_t * dirtyPages, lsn_t targetLsn);
lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t* dirtyPages);
/**