First cut at a new io API, with an in-memory reference implementation + test case.
This commit is contained in:
parent
e961a2bdb8
commit
80a4148543
4 changed files with 708 additions and 1 deletions
85
lladd/io/handle.h
Normal file
85
lladd/io/handle.h
Normal file
|
@ -0,0 +1,85 @@
|
||||||
|
#include <lladd/transactional.h>
|
||||||
|
#define stasis_handle(x) stasis_handle_##x
|
||||||
|
|
||||||
|
/**
|
||||||
|
Error handling:
|
||||||
|
|
||||||
|
read, write, append, open, release_read_buffer and
|
||||||
|
release_write_buffer return 0 on success, and an error code
|
||||||
|
otherwise. read_buffer() and write_buffer() return error codes via
|
||||||
|
the error field of the handles they produce.
|
||||||
|
|
||||||
|
Errors in num_copies, num_copies_buffer, start_position, and end_position
|
||||||
|
are always unrecoverable, and return -1.
|
||||||
|
|
||||||
|
Here are the meanings of the various error codes:
|
||||||
|
|
||||||
|
EDOM off is less than the beginning of the file (possibly due to truncation).
|
||||||
|
EBADF an unrecoverable error occurred; the handle is no longer vaild. The error
|
||||||
|
that caused this one is stored in the handle's error field.
|
||||||
|
|
||||||
|
Handle implementations may return return other errors as appropriate.
|
||||||
|
*/
|
||||||
|
typedef struct stasis_handle_t {
|
||||||
|
/** @return the number of in-memory copies made when the caller
|
||||||
|
provides the buffer */
|
||||||
|
int (*num_copies)();
|
||||||
|
/** @return the number of in-memory copies made when the handle
|
||||||
|
provides the buffer */
|
||||||
|
int (*num_copies_buffer)();
|
||||||
|
|
||||||
|
int (*close)(struct stasis_handle_t *);
|
||||||
|
|
||||||
|
/** The offset of the handle's first byte */
|
||||||
|
lsn_t (*start_position)(struct stasis_handle_t * h);
|
||||||
|
/** The offset of the byte after the end of the handle's data. */
|
||||||
|
lsn_t (*end_position)(struct stasis_handle_t * h);
|
||||||
|
|
||||||
|
int (*write)(struct stasis_handle_t * h, lsn_t off,
|
||||||
|
const byte * dat, lsn_t len);
|
||||||
|
int (*append)(struct stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len);
|
||||||
|
|
||||||
|
struct stasis_write_buffer_t * (*write_buffer)(struct stasis_handle_t * h,
|
||||||
|
lsn_t off, lsn_t len);
|
||||||
|
struct stasis_write_buffer_t * (*append_buffer)(struct stasis_handle_t * h,
|
||||||
|
lsn_t len);
|
||||||
|
int (*release_write_buffer)(struct stasis_write_buffer_t * w);
|
||||||
|
|
||||||
|
int (*read)(struct stasis_handle_t * h,
|
||||||
|
lsn_t offset, byte * buf, lsn_t length);
|
||||||
|
|
||||||
|
struct stasis_read_buffer_t * (*read_buffer)(struct stasis_handle_t * h,
|
||||||
|
lsn_t ofset, lsn_t length);
|
||||||
|
int (*release_read_buffer)(struct stasis_read_buffer_t * r);
|
||||||
|
|
||||||
|
int (*truncate_start)(struct stasis_handle_t * h, lsn_t new_start);
|
||||||
|
|
||||||
|
int error;
|
||||||
|
|
||||||
|
void * impl;
|
||||||
|
|
||||||
|
} stasis_handle_t;
|
||||||
|
|
||||||
|
typedef struct stasis_write_buffer_t {
|
||||||
|
stasis_handle_t * h;
|
||||||
|
lsn_t off;
|
||||||
|
byte * buf;
|
||||||
|
lsn_t len;
|
||||||
|
void * impl;
|
||||||
|
int error;
|
||||||
|
} stasis_write_buffer_t;
|
||||||
|
|
||||||
|
typedef struct stasis_read_buffer_t {
|
||||||
|
stasis_handle_t * h;
|
||||||
|
const byte * buf;
|
||||||
|
lsn_t len;
|
||||||
|
void * impl;
|
||||||
|
int error;
|
||||||
|
} stasis_read_buffer_t;
|
||||||
|
|
||||||
|
stasis_handle_t * stasis_handle(open_memory)();
|
||||||
|
stasis_handle_t * stasis_handle(open_file)(char * path, int mode);
|
||||||
|
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * h,
|
||||||
|
int worker_thread_count);
|
||||||
|
stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h);
|
||||||
|
|
|
@ -17,7 +17,8 @@ liblladd_a_SOURCES=crc32.c redblack.c lhtable.c common.c stats.c io.c bufferMana
|
||||||
operations/naiveLinearHash.c operations/nestedTopActions.c \
|
operations/naiveLinearHash.c operations/nestedTopActions.c \
|
||||||
operations/linearHashNTA.c operations/linkedListNTA.c \
|
operations/linearHashNTA.c operations/linkedListNTA.c \
|
||||||
operations/pageOrientedListNTA.c operations/bTree.c \
|
operations/pageOrientedListNTA.c operations/bTree.c \
|
||||||
operations/regions.c
|
operations/regions.c \
|
||||||
|
io/memory.c
|
||||||
# page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas
|
# page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas
|
||||||
#operations/lladdhash.c
|
#operations/lladdhash.c
|
||||||
#AM_CFLAGS= -g -Wall -pedantic -std=gnu99
|
#AM_CFLAGS= -g -Wall -pedantic -std=gnu99
|
||||||
|
|
273
src/lladd/io/memory.c
Normal file
273
src/lladd/io/memory.c
Normal file
|
@ -0,0 +1,273 @@
|
||||||
|
#include <lladd/io/handle.h>
|
||||||
|
#include <latches.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <assert.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct mem_impl {
|
||||||
|
pthread_mutex_t mut;
|
||||||
|
rwl * trunc_latch;
|
||||||
|
lsn_t start_pos;
|
||||||
|
lsn_t end_pos;
|
||||||
|
byte * buf;
|
||||||
|
} mem_impl;
|
||||||
|
|
||||||
|
static int mem_num_copies() { return 1; }
|
||||||
|
static int mem_num_copies_buffer() { return 0; }
|
||||||
|
|
||||||
|
static int mem_close(stasis_handle_t * h) {
|
||||||
|
free(((mem_impl*)h->impl)->buf);
|
||||||
|
pthread_mutex_destroy(&(((mem_impl*)h->impl)->mut));
|
||||||
|
deletelock(((mem_impl*)h->impl)->trunc_latch);
|
||||||
|
free(h->impl);
|
||||||
|
free(h);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
static lsn_t mem_start_position(stasis_handle_t *h) {
|
||||||
|
lsn_t ret;
|
||||||
|
mem_impl* impl = (mem_impl*)(h->impl);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&impl->mut);
|
||||||
|
ret = impl->start_pos;
|
||||||
|
pthread_mutex_unlock(&impl->mut);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
static lsn_t mem_end_position(stasis_handle_t *h) {
|
||||||
|
lsn_t ret;
|
||||||
|
mem_impl* impl = (mem_impl*)(h->impl);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&impl->mut);
|
||||||
|
ret = impl->end_pos;
|
||||||
|
pthread_mutex_unlock(&impl->mut);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
|
||||||
|
lsn_t off, lsn_t len) {
|
||||||
|
mem_impl* impl = (mem_impl*)(h->impl);
|
||||||
|
|
||||||
|
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
|
||||||
|
if(!ret) { return NULL; }
|
||||||
|
|
||||||
|
writelock(impl->trunc_latch,0);
|
||||||
|
pthread_mutex_lock(&(impl->mut));
|
||||||
|
|
||||||
|
int error = 0;
|
||||||
|
|
||||||
|
if(impl->start_pos > off) {
|
||||||
|
error = EDOM;
|
||||||
|
} else if(impl->end_pos > off+len) {
|
||||||
|
// Just need to return buffer; h's state is unchanged.
|
||||||
|
} else {
|
||||||
|
byte * newbuf = realloc(impl->buf, off+len - impl->start_pos);
|
||||||
|
|
||||||
|
if(newbuf) {
|
||||||
|
impl->buf = newbuf;
|
||||||
|
impl->end_pos = off+len;
|
||||||
|
} else {
|
||||||
|
error = ENOMEM;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if(error) {
|
||||||
|
ret->h = h;
|
||||||
|
ret->off = 0;
|
||||||
|
ret->buf = 0;
|
||||||
|
ret->len = 0;
|
||||||
|
ret->impl = 0;
|
||||||
|
ret->error = error;
|
||||||
|
} else {
|
||||||
|
ret->h = h;
|
||||||
|
ret->off = off;
|
||||||
|
ret->buf = &(impl->buf[off-impl->start_pos]);
|
||||||
|
ret->len = len;
|
||||||
|
ret->impl = 0;
|
||||||
|
ret->error = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&(impl->mut));
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
|
||||||
|
lsn_t len) {
|
||||||
|
mem_impl * impl = (mem_impl*)(h->impl);
|
||||||
|
|
||||||
|
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
|
||||||
|
if(!ret) { return 0; }
|
||||||
|
|
||||||
|
writelock(impl->trunc_latch,0);
|
||||||
|
pthread_mutex_lock(&(impl->mut));
|
||||||
|
|
||||||
|
lsn_t off = impl->end_pos;
|
||||||
|
impl->end_pos += len;
|
||||||
|
byte * newbuf = realloc(impl->buf, impl->end_pos - impl->start_pos);
|
||||||
|
if(newbuf) {
|
||||||
|
impl->buf = newbuf;
|
||||||
|
|
||||||
|
ret->h = h;
|
||||||
|
ret->off = off;
|
||||||
|
ret->buf = &(impl->buf[off-impl->start_pos]);
|
||||||
|
ret->len = len;
|
||||||
|
ret->impl = 0;
|
||||||
|
ret->error = 0;
|
||||||
|
} else {
|
||||||
|
ret->h = h;
|
||||||
|
ret->off = 0;
|
||||||
|
ret->buf = 0;
|
||||||
|
ret->len = 0;
|
||||||
|
ret->impl = 0;
|
||||||
|
ret->error = ENOMEM;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&(impl->mut));
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
static int mem_release_write_buffer(stasis_write_buffer_t * w) {
|
||||||
|
unlock(((mem_impl*)w->h->impl)->trunc_latch);
|
||||||
|
free(w);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int mem_write(stasis_handle_t * h, lsn_t off,
|
||||||
|
const byte * dat, lsn_t len) {
|
||||||
|
// Overlapping writes aren't atomic; no latch needed.
|
||||||
|
stasis_write_buffer_t * w = mem_write_buffer(h, off, len);
|
||||||
|
int ret;
|
||||||
|
if(w->error) {
|
||||||
|
ret = w->error;
|
||||||
|
} else {
|
||||||
|
memcpy(w->buf, dat, len);
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
mem_release_write_buffer(w);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int mem_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) {
|
||||||
|
stasis_write_buffer_t * w = mem_append_buffer(h, len);
|
||||||
|
int ret;
|
||||||
|
if(w->error) {
|
||||||
|
ret = w->error;
|
||||||
|
} else {
|
||||||
|
memcpy(w->buf, dat, len);
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
*off = w->off;
|
||||||
|
mem_release_write_buffer(w);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h,
|
||||||
|
lsn_t off, lsn_t len) {
|
||||||
|
mem_impl * impl = (mem_impl*)(h->impl);
|
||||||
|
readlock(impl->trunc_latch,0);
|
||||||
|
pthread_mutex_lock(&(impl->mut));
|
||||||
|
|
||||||
|
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
|
||||||
|
if(!ret) { return NULL; }
|
||||||
|
|
||||||
|
if(off < impl->start_pos || off + len > impl->end_pos) {
|
||||||
|
ret->h = h;
|
||||||
|
ret->buf = 0;
|
||||||
|
ret->len = 0;
|
||||||
|
ret->impl = 0;
|
||||||
|
ret->error = EDOM;
|
||||||
|
} else {
|
||||||
|
ret->h = h;
|
||||||
|
ret->buf = &(impl->buf[off-impl->start_pos]);
|
||||||
|
ret->len = len;
|
||||||
|
ret->impl = 0;
|
||||||
|
ret->error = 0;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&(impl->mut));
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
static int mem_release_read_buffer(stasis_read_buffer_t * r) {
|
||||||
|
unlock(((mem_impl*)r->h->impl)->trunc_latch);
|
||||||
|
free(r);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int mem_read(stasis_handle_t * h,
|
||||||
|
lsn_t off, byte * buf, lsn_t len) {
|
||||||
|
stasis_read_buffer_t * r = mem_read_buffer(h, off, len);
|
||||||
|
int ret;
|
||||||
|
if(r->error) {
|
||||||
|
ret = r->error;
|
||||||
|
} else {
|
||||||
|
memcpy(buf, r->buf, len);
|
||||||
|
ret = 0;
|
||||||
|
}
|
||||||
|
mem_release_read_buffer(r);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int mem_truncate_start(stasis_handle_t * h, lsn_t new_start) {
|
||||||
|
mem_impl* impl = (mem_impl*) h->impl;
|
||||||
|
writelock(impl->trunc_latch,0);
|
||||||
|
pthread_mutex_lock(&(impl->mut));
|
||||||
|
if(new_start < impl->start_pos) {
|
||||||
|
pthread_mutex_unlock(&impl->mut);
|
||||||
|
unlock(impl->trunc_latch);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if(new_start > impl->end_pos) {
|
||||||
|
pthread_mutex_unlock(&impl->mut);
|
||||||
|
unlock(impl->trunc_latch);
|
||||||
|
return EDOM;
|
||||||
|
}
|
||||||
|
|
||||||
|
byte * new_buf = malloc(impl->end_pos -new_start);
|
||||||
|
|
||||||
|
memcpy(new_buf, &(impl->buf[new_start - impl->start_pos]), impl->end_pos - new_start);
|
||||||
|
|
||||||
|
free(impl->buf);
|
||||||
|
|
||||||
|
impl->buf = new_buf;
|
||||||
|
impl->start_pos = new_start;
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&(impl->mut));
|
||||||
|
unlock(impl->trunc_latch);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct stasis_handle_t mem_func = {
|
||||||
|
.num_copies = mem_num_copies,
|
||||||
|
.num_copies_buffer = mem_num_copies_buffer,
|
||||||
|
.close = mem_close,
|
||||||
|
.start_position = mem_start_position,
|
||||||
|
.end_position = mem_end_position,
|
||||||
|
.write = mem_write,
|
||||||
|
.append = mem_append,
|
||||||
|
.write_buffer = mem_write_buffer,
|
||||||
|
.append_buffer = mem_append_buffer,
|
||||||
|
.release_write_buffer = mem_release_write_buffer,
|
||||||
|
.read = mem_read,
|
||||||
|
.read_buffer = mem_read_buffer,
|
||||||
|
.release_read_buffer = mem_release_read_buffer,
|
||||||
|
.truncate_start = mem_truncate_start,
|
||||||
|
.error = 0
|
||||||
|
};
|
||||||
|
|
||||||
|
stasis_handle_t * stasis_handle(open_memory)() {
|
||||||
|
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
|
||||||
|
*ret = mem_func;
|
||||||
|
|
||||||
|
mem_impl * impl = malloc(sizeof(mem_impl));
|
||||||
|
ret->impl = impl;
|
||||||
|
pthread_mutex_init(&(impl->mut), 0);
|
||||||
|
impl->trunc_latch = initlock();
|
||||||
|
impl->start_pos = 0;
|
||||||
|
impl->end_pos = 0;
|
||||||
|
impl->buf = malloc(0);
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
348
test/lladd/check_io.c
Normal file
348
test/lladd/check_io.c
Normal file
|
@ -0,0 +1,348 @@
|
||||||
|
/*--- This software is copyrighted by the Regents of the University of
|
||||||
|
California, and other parties. The following terms apply to all files
|
||||||
|
associated with the software unless explicitly disclaimed in
|
||||||
|
individual files.
|
||||||
|
|
||||||
|
The authors hereby grant permission to use, copy, modify, distribute,
|
||||||
|
and license this software and its documentation for any purpose,
|
||||||
|
provided that existing copyright notices are retained in all copies
|
||||||
|
and that this notice is included verbatim in any distributions. No
|
||||||
|
written agreement, license, or royalty fee is required for any of the
|
||||||
|
authorized uses. Modifications to this software may be copyrighted by
|
||||||
|
their authors and need not follow the licensing terms described here,
|
||||||
|
provided that the new terms are clearly indicated on the first page of
|
||||||
|
each file where they apply.
|
||||||
|
|
||||||
|
IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY
|
||||||
|
FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES
|
||||||
|
ARISING OUT OF THE USE OF THIS SOFTWARE, ITS DOCUMENTATION, OR ANY
|
||||||
|
DERIVATIVES THEREOF, EVEN IF THE AUTHORS HAVE BEEN ADVISED OF THE
|
||||||
|
POSSIBILITY OF SUCH DAMAGE.
|
||||||
|
|
||||||
|
THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
|
||||||
|
INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||||
|
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
|
||||||
|
NON-INFRINGEMENT. THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, AND
|
||||||
|
THE AUTHORS AND DISTRIBUTORS HAVE NO OBLIGATION TO PROVIDE
|
||||||
|
MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
|
||||||
|
|
||||||
|
GOVERNMENT USE: If you are acquiring this software on behalf of the
|
||||||
|
U.S. government, the Government shall have only "Restricted Rights" in
|
||||||
|
the software and related documentation as defined in the Federal
|
||||||
|
Acquisition Regulations (FARs) in Clause 52.227.19 (c) (2). If you are
|
||||||
|
acquiring the software on behalf of the Department of Defense, the
|
||||||
|
software shall be classified as "Commercial Computer Software" and the
|
||||||
|
Government shall have only "Restricted Rights" as defined in Clause
|
||||||
|
252.227-7013 (c) (1) of DFARs. Notwithstanding the foregoing, the
|
||||||
|
authors grant the U.S. Government and others acting in its behalf
|
||||||
|
permission to use and distribute the software in accordance with the
|
||||||
|
terms specified in this license.
|
||||||
|
---*/
|
||||||
|
#include <config.h>
|
||||||
|
#include <lladd/io/handle.h>
|
||||||
|
|
||||||
|
#include <check.h>
|
||||||
|
#include <assert.h>
|
||||||
|
|
||||||
|
#include "../check_includes.h"
|
||||||
|
|
||||||
|
#define LOG_NAME "check_io.log"
|
||||||
|
|
||||||
|
long myrandom(long x) {
|
||||||
|
double xx = x;
|
||||||
|
double r = random();
|
||||||
|
double max = ((long)RAND_MAX)+1;
|
||||||
|
max /= xx;
|
||||||
|
return (long)((r/max));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void handle_smoketest(stasis_handle_t * h) {
|
||||||
|
|
||||||
|
const int one = 0x11111111;
|
||||||
|
const int two = 0x22222222;
|
||||||
|
const int three = 0x33333333;
|
||||||
|
const int four = 0x44444444;
|
||||||
|
|
||||||
|
assert((!h->num_copies()) || (!h->num_copies_buffer()));
|
||||||
|
|
||||||
|
assert(0 == h->start_position(h) ||
|
||||||
|
0 == h->end_position(h));
|
||||||
|
|
||||||
|
assert(! h->write(h, 0, (byte*)&one, sizeof(int)));
|
||||||
|
|
||||||
|
int one_read = 0;
|
||||||
|
assert(! h->read(h, 0, (byte*)&one_read, sizeof(int)));
|
||||||
|
assert(one_read == one);
|
||||||
|
stasis_write_buffer_t * w = h->write_buffer(h, sizeof(int), sizeof(int));
|
||||||
|
*((int*)(w->buf)) = two;
|
||||||
|
w->h->release_write_buffer(w);
|
||||||
|
one_read = 0;
|
||||||
|
stasis_read_buffer_t * r = h->read_buffer(h, 0, sizeof(int));
|
||||||
|
one_read = *((int*)(r->buf));
|
||||||
|
r->h->release_read_buffer(r);
|
||||||
|
assert(one == one_read);
|
||||||
|
|
||||||
|
int two_read = 0;
|
||||||
|
assert(! h->read(h, sizeof(int), (byte*)&two_read, sizeof(int)));
|
||||||
|
assert(two == two_read);
|
||||||
|
|
||||||
|
lsn_t off;
|
||||||
|
assert(! h->append(h, &off, (byte*)&three, sizeof(int)));
|
||||||
|
|
||||||
|
w = h->append_buffer(h, sizeof(int));
|
||||||
|
memcpy(w->buf, &four, sizeof(int));
|
||||||
|
w->h->release_write_buffer(w);
|
||||||
|
|
||||||
|
h->truncate_start(h, 2 * sizeof(int));
|
||||||
|
|
||||||
|
int three_read = 0;
|
||||||
|
int four_read = 0;
|
||||||
|
|
||||||
|
assert(! h->read(h, 2*sizeof(int), (byte*)&three_read, sizeof(int)));
|
||||||
|
|
||||||
|
r = h->read_buffer(h, 3*sizeof(int), sizeof(int));
|
||||||
|
memcpy(&four_read, r->buf, sizeof(int));
|
||||||
|
r->h->release_read_buffer(r);
|
||||||
|
|
||||||
|
assert(three == three_read);
|
||||||
|
assert(four == four_read);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int * values;
|
||||||
|
int count;
|
||||||
|
stasis_handle_t * h;
|
||||||
|
} thread_arg;
|
||||||
|
|
||||||
|
#define VALUE_COUNT 100000
|
||||||
|
#define THREAD_COUNT 100
|
||||||
|
#define OPS_PER_THREAD 500000
|
||||||
|
|
||||||
|
lsn_t trunc_val;
|
||||||
|
pthread_mutex_t trunc_mut = PTHREAD_MUTEX_INITIALIZER;
|
||||||
|
|
||||||
|
void load_handle(thread_arg* t) {
|
||||||
|
lsn_t * offsets = malloc(t->count * sizeof(lsn_t));
|
||||||
|
|
||||||
|
stasis_handle_t * h = t->h;
|
||||||
|
|
||||||
|
for(int i = 0; i < t->count; i++) {
|
||||||
|
offsets[i] = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i = 0; i < OPS_PER_THREAD; i++) {
|
||||||
|
int val = myrandom(t->count);
|
||||||
|
|
||||||
|
if(offsets[val] == -1) {
|
||||||
|
// Need to write it somewhere.
|
||||||
|
|
||||||
|
long choice = myrandom(4);
|
||||||
|
|
||||||
|
switch(choice) {
|
||||||
|
case 0: { // overwrite old entry with write()
|
||||||
|
long val2 = myrandom(t->count);
|
||||||
|
offsets[val] = offsets[val2];
|
||||||
|
offsets[val2] = -1;
|
||||||
|
if(offsets[val] != -1) {
|
||||||
|
int ret = h->write(h, offsets[val], (const byte*)&(t->values[val]), sizeof(int));
|
||||||
|
if(ret) {
|
||||||
|
assert(ret == EDOM);
|
||||||
|
offsets[val] = -1;
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
} break;
|
||||||
|
case 1: { // overwrite old entry with write_buffer()
|
||||||
|
long val2 = myrandom(t->count);
|
||||||
|
offsets[val] = offsets[val2];
|
||||||
|
offsets[val2] = -1;
|
||||||
|
if(offsets[val] != -1) {
|
||||||
|
stasis_write_buffer_t * w = h->write_buffer(h, offsets[val], sizeof(int));
|
||||||
|
if(!w->error) {
|
||||||
|
*((int*)w->buf) = t->values[val];
|
||||||
|
assert(w->len == sizeof(int));
|
||||||
|
assert(w->off == offsets[val]);
|
||||||
|
} else {
|
||||||
|
assert(w->error == EDOM);
|
||||||
|
offsets[val] = -1;
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
w->h->release_write_buffer(w);
|
||||||
|
} else {
|
||||||
|
i--;
|
||||||
|
}
|
||||||
|
} break;
|
||||||
|
case 2: { // append
|
||||||
|
int ret = h->append(h, &(offsets[val]), (const byte*)&(t->values[val]), sizeof(int));
|
||||||
|
assert(!ret);
|
||||||
|
} break;
|
||||||
|
case 3: { // append_buffer
|
||||||
|
stasis_write_buffer_t * w = h->append_buffer(h, sizeof(int));
|
||||||
|
if(!w->error) {
|
||||||
|
*((int*)w->buf) = t->values[val];
|
||||||
|
assert(w->len == sizeof(int));
|
||||||
|
offsets[val] = w->off;
|
||||||
|
} else {
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
w->h->release_write_buffer(w);
|
||||||
|
} break;
|
||||||
|
default: {
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int check;
|
||||||
|
int ret = h->read(h, offsets[val], (byte*)&check, sizeof(int));
|
||||||
|
if(!ret) {
|
||||||
|
assert(check == t->values[val]);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// Read the value.
|
||||||
|
|
||||||
|
long choice = myrandom(2);
|
||||||
|
|
||||||
|
switch(choice) {
|
||||||
|
case 0: { // read
|
||||||
|
int j;
|
||||||
|
int ret = h->read(h, offsets[val], (byte*)&j, sizeof(int));
|
||||||
|
if(!ret) {
|
||||||
|
assert(j == t->values[val]);
|
||||||
|
} else {
|
||||||
|
assert(ret == EDOM);
|
||||||
|
assert(h->start_position(h) > offsets[val]);
|
||||||
|
}
|
||||||
|
} break;
|
||||||
|
case 1: { // read_buffer
|
||||||
|
stasis_read_buffer_t * r = h->read_buffer(h, offsets[val], sizeof(int));
|
||||||
|
if(!r->error) {
|
||||||
|
assert(*(int*)(r->buf) == t->values[val]);
|
||||||
|
assert(r->len == sizeof(int));
|
||||||
|
} else {
|
||||||
|
assert(r->error == EDOM);
|
||||||
|
assert(h->start_position(h) > offsets[val]);
|
||||||
|
}
|
||||||
|
r->h->release_read_buffer(r);
|
||||||
|
|
||||||
|
} break;
|
||||||
|
default:
|
||||||
|
abort();
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Truncate 1% of the time.
|
||||||
|
if(!myrandom(100)) {
|
||||||
|
lsn_t pre_start = h->start_position(h);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&trunc_mut);
|
||||||
|
lsn_t start = trunc_val;
|
||||||
|
lsn_t stop = start - 100 + myrandom(200);
|
||||||
|
if(stop > trunc_val) {
|
||||||
|
trunc_val = stop;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&trunc_mut);
|
||||||
|
|
||||||
|
|
||||||
|
assert(pre_start <= start);
|
||||||
|
int ret = h->truncate_start(h, stop);
|
||||||
|
if(!ret) {
|
||||||
|
lsn_t post_stop = h->start_position(h);
|
||||||
|
assert(stop <= post_stop);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
free(offsets);
|
||||||
|
}
|
||||||
|
|
||||||
|
void handle_sequentialtest(stasis_handle_t * h) {
|
||||||
|
time_t seed = time(0);
|
||||||
|
printf("\nSeed = %ld\n", seed);
|
||||||
|
srandom(seed);
|
||||||
|
|
||||||
|
int * values = malloc(VALUE_COUNT * sizeof(int));
|
||||||
|
|
||||||
|
for(int i = 0; i < VALUE_COUNT; i++) {
|
||||||
|
values[i] = i;
|
||||||
|
}
|
||||||
|
trunc_val = 0;
|
||||||
|
thread_arg arg = { values, VALUE_COUNT, h};
|
||||||
|
load_handle(&arg);
|
||||||
|
|
||||||
|
free(values);
|
||||||
|
}
|
||||||
|
|
||||||
|
void handle_concurrencytest(stasis_handle_t * h) {
|
||||||
|
int vc = myrandom(VALUE_COUNT) + 10;
|
||||||
|
|
||||||
|
printf("Running concurrency test with %d values", vc);
|
||||||
|
|
||||||
|
int * values = malloc(vc * sizeof(int));
|
||||||
|
|
||||||
|
for(int i = 0; i < vc; i++) {
|
||||||
|
values[i] = i;
|
||||||
|
}
|
||||||
|
|
||||||
|
thread_arg * args = malloc(THREAD_COUNT * sizeof(thread_arg));
|
||||||
|
pthread_t * threads = malloc(THREAD_COUNT * sizeof(pthread_t));
|
||||||
|
|
||||||
|
int val_per_thread = vc / THREAD_COUNT;
|
||||||
|
trunc_val = 0;
|
||||||
|
for(int i = 0; i < THREAD_COUNT; i++) {
|
||||||
|
args[i].values = &(values[i * val_per_thread]);
|
||||||
|
args[i].count = val_per_thread;
|
||||||
|
args[i].h = h;
|
||||||
|
pthread_create(&threads[i], 0, (void*(*)(void*))load_handle, &args[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i = 0; i < THREAD_COUNT; i++) {
|
||||||
|
pthread_join(threads[i], 0);
|
||||||
|
}
|
||||||
|
free(values);
|
||||||
|
free(args);
|
||||||
|
free(threads);
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
@test
|
||||||
|
Check the memory I/O handle.
|
||||||
|
*/
|
||||||
|
START_TEST(io_memoryTest) {
|
||||||
|
stasis_handle_t * h = stasis_handle(open_memory)(0);
|
||||||
|
handle_smoketest(h);
|
||||||
|
h->close(h);
|
||||||
|
h = stasis_handle(open_memory)(0);
|
||||||
|
handle_sequentialtest(h);
|
||||||
|
h->close(h);
|
||||||
|
h = stasis_handle(open_memory)(0);
|
||||||
|
handle_concurrencytest(h);
|
||||||
|
h->close(h);
|
||||||
|
} END_TEST
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
Add suite declarations here
|
||||||
|
*/
|
||||||
|
Suite * check_suite(void) {
|
||||||
|
Suite *s = suite_create("io");
|
||||||
|
/* Begin a new test */
|
||||||
|
TCase *tc = tcase_create("io_test");
|
||||||
|
tcase_set_timeout(tc, 600); // ten minute timeout
|
||||||
|
|
||||||
|
/* Sub tests are added, one per line, here */
|
||||||
|
tcase_add_test(tc, io_memoryTest);
|
||||||
|
/* --------------------------------------------- */
|
||||||
|
tcase_add_checked_fixture(tc, setup, teardown);
|
||||||
|
suite_add_tcase(s, tc);
|
||||||
|
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
#include "../check_setup.h"
|
Loading…
Reference in a new issue