Added non-blocking io implementation. It doesn't actually work yet, but it passes the unit tests. ;)

This commit is contained in:
Sears Russell 2006-10-26 05:48:30 +00:00
parent 10b77729f5
commit faf55911df
6 changed files with 416 additions and 21 deletions

View file

@ -89,9 +89,11 @@ typedef struct stasis_read_buffer_t {
int error; int error;
} stasis_read_buffer_t; } stasis_read_buffer_t;
stasis_handle_t * stasis_handle(open_memory)(void); stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset);
stasis_handle_t * stasis_handle(open_file)(char * path, int flags, int mode); stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * path, int flags, int mode);
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * h, stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * slow,
int worker_thread_count); stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg),
void * fast_factory_arg);
stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h); stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h);
stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h); stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h);

View file

@ -18,7 +18,7 @@ liblladd_a_SOURCES=crc32.c redblack.c lhtable.c common.c stats.c io.c bufferMana
operations/linearHashNTA.c operations/linkedListNTA.c \ operations/linearHashNTA.c operations/linkedListNTA.c \
operations/pageOrientedListNTA.c operations/bTree.c \ operations/pageOrientedListNTA.c operations/bTree.c \
operations/regions.c \ operations/regions.c \
io/memory.c io/file.c io/debug.c io/memory.c io/file.c io/non_blocking.c io/debug.c
# page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas # page/header.c logger/logMemory.c \ ringbuffer.c \ asdfas
#operations/lladdhash.c #operations/lladdhash.c
#AM_CFLAGS= -g -Wall -pedantic -std=gnu99 #AM_CFLAGS= -g -Wall -pedantic -std=gnu99

View file

