new data structures: min aggregate and concurrent ringbuffer
This commit is contained in:
parent
e1201970b6
commit
304b439217
8 changed files with 543 additions and 1 deletions
|
@ -48,7 +48,7 @@ ADD_LIBRARY(stasis crc32.c redblack.c tsearchcompat.c lhtable.c concurrentHash.c
|
|||
bufferManager/pageArray.c
|
||||
bufferManager/bufferHash.c replacementPolicy/lru.c
|
||||
replacementPolicy/lruFast.c replacementPolicy/threadsafeWrapper.c replacementPolicy/concurrentWrapper.c
|
||||
util/log2.c util/histogram.c util/hashFunctions.c
|
||||
util/log2.c util/histogram.c util/hashFunctions.c util/min.c util/ringbuffer.c
|
||||
util/multiset.c util/slab.c
|
||||
stlredblack.cpp util/stlslab.cpp)
|
||||
|
||||
|
|
27
src/stasis/util/min.c
Normal file
27
src/stasis/util/min.c
Normal file
|
@ -0,0 +1,27 @@
|
|||
#include <stasis/common.h>
|
||||
#include <stasis/util/min.h>
|
||||
#include <stasis/redblack.h>
|
||||
|
||||
/**
 * Aggregate that tracks the minimum of a set of keys.
 * Backed by a red-black tree; the ordering is supplied at init time.
 */
struct stasis_aggregate_min_t {
  struct rbtree * tree;  // stores the keys; rbmin() yields the current minimum
};
|
||||
|
||||
stasis_aggregate_min_t * stasis_aggregate_min_init(int(*cmp)(const void* a, const void *b, const void *c)) {
|
||||
stasis_aggregate_min_t * ret = malloc(sizeof(*ret));
|
||||
ret->tree = rbinit(cmp,0);
|
||||
return ret;
|
||||
}
|
||||
/** Free an aggregate created by stasis_aggregate_min_init(). */
void stasis_aggregate_min_deinit(stasis_aggregate_min_t * min) {
  struct rbtree * tree = min->tree;
  rbdestroy(tree);
  free(min);
}
|
||||
/** Insert key a into the aggregate.  (rbsearch inserts a if it is not already present.) */
void stasis_aggregate_min_add(stasis_aggregate_min_t * min, void * a) {
  rbsearch(a, min->tree);
}
|
||||
/** Remove key a from the aggregate.  Returns the stored key — presumably 0 if absent; confirm against rbdelete's contract. */
const void * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, void * a) {
  return rbdelete(a, min->tree);
}
|
||||
/** Return the current minimum key — presumably 0 when the aggregate is empty; confirm against rbmin's contract. */
const void * stasis_aggregate_min_compute(stasis_aggregate_min_t * min) {
  return rbmin(min->tree);
}
|
||||
|
247
src/stasis/util/ringbuffer.c
Normal file
247
src/stasis/util/ringbuffer.c
Normal file
|
@ -0,0 +1,247 @@
|
|||
#include <stasis/util/ringbuffer.h>
|
||||
#include <stasis/util/min.h>
|
||||
|
||||
#include <assert.h>
|
||||
#include <sys/mman.h>
|
||||
#include <stdio.h>
|
||||
/**
|
||||
* A ring buffer implementation.
|
||||
*
|
||||
* This file implements a ring buffer. Reads and writes are to
|
||||
* contiguous regions, and are zero-copy. Regions may be populated
|
||||
* in parallel, and data can be randomly accessed by its offset in
|
||||
* the buffer's logical address space (which is returned by various API calls).
|
||||
* Any number of readers and writers may safely read and write data from the
|
||||
* ringbuffer without further coordination.
|
||||
*
|
||||
* The size of the ring buffer must be a power of two (this avoids
|
||||
* expensive mod operations).
|
||||
*
|
||||
* TODO: Currently, in order to support multiple readers and writers, this file
|
||||
* relies upon red-black trees, which adds a lot of overhead.
|
||||
*/
|
||||
|
||||
struct stasis_ringbuffer_t {
  byte * mem;     // the buffer memory (mapped twice, back to back, at init)
  int64_t mask;   // size-1; size is a power of two, so this masks logical offsets into the buffer
  // Track four regions: write_frontier (wf), write_tail (wt), read_frontier (rf), read_tail (rt):

  // Logical buffer layout:
  // byte zero
  // ...

