fix deadlock in filePool logger. The deadlock was caused by pages being evicted before Tupdate() released the log entry for writeback. We now track another timestamp so that Tupdate() starts with something analogous to a write lock, then downgrades it to a read lock before pinning pages and invoking callbacks.

Sears Russell 2011-07-15 17:37:50 +00:00
parent 852c46b97b
commit 7f332f85cc
5 changed files with 66 additions and 36 deletions
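
The fix splits the old single-phase writer hold into a write phase and a read phase. Below is a minimal sketch (not part of the commit) of the new writer lifecycle, in terms of the ringbuffer API this commit touches; the wrapper function and its arguments are hypothetical, and Tupdate() reaches this path through the filePool log rather than calling the ringbuffer directly.

#include <string.h>
#include <stasis/util/ringbuffer.h>

// Hypothetical wrapper illustrating the two-phase protocol.
void log_write_then_apply(stasis_ringbuffer_t * ring, const void * rec, lsn_t sz) {
  lsn_t handle;
  // "Write lock": the handle is registered with both min_writer and
  // min_reading_writer, so neither wt nor rwt can pass this entry.
  lsn_t off = stasis_ringbuffer_reserve_space(ring, sz, &handle);
  memcpy(stasis_ringbuffer_get_wr_buf(ring, off, sz), rec, sz);
  // Downgrade to "read lock": readers (log writeback) may now consume the
  // bytes, but rwt still pins them, so they cannot be reclaimed.
  stasis_ringbuffer_write_done(ring, &handle);
  // ... pin pages and invoke callbacks here; if that evicts a dirty page,
  // writeback can proceed past this entry instead of deadlocking ...
  // Release the hold; once readers also finish, the space is reclaimable.
  stasis_ringbuffer_reading_writer_done(ring, &handle);
}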

View file

@@ -275,12 +275,25 @@ LogEntry * stasis_log_file_pool_reserve_entry(stasis_log_t * log, size_t szs) {
   return e;
 }
 /**
- * Does no latching, but does call ringbuffer.
+ * Does no latching. Everything is thread local, except the call to ringbuffer.
  */
 int stasis_log_file_pool_write_entry_done(stasis_log_t * log, LogEntry * e) {
   stasis_log_file_pool_state * fp = log->impl;
   lsn_t * handle = pthread_getspecific(fp->handle_key);
   assert(handle);
+  stasis_ringbuffer_reading_writer_done(fp->ring, handle);
+  return 0;
+}
+/**
+ * Does no latching. Other than the CRC computation (which is protected by
+ * ringbuffer), and the call to ringbuffer, everything is thread local.
+ */
+int stasis_log_file_pool_write_entry(stasis_log_t * log, LogEntry * e) {
+  stasis_log_file_pool_state * fp = log->impl;
+  lsn_t * handle = pthread_getspecific(fp->handle_key);
+  assert(handle);
   byte * buf = (byte*)e;
   lsn_t sz = sizeofLogEntry(log, e);
@@ -294,13 +307,6 @@ int stasis_log_file_pool_write_entry_done(stasis_log_t * log, LogEntry * e) {
   stasis_ringbuffer_write_done(fp->ring, handle);
   return 0;
 }
-/**
- * Does no latching. (no-op)
- */
-int stasis_log_file_pool_write_entry(stasis_log_t * log, LogEntry * e) {
-  // no-op; the entry is written into the ringbuffer in place.
-  return 0;
-}
 /**
  * Does no latching. No shared state, except for fd, which is
  * protected from being closed by truncation.

View file

@@ -1,9 +1,12 @@
 #include <config.h>
 #include <stasis/util/ringbuffer.h>
 #include <stasis/util/min.h>
+#include <stasis/util/latches.h>
 #include <assert.h>
 #include <sys/mman.h>
 #include <stdio.h>
 /**
  * A ring buffer implementation.
  *
@@ -24,13 +27,14 @@
 struct stasis_ringbuffer_t {
   byte * mem;
   lsn_t mask;
-  // Track four regions: write_frontier (wf), write_tail (wt), read_frontier (rf), read_tail (rt):
+  // Track four pointers: write_frontier (wf), write_tail (wt), read_frontier (rf), read_tail (rt)
+  // that are ordered with respect to each other.
   // Logical buffer layout:
   // byte zero
   // ...
-  lsn_t rt; // First byte that some thread might be reading. Earlier bytes can be reclaimed.
+  lsn_t rt; // First byte that some thread might be reading. Bytes before this and rwt can be reclaimed.
   lsn_t rf; // First byte that will be returned by "read next".
   lsn_t wt; // First byte that some thread might be writing. Earlier bytes are stable for readers.
   lsn_t wf; // First available byte
@@ -38,7 +42,18 @@ struct stasis_ringbuffer_t {
   // ...
   // byte 2^64
+  // There is one more pointer, which is only ordered with respect to wt and
+  // wf. It is always less than or equal to wt. It is the first byte that
+  // may still be read by the thread that wrote it. This is needed so that
+  // data can be consumed by the readers in parallel with deferred processing
+  // at the writers. (Stasis uses this to allow log entries to be written to
+  // disk before they are applied in RAM. Otherwise, if a page pin performed
+  // by this log entry evicts a page updated by a later entry then we would
+  // deadlock.)
+  lsn_t rwt;
   stasis_aggregate_min_t * min_writer;
+  stasis_aggregate_min_t * min_reading_writer;
   stasis_aggregate_min_t * min_reader;
   // Synchronization stuff
@@ -56,9 +71,13 @@ struct stasis_ringbuffer_t {
   int shutdown;
 };
+static inline lsn_t effective_rt(stasis_ringbuffer_t * ring) {
+  return ring->rt < ring->rwt ? ring->rt : ring->rwt;
+}
 // Does not need synchronization (only called from nb function).
 static inline lsn_t freespace(stasis_ringbuffer_t * ring) {
-  lsn_t ret = ((ring->rt - ring->wf) - 1) & ring->mask;
+  lsn_t ret = ((effective_rt(ring) - ring->wf) - 1) & ring->mask;
   // printf("freespace is %lld\n", (long long)ret);
   return ret;
 }
@@ -72,7 +91,7 @@ static inline void* ptr_off(stasis_ringbuffer_t * ring, lsn_t off) {
 lsn_t stasis_ringbuffer_nb_reserve_space(stasis_ringbuffer_t * ring, lsn_t sz) {
   if(freespace(ring) < sz) { return RING_FULL; }
   lsn_t ret = ring->wf;
-  ring->wf += sz;
+  FETCH_AND_ADD(&ring->wf, sz); //ring->wf += sz;
   return ret;
 }
 // Threadsafe (explicit synchronization). Blocks.
@@ -85,6 +104,7 @@ lsn_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, lsn_t sz, lsn_
   if(handle) {
     *handle = ret;
     stasis_aggregate_min_add(ring->min_writer, handle);
+    stasis_aggregate_min_add(ring->min_reading_writer, handle);
   }
   pthread_mutex_unlock(&ring->mut);
   return ret;
@@ -109,7 +129,7 @@ lsn_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, lsn_t off,
   // something fugly is going on.
   if(off + *sz > ring->wt) { return RING_VOLATILE; }
-  if(ring->rf < off + *sz) { ring->rf = off + *sz; }
+  if(ring->rf < off + *sz) { ATOMIC_WRITE_64(0, &ring->rf, off + *sz); }
   return off;
 }
@@ -177,18 +197,14 @@ void * stasis_ringbuffer_get_wr_buf(stasis_ringbuffer_t * ring, lsn_t off, lsn_t
 }
 void stasis_ringbuffer_nb_advance_write_tail(stasis_ringbuffer_t * ring, lsn_t off) {
   assert(off >= ring->wt);
-  ring->wt = off;
+  ATOMIC_WRITE_64(0, &ring->wt, off);
   assert(ring->wt <= ring->wf);
 }
-lsn_t stasis_ringbuffer_current_write_tail(stasis_ringbuffer_t * ring) {
-  pthread_mutex_lock(&ring->mut);
-  lsn_t ret = ring->wt;
-  pthread_mutex_unlock(&ring->mut);
-  return ret;
-}
 void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, lsn_t off) {
   pthread_mutex_lock(&ring->mut);
   stasis_ringbuffer_nb_advance_write_tail(ring, off);
+  // TODO ringbuffer is getting kind of complicated... Get rid of public nb api?
+  ATOMIC_WRITE_64(0,&ring->rwt, off);
   pthread_cond_broadcast(&ring->write_done);
   pthread_mutex_unlock(&ring->mut);
 }
@@ -205,27 +221,18 @@ void stasis_ringbuffer_write_done(stasis_ringbuffer_t * ring, lsn_t * off) {
 }
 lsn_t stasis_ringbuffer_get_read_tail(stasis_ringbuffer_t * ring) {
-  pthread_mutex_lock(&ring->mut);
-  lsn_t ret = ring->rt;
-  pthread_mutex_unlock(&ring->mut);
-  return ret;
+  return ATOMIC_READ_64(&ring->mut, &ring->rt);
 }
 lsn_t stasis_ringbuffer_get_write_tail(stasis_ringbuffer_t * ring) {
-  pthread_mutex_lock(&ring->mut);
-  lsn_t ret = ring->wt;
-  pthread_mutex_unlock(&ring->mut);
-  return ret;
+  return ATOMIC_READ_64(&ring->mut, &ring->wt);
 }
 lsn_t stasis_ringbuffer_get_write_frontier(stasis_ringbuffer_t * ring) {
-  pthread_mutex_lock(&ring->mut);
-  lsn_t ret = ring->wf;
-  pthread_mutex_unlock(&ring->mut);
-  return ret;
+  return ATOMIC_READ_64(&ring->mut,&ring->wf);
 }
 void stasis_ringbuffer_nb_advance_read_tail(stasis_ringbuffer_t * ring, lsn_t off) {
   assert(off >= ring->rt);
   assert(off <= ring->rf);
-  ring->rt = off;
+  ATOMIC_WRITE_64(0,&ring->rt,off);
 }
 void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, lsn_t off) {
   pthread_mutex_lock(&ring->mut);
@@ -244,6 +251,19 @@ void stasis_ringbuffer_read_done(stasis_ringbuffer_t * ring, lsn_t * off) {
   }
   pthread_mutex_unlock(&ring->mut);
 }
+void stasis_ringbuffer_reading_writer_done(stasis_ringbuffer_t * ring, lsn_t * off) {
+  pthread_mutex_lock(&ring->mut);
+  stasis_aggregate_min_remove(ring->min_reading_writer, off);
+  lsn_t * new_rwtp = (lsn_t*)stasis_aggregate_min_compute(ring->min_reading_writer);
+  lsn_t new_rwt = new_rwtp ? *new_rwtp : ring->wf;
+  if(new_rwt != ring->rwt) {
+    assert(*off >= ring->rwt);
+    assert(*off <= ring->wf);
+    ATOMIC_WRITE_64(0, &ring->rwt, new_rwt);
+    pthread_cond_broadcast(&ring->read_done);
+  }
+  pthread_mutex_unlock(&ring->mut);
+}
 stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset) {
@@ -288,14 +308,16 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset
   // Done with the black magic.
-  ring->rt = ring->rf = ring->wt = ring->wf = initial_offset;
+  ring->rt = ring->rf = ring->rwt = ring->wt = ring->wf = initial_offset;
   ring->min_reader = stasis_aggregate_min_init(0);
   ring->min_writer = stasis_aggregate_min_init(0);
+  ring->min_reading_writer = stasis_aggregate_min_init(0);
   ring->flush = 0;
   ring->shutdown = 0;
   pthread_mutex_init(&ring->mut,0);
   DEBUG("ringbuffer mutex is %lx\n", &ring->mut);
   pthread_cond_init(&ring->read_done,0);
   pthread_cond_init(&ring->write_done,0);
@@ -346,6 +368,7 @@ void stasis_ringbuffer_free(stasis_ringbuffer_t * ring) {
   stasis_aggregate_min_deinit(ring->min_reader);
   stasis_aggregate_min_deinit(ring->min_writer);
+  stasis_aggregate_min_deinit(ring->min_reading_writer);
   pthread_mutex_destroy(&ring->mut);
   pthread_cond_destroy(&ring->read_done);
   pthread_cond_destroy(&ring->write_done);
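
The freespace() change above is where the new pointer actually bites: reclaimable space is now bounded by min(rt, rwt) rather than rt alone. A self-contained toy calculation (made-up offsets and a 256-byte ring, not part of the commit) shows how a lagging reading-writer holds back the write frontier:

#include <assert.h>
#include <stdint.h>

typedef int64_t lsn_t;  // stand-in for stasis's lsn_t

int main(void) {
  lsn_t mask = 255;                 // 256-byte ring
  lsn_t rt = 100, rwt = 80, wf = 120;
  lsn_t eff = rt < rwt ? rt : rwt;  // effective_rt(): 80, since rwt lags rt
  lsn_t freespace = ((eff - wf) - 1) & mask;
  // Against rt alone we could hand out ((100 - 120) - 1) & 255 = 235 bytes,
  // overwriting bytes 80..99 while their writer may still read them.
  // Against min(rt, rwt), only 215 bytes are available:
  assert(freespace == 215);
  return 0;
}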

View file

@@ -201,7 +201,7 @@ struct stasis_log_t {
   /**
      Force any enqueued, unwritten entries to disk.
-     Once this method returns, any log entries written before the call began
+     Once this method returns, any log entries produced before the call began
      should survive subsequent crashes. If the underlying log implementation
      is not durable, then this method has no effect.

View file

@@ -17,7 +17,7 @@ lsn_t stasis_ringbuffer_nb_reserve_space(stasis_ringbuffer_t * ring, lsn_t sz);
 lsn_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, lsn_t sz, lsn_t * handle);
 void stasis_ringbuffer_read_done(stasis_ringbuffer_t * ring, lsn_t * handle);
 void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, lsn_t off);
-lsn_t stasis_ringbuffer_current_write_tail(stasis_ringbuffer_t * ring);
+void stasis_ringbuffer_reading_writer_done(stasis_ringbuffer_t * ring, lsn_t * handle);
 const void * stasis_ringbuffer_nb_get_rd_buf(stasis_ringbuffer_t * ring, lsn_t off, lsn_t sz);
 // sz is a pointer to the desired size, or RING_NEXT for "as many bytes as possible"
 lsn_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, lsn_t* sz, lsn_t * handle);
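
For contrast, the consumer side of the API declared above is unchanged by this commit. A sketch of a writeback-style reader, assuming RING_NEXT is passed via *sz as the header comment suggests, and with error handling and the actual disk write elided:

#include <stasis/util/ringbuffer.h>

// Hypothetical writeback step: drain whatever is stable, then release it
// so the read tail (and hence reclaimable space) can advance.
static void drain_once(stasis_ringbuffer_t * ring) {
  lsn_t rd_handle;
  lsn_t sz = RING_NEXT;  // "as many bytes as possible"
  lsn_t off = stasis_ringbuffer_consume_bytes(ring, &sz, &rd_handle);
  const void * buf = stasis_ringbuffer_nb_get_rd_buf(ring, off, sz);
  // ... write buf[0..sz) to the log file ...
  (void)buf;
  stasis_ringbuffer_read_done(ring, &rd_handle);
}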

View file

@@ -169,6 +169,7 @@ static void * concurrentWriter(void * argp) {
     }
     cursor += rnd_size;
     stasis_ringbuffer_write_done(ring, &wr_handle);
+    stasis_ringbuffer_reading_writer_done(ring, &wr_handle);
   }
   return 0;
 }
}