@ -456,7 +456,7 @@ struct stasis_handle_t file_func = {
.error = 0 .error = 0
}; };
stasis_handle_t * stasis_handle(open_file)(char * filename, int flags, int mode) { stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * filename, int flags, int mode) {
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
if(!ret) { return NULL; } if(!ret) { return NULL; }
*ret = file_func; *ret = file_func;
@ -464,8 +464,8 @@ stasis_handle_t * stasis_handle(open_file)(char * filename, int flags, int mode)
file_impl * impl = malloc(sizeof(file_impl)); file_impl * impl = malloc(sizeof(file_impl));
ret->impl = impl; ret->impl = impl;
pthread_mutex_init(&(impl->mut), 0); pthread_mutex_init(&(impl->mut), 0);
impl->start_pos = 0; impl->start_pos = start_offset;
impl->end_pos = 0; impl->end_pos = start_offset;
impl->fd = open(filename, flags, mode); impl->fd = open(filename, flags, mode);
if(impl->fd == -1) { if(impl->fd == -1) {
ret->error = errno; ret->error = errno;

View file

@ -98,7 +98,14 @@ static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
lsn_t off = impl->end_pos; lsn_t off = impl->end_pos;
impl->end_pos += len; impl->end_pos += len;
byte * newbuf = realloc(impl->buf, impl->end_pos - impl->start_pos); size_t newlen = impl->end_pos - impl->start_pos;
byte * newbuf;
if(newlen == 0) {
free(impl->buf);
newbuf = malloc(0);
} else {
newbuf = realloc(impl->buf, impl->end_pos - impl->start_pos);
}
if(newbuf) { if(newbuf) {
impl->buf = newbuf; impl->buf = newbuf;
@ -109,6 +116,7 @@ static stasis_write_buffer_t * mem_append_buffer(stasis_handle_t * h,
ret->impl = 0; ret->impl = 0;
ret->error = 0; ret->error = 0;
} else { } else {
// if we requested a zero length buffer, this is OK.
ret->h = h; ret->h = h;
ret->off = 0; ret->off = 0;
ret->buf = 0; ret->buf = 0;
@ -242,7 +250,7 @@ struct stasis_handle_t mem_func = {
.error = 0 .error = 0
}; };
stasis_handle_t * stasis_handle(open_memory)() { stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset) {
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
if(!ret) { return NULL; } if(!ret) { return NULL; }
*ret = mem_func; *ret = mem_func;
@ -250,8 +258,8 @@ stasis_handle_t * stasis_handle(open_memory)() {
mem_impl * impl = malloc(sizeof(mem_impl)); mem_impl * impl = malloc(sizeof(mem_impl));
ret->impl = impl; ret->impl = impl;
pthread_mutex_init(&(impl->mut), 0); pthread_mutex_init(&(impl->mut), 0);
impl->start_pos = 0; impl->start_pos = start_offset;
impl->end_pos = 0; impl->end_pos = start_offset;
impl->buf = malloc(0); impl->buf = malloc(0);
return ret; return ret;

341
src/lladd/io/non_blocking.c Normal file
View file

@ -0,0 +1,341 @@
#include <lladd/io/handle.h>
#include <lladd/redblack.h>
#include <pthread.h>
#include <errno.h>
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
/*
Design:
data structures: A rb tree holds a set of "fast" handles that manage
disjoint regions. Each "fast" handle corresponds to an outstanding
write. Worker threads then flush "fast" handle contents into the
"slow" handle. Reads are serviced from the fast handles, and the
slow handle is used to fill any holes that exist within the read
range. (This implementation resorts to copies when necessary... it
is possible for a read or write to block on a memcpy(), but writes
may not block on disk i/o.)
*/
static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h, int error) {
assert(error);
stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t));
r->h = h;
r->buf = 0;
r->len = 0;
r->impl = 0;
r->error = error;
return r;
}
static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t * h, int error) {
assert(error);
stasis_write_buffer_t * w = malloc(sizeof(stasis_write_buffer_t));
w->h = h;
w->off = 0;
w->buf = 0;
w->len = 0;
w->impl = 0;
w->error = error;
return w;
}
typedef struct nbw_thread {
pthread_t thread;
// Other stuff?
} nbw_thread;
typedef struct tree_node {
lsn_t start_pos;
lsn_t end_pos;
stasis_handle_t * h;
} tree_node;
static int cmp_handle(const void * ap, const void * bp, const void * ignored) {
tree_node * a = (tree_node*)ap;
tree_node * b = (tree_node*)bp;
// @todo It would probably be faster to malloc a wrapper and cache
// these values, instead of invoking 4 virtual methods each time we visit
// a tree node.
if(a->end_pos <= b->start_pos) {
return -1;
} else if(a->start_pos >= b->end_pos) {
return 1;
} else {
return 0;
}
}
typedef struct nbw_impl {
pthread_mutex_t mut;
stasis_handle_t * slow;
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg);
void * fast_factory_arg;
struct RB_ENTRY(tree) * fast_handles;
nbw_thread * workers;
int worker_count;
// The rest of the values are caches maintained for efficiency.
/* int num_copies;
int num_copies_buffer; */
lsn_t start_pos;
lsn_t end_pos;
} nbw_impl;
static inline stasis_handle_t * getFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, int allocIfMissing) {
tree_node * np = malloc(sizeof(tree_node));
np->start_pos = off;
np->end_pos = off+len;
if(allocIfMissing) {
const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles);
if(n != np) {
// The node was already in the tree
assert(np->start_pos == n->start_pos && np->end_pos == n->end_pos);
free(np);
} else {
// The node wasn't in the tree and search inserted it for us.
// Assignment is safe because we're writing to a field of n that is ignored by cmp.
((tree_node*)n)->h = impl->fast_factory(off, len, impl->fast_factory_arg);
}
return n->h;
} else {
const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles);
free(np);
if(!n) {
return 0;
} else {
return n->h;
}
}
}
static int nbw_num_copies(stasis_handle_t * h) {
nbw_impl * impl = (nbw_impl*) h->impl;
int slow_num_copies = impl->slow->num_copies(impl->slow);
stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg);
int fast_num_copies = fast->num_copies(fast);
fast->close(fast);
return slow_num_copies > fast_num_copies ? slow_num_copies : fast_num_copies;
}
static int nbw_num_copies_buffer(stasis_handle_t * h) {
nbw_impl * impl = (nbw_impl*) h->impl;
int slow_num_copies_buffer = impl->slow->num_copies_buffer(impl->slow);
stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg);
int fast_num_copies_buffer = fast->num_copies_buffer(fast);
fast->close(fast);
return slow_num_copies_buffer > fast_num_copies_buffer ? slow_num_copies_buffer : fast_num_copies_buffer;
}
static int nbw_close(stasis_handle_t * h) {
printf("Warning: nbw_close leaks fast handles, and never flushes them to the slow handle...\n");
nbw_impl * impl = h->impl;
impl->slow->close(impl->slow);
// foreach fast handle .. close fast handle...
free(h->impl);
free(h);
return 0;
}
static lsn_t nbw_start_position(stasis_handle_t *h) {
nbw_impl * impl = (nbw_impl*) h->impl;
pthread_mutex_lock(&(impl->mut));
lsn_t ret = impl->start_pos;
pthread_mutex_unlock(&(impl->mut));
return ret;
}
static lsn_t nbw_end_position(stasis_handle_t *h) {
nbw_impl * impl = (nbw_impl*) h->impl;
pthread_mutex_lock(&(impl->mut));
lsn_t ret = impl->start_pos;
pthread_mutex_unlock(&(impl->mut));
return ret;
}
static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl;
int error = 0;
stasis_write_buffer_t * w = 0;
pthread_mutex_lock(&impl->mut);
if(off < impl->start_pos) {
error = EDOM;
} else {
if(off + len >= impl->end_pos) {
impl->end_pos = off + len;
}
stasis_handle_t * h = getFastHandle(impl, off, len, 1);
w = h->write_buffer(h, off, len);
}
pthread_mutex_unlock(&impl->mut);
if(!w) {
w = alloc_write_buffer_error(h, error);
}
return w;
}
static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl;
pthread_mutex_lock(&(impl->mut));
lsn_t off = impl->end_pos;
impl->end_pos = off+len;
stasis_handle_t * fast = getFastHandle(impl, off, len, 1);
stasis_write_buffer_t * w = fast->write_buffer(fast, off, len);
pthread_mutex_unlock(&(impl->mut));
return w;
}
static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
if(w->error) {
free(w);
} else {
abort();
}
return 0;
}
static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl;
pthread_mutex_lock(&impl->mut);
stasis_read_buffer_t * r = 0;
int error = 0;
if(off < impl->start_pos || off + len > impl->end_pos) {
error = EDOM;
} else {
stasis_handle_t * fast = getFastHandle(impl, off, len, 0);
if(fast) {
r = fast->read_buffer(fast, off, len);
} else {
r = impl->slow->read_buffer(impl->slow, off, len);
}
}
if(!r) {
r = alloc_read_buffer_error(h, error);
}
pthread_mutex_unlock(&impl->mut);
return r;
}
static int nbw_release_read_buffer(stasis_read_buffer_t * r) {
if(r->error) {
free(r);
} else {
abort();
}
return 0;
}
static int nbw_write(stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl;
pthread_mutex_lock(&impl->mut);
int error = 0;
if(off < impl->start_pos) {
error = EDOM;
} else {
stasis_handle_t * fast = getFastHandle(impl, off, len, 1);
if(off + len > impl->end_pos) {
impl->end_pos = off + len;
}
error = fast->write(fast, off, dat, len);
}
pthread_mutex_unlock(&impl->mut);
return error;
}
static int nbw_append(stasis_handle_t * h, lsn_t * off,
const byte * dat, lsn_t len) {
nbw_impl * impl = (nbw_impl *) h->impl;
int error = 0;
pthread_mutex_lock(&impl->mut);
*off = impl->end_pos;
stasis_handle_t * fast = getFastHandle(impl, *off, len, 1);
impl->end_pos = *off + len;
error = fast->write(fast, *off, dat, len);
pthread_mutex_unlock(&impl->mut);
return error;
}
static int nbw_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) {
nbw_impl * impl = (nbw_impl *) h->impl;
int error = 0;
pthread_mutex_lock(&impl->mut);
// printf("got lock"); fflush(stdout);
if(off < impl->start_pos || off + len > impl->end_pos) {
error = EDOM;
// printf("error"); fflush(stdout);
} else {
// printf("getting handle"); fflush(stdout);
stasis_handle_t * fast = getFastHandle(impl, off, len, 0);
// printf("got handle"); fflush(stdout);
if(fast) {
// printf("fast"); fflush(stdout);
error = fast->read(fast, off, buf, len);
} else {
// printf("slow"); fflush(stdout);
error = impl->slow->read(impl->slow, off, buf, len);
}
// printf("done"); fflush(stdout);
}
pthread_mutex_unlock(&impl->mut);
return error;
}
static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
nbw_impl * impl = h->impl;
int error = 0;
pthread_mutex_lock(&impl->mut);
if(new_start <= impl->end_pos && new_start > impl->start_pos) {
impl->start_pos = new_start;
} else {
error = EDOM;
}
pthread_mutex_unlock(&impl->mut);
return error;
}
struct stasis_handle_t nbw_func = {
.num_copies = nbw_num_copies,
.num_copies_buffer = nbw_num_copies_buffer,
.close = nbw_close,
.start_position = nbw_start_position,
.end_position = nbw_end_position,
.write = nbw_write,
.append = nbw_append,
.write_buffer = nbw_write_buffer,
.append_buffer = nbw_append_buffer,
.release_write_buffer = nbw_release_write_buffer,
.read = nbw_read,
.read_buffer = nbw_read_buffer,
.release_read_buffer = nbw_release_read_buffer,
.truncate_start = nbw_truncate_start,
.error = 0
};
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * slow,
stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *),
void * fast_factory_arg) {
nbw_impl * impl = malloc(sizeof(nbw_impl));
pthread_mutex_init(&impl->mut, 0);
impl->slow = slow;
impl->fast_factory = fast_factory;
impl->fast_factory_arg = fast_factory_arg;
impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0);
impl->workers = 0;
impl->worker_count = 0;
impl->start_pos = slow->start_position(slow);
impl->end_pos = slow->end_position(slow);
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
*ret = nbw_func;
ret->impl = impl;
return ret;
}

