diff --git a/src/stasis/dirtyPageTable.c b/src/stasis/dirtyPageTable.c index 9e296be..bc63325 100644 --- a/src/stasis/dirtyPageTable.c +++ b/src/stasis/dirtyPageTable.c @@ -18,20 +18,30 @@ typedef struct { lsn_t lsn; } dpt_entry; -static int dpt_cmp(const void *ap, const void * bp, const void * ignored) { + +static int dpt_cmp_page(const void *ap, const void * bp, const void * ignored) { + const dpt_entry * a = ap; + const dpt_entry * b = bp; + return (a->p < b->p) ? -1 : ((a->p == b->p) ? 0 : 1); +} +static int dpt_cmp_lsn_and_page(const void *ap, const void * bp, const void * ignored) { const dpt_entry * a = ap; const dpt_entry * b = bp; - return (a->p < b->p) ? -1 : ((a->p == b->p) ? 0 : 1); + return (a->lsn < b->lsn) ? -1 : ((a->lsn == b->lsn) ? dpt_cmp_page(ap, bp, 0) : 1); } struct stasis_dirty_page_table_t { - struct rbtree * table; + struct rbtree * tableByPage; + struct rbtree * tableByLsnAndPage; stasis_buffer_manager_t * bufferManager; pageid_t count; pthread_mutex_t mutex; pthread_cond_t flushDone; int flushing; + + pthread_cond_t writebackCond; + lsn_t targetLsn; }; void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) { @@ -42,7 +52,13 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P dpt_entry * e = malloc(sizeof(*e)); e->p = p->id; e->lsn = p->LSN; - const void * ret = rbsearch(e, dirtyPages->table); + const void * ret = rbsearch(e, dirtyPages->tableByPage); + assert(ret == e); // otherwise, the entry was already in the table. + + e = malloc(sizeof(*e)); + e->p = p->id; + e->lsn = p->LSN; + ret = rbsearch(e, dirtyPages->tableByLsnAndPage); assert(ret == e); // otherwise, the entry was already in the table. dirtyPages->count++; } @@ -51,37 +67,56 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P } else { pthread_mutex_lock(&dirtyPages->mutex); dpt_entry e = { p->id, 0}; - assert(rbfind(&e, dirtyPages->table)); + assert(rbfind(&e, dirtyPages->tableByPage)); pthread_mutex_unlock(&dirtyPages->mutex); #endif //SANITY_CHECKS } } void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p) { - assert(!tryreadlock(p->rwlatch,0)); if(p->dirty) { pthread_mutex_lock(&dirtyPages->mutex); - dpt_entry dummy = {p->id, 0}; - const dpt_entry * e = rbdelete(&dummy, dirtyPages->table); - assert(e); - assert(e->p == p->id); - assert(p->dirty); - p->dirty = 0; - free((void*)e); - dirtyPages->count--; + if(p->dirty) { + dpt_entry dummy = {p->id, 0}; + + const dpt_entry * e = rbdelete(&dummy, dirtyPages->tableByPage); + assert(e); + assert(e->p == p->id); + dummy.lsn = e->lsn; + free((void*)e); + + e = rbdelete(&dummy, dirtyPages->tableByLsnAndPage); + assert(e); + assert(e->p == p->id); + assert(e->lsn == dummy.lsn); + + assert(p->dirty); + p->dirty = 0; + + lsn_t targetLsn = dirtyPages->targetLsn; + if (e->lsn < targetLsn) { + const dpt_entry * e2 = rbmin(dirtyPages->tableByLsnAndPage); + + if (!e2 || e2->lsn >= targetLsn) { + dirtyPages->targetLsn = 0; + pthread_cond_broadcast( &dirtyPages->writebackCond ); + } + } + + dirtyPages->count--; + } pthread_mutex_unlock(&dirtyPages->mutex); } } int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) { int ret; - assert(!trywritelock(p->rwlatch,0)); ret = p->dirty; #ifdef SANITY_CHECKS pthread_mutex_lock(&dirtyPages->mutex); dpt_entry e = { p->id, 0}; - const void* found = rbfind(&e, dirtyPages->table); + const void* found = rbfind(&e, dirtyPages->tableByPage); assert((found && ret) || !(found||ret)); pthread_mutex_unlock(&dirtyPages->mutex); #endif @@ -89,15 +124,9 @@ int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Pag } lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages) { - lsn_t lsn = LSN_T_MAX; pthread_mutex_lock(&dirtyPages->mutex); - for(const dpt_entry * e = rbmin(dirtyPages->table); - e; - e = rblookup(RB_LUGREAT, e, dirtyPages->table)) { - if(e->lsn < lsn) { - lsn = e->lsn; - } - } + const dpt_entry * e = rbmin(dirtyPages->tableByLsnAndPage); + lsn_t lsn = e ? e->lsn : LSN_T_MAX; pthread_mutex_unlock(&dirtyPages->mutex); return lsn; } @@ -110,49 +139,103 @@ pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPa return ret; } -int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) { - dpt_entry dummy = { 0, 0 }; +int stasis_dirty_page_table_flush_with_target(stasis_dirty_page_table_t * dirtyPages, lsn_t targetLsn ) { const int stride = 200; - pageid_t vals[stride]; - int off = 0; - int strides = 0; + int all_flushed; pthread_mutex_lock(&dirtyPages->mutex); - if(dirtyPages->flushing) { - pthread_cond_wait(&dirtyPages->flushDone, &dirtyPages->mutex); - pthread_mutex_unlock(&dirtyPages->mutex); - return EAGAIN; - } - dirtyPages->flushing = 1; - for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table) ; - e; - e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) { - dummy = *e; - vals[off] = dummy.p; - off++; - if(off == stride) { + if (targetLsn == LSN_T_MAX) { + if(dirtyPages->flushing) { + pthread_cond_wait(&dirtyPages->flushDone, &dirtyPages->mutex); pthread_mutex_unlock(&dirtyPages->mutex); - for(pageid_t i = 0; i < off; i++) { - dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]); - } - off = 0; - strides++; - pthread_mutex_lock(&dirtyPages->mutex); + return EAGAIN; } + dirtyPages->flushing = 1; } - pthread_mutex_unlock(&dirtyPages->mutex); - for(int i = 0; i < off; i++) { - dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]); + + do { + dpt_entry dummy = { 0, 0 }; + pageid_t vals[stride]; + int off = 0; + int strides = 0; + all_flushed = 1; + for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByLsnAndPage) ; + e && e->lsn < targetLsn; + e = rblookup(RB_LUGREAT, &dummy, dirtyPages->tableByLsnAndPage)) { + dummy = *e; + vals[off] = dummy.p; + off++; + if(off == stride) { + pthread_mutex_unlock(&dirtyPages->mutex); + for(pageid_t i = 0; i < off; i++) { + if (dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]) == EBUSY) { + all_flushed = 0; + } + } + off = 0; + strides++; + pthread_mutex_lock(&dirtyPages->mutex); + } + } + pthread_mutex_unlock(&dirtyPages->mutex); + for(int i = 0; i < off; i++) { + if (dirtyPages->bufferManager->tryToWriteBackPage(dirtyPages->bufferManager, vals[i]) == EBUSY) { + all_flushed = 0; + }; + } + pthread_mutex_lock(&dirtyPages->mutex); + + if (!all_flushed && + targetLsn < LSN_T_MAX && + dirtyPages->count > 0 && + targetLsn > ((dpt_entry*)rbmin(dirtyPages->tableByLsnAndPage))->lsn ) { + struct timespec ts; + struct timeval tv; + + dirtyPages->targetLsn = targetLsn; + all_flushed = 1; + + int res = gettimeofday(&tv, 0); + assert(res == 0); + + // We expect previously pinned pages to be unpinned and flushed within + // 100 milliseconds. If there aren't then we had race condition and the + // pinning thread sampled p->needFlush before we set it to 1. This + // should be very rare. + + tv.tv_usec += 100000; + if (tv.tv_usec >= 1000000 ) { + ++tv.tv_sec; + tv.tv_usec -= 1000000; + } + + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = 1000*tv.tv_usec; + + while ( dirtyPages->targetLsn != 0 ) { + if (pthread_cond_timedwait(&dirtyPages->writebackCond, &dirtyPages->mutex, &ts) == ETIMEDOUT) { + all_flushed = 0; + break; + } + } + } + } while(!all_flushed); + if (targetLsn == LSN_T_MAX) { + pthread_cond_broadcast(&dirtyPages->flushDone); + dirtyPages->flushing = 0; } - pthread_mutex_lock(&dirtyPages->mutex); - dirtyPages->flushing = 0; - pthread_cond_broadcast(&dirtyPages->flushDone); + pthread_mutex_unlock(&dirtyPages->mutex); + // if(strides < 5) { DEBUG("strides: %d dirtyCount = %lld\n", strides, stasis_dirty_page_table_dirty_count(dirtyPages)); } return 0; } +int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) { + return stasis_dirty_page_table_flush_with_target(dirtyPages, LSN_T_MAX); +} + int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop, int count, pageid_t* range_starts, pageid_t* range_ends) { pthread_mutex_lock(&dirtyPages->mutex); int n = 0; @@ -161,9 +244,9 @@ int stasis_dirty_page_table_get_flush_candidates(stasis_dirty_page_table_t * dir dummy.lsn = -1; dummy.p = start; - for(const dpt_entry *e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table); + for(const dpt_entry *e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByPage); e && (stop == 0 || e->p < stop) && n < count; - e = rblookup(RB_LUGREAT, e, dirtyPages->table)) { + e = rblookup(RB_LUGREAT, e, dirtyPages->tableByPage)) { if(n == 0 || range_ends[b] != e->p) { b++; range_starts[b] = e->p; @@ -193,9 +276,9 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t * staleDirtyPages = 0; pageid_t n = 0; dpt_entry dummy = { start, 0 }; - for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table); + for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByPage); e && (stop == 0 || e->p < stop); - e = rblookup(RB_LUGREAT, e, dirtyPages->table)) { + e = rblookup(RB_LUGREAT, e, dirtyPages->tableByPage)) { n++; staleDirtyPages = realloc(staleDirtyPages, sizeof(pageid_t) * n); staleDirtyPages[n-1] = e->p; @@ -219,20 +302,23 @@ void stasis_dirty_page_table_set_buffer_manager(stasis_dirty_page_table_t * dpt, stasis_dirty_page_table_t * stasis_dirty_page_table_init() { stasis_dirty_page_table_t * ret = malloc(sizeof(*ret)); - ret->table = rbinit(dpt_cmp, 0); + ret->tableByPage = rbinit(dpt_cmp_page, 0); + ret->tableByLsnAndPage = rbinit(dpt_cmp_lsn_and_page, 0); ret->count = 0; pthread_mutex_init(&ret->mutex, 0); pthread_cond_init(&ret->flushDone, 0); ret->flushing = 0; + ret->targetLsn = 0; + pthread_cond_init(&ret->writebackCond, 0); return ret; } void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages) { int areDirty = 0; dpt_entry dummy = {0, 0}; - for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table); + for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->tableByPage); e; - e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) { + e = rblookup(RB_LUGREAT, &dummy, dirtyPages->tableByPage)) { if((!areDirty) && (!stasis_suppress_unclean_shutdown_warnings)) { @@ -241,11 +327,21 @@ void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages) { areDirty = 1; } dummy = *e; - rbdelete(e, dirtyPages->table); + rbdelete(e, dirtyPages->tableByPage); free((void*)e); } - rbdestroy(dirtyPages->table); + dpt_entry dummy2 = {0, 0}; + for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy2, dirtyPages->tableByLsnAndPage); + e; + e = rblookup(RB_LUGREAT, &dummy2, dirtyPages->tableByLsnAndPage)) { + dummy2 = *e; + rbdelete(e, dirtyPages->tableByLsnAndPage); + free((void*)e); + } + + rbdestroy(dirtyPages->tableByPage); + rbdestroy(dirtyPages->tableByLsnAndPage); pthread_mutex_destroy(&dirtyPages->mutex); free(dirtyPages); } diff --git a/stasis/dirtyPageTable.h b/stasis/dirtyPageTable.h index ade0916..fe6d9e0 100644 --- a/stasis/dirtyPageTable.h +++ b/stasis/dirtyPageTable.h @@ -20,11 +20,16 @@ void stasis_dirty_page_table_deinit(stasis_dirty_page_table_t * dirtyPages); void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p); void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p); +/** Note: this has questionable semantics, since you don't have to + write latch the page to call it. Of course, if this is called in + race with threads that are cleaning / dirtying the page, then the + return value could be stale by the time the caller examines it. */ int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p); pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPages); int stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages); +int stasis_dirty_page_table_flush_with_target(stasis_dirty_page_table_t * dirtyPages, lsn_t targetLsn); lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t* dirtyPages); /**