diff --git a/src/stasis/logger/filePool.c b/src/stasis/logger/filePool.c index 16b37b8..9b2cf18 100644 --- a/src/stasis/logger/filePool.c +++ b/src/stasis/logger/filePool.c @@ -305,13 +305,26 @@ int stasis_log_file_pool_write_entry(stasis_log_t * log, LogEntry * e) { * Does no latching. No shared state, except for fd, which is * protected from being closed by truncation. */ -const LogEntry* stasis_log_file_pool_chunk_read_entry(int fd, lsn_t file_offset, lsn_t lsn, uint32_t * len) { +const LogEntry* stasis_log_file_pool_chunk_read_entry(stasis_log_file_pool_state * fp, int fd, lsn_t file_offset, lsn_t lsn, uint32_t * len) { int err; if(sizeof(*len) != (err = mypread(fd, (byte*)len, sizeof(*len), lsn-file_offset))) { if(err == 0) { DEBUG(stderr, "EOF reading len from log\n"); return 0; } abort(); } if(*len == 0) { DEBUG(stderr, "Reached end of log\n"); return 0; } + + // Force bytes containing body of log entry to disk. + if(fp->ring) { // if not, then we're in startup, and don't need to flush. + if(stasis_ringbuffer_get_write_frontier(fp->ring) > lsn) { + stasis_ringbuffer_flush(fp->ring, lsn+sizeof(uint32_t)+*len); + } else { + // there is a ringbuffer, and the read is past the eof. + // this should only happen at the end of recovery's forward + // scans. + return 0; + } + } + byte * buf = malloc(*len + sizeof(uint32_t)); if(!buf) { fprintf(stderr, "Couldn't alloc memory for log entry of size %lld. " @@ -353,8 +366,15 @@ int stasis_log_file_pool_chunk_write_buffer(int fd, const byte * buf, size_t sz, } const LogEntry* stasis_log_file_pool_read_entry(struct stasis_log_t* log, lsn_t lsn) { stasis_log_file_pool_state * fp = log->impl; - // expedient hack; force to disk before issuing read. - log->force_tail(log, 0); /// xxx use real constant for wal mode.. + if(fp->ring) { + // Force bytes containing length of log entry to disk. + if(stasis_ringbuffer_get_write_frontier(fp->ring) > lsn) { + stasis_ringbuffer_flush(fp->ring, lsn+sizeof(uint32_t)); + } else { + // end of log + return 0; + } + } // else, we haven't finished initialization, so there are no bytes to flush. const LogEntry * e; pthread_mutex_lock(&fp->mut); int chunk = get_chunk_from_offset(log, lsn); @@ -372,7 +392,7 @@ const LogEntry* stasis_log_file_pool_read_entry(struct stasis_log_t* log, lsn_t pthread_mutex_unlock(&fp->mut); // Be sure not to hold a mutex while hitting disk. uint32_t len; - e = stasis_log_file_pool_chunk_read_entry(fd, off, lsn, &len); + e = stasis_log_file_pool_chunk_read_entry(fp, fd, off, lsn, &len); if(e) { assert(sizeofLogEntry(log, e) == len); } } return e; @@ -473,7 +493,7 @@ lsn_t stasis_log_file_pool_chunk_scrub_to_eof(stasis_log_t * log, int fd, lsn_t lsn_t cur_off = file_off; const LogEntry * e; uint32_t len; - while((e = stasis_log_file_pool_chunk_read_entry(fd, file_off, cur_off, &len))) { + while((e = stasis_log_file_pool_chunk_read_entry(log->impl, fd, file_off, cur_off, &len))) { cur_off = log->next_entry(log, e); log->read_entry_done(log, e); } @@ -714,6 +734,8 @@ stasis_log_t* stasis_log_file_pool_open(const char* dirname, int filemode, int f printf("Current log segment appears to be %s. Scanning for next available LSN\n", fp->live_filenames[fp->live_count-1]); + fp->ring = 0; // scrub calls read, which tries to flush the ringbuffer. passing null disables the flush. + next_lsn = stasis_log_file_pool_chunk_scrub_to_eof(ret, fp->ro_fd[fp->live_count-1], fp->live_offsets[fp->live_count-1]); printf("Scan returned %lld\n", (long long)next_lsn); diff --git a/src/stasis/util/ringbuffer.c b/src/stasis/util/ringbuffer.c index d5f63c9..d1501da 100644 --- a/src/stasis/util/ringbuffer.c +++ b/src/stasis/util/ringbuffer.c @@ -301,8 +301,12 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset return ring; } -void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off) { +static int stasis_ringbuffer_flush_impl(stasis_ringbuffer_t * ring, lsn_t off, int past_end) { pthread_mutex_lock(&ring->mut); + if(ring->wt < off && !past_end) { + pthread_mutex_unlock(&ring->mut); + return RING_VOLATILE; + } if(ring->flush < off) { ring->flush = off; } while(ring->rt < off) { pthread_cond_signal(&ring->write_done); @@ -311,6 +315,13 @@ void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off) { } DEBUG("flushed rt = %lld off = %lld\n", ring->rt, off); pthread_mutex_unlock(&ring->mut); + return 0; +} +int stasis_ringbuffer_tryflush(stasis_ringbuffer_t * ring, lsn_t off) { + return stasis_ringbuffer_flush_impl(ring, off, 0); +} +void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off) { + stasis_ringbuffer_flush_impl(ring, off, 1); } void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) { pthread_mutex_lock(&ring->mut); diff --git a/stasis/util/ringbuffer.h b/stasis/util/ringbuffer.h index 116878b..0a6ff17 100644 --- a/stasis/util/ringbuffer.h +++ b/stasis/util/ringbuffer.h @@ -31,6 +31,10 @@ lsn_t stasis_ringbuffer_get_write_frontier(stasis_ringbuffer_t * ring); void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, lsn_t off); typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_CLOSED = -6, RING_MINERR = -7 } stasis_ringbuffer_error_t; void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, lsn_t off); +/* + * Like flush, but if off could still be modified, immediately returns RING_VOLATILE instead of flushing the ringbuffer. + */ +int stasis_ringbuffer_tryflush(stasis_ringbuffer_t * ring, lsn_t off); // Causes ringbuffer requests to stop blocking, and return RING_CLOSED void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring);