From 0154f7d29d5cf015fa1cb1ede28e6fc875d30ff6 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 13 Aug 2009 05:53:48 +0000 Subject: [PATCH] IMPORTANT API CHANGE: stasis_dirty_page_table_flush_range() no longer force writes to disk. You now need to call forcePageRange() after flush page range returns. This commit should significantly improve sequential write throughput. I found a segfault bug this morning, but cannot reproduce. Could be a heisenbug, or could have been a hardware fault (gojira repeatedly kernel panicked, then both problems went away...) --- benchmarks/sequentialThroughput.c | 13 +++++++- src/stasis/bufferManager/bufferHash.c | 39 +++++++++++++++++----- src/stasis/bufferManager/legacy/pageFile.c | 1 + src/stasis/bufferManager/pageArray.c | 2 ++ src/stasis/dirtyPageTable.c | 35 +++++++++++++++++-- src/stasis/operations/lsmTree.c | 1 + src/stasis/pageHandle.c | 10 +++--- src/stasis/truncation.c | 22 ++++++------ stasis/bufferManager.h | 3 ++ stasis/pageHandle.h | 4 +++ 10 files changed, 103 insertions(+), 27 deletions(-) diff --git a/benchmarks/sequentialThroughput.c b/benchmarks/sequentialThroughput.c index affd156..ecded63 100644 --- a/benchmarks/sequentialThroughput.c +++ b/benchmarks/sequentialThroughput.c @@ -33,7 +33,7 @@ int main(int argc, char ** argv) { int direct = 0; int legacyBM = 0; int legacyFH = 0; - + int stake = 0; long page_count = mb_to_page(100); for(int i = 1; i < argc; i++) { @@ -65,6 +65,9 @@ int main(int argc, char ** argv) { } else if(!strcmp(argv[i], "--mb")) { i++; page_count = mb_to_page(atoll(argv[i])); + } else if(!strcmp(argv[i], "--stake")) { + i++; + stake = mb_to_page(atoll(argv[i])); } else { printf("Unknown argument: %s\n", argv[i]); return 1; @@ -78,6 +81,14 @@ int main(int argc, char ** argv) { Tinit(); + if(stake) { + Page * p = loadPage(-1, stake); + writelock(p->rwlatch,0); + stasis_dirty_page_table_set_dirty(stasis_runtime_dirty_page_table(), p); + unlock(p->rwlatch); + releasePage(p); + } + for(long i =0; i 
< page_count; i++) { Page * p = loadPage(-1, i); writelock(p->rwlatch,0); diff --git a/src/stasis/bufferManager/bufferHash.c b/src/stasis/bufferManager/bufferHash.c index eacd69f..b543a18 100644 --- a/src/stasis/bufferManager/bufferHash.c +++ b/src/stasis/bufferManager/bufferHash.c @@ -30,6 +30,8 @@ static stasis_buffer_pool_t * stasis_buffer_pool; static stasis_page_handle_t * page_handle; +static int flushing; + static int running; typedef struct LL_ENTRY(node_t) node_t; @@ -49,7 +51,14 @@ static inline struct Page_s ** pagePendingPtr(Page * p) { static inline intptr_t* pagePinCountPtr(Page * p) { return ((intptr_t*)(&((p)->queue))); } - +static inline int needFlush() { + pageid_t count = stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()); + const pageid_t needed = 1000; //MAX_BUFFER_SIZE / 5; + if(count > needed) { + DEBUG("Need flush? Dirty: %lld Total: %lld ret = %d\n", count, needed, count > needed); + } + return count > needed; +} #ifdef LONG_RUN inline static void checkPageState(Page * p) { @@ -96,11 +105,13 @@ inline static int tryToWriteBackPage(pageid_t page) { if(*pagePendingPtr(p) || *pagePinCountPtr(p)) { return EBUSY; } - DEBUG("Write(%ld)\n", (long)victim->id); page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight. 
- assert(!stasis_dirty_page_table_is_dirty(stasis_runtime_dirty_page_table(), p)); +// int locked = trywritelock(p->rwlatch,0); +// assert(locked); +// assert(!stasis_dirty_page_table_is_dirty(stasis_runtime_dirty_page_table(), p)); +// unlock(p->rwlatch); return 0; } @@ -127,7 +138,8 @@ inline static Page * getFreePage() { assert(!*pagePendingPtr(ret)); if(ret->dirty) { pthread_mutex_unlock(&mut); - stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0); + DEBUG("Blocking app thread"); + stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); pthread_mutex_lock(&mut); } else { break; @@ -148,12 +160,17 @@ inline static Page * getFreePage() { static void * writeBackWorker(void * ignored) { pthread_mutex_lock(&mut); while(1) { - while(running && pageCount < MAX_BUFFER_SIZE) { + while(running && (!needFlush())) { + flushing = 0; + DEBUG("Sleeping in write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table())); pthread_cond_wait(&needFree, &mut); + DEBUG("Woke write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table())); + flushing = 1; } if(!running) { break; } pthread_mutex_unlock(&mut); - stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0); + DEBUG("Calling flush\n"); + stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); pthread_mutex_lock(&mut); } pthread_mutex_unlock(&mut); @@ -297,7 +314,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia pthread_cond_broadcast(&readComplete); // TODO Improve writeback policy - if(stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()) > MAX_BUFFER_SIZE / 5) { + if((!flushing) && needFlush()) { pthread_cond_signal(&needFree); } assert(ret->id == pageid); @@ -339,13 +356,15 @@ static void bhForcePageRange(pageid_t start, pageid_t stop) { page_handle->force_range(page_handle, start, stop); } static void bhBufDeinit() { 
+ pthread_mutex_lock(&mut); running = 0; + pthread_mutex_unlock(&mut); pthread_cond_signal(&needFree); // Wake up the writeback thread so it will exit. pthread_join(worker, 0); // XXX flush range should return an error number, which we would check. (Right now, it aborts...) - stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0); + stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table()); struct LH_ENTRY(list) iter; const struct LH_ENTRY(pair_t) * next; @@ -367,7 +386,9 @@ static void bhBufDeinit() { page_handle->close(page_handle); } static void bhSimulateBufferManagerCrash() { + pthread_mutex_lock(&mut); running = 0; + pthread_mutex_unlock(&mut); pthread_cond_signal(&needFree); pthread_join(worker, 0); @@ -408,6 +429,8 @@ void stasis_buffer_manager_hash_open(stasis_page_handle_t * h) { stasis_buffer_manager_close = bhBufDeinit; stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash; + flushing = 0; + stasis_buffer_pool = stasis_buffer_pool_init(); lru = lruFastInit(pageGetNode, pageSetNode, 0); diff --git a/src/stasis/bufferManager/legacy/pageFile.c b/src/stasis/bufferManager/legacy/pageFile.c index e991bb4..250bedc 100644 --- a/src/stasis/bufferManager/legacy/pageFile.c +++ b/src/stasis/bufferManager/legacy/pageFile.c @@ -77,6 +77,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret, pagetype_t type) { dirty page table can be kept up to date. */ static void pfPageWrite(stasis_page_handle_t * h, Page * ret) { /** If the page is clean, there's no reason to write it out. 
*/ + assertlocked(ret->rwlatch); if(!stasis_dirty_page_table_is_dirty(h->dirtyPages, ret)) { DEBUG(" =^)~ "); return; diff --git a/src/stasis/bufferManager/pageArray.c b/src/stasis/bufferManager/pageArray.c index 4903300..c4bf9ee 100644 --- a/src/stasis/bufferManager/pageArray.c +++ b/src/stasis/bufferManager/pageArray.c @@ -44,7 +44,9 @@ static Page* paGetCachedPage(int xid, pageid_t page) { return paLoadPage(xid, page, UNKNOWN_TYPE_PAGE); } static void paReleasePage(Page * p) { + writelock(p->rwlatch,0); stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p); + unlock(p->rwlatch); } static int paWriteBackPage(pageid_t p) { return 0; /* no-op */ } diff --git a/src/stasis/dirtyPageTable.c b/src/stasis/dirtyPageTable.c index bed4d2d..67a45f6 100644 --- a/src/stasis/dirtyPageTable.c +++ b/src/stasis/dirtyPageTable.c @@ -51,6 +51,7 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p) { pthread_mutex_lock(&dirtyPages->mutex); + assertlocked(p->rwlatch); dpt_entry dummy = {p->id, 0}; const dpt_entry * e = rbdelete(&dummy, dirtyPages->table); @@ -69,6 +70,8 @@ void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, P int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) { int ret; pthread_mutex_lock(&dirtyPages->mutex); + assertlocked(p->rwlatch); + ret = p->dirty; dpt_entry e = { p->id, 0}; const void* found = rbfind(&e, dirtyPages->table); @@ -94,12 +97,39 @@ lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages) pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPages) { pthread_mutex_lock(&dirtyPages->mutex); pageid_t ret = dirtyPages->count; + assert(dirtyPages->count >= 0); pthread_mutex_unlock(&dirtyPages->mutex); return ret; } void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) { - 
stasis_dirty_page_table_flush_range(dirtyPages, 0, 0); // pageid_t = 0 means flush to EOF. + dpt_entry dummy = { 0, 0 }; + const int stride = 200; + pageid_t vals[stride]; + int off = 0; + int strides = 0; + pthread_mutex_lock(&dirtyPages->mutex); + for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table) ; + e; + e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) { + dummy = *e; + vals[off] = dummy.p; + off++; + if(off == stride) { + pthread_mutex_unlock(&dirtyPages->mutex); + for(pageid_t i = 0; i < off; i++) { + writeBackPage(vals[i]); + } + off = 0; + strides++; + pthread_mutex_lock(&dirtyPages->mutex); + } + } + pthread_mutex_unlock(&dirtyPages->mutex); + for(int i = 0; i < off; i++) { + writeBackPage(vals[i]); + } +// if(strides < 5) { DEBUG("strides: %d dirtyCount = %lld\n", strides, stasis_dirty_page_table_dirty_count(dirtyPages)); } } void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) { @@ -121,12 +151,13 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, if(stop && (err == EBUSY)) { abort(); /*api violation!*/ } } free(staleDirtyPages); - forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE); +// forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE); } stasis_dirty_page_table_t * stasis_dirty_page_table_init() { stasis_dirty_page_table_t * ret = malloc(sizeof(*ret)); ret->table = rbinit(dpt_cmp, 0); + ret->count = 0; pthread_mutex_init(&ret->mutex, 0); return ret; } diff --git a/src/stasis/operations/lsmTree.c b/src/stasis/operations/lsmTree.c index 2dce592..a2be38d 100644 --- a/src/stasis/operations/lsmTree.c +++ b/src/stasis/operations/lsmTree.c @@ -57,6 +57,7 @@ void TlsmRegionForceRid(int xid, void *conf) { pageid_t pid; Tread(xid,a.regionList,&pid); stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), pid, pid+a.regionSize); + forcePageRange(pid, pid+a.regionSize); // TregionDealloc(xid,pid); } } diff --git a/src/stasis/pageHandle.c 
b/src/stasis/pageHandle.c index a5bb6dd..f273f4b 100644 --- a/src/stasis/pageHandle.c +++ b/src/stasis/pageHandle.c @@ -13,9 +13,8 @@ static void phWrite(stasis_page_handle_t * ph, Page * ret) { // This lock is only held to make the page implementation happy. We should // implicitly have exclusive access to the page before this function is called, // or we'll deadlock. - - /// TODO Turn into trywritelock + test for trouble. - writelock(ret->rwlatch,0); + int locked = trywritelock(ret->rwlatch,0); + assert(locked); if(!ret->dirty) { unlock(ret->rwlatch); return; } stasis_page_flushed(ret); if(ph->log) { stasis_log_force(ph->log, ret->LSN, LOG_FORCE_WAL); } @@ -29,7 +28,8 @@ static void phWrite(stasis_page_handle_t * ph, Page * ret) { unlock(ret->rwlatch); } static void phRead(stasis_page_handle_t * ph, Page * ret, pagetype_t type) { - writelock(ret->rwlatch,0); + int locked = trywritelock(ret->rwlatch,0); + assert(locked); int err = ((stasis_handle_t*)ph->impl)->read(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE); if(err) { if(err == EDOM) { @@ -50,7 +50,7 @@ static void phForce(stasis_page_handle_t * ph) { assert(!err); } static void phForceRange(stasis_page_handle_t * ph, lsn_t start, lsn_t stop) { - int err = ((stasis_handle_t*)ph->impl)->force_range(ph->impl,start,stop); + int err = ((stasis_handle_t*)ph->impl)->force_range(ph->impl,start*PAGE_SIZE,stop*PAGE_SIZE); assert(!err); } static void phClose(stasis_page_handle_t * ph) { diff --git a/src/stasis/truncation.c b/src/stasis/truncation.c index 7babcd3..77a1dc9 100644 --- a/src/stasis/truncation.c +++ b/src/stasis/truncation.c @@ -1,5 +1,5 @@ #include - +#include struct stasis_truncation_t { char initialized; char automaticallyTruncating; @@ -104,20 +104,20 @@ int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) { } else { lsn_t flushed = trunc->log->first_unstable_lsn(trunc->log, LOG_FORCE_WAL); if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) { - //fprintf(stderr, "Flushing 
dirty buffers: rec_lsn = %ld log_trunc = %ld flushed = %ld\n", rec_lsn, log_trunc, flushed); - stasis_dirty_page_table_flush(trunc->dirty_pages); + DEBUG("Flushing dirty buffers: rec_lsn = %lld log_trunc = %lld flushed = %lld\n", rec_lsn, log_trunc, flushed); + stasis_dirty_page_table_flush(trunc->dirty_pages); - page_rec_lsn = stasis_dirty_page_table_minRecLSN(trunc->dirty_pages); - rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn; - rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn; + page_rec_lsn = stasis_dirty_page_table_minRecLSN(trunc->dirty_pages); + rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn; + rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn; - //fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn); + //fprintf(stderr, "Flushed Dirty Buffers. Truncating to rec_lsn = %ld\n", rec_lsn); - forcePages(); - trunc->log->truncate(trunc->log, rec_lsn); - return 1; + forcePages(); + trunc->log->truncate(trunc->log, rec_lsn); + return 1; } else { - return 0; + return 0; } } } else { diff --git a/stasis/bufferManager.h b/stasis/bufferManager.h index 0041bf4..2996f35 100644 --- a/stasis/bufferManager.h +++ b/stasis/bufferManager.h @@ -155,6 +155,9 @@ extern void (*forcePages)(); Force written back pages that fall within a particular range to disk. This does not force page that have not been written to with pageWrite(). + + @param start the first pageid to be forced to disk + @param stop the page after the last page to be forced to disk. */ extern void (*forcePageRange)(pageid_t start, pageid_t stop); extern void (*stasis_buffer_manager_simulate_crash)(); diff --git a/stasis/pageHandle.h b/stasis/pageHandle.h index 60eb086..9f1a26c 100644 --- a/stasis/pageHandle.h +++ b/stasis/pageHandle.h @@ -61,6 +61,10 @@ struct stasis_page_handle_t { as well...) */ void (*force_file)(struct stasis_page_handle_t* ph); + /** + * @param start the pageid of the first page to be forced to disk. 
+ * @param stop the pageid of the page after the last page to be forced to disk. + */ void (*force_range)(struct stasis_page_handle_t* ph, lsn_t start, lsn_t stop); /** Force the page file to disk, then close it.