diff --git a/lladd/io/handle.h b/lladd/io/handle.h index b598d17..4cbe4b7 100644 --- a/lladd/io/handle.h +++ b/lladd/io/handle.h @@ -97,7 +97,7 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg), void * fast_factory_arg, int worker_thread_count, - lsn_t buffer_size); + lsn_t buffer_size, int max_writes); stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h); stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h); diff --git a/src/lladd/io/non_blocking.c b/src/lladd/io/non_blocking.c index 9135bae..7260407 100644 --- a/src/lladd/io/non_blocking.c +++ b/src/lladd/io/non_blocking.c @@ -19,8 +19,21 @@ is possible for a read or write to block on a memcpy(), but writes may not block on disk i/o.) + Latching protocol: + + Each static function that manipulates the tree or lists grabs a + latch. Functions that call such functions should not hold a latch + when the function is called. If a function must atomically update + the handle's state (eg: append), they should oobtain the latch, + and release it before calling another function or returning. + + Exception: freeFastHandle should be called while holding the + latch. + */ +#define INVALID_NODE 2 + static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h, int error) { assert(error); stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t)); @@ -43,25 +56,47 @@ static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t * w->error = error; return w; } - +/** Wraps stasis_handle_t so that it can be stored in an rbtree. */ typedef struct tree_node { lsn_t start_pos; lsn_t end_pos; stasis_handle_t * h; - int pin_count; // held when the handle can not be deallocated, (when it's being written, read, or flushed to disk) - int dirty; // set to 1 when the handle is written to, 0 when the handle is written back to disk + /** The number of threads accessing this handle. The handle cannot + be deallocated unless this is zero. */ + int pin_count; + /** set to 1 when the handle is written to, 0 when the handle is + written back to disk, INVALID_NODE when the handle is not in + the tree. */ + int dirty; } tree_node; +/** Wrapper for write buffers */ +typedef struct write_buffer_impl { + /** The tree node that contains this buffer */ + const tree_node * n; + /** The underlying buffer. */ + stasis_write_buffer_t * w; +} write_buffer_impl; +typedef struct read_buffer_impl { + /** The tree node that contains this buffer, or NULL if the buffer + is from a slow handle. */ + const tree_node * n; + /** The underlying buffer. */ + stasis_read_buffer_t * r; +} read_buffer_impl; + +/** + Compare two tree_node structs. Two tree nodes are equal if they + are zero length, and start at the same point, or if they overlap. + */ static int cmp_handle(const void * ap, const void * bp, const void * ignored) { tree_node * a = (tree_node*)ap; tree_node * b = (tree_node*)bp; if(a->start_pos == b->start_pos && a->start_pos == a->end_pos && b->start_pos == b->end_pos ) { - return 0; // Special case: two zero length regions that start at the same place are equal. + return 0; } - // Otherwise, the regions must overlap: - // (0,0) == (0,0) ; (1,0) == (0,4) ; (0,0) < (0,4) if(a->end_pos <= b->start_pos) { return -1; } else if(a->start_pos >= b->end_pos) { @@ -69,371 +104,333 @@ static int cmp_handle(const void * ap, const void * bp, const void * ignored) { } else { return 0; } - } - -typedef struct nbw_read_buffer_impl { - stasis_read_buffer_t * buffer; - tree_node * n; -} nbw_read_buffer_impl; - typedef struct nbw_impl { pthread_mutex_t mut; - // Need more than one.... + + // Handle state + lsn_t start_pos; + lsn_t end_pos; + + // Fields to manage slow handles stasis_handle_t * (*slow_factory)(void * arg); void * slow_factory_arg; - - stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg); - void * fast_factory_arg; - struct RB_ENTRY(tree) * fast_handles; - - pthread_t * workers; - int worker_count; - - tree_node * last_flushed; - - pthread_cond_t pending_writes_cond; - LinkedList pending_writes; - - int still_open; - - lsn_t buffer_size; - lsn_t used_buffer_size; LinkedList * slow_handles; int slow_handle_count; + + // Fields to manage fast handles + stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg); + void * fast_factory_arg; - // The rest of the values are caches maintained for efficiency. + struct RB_ENTRY(tree) * fast_handles; + int fast_handle_count; + int max_fast_handles; + lsn_t used_buffer_size; + lsn_t max_buffer_size; - /* int num_copies; - int num_copies_buffer; */ - lsn_t start_pos; - lsn_t end_pos; + // Fields to manage and signal worker threads + pthread_t * workers; + int worker_count; + pthread_cond_t pending_writes_cond; + int still_open; } nbw_impl; static stasis_handle_t * getSlowHandle(nbw_impl * impl) { - stasis_handle_t * slow = (stasis_handle_t*) popMaxVal(&impl->slow_handles); + pthread_mutex_lock(&impl->mut); + stasis_handle_t * slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles); + assert(slow); if((long)slow == -1) { - slow = impl->slow_factory(impl->slow_factory_arg); impl->slow_handle_count++; + pthread_mutex_unlock(&impl->mut); + slow = impl->slow_factory(impl->slow_factory_arg); + } else { + pthread_mutex_unlock(&impl->mut); } return slow; } - static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) { - addVal(&impl->slow_handles, (long) slow); + assert(slow); + pthread_mutex_lock(&impl->mut); + addVal(&impl->slow_handles, (long)slow); + pthread_mutex_unlock(&impl->mut); } -static inline stasis_handle_t * getFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, int allocIfMissing) { - tree_node * np = malloc(sizeof(tree_node)); - np->start_pos = off; - np->end_pos = off+len; - if(allocIfMissing) { - // printf("lookup (%ld, %ld); ", np->start_pos, np->end_pos); - const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles); - if(n != np) { - // printf("found\n"); - // The node was already in the tree - assert(np->start_pos == n->start_pos && np->end_pos == n->end_pos); - free(np); - } else { - // printf("not found\n"); - assert(RB_ENTRY(find)(n, impl->fast_handles)); - // The node wasn't in the tree and search inserted it for us. - // Assignment is safe because we're writing to a field of n that is ignored by cmp. - ((tree_node*)n)->h = impl->fast_factory(off, len, impl->fast_factory_arg); - } - // fflush(stdout); - ((tree_node*)n)->pin_count++; - return n->h; - } else { - const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles); - free(np); - if(!n) { - return 0; + +static tree_node * allocTreeNode(lsn_t off, lsn_t len) { + tree_node * ret = malloc(sizeof(tree_node)); + ret->start_pos = off; + ret->end_pos = off + len; + ret->dirty = 0; + ret->pin_count = 1; + return ret; +} + +static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) { + tree_node * np = allocTreeNode(off, len); + pthread_mutex_lock(&impl->mut); + const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles); + if(n == np) { // not found + if(impl->fast_handle_count > impl->max_fast_handles || + impl->used_buffer_size > impl->max_buffer_size) { + RB_ENTRY(delete)(np, impl->fast_handles); + np->dirty = INVALID_NODE; + pthread_mutex_unlock(&impl->mut); + + np->h = getSlowHandle(impl); } else { - ((tree_node*)n)->pin_count++; - return n->h; + impl->fast_handle_count++; + impl->used_buffer_size += len; + pthread_mutex_unlock(&impl->mut); + + np->h = impl->fast_factory(off,len,impl->fast_factory_arg); } + } else { + ((tree_node*)n)->pin_count++; + pthread_mutex_unlock(&impl->mut); + + free(np); + } + return n; +} +static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) { + tree_node * np = allocTreeNode(off, len); + + pthread_mutex_lock(&impl->mut); + const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles); + if(n) ((tree_node*)n)->pin_count++; + pthread_mutex_unlock(&impl->mut); + + free(np); + return n; +} +/** Unlke all of the other fastHandle functions, the caller + should hold the mutex when calling freeFastHandle. */ +static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) { + RB_ENTRY(delete)(n, impl->fast_handles); + impl->fast_handle_count--; + impl->used_buffer_size -= (n->end_pos - n->start_pos); + n->h->close(n->h); + free((void*)n); +} +static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n, + int setDirty) { + if(n->dirty == INVALID_NODE) { + // Not in tree; cast removes "const" + releaseSlowHandle(impl, n->h); + free((void*)n); + return 0; + } else { + assert(setDirty == 0 || setDirty == 1); + assert(n->dirty == 0 || n->dirty == 1); + pthread_mutex_lock(&impl->mut); + ((tree_node*)n)->pin_count--; + if(n->dirty == 0) { + ((tree_node*)n)->dirty = setDirty; + } + pthread_mutex_unlock(&impl->mut); + pthread_cond_signal(&impl->pending_writes_cond); + return 0; } } - -static inline int releaseFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, - stasis_handle_t * fast, int setDirty) { - tree_node * np = malloc(sizeof(tree_node)); - np->start_pos = off; - np->end_pos = off+len; - if(fast) { - assert(off == fast->start_position(fast)); - assert(off + len == fast->end_position(fast)); - } - const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles); - free(np); - assert(n); - if(fast) { - assert(n->h == fast); - } - ((tree_node*)n)->pin_count--; - if(setDirty) ((tree_node*)n)->dirty = 1; - +/** @todo nbw_num_copies is unimplemented. */ +static int nbw_num_copies(stasis_handle_t * h) { return 0; } - -static int nbw_num_copies(stasis_handle_t * h) { - nbw_impl * impl = (nbw_impl*) h->impl; - stasis_handle_t * slow = getSlowHandle(impl); - int slow_num_copies = slow->num_copies(slow); - releaseSlowHandle(impl, slow); - stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg); - int fast_num_copies = fast->num_copies(fast); - fast->close(fast); - return slow_num_copies > fast_num_copies ? slow_num_copies : fast_num_copies; -} +/** @todo nbw_num_copies_buffer is unimplemented. */ static int nbw_num_copies_buffer(stasis_handle_t * h) { - nbw_impl * impl = (nbw_impl*) h->impl; - stasis_handle_t * slow = getSlowHandle(impl); - int slow_num_copies_buffer = slow->num_copies_buffer(slow); - releaseSlowHandle(impl, slow); - stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg); - int fast_num_copies_buffer = fast->num_copies_buffer(fast); - fast->close(fast); - return slow_num_copies_buffer > fast_num_copies_buffer ? slow_num_copies_buffer : fast_num_copies_buffer; + return 0; } static int nbw_close(stasis_handle_t * h) { - printf("Warning: nbw_close leaks fast handles, and never flushes them to the slow handle...\n"); nbw_impl * impl = h->impl; - stasis_handle_t * slow; - + pthread_mutex_lock(&impl->mut); impl->still_open = 0; pthread_mutex_unlock(&impl->mut); pthread_cond_broadcast(&impl->pending_writes_cond); - + for(int i = 0; i < impl->worker_count; i++) { pthread_join(impl->workers[i], 0); } + // No longer need latch; this is the only thread allowed to touch the handle. + free(impl->workers); - // foreach fast handle .. close fast handle... + // printf("nbw had %d slow handles\n", impl->slow_handle_count); + // fflush(stdout); - printf("nbw had %d slow handles\n", impl->slow_handle_count); - fflush(stdout); + assert(impl->fast_handle_count == 0); + assert(impl->used_buffer_size == 0); - while(-1 != (long)(slow = (stasis_handle_t *)popMaxVal(&(impl->slow_handles)))) { + RB_ENTRY(destroy)(impl->fast_handles); + pthread_mutex_destroy(&impl->mut); + stasis_handle_t * slow; + while(-1 != (long)(slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles))) { slow->close(slow); - impl->slow_handle_count --; + impl->slow_handle_count--; } - destroyList(&(impl->slow_handles)); - assert(impl->slow_handle_count == 0); + destroyList(&impl->slow_handles); + assert(impl->slow_handle_count == 0); + free(h->impl); free(h); return 0; } static lsn_t nbw_start_position(stasis_handle_t *h) { - nbw_impl * impl = (nbw_impl*) h->impl; - pthread_mutex_lock(&(impl->mut)); + nbw_impl * impl = h->impl; + pthread_mutex_lock(&impl->mut); lsn_t ret = impl->start_pos; - pthread_mutex_unlock(&(impl->mut)); + pthread_mutex_unlock(&impl->mut); return ret; } static lsn_t nbw_end_position(stasis_handle_t *h) { - nbw_impl * impl = (nbw_impl*) h->impl; - pthread_mutex_lock(&(impl->mut)); - lsn_t ret = impl->start_pos; - pthread_mutex_unlock(&(impl->mut)); + nbw_impl * impl = h->impl; + pthread_mutex_lock(&impl->mut); + lsn_t ret = impl->end_pos; + pthread_mutex_unlock(&impl->mut); return ret; } static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, lsn_t off, lsn_t len) { - nbw_impl * impl = (nbw_impl*) h->impl; - int error = 0; + nbw_impl * impl = h->impl; + const tree_node * n = allocFastHandle(impl, off, len); + stasis_write_buffer_t * w = n->h->write_buffer(n->h, off, len); - stasis_write_buffer_t * w = 0; + write_buffer_impl * w_impl = malloc(sizeof(write_buffer_impl)); + w_impl->n = n; + w_impl->w = w; - pthread_mutex_lock(&impl->mut); - if(off < impl->start_pos) { - error = EDOM; - } else { - if(off + len >= impl->end_pos) { - impl->end_pos = off + len; - } - stasis_handle_t * fast = getFastHandle(impl, off, len, 1); - w = fast->write_buffer(fast, off, len); - } - pthread_mutex_unlock(&impl->mut); + stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); + ret->h = h; + ret->off = w->off; + ret->len = w->len; + ret->buf = w->buf; + ret->error = w->error; + ret->impl = w_impl; - if(!w) { - w = alloc_write_buffer_error(h, error); - } else { - stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); - ret->h = h; - if(!w->error) { - assert(w->off == off); - assert(w->len == len); + if(!ret->error) { + pthread_mutex_lock(&impl->mut); + assert(impl->start_pos <= impl->end_pos); + if(off < impl->start_pos) { + // Note: We're returning a valid write buffer to space before + // the handle's truncation point. Spooky. + ret->error = EDOM; + } else if(off + len > impl->end_pos) { + impl->end_pos = off+len; } - ret->off = off; - ret->len = len; - ret->buf = w->buf; - ret->impl = w; - ret->error = w->error; - w = ret; + pthread_mutex_unlock(&impl->mut); } - return w; - + + return ret; } static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, lsn_t len) { - nbw_impl * impl = (nbw_impl*) h->impl; - pthread_mutex_lock(&(impl->mut)); - lsn_t off = impl->end_pos; - impl->end_pos = off+len; - stasis_handle_t * fast = getFastHandle(impl, off, len, 1); - stasis_write_buffer_t * w = fast->write_buffer(fast, off, len); - pthread_mutex_unlock(&(impl->mut)); - // XXX error handling?!? - - stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); - ret->h = h; - ret->off = off; - ret->len = len; - ret->buf = w->buf; - ret->impl = w; - ret->error = w->error; - w = ret; + nbw_impl * impl = h->impl; - return w; + pthread_mutex_lock(&impl->mut); + lsn_t off = impl->end_pos; + impl->end_pos += len; + pthread_mutex_unlock(&impl->mut); + + return nbw_write_buffer(h, off, len); } static int nbw_release_write_buffer(stasis_write_buffer_t * w) { nbw_impl * impl = w->h->impl; - pthread_mutex_lock(&impl->mut); - if(w->error) { - } else { - stasis_write_buffer_t * w_wrapped = ((stasis_write_buffer_t*)w->impl); - w_wrapped->h->release_write_buffer(w_wrapped); - releaseFastHandle(w->h->impl, w->off, w->len, 0, 1); - // printf("singalling workers\n"); - pthread_cond_signal(&impl->pending_writes_cond); - } + write_buffer_impl * w_impl = w->impl; + const tree_node * n = w_impl->n; + w_impl->w->h->release_write_buffer(w_impl->w); + releaseFastHandle(impl, n, 1); + // pthread_cond_signal(&impl->pending_writes_cond); + free(w_impl); free(w); - pthread_mutex_unlock(&impl->mut); return 0; } static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, lsn_t off, lsn_t len) { - nbw_impl * impl = (nbw_impl*) h->impl; - pthread_mutex_lock(&impl->mut); - stasis_read_buffer_t * r = 0; - int error = 0; - if(off < impl->start_pos || off + len > impl->end_pos) { - error = EDOM; - } else { - stasis_handle_t * fast = getFastHandle(impl, off, len, 0); + nbw_impl * impl = h->impl; + const tree_node * n = findFastHandle(impl, off, len); + stasis_read_buffer_t * r; + stasis_handle_t * r_h = n ? n->h : getSlowHandle(impl); + r = r_h->read_buffer(r_h, off, len); - if(fast) { - r = fast->read_buffer(fast, off, len); - if(r) { - stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t)); - ret->h = h; - ret->off = off; - ret->len = len; - ret->buf = r->buf; - ret->impl = r; - ret->error = r->error; - r = ret; - } - } else { - stasis_handle_t * slow = getSlowHandle(impl); - r = slow->read_buffer(slow, off, len); - releaseSlowHandle(impl, slow); - } - } - if(!r) { - r = alloc_read_buffer_error(h, error); - } - pthread_mutex_unlock(&impl->mut); - return r; + read_buffer_impl * r_impl = malloc(sizeof(read_buffer_impl)); + r_impl->n = n; + r_impl->r = r; + + stasis_read_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); + ret->h = h; + ret->off = r->off; + ret->len = r->len; + ret->buf = r->buf; + ret->error = r->error; + ret->impl = r_impl; + + return ret; } static int nbw_release_read_buffer(stasis_read_buffer_t * r) { nbw_impl * impl = r->h->impl; - pthread_mutex_lock(&impl->mut); - - if(r->error) { + read_buffer_impl * r_impl = r->impl; + const tree_node * n = r_impl->n; + stasis_handle_t * oldHandle = r_impl->r->h; + r_impl->r->h->release_read_buffer(r_impl->r); + // XXX shouldn't need to check for this here; getFastHandle does something similar... + if(n) { + releaseFastHandle(impl, n, 0); } else { - stasis_read_buffer_t * r_wrapped = ((stasis_read_buffer_t*)r->impl); - r_wrapped->h->release_read_buffer(r_wrapped); - releaseFastHandle(r->h->impl, r->off, r->len, 0, 0); + assert(oldHandle); + releaseSlowHandle(impl, oldHandle); } + free(r_impl); free(r); - pthread_mutex_unlock(&impl->mut); return 0; - } static int nbw_write(stasis_handle_t * h, lsn_t off, const byte * dat, lsn_t len) { - nbw_impl * impl = (nbw_impl*) h->impl; - pthread_mutex_lock(&impl->mut); - int error = 0; - if(off < impl->start_pos) { - error = EDOM; - } else { - stasis_handle_t * fast = getFastHandle(impl, off, len, 1); - if(off + len > impl->end_pos) { - impl->end_pos = off + len; + nbw_impl * impl = h->impl; + const tree_node * n = allocFastHandle(impl, off, len); + int ret = n->h->write(n->h, off, dat, len); + releaseFastHandle(impl, n, 1); + if(!ret) { + pthread_mutex_lock(&impl->mut); + assert(impl->start_pos <= impl->end_pos); + if(off < impl->start_pos) { + ret = EDOM; + } else if(off + len > impl->end_pos) { + impl->end_pos = off+len; } - error = fast->write(fast, off, dat, len); - releaseFastHandle(impl, off, len, fast, 1); - // printf("singailling workers\n"); - pthread_cond_signal(&impl->pending_writes_cond); + pthread_mutex_unlock(&impl->mut); } - pthread_mutex_unlock(&impl->mut); - return error; + return ret; } static int nbw_append(stasis_handle_t * h, lsn_t * off, const byte * dat, lsn_t len) { - nbw_impl * impl = (nbw_impl *) h->impl; - int error = 0; + nbw_impl * impl = h->impl; + pthread_mutex_lock(&impl->mut); *off = impl->end_pos; - stasis_handle_t * fast = getFastHandle(impl, *off, len, 1); - impl->end_pos = *off + len; - error = fast->write(fast, *off, dat, len); - releaseFastHandle(impl, *off, len, fast, 1); - // printf("singalling workers\n"); - pthread_cond_signal(&impl->pending_writes_cond); + impl->end_pos+= len; pthread_mutex_unlock(&impl->mut); - return error; + + int ret = nbw_write(h, *off, dat, len); + return ret; } static int nbw_read(stasis_handle_t * h, lsn_t off, byte * buf, lsn_t len) { - nbw_impl * impl = (nbw_impl *) h->impl; - int error = 0; - pthread_mutex_lock(&impl->mut); - // printf("got lock"); fflush(stdout); - if(off < impl->start_pos || off + len > impl->end_pos) { - error = EDOM; - // printf("error"); fflush(stdout); - } else { - // printf("getting handle"); fflush(stdout); - stasis_handle_t * fast = getFastHandle(impl, off, len, 0); - // printf("got handle"); fflush(stdout); - if(fast) { - // printf("fast"); fflush(stdout); - error = fast->read(fast, off, buf, len); - releaseFastHandle(impl, off, len, fast, 0); - } else { - // printf("slow"); fflush(stdout); - stasis_handle_t * slow = getSlowHandle(impl); - pthread_mutex_unlock(&impl->mut); - error = slow->read(slow, off, buf, len); - pthread_mutex_lock(&impl->mut); - releaseSlowHandle(impl, slow); - } - // printf("done"); fflush(stdout); + nbw_impl * impl = h->impl; + const tree_node * n = findFastHandle(impl, off, len); + int ret; + // XXX should be handled by releaseFastHandle. + if(n) { + ret = n->h->read(n->h, off, buf, len); + releaseFastHandle(impl, n, 0); + } else { + stasis_handle_t * slow = getSlowHandle(impl); + ret = slow->read(slow, off, buf, len); + releaseSlowHandle(impl, slow); } - pthread_mutex_unlock(&impl->mut); - return error; + return ret; } static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) { nbw_impl * impl = h->impl; @@ -445,6 +442,9 @@ static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) { error = EDOM; } pthread_mutex_unlock(&impl->mut); + if(!error) { + // XXX close all slow handles; truncate of them. (ie: implement truncate) + } return error; } @@ -466,117 +466,93 @@ struct stasis_handle_t nbw_func = { .error = 0 }; -static void * nbw_worker(void * handle) { - stasis_handle_t * h = (stasis_handle_t*)handle; - nbw_impl * impl = (nbw_impl*)h->impl; +static void * nbw_worker(void * handle) { + stasis_handle_t * h = handle; + nbw_impl * impl = h->impl; pthread_mutex_lock(&impl->mut); - - while(impl->still_open) { - // printf("still open"); - // pick next node. - tree_node * to_flush; - int checked_from_start = 0; - if(impl->last_flushed) { - // cast removes const - to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, impl->last_flushed, - impl->fast_handles); - } else { - to_flush = 0; + while(1) { + // cast strips const. + tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles); + int writes = 0; + while(node) { + if(node->dirty && !node->pin_count) { + node->dirty = 0; + node->pin_count++; + pthread_mutex_unlock(&impl->mut); + writes++; + stasis_handle_t * slow = getSlowHandle(impl); + stasis_handle_t * fast = node->h; + lsn_t off = fast->start_position(fast); + lsn_t len = fast->end_position(fast) - off; + stasis_read_buffer_t * r = fast->read_buffer(fast, off, len); + slow->write(slow, off, r->buf, len); + r->h->release_read_buffer(r); + releaseSlowHandle(impl, slow); + pthread_mutex_lock(&impl->mut); + node->pin_count--; + } + if(!node->dirty && !node->pin_count) { + freeFastHandle(impl, node); + } + node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles); } - while((to_flush || ! checked_from_start) && (to_flush != impl->last_flushed || to_flush == 0)) { - if(to_flush) { - if(to_flush->dirty) { break; } - // printf("clean node.."); - // strip const - to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, to_flush, - impl->fast_handles); - } else { - // printf("looking at beginning of tree.."); - // strip const - to_flush = (tree_node*)RB_ENTRY(min)(impl->fast_handles); - checked_from_start = 1; + if(!impl->fast_handle_count || !writes) { + if(impl->still_open) { + pthread_cond_wait(&impl->pending_writes_cond, &impl->mut); + } else { + break; } } - if(!to_flush) { - // printf("nothing to flush"); - pthread_cond_wait(&impl->pending_writes_cond, &impl->mut); - } else { - impl->last_flushed = to_flush; - to_flush->pin_count++; - - stasis_handle_t * slow = getSlowHandle(impl); - - pthread_mutex_unlock(&impl->mut); - - - // @todo need a sendfile-style thing... - - stasis_handle_t * fast = to_flush->h; - - lsn_t off = fast->start_position(fast); - lsn_t len = fast->end_position(fast) - off; - stasis_read_buffer_t * r = fast->read_buffer(fast, off, len); - - // printf("%ld, Flushing %ld to %ld\n", pthread_self(), off, off+len);fflush(stdout); - - slow->write(slow, off, r->buf, len); - - r->h->release_read_buffer(r); - - pthread_mutex_lock(&impl->mut); - - releaseSlowHandle(impl, slow); - to_flush->pin_count--; - - } } - pthread_mutex_unlock(&impl->mut); return 0; } - stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg), void * slow_factory_arg, stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *), void * fast_factory_arg, int worker_thread_count, - lsn_t buffer_size) { + lsn_t buffer_size, int max_fast_handles) { nbw_impl * impl = malloc(sizeof(nbw_impl)); pthread_mutex_init(&impl->mut, 0); + + impl->start_pos = 0; + impl->end_pos = 0; + impl->slow_factory = slow_factory; impl->slow_factory_arg = slow_factory_arg; - impl->fast_factory = fast_factory; - impl->fast_factory_arg = fast_factory_arg; - impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0); + impl->slow_handles = 0; impl->slow_handle_count = 0; - impl->workers = 0; - impl->worker_count = worker_thread_count; - impl->buffer_size = buffer_size; - impl->used_buffer_size = 0; - - stasis_handle_t * slow = getSlowHandle(impl); - impl->workers = malloc(impl->worker_count * sizeof(pthread_t)); + impl->fast_factory = fast_factory; + impl->fast_factory_arg = fast_factory_arg; + + impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0); + impl->fast_handle_count = 0; + impl->max_fast_handles = max_fast_handles; + + impl->max_buffer_size = buffer_size; + impl->used_buffer_size = 0; + + impl->workers = malloc(worker_thread_count * sizeof(pthread_t)); + impl->worker_count = worker_thread_count; + + pthread_cond_init(&impl->pending_writes_cond, 0); impl->still_open = 1; - pthread_cond_init(&impl->pending_writes_cond, 0); - impl->last_flushed = 0; - impl->start_pos = slow->start_position(slow); - impl->end_pos = slow->end_position(slow); - - releaseSlowHandle(impl,slow); - - stasis_handle_t * ret = malloc(sizeof(stasis_handle_t)); - *ret = nbw_func; - ret->impl = impl; + stasis_handle_t *h = malloc(sizeof(stasis_handle_t)); + *h = nbw_func; + h->impl = impl; for(int i = 0; i < impl->worker_count; i++) { - pthread_create(&(impl->workers[i]), 0, nbw_worker, ret); + int err = pthread_create(&(impl->workers[i]), 0, nbw_worker, h); + if(err) { + perror("Coudln't spawn worker thread for non_blocking io"); + } } - - return ret; + return h; }