non_blocking is closer to working (it now flushes writes out in the background). This required some api changes. It still passes unit tests, but it's losing data on close()...

This commit is contained in:
Sears Russell 2006-10-28 03:33:02 +00:00
parent bb9e339823
commit 8481e9cdd6
5 changed files with 316 additions and 46 deletions

View file

@ -84,6 +84,7 @@ typedef struct stasis_write_buffer_t {
typedef struct stasis_read_buffer_t {
stasis_handle_t * h;
const byte * buf;
lsn_t off;
lsn_t len;
void * impl;
int error;
@ -91,9 +92,12 @@ typedef struct stasis_read_buffer_t {
stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset);
stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * path, int flags, int mode);
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * slow,
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg),
void * slow_factory_arg,
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg),
void * fast_factory_arg);
void * fast_factory_arg,
int worker_thread_count,
lsn_t buffer_size);
stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h);
stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h);

View file

@ -359,6 +359,7 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h,
if(error) {
ret->h = h;
ret->buf = 0;
ret->off = 0;
ret->len = 0;
ret->impl = 0;
ret->error = error;
@ -366,6 +367,7 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h,
} else {
ret->h = h;
ret->buf = buf;
ret->off = off;
ret->len = len;
ret->impl = 0;
ret->error = 0;

View file

@ -58,8 +58,13 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h,
} else if(impl->end_pos > off+len) {
// Just need to return buffer; h's state is unchanged.
} else {
byte * newbuf = realloc(impl->buf, off+len - impl->start_pos);
byte * newbuf;
if(off+len-impl->start_pos) {
newbuf = realloc(impl->buf, off+len - impl->start_pos);
} else {
free(impl->buf);
newbuf = malloc(0);
}
if(newbuf) {
impl->buf = newbuf;
impl->end_pos = off+len;
@ -145,11 +150,13 @@ static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h,
ret->h = h;
ret->buf = 0;
ret->len = 0;
ret->off = 0;
ret->impl = 0;
ret->error = EDOM;
} else {
ret->h = h;
ret->buf = &(impl->buf[off-impl->start_pos]);
ret->off = off;
ret->len = len;
ret->impl = 0;
ret->error = 0;

View file

@ -5,7 +5,7 @@
#include <stdlib.h>
#include <assert.h>
#include <stdio.h>
#include <linkedlist.h>
/*
Design:
@ -44,26 +44,24 @@ static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *
return w;
}
typedef struct nbw_thread {
pthread_t thread;
// Other stuff?
} nbw_thread;
typedef struct tree_node {
lsn_t start_pos;
lsn_t end_pos;
stasis_handle_t * h;
int pin_count; // held when the handle can not be deallocated, (when it's being written, read, or flushed to disk)
int dirty; // set to 1 when the handle is written to, 0 when the handle is written back to disk
} tree_node;
static int cmp_handle(const void * ap, const void * bp, const void * ignored) {
tree_node * a = (tree_node*)ap;
tree_node * b = (tree_node*)bp;
// @todo It would probably be faster to malloc a wrapper and cache
// these values, instead of invoking 4 virtual methods each time we visit
// a tree node.
if(a->start_pos == b->start_pos &&
a->start_pos == a->end_pos &&
b->start_pos == b->end_pos ) {
return 0; // Special case: two zero length regions that start at the same place are equal.
}
// Otherwise, the regions must overlap:
// (0,0) == (0,0) ; (1,0) == (0,4) ; (0,0) < (0,4)
if(a->end_pos <= b->start_pos) {
return -1;
} else if(a->start_pos >= b->end_pos) {
@ -74,14 +72,37 @@ static int cmp_handle(const void * ap, const void * bp, const void * ignored) {
}
typedef struct nbw_read_buffer_impl {
stasis_read_buffer_t * buffer;
tree_node * n;
} nbw_read_buffer_impl;
typedef struct nbw_impl {
pthread_mutex_t mut;
stasis_handle_t * slow;
// Need more than one....
stasis_handle_t * (*slow_factory)(void * arg);
void * slow_factory_arg;
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg);
void * fast_factory_arg;
struct RB_ENTRY(tree) * fast_handles;
nbw_thread * workers;
pthread_t * workers;
int worker_count;
tree_node * last_flushed;
pthread_cond_t pending_writes_cond;
LinkedList pending_writes;
int still_open;
lsn_t buffer_size;
lsn_t used_buffer_size;
LinkedList * slow_handles;
int slow_handle_count;
// The rest of the values are caches maintained for efficiency.
@ -91,21 +112,39 @@ typedef struct nbw_impl {
lsn_t end_pos;
} nbw_impl;
static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
stasis_handle_t * slow = (stasis_handle_t*) popMaxVal(&impl->slow_handles);
if((long)slow == -1) {
slow = impl->slow_factory(impl->slow_factory_arg);
impl->slow_handle_count++;
}
return slow;
}
static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) {
addVal(&impl->slow_handles, (long) slow);
}
static inline stasis_handle_t * getFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, int allocIfMissing) {
tree_node * np = malloc(sizeof(tree_node));
np->start_pos = off;
np->end_pos = off+len;
if(allocIfMissing) {
// printf("lookup (%ld, %ld); ", np->start_pos, np->end_pos);
const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles);
if(n != np) {
// printf("found\n");
// The node was already in the tree
assert(np->start_pos == n->start_pos && np->end_pos == n->end_pos);
free(np);
} else {
// printf("not found\n");
assert(RB_ENTRY(find)(n, impl->fast_handles));
// The node wasn't in the tree and search inserted it for us.
// Assignment is safe because we're writing to a field of n that is ignored by cmp.
((tree_node*)n)->h = impl->fast_factory(off, len, impl->fast_factory_arg);
}
// fflush(stdout);
((tree_node*)n)->pin_count++;
return n->h;
} else {
const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles);
@ -113,22 +152,48 @@ static inline stasis_handle_t * getFastHandle(nbw_impl * impl, lsn_t off, lsn_t
if(!n) {
return 0;
} else {
((tree_node*)n)->pin_count++;
return n->h;
}
}
}
static inline int releaseFastHandle(nbw_impl * impl, lsn_t off, lsn_t len,
stasis_handle_t * fast, int setDirty) {
tree_node * np = malloc(sizeof(tree_node));
np->start_pos = off;
np->end_pos = off+len;
if(fast) {
assert(off == fast->start_position(fast));
assert(off + len == fast->end_position(fast));
}
const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles);
free(np);
assert(n);
if(fast) {
assert(n->h == fast);
}
((tree_node*)n)->pin_count--;
if(setDirty) ((tree_node*)n)->dirty = 1;
return 0;
}
static int nbw_num_copies(stasis_handle_t * h) {
nbw_impl * impl = (nbw_impl*) h->impl;
int slow_num_copies = impl->slow->num_copies(impl->slow);
stasis_handle_t * slow = getSlowHandle(impl);
int slow_num_copies = slow->num_copies(slow);
releaseSlowHandle(impl, slow);
stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg);
int fast_num_copies = fast->num_copies(fast);
fast->close(fast);
return slow_num_copies > fast_num_copies ? slow_num_copies : fast_num_copies;
}
static int nbw_num_copies_buffer(stasis_handle_t * h) {
nbw_impl * impl = (nbw_impl*) h->impl;
int slow_num_copies_buffer = impl->slow->num_copies_buffer(impl->slow);
nbw_impl * impl = (nbw_impl*) h->impl;
stasis_handle_t * slow = getSlowHandle(impl);
int slow_num_copies_buffer = slow->num_copies_buffer(slow);
releaseSlowHandle(impl, slow);
stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg);
int fast_num_copies_buffer = fast->num_copies_buffer(fast);
fast->close(fast);
@ -137,8 +202,31 @@ static int nbw_num_copies_buffer(stasis_handle_t * h) {
static int nbw_close(stasis_handle_t * h) {
printf("Warning: nbw_close leaks fast handles, and never flushes them to the slow handle...\n");
nbw_impl * impl = h->impl;
impl->slow->close(impl->slow);
stasis_handle_t * slow;
pthread_mutex_lock(&impl->mut);
impl->still_open = 0;
pthread_mutex_unlock(&impl->mut);
pthread_cond_broadcast(&impl->pending_writes_cond);
for(int i = 0; i < impl->worker_count; i++) {
pthread_join(impl->workers[i], 0);
}
free(impl->workers);
// foreach fast handle .. close fast handle...
printf("nbw had %d slow handles\n", impl->slow_handle_count);
fflush(stdout);
while(-1 != (long)(slow = (stasis_handle_t *)popMaxVal(&(impl->slow_handles)))) {
slow->close(slow);
impl->slow_handle_count --;
}
destroyList(&(impl->slow_handles));
assert(impl->slow_handle_count == 0);
free(h->impl);
free(h);
return 0;
@ -171,15 +259,27 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
if(off + len >= impl->end_pos) {
impl->end_pos = off + len;
}
stasis_handle_t * h = getFastHandle(impl, off, len, 1);
w = h->write_buffer(h, off, len);
stasis_handle_t * fast = getFastHandle(impl, off, len, 1);
w = fast->write_buffer(fast, off, len);
}
pthread_mutex_unlock(&impl->mut);
if(!w) {
w = alloc_write_buffer_error(h, error);
} else {
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
ret->h = h;
if(!w->error) {
assert(w->off == off);
assert(w->len == len);
}
ret->off = off;
ret->len = len;
ret->buf = w->buf;
ret->impl = w;
ret->error = w->error;
w = ret;
}
return w;
}
@ -192,14 +292,32 @@ static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
stasis_handle_t * fast = getFastHandle(impl, off, len, 1);
stasis_write_buffer_t * w = fast->write_buffer(fast, off, len);
pthread_mutex_unlock(&(impl->mut));
// XXX error handling?!?
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
ret->h = h;
ret->off = off;
ret->len = len;
ret->buf = w->buf;
ret->impl = w;
ret->error = w->error;
w = ret;
return w;
}
static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
nbw_impl * impl = w->h->impl;
pthread_mutex_lock(&impl->mut);
if(w->error) {
free(w);
} else {
abort();
stasis_write_buffer_t * w_wrapped = ((stasis_write_buffer_t*)w->impl);
w_wrapped->h->release_write_buffer(w_wrapped);
releaseFastHandle(w->h->impl, w->off, w->len, 0, 1);
// printf("singalling workers\n");
pthread_cond_signal(&impl->pending_writes_cond);
}
free(w);
pthread_mutex_unlock(&impl->mut);
return 0;
}
static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
@ -215,8 +333,20 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
if(fast) {
r = fast->read_buffer(fast, off, len);
if(r) {
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
ret->h = h;
ret->off = off;
ret->len = len;
ret->buf = r->buf;
ret->impl = r;
ret->error = r->error;
r = ret;
}
} else {
r = impl->slow->read_buffer(impl->slow, off, len);
stasis_handle_t * slow = getSlowHandle(impl);
r = slow->read_buffer(slow, off, len);
releaseSlowHandle(impl, slow);
}
}
if(!r) {
@ -226,11 +356,17 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
return r;
}
static int nbw_release_read_buffer(stasis_read_buffer_t * r) {
nbw_impl * impl = r->h->impl;
pthread_mutex_lock(&impl->mut);
if(r->error) {
free(r);
} else {
abort();
stasis_read_buffer_t * r_wrapped = ((stasis_read_buffer_t*)r->impl);
r_wrapped->h->release_read_buffer(r_wrapped);
releaseFastHandle(r->h->impl, r->off, r->len, 0, 0);
}
free(r);
pthread_mutex_unlock(&impl->mut);
return 0;
}
@ -247,6 +383,9 @@ static int nbw_write(stasis_handle_t * h, lsn_t off,
impl->end_pos = off + len;
}
error = fast->write(fast, off, dat, len);
releaseFastHandle(impl, off, len, fast, 1);
// printf("singailling workers\n");
pthread_cond_signal(&impl->pending_writes_cond);
}
pthread_mutex_unlock(&impl->mut);
return error;
@ -260,6 +399,9 @@ static int nbw_append(stasis_handle_t * h, lsn_t * off,
stasis_handle_t * fast = getFastHandle(impl, *off, len, 1);
impl->end_pos = *off + len;
error = fast->write(fast, *off, dat, len);
releaseFastHandle(impl, *off, len, fast, 1);
// printf("singalling workers\n");
pthread_cond_signal(&impl->pending_writes_cond);
pthread_mutex_unlock(&impl->mut);
return error;
}
@ -272,16 +414,21 @@ static int nbw_read(stasis_handle_t * h,
if(off < impl->start_pos || off + len > impl->end_pos) {
error = EDOM;
// printf("error"); fflush(stdout);
} else {
} else {
// printf("getting handle"); fflush(stdout);
stasis_handle_t * fast = getFastHandle(impl, off, len, 0);
// printf("got handle"); fflush(stdout);
if(fast) {
// printf("fast"); fflush(stdout);
error = fast->read(fast, off, buf, len);
releaseFastHandle(impl, off, len, fast, 0);
} else {
// printf("slow"); fflush(stdout);
error = impl->slow->read(impl->slow, off, buf, len);
stasis_handle_t * slow = getSlowHandle(impl);
pthread_mutex_unlock(&impl->mut);
error = slow->read(slow, off, buf, len);
pthread_mutex_lock(&impl->mut);
releaseSlowHandle(impl, slow);
}
// printf("done"); fflush(stdout);
}
@ -319,23 +466,117 @@ struct stasis_handle_t nbw_func = {
.error = 0
};
static void * nbw_worker(void * handle) {
stasis_handle_t * h = (stasis_handle_t*)handle;
nbw_impl * impl = (nbw_impl*)h->impl;
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * slow,
pthread_mutex_lock(&impl->mut);
while(impl->still_open) {
// printf("still open");
// pick next node.
tree_node * to_flush;
int checked_from_start = 0;
if(impl->last_flushed) {
// cast removes const
to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, impl->last_flushed,
impl->fast_handles);
} else {
to_flush = 0;
}
while((to_flush || ! checked_from_start) && (to_flush != impl->last_flushed || to_flush == 0)) {
if(to_flush) {
if(to_flush->dirty) { break; }
// printf("clean node..");
// strip const
to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, to_flush,
impl->fast_handles);
} else {
// printf("looking at beginning of tree..");
// strip const
to_flush = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
checked_from_start = 1;
}
}
if(!to_flush) {
// printf("nothing to flush");
pthread_cond_wait(&impl->pending_writes_cond, &impl->mut);
} else {
impl->last_flushed = to_flush;
to_flush->pin_count++;
stasis_handle_t * slow = getSlowHandle(impl);
pthread_mutex_unlock(&impl->mut);
// @todo need a sendfile-style thing...
stasis_handle_t * fast = to_flush->h;
lsn_t off = fast->start_position(fast);
lsn_t len = fast->end_position(fast) - off;
stasis_read_buffer_t * r = fast->read_buffer(fast, off, len);
// printf("%ld, Flushing %ld to %ld\n", pthread_self(), off, off+len);fflush(stdout);
slow->write(slow, off, r->buf, len);
r->h->release_read_buffer(r);
pthread_mutex_lock(&impl->mut);
releaseSlowHandle(impl, slow);
to_flush->pin_count--;
}
}
pthread_mutex_unlock(&impl->mut);
return 0;
}
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg),
void * slow_factory_arg,
stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *),
void * fast_factory_arg) {
void * fast_factory_arg,
int worker_thread_count,
lsn_t buffer_size) {
nbw_impl * impl = malloc(sizeof(nbw_impl));
pthread_mutex_init(&impl->mut, 0);
impl->slow = slow;
impl->slow_factory = slow_factory;
impl->slow_factory_arg = slow_factory_arg;
impl->fast_factory = fast_factory;
impl->fast_factory_arg = fast_factory_arg;
impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0);
impl->slow_handles = 0;
impl->slow_handle_count = 0;
impl->workers = 0;
impl->worker_count = 0;
impl->worker_count = worker_thread_count;
impl->buffer_size = buffer_size;
impl->used_buffer_size = 0;
stasis_handle_t * slow = getSlowHandle(impl);
impl->workers = malloc(impl->worker_count * sizeof(pthread_t));
impl->still_open = 1;
pthread_cond_init(&impl->pending_writes_cond, 0);
impl->last_flushed = 0;
impl->start_pos = slow->start_position(slow);
impl->end_pos = slow->end_position(slow);
releaseSlowHandle(impl,slow);
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
*ret = nbw_func;
ret->impl = impl;
for(int i = 0; i < impl->worker_count; i++) {
pthread_create(&(impl->workers[i]), 0, nbw_worker, ret);
}
return ret;
}

View file

@ -371,29 +371,45 @@ static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) {
}
typedef struct sf_args {
char * filename;
int openMode;
int filePerm;
} sf_args;
static stasis_handle_t * slow_factory(void * argsP) {
sf_args * args = (sf_args*) argsP;
return stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm);
}
START_TEST(io_nonBlockingTest) {
stasis_handle_t * slow;
stasis_handle_t * h;
slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
// slow = stasis_handle(open_debug)(slow);
h = stasis_handle(open_non_blocking)(slow, fast_factory, 0);
sf_args slow_args = {
"logfile.txt",
O_CREAT | O_RDWR,
FILE_PERM
};
h = stasis_handle(open_non_blocking)(slow_factory, &slow_args,
fast_factory, 0,
5, 1024*1024);
// h = stasis_handle(open_debug)(h);
handle_smoketest(h);
h->close(h);
unlink("logfile.txt");
slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, fast_factory, 0,
5, 1024*1024);
//h = stasis_handle(open_debug)(h);
h = stasis_handle(open_non_blocking)(slow, fast_factory, 0);
handle_sequentialtest(h);
h->close(h);
unlink("logfile.txt");
slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM);
h = stasis_handle(open_non_blocking)(slow, fast_factory, 0);
h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, fast_factory, 0,
5, 1024 * 1024);
handle_concurrencytest(h);
h->close(h);
@ -412,8 +428,8 @@ Suite * check_suite(void) {
tcase_set_timeout(tc, 600); // ten minute timeout
/* Sub tests are added, one per line, here */
tcase_add_test(tc, io_memoryTest);
tcase_add_test(tc, io_fileTest);
// tcase_add_test(tc, io_memoryTest);
// tcase_add_test(tc, io_fileTest);
tcase_add_test(tc, io_nonBlockingTest);
/* --------------------------------------------- */
tcase_add_checked_fixture(tc, setup, teardown);