View file

@ -68,6 +68,10 @@ void handle_smoketest(stasis_handle_t * h) {
const int three = 0x33333333; const int three = 0x33333333;
const int four = 0x44444444; const int four = 0x44444444;
lsn_t off;
h->append(h, &off, 0, 0);
assert(off == 0);
assert((!h->num_copies(h)) || (!h->num_copies_buffer(h))); assert((!h->num_copies(h)) || (!h->num_copies_buffer(h)));
assert(0 == h->start_position(h) || assert(0 == h->start_position(h) ||
@ -92,7 +96,6 @@ void handle_smoketest(stasis_handle_t * h) {
assert(! h->read(h, sizeof(int), (byte*)&two_read, sizeof(int))); assert(! h->read(h, sizeof(int), (byte*)&two_read, sizeof(int)));
assert(two == two_read); assert(two == two_read);
lsn_t off;
assert(! h->append(h, &off, (byte*)&three, sizeof(int))); assert(! h->append(h, &off, (byte*)&three, sizeof(int)));
w = h->append_buffer(h, sizeof(int)); w = h->append_buffer(h, sizeof(int));
@ -320,15 +323,15 @@ void handle_concurrencytest(stasis_handle_t * h) {
*/ */
START_TEST(io_memoryTest) { START_TEST(io_memoryTest) {
stasis_handle_t * h; stasis_handle_t * h;
h = stasis_handle(open_memory)(); h = stasis_handle(open_memory)(0);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_smoketest(h); handle_smoketest(h);
h->close(h); h->close(h);
h = stasis_handle(open_memory)(); h = stasis_handle(open_memory)(0);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_sequentialtest(h); handle_sequentialtest(h);
h->close(h); h->close(h);
h = stasis_handle(open_memory)(); h = stasis_handle(open_memory)(0);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_concurrencytest(h); handle_concurrencytest(h);
h->close(h); h->close(h);
@ -336,21 +339,21 @@ START_TEST(io_memoryTest) {
START_TEST(io_fileTest) { START_TEST(io_fileTest) {
stasis_handle_t * h; stasis_handle_t * h;
h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// h = stasis_handle(open_debug)(h); // h = stasis_handle(open_debug)(h);
handle_smoketest(h); handle_smoketest(h);
h->close(h); h->close(h);
unlink("logfile.txt"); unlink("logfile.txt");
h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//h = stasis_handle(open_debug)(h); //h = stasis_handle(open_debug)(h);
handle_sequentialtest(h); handle_sequentialtest(h);
h->close(h); h->close(h);
unlink("logfile.txt"); unlink("logfile.txt");
h = stasis_handle(open_file)("logfile.txt", O_CREAT | O_RDWR, FILE_PERM); h = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
handle_concurrencytest(h); handle_concurrencytest(h);
h->close(h); h->close(h);
@ -358,6 +361,47 @@ START_TEST(io_fileTest) {
} END_TEST } END_TEST
static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {
stasis_handle_t * h = stasis_handle(open_memory)(off);
// h = stasis_handle(open_debug)(h);
stasis_write_buffer_t * w = h->append_buffer(h, len);
w->h->release_write_buffer(w);
return h;
}
START_TEST(io_nonBlockingTest) {
stasis_handle_t * slow;
stasis_handle_t * h;
slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// slow = stasis_handle(open_debug)(slow);
h = stasis_handle(open_non_blocking)(slow, fast_factory, 0);
// h = stasis_handle(open_debug)(h);
handle_smoketest(h);
h->close(h);
unlink("logfile.txt");
slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
//h = stasis_handle(open_debug)(h);
h = stasis_handle(open_non_blocking)(slow, fast_factory, 0);
handle_sequentialtest(h);
h->close(h);
unlink("logfile.txt");
slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
h = stasis_handle(open_non_blocking)(slow, fast_factory, 0);
handle_concurrencytest(h);
h->close(h);
unlink("logfile.txt");
} END_TEST
/** /**
Add suite declarations here Add suite declarations here
*/ */
@ -368,9 +412,9 @@ Suite * check_suite(void) {
tcase_set_timeout(tc, 600); // ten minute timeout tcase_set_timeout(tc, 600); // ten minute timeout
/* Sub tests are added, one per line, here */ /* Sub tests are added, one per line, here */
// tcase_add_test(tc, io_memoryTest); tcase_add_test(tc, io_memoryTest);
tcase_add_test(tc, io_fileTest); tcase_add_test(tc, io_fileTest);
tcase_add_test(tc, io_nonBlockingTest);
/* --------------------------------------------- */ /* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown); tcase_add_checked_fixture(tc, setup, teardown);
suite_add_tcase(s, tc); suite_add_tcase(s, tc);