  int64_t rt; // First byte that some thread might be reading.  Earlier bytes can be reclaimed.
  int64_t rf; // First byte that will be returned by "read next".
  int64_t wt; // First byte that some thread might be writing.  Earlier bytes are stable for readers.
  int64_t wf; // First available byte

  // ...
  // byte 2^64

  // Outstanding reservations.  The minimum outstanding offset determines
  // the new tail when a reservation completes (see write_done/read_done).
  stasis_aggregate_min_t * min_writer;
  stasis_aggregate_min_t * min_reader;

  // Synchronization stuff
  pthread_mutex_t mut;
  pthread_cond_t read_done;   // broadcast when the read tail advances (space freed)
  pthread_cond_t write_done;  // broadcast when the write tail advances (data stable)
};
|
||||
|
||||
// Three-way comparator over int64_t keys (third argument is unused client state).
static int cmp_int64_t(const void *ap, const void *bp, const void *ign) {
  const int64_t lhs = *(const int64_t*)ap;
  const int64_t rhs = *(const int64_t*)bp;

  if(lhs < rhs) { return -1; }
  if(lhs > rhs) { return  1; }
  return 0;
}
|
||||
|
||||
// Number of bytes currently available for writing: the gap between the
// write frontier and the read tail, less one byte (so that a full buffer
// is distinguishable from an empty one after masking).
// Does not need synchronization (only called from nb function).
static inline int64_t freespace(stasis_ringbuffer_t * ring) {
  int64_t ret = ((ring->rt - ring->wf) - 1) & ring->mask;
  //  printf("freespace is %lld\n", (long long)ret);
  return ret;
}
|
||||
|
||||
// Does not need any synchronization (all fields are read only)
|
||||
static inline void* ptr_off(stasis_ringbuffer_t * ring, int64_t off) {
|
||||
return ring->mem + (off & ring->mask);
|
||||
}
|
||||
|
||||
// Try to reserve sz bytes of write space without blocking.  On success,
// returns the logical offset of the reservation and advances the write
// frontier past it; returns RING_FULL if there is not enough free space.
// Not threadsafe.  (The blocking wrapper calls this while holding ring->mut.)
int64_t stasis_ringbuffer_nb_reserve_space(stasis_ringbuffer_t * ring, int64_t sz) {
  if(freespace(ring) < sz) { return RING_FULL; }
  int64_t ret = ring->wf;
  ring->wf += sz;
  return ret;
}
|
||||
// Reserve sz bytes of write space, blocking until room is available.
// If handle is non-null, the reservation offset is stored in *handle and
// registered with min_writer; the caller must later pass the SAME pointer
// to stasis_ringbuffer_write_done().
// Threadsafe (explicit synchronization).  Blocks.
int64_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, int64_t sz, int64_t * handle) {
  pthread_mutex_lock(&ring->mut);
  int64_t ret;
  while(RING_FULL == (ret = stasis_ringbuffer_nb_reserve_space(ring, sz))) {
    // Wait for a reader to reclaim space.
    pthread_cond_wait(&ring->read_done, &ring->mut);
  }
  if(handle) {
    *handle = ret;
    // Track this outstanding write so write_done() can compute the new write tail.
    stasis_aggregate_min_add(ring->min_writer, handle);
  }
  pthread_mutex_unlock(&ring->mut);
  return ret;
}
|
||||
// Consume *sz bytes starting at logical offset off, advancing the read
// frontier.  off may be RING_NEXT (read from the current frontier); *sz
// may be RING_NEXT (consume everything written so far; *sz is updated).
// Returns the offset consumed, or RING_TRUNCATED / RING_TORN /
// RING_VOLATILE (all negative) on failure.
// Not threadsafe.  (The blocking wrapper calls this while holding ring->mut.)
int64_t stasis_ringbuffer_nb_consume_bytes(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz) {
  if(off == RING_NEXT) { off = ring->rf; }
  if(*sz == RING_NEXT) { *sz = ring->wt - off; }

  // has the entire byte range been consumed? (This is "normal".)
  // NOTE(review): when off + *sz == ring->rt the whole range lies before the
  // read tail, yet control falls through to RING_TORN below — looks like an
  // off-by-one; confirm intended semantics.
  if(off + *sz < ring->rt) { return RING_TRUNCATED; }

  // check to see if only some part of the range has been consumed.
  // (Probably bad news for the caller, but not our problem)

  if(off < ring->rt) { return RING_TORN; }

  // part of the byte range is still being written. Recovering from
  // this at the caller is probably easy (just wait a bit), but
  // something fugly is going on.
  if(off + *sz > ring->wt) { return RING_VOLATILE; }

  // Success: push the read frontier past the consumed range.
  if(ring->rf < off + *sz) { ring->rf = off + *sz; }

  return off;
}
|
||||
// Consume the next *sz bytes (RING_NEXT for "as many bytes as possible"),
// blocking while the range is still being written (RING_VOLATILE).
// Note: RING_TRUNCATED / RING_TORN escape the retry loop and are returned
// to the caller unchanged.  If handle is non-null, the consumed offset is
// stored in *handle and registered with min_reader; the caller must later
// pass the SAME pointer to stasis_ringbuffer_read_done().
int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle) {
  pthread_mutex_lock(&ring->mut);
  int64_t ret;
  while(RING_VOLATILE == (ret = stasis_ringbuffer_nb_consume_bytes(ring, RING_NEXT, sz))) {
    // Wait for a writer to publish more data.
    pthread_cond_wait(&ring->write_done, &ring->mut);
  }
  if(handle) {
    *handle = ret;
    // Track this outstanding read so read_done() can compute the new read tail.
    stasis_aggregate_min_add(ring->min_reader, handle);
  }
  pthread_mutex_unlock(&ring->mut);
  return ret;
}
|
||||
// Consume [off, off+sz) and return a pointer to the data, or a negative
// error code cast to a pointer.
// Not threadsafe.
const void * stasis_ringbuffer_nb_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz) {
  int64_t off2 = stasis_ringbuffer_nb_consume_bytes(ring, off, &sz);
  // An explicit off that doesn't come back unchanged, or an error code for a
  // RING_NEXT request, is propagated to the caller as a casted error value.
  // NOTE(review): "off2 > RING_MINERR" admits every error code in the enum
  // (-1..-5 are all > -6) — confirm what RING_MINERR is meant to exclude here.
  if(off2 != off) { if(off != RING_NEXT || (off2 < 0 && off2 > RING_MINERR)) { return (const void*) off2; } }
  assert(! (off2 < 0 && off2 >= RING_MINERR));
  return ptr_off(ring, off2);
}
|
||||
// Blocking read: wait until [off, off+*sz) is stable, then return a
// pointer to it.  off may be RING_NEXT; *sz may be RING_NEXT for "as many
// bytes as possible".
// NOTE(review): *sz is passed BY VALUE to the nb function, so the caller's
// *sz is never updated with the byte count actually consumed — confirm
// callers do not rely on that.
// Explicit synchronization (blocks).
const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz) {
  pthread_mutex_lock(&ring->mut);
  const void * ret;
  while(((const void*)RING_VOLATILE) == (ret = stasis_ringbuffer_nb_get_rd_buf(ring, off, *sz))) {
    // Wait for a writer to publish the requested range.
    pthread_cond_wait(&ring->write_done, &ring->mut);
  }
  pthread_mutex_unlock(&ring->mut);
  return ret;
}
|
||||
// Return a pointer for writing at logical offset off.  The double mapping
// set up at init time makes the range contiguous even when it wraps around
// the end of the buffer; sz is currently unused.
// No need for synchronization (only touches read-only-fields)
void * stasis_ringbuffer_get_wr_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz) {
  return ptr_off(ring, off);
}
|
||||
// Move the write tail forward to off, making earlier bytes stable for
// readers.  Not threadsafe; callers signal ring->write_done themselves.
void stasis_ringbuffer_nb_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off) {
  assert(off >= ring->wt);      // the tail only moves forward
  ring->wt = off;
  assert(ring->wt <= ring->wf); // the tail can never pass the write frontier
}
|
||||
|
||||
// Threadsafe wrapper: advance the write tail to off and wake any readers
// blocked waiting for data to become stable.
void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off) {
  pthread_mutex_lock(&ring->mut);
  stasis_ringbuffer_nb_advance_write_tail(ring, off);
  pthread_cond_broadcast(&ring->write_done);
  pthread_mutex_unlock(&ring->mut);
}
|
||||
// Complete the write reservation identified by *off.  off must be the same
// pointer previously passed to stasis_ringbuffer_reserve_space().  The new
// write tail is the smallest still-outstanding reservation (or the write
// frontier if none remain); blocked readers are woken if the tail moved.
void stasis_ringbuffer_write_done(stasis_ringbuffer_t * ring, int64_t * off) {
  pthread_mutex_lock(&ring->mut);
  stasis_aggregate_min_remove(ring->min_writer, off);
  int64_t * new_wtp = (int64_t*)stasis_aggregate_min_compute(ring->min_writer);
  // No outstanding writers: everything up to the write frontier is stable.
  int64_t new_wt = new_wtp ? *new_wtp : ring->wf;
  if(new_wt != ring->wt) {
    stasis_ringbuffer_nb_advance_write_tail(ring, new_wt);
    pthread_cond_broadcast(&ring->write_done);
  }
  pthread_mutex_unlock(&ring->mut);

}
|
||||
// Move the read tail forward to off, allowing earlier bytes to be
// reclaimed by writers.  Not threadsafe; callers signal ring->read_done.
void stasis_ringbuffer_nb_advance_read_tail(stasis_ringbuffer_t * ring, int64_t off) {
  assert(off >= ring->rt);  // the tail only moves forward
  assert(off <= ring->rf);  // the tail can never pass the read frontier
  ring->rt = off;
}
|
||||
// Threadsafe wrapper: advance the read tail to off and wake any writers
// blocked waiting for free space.
void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, int64_t off) {
  pthread_mutex_lock(&ring->mut);
  stasis_ringbuffer_nb_advance_read_tail(ring, off);
  pthread_cond_broadcast(&ring->read_done);
  pthread_mutex_unlock(&ring->mut);
}
|
||||
// Complete the read identified by *off.  off must be the same pointer
// previously passed to stasis_ringbuffer_consume_bytes().  The new read
// tail is the smallest still-outstanding read (or the read frontier if
// none remain); blocked writers are woken if the tail moved.
void stasis_ringbuffer_read_done(stasis_ringbuffer_t * ring, int64_t * off) {
  pthread_mutex_lock(&ring->mut);
  stasis_aggregate_min_remove(ring->min_reader, off);
  int64_t * new_rtp = (int64_t*)stasis_aggregate_min_compute(ring->min_reader);
  // No outstanding readers: everything up to the read frontier is reclaimable.
  int64_t new_rt = new_rtp ? *new_rtp : ring->rf;
  if(new_rt != ring->rt) {
    stasis_ringbuffer_nb_advance_read_tail(ring, new_rt);
    pthread_cond_broadcast(&ring->read_done);
  }
  pthread_mutex_unlock(&ring->mut);
}
|
||||
|
||||
stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t base, int64_t initial_offset) {
|
||||
|
||||
if(base < 12) {
|
||||
fprintf(stderr, "can't allocate ringbuffer that is less than 4096 bytes.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
stasis_ringbuffer_t * ring = malloc(sizeof(*ring));
|
||||
|
||||
// Allocate the memory region using mmap black magic.
|
||||
|
||||
char* name = strdup("/dev/shm/stasis-ringbuffer-XXXXXX");
|
||||
int fd = mkstemp(name);
|
||||
if(fd == -1) { perror("Couldn't mkstemp\n"); abort(); }
|
||||
|
||||
int err;
|
||||
|
||||
err = unlink(name);
|
||||
|
||||
if(err == -1) { perror("Couldn't unlink mkstemp file\n"); }
|
||||
|
||||
free(name);
|
||||
|
||||
ring->mask = (1 << base) - 1;
|
||||
int64_t size = ring->mask+1;
|
||||
err = ftruncate(fd, size);
|
||||
|
||||
if(err == -1) { perror("Couldn't ftruncate file"); }
|
||||
|
||||
ring->mem = mmap(0, size*2, PROT_NONE, MAP_ANONYMOUS|MAP_PRIVATE, -1, 0);
|
||||
|
||||
if(ring->mem == MAP_FAILED) { perror("Couldn't mmap anonymous region"); abort(); }
|
||||
|
||||
void * errp = mmap(ring->mem, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0);
|
||||
|
||||
if(errp == MAP_FAILED) { perror("Couldn't mmap temp region"); abort(); }
|
||||
|
||||
errp = mmap(((char*)ring->mem)+size, size, PROT_READ|PROT_WRITE, MAP_SHARED|MAP_FIXED, fd, 0);
|
||||
|
||||
if(errp == MAP_FAILED) { perror("Couldn't mmap temp region the second time."); abort(); }
|
||||
|
||||
// Done with the black magic.
|
||||
|
||||
ring->rt = ring->rf = ring->wt = ring->wf = 0;
|
||||
|
||||
ring->min_reader = stasis_aggregate_min_init(cmp_int64_t);
|
||||
ring->min_writer = stasis_aggregate_min_init(cmp_int64_t);
|
||||
|
||||
pthread_mutex_init(&ring->mut,0);
|
||||
pthread_cond_init(&ring->read_done,0);
|
||||
pthread_cond_init(&ring->write_done,0);
|
||||
|
||||
return ring;
|
||||
|
||||
}
|
|
@ -63,6 +63,10 @@ terms specified in this license.
|
|||
#ifndef _XOPEN_SOURCE
|
||||
#define _XOPEN_SOURCE 600
|
||||
#endif
|
||||
#ifndef _BSD_SOURCE
|
||||
#define _BSD_SOURCE
|
||||
#endif
|
||||
|
||||
#include <stdint.h> // uint32, et. al. (has to be before sys/types.h for mcpp atop some broken gcc headers)
|
||||
#include <fcntl.h>
|
||||
#include <sys/types.h> // for size_t
|
||||
|
|
15
stasis/util/min.h
Normal file
15
stasis/util/min.h
Normal file
|
@ -0,0 +1,15 @@
|
|||
#ifndef STASIS_UTIL_MIN_H
#define STASIS_UTIL_MIN_H
#include <stasis/common.h>
BEGIN_C_DECLS

/** Opaque aggregate that tracks the minimum of a set of keys. */
typedef struct stasis_aggregate_min_t stasis_aggregate_min_t;

/** Create an aggregate; cmp is a three-way comparator whose third argument is opaque client state. */
stasis_aggregate_min_t * stasis_aggregate_min_init(int(*cmp)(const void *a, const void *b, const void *c));
/** Free an aggregate created by stasis_aggregate_min_init().  Does not free the keys. */
void stasis_aggregate_min_deinit(stasis_aggregate_min_t * min);
/** Add key a to the aggregate. */
void stasis_aggregate_min_add(stasis_aggregate_min_t * min, void * a);
/** Remove key b from the aggregate; returns the stored key. */
const void * stasis_aggregate_min_remove(stasis_aggregate_min_t * min, void * b);
/** Return the current minimum key without removing it. */
const void * stasis_aggregate_min_compute(stasis_aggregate_min_t * min);

END_C_DECLS
#endif
|
31
stasis/util/ringbuffer.h
Normal file
31
stasis/util/ringbuffer.h
Normal file
|
@ -0,0 +1,31 @@
|
|||
/*
 * ringbuffer.h
 *
 * A concurrent, zero-copy ring buffer.  Writers reserve contiguous
 * regions, fill them in place, and publish them; readers consume
 * contiguous regions in place.  Offsets are logical positions in the
 * buffer's address space.
 *
 *  Created on: Apr 1, 2011
 *      Author: sears
 */

#ifndef RINGBUFFER_H_
#define RINGBUFFER_H_
#include <stasis/common.h>
BEGIN_C_DECLS

typedef struct stasis_ringbuffer_t stasis_ringbuffer_t;

/** Allocate a ringbuffer of 2^size bytes whose first byte has logical offset initial_offset. */
stasis_ringbuffer_t * stasis_ringbuffer_init(intptr_t size, int64_t initial_offset);
/** Non-blocking reservation of sz bytes of write space; returns the offset, or RING_FULL. */
int64_t stasis_ringbuffer_nb_reserve_space(stasis_ringbuffer_t * ring, int64_t sz);
/** Blocking reservation; if handle is non-null, pass the same pointer to write_done later. */
int64_t stasis_ringbuffer_reserve_space(stasis_ringbuffer_t * ring, int64_t sz, int64_t * handle);
/** Complete the read registered under handle (same pointer passed to consume_bytes). */
void stasis_ringbuffer_read_done(stasis_ringbuffer_t * ring, int64_t * handle);
/** Publish all bytes below off to readers. */
void stasis_ringbuffer_advance_write_tail(stasis_ringbuffer_t * ring, int64_t off);
/** Non-blocking read of [off, off+sz); returns a data pointer or a casted error code. */
const void * stasis_ringbuffer_nb_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz);
// sz is a pointer to the desired size, or RING_NEXT for "as many bytes as possible"
int64_t stasis_ringbuffer_consume_bytes(stasis_ringbuffer_t * ring, int64_t* sz, int64_t * handle);
/** Complete the write registered under handle (same pointer passed to reserve_space). */
void stasis_ringbuffer_write_done(stasis_ringbuffer_t * ring, int64_t * handle);
// sz is a pointer to the desired size, or RING_NEXT for "as many bytes as possible"
const void * stasis_ringbuffer_get_rd_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t* sz);
/** Pointer for writing at logical offset off; the region is contiguous even across the wrap. */
void * stasis_ringbuffer_get_wr_buf(stasis_ringbuffer_t * ring, int64_t off, int64_t sz);
/** Allow all bytes below off to be reclaimed by writers. */
void stasis_ringbuffer_advance_read_tail(stasis_ringbuffer_t * ring, int64_t off);
/** Negative status codes returned (sometimes cast to pointers) by the API above. */
typedef enum { RING_TORN = -1, RING_VOLATILE = -2, RING_FULL = -3, RING_TRUNCATED = -4, RING_NEXT = -5, RING_MINERR = -6 } stasis_ringbuffer_error_t;

END_C_DECLS
#endif /* RINGBUFFER_H_ */
|
|
@ -2,6 +2,7 @@ SUBDIRS(fault_injection)
|
|||
CREATE_CHECK(check_redblack)
|
||||
CREATE_CHECK(check_concurrentHash)
|
||||
CREATE_CHECK(check_lhtable)
|
||||
CREATE_CHECK(check_concurrentRingbuffer)
|
||||
CREATE_CHECK(check_logEntry)
|
||||
CREATE_CHECK(check_logWriter)
|
||||
CREATE_CHECK(check_page)
|
||||
|
|
217
test/stasis/check_concurrentRingbuffer.c
Normal file
217
test/stasis/check_concurrentRingbuffer.c
Normal file
|
@ -0,0 +1,217 @@
|
|||
/*---
|
||||
This software is copyrighted by the Regents of the University of
|
||||
California, and other parties. The following terms apply to all files
|
||||
associated with the software unless explicitly disclaimed in
|
||||
individual files.
|
||||
|
||||
The authors hereby grant permission to use, copy, modify, distribute,
|
||||
and license this software and its documentation for any purpose,
|
||||
provided that existing copyright notices are retained in all copies
|
||||
and that this notice is included verbatim in any distributions. No
|
||||
written agreement, license, or royalty fee is required for any of the
|
||||
authorized uses. Modifications to this software may be copyrighted by
|
||||
their authors and need not follow the licensing terms described here,
|
||||
provided that the new terms are clearly indicated on the first page of
|
||||
each file where they apply.
|
||||
|
||||
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
|
||||
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
|
||||
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
|
||||
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
|
||||
POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
|
||||
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
|
||||
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
|
||||
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
|
||||
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
|
||||
|
||||
GOVERNMENT USE: If you are acquiring this software on behalf of the
|
||||
U.S. government, the Government shall have only "Restricted Rights" in
|
||||
the software and related documentation as defined in the Federal
|
||||
Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are
|
||||
acquiring the software on behalf of the Department of Defense, the
|
||||
software shall be classified as "Commercial Computer Software" and the
|
||||
Government shall have only "Restricted Rights" as defined in Clause
|
||||
252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the
|
||||
authors grant the U.S. Government and others acting in its behalf
|
||||
permission to use and distribute the software in accordance with the
|
||||
terms specified in this license.
|
||||
---*/
|
||||
#include "../check_includes.h"
|
||||
|
||||
#include <stasis/util/ringbuffer.h>
|
||||
|
||||
#include <assert.h>
|
||||
#include <sys/time.h>
|
||||
#include <time.h>
|
||||
|
||||
#define LOG_NAME "check_concurrentRingbuffer.log"
|
||||
|
||||
/**
|
||||
@test
|
||||
|
||||
*/
|
||||
|
||||
#define NUM_ENTRIES 10000
|
||||
|
||||
// Single-threaded sanity check of the non-blocking API on a 2^12-byte buffer.
START_TEST(ringBufferSmokeTest) {
  stasis_ringbuffer_t * ring = stasis_ringbuffer_init(12, 0);
  // Nothing has been written yet, so a read must report RING_VOLATILE.
  assert((void*)RING_VOLATILE == stasis_ringbuffer_nb_get_rd_buf(ring, 0, 1));
  int64_t off = stasis_ringbuffer_nb_reserve_space(ring, 4*900);
  assert(off != RING_FULL && off != RING_TORN && off != RING_VOLATILE);
  void * buf = stasis_ringbuffer_get_wr_buf(ring, off, 4*900);

  // 3600 of 4096 bytes reserved; another 1600 cannot fit.
  int64_t off2 = stasis_ringbuffer_nb_reserve_space(ring, 4*400);
  assert(off2 == RING_FULL);

  // Publish the first 1200 bytes, then read them back zero-copy.
  stasis_ringbuffer_advance_write_tail(ring, 4*300);
  const void * buf3 = stasis_ringbuffer_nb_get_rd_buf(ring, 0,4*300);
  assert(buf3 == buf);

  // Reclaim those bytes; the 1600-byte reservation now succeeds.
  stasis_ringbuffer_advance_read_tail(ring, 4*300);

  int64_t off4 = stasis_ringbuffer_nb_reserve_space(ring, 4*400);
  assert(off4 != RING_FULL && off4 != RING_TORN && off4 != RING_VOLATILE);

  // XXX stasis_ringbuffer_deinit(ring);
} END_TEST
|
||||
|
||||
#define PROD_CONS_SIZE (100L * 1024L * 1024L)
|
||||
// Single consumer thread: reads PROD_CONS_SIZE bytes in random-sized chunks
// and checks the (offset % 250) pattern written by producerWorker.
static void * consumerWorker(void * arg) {
  stasis_ringbuffer_t * ring = arg;
  int64_t cursor = 0;
  while(cursor < PROD_CONS_SIZE) {
    int64_t rnd_size = myrandom(2048);
    // Clamp the last chunk so we stop exactly at PROD_CONS_SIZE.
    if(rnd_size + cursor > PROD_CONS_SIZE) { rnd_size = PROD_CONS_SIZE - cursor; }
    byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, RING_NEXT, &rnd_size);
    for(uint64_t i = 0; i < rnd_size; i++) {
      //      printf("R[%lld] (addr=%lld) val = %d (%d)\n", cursor+i, (long long)(rd_buf)+i, rd_buf[i], (cursor+i)%250);
      assert(rd_buf[i] == ((cursor + i)%250));
    }
    cursor += rnd_size;
    // Single reader, so the read tail can simply track the cursor.
    stasis_ringbuffer_advance_read_tail(ring, cursor);
  }
  return 0;
}
|
||||
// Single producer thread: writes PROD_CONS_SIZE bytes in random-sized
// chunks, filling each byte with (offset % 250).
static void * producerWorker(void * arg) {
  stasis_ringbuffer_t * ring = arg;
  int64_t cursor = 0;
  while(cursor < PROD_CONS_SIZE) {
    int rnd_size = myrandom(2048);
    // Clamp the last chunk so we stop exactly at PROD_CONS_SIZE.
    if(rnd_size + cursor > PROD_CONS_SIZE) { rnd_size = PROD_CONS_SIZE - cursor; }
    int64_t wr_off = stasis_ringbuffer_reserve_space(ring, rnd_size, 0);
    // Single writer, so reservations are handed out in cursor order.
    assert(wr_off == cursor);
    byte * wr_buf = stasis_ringbuffer_get_wr_buf(ring, wr_off, rnd_size);
    for(uint64_t i = 0; i < rnd_size; i++) {
      wr_buf[i] = (cursor + i)%250;
      //      printf("W[%d] (addr=%lld) val = %d\n", cursor+i, (long long)(wr_buf)+i, wr_buf[i]);
    }
    cursor += rnd_size;
    // Publish everything written so far.
    stasis_ringbuffer_advance_write_tail(ring, cursor);
  }
  return 0;
}
|
||||
|
||||
// One producer, one consumer, blocking API: pushes 100MB through a 4KB
// buffer and verifies every byte.
START_TEST(ringBufferProducerConsumerTest) {
  stasis_ringbuffer_t * ring = stasis_ringbuffer_init(12, 0);
  pthread_t reader, writer;
  pthread_create(&reader, 0, consumerWorker, ring);
  pthread_create(&writer, 0, producerWorker, ring);
  pthread_join(reader, 0);
  pthread_join(writer, 0);
  // XXX stasis_ringbuffer_deinit(ring);
} END_TEST
|
||||
|
||||
#define NUM_READERS 20
|
||||
#define NUM_WRITERS NUM_READERS
|
||||
#define BYTES_PER_THREAD (10L * 1000L * 1000L)
|
||||
|
||||
// Shared state handed to each concurrent reader/writer thread.
typedef struct {
  pthread_mutex_t wr_mut;  // NOTE(review): appears unused by the workers below — confirm.
  stasis_ringbuffer_t * ring;
} arg;
|
||||
// One of NUM_READERS concurrent consumers.  Chunks from different readers
// interleave, so the expected byte value is derived from rd_handle (the
// chunk's logical offset), not this thread's private cursor.
static void * concurrentReader(void * argp) {
  arg * a = argp;
  stasis_ringbuffer_t * ring = a->ring;
  uint64_t cursor = 0;
  int64_t rd_handle;
  while(cursor < BYTES_PER_THREAD) {
    int64_t rnd_size = 1+myrandom(2047/NUM_READERS);
    if(rnd_size + cursor > BYTES_PER_THREAD) { rnd_size = BYTES_PER_THREAD - cursor; }
    // Claim the next chunk; rd_handle receives its logical offset.
    stasis_ringbuffer_consume_bytes(ring, &rnd_size, &rd_handle);

    byte const * rd_buf = stasis_ringbuffer_get_rd_buf(ring, rd_handle, &rnd_size);

    for(uint64_t i = 0; i < rnd_size; i++) {
      //      printf("R[%lld] (addr=%lld) val = %d (%d)\n", cursor+i, (long long)(rd_buf)+i, rd_buf[i], (cursor+i)%250);
      assert(rd_buf[i] == ((rd_handle + i)%250));
    }
    cursor += rnd_size;
    // Release the chunk so the read tail can advance past it.
    stasis_ringbuffer_read_done(ring, &rd_handle);
  }
  return 0;
}
|
||||
// One of NUM_WRITERS concurrent producers.  Each byte is filled with
// (logical offset % 250) so any reader can validate any chunk.
static void * concurrentWriter(void * argp) {
  arg * a = argp;
  stasis_ringbuffer_t * ring = a->ring;
  uint64_t cursor = 0;
  int64_t wr_handle;
  while(cursor < BYTES_PER_THREAD) {
    int rnd_size = 1+myrandom(2047/NUM_WRITERS);
    if(rnd_size + cursor > BYTES_PER_THREAD) { rnd_size = BYTES_PER_THREAD- cursor; }
    // Reserve a chunk; wr_handle receives its logical offset.
    stasis_ringbuffer_reserve_space(ring, rnd_size, &wr_handle);
    byte * wr_buf = stasis_ringbuffer_get_wr_buf(ring, wr_handle, rnd_size);
    for(uint64_t i = 0; i < rnd_size; i++) {
      wr_buf[i] = (wr_handle + i)%250;
      //      printf("W[%d] (addr=%lld) val = %d\n", cursor+i, (long long)(wr_buf)+i, wr_buf[i]);
    }
    cursor += rnd_size;
    // Publish the chunk so the write tail can advance past it.
    stasis_ringbuffer_write_done(ring, &wr_handle);
  }
  return 0;
}
|
||||
// NUM_READERS readers and NUM_WRITERS writers share one 4KB buffer;
// each thread moves BYTES_PER_THREAD bytes.
START_TEST(ringBufferConcurrentProducerConsumerTest) {
  arg a = {
    PTHREAD_MUTEX_INITIALIZER,
    stasis_ringbuffer_init(12, 0),
  };
  pthread_t readers[NUM_READERS];
  pthread_t writers[NUM_WRITERS];
  for(int i = 0; i < NUM_READERS; i++) {
    pthread_create(&readers[i], 0, concurrentReader, &a);
  }
  for(int i = 0; i < NUM_WRITERS; i++) {
    pthread_create(&writers[i], 0, concurrentWriter, &a);
  }
  for(int i = 0; i < NUM_READERS; i++) {
    pthread_join(readers[i], 0);
  }
  for(int i = 0; i < NUM_WRITERS; i++) {
    pthread_join(writers[i], 0);
  }

} END_TEST
|
||||
|
||||
// Assemble the libcheck suite for the ringbuffer tests.
Suite * check_suite(void) {
  Suite *s = suite_create("ringBuffer");
  /* Begin a new test */
  TCase *tc = tcase_create("ringBuffer");

  /* Sub tests are added, one per line, here */

  tcase_add_test(tc, ringBufferSmokeTest);
  tcase_add_test(tc, ringBufferProducerConsumerTest);
  tcase_add_test(tc, ringBufferConcurrentProducerConsumerTest);

  /* --------------------------------------------- */

  tcase_add_checked_fixture(tc, setup, teardown);


  suite_add_tcase(s, tc);
  return s;
}
|
||||
|
||||
#include "../check_setup.h"
|
Loading…
Reference in a new issue