diff --git a/lladd/io/handle.h b/lladd/io/handle.h index 4f50e49..b598d17 100644 --- a/lladd/io/handle.h +++ b/lladd/io/handle.h @@ -84,6 +84,7 @@ typedef struct stasis_write_buffer_t { typedef struct stasis_read_buffer_t { stasis_handle_t * h; const byte * buf; + lsn_t off; lsn_t len; void * impl; int error; @@ -91,9 +92,12 @@ typedef struct stasis_read_buffer_t { stasis_handle_t * stasis_handle(open_memory)(lsn_t start_offset); stasis_handle_t * stasis_handle(open_file)(lsn_t start_offset, char * path, int flags, int mode); -stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * slow, +stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg), + void * slow_factory_arg, stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg), - void * fast_factory_arg); + void * fast_factory_arg, + int worker_thread_count, + lsn_t buffer_size); stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h); stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h); diff --git a/src/lladd/io/file.c b/src/lladd/io/file.c index 99cf2cb..fc298f6 100644 --- a/src/lladd/io/file.c +++ b/src/lladd/io/file.c @@ -359,6 +359,7 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h, if(error) { ret->h = h; ret->buf = 0; + ret->off = 0; ret->len = 0; ret->impl = 0; ret->error = error; @@ -366,6 +367,7 @@ static stasis_read_buffer_t * file_read_buffer(stasis_handle_t * h, } else { ret->h = h; ret->buf = buf; + ret->off = off; ret->len = len; ret->impl = 0; ret->error = 0; diff --git a/src/lladd/io/memory.c b/src/lladd/io/memory.c index 2428269..6e4bc6f 100644 --- a/src/lladd/io/memory.c +++ b/src/lladd/io/memory.c @@ -58,8 +58,13 @@ static stasis_write_buffer_t * mem_write_buffer(stasis_handle_t * h, } else if(impl->end_pos > off+len) { // Just need to return buffer; h's state is unchanged. 
} else { - byte * newbuf = realloc(impl->buf, off+len - impl->start_pos); - + byte * newbuf; + if(off+len-impl->start_pos) { + newbuf = realloc(impl->buf, off+len - impl->start_pos); + } else { + free(impl->buf); + newbuf = malloc(0); + } if(newbuf) { impl->buf = newbuf; impl->end_pos = off+len; @@ -145,11 +150,13 @@ static stasis_read_buffer_t * mem_read_buffer(stasis_handle_t * h, ret->h = h; ret->buf = 0; ret->len = 0; + ret->off = 0; ret->impl = 0; ret->error = EDOM; } else { ret->h = h; ret->buf = &(impl->buf[off-impl->start_pos]); + ret->off = off; ret->len = len; ret->impl = 0; ret->error = 0; diff --git a/src/lladd/io/non_blocking.c b/src/lladd/io/non_blocking.c index 024f00c..9135bae 100644 --- a/src/lladd/io/non_blocking.c +++ b/src/lladd/io/non_blocking.c @@ -5,7 +5,7 @@ #include #include #include - +#include /* Design: @@ -44,26 +44,24 @@ static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t * return w; } - -typedef struct nbw_thread { - pthread_t thread; - // Other stuff? -} nbw_thread; - typedef struct tree_node { lsn_t start_pos; lsn_t end_pos; stasis_handle_t * h; + int pin_count; // held when the handle can not be deallocated, (when it's being written, read, or flushed to disk) + int dirty; // set to 1 when the handle is written to, 0 when the handle is written back to disk } tree_node; static int cmp_handle(const void * ap, const void * bp, const void * ignored) { tree_node * a = (tree_node*)ap; tree_node * b = (tree_node*)bp; - - // @todo It would probably be faster to malloc a wrapper and cache - // these values, instead of invoking 4 virtual methods each time we visit - // a tree node. - + if(a->start_pos == b->start_pos && + a->start_pos == a->end_pos && + b->start_pos == b->end_pos ) { + return 0; // Special case: two zero length regions that start at the same place are equal. 
+ } + // Otherwise, the regions must overlap: + // (0,0) == (0,0) ; (1,0) == (0,4) ; (0,0) < (0,4) if(a->end_pos <= b->start_pos) { return -1; } else if(a->start_pos >= b->end_pos) { @@ -74,14 +72,37 @@ static int cmp_handle(const void * ap, const void * bp, const void * ignored) { } + +typedef struct nbw_read_buffer_impl { + stasis_read_buffer_t * buffer; + tree_node * n; +} nbw_read_buffer_impl; + typedef struct nbw_impl { pthread_mutex_t mut; - stasis_handle_t * slow; + // Need more than one.... + stasis_handle_t * (*slow_factory)(void * arg); + void * slow_factory_arg; + stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg); void * fast_factory_arg; struct RB_ENTRY(tree) * fast_handles; - nbw_thread * workers; + + pthread_t * workers; int worker_count; + + tree_node * last_flushed; + + pthread_cond_t pending_writes_cond; + LinkedList pending_writes; + + int still_open; + + lsn_t buffer_size; + lsn_t used_buffer_size; + + LinkedList * slow_handles; + int slow_handle_count; // The rest of the values are caches maintained for efficiency. 
@@ -91,21 +112,39 @@ typedef struct nbw_impl { lsn_t end_pos; } nbw_impl; +static stasis_handle_t * getSlowHandle(nbw_impl * impl) { + stasis_handle_t * slow = (stasis_handle_t*) popMaxVal(&impl->slow_handles); + if((long)slow == -1) { + slow = impl->slow_factory(impl->slow_factory_arg); + impl->slow_handle_count++; + } + return slow; +} + +static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) { + addVal(&impl->slow_handles, (long) slow); +} static inline stasis_handle_t * getFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, int allocIfMissing) { tree_node * np = malloc(sizeof(tree_node)); np->start_pos = off; np->end_pos = off+len; if(allocIfMissing) { + // printf("lookup (%ld, %ld); ", np->start_pos, np->end_pos); const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles); if(n != np) { + // printf("found\n"); // The node was already in the tree assert(np->start_pos == n->start_pos && np->end_pos == n->end_pos); free(np); } else { + // printf("not found\n"); + assert(RB_ENTRY(find)(n, impl->fast_handles)); // The node wasn't in the tree and search inserted it for us. // Assignment is safe because we're writing to a field of n that is ignored by cmp. 
((tree_node*)n)->h = impl->fast_factory(off, len, impl->fast_factory_arg); } + // fflush(stdout); + ((tree_node*)n)->pin_count++; return n->h; } else { const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles); @@ -113,22 +152,48 @@ static inline stasis_handle_t * getFastHandle(nbw_impl * impl, lsn_t off, lsn_t if(!n) { return 0; } else { + ((tree_node*)n)->pin_count++; return n->h; } } } +static inline int releaseFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, + stasis_handle_t * fast, int setDirty) { + tree_node * np = malloc(sizeof(tree_node)); + np->start_pos = off; + np->end_pos = off+len; + if(fast) { + assert(off == fast->start_position(fast)); + assert(off + len == fast->end_position(fast)); + } + const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles); + free(np); + assert(n); + if(fast) { + assert(n->h == fast); + } + ((tree_node*)n)->pin_count--; + if(setDirty) ((tree_node*)n)->dirty = 1; + + return 0; +} + static int nbw_num_copies(stasis_handle_t * h) { nbw_impl * impl = (nbw_impl*) h->impl; - int slow_num_copies = impl->slow->num_copies(impl->slow); + stasis_handle_t * slow = getSlowHandle(impl); + int slow_num_copies = slow->num_copies(slow); + releaseSlowHandle(impl, slow); stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg); int fast_num_copies = fast->num_copies(fast); fast->close(fast); return slow_num_copies > fast_num_copies ? 
slow_num_copies : fast_num_copies; } static int nbw_num_copies_buffer(stasis_handle_t * h) { - nbw_impl * impl = (nbw_impl*) h->impl; - int slow_num_copies_buffer = impl->slow->num_copies_buffer(impl->slow); + nbw_impl * impl = (nbw_impl*) h->impl; + stasis_handle_t * slow = getSlowHandle(impl); + int slow_num_copies_buffer = slow->num_copies_buffer(slow); + releaseSlowHandle(impl, slow); stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg); int fast_num_copies_buffer = fast->num_copies_buffer(fast); fast->close(fast); @@ -137,8 +202,31 @@ static int nbw_num_copies_buffer(stasis_handle_t * h) { static int nbw_close(stasis_handle_t * h) { printf("Warning: nbw_close leaks fast handles, and never flushes them to the slow handle...\n"); nbw_impl * impl = h->impl; - impl->slow->close(impl->slow); + stasis_handle_t * slow; + + pthread_mutex_lock(&impl->mut); + impl->still_open = 0; + pthread_mutex_unlock(&impl->mut); + pthread_cond_broadcast(&impl->pending_writes_cond); + + for(int i = 0; i < impl->worker_count; i++) { + pthread_join(impl->workers[i], 0); + } + + free(impl->workers); + // foreach fast handle .. close fast handle... 
+ + printf("nbw had %d slow handles\n", impl->slow_handle_count); + fflush(stdout); + + while(-1 != (long)(slow = (stasis_handle_t *)popMaxVal(&(impl->slow_handles)))) { + slow->close(slow); + impl->slow_handle_count --; + } + destroyList(&(impl->slow_handles)); + assert(impl->slow_handle_count == 0); + free(h->impl); free(h); return 0; @@ -171,15 +259,27 @@ static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, if(off + len >= impl->end_pos) { impl->end_pos = off + len; } - stasis_handle_t * h = getFastHandle(impl, off, len, 1); - w = h->write_buffer(h, off, len); + stasis_handle_t * fast = getFastHandle(impl, off, len, 1); + w = fast->write_buffer(fast, off, len); } pthread_mutex_unlock(&impl->mut); if(!w) { w = alloc_write_buffer_error(h, error); + } else { + stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); + ret->h = h; + if(!w->error) { + assert(w->off == off); + assert(w->len == len); + } + ret->off = off; + ret->len = len; + ret->buf = w->buf; + ret->impl = w; + ret->error = w->error; + w = ret; } - return w; } @@ -192,14 +292,32 @@ static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, stasis_handle_t * fast = getFastHandle(impl, off, len, 1); stasis_write_buffer_t * w = fast->write_buffer(fast, off, len); pthread_mutex_unlock(&(impl->mut)); + // XXX error handling?!? 
+ + stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); + ret->h = h; + ret->off = off; + ret->len = len; + ret->buf = w->buf; + ret->impl = w; + ret->error = w->error; + w = ret; + return w; } static int nbw_release_write_buffer(stasis_write_buffer_t * w) { + nbw_impl * impl = w->h->impl; + pthread_mutex_lock(&impl->mut); if(w->error) { - free(w); } else { - abort(); + stasis_write_buffer_t * w_wrapped = ((stasis_write_buffer_t*)w->impl); + w_wrapped->h->release_write_buffer(w_wrapped); + releaseFastHandle(w->h->impl, w->off, w->len, 0, 1); + // printf("signalling workers\n"); + pthread_cond_signal(&impl->pending_writes_cond); } + free(w); + pthread_mutex_unlock(&impl->mut); return 0; } static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, @@ -215,8 +333,20 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, if(fast) { r = fast->read_buffer(fast, off, len); + if(r) { + stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); + ret->h = h; + ret->off = off; + ret->len = len; + ret->buf = r->buf; + ret->impl = r; + ret->error = r->error; + r = ret; + } } else { - r = impl->slow->read_buffer(impl->slow, off, len); + stasis_handle_t * slow = getSlowHandle(impl); + r = slow->read_buffer(slow, off, len); + releaseSlowHandle(impl, slow); } } if(!r) { @@ -226,11 +356,17 @@ static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, return r; } static int nbw_release_read_buffer(stasis_read_buffer_t * r) { + nbw_impl * impl = r->h->impl; + pthread_mutex_lock(&impl->mut); + if(r->error) { - free(r); } else { - abort(); + stasis_read_buffer_t * r_wrapped = ((stasis_read_buffer_t*)r->impl); + r_wrapped->h->release_read_buffer(r_wrapped); + releaseFastHandle(r->h->impl, r->off, r->len, 0, 0); } + free(r); + pthread_mutex_unlock(&impl->mut); return 0; } @@ -247,6 +383,9 @@ static int nbw_write(stasis_handle_t * h, lsn_t off, impl->end_pos = off + len; } error = fast->write(fast, off, dat, len); + 
releaseFastHandle(impl, off, len, fast, 1); + // printf("signalling workers\n"); + pthread_cond_signal(&impl->pending_writes_cond); } pthread_mutex_unlock(&impl->mut); return error; @@ -260,6 +399,9 @@ static int nbw_append(stasis_handle_t * h, lsn_t * off, stasis_handle_t * fast = getFastHandle(impl, *off, len, 1); impl->end_pos = *off + len; error = fast->write(fast, *off, dat, len); + releaseFastHandle(impl, *off, len, fast, 1); + // printf("signalling workers\n"); + pthread_cond_signal(&impl->pending_writes_cond); pthread_mutex_unlock(&impl->mut); return error; } @@ -272,16 +414,21 @@ static int nbw_read(stasis_handle_t * h, if(off < impl->start_pos || off + len > impl->end_pos) { error = EDOM; // printf("error"); fflush(stdout); - } else { + } else { // printf("getting handle"); fflush(stdout); stasis_handle_t * fast = getFastHandle(impl, off, len, 0); // printf("got handle"); fflush(stdout); if(fast) { // printf("fast"); fflush(stdout); error = fast->read(fast, off, buf, len); + releaseFastHandle(impl, off, len, fast, 0); } else { // printf("slow"); fflush(stdout); - error = impl->slow->read(impl->slow, off, buf, len); + stasis_handle_t * slow = getSlowHandle(impl); + pthread_mutex_unlock(&impl->mut); + error = slow->read(slow, off, buf, len); + pthread_mutex_lock(&impl->mut); + releaseSlowHandle(impl, slow); } // printf("done"); fflush(stdout); } @@ -319,23 +466,117 @@ struct stasis_handle_t nbw_func = { .error = 0 }; +static void * nbw_worker(void * handle) { + stasis_handle_t * h = (stasis_handle_t*)handle; + nbw_impl * impl = (nbw_impl*)h->impl; -stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * slow, + pthread_mutex_lock(&impl->mut); + + while(impl->still_open) { + // printf("still open"); + // pick next node. 
+ tree_node * to_flush; + int checked_from_start = 0; + if(impl->last_flushed) { + // cast removes const + to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, impl->last_flushed, + impl->fast_handles); + } else { + to_flush = 0; + } + while((to_flush || ! checked_from_start) && (to_flush != impl->last_flushed || to_flush == 0)) { + if(to_flush) { + if(to_flush->dirty) { break; } + // printf("clean node.."); + // strip const + to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, to_flush, + impl->fast_handles); + } else { + // printf("looking at beginning of tree.."); + // strip const + to_flush = (tree_node*)RB_ENTRY(min)(impl->fast_handles); + checked_from_start = 1; + } + } + if(!to_flush) { + // printf("nothing to flush"); + pthread_cond_wait(&impl->pending_writes_cond, &impl->mut); + } else { + impl->last_flushed = to_flush; + to_flush->pin_count++; + + stasis_handle_t * slow = getSlowHandle(impl); + + pthread_mutex_unlock(&impl->mut); + + + // @todo need a sendfile-style thing... + + stasis_handle_t * fast = to_flush->h; + + lsn_t off = fast->start_position(fast); + lsn_t len = fast->end_position(fast) - off; + stasis_read_buffer_t * r = fast->read_buffer(fast, off, len); + + // printf("%ld, Flushing %ld to %ld\n", pthread_self(), off, off+len);fflush(stdout); + + slow->write(slow, off, r->buf, len); + + r->h->release_read_buffer(r); + + pthread_mutex_lock(&impl->mut); + + releaseSlowHandle(impl, slow); + to_flush->pin_count--; + + } + } + + pthread_mutex_unlock(&impl->mut); + return 0; +} + + +stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg), + void * slow_factory_arg, stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *), - void * fast_factory_arg) { + void * fast_factory_arg, + int worker_thread_count, + lsn_t buffer_size) { nbw_impl * impl = malloc(sizeof(nbw_impl)); pthread_mutex_init(&impl->mut, 0); - impl->slow = slow; + impl->slow_factory = slow_factory; + impl->slow_factory_arg = slow_factory_arg; 
impl->fast_factory = fast_factory; impl->fast_factory_arg = fast_factory_arg; impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0); + impl->slow_handles = 0; + impl->slow_handle_count = 0; impl->workers = 0; - impl->worker_count = 0; + impl->worker_count = worker_thread_count; + impl->buffer_size = buffer_size; + impl->used_buffer_size = 0; + + stasis_handle_t * slow = getSlowHandle(impl); + + impl->workers = malloc(impl->worker_count * sizeof(pthread_t)); + + impl->still_open = 1; + pthread_cond_init(&impl->pending_writes_cond, 0); + impl->last_flushed = 0; + impl->start_pos = slow->start_position(slow); impl->end_pos = slow->end_position(slow); + + releaseSlowHandle(impl,slow); stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); *ret = nbw_func; ret->impl = impl; + + for(int i = 0; i < impl->worker_count; i++) { + pthread_create(&(impl->workers[i]), 0, nbw_worker, ret); + } + return ret; } diff --git a/test/lladd/check_io.c b/test/lladd/check_io.c index f46ee5c..128ad83 100644 --- a/test/lladd/check_io.c +++ b/test/lladd/check_io.c @@ -371,29 +371,45 @@ static stasis_handle_t * fast_factory(lsn_t off, lsn_t len, void * ignored) { } +typedef struct sf_args { + char * filename; + int openMode; + int filePerm; +} sf_args; + +static stasis_handle_t * slow_factory(void * argsP) { + sf_args * args = (sf_args*) argsP; + return stasis_handle(open_file)(0, args->filename, args->openMode, args->filePerm); +} + START_TEST(io_nonBlockingTest) { - stasis_handle_t * slow; stasis_handle_t * h; - slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); - // slow = stasis_handle(open_debug)(slow); - h = stasis_handle(open_non_blocking)(slow, fast_factory, 0); + sf_args slow_args = { + "logfile.txt", + O_CREAT | O_RDWR, + FILE_PERM + }; + + h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, + fast_factory, 0, + 5, 1024*1024); // h = stasis_handle(open_debug)(h); handle_smoketest(h); h->close(h); unlink("logfile.txt"); - slow = 
stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); + h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, fast_factory, 0, + 5, 1024*1024); //h = stasis_handle(open_debug)(h); - h = stasis_handle(open_non_blocking)(slow, fast_factory, 0); handle_sequentialtest(h); h->close(h); unlink("logfile.txt"); - slow = stasis_handle(open_file)(0, "logfile.txt", O_CREAT | O_RDWR, FILE_PERM); - h = stasis_handle(open_non_blocking)(slow, fast_factory, 0); + h = stasis_handle(open_non_blocking)(slow_factory, &slow_args, fast_factory, 0, + 5, 1024 * 1024); handle_concurrencytest(h); h->close(h); @@ -412,8 +428,8 @@ Suite * check_suite(void) { tcase_set_timeout(tc, 600); // ten minute timeout /* Sub tests are added, one per line, here */ - tcase_add_test(tc, io_memoryTest); - tcase_add_test(tc, io_fileTest); + // tcase_add_test(tc, io_memoryTest); + // tcase_add_test(tc, io_fileTest); tcase_add_test(tc, io_nonBlockingTest); /* --------------------------------------------- */ tcase_add_checked_fixture(tc, setup, teardown);