Stop gratuitously forcing the log to disk each time it is read (which was causing Tabort() to run extremely slowly). Also, add a "tryflush()" method to ringbuffer.

This commit is contained in:
Sears Russell 2011-06-12 05:15:42 +00:00
parent 88d9ce157a
commit bb89c5a0cf
3 changed files with 43 additions and 6 deletions

View file

@ -305,13 +305,26 @@ int stasis_log_file_pool_write_entry(stasis_log_t * log, LogEntry * e) {
* Does no latching. No shared state, except for fd, which is * Does no latching. No shared state, except for fd, which is
* protected from being closed by truncation. * protected from being closed by truncation.
*/ */
const LogEntry* stasis_log_file_pool_chunk_read_entry(int fd, lsn_t file_offset, lsn_t lsn, uint32_t * len) { const LogEntry* stasis_log_file_pool_chunk_read_entry(stasis_log_file_pool_state * fp, int fd, lsn_t file_offset, lsn_t lsn, uint32_t * len) {
int err; int err;
if(sizeof(*len) != (err = mypread(fd, (byte*)len, sizeof(*len), lsn-file_offset))) { if(sizeof(*len) != (err = mypread(fd, (byte*)len, sizeof(*len), lsn-file_offset))) {
if(err == 0) { DEBUG(stderr, "EOF reading len from log\n"); return 0; } if(err == 0) { DEBUG(stderr, "EOF reading len from log\n"); return 0; }
abort(); abort();
} }
if(*len == 0) { DEBUG(stderr, "Reached end of log\n"); return 0; } if(*len == 0) { DEBUG(stderr, "Reached end of log\n"); return 0; }
// Force bytes containing body of log entry to disk.
if(fp->ring) { // if not, then we're in startup, and don't need to flush.
if(stasis_ringbuffer_get_write_frontier(fp->ring) > lsn) {
stasis_ringbuffer_flush(fp->ring, lsn+sizeof(uint32_t)+*len);
} else {
// there is a ringbuffer, and the read is past the eof.
// this should only happen at the end of recovery's forward
// scans.
return 0;
}
}
byte * buf = malloc(*len + sizeof(uint32_t)); byte * buf = malloc(*len + sizeof(uint32_t));
if(!buf) { if(!buf) {
fprintf(stderr, "Couldn't alloc memory for log entry of size %lld. " fprintf(stderr, "Couldn't alloc memory for log entry of size %lld. "
@ -353,8 +366,15 @@ int stasis_log_file_pool_chunk_write_buffer(int fd, const byte * buf, size_t sz,
} }
const LogEntry* stasis_log_file_pool_read_entry(struct stasis_log_t* log, lsn_t lsn) { const LogEntry* stasis_log_file_pool_read_entry(struct stasis_log_t* log, lsn_t lsn) {
stasis_log_file_pool_state * fp = log->impl; stasis_log_file_pool_state * fp = log->impl;
// expedient hack; force to disk before issuing read. if(fp->ring) {
log->force_tail(log, 0); /// xxx use real constant for wal mode.. // Force bytes containing length of log entry to disk.
if(stasis_ringbuffer_get_write_frontier(fp->ring) > lsn) {
stasis_ringbuffer_flush(fp->ring, lsn+sizeof(uint32_t));
} else {
// end of log
return 0;
}
} // else, we haven't finished initialization, so there are no bytes to flush.
const LogEntry * e; const LogEntry * e;
pthread_mutex_lock(&fp->mut); pthread_mutex_lock(&fp->mut);
int chunk = get_chunk_from_offset(log, lsn); int chunk = get_chunk_from_offset(log, lsn);
@ -372,7 +392,7 @@ const LogEntry* stasis_log_file_pool_read_entry(struct stasis_log_t* log, lsn_t
pthread_mutex_unlock(&fp->mut); pthread_mutex_unlock(&fp->mut);
// Be sure not to hold a mutex while hitting disk. // Be sure not to hold a mutex while hitting disk.
uint32_t len; uint32_t len;
e = stasis_log_file_pool_chunk_read_entry(fd, off, lsn, &len); e = stasis_log_file_pool_chunk_read_entry(fp, fd, off, lsn, &len);
if(e) { assert(sizeofLogEntry(log, e) == len); } if(e) { assert(sizeofLogEntry(log, e) == len); }
} }
return e; return e;
@ -473,7 +493,7 @@ lsn_t stasis_log_file_pool_chunk_scrub_to_eof(stasis_log_t * log, int fd, lsn_t
lsn_t cur_off = file_off; lsn_t cur_off = file_off;
const LogEntry * e; const LogEntry * e;
uint32_t len; uint32_t len;
while((e = stasis_log_file_pool_chunk_read_entry(fd, file_off, cur_off, &len))) { while((e = stasis_log_file_pool_chunk_read_entry(log->impl, fd, file_off, cur_off, &len))) {
cur_off = log->next_entry(log, e); cur_off = log->next_entry(log, e);
log->read_entry_done(log, e); log->read_entry_done(log, e);
} }
@ -714,6 +734,8 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f
printf("Current log segment appears to be %s. Scanning for next available LSN\n", fp->live_filenames[fp->live_count-1]); printf("Current log segment appears to be %s. Scanning for next available LSN\n", fp->live_filenames[fp->live_count-1]);
fp->ring = 0; // scrub calls read, which tries to flush the ringbuffer. passing null disables the flush.
next_lsn = stasis_log_file_pool_chunk_scrub_to_eof(ret, fp->ro_fd[fp->live_count-1], fp->live_offsets[fp->live_count-1]); next_lsn = stasis_log_file_pool_chunk_scrub_to_eof(ret, fp->ro_fd[fp->live_count-1], fp->live_offsets[fp->live_count-1]);
printf("Scan returned %lld\n", (long long)next_lsn); printf("Scan returned %lld\n", (long long)next_lsn);

View file

@ -301,8 +301,12 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset
return ring; return ring;
} }
void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off) { static int stasis_ringbuffer_flush_impl(stasis_ringbuffer_t * ring, lsn_t off, int past_end) {
pthread_mutex_lock(&ring->mut); pthread_mutex_lock(&ring->mut);
if(ring->wt < off && !past_end) {
pthread_mutex_unlock(&ring->mut);
return RING_VOLATILE;
}
if(ring->flush < off) { ring->flush = off; } if(ring->flush < off) { ring->flush = off; }
while(ring->rt < off) { while(ring->rt < off) {
pthread_cond_signal(&ring->write_done); pthread_cond_signal(&ring->write_done);
@ -311,6 +315,13 @@ void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off) {
} }
DEBUG("flushed rt = %lld off = %lld\n", ring->rt, off); DEBUG("flushed rt = %lld off = %lld\n", ring->rt, off);
pthread_mutex_unlock(&ring->mut); pthread_mutex_unlock(&ring->mut);
return 0;
}
/*
 * Conditional counterpart of stasis_ringbuffer_flush(): forwards to
 * stasis_ringbuffer_flush_impl() with past_end = 0.  With that flag clear,
 * the impl returns RING_VOLATILE right away when ring->wt < off, without
 * raising the flush target; otherwise it performs the flush and returns 0.
 */
int stasis_ringbuffer_tryflush(stasis_ringbuffer_t * ring, lsn_t off) {
return stasis_ringbuffer_flush_impl(ring, off, 0);
}
void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off) {
stasis_ringbuffer_flush_impl(ring, off, 1);
} }
void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) { void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) {
pthread_mutex_lock(&ring->mut); pthread_mutex_lock(&ring->mut);

View file

@ -31,6 +31,10 @@ lsn_t stasis_ringbuffer_get_write_frontier(stasis_ringbuffer_t * ring);
void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, lsn_t off); void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, lsn_t off);
typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_CLOSED = -6, RING_MINERR = -7 } stasis_ringbuffer_error_t; typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_CLOSED = -6, RING_MINERR = -7 } stasis_ringbuffer_error_t;
void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off); void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off);
/*
* Like flush, but if off could still be modified, immediately returns RING_VOLATILE instead of flushing the ringbuffer.
*/
int stasis_ringbuffer_tryflush(stasis_ringbuffer_t * ring, lsn_t off);
// Causes ringbuffer requests to stop blocking, and return RING_CLOSED // Causes ringbuffer requests to stop blocking, and return RING_CLOSED
void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring); void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring);