New non-blocking implementation, with backpressure. Still doesn't handle overlapping writes. (Multiple writes to the same region are OK though.)

This commit is contained in:
Sears Russell 2006-11-21 06:50:12 +00:00
parent 4520f672ac
commit 802eeb33e7
2 changed files with 334 additions and 358 deletions

View file

@ -97,7 +97,7 @@ stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_fact
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg), stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg),
void * fast_factory_arg, void * fast_factory_arg,
int worker_thread_count, int worker_thread_count,
lsn_t buffer_size); lsn_t buffer_size, int max_writes);
stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h); stasis_handle_t * stasis_handle(open_verifying)(stasis_handle_t * h);
stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h); stasis_handle_t * stasis_handle(open_debug)(stasis_handle_t * h);

View file

@ -19,8 +19,21 @@
is possible for a read or write to block on a memcpy(), but writes is possible for a read or write to block on a memcpy(), but writes
may not block on disk i/o.) may not block on disk i/o.)
Latching protocol:
Each static function that manipulates the tree or lists grabs a
latch. Functions that call such functions should not hold a latch
when the function is called. If a function must atomically update
the handle's state (eg: append), they should obtain the latch,
and release it before calling another function or returning.
Exception: freeFastHandle should be called while holding the
latch.
*/ */
#define INVALID_NODE 2
static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h, int error) { static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t * h, int error) {
assert(error); assert(error);
stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t)); stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t));
@ -43,25 +56,47 @@ static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *
w->error = error; w->error = error;
return w; return w;
} }
/** Wraps stasis_handle_t so that it can be stored in an rbtree. */
typedef struct tree_node { typedef struct tree_node {
lsn_t start_pos; lsn_t start_pos;
lsn_t end_pos; lsn_t end_pos;
stasis_handle_t * h; stasis_handle_t * h;
int pin_count; // held when the handle can not be deallocated, (when it's being written, read, or flushed to disk) /** The number of threads accessing this handle. The handle cannot
int dirty; // set to 1 when the handle is written to, 0 when the handle is written back to disk be deallocated unless this is zero. */
int pin_count;
/** set to 1 when the handle is written to, 0 when the handle is
written back to disk, INVALID_NODE when the handle is not in
the tree. */
int dirty;
} tree_node; } tree_node;
/** Wrapper for write buffers */
typedef struct write_buffer_impl {
/** The tree node that contains this buffer */
const tree_node * n;
/** The underlying buffer. */
stasis_write_buffer_t * w;
} write_buffer_impl;
typedef struct read_buffer_impl {
/** The tree node that contains this buffer, or NULL if the buffer
is from a slow handle. */
const tree_node * n;
/** The underlying buffer. */
stasis_read_buffer_t * r;
} read_buffer_impl;
/**
Compare two tree_node structs. Two tree nodes are equal if they
are zero length, and start at the same point, or if they overlap.
*/
static int cmp_handle(const void * ap, const void * bp, const void * ignored) { static int cmp_handle(const void * ap, const void * bp, const void * ignored) {
tree_node * a = (tree_node*)ap; tree_node * a = (tree_node*)ap;
tree_node * b = (tree_node*)bp; tree_node * b = (tree_node*)bp;
if(a->start_pos == b->start_pos && if(a->start_pos == b->start_pos &&
a->start_pos == a->end_pos && a->start_pos == a->end_pos &&
b->start_pos == b->end_pos ) { b->start_pos == b->end_pos ) {
return 0; // Special case: two zero length regions that start at the same place are equal. return 0;
} }
// Otherwise, the regions must overlap:
// (0,0) == (0,0) ; (1,0) == (0,4) ; (0,0) < (0,4)
if(a->end_pos <= b->start_pos) { if(a->end_pos <= b->start_pos) {
return -1; return -1;
} else if(a->start_pos >= b->end_pos) { } else if(a->start_pos >= b->end_pos) {
@ -69,140 +104,145 @@ static int cmp_handle(const void * ap, const void * bp, const void * ignored) {
} else { } else {
return 0; return 0;
} }
} }
typedef struct nbw_read_buffer_impl {
stasis_read_buffer_t * buffer;
tree_node * n;
} nbw_read_buffer_impl;
typedef struct nbw_impl { typedef struct nbw_impl {
pthread_mutex_t mut; pthread_mutex_t mut;
// Need more than one....
// Handle state
lsn_t start_pos;
lsn_t end_pos;
// Fields to manage slow handles
stasis_handle_t * (*slow_factory)(void * arg); stasis_handle_t * (*slow_factory)(void * arg);
void * slow_factory_arg; void * slow_factory_arg;
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg);
void * fast_factory_arg;
struct RB_ENTRY(tree) * fast_handles;
pthread_t * workers;
int worker_count;
tree_node * last_flushed;
pthread_cond_t pending_writes_cond;
LinkedList pending_writes;
int still_open;
lsn_t buffer_size;
lsn_t used_buffer_size;
LinkedList * slow_handles; LinkedList * slow_handles;
int slow_handle_count; int slow_handle_count;
// The rest of the values are caches maintained for efficiency. // Fields to manage fast handles
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg);
void * fast_factory_arg;
/* int num_copies; struct RB_ENTRY(tree) * fast_handles;
int num_copies_buffer; */ int fast_handle_count;
lsn_t start_pos; int max_fast_handles;
lsn_t end_pos; lsn_t used_buffer_size;
lsn_t max_buffer_size;
// Fields to manage and signal worker threads
pthread_t * workers;
int worker_count;
pthread_cond_t pending_writes_cond;
int still_open;
} nbw_impl; } nbw_impl;
static stasis_handle_t * getSlowHandle(nbw_impl * impl) { static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
stasis_handle_t * slow = (stasis_handle_t*) popMaxVal(&impl->slow_handles); pthread_mutex_lock(&impl->mut);
stasis_handle_t * slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles);
assert(slow);
if((long)slow == -1) { if((long)slow == -1) {
slow = impl->slow_factory(impl->slow_factory_arg);
impl->slow_handle_count++; impl->slow_handle_count++;
pthread_mutex_unlock(&impl->mut);
slow = impl->slow_factory(impl->slow_factory_arg);
} else {
pthread_mutex_unlock(&impl->mut);
} }
return slow; return slow;
} }
static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) { static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) {
addVal(&impl->slow_handles, (long) slow); assert(slow);
pthread_mutex_lock(&impl->mut);
addVal(&impl->slow_handles, (long)slow);
pthread_mutex_unlock(&impl->mut);
} }
static inline stasis_handle_t * getFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, int allocIfMissing) {
tree_node * np = malloc(sizeof(tree_node)); static tree_node * allocTreeNode(lsn_t off, lsn_t len) {
np->start_pos = off; tree_node * ret = malloc(sizeof(tree_node));
np->end_pos = off+len; ret->start_pos = off;
if(allocIfMissing) { ret->end_pos = off + len;
// printf("lookup (%ld, %ld); ", np->start_pos, np->end_pos); ret->dirty = 0;
const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles); ret->pin_count = 1;
if(n != np) { return ret;
// printf("found\n"); }
// The node was already in the tree
assert(np->start_pos == n->start_pos && np->end_pos == n->end_pos); static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) {
free(np); tree_node * np = allocTreeNode(off, len);
pthread_mutex_lock(&impl->mut);
const tree_node * n = RB_ENTRY(search)(np, impl->fast_handles);
if(n == np) { // not found
if(impl->fast_handle_count > impl->max_fast_handles ||
impl->used_buffer_size > impl->max_buffer_size) {
RB_ENTRY(delete)(np, impl->fast_handles);
np->dirty = INVALID_NODE;
pthread_mutex_unlock(&impl->mut);
np->h = getSlowHandle(impl);
} else { } else {
// printf("not found\n"); impl->fast_handle_count++;
assert(RB_ENTRY(find)(n, impl->fast_handles)); impl->used_buffer_size += len;
// The node wasn't in the tree and search inserted it for us. pthread_mutex_unlock(&impl->mut);
// Assignment is safe because we're writing to a field of n that is ignored by cmp.
((tree_node*)n)->h = impl->fast_factory(off, len, impl->fast_factory_arg); np->h = impl->fast_factory(off,len,impl->fast_factory_arg);
} }
// fflush(stdout);
((tree_node*)n)->pin_count++;
return n->h;
} else { } else {
const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles); ((tree_node*)n)->pin_count++;
pthread_mutex_unlock(&impl->mut);
free(np); free(np);
if(!n) { }
return 0; return n;
} else { }
((tree_node*)n)->pin_count++; static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) {
return n->h; tree_node * np = allocTreeNode(off, len);
pthread_mutex_lock(&impl->mut);
const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles);
if(n) ((tree_node*)n)->pin_count++;
pthread_mutex_unlock(&impl->mut);
free(np);
return n;
}
/** Unlike all of the other fastHandle functions, the caller
should hold the mutex when calling freeFastHandle. */
static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) {
RB_ENTRY(delete)(n, impl->fast_handles);
impl->fast_handle_count--;
impl->used_buffer_size -= (n->end_pos - n->start_pos);
n->h->close(n->h);
free((void*)n);
}
static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n,
int setDirty) {
if(n->dirty == INVALID_NODE) {
// Not in tree; cast removes "const"
releaseSlowHandle(impl, n->h);
free((void*)n);
return 0;
} else {
assert(setDirty == 0 || setDirty == 1);
assert(n->dirty == 0 || n->dirty == 1);
pthread_mutex_lock(&impl->mut);
((tree_node*)n)->pin_count--;
if(n->dirty == 0) {
((tree_node*)n)->dirty = setDirty;
} }
pthread_mutex_unlock(&impl->mut);
pthread_cond_signal(&impl->pending_writes_cond);
return 0;
} }
} }
/** @todo nbw_num_copies is unimplemented. */
static inline int releaseFastHandle(nbw_impl * impl, lsn_t off, lsn_t len, static int nbw_num_copies(stasis_handle_t * h) {
stasis_handle_t * fast, int setDirty) {
tree_node * np = malloc(sizeof(tree_node));
np->start_pos = off;
np->end_pos = off+len;
if(fast) {
assert(off == fast->start_position(fast));
assert(off + len == fast->end_position(fast));
}
const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles);
free(np);
assert(n);
if(fast) {
assert(n->h == fast);
}
((tree_node*)n)->pin_count--;
if(setDirty) ((tree_node*)n)->dirty = 1;
return 0; return 0;
} }
/** @todo nbw_num_copies_buffer is unimplemented. */
static int nbw_num_copies(stasis_handle_t * h) {
nbw_impl * impl = (nbw_impl*) h->impl;
stasis_handle_t * slow = getSlowHandle(impl);
int slow_num_copies = slow->num_copies(slow);
releaseSlowHandle(impl, slow);
stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg);
int fast_num_copies = fast->num_copies(fast);
fast->close(fast);
return slow_num_copies > fast_num_copies ? slow_num_copies : fast_num_copies;
}
static int nbw_num_copies_buffer(stasis_handle_t * h) { static int nbw_num_copies_buffer(stasis_handle_t * h) {
nbw_impl * impl = (nbw_impl*) h->impl; return 0;
stasis_handle_t * slow = getSlowHandle(impl);
int slow_num_copies_buffer = slow->num_copies_buffer(slow);
releaseSlowHandle(impl, slow);
stasis_handle_t * fast = impl->fast_factory(0, 0, impl->fast_factory_arg);
int fast_num_copies_buffer = fast->num_copies_buffer(fast);
fast->close(fast);
return slow_num_copies_buffer > fast_num_copies_buffer ? slow_num_copies_buffer : fast_num_copies_buffer;
} }
static int nbw_close(stasis_handle_t * h) { static int nbw_close(stasis_handle_t * h) {
printf("Warning: nbw_close leaks fast handles, and never flushes them to the slow handle...\n");
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
stasis_handle_t * slow;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
impl->still_open = 0; impl->still_open = 0;
@ -213,18 +253,25 @@ static int nbw_close(stasis_handle_t * h) {
pthread_join(impl->workers[i], 0); pthread_join(impl->workers[i], 0);
} }
// No longer need latch; this is the only thread allowed to touch the handle.
free(impl->workers); free(impl->workers);
// foreach fast handle .. close fast handle... // printf("nbw had %d slow handles\n", impl->slow_handle_count);
// fflush(stdout);
printf("nbw had %d slow handles\n", impl->slow_handle_count); assert(impl->fast_handle_count == 0);
fflush(stdout); assert(impl->used_buffer_size == 0);
while(-1 != (long)(slow = (stasis_handle_t *)popMaxVal(&(impl->slow_handles)))) { RB_ENTRY(destroy)(impl->fast_handles);
pthread_mutex_destroy(&impl->mut);
stasis_handle_t * slow;
while(-1 != (long)(slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles))) {
slow->close(slow); slow->close(slow);
impl->slow_handle_count --; impl->slow_handle_count--;
} }
destroyList(&(impl->slow_handles)); destroyList(&impl->slow_handles);
assert(impl->slow_handle_count == 0); assert(impl->slow_handle_count == 0);
free(h->impl); free(h->impl);
@ -232,208 +279,158 @@ static int nbw_close(stasis_handle_t * h) {
return 0; return 0;
} }
static lsn_t nbw_start_position(stasis_handle_t *h) { static lsn_t nbw_start_position(stasis_handle_t *h) {
nbw_impl * impl = (nbw_impl*) h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&impl->mut);
lsn_t ret = impl->start_pos; lsn_t ret = impl->start_pos;
pthread_mutex_unlock(&(impl->mut)); pthread_mutex_unlock(&impl->mut);
return ret; return ret;
} }
static lsn_t nbw_end_position(stasis_handle_t *h) { static lsn_t nbw_end_position(stasis_handle_t *h) {
nbw_impl * impl = (nbw_impl*) h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&(impl->mut)); pthread_mutex_lock(&impl->mut);
lsn_t ret = impl->start_pos; lsn_t ret = impl->end_pos;
pthread_mutex_unlock(&(impl->mut)); pthread_mutex_unlock(&impl->mut);
return ret; return ret;
} }
static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h, static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) { lsn_t off, lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl; nbw_impl * impl = h->impl;
int error = 0; const tree_node * n = allocFastHandle(impl, off, len);
stasis_write_buffer_t * w = n->h->write_buffer(n->h, off, len);
stasis_write_buffer_t * w = 0; write_buffer_impl * w_impl = malloc(sizeof(write_buffer_impl));
w_impl->n = n;
w_impl->w = w;
pthread_mutex_lock(&impl->mut); stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
if(off < impl->start_pos) { ret->h = h;
error = EDOM; ret->off = w->off;
} else { ret->len = w->len;
if(off + len >= impl->end_pos) { ret->buf = w->buf;
impl->end_pos = off + len; ret->error = w->error;
ret->impl = w_impl;
if(!ret->error) {
pthread_mutex_lock(&impl->mut);
assert(impl->start_pos <= impl->end_pos);
if(off < impl->start_pos) {
// Note: We're returning a valid write buffer to space before
// the handle's truncation point. Spooky.
ret->error = EDOM;
} else if(off + len > impl->end_pos) {
impl->end_pos = off+len;
} }
stasis_handle_t * fast = getFastHandle(impl, off, len, 1); pthread_mutex_unlock(&impl->mut);
w = fast->write_buffer(fast, off, len);
} }
pthread_mutex_unlock(&impl->mut);
if(!w) {
w = alloc_write_buffer_error(h, error);
} else {
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
ret->h = h;
if(!w->error) {
assert(w->off == off);
assert(w->len == len);
}
ret->off = off;
ret->len = len;
ret->buf = w->buf;
ret->impl = w;
ret->error = w->error;
w = ret;
}
return w;
return ret;
} }
static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h, static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
lsn_t len) { lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&(impl->mut));
pthread_mutex_lock(&impl->mut);
lsn_t off = impl->end_pos; lsn_t off = impl->end_pos;
impl->end_pos = off+len; impl->end_pos += len;
stasis_handle_t * fast = getFastHandle(impl, off, len, 1); pthread_mutex_unlock(&impl->mut);
stasis_write_buffer_t * w = fast->write_buffer(fast, off, len);
pthread_mutex_unlock(&(impl->mut));
// XXX error handling?!?
stasis_write_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t)); return nbw_write_buffer(h, off, len);
ret->h = h;
ret->off = off;
ret->len = len;
ret->buf = w->buf;
ret->impl = w;
ret->error = w->error;
w = ret;
return w;
} }
static int nbw_release_write_buffer(stasis_write_buffer_t * w) { static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
nbw_impl * impl = w->h->impl; nbw_impl * impl = w->h->impl;
pthread_mutex_lock(&impl->mut); write_buffer_impl * w_impl = w->impl;
if(w->error) { const tree_node * n = w_impl->n;
} else { w_impl->w->h->release_write_buffer(w_impl->w);
stasis_write_buffer_t * w_wrapped = ((stasis_write_buffer_t*)w->impl); releaseFastHandle(impl, n, 1);
w_wrapped->h->release_write_buffer(w_wrapped); // pthread_cond_signal(&impl->pending_writes_cond);
releaseFastHandle(w->h->impl, w->off, w->len, 0, 1); free(w_impl);
// printf("singalling workers\n");
pthread_cond_signal(&impl->pending_writes_cond);
}
free(w); free(w);
pthread_mutex_unlock(&impl->mut);
return 0; return 0;
} }
static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h, static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
lsn_t off, lsn_t len) { lsn_t off, lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); const tree_node * n = findFastHandle(impl, off, len);
stasis_read_buffer_t * r = 0; stasis_read_buffer_t * r;
int error = 0; stasis_handle_t * r_h = n ? n->h : getSlowHandle(impl);
if(off < impl->start_pos || off + len > impl->end_pos) { r = r_h->read_buffer(r_h, off, len);
error = EDOM;
} else {
stasis_handle_t * fast = getFastHandle(impl, off, len, 0);
if(fast) { read_buffer_impl * r_impl = malloc(sizeof(read_buffer_impl));
r = fast->read_buffer(fast, off, len); r_impl->n = n;
if(r) { r_impl->r = r;
stasis_read_buffer_t * ret = malloc(sizeof(stasis_read_buffer_t));
ret->h = h; stasis_read_buffer_t * ret = malloc(sizeof(stasis_write_buffer_t));
ret->off = off; ret->h = h;
ret->len = len; ret->off = r->off;
ret->buf = r->buf; ret->len = r->len;
ret->impl = r; ret->buf = r->buf;
ret->error = r->error; ret->error = r->error;
r = ret; ret->impl = r_impl;
}
} else { return ret;
stasis_handle_t * slow = getSlowHandle(impl);
r = slow->read_buffer(slow, off, len);
releaseSlowHandle(impl, slow);
}
}
if(!r) {
r = alloc_read_buffer_error(h, error);
}
pthread_mutex_unlock(&impl->mut);
return r;
} }
static int nbw_release_read_buffer(stasis_read_buffer_t * r) { static int nbw_release_read_buffer(stasis_read_buffer_t * r) {
nbw_impl * impl = r->h->impl; nbw_impl * impl = r->h->impl;
pthread_mutex_lock(&impl->mut); read_buffer_impl * r_impl = r->impl;
const tree_node * n = r_impl->n;
if(r->error) { stasis_handle_t * oldHandle = r_impl->r->h;
r_impl->r->h->release_read_buffer(r_impl->r);
// XXX shouldn't need to check for this here; getFastHandle does something similar...
if(n) {
releaseFastHandle(impl, n, 0);
} else { } else {
stasis_read_buffer_t * r_wrapped = ((stasis_read_buffer_t*)r->impl); assert(oldHandle);
r_wrapped->h->release_read_buffer(r_wrapped); releaseSlowHandle(impl, oldHandle);
releaseFastHandle(r->h->impl, r->off, r->len, 0, 0);
} }
free(r_impl);
free(r); free(r);
pthread_mutex_unlock(&impl->mut);
return 0; return 0;
} }
static int nbw_write(stasis_handle_t * h, lsn_t off, static int nbw_write(stasis_handle_t * h, lsn_t off,
const byte * dat, lsn_t len) { const byte * dat, lsn_t len) {
nbw_impl * impl = (nbw_impl*) h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); const tree_node * n = allocFastHandle(impl, off, len);
int error = 0; int ret = n->h->write(n->h, off, dat, len);
if(off < impl->start_pos) { releaseFastHandle(impl, n, 1);
error = EDOM; if(!ret) {
} else { pthread_mutex_lock(&impl->mut);
stasis_handle_t * fast = getFastHandle(impl, off, len, 1); assert(impl->start_pos <= impl->end_pos);
if(off + len > impl->end_pos) { if(off < impl->start_pos) {
impl->end_pos = off + len; ret = EDOM;
} else if(off + len > impl->end_pos) {
impl->end_pos = off+len;
} }
error = fast->write(fast, off, dat, len); pthread_mutex_unlock(&impl->mut);
releaseFastHandle(impl, off, len, fast, 1);
// printf("singailling workers\n");
pthread_cond_signal(&impl->pending_writes_cond);
} }
pthread_mutex_unlock(&impl->mut); return ret;
return error;
} }
static int nbw_append(stasis_handle_t * h, lsn_t * off, static int nbw_append(stasis_handle_t * h, lsn_t * off,
const byte * dat, lsn_t len) { const byte * dat, lsn_t len) {
nbw_impl * impl = (nbw_impl *) h->impl; nbw_impl * impl = h->impl;
int error = 0;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
*off = impl->end_pos; *off = impl->end_pos;
stasis_handle_t * fast = getFastHandle(impl, *off, len, 1); impl->end_pos+= len;
impl->end_pos = *off + len;
error = fast->write(fast, *off, dat, len);
releaseFastHandle(impl, *off, len, fast, 1);
// printf("singalling workers\n");
pthread_cond_signal(&impl->pending_writes_cond);
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
return error;
int ret = nbw_write(h, *off, dat, len);
return ret;
} }
static int nbw_read(stasis_handle_t * h, static int nbw_read(stasis_handle_t * h,
lsn_t off, byte * buf, lsn_t len) { lsn_t off, byte * buf, lsn_t len) {
nbw_impl * impl = (nbw_impl *) h->impl; nbw_impl * impl = h->impl;
int error = 0; const tree_node * n = findFastHandle(impl, off, len);
pthread_mutex_lock(&impl->mut); int ret;
// printf("got lock"); fflush(stdout); // XXX should be handled by releaseFastHandle.
if(off < impl->start_pos || off + len > impl->end_pos) { if(n) {
error = EDOM; ret = n->h->read(n->h, off, buf, len);
// printf("error"); fflush(stdout); releaseFastHandle(impl, n, 0);
} else { } else {
// printf("getting handle"); fflush(stdout); stasis_handle_t * slow = getSlowHandle(impl);
stasis_handle_t * fast = getFastHandle(impl, off, len, 0); ret = slow->read(slow, off, buf, len);
// printf("got handle"); fflush(stdout); releaseSlowHandle(impl, slow);
if(fast) {
// printf("fast"); fflush(stdout);
error = fast->read(fast, off, buf, len);
releaseFastHandle(impl, off, len, fast, 0);
} else {
// printf("slow"); fflush(stdout);
stasis_handle_t * slow = getSlowHandle(impl);
pthread_mutex_unlock(&impl->mut);
error = slow->read(slow, off, buf, len);
pthread_mutex_lock(&impl->mut);
releaseSlowHandle(impl, slow);
}
// printf("done"); fflush(stdout);
} }
pthread_mutex_unlock(&impl->mut); return ret;
return error;
} }
static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) { static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
nbw_impl * impl = h->impl; nbw_impl * impl = h->impl;
@ -445,6 +442,9 @@ static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
error = EDOM; error = EDOM;
} }
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
if(!error) {
// XXX close all slow handles; truncate of them. (ie: implement truncate)
}
return error; return error;
} }
@ -467,116 +467,92 @@ struct stasis_handle_t nbw_func = {
}; };
static void * nbw_worker(void * handle) { static void * nbw_worker(void * handle) {
stasis_handle_t * h = (stasis_handle_t*)handle; stasis_handle_t * h = handle;
nbw_impl * impl = (nbw_impl*)h->impl; nbw_impl * impl = h->impl;
pthread_mutex_lock(&impl->mut); pthread_mutex_lock(&impl->mut);
while(1) {
while(impl->still_open) { // cast strips const.
// printf("still open"); tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
// pick next node. int writes = 0;
tree_node * to_flush; while(node) {
int checked_from_start = 0; if(node->dirty && !node->pin_count) {
if(impl->last_flushed) { node->dirty = 0;
// cast removes const node->pin_count++;
to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, impl->last_flushed, pthread_mutex_unlock(&impl->mut);
impl->fast_handles); writes++;
} else { stasis_handle_t * slow = getSlowHandle(impl);
to_flush = 0; stasis_handle_t * fast = node->h;
lsn_t off = fast->start_position(fast);
lsn_t len = fast->end_position(fast) - off;
stasis_read_buffer_t * r = fast->read_buffer(fast, off, len);
slow->write(slow, off, r->buf, len);
r->h->release_read_buffer(r);
releaseSlowHandle(impl, slow);
pthread_mutex_lock(&impl->mut);
node->pin_count--;
}
if(!node->dirty && !node->pin_count) {
freeFastHandle(impl, node);
}
node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles);
} }
while((to_flush || ! checked_from_start) && (to_flush != impl->last_flushed || to_flush == 0)) { if(!impl->fast_handle_count || !writes) {
if(to_flush) { if(impl->still_open) {
if(to_flush->dirty) { break; } pthread_cond_wait(&impl->pending_writes_cond, &impl->mut);
// printf("clean node..");
// strip const
to_flush = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, to_flush,
impl->fast_handles);
} else { } else {
// printf("looking at beginning of tree.."); break;
// strip const
to_flush = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
checked_from_start = 1;
} }
} }
if(!to_flush) {
// printf("nothing to flush");
pthread_cond_wait(&impl->pending_writes_cond, &impl->mut);
} else {
impl->last_flushed = to_flush;
to_flush->pin_count++;
stasis_handle_t * slow = getSlowHandle(impl);
pthread_mutex_unlock(&impl->mut);
// @todo need a sendfile-style thing...
stasis_handle_t * fast = to_flush->h;
lsn_t off = fast->start_position(fast);
lsn_t len = fast->end_position(fast) - off;
stasis_read_buffer_t * r = fast->read_buffer(fast, off, len);
// printf("%ld, Flushing %ld to %ld\n", pthread_self(), off, off+len);fflush(stdout);
slow->write(slow, off, r->buf, len);
r->h->release_read_buffer(r);
pthread_mutex_lock(&impl->mut);
releaseSlowHandle(impl, slow);
to_flush->pin_count--;
}
} }
pthread_mutex_unlock(&impl->mut); pthread_mutex_unlock(&impl->mut);
return 0; return 0;
} }
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg), stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg),
void * slow_factory_arg, void * slow_factory_arg,
stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *), stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *),
void * fast_factory_arg, void * fast_factory_arg,
int worker_thread_count, int worker_thread_count,
lsn_t buffer_size) { lsn_t buffer_size, int max_fast_handles) {
nbw_impl * impl = malloc(sizeof(nbw_impl)); nbw_impl * impl = malloc(sizeof(nbw_impl));
pthread_mutex_init(&impl->mut, 0); pthread_mutex_init(&impl->mut, 0);
impl->start_pos = 0;
impl->end_pos = 0;
impl->slow_factory = slow_factory; impl->slow_factory = slow_factory;
impl->slow_factory_arg = slow_factory_arg; impl->slow_factory_arg = slow_factory_arg;
impl->fast_factory = fast_factory;
impl->fast_factory_arg = fast_factory_arg;
impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0);
impl->slow_handles = 0; impl->slow_handles = 0;
impl->slow_handle_count = 0; impl->slow_handle_count = 0;
impl->workers = 0;
impl->worker_count = worker_thread_count; impl->fast_factory = fast_factory;
impl->buffer_size = buffer_size; impl->fast_factory_arg = fast_factory_arg;
impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0);
impl->fast_handle_count = 0;
impl->max_fast_handles = max_fast_handles;
impl->max_buffer_size = buffer_size;
impl->used_buffer_size = 0; impl->used_buffer_size = 0;
stasis_handle_t * slow = getSlowHandle(impl); impl->workers = malloc(worker_thread_count * sizeof(pthread_t));
impl->worker_count = worker_thread_count;
impl->workers = malloc(impl->worker_count * sizeof(pthread_t)); pthread_cond_init(&impl->pending_writes_cond, 0);
impl->still_open = 1; impl->still_open = 1;
pthread_cond_init(&impl->pending_writes_cond, 0);
impl->last_flushed = 0;
impl->start_pos = slow->start_position(slow); stasis_handle_t *h = malloc(sizeof(stasis_handle_t));
impl->end_pos = slow->end_position(slow); *h = nbw_func;
h->impl = impl;
releaseSlowHandle(impl,slow);
stasis_handle_t * ret = malloc(sizeof(stasis_handle_t));
*ret = nbw_func;
ret->impl = impl;
for(int i = 0; i < impl->worker_count; i++) { for(int i = 0; i < impl->worker_count; i++) {
pthread_create(&(impl->workers[i]), 0, nbw_worker, ret); int err = pthread_create(&(impl->workers[i]), 0, nbw_worker, h);
if(err) {
perror("Coudln't spawn worker thread for non_blocking io");
}
} }
return h;
return ret;
} }