add flush and shutdown apis to concurrent ringbuffer, tighten up get_rd_buf api

This commit is contained in:
Sears Russell 2011-04-18 20:21:16 +00:00
parent ab058db5d3
commit 175c26e125
3 changed files with 62 additions and 10 deletions

View file

@ -45,6 +45,13 @@ struct stasis_ringbuffer_t {
pthread_mutex_t mut;
pthread_cond_t read_done;
pthread_cond_t write_done;
// If non-zero, all read requests act as though size is
// RING_NEXT until the read frontier is greater than flush.
uint64_t flush;
// Once this is non-zero, no read will ever block. Attempts to
// write data after shutdown is set will have undefined semantics.
int shutdown;
};
// Does not need synchronization (only called from nb function).
@ -104,8 +111,24 @@ int64_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, int64_t o
int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle) {
pthread_mutex_lock(&ring->mut);
int64_t ret;
int64_t orig_sz = *sz;
*sz = (ring->flush > ring->rf) ? RING_NEXT : orig_sz;
if(ring->shutdown) {
if(ring->rt == ring->wf) {
fprintf(stderr, "Shutting down, and there are no more bytes. Signaling shutdown thread.\n");
pthread_cond_signal(&ring->read_done);
pthread_mutex_unlock(&ring->mut);
return RING_CLOSED;
} else {
*sz = RING_NEXT;
}
}
while(RING_VOLATILE == (ret = stasis_ringbuffer_nb_consume_bytes(ring, RING_NEXT, sz))) {
pthread_cond_wait(&ring->write_done, &ring->mut);
*sz = (ring->flush > ring->rf) ? RING_NEXT : orig_sz;
if(ring->shutdown) { *sz = RING_NEXT; }
}
if(handle) {
*handle = ret;
@ -117,15 +140,16 @@ int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz,
// Not threadsafe.
const void * stasis_ringbuffer_nb_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz) {
int64_t off2 = stasis_ringbuffer_nb_consume_bytes(ring, off, &sz);
if(off2 != off) { if(off != RING_NEXT || (off2 < 0 && off2 > RING_MINERR)) { return (const void*) off2; } }
if(off2 != off) { if(off != RING_NEXT || (off2 < 0 && off2 > RING_MINERR)) { return (const void*) (intptr_t)off2; } }
assert(! (off2 < 0 && off2 >= RING_MINERR));
return ptr_off(ring, off2);
}
// Explicit synchronization (blocks).
const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz) {
const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz) {
pthread_mutex_lock(&ring->mut);
const void * ret;
while(((const void*)RING_VOLATILE) == (ret = stasis_ringbuffer_nb_get_rd_buf(ring, off, *sz))) {
assert(sz != RING_NEXT);
while(((const void*)RING_VOLATILE) == (ret = stasis_ringbuffer_nb_get_rd_buf(ring, off, sz))) {
pthread_cond_wait(&ring->write_done, &ring->mut);
}
pthread_mutex_unlock(&ring->mut);
@ -140,7 +164,12 @@ void stasis_ringbuffer_nb_advance_write_tail(stasis_ringbuffer_t * ring, int64
ring->wt = off;
assert(ring->wt <= ring->wf);
}
int64_t stasis_ringbuffer_current_write_tail(stasis_ringbuffer_t * ring) {
pthread_mutex_lock(&ring->mut);
int64_t ret = ring->wt;
pthread_mutex_unlock(&ring->mut);
return ret;
}
void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off) {
pthread_mutex_lock(&ring->mut);
stasis_ringbuffer_nb_advance_write_tail(ring, off);
@ -225,7 +254,7 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, int64_t initial_offs
// Done with the black magic.
ring->rt = ring->rf = ring->wt = ring->wf = 0;
ring->rt = ring->rf = ring->wt = ring->wf = initial_offset;
ring->min_reader = stasis_aggregate_min_init(0);
ring->min_writer = stasis_aggregate_min_init(0);
@ -235,5 +264,25 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, int64_t initial_offs
pthread_cond_init(&ring->write_done,0);
return ring;
}
void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, int64_t off) {
pthread_mutex_lock(&ring->mut);
if(ring->flush < off) { ring->flush = off; }
while(ring->rt < off) {
pthread_cond_signal(&ring->write_done);
pthread_cond_wait(&ring->read_done, &ring->mut);
}
pthread_mutex_unlock(&ring->mut);
}
void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) {
pthread_mutex_lock(&ring->mut);
ring->shutdown = 1;
while(ring->rt < ring->wf) {
fprintf(stderr, "%lld < %lld signaling readers for shutdown and sleeping\n", ring->rt, ring->wf);
pthread_cond_signal(&ring->write_done);
pthread_cond_wait(&ring->read_done,&ring->mut);
fprintf(stderr, "readers done\n");
}
pthread_mutex_unlock(&ring->mut);
// XXX free resources.
}

View file

@ -17,15 +17,18 @@ int64_t stasis_ringbuffer_nb_reserve_space(stasis_ringbuffer_t * ring, int64_t s
int64_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, int64_t sz, int64_t * handle);
void stasis_ringbuffer_read_done(stasis_ringbuffer_t * ring, int64_t * handle);
void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off);
int64_t stasis_ringbuffer_current_write_tail(stasis_ringbuffer_t * ring);
const void * stasis_ringbuffer_nb_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz);
// sz is a pointer to the desired size, or RING_NEXT for "as many bytes as possible"
int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle);
void stasis_ringbuffer_write_done(stasis_ringbuffer_t * ring, int64_t * handle);
// sz is a pointer to the desired size, or RING_NEXT for "as many bytes as possible"
const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz);
const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz);
void * stasis_ringbuffer_get_wr_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz);
void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, int64_t off);
typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_MINERR = -6 } stasis_ringbuffer_error_t;
typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_CLOSED = -6, RING_MINERR = -7 } stasis_ringbuffer_error_t;
void stasis_ringbuffer_flush(stasis_ringbuffer_t * ring, int64_t off);
void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring);
END_C_DECLS
#endif /* RINGBUFFER_H_ */

View file

@ -85,7 +85,7 @@ static void * consumerWorker(void * arg) {
while(cursor < PROD_CONS_SIZE) {
int64_t rnd_size = myrandom(2048);
if(rnd_size + cursor > PROD_CONS_SIZE) { rnd_size = PROD_CONS_SIZE - cursor; }
byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, RING_NEXT, &rnd_size);
byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, RING_NEXT, rnd_size);
for(uint64_t i = 0; i < rnd_size; i++) {
// printf("R[%lld] (addr=%lld) val = %d (%d)\n", cursor+i, (long long)(rd_buf)+i, rd_buf[i], (cursor+i)%250);
assert(rd_buf[i] == ((cursor + i)%250));
@ -142,7 +142,7 @@ static void * concurrentReader(void * argp) {
if(rnd_size + cursor > BYTES_PER_THREAD) { rnd_size = BYTES_PER_THREAD - cursor; }
stasis_ringbuffer_consume_bytes(ring, &rnd_size, &rd_handle);
byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, rd_handle, &rnd_size);
byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, rd_handle, rnd_size);
for(uint64_t i = 0; i < rnd_size; i++) {
// printf("R[%lld] (addr=%lld) val = %d (%d)\n", cursor+i, (long long)(rd_buf)+i, rd_buf[i], (cursor+i)%250);