IMPORTANT API CHANGE: stasis_dirty_page_table_flush_range() no longer forces writes to disk. Callers now need to invoke forcePageRange() after stasis_dirty_page_table_flush_range() returns.
This commit should significantly improve sequential write throughput. I found a segfault bug this morning, but cannot reproduce it. It could be a heisenbug, or it could have been a hardware fault (gojira repeatedly kernel panicked, then both problems went away...)
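For callers migrating to the new contract, here is a minimal sketch of the two-step flush-then-force pattern. The function names come from this commit's headers; the include paths and the wrapper function are assumptions, not part of the commit:

    // Migration sketch, assuming the usual Stasis include paths.
    // flush_range() now only writes dirty pages back to the OS;
    // forcePageRange() performs the separate force-to-disk step.
    #include <stasis/bufferManager.h>
    #include <stasis/dirtyPageTable.h>

    void flush_and_force(pageid_t start, pageid_t stop) {
      stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), start, stop);
      forcePageRange(start, stop);  // this used to happen inside flush_range()
    }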
parent 0cc65aefaf
commit 0154f7d29d

10 changed files with 103 additions and 27 deletions
@@ -33,7 +33,7 @@ int main(int argc, char ** argv) {
   int direct = 0;
   int legacyBM = 0;
   int legacyFH = 0;
+  int stake = 0;
   long page_count = mb_to_page(100);

   for(int i = 1; i < argc; i++) {
@@ -65,6 +65,9 @@ int main(int argc, char ** argv) {
     } else if(!strcmp(argv[i], "--mb")) {
       i++;
       page_count = mb_to_page(atoll(argv[i]));
+    } else if(!strcmp(argv[i], "--stake")) {
+      i++;
+      stake = mb_to_page(atoll(argv[i]));
     } else {
       printf("Unknown argument: %s\n", argv[i]);
       return 1;
@@ -78,6 +81,14 @@ int main(int argc, char ** argv) {

   Tinit();

+  if(stake) {
+    Page * p = loadPage(-1, stake);
+    writelock(p->rwlatch,0);
+    stasis_dirty_page_table_set_dirty(stasis_runtime_dirty_page_table(), p);
+    unlock(p->rwlatch);
+    releasePage(p);
+  }
+
   for(long i =0; i < page_count; i++) {
     Page * p = loadPage(-1, i);
     writelock(p->rwlatch,0);

@@ -30,6 +30,8 @@ static stasis_buffer_pool_t * stasis_buffer_pool;

 static stasis_page_handle_t * page_handle;

+static int flushing;
+
 static int running;

 typedef struct LL_ENTRY(node_t) node_t;
@@ -49,7 +51,14 @@ static inline struct Page_s ** pagePendingPtr(Page * p) {
 static inline intptr_t* pagePinCountPtr(Page * p) {
   return ((intptr_t*)(&((p)->queue)));
 }

+static inline int needFlush() {
+  pageid_t count = stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table());
+  const pageid_t needed = 1000; //MAX_BUFFER_SIZE / 5;
+  if(count > needed) {
+    DEBUG("Need flush? Dirty: %lld Total: %lld ret = %d\n", count, needed, count > needed);
+  }
+  return count > needed;
+}
 #ifdef LONG_RUN

 inline static void checkPageState(Page * p) {
@@ -96,11 +105,13 @@ inline static int tryToWriteBackPage(pageid_t page) {
   if(*pagePendingPtr(p) || *pagePinCountPtr(p)) {
     return EBUSY;
   }

   DEBUG("Write(%ld)\n", (long)victim->id);
   page_handle->write(page_handle, p); /// XXX pageCleanup and pageFlushed might be heavyweight.

+  assert(!stasis_dirty_page_table_is_dirty(stasis_runtime_dirty_page_table(), p));
+  // int locked = trywritelock(p->rwlatch,0);
+  // assert(locked);
+  // assert(!stasis_dirty_page_table_is_dirty(stasis_runtime_dirty_page_table(), p));
+  // unlock(p->rwlatch);

   return 0;
 }
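EBUSY matters to the flush paths elsewhere in this diff: a pinned or read-pending page is skipped rather than latched. A condensed sketch of the caller-side contract (adapted from the flush_range hunk further down; writeBackPage is the exported wrapper assumed here):

    // Adapted from flush_range later in this diff: a busy page is fine
    // when flushing everything (stop == 0), but a bounded flush that hits
    // one is an API violation -- the caller was promised those pages.
    int err = writeBackPage(pid);  // ends up in tryToWriteBackPage()
    if(stop && (err == EBUSY)) { abort(); /* api violation! */ }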
@@ -127,7 +138,8 @@ inline static Page * getFreePage() {
     assert(!*pagePendingPtr(ret));
     if(ret->dirty) {
       pthread_mutex_unlock(&mut);
-      stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);
+      DEBUG("Blocking app thread");
+      stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());
       pthread_mutex_lock(&mut);
     } else {
       break;
@@ -148,12 +160,17 @@ inline static Page * getFreePage() {
 static void * writeBackWorker(void * ignored) {
   pthread_mutex_lock(&mut);
   while(1) {
-    while(running && pageCount < MAX_BUFFER_SIZE) {
+    while(running && (!needFlush())) {
+      flushing = 0;
+      DEBUG("Sleeping in write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()));
       pthread_cond_wait(&needFree, &mut);
+      DEBUG("Woke write back worker (count = %lld)\n", stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()));
+      flushing = 1;
     }
     if(!running) { break; }
     pthread_mutex_unlock(&mut);
-    stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);
+    DEBUG("Calling flush\n");
+    stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());
     pthread_mutex_lock(&mut);
   }
   pthread_mutex_unlock(&mut);
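The flushing flag plus the needFree condition variable form a level-triggered handshake between application threads and this worker; the load-path hunk below signals only when no flush is already in progress. A condensed sketch of the protocol (names are from this diff; the condensation is mine):

    // Worker side, condensed from writeBackWorker() above:
    pthread_mutex_lock(&mut);
    while(running && !needFlush()) {
      flushing = 0;                       // allow app threads to signal again
      pthread_cond_wait(&needFree, &mut); // sleep until the threshold trips
      flushing = 1;                       // suppress redundant wakeups
    }
    pthread_mutex_unlock(&mut);
    if(running) {
      stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());
    }

    // App side, condensed from bhLoadPageImpl_helper() below:
    //   if((!flushing) && needFlush()) { pthread_cond_signal(&needFree); }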
@@ -297,7 +314,7 @@ static Page * bhLoadPageImpl_helper(int xid, const pageid_t pageid, int uninitia
   pthread_cond_broadcast(&readComplete);

   // TODO Improve writeback policy
-  if(stasis_dirty_page_table_dirty_count(stasis_runtime_dirty_page_table()) > MAX_BUFFER_SIZE / 5) {
+  if((!flushing) && needFlush()) {
     pthread_cond_signal(&needFree);
   }
   assert(ret->id == pageid);
@@ -339,13 +356,15 @@ static void bhForcePageRange(pageid_t start, pageid_t stop) {
   page_handle->force_range(page_handle, start, stop);
 }
 static void bhBufDeinit() {
   pthread_mutex_lock(&mut);
   running = 0;
   pthread_mutex_unlock(&mut);

   pthread_cond_signal(&needFree); // Wake up the writeback thread so it will exit.
   pthread_join(worker, 0);

+  // XXX flush range should return an error number, which we would check. (Right now, it aborts...)
   stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), 0, 0);
+  stasis_dirty_page_table_flush(stasis_runtime_dirty_page_table());

   struct LH_ENTRY(list) iter;
   const struct LH_ENTRY(pair_t) * next;
@@ -367,7 +386,9 @@ static void bhBufDeinit() {
   page_handle->close(page_handle);
 }
 static void bhSimulateBufferManagerCrash() {
+  pthread_mutex_lock(&mut);
   running = 0;
+  pthread_mutex_unlock(&mut);

   pthread_cond_signal(&needFree);
   pthread_join(worker, 0);
@@ -408,6 +429,8 @@ void stasis_buffer_manager_hash_open(stasis_page_handle_t * h) {
   stasis_buffer_manager_close = bhBufDeinit;
   stasis_buffer_manager_simulate_crash = bhSimulateBufferManagerCrash;

+  flushing = 0;
+
   stasis_buffer_pool = stasis_buffer_pool_init();

   lru = lruFastInit(pageGetNode, pageSetNode, 0);

@@ -77,6 +77,7 @@ static void pfPageRead(stasis_page_handle_t * h, Page *ret, pagetype_t type) {
     dirty page table can be kept up to date. */
 static void pfPageWrite(stasis_page_handle_t * h, Page * ret) {
   /** If the page is clean, there's no reason to write it out. */
+  assertlocked(ret->rwlatch);
   if(!stasis_dirty_page_table_is_dirty(h->dirtyPages, ret)) {
     DEBUG(" =^)~ ");
     return;

@@ -44,7 +44,9 @@ static Page* paGetCachedPage(int xid, pageid_t page) {
   return paLoadPage(xid, page, UNKNOWN_TYPE_PAGE);
 }
 static void paReleasePage(Page * p) {
+  writelock(p->rwlatch,0);
   stasis_dirty_page_table_set_clean(stasis_runtime_dirty_page_table(), p);
+  unlock(p->rwlatch);
 }

 static int paWriteBackPage(pageid_t p) { return 0; /* no-op */ }

@@ -51,6 +51,7 @@ void stasis_dirty_page_table_set_dirty(stasis_dirty_page_table_t * dirtyPages, P

 void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, Page * p) {
   pthread_mutex_lock(&dirtyPages->mutex);
+  assertlocked(p->rwlatch);
   dpt_entry dummy = {p->id, 0};
   const dpt_entry * e = rbdelete(&dummy, dirtyPages->table);
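set_clean and is_dirty both probe the red-black tree with a {p->id, 0} key, which only works if entries order by page id first. A minimal sketch of what a comparator like dpt_cmp plausibly looks like; the struct layout and stand-in types are inferred from the dummy initializers in this diff, not copied from the source:

    // Stand-in types; Stasis defines pageid_t and lsn_t in its headers.
    typedef long long pageid_t;
    typedef long long lsn_t;

    // Inferred from the { p->id, 0 } initializers above: entries sort by
    // page id, so a zero-lsn dummy key locates any entry for that page.
    typedef struct { pageid_t p; lsn_t lsn; } dpt_entry;

    // libredblack-style comparator (third argument is the tree's config).
    static int dpt_cmp(const void *ap, const void *bp, const void *ignored) {
      const dpt_entry *a = ap, *b = bp;
      return (a->p < b->p) ? -1 : ((a->p == b->p) ? 0 : 1);
    }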
@@ -69,6 +70,8 @@ void stasis_dirty_page_table_set_clean(stasis_dirty_page_table_t * dirtyPages, P
 int stasis_dirty_page_table_is_dirty(stasis_dirty_page_table_t * dirtyPages, Page * p) {
   int ret;
+  pthread_mutex_lock(&dirtyPages->mutex);
+  assertlocked(p->rwlatch);

   ret = p->dirty;
   dpt_entry e = { p->id, 0};
   const void* found = rbfind(&e, dirtyPages->table);
@@ -94,12 +97,39 @@ lsn_t stasis_dirty_page_table_minRecLSN(stasis_dirty_page_table_t * dirtyPages)
 pageid_t stasis_dirty_page_table_dirty_count(stasis_dirty_page_table_t * dirtyPages) {
   pthread_mutex_lock(&dirtyPages->mutex);
   pageid_t ret = dirtyPages->count;
   assert(dirtyPages->count >= 0);
   pthread_mutex_unlock(&dirtyPages->mutex);
   return ret;
 }

 void stasis_dirty_page_table_flush(stasis_dirty_page_table_t * dirtyPages) {
-  stasis_dirty_page_table_flush_range(dirtyPages, 0, 0); // pageid_t = 0 means flush to EOF.
+  dpt_entry dummy = { 0, 0 };
+  const int stride = 200;
+  pageid_t vals[stride];
+  int off = 0;
+  int strides = 0;
+  pthread_mutex_lock(&dirtyPages->mutex);
+  for(const dpt_entry * e = rblookup(RB_LUGTEQ, &dummy, dirtyPages->table) ;
+      e;
+      e = rblookup(RB_LUGREAT, &dummy, dirtyPages->table)) {
+    dummy = *e;
+    vals[off] = dummy.p;
+    off++;
+    if(off == stride) {
+      pthread_mutex_unlock(&dirtyPages->mutex);
+      for(pageid_t i = 0; i < off; i++) {
+        writeBackPage(vals[i]);
+      }
+      off = 0;
+      strides++;
+      pthread_mutex_lock(&dirtyPages->mutex);
+    }
+  }
+  pthread_mutex_unlock(&dirtyPages->mutex);
+  for(int i = 0; i < off; i++) {
+    writeBackPage(vals[i]);
+  }
+  // if(strides < 5) { DEBUG("strides: %d dirtyCount = %lld\n", strides, stasis_dirty_page_table_dirty_count(dirtyPages)); }
 }
 void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages, pageid_t start, pageid_t stop) {
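The new flush walks the tree in batches of 200 page ids, dropping the table mutex while the actual writes happen so that concurrent set_dirty/set_clean calls are not blocked behind I/O. The same lock-striding pattern in isolation, with stand-in data structures (a flat array instead of the red-black tree, and a printf instead of writeBackPage):

    #include <pthread.h>
    #include <stdio.h>

    // Stand-ins for the real structures: a fixed list of dirty page ids
    // protected by a mutex, and a "write back" that is just a printf.
    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    static long dirty[1000];
    static int dirty_count = 1000;

    static void write_one(long pid) { printf("writeback %ld\n", pid); }

    // Harvest up to STRIDE ids under the lock, write them back with the
    // lock released, then re-acquire and continue from the saved cursor.
    static void flush_strided(void) {
      enum { STRIDE = 200 };
      long batch[STRIDE];
      int pos = 0, n = 0;
      pthread_mutex_lock(&m);
      while(pos < dirty_count) {
        batch[n++] = dirty[pos++];
        if(n == STRIDE) {
          pthread_mutex_unlock(&m);   // never hold the mutex across I/O
          for(int i = 0; i < n; i++) write_one(batch[i]);
          n = 0;
          pthread_mutex_lock(&m);     // state may have changed; pos is our cursor
        }
      }
      pthread_mutex_unlock(&m);
      for(int i = 0; i < n; i++) write_one(batch[i]);
    }

In the real code the cursor is the dpt_entry key itself (rblookup(RB_LUGREAT, ...) resumes strictly after the last id seen), which stays valid even if concurrent threads insert or delete entries while the mutex is released.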
@@ -121,12 +151,13 @@ void stasis_dirty_page_table_flush_range(stasis_dirty_page_table_t * dirtyPages,
     if(stop && (err == EBUSY)) { abort(); /*api violation!*/ }
   }
   free(staleDirtyPages);
-  forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE);
+  // forcePageRange(start*PAGE_SIZE,stop*PAGE_SIZE);
 }

 stasis_dirty_page_table_t * stasis_dirty_page_table_init() {
   stasis_dirty_page_table_t * ret = malloc(sizeof(*ret));
   ret->table = rbinit(dpt_cmp, 0);
+  ret->count = 0;
   pthread_mutex_init(&ret->mutex, 0);
   return ret;
 }

@@ -57,6 +57,7 @@ void TlsmRegionForceRid(int xid, void *conf) {
     pageid_t pid;
     Tread(xid,a.regionList,&pid);
     stasis_dirty_page_table_flush_range(stasis_runtime_dirty_page_table(), pid, pid+a.regionSize);
+    forcePageRange(pid, pid+a.regionSize);
     // TregionDealloc(xid,pid);
   }
 }

@@ -13,9 +13,8 @@ static void phWrite(stasis_page_handle_t * ph, Page * ret) {
   // This lock is only held to make the page implementation happy. We should
   // implicitly have exclusive access to the page before this function is called,
   // or we'll deadlock.
-
-  /// TODO Turn into trywritelock + test for trouble.
-  writelock(ret->rwlatch,0);
+  int locked = trywritelock(ret->rwlatch,0);
+  assert(locked);
   if(!ret->dirty) { unlock(ret->rwlatch); return; }
   stasis_page_flushed(ret);
   if(ph->log) { stasis_log_force(ph->log, ret->LSN, LOG_FORCE_WAL); }
@@ -29,7 +28,8 @@ static void phWrite(stasis_page_handle_t * ph, Page * ret) {
   unlock(ret->rwlatch);
 }
 static void phRead(stasis_page_handle_t * ph, Page * ret, pagetype_t type) {
-  writelock(ret->rwlatch,0);
+  int locked = trywritelock(ret->rwlatch,0);
+  assert(locked);
   int err = ((stasis_handle_t*)ph->impl)->read(ph->impl, PAGE_SIZE * ret->id, ret->memAddr, PAGE_SIZE);
   if(err) {
     if(err == EDOM) {
@@ -50,7 +50,7 @@ static void phForce(stasis_page_handle_t * ph) {
   assert(!err);
 }
 static void phForceRange(stasis_page_handle_t * ph, lsn_t start, lsn_t stop) {
-  int err = ((stasis_handle_t*)ph->impl)->force_range(ph->impl,start,stop);
+  int err = ((stasis_handle_t*)ph->impl)->force_range(ph->impl,start*PAGE_SIZE,stop*PAGE_SIZE);
   assert(!err);
 }
 static void phClose(stasis_page_handle_t * ph) {

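This one-line change moves the pageid-to-byte conversion inside the page handle: callers (such as TlsmRegionForceRid above, and the now commented-out call in the dirty page table) pass page ids, and phForceRange scales them by PAGE_SIZE before reaching the underlying handle. A small illustration of the contract; the 4096 value and the helper function are illustrative, since Stasis defines the real PAGE_SIZE itself:

    #include <assert.h>

    #define PAGE_SIZE 4096  // illustrative; Stasis defines the real value

    // forcePageRange(10, 20) should reach the raw handle as byte range
    // [10*PAGE_SIZE, 20*PAGE_SIZE) = [40960, 81920).
    static void illustrate_units(long start_page, long stop_page) {
      long start_byte = start_page * PAGE_SIZE;
      long stop_byte  = stop_page  * PAGE_SIZE;
      assert(stop_byte - start_byte == (stop_page - start_page) * PAGE_SIZE);
    }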
@@ -1,5 +1,5 @@
 #include <stasis/truncation.h>
-
+#include <stdio.h>
 struct stasis_truncation_t {
   char initialized;
   char automaticallyTruncating;
@@ -104,20 +104,20 @@ int stasis_truncation_truncate(stasis_truncation_t* trunc, int force) {
   } else {
     lsn_t flushed = trunc->log->first_unstable_lsn(trunc->log, LOG_FORCE_WAL);
     if(force || flushed - log_trunc > 2 * TARGET_LOG_SIZE) {
-      //fprintf(stderr, "Flushing dirty buffers: rec_lsn = %ld log_trunc = %ld flushed = %ld\n", rec_lsn, log_trunc, flushed);
+      DEBUG("Flushing dirty buffers: rec_lsn = %lld log_trunc = %lld flushed = %lld\n", rec_lsn, log_trunc, flushed);
       stasis_dirty_page_table_flush(trunc->dirty_pages);

       page_rec_lsn = stasis_dirty_page_table_minRecLSN(trunc->dirty_pages);
       rec_lsn = page_rec_lsn < xact_rec_lsn ? page_rec_lsn : xact_rec_lsn;
       rec_lsn = (rec_lsn < flushed_lsn) ? rec_lsn : flushed_lsn;

       //fprintf(stderr, "Flushed Dirty Buffers.  Truncating to rec_lsn = %ld\n", rec_lsn);

       forcePages();
       trunc->log->truncate(trunc->log, rec_lsn);
       return 1;
     } else {
       return 0;
     }
   }
 } else {

@@ -155,6 +155,9 @@ extern void (*forcePages)();
   Force written back pages that fall within a particular range to disk.
+
+  This does not force pages that have not been written to with pageWrite().
+
   @param start the first pageid to be forced to disk
   @param stop the page after the last page to be forced to disk.
 */
 extern void (*forcePageRange)(pageid_t start, pageid_t stop);
 extern void (*stasis_buffer_manager_simulate_crash)();
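Since stop names the page after the last one forced, the interval is half-open; a quick example with placeholder page ids:

    forcePageRange(42, 43);   // forces exactly page 42
    forcePageRange(0, 100);   // forces pages 0 through 99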
@@ -61,6 +61,10 @@ struct stasis_page_handle_t {
      as well...)
   */
   void (*force_file)(struct stasis_page_handle_t* ph);
+  /**
+   * @param start the pageid of the first page to be forced to disk.
+   * @param stop the pageid of the page after the last page to be forced to disk.
+   */
   void (*force_range)(struct stasis_page_handle_t* ph, lsn_t start, lsn_t stop);
   /**
     Force the page file to disk, then close it.