implement deinit for concurrent ringbuffer
This commit is contained in:
parent
f64a3c2aba
commit
9a48a8553a
1 changed files with 29 additions and 12 deletions
|
@ -46,6 +46,8 @@ struct stasis_ringbuffer_t {
|
|||
pthread_cond_t read_done;
|
||||
pthread_cond_t write_done;
|
||||
|
||||
int fd;
|
||||
|
||||
// If non-zero, all read requests act as though size is
|
||||
// RING_NEXT until the read frontier is greater than flush.
|
||||
lsn_t flush;
|
||||
|
@ -115,12 +117,12 @@ lsn_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, lsn_t* sz, lsn
|
|||
lsn_t orig_sz = *sz;
|
||||
|
||||
if(ring->flush > ring->rf) {
|
||||
pthread_mutex_unlock(&ring->mut);
|
||||
struct timespec tv;
|
||||
tv.tv_sec = 0;
|
||||
tv.tv_nsec = 100000;
|
||||
nanosleep(&tv, 0);
|
||||
pthread_mutex_lock(&ring->mut);
|
||||
// pthread_mutex_unlock(&ring->mut);
|
||||
// struct timespec tv;
|
||||
// tv.tv_sec = 0;
|
||||
// tv.tv_nsec = 100000;
|
||||
// nanosleep(&tv, 0);
|
||||
// pthread_mutex_lock(&ring->mut);
|
||||
if(ring->flush > ring->rf) { *sz = RING_NEXT; }
|
||||
}
|
||||
if(ring->shutdown) {
|
||||
|
@ -259,8 +261,8 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset
|
|||
// Allocate the memory region using mmap black magic.
|
||||
|
||||
char* name = strdup("/dev/shm/stasis-ringbuffer-XXXXXX");
|
||||
int fd = mkstemp(name);
|
||||
if(fd == -1) { perror("Couldn't mkstemp\n"); abort(); }
|
||||
ring->fd = mkstemp(name);
|
||||
if(ring->fd == -1) { perror("Couldn't mkstemp\n"); abort(); }
|
||||
|
||||
int err;
|
||||
|
||||
|
@ -272,7 +274,7 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset
|
|||
|
||||
ring->mask = (1 << base) - 1;
|
||||
lsn_t size = ring->mask+1;
|
||||
err = ftruncate(fd, size);
|
||||
err = ftruncate(ring->fd, size);
|
||||
|
||||
if(err == -1) { perror("Couldn't ftruncate file"); }
|
||||
|
||||
|
@ -280,11 +282,11 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset
|
|||
|
||||
if(ring->mem == MAP_FAILED) { perror("Couldn't mmap anonymous region"); abort(); }
|
||||
|
||||
void * errp = mmap(ring->mem, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0);
|
||||
void * errp = mmap(ring->mem, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, ring->fd, 0);
|
||||
|
||||
if(errp == MAP_FAILED) { perror("Couldn't mmap temp region"); abort(); }
|
||||
|
||||
errp = mmap(((char*)ring->mem)+size, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0);
|
||||
errp = mmap(((char*)ring->mem)+size, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, ring->fd, 0);
|
||||
|
||||
if(errp == MAP_FAILED) { perror("Couldn't mmap temp region the second time."); abort(); }
|
||||
|
||||
|
@ -325,5 +327,20 @@ void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) {
|
|||
} while (ring->rt < ring->wf);
|
||||
|
||||
pthread_mutex_unlock(&ring->mut);
|
||||
// XXX free resources.
|
||||
|
||||
lsn_t size = ring->mask+1;
|
||||
int err = munmap(((char*)ring->mem), size * 2);
|
||||
if(err == -1) { perror("could not munmap first half of ringbuffer"); }
|
||||
munmap(((char*)ring->mem)+size, size);
|
||||
if(err == -1) { perror("could not munmap second half of ringbuffer"); }
|
||||
munmap(((char*)ring->mem), size);
|
||||
if(err == -1) { perror("could not munmap hidden backing region of ringbuffer"); }
|
||||
|
||||
stasis_aggregate_min_deinit(ring->min_reader);
|
||||
stasis_aggregate_min_deinit(ring->min_writer);
|
||||
pthread_mutex_destroy(&ring->mut);
|
||||
pthread_cond_destroy(&ring->read_done);
|
||||
pthread_cond_destroy(&ring->write_done);
|
||||
close(ring->fd);
|
||||
free(ring);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue