From 304b439217c546192cf49c68cd01c68b3755d77a Mon Sep 17 00:00:00 2001 From: Sears Russell Date: Thu, 14 Apr 2011 23:03:20 +0000 Subject: [PATCH] new data structures: min aggregate and concurrent ringbuffer --- src/stasis/CMakeLists.txt | 2 +- src/stasis/util/min.c | 27 +++ src/stasis/util/ringbuffer.c | 247 +++++++++++++++++++++++ stasis/common.h | 4 + stasis/util/min.h | 15 ++ stasis/util/ringbuffer.h | 31 +++ test/stasis/CMakeLists.txt | 1 + test/stasis/check_concurrentRingbuffer.c | 217 ++++++++++++++++++++ 8 files changed, 543 insertions(+), 1 deletion(-) create mode 100644 src/stasis/util/min.c create mode 100644 src/stasis/util/ringbuffer.c create mode 100644 stasis/util/min.h create mode 100644 stasis/util/ringbuffer.h create mode 100644 test/stasis/check_concurrentRingbuffer.c diff --git a/src/stasis/CMakeLists.txt b/src/stasis/CMakeLists.txt index bcaba38..e468c9e 100644 --- a/src/stasis/CMakeLists.txt +++ b/src/stasis/CMakeLists.txt @@ -48,7 +48,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c tsearchcompat.c lhtable.c concurrentHash.c bufferManager/pageArray.c bufferManager/bufferHash.c replacementPolicy/lru.c replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c replacementPolicy/concurrentWrapper.c - util/log2.c util/histogram.c util/hashFunctions.c + util/log2.c util/histogram.c util/hashFunctions.c util/min.c util/ringbuffer.c util/multiset.c util/slab.c stlredblack.cpp util/stlslab.cpp) diff --git a/src/stasis/util/min.c b/src/stasis/util/min.c new file mode 100644 index 0000000..0b1b5f9 --- /dev/null +++ b/src/stasis/util/min.c @@ -0,0 +1,27 @@ +#include +#include +#include + +struct stasis_aggregate_min_t { + struct rbtree * tree; +}; + +stasis_aggregate_min_t * stasis_aggregate_min_init(int(*cmp)(const void* a, const void *b, const void *c)) { + stasis_aggregate_min_t * ret = malloc(sizeof(*ret)); + ret->tree = rbinit(cmp,0); + return ret; +} +void stasis_aggregate_min_deinit(stasis_aggregate_min_t * min) { + rbdestroy(min->tree); + free(min); +} +void stasis_aggregate_min_add(stasis_aggregate_min_t * min, void * a) { + rbsearch(a, min->tree); +} +const void * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, void * a) { + return rbdelete(a, min->tree); +} +const void * stasis_aggregate_min_compute(stasis_aggregate_min_t * min) { + return rbmin(min->tree); +} + diff --git a/src/stasis/util/ringbuffer.c b/src/stasis/util/ringbuffer.c new file mode 100644 index 0000000..db091ac --- /dev/null +++ b/src/stasis/util/ringbuffer.c @@ -0,0 +1,247 @@ +#include +#include + +#include +#include +#include +/** + * A ring buffer implementation. + * + * This file implements a ring buffer. Reads and writes are to + * contiguous regions, and are zero-copy. Regions may be populated + * in parallel, and data can be randomly accessed by its offset in + * the buffer's logical address space (which is returned by various API calls). + * Any number of readers and writers may safely read and write data from the + * ringbuffer without further coordination. + * + * The size of the ring buffer must be a power of two (this avoids + * expensive mod operations). + * + * TODO: Currently, in order to support multiple readers and writers, this file + * relies upon red-black trees, which adds a lot of overhead. + */ + +struct stasis_ringbuffer_t { + byte * mem; + int64_t mask; + // Track four regions: write_frontier (wf), write_tail (wt), read_frontier (rf), read_tail (rt): + + // Logical buffer layout: + // byte zero + // ... + + int64_t rt; // First byte that some thread might be reading. Earlier bytes can be reclaimed. + int64_t rf; // First byte that will be returned by "read next". + int64_t wt; // First byte that some thread might be writing. Earlier bytes are stable for readers. + int64_t wf; // First available byte + + // ... + // byte 2^64 + + stasis_aggregate_min_t * min_writer; + stasis_aggregate_min_t * min_reader; + + // Synchronization stuff + pthread_mutex_t mut; + pthread_cond_t read_done; + pthread_cond_t write_done; +}; + +static int cmp_int64_t(const void *ap, const void *bp, const void *ign) { + int64_t a = *(int64_t*)ap; + int64_t b = *(int64_t*)bp; + + return (a < b) ? -1 : + (a > b) ? 1 : 0; +} + +// Does not need synchronization (only called from nb function). +static inline int64_t freespace(stasis_ringbuffer_t * ring) { + int64_t ret = ((ring->rt - ring->wf) - 1) & ring->mask; +// printf("freespace is %lld\n", (long long)ret); + return ret; +} + +// Does not need any synchronization (all fields are read only) +static inline void* ptr_off(stasis_ringbuffer_t * ring, int64_t off) { + return ring->mem + (off & ring->mask); +} + +// Not threadsafe. +int64_t stasis_ringbuffer_nb_reserve_space(stasis_ringbuffer_t * ring, int64_t sz) { + if(freespace(ring) < sz) { return RING_FULL; } + int64_t ret = ring->wf; + ring->wf += sz; + return ret; +} +// Threadsafe (explicit synchronization). Blocks. +int64_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, int64_t sz, int64_t * handle) { + pthread_mutex_lock(&ring->mut); + int64_t ret; + while(RING_FULL == (ret = stasis_ringbuffer_nb_reserve_space(ring, sz))) { + pthread_cond_wait(&ring->read_done, &ring->mut); + } + if(handle) { + *handle = ret; + stasis_aggregate_min_add(ring->min_writer, handle); + } + pthread_mutex_unlock(&ring->mut); + return ret; +} +int64_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz) { + if(off == RING_NEXT) { off = ring->rf; } + if(*sz == RING_NEXT) { *sz = ring->wt - off; } + + // has the entire byte range been consumed? (This is "normal".) + if(off + *sz < ring->rt) { return RING_TRUNCATED; } + + // check to see if only some part of the range has been consumed. + // (Probably bad news for the caller, but not our problem) + + if(off < ring->rt) { return RING_TORN; } + + // part of the byte range is still being written. Recovering from + // this at the caller is probably easy (just wait a bit), but + // something fugly is going on. + if(off + *sz > ring->wt) { return RING_VOLATILE; } + + if(ring->rf < off + *sz) { ring->rf = off + *sz; } + + return off; +} +int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle) { + pthread_mutex_lock(&ring->mut); + int64_t ret; + while(RING_VOLATILE == (ret = stasis_ringbuffer_nb_consume_bytes(ring, RING_NEXT, sz))) { + pthread_cond_wait(&ring->write_done, &ring->mut); + } + if(handle) { + *handle = ret; + stasis_aggregate_min_add(ring->min_reader, handle); + } + pthread_mutex_unlock(&ring->mut); + return ret; +} +// Not threadsafe. +const void * stasis_ringbuffer_nb_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz) { + int64_t off2 = stasis_ringbuffer_nb_consume_bytes(ring, off, &sz); + if(off2 != off) { if(off != RING_NEXT || (off2 < 0 && off2 > RING_MINERR)) { return (const void*) off2; } } + assert(! (off2 < 0 && off2 >= RING_MINERR)); + return ptr_off(ring, off2); +} +// Explicit synchronization (blocks). +const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz) { + pthread_mutex_lock(&ring->mut); + const void * ret; + while(((const void*)RING_VOLATILE) == (ret = stasis_ringbuffer_nb_get_rd_buf(ring, off, *sz))) { + pthread_cond_wait(&ring->write_done, &ring->mut); + } + pthread_mutex_unlock(&ring->mut); + return ret; +} +// No need for synchronization (only touches read-only-fields) +void * stasis_ringbuffer_get_wr_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz) { + return ptr_off(ring, off); +} +void stasis_ringbuffer_nb_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off) { + assert(off >= ring->wt); + ring->wt = off; + assert(ring->wt <= ring->wf); +} + +void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off) { + pthread_mutex_lock(&ring->mut); + stasis_ringbuffer_nb_advance_write_tail(ring, off); + pthread_cond_broadcast(&ring->write_done); + pthread_mutex_unlock(&ring->mut); +} +void stasis_ringbuffer_write_done(stasis_ringbuffer_t * ring, int64_t * off) { + pthread_mutex_lock(&ring->mut); + stasis_aggregate_min_remove(ring->min_writer, off); + int64_t * new_wtp = (int64_t*)stasis_aggregate_min_compute(ring->min_writer); + int64_t new_wt = new_wtp ? *new_wtp : ring->wf; + if(new_wt != ring->wt) { + stasis_ringbuffer_nb_advance_write_tail(ring, new_wt); + pthread_cond_broadcast(&ring->write_done); + } + pthread_mutex_unlock(&ring->mut); + +} +void stasis_ringbuffer_nb_advance_read_tail(stasis_ringbuffer_t * ring, int64_t off) { + assert(off >= ring->rt); + assert(off <= ring->rf); + ring->rt = off; +} +void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, int64_t off) { + pthread_mutex_lock(&ring->mut); + stasis_ringbuffer_nb_advance_read_tail(ring, off); + pthread_cond_broadcast(&ring->read_done); + pthread_mutex_unlock(&ring->mut); +} +void stasis_ringbuffer_read_done(stasis_ringbuffer_t * ring, int64_t * off) { + pthread_mutex_lock(&ring->mut); + stasis_aggregate_min_remove(ring->min_reader, off); + int64_t * new_rtp = (int64_t*)stasis_aggregate_min_compute(ring->min_reader); + int64_t new_rt = new_rtp ? *new_rtp : ring->rf; + if(new_rt != ring->rt) { + stasis_ringbuffer_nb_advance_read_tail(ring, new_rt); + pthread_cond_broadcast(&ring->read_done); + } + pthread_mutex_unlock(&ring->mut); +} + +stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, int64_t initial_offset) { + + if(base < 12) { + fprintf(stderr, "can't allocate ringbuffer that is less than 4096 bytes.\n"); + return 0; + } + + stasis_ringbuffer_t * ring = malloc(sizeof(*ring)); + + // Allocate the memory region using mmap black magic. + + char* name = strdup("/dev/shm/stasis-ringbuffer-XXXXXX"); + int fd = mkstemp(name); + if(fd == -1) { perror("Couldn't mkstemp\n"); abort(); } + + int err; + + err = unlink(name); + + if(err == -1) { perror("Couldn't unlink mkstemp file\n"); } + + free(name); + + ring->mask = (1 << base) - 1; + int64_t size = ring->mask+1; + err = ftruncate(fd, size); + + if(err == -1) { perror("Couldn't ftruncate file"); } + + ring->mem = mmap(0, size*2, PROT_NONE, MAP_ANONYMOUS|MAP_PRIVATE, -1, 0); + + if(ring->mem == MAP_FAILED) { perror("Couldn't mmap anonymous region"); abort(); } + + void * errp = mmap(ring->mem, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0); + + if(errp == MAP_FAILED) { perror("Couldn't mmap temp region"); abort(); } + + errp = mmap(((char*)ring->mem)+size, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0); + + if(errp == MAP_FAILED) { perror("Couldn't mmap temp region the second time."); abort(); } + + // Done with the black magic. + + ring->rt = ring->rf = ring->wt = ring->wf = 0; + + ring->min_reader = stasis_aggregate_min_init(cmp_int64_t); + ring->min_writer = stasis_aggregate_min_init(cmp_int64_t); + + pthread_mutex_init(&ring->mut,0); + pthread_cond_init(&ring->read_done,0); + pthread_cond_init(&ring->write_done,0); + + return ring; + +} diff --git a/stasis/common.h b/stasis/common.h index 3c393ac..765c481 100644 --- a/stasis/common.h +++ b/stasis/common.h @@ -63,6 +63,10 @@ terms specified in this license. #ifndef _XOPEN_SOURCE #define _XOPEN_SOURCE 600 #endif +#ifndef _BSD_SOURCE +#define _BSD_SOURCE +#endif + #include // uint32, et. al. (has to be before sys/types.h for mcpp atop some broken gcc headers) #include #include // for size_t diff --git a/stasis/util/min.h b/stasis/util/min.h new file mode 100644 index 0000000..87bb02a --- /dev/null +++ b/stasis/util/min.h @@ -0,0 +1,15 @@ +#ifndef STASIS_UTIL_MIN_H +#define STASIS_UTIL_MIN_H +#include +BEGIN_C_DECLS + +typedef struct stasis_aggregate_min_t stasis_aggregate_min_t; + +stasis_aggregate_min_t * stasis_aggregate_min_init(int(*cmp)(const void *a, const void *b, const void *c)); +void stasis_aggregate_min_deinit(stasis_aggregate_min_t * min); +void stasis_aggregate_min_add(stasis_aggregate_min_t * min, void * a); +const void * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, void * b); +const void * stasis_aggregate_min_compute(stasis_aggregate_min_t * min); + +END_C_DECLS +#endif diff --git a/stasis/util/ringbuffer.h b/stasis/util/ringbuffer.h new file mode 100644 index 0000000..8d8ed80 --- /dev/null +++ b/stasis/util/ringbuffer.h @@ -0,0 +1,31 @@ +/* + * ringbuffer.h + * + * Created on: Apr 1, 2011 + * Author: sears + */ + +#ifndef RINGBUFFER_H_ +#define RINGBUFFER_H_ +#include +BEGIN_C_DECLS + +typedef struct stasis_ringbuffer_t stasis_ringbuffer_t; + +stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t size, int64_t initial_offset); +int64_t stasis_ringbuffer_nb_reserve_space(stasis_ringbuffer_t * ring, int64_t sz); +int64_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, int64_t sz, int64_t * handle); +void stasis_ringbuffer_read_done(stasis_ringbuffer_t * ring, int64_t * handle); +void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off); +const void * stasis_ringbuffer_nb_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz); +// sz is a pointer to the desired size, or RING_NEXT for "as many bytes as possible" +int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle); +void stasis_ringbuffer_write_done(stasis_ringbuffer_t * ring, int64_t * handle); +// sz is a pointer to the desired size, or RING_NEXT for "as many bytes as possible" +const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz); +void * stasis_ringbuffer_get_wr_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz); +void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, int64_t off); +typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_MINERR = -6 } stasis_ringbuffer_error_t; + +END_C_DECLS +#endif /* RINGBUFFER_H_ */ diff --git a/test/stasis/CMakeLists.txt b/test/stasis/CMakeLists.txt index 5d2a935..209c9c5 100644 --- a/test/stasis/CMakeLists.txt +++ b/test/stasis/CMakeLists.txt @@ -2,6 +2,7 @@ SUBDIRS(fault_injection) CREATE_CHECK(check_redblack) CREATE_CHECK(check_concurrentHash) CREATE_CHECK(check_lhtable) +CREATE_CHECK(check_concurrentRingbuffer) CREATE_CHECK(check_logEntry) CREATE_CHECK(check_logWriter) CREATE_CHECK(check_page) diff --git a/test/stasis/check_concurrentRingbuffer.c b/test/stasis/check_concurrentRingbuffer.c new file mode 100644 index 0000000..31714ff --- /dev/null +++ b/test/stasis/check_concurrentRingbuffer.c @@ -0,0 +1,217 @@ +/*--- +This software is copyrighted by the Regents of the University of +California, and other parties. The following terms apply to all files +associated with the software unless explicitly disclaimed in +individual files. + +The authors hereby grant permission to use, copy, modify, distribute, +and license this software and its documentation for any purpose, +provided that existing copyright notices are retained in all copies +and that this notice is included verbatim in any distributions. No +written agreement, license, or royalty fee is required for any of the +authorized uses. Modifications to this software may be copyrighted by +their authors and need not follow the licensing terms described here, +provided that the new terms are clearly indicated on the first page of +each file where they apply. + +IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY +FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES +ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY +DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE +POSSIBILITY OF SUCH DAMAGE. + +THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND +NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND +THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE +MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. + +GOVERNMENT USE: If you are acquiring this software on behalf of the +U.S. government, the Government shall have only "Restricted Rights" in +the software and related documentation as defined in the Federal +Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are +acquiring the software on behalf of the Department of Defense, the +software shall be classified as "Commercial Computer Software" and the +Government shall have only "Restricted Rights" as defined in Clause +252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the +authors grant the U.S. Government and others acting in its behalf +permission to use and distribute the software in accordance with the +terms specified in this license. +---*/ +#include "../check_includes.h" + +#include + +#include +#include +#include + +#define LOG_NAME "check_concurrentRingbuffer.log" + +/** + @test + +*/ + +#define NUM_ENTRIES 10000 + +START_TEST(ringBufferSmokeTest) { + stasis_ringbuffer_t * ring = stasis_ringbuffer_init(12, 0); + assert((void*)RING_VOLATILE == stasis_ringbuffer_nb_get_rd_buf(ring, 0, 1)); + int64_t off = stasis_ringbuffer_nb_reserve_space(ring, 4*900); + assert(off != RING_FULL && off != RING_TORN && off != RING_VOLATILE); + void * buf = stasis_ringbuffer_get_wr_buf(ring, off, 4*900); + + int64_t off2 = stasis_ringbuffer_nb_reserve_space(ring, 4*400); + assert(off2 == RING_FULL); + + stasis_ringbuffer_advance_write_tail(ring, 4*300); + const void * buf3 = stasis_ringbuffer_nb_get_rd_buf(ring, 0,4*300); + assert(buf3 == buf); + + stasis_ringbuffer_advance_read_tail(ring, 4*300); + + int64_t off4 = stasis_ringbuffer_nb_reserve_space(ring, 4*400); + assert(off4 != RING_FULL && off4 != RING_TORN && off4 != RING_VOLATILE); + + // XXX stasis_ringbuffer_deinit(ring); +} END_TEST + +#define PROD_CONS_SIZE (100L * 1024L * 1024L) +static void * consumerWorker(void * arg) { + stasis_ringbuffer_t * ring = arg; + int64_t cursor = 0; + while(cursor < PROD_CONS_SIZE) { + int64_t rnd_size = myrandom(2048); + if(rnd_size + cursor > PROD_CONS_SIZE) { rnd_size = PROD_CONS_SIZE - cursor; } + byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, RING_NEXT, &rnd_size); + for(uint64_t i = 0; i < rnd_size; i++) { + // printf("R[%lld] (addr=%lld) val = %d (%d)\n", cursor+i, (long long)(rd_buf)+i, rd_buf[i], (cursor+i)%250); + assert(rd_buf[i] == ((cursor + i)%250)); + } + cursor += rnd_size; + stasis_ringbuffer_advance_read_tail(ring, cursor); + } + return 0; +} +static void * producerWorker(void * arg) { + stasis_ringbuffer_t * ring = arg; + int64_t cursor = 0; + while(cursor < PROD_CONS_SIZE) { + int rnd_size = myrandom(2048); + if(rnd_size + cursor > PROD_CONS_SIZE) { rnd_size = PROD_CONS_SIZE - cursor; } + int64_t wr_off = stasis_ringbuffer_reserve_space(ring, rnd_size, 0); + assert(wr_off == cursor); + byte * wr_buf = stasis_ringbuffer_get_wr_buf(ring, wr_off, rnd_size); + for(uint64_t i = 0; i < rnd_size; i++) { + wr_buf[i] = (cursor + i)%250; +// printf("W[%d] (addr=%lld) val = %d\n", cursor+i, (long long)(wr_buf)+i, wr_buf[i]); + } + cursor += rnd_size; + stasis_ringbuffer_advance_write_tail(ring, cursor); + } + return 0; +} + +START_TEST(ringBufferProducerConsumerTest) { + stasis_ringbuffer_t * ring = stasis_ringbuffer_init(12, 0); + pthread_t reader, writer; + pthread_create(&reader, 0, consumerWorker, ring); + pthread_create(&writer, 0, producerWorker, ring); + pthread_join(reader, 0); + pthread_join(writer, 0); + // XXX stasis_ringbuffer_deinit(ring); +} END_TEST + +#define NUM_READERS 20 +#define NUM_WRITERS NUM_READERS +#define BYTES_PER_THREAD (10L * 1000L * 1000L) + +typedef struct { + pthread_mutex_t wr_mut; + stasis_ringbuffer_t * ring; +} arg; +static void * concurrentReader(void * argp) { + arg * a = argp; + stasis_ringbuffer_t * ring = a->ring; + uint64_t cursor = 0; + int64_t rd_handle; + while(cursor < BYTES_PER_THREAD) { + int64_t rnd_size = 1+myrandom(2047/NUM_READERS); + if(rnd_size + cursor > BYTES_PER_THREAD) { rnd_size = BYTES_PER_THREAD - cursor; } + stasis_ringbuffer_consume_bytes(ring, &rnd_size, &rd_handle); + + byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, rd_handle, &rnd_size); + + for(uint64_t i = 0; i < rnd_size; i++) { + // printf("R[%lld] (addr=%lld) val = %d (%d)\n", cursor+i, (long long)(rd_buf)+i, rd_buf[i], (cursor+i)%250); + assert(rd_buf[i] == ((rd_handle + i)%250)); + } + cursor += rnd_size; + stasis_ringbuffer_read_done(ring, &rd_handle); + } + return 0; +} +static void * concurrentWriter(void * argp) { + arg * a = argp; + stasis_ringbuffer_t * ring = a->ring; + uint64_t cursor = 0; + int64_t wr_handle; + while(cursor < BYTES_PER_THREAD) { + int rnd_size = 1+myrandom(2047/NUM_WRITERS); + if(rnd_size + cursor > BYTES_PER_THREAD) { rnd_size = BYTES_PER_THREAD- cursor; } + stasis_ringbuffer_reserve_space(ring, rnd_size, &wr_handle); + byte * wr_buf = stasis_ringbuffer_get_wr_buf(ring, wr_handle, rnd_size); + for(uint64_t i = 0; i < rnd_size; i++) { + wr_buf[i] = (wr_handle + i)%250; +// printf("W[%d] (addr=%lld) val = %d\n", cursor+i, (long long)(wr_buf)+i, wr_buf[i]); + } + cursor += rnd_size; + stasis_ringbuffer_write_done(ring, &wr_handle); + } + return 0; +} +START_TEST(ringBufferConcurrentProducerConsumerTest) { + arg a = { + PTHREAD_MUTEX_INITIALIZER, + stasis_ringbuffer_init(12, 0), + }; + pthread_t readers[NUM_READERS]; + pthread_t writers[NUM_WRITERS]; + for(int i = 0; i < NUM_READERS; i++) { + pthread_create(&readers[i], 0, concurrentReader, &a); + } + for(int i = 0; i < NUM_WRITERS; i++) { + pthread_create(&writers[i], 0, concurrentWriter, &a); + } + for(int i = 0; i < NUM_READERS; i++) { + pthread_join(readers[i], 0); + } + for(int i = 0; i < NUM_WRITERS; i++) { + pthread_join(writers[i], 0); + } + +} END_TEST + +Suite * check_suite(void) { + Suite *s = suite_create("ringBuffer"); + /* Begin a new test */ + TCase *tc = tcase_create("ringBuffer"); + + /* Sub tests are added, one per line, here */ + + tcase_add_test(tc, ringBufferSmokeTest); + tcase_add_test(tc, ringBufferProducerConsumerTest); + tcase_add_test(tc, ringBufferConcurrentProducerConsumerTest); + + /* --------------------------------------------- */ + + tcase_add_checked_fixture(tc, setup, teardown); + + + suite_add_tcase(s, tc); + return s; +} + +#include "../check_setup.h"