From 9a48a8553a8dd4a92039b582113339cdb48a9359 Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Mon, 25 Apr 2011 15:30:11 +0000 Subject: [PATCH] implement deinit for concurrent ringbuffer --- src/stasis/util/ringbuffer.c | 41 +++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 12 deletions(-) diff --git a/src/stasis/util/ringbuffer.c b/src/stasis/util/ringbuffer.c index 784f96f..375fb72 100644 --- a/src/stasis/util/ringbuffer.c +++ b/src/stasis/util/ringbuffer.c @@ -46,6 +46,8 @@ struct stasis_ringbuffer_t { pthread_cond_t read_done; pthread_cond_t write_done; + int fd; + // If non-zero, all read requests act as though size is // RING_NEXT until the read frontier is greater than flush. lsn_t flush; @@ -115,12 +117,12 @@ lsn_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, lsn_t* sz, lsn lsn_t orig_sz = *sz; if(ring->flush > ring->rf) { - pthread_mutex_unlock(&ring->mut); - struct timespec tv; - tv.tv_sec = 0; - tv.tv_nsec = 100000; - nanosleep(&tv, 0); - pthread_mutex_lock(&ring->mut); +// pthread_mutex_unlock(&ring->mut); +// struct timespec tv; +// tv.tv_sec = 0; +// tv.tv_nsec = 100000; +// nanosleep(&tv, 0); +// pthread_mutex_lock(&ring->mut); if(ring->flush > ring->rf) { *sz = RING_NEXT; } } if(ring->shutdown) { @@ -259,8 +261,8 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset // Allocate the memory region using mmap black magic. char* name = strdup("/dev/shm/stasis-ringbuffer-XXXXXX"); - int fd = mkstemp(name); - if(fd == -1) { perror("Couldn't mkstemp\n"); abort(); } + ring->fd = mkstemp(name); + if(ring->fd == -1) { perror("Couldn't mkstemp\n"); abort(); } int err; @@ -272,7 +274,7 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset ring->mask = (1 << base) - 1; lsn_t size = ring->mask+1; - err = ftruncate(fd, size); + err = ftruncate(ring->fd, size); if(err == -1) { perror("Couldn't ftruncate file"); } @@ -280,11 +282,11 @@ stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, lsn_t initial_offset if(ring->mem == MAP_FAILED) { perror("Couldn't mmap anonymous region"); abort(); } - void * errp = mmap(ring->mem, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0); + void * errp = mmap(ring->mem, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, ring->fd, 0); if(errp == MAP_FAILED) { perror("Couldn't mmap temp region"); abort(); } - errp = mmap(((char*)ring->mem)+size, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0); + errp = mmap(((char*)ring->mem)+size, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, ring->fd, 0); if(errp == MAP_FAILED) { perror("Couldn't mmap temp region the second time."); abort(); } @@ -325,5 +327,20 @@ void stasis_ringbuffer_shutdown(stasis_ringbuffer_t * ring) { } while (ring->rt < ring->wf); pthread_mutex_unlock(&ring->mut); - // XXX free resources. + + lsn_t size = ring->mask+1; + int err = munmap(((char*)ring->mem), size * 2); + if(err == -1) { perror("could not munmap first half of ringbuffer"); } + munmap(((char*)ring->mem)+size, size); + if(err == -1) { perror("could not munmap second half of ringbuffer"); } + munmap(((char*)ring->mem), size); + if(err == -1) { perror("could not munmap hidden backing region of ringbuffer"); } + + stasis_aggregate_min_deinit(ring->min_reader); + stasis_aggregate_min_deinit(ring->min_writer); + pthread_mutex_destroy(&ring->mut); + pthread_cond_destroy(&ring->read_done); + pthread_cond_destroy(&ring->write_done); + close(ring->fd); + free(ring); }