2007-06-11 21:36:57 +00:00
|
|
|
#include <stasis/io/handle.h>
|
|
|
|
#include <stasis/redblack.h>
|
2006-10-26 05:48:30 +00:00
|
|
|
#include <pthread.h>
|
|
|
|
#include <errno.h>
|
|
|
|
#include <stdlib.h>
|
|
|
|
#include <assert.h>
|
|
|
|
#include <stdio.h>
|
2007-08-20 16:23:57 +00:00
|
|
|
#include <string.h>
|
2006-10-28 03:33:02 +00:00
|
|
|
#include <linkedlist.h>
|
2007-05-26 01:03:57 +00:00
|
|
|
/**
|
|
|
|
|
|
|
|
@file
|
2006-10-26 05:48:30 +00:00
|
|
|
|
2007-08-20 16:23:57 +00:00
|
|
|
File handle that avoids blocking on writes. It attempts to ensure
|
|
|
|
that the code calling it never waits for a "slow handle" to perform
|
|
|
|
a write. Instead, when a write request is received, it is
|
|
|
|
temporarily stored in a "fast handle". The caller provides factory
|
|
|
|
methods that instantiate fast and slow handles.
|
|
|
|
|
|
|
|
For efficiency, this file handle imposes a special restriction upon
|
|
|
|
its callers. It implicitly partitions the underlying file into
|
|
|
|
blocks based upon the read and write requests it receives. Future
|
|
|
|
reads and writes must access complete blocks, and may not span
|
|
|
|
multiple blocks. This works well for page files (where each page is
|
|
|
|
a block), and log files, where each log entry is a block, as is the
|
|
|
|
header that stasis appends to the log entry.
|
|
|
|
|
2006-10-26 05:48:30 +00:00
|
|
|
Design:
|
|
|
|
|
|
|
|
data structures: A rb tree holds a set of "fast" handles that manage
|
|
|
|
disjoint regions. Each "fast" handle corresponds to an outstanding
|
|
|
|
write. Worker threads then flush "fast" handle contents into the
|
|
|
|
"slow" handle. Reads are serviced from the fast handles, and the
|
|
|
|
slow handle is used to fill any holes that exist within the read
|
|
|
|
range. (This implementation resorts to copies when necessary... it
|
|
|
|
is possible for a read or write to block on a memcpy(), but writes
|
|
|
|
may not block on disk i/o.)
|
|
|
|
|
2007-08-20 16:23:57 +00:00
|
|
|
Latching protocol:
|
2006-11-21 06:50:12 +00:00
|
|
|
|
|
|
|
Each static function that manipulates the tree or lists grabs a
|
|
|
|
latch. Functions that call such functions should not hold a latch
|
|
|
|
when the function is called. If a function must atomically update
|
|
|
|
the handle's state (eg: append), it should obtain the latch,
|
|
|
|
and release it before calling another function or returning.
|
|
|
|
|
|
|
|
Exception: freeFastHandle should be called while holding the
|
|
|
|
latch.
|
|
|
|
|
2006-10-26 05:48:30 +00:00
|
|
|
*/
|
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
#define INVALID_NODE 2
|
2007-08-20 16:23:57 +00:00
|
|
|
/* If defined, merge writes immediately (not recommended, as doing so
|
|
|
|
decreases the granularity of the "dirty" bit, causing clean data to
|
|
|
|
be written back.). Whether or not this is defined, writes will be
|
|
|
|
merged by nbw_worker at flush.
|
|
|
|
*/
|
|
|
|
//#define EAGER_MERGE
|
|
|
|
/* If EAGER_MERGE is defined, this limits the number of pages it will
|
|
|
|
coalesce into a single write.
|
|
|
|
*/
|
|
|
|
//#define MAX_MERGE 4
|
2006-11-21 06:50:12 +00:00
|
|
|
|
2007-08-20 16:23:57 +00:00
|
|
|
/** @return a read buffer indicating an error has occured */
|
|
|
|
static inline stasis_read_buffer_t * alloc_read_buffer_error(stasis_handle_t *h,
|
|
|
|
int error) {
|
2006-10-26 05:48:30 +00:00
|
|
|
assert(error);
|
|
|
|
stasis_read_buffer_t * r = malloc(sizeof(stasis_read_buffer_t));
|
|
|
|
r->h = h;
|
|
|
|
r->buf = 0;
|
|
|
|
r->len = 0;
|
|
|
|
r->impl = 0;
|
|
|
|
r->error = error;
|
|
|
|
return r;
|
|
|
|
}
|
2007-08-20 16:23:57 +00:00
|
|
|
/** @return a read buffer indicating a write error has occured */
|
|
|
|
static inline stasis_write_buffer_t * alloc_write_buffer_error(stasis_handle_t *h,
|
|
|
|
int error) {
|
2006-10-26 05:48:30 +00:00
|
|
|
assert(error);
|
|
|
|
stasis_write_buffer_t * w = malloc(sizeof(stasis_write_buffer_t));
|
|
|
|
w->h = h;
|
|
|
|
w->off = 0;
|
|
|
|
w->buf = 0;
|
|
|
|
w->len = 0;
|
|
|
|
w->impl = 0;
|
|
|
|
w->error = error;
|
|
|
|
return w;
|
|
|
|
}
|
2006-11-21 06:50:12 +00:00
|
|
|
/** Wraps stasis_handle_t so that it can be stored in an rbtree. */
|
2006-10-26 05:48:30 +00:00
|
|
|
typedef struct tree_node {
|
|
|
|
lsn_t start_pos;
|
|
|
|
lsn_t end_pos;
|
|
|
|
stasis_handle_t * h;
|
2007-08-20 16:23:57 +00:00
|
|
|
/** The number of I/O requests this node corresponds to. */
|
|
|
|
int write_count;
|
2006-11-21 06:50:12 +00:00
|
|
|
/** The number of threads accessing this handle. The handle cannot
|
|
|
|
be deallocated unless this is zero. */
|
|
|
|
int pin_count;
|
|
|
|
/** set to 1 when the handle is written to, 0 when the handle is
|
|
|
|
written back to disk, INVALID_NODE when the handle is not in
|
|
|
|
the tree. */
|
2007-08-20 16:23:57 +00:00
|
|
|
int dirty;
|
2006-10-26 05:48:30 +00:00
|
|
|
} tree_node;
|
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
/** Wrapper for write buffers */
|
|
|
|
typedef struct write_buffer_impl {
|
|
|
|
/** The tree node that contains this buffer */
|
|
|
|
const tree_node * n;
|
|
|
|
/** The underlying buffer. */
|
|
|
|
stasis_write_buffer_t * w;
|
|
|
|
} write_buffer_impl;
|
|
|
|
typedef struct read_buffer_impl {
|
|
|
|
/** The tree node that contains this buffer, or NULL if the buffer
|
|
|
|
is from a slow handle. */
|
|
|
|
const tree_node * n;
|
|
|
|
/** The underlying buffer. */
|
|
|
|
stasis_read_buffer_t * r;
|
|
|
|
} read_buffer_impl;
|
|
|
|
|
|
|
|
/**
|
|
|
|
Compare two tree_node structs. Two tree nodes are equal if they
|
|
|
|
are zero length, and start at the same point, or if they overlap.
|
|
|
|
*/
|
2006-10-26 05:48:30 +00:00
|
|
|
static int cmp_handle(const void * ap, const void * bp, const void * ignored) {
|
|
|
|
tree_node * a = (tree_node*)ap;
|
|
|
|
tree_node * b = (tree_node*)bp;
|
2006-10-28 03:33:02 +00:00
|
|
|
if(a->start_pos == b->start_pos &&
|
|
|
|
a->start_pos == a->end_pos &&
|
|
|
|
b->start_pos == b->end_pos ) {
|
2006-11-21 06:50:12 +00:00
|
|
|
return 0;
|
2006-10-28 03:33:02 +00:00
|
|
|
}
|
2006-10-26 05:48:30 +00:00
|
|
|
if(a->end_pos <= b->start_pos) {
|
|
|
|
return -1;
|
|
|
|
} else if(a->start_pos >= b->end_pos) {
|
|
|
|
return 1;
|
|
|
|
} else {
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
typedef struct nbw_impl {
|
|
|
|
pthread_mutex_t mut;
|
2006-11-21 06:50:12 +00:00
|
|
|
|
|
|
|
// Handle state
|
|
|
|
lsn_t start_pos;
|
|
|
|
lsn_t end_pos;
|
|
|
|
|
|
|
|
// Fields to manage slow handles
|
2006-10-28 03:33:02 +00:00
|
|
|
stasis_handle_t * (*slow_factory)(void * arg);
|
|
|
|
void * slow_factory_arg;
|
2006-11-21 06:50:12 +00:00
|
|
|
|
|
|
|
LinkedList * slow_handles;
|
|
|
|
int slow_handle_count;
|
2007-08-20 16:23:57 +00:00
|
|
|
|
|
|
|
// These two track statistics on write coalescing.
|
|
|
|
lsn_t requested_bytes_written;
|
|
|
|
lsn_t total_bytes_written;
|
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
// Fields to manage fast handles
|
2006-10-26 05:48:30 +00:00
|
|
|
stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg);
|
|
|
|
void * fast_factory_arg;
|
2006-11-21 06:50:12 +00:00
|
|
|
|
2006-10-26 05:48:30 +00:00
|
|
|
struct RB_ENTRY(tree) * fast_handles;
|
2006-11-21 06:50:12 +00:00
|
|
|
int fast_handle_count;
|
|
|
|
int max_fast_handles;
|
2007-08-20 16:23:57 +00:00
|
|
|
int min_fast_handles;
|
2006-11-21 06:50:12 +00:00
|
|
|
lsn_t used_buffer_size;
|
|
|
|
lsn_t max_buffer_size;
|
2006-10-28 03:33:02 +00:00
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
// Fields to manage and signal worker threads
|
2006-10-28 03:33:02 +00:00
|
|
|
pthread_t * workers;
|
2006-10-26 05:48:30 +00:00
|
|
|
int worker_count;
|
2006-10-28 03:33:02 +00:00
|
|
|
pthread_cond_t pending_writes_cond;
|
2007-08-20 16:23:57 +00:00
|
|
|
int still_open;
|
2006-10-26 05:48:30 +00:00
|
|
|
} nbw_impl;
|
|
|
|
|
2007-08-20 16:23:57 +00:00
|
|
|
static inline void freeFastHandle(nbw_impl * impl, const tree_node * n);
|
|
|
|
|
|
|
|
/** Obtain a slow handle from the pool of existing ones, or obtain a new one
|
|
|
|
by calling impl->slow_factory.. */
|
2006-10-28 03:33:02 +00:00
|
|
|
static stasis_handle_t * getSlowHandle(nbw_impl * impl) {
|
2006-11-21 06:50:12 +00:00
|
|
|
pthread_mutex_lock(&impl->mut);
|
|
|
|
stasis_handle_t * slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles);
|
|
|
|
assert(slow);
|
2006-10-28 03:33:02 +00:00
|
|
|
if((long)slow == -1) {
|
|
|
|
impl->slow_handle_count++;
|
2006-11-21 06:50:12 +00:00
|
|
|
pthread_mutex_unlock(&impl->mut);
|
|
|
|
slow = impl->slow_factory(impl->slow_factory_arg);
|
|
|
|
} else {
|
|
|
|
pthread_mutex_unlock(&impl->mut);
|
2006-10-28 03:33:02 +00:00
|
|
|
}
|
|
|
|
return slow;
|
|
|
|
}
|
2007-08-20 16:23:57 +00:00
|
|
|
/** Release a file handle back into the pool of slow handles. */
|
2006-10-28 03:33:02 +00:00
|
|
|
static void releaseSlowHandle(nbw_impl * impl, stasis_handle_t * slow) {
|
2006-11-21 06:50:12 +00:00
|
|
|
assert(slow);
|
|
|
|
pthread_mutex_lock(&impl->mut);
|
|
|
|
addVal(&impl->slow_handles, (long)slow);
|
|
|
|
pthread_mutex_unlock(&impl->mut);
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
   Allocate a tree node describing the range [off, off+len).

   The node starts out clean (dirty == 0), pinned once, and counting a
   single write.  No handle is attached; callers that keep the node set
   h themselves.

   @param off First byte offset of the range.
   @param len Length of the range in bytes.
   @return a malloc()ed node owned by the caller.
*/
static tree_node * allocTreeNode(lsn_t off, lsn_t len) {
  tree_node * ret = malloc(sizeof(tree_node));
  ret->start_pos = off;
  ret->end_pos = off + len;
  // Previously left indeterminate; give it a well-defined value until
  // the caller attaches a handle.
  ret->h = NULL;
  ret->write_count = 1;
  ret->dirty = 0;
  ret->pin_count = 1;
  return ret;
}
|
2006-11-21 06:50:12 +00:00
|
|
|
|
2007-08-20 16:23:57 +00:00
|
|
|
/**
   Find or create the fast handle that will absorb a write to
   [off, off+len), and pin it.

   If the tree already holds a node that completely covers the range,
   that node's pin count is incremented and it is returned.  Otherwise
   a new node is inserted, a fast handle is created for it with
   impl->fast_factory, and the handle/buffer counters are increased.

   This only works when reads and writes are aligned to immutable
   block boundaries and never span blocks.

   Takes the latch internally.  Undo the pin with releaseFastHandle().

   NOTE(review): the "fall back to a slow handle" branch is disabled
   with a literal 0.  As written it dereferences the lookup result
   without a NULL check, calls getSlowHandle() (which locks impl->mut)
   while impl->mut is already held, and leaves the returned pointer
   unchanged.  Those need attention before it is re-enabled.

   @return the pinned node.
*/
static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
                                                lsn_t len) {
  tree_node * fresh = allocTreeNode(off, len);
  pthread_mutex_lock(&impl->mut);

  DEBUG("allocFastHandle(%lld)\n", off/PAGE_SIZE);

  const tree_node * node = RB_ENTRY(lookup)(RB_LULTEQ, fresh, impl->fast_handles);
  int covered = node && node->start_pos <= off && node->end_pos >= off + len;

  if(covered) {
    // Reuse the existing node; the freshly allocated one is not needed.
    ((tree_node*)node)->pin_count++;
    free(fresh);
  } else if(0 && (impl->fast_handle_count >= impl->max_fast_handles ||
                  impl->used_buffer_size + len > impl->max_buffer_size)) {
    // Disabled: out of fast handles / buffer space, so block on a
    // slow handle instead.
    assert(node->end_pos <= off);

    if(impl->fast_handle_count >= impl->max_fast_handles) {
      printf("Blocking on write. %d handles (%d max)\n",
             impl->fast_handle_count, impl->max_fast_handles);
    }
    if(impl->used_buffer_size + len > impl->max_buffer_size) {
      printf("Blocking on write. %lld bytes (%lld max)\n",
             impl->used_buffer_size, impl->max_buffer_size);
    }

    fresh->dirty = INVALID_NODE;
    fresh->h = getSlowHandle(impl);
  } else {
    // No completely overlapping range found; account for a new write.
    impl->fast_handle_count++;
    impl->used_buffer_size += len;

#ifdef EAGER_MERGE
    if(node && node->end_pos == off && node->write_count + 1 < MAX_MERGE) {
      DEBUG("Did merge.\n");
      ((tree_node*)node)->pin_count++;
      ((tree_node*)node)->write_count++;
      ((tree_node*)node)->end_pos += len;
    } else {
#endif
      // Insert the new node and give it a fast handle.
      RB_ENTRY(search)(fresh, impl->fast_handles);
      fresh->h = impl->fast_factory(off,len,impl->fast_factory_arg);
      node = fresh;
#ifdef EAGER_MERGE
    }
#endif
  }

#ifdef EAGER_MERGE
  // Look for mergeable ranges immediately after the one we settled on.
  tree_node probe;
  probe.start_pos = node->end_pos;
  probe.end_pos = node->end_pos+1;
  tree_node * next;

  while((next = (tree_node*)RB_ENTRY(find)(&probe, impl->fast_handles)) && next->dirty && !next->pin_count && next->write_count + node->write_count < MAX_MERGE) {
    DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) outstanding = %d\n", node->start_pos/PAGE_SIZE, -1+node->end_pos/PAGE_SIZE, node->write_count, next->start_pos/PAGE_SIZE, -1+next->end_pos/PAGE_SIZE, next->write_count, impl->fast_handle_count);
    lsn_t appendLen = next->end_pos - next->start_pos;
    // Copy the neighbour's bytes into our handle...
    stasis_read_buffer_t * r= next->h->read_buffer(next->h,next->start_pos, appendLen);
    int ret = node->h->write(node->h,next->start_pos,r->buf, appendLen);
    assert(!ret);
    ret = r->h->release_read_buffer(r);
    assert(!ret);
    // ...then drop the neighbour and re-insert ourselves with the
    // extended range.
    next->dirty = 0;
    ((tree_node*)node)->write_count += next->write_count;
    freeFastHandle(impl,next);
    RB_ENTRY(delete)(node,impl->fast_handles);
    ((tree_node*)node)->end_pos += appendLen;
    RB_ENTRY(search)(node,impl->fast_handles);
  }
#endif

  pthread_mutex_unlock(&impl->mut);

  return node;
}
|
2007-08-20 16:23:57 +00:00
|
|
|
/**
   Look up the fast handle (if any) whose range overlaps
   [off, off+len) and pin it.

   Takes the latch internally.  A non-NULL result must be unpinned
   with releaseFastHandle().

   @return the pinned node, or NULL if the tree has no matching node.
*/
static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) {
  // The key only needs to live for the duration of the lookup, and
  // cmp_handle() only inspects start_pos/end_pos, so build it on the
  // stack (as nbw_worker does) instead of paying for an unchecked
  // malloc()/free() pair on every read.
  tree_node key = { .start_pos = off, .end_pos = off + len };

  pthread_mutex_lock(&impl->mut);
  const tree_node * n = RB_ENTRY(find)(&key, impl->fast_handles);
  if(n) {
    // cast strips const.
    ((tree_node*)n)->pin_count++;
  }
  pthread_mutex_unlock(&impl->mut);

  return n;
}
|
2007-08-20 16:23:57 +00:00
|
|
|
/** Unlke all of the other fastHandle functions, the caller
|
2006-11-21 06:50:12 +00:00
|
|
|
should hold the mutex when calling freeFastHandle. */
|
2007-08-20 16:23:57 +00:00
|
|
|
static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) {
|
2006-11-21 06:50:12 +00:00
|
|
|
RB_ENTRY(delete)(n, impl->fast_handles);
|
|
|
|
n->h->close(n->h);
|
|
|
|
free((void*)n);
|
|
|
|
}
|
|
|
|
/**
   Undo the pin taken by allocFastHandle() or findFastHandle().

   Nodes marked INVALID_NODE wrap a slow handle and are not stored in
   the tree (allocFastHandle() never inserts them); for those, the
   slow handle goes back to the pool and the node is freed.

   Otherwise the pin count is decremented and, if the node is clean,
   its dirty flag is set to setDirty.  A worker is woken if more than
   min_fast_handles writes are outstanding.

   The caller must not hold the latch.

   @param impl     The non-blocking handle's state.
   @param n        The node to release.
   @param setDirty 1 if the caller wrote to the handle, 0 otherwise.
   @return 0.
*/
static inline int releaseFastHandle(nbw_impl * impl, const tree_node * n,
                                    int setDirty) {
  if(n->dirty == INVALID_NODE) {
    // Not in tree; cast removes "const"
    releaseSlowHandle(impl, n->h);
    free((void*)n);
    return 0;
  }

  assert(setDirty == 0 || setDirty == 1);

  pthread_mutex_lock(&impl->mut);
  // Workers modify dirty under the latch, so inspect it there too.
  assert(n->dirty == 0 || n->dirty == 1);
  ((tree_node*)n)->pin_count--;
  if(n->dirty == 0) {
    ((tree_node*)n)->dirty = setDirty;
  }
  // fast_handle_count is shared state; sample it before dropping the
  // latch rather than racing with allocFastHandle() / nbw_worker().
  int wake_worker = impl->fast_handle_count > impl->min_fast_handles;
  pthread_mutex_unlock(&impl->mut);

  // n must not be touched past this point: once unpinned, a worker is
  // free to deallocate it.
  if(wake_worker) {
    pthread_cond_signal(&impl->pending_writes_cond);
  }
  return 0;
}
|
2006-11-21 06:50:12 +00:00
|
|
|
/** @todo nbw_num_copies is unimplemented. */
|
2006-10-26 05:48:30 +00:00
|
|
|
static int nbw_num_copies(stasis_handle_t * h) {
|
2006-11-21 06:50:12 +00:00
|
|
|
return 0;
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|
2006-11-21 06:50:12 +00:00
|
|
|
/** @todo nbw_num_copies_buffer is unimplemented. */
|
2006-10-26 05:48:30 +00:00
|
|
|
static int nbw_num_copies_buffer(stasis_handle_t * h) {
|
2006-11-21 06:50:12 +00:00
|
|
|
return 0;
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|
|
|
|
/**
   Shut down a non-blocking handle.

   Clears still_open, wakes every worker, and joins them; nbw_worker
   only exits once it has nothing left to write back.  Afterwards the
   fast handle tree, the latch, the condition variable, the pooled slow
   handles, and finally the handle itself are released.

   The caller must not use h (or any buffer obtained from it) after
   this call, and no other thread may be using h concurrently.

   @return 0.
*/
static int nbw_close(stasis_handle_t * h) {
  nbw_impl * impl = h->impl;

  pthread_mutex_lock(&impl->mut);
  impl->still_open = 0;
  pthread_mutex_unlock(&impl->mut);
  pthread_cond_broadcast(&impl->pending_writes_cond);

  for(int i = 0; i < impl->worker_count; i++) {
    pthread_join(impl->workers[i], 0);
  }

  // No longer need latch; this is the only thread allowed to touch the handle.
  free(impl->workers);

  DEBUG("nbw had %d slow handles\n", impl->slow_handle_count);
  DEBUG("fast handles = %d, used buffer = %lld\n", impl->fast_handle_count, impl->used_buffer_size);
  if(impl->requested_bytes_written < impl->total_bytes_written) {
    printf("nbw: Problem with write coalescing detected.\n"
           "Client wrote %lld bytes, handle wrote %lld.\n",
           impl->requested_bytes_written, impl->total_bytes_written);
  }

  // The workers should have drained everything before exiting.
  assert(impl->fast_handle_count == 0);
  assert(impl->used_buffer_size == 0);

  RB_ENTRY(destroy)(impl->fast_handles);
  pthread_mutex_destroy(&impl->mut);
  // The condition variable was initialized at open time and, with the
  // workers joined, has no waiters left; release it as well.
  pthread_cond_destroy(&impl->pending_writes_cond);

  // Close every pooled slow handle; -1 marks the end of the pool.
  stasis_handle_t * slow;
  while(-1 != (long)(slow = (stasis_handle_t*)popMaxVal(&impl->slow_handles))) {
    slow->close(slow);
    impl->slow_handle_count--;
  }
  destroyList(&impl->slow_handles);

  // Every slow handle that was created must have been returned.
  assert(impl->slow_handle_count == 0);

  free(h->impl);
  free(h);
  return 0;
}
|
|
|
|
/**
   @return the handle's current start (truncation) offset, sampled
           while holding the latch.
*/
static lsn_t nbw_start_position(stasis_handle_t *h) {
  nbw_impl * impl = h->impl;
  lsn_t start;

  pthread_mutex_lock(&impl->mut);
  start = impl->start_pos;
  pthread_mutex_unlock(&impl->mut);

  return start;
}
|
|
|
|
/**
   @return the handle's current end offset, sampled while holding the
           latch.
*/
static lsn_t nbw_end_position(stasis_handle_t *h) {
  nbw_impl * impl = h->impl;
  lsn_t end;

  pthread_mutex_lock(&impl->mut);
  end = impl->end_pos;
  pthread_mutex_unlock(&impl->mut);

  return end;
}
|
|
|
|
/**
   Obtain a write buffer for [off, off+len) backed by a fast handle.

   The returned buffer mirrors the fast handle's buffer (offset,
   length, data pointer, error) and remembers the pinned tree node so
   that nbw_release_write_buffer() can unpin it.

   If the underlying buffer reports no error, the handle's end
   position is extended when the write reaches past it and the request
   is added to requested_bytes_written.  A write that starts before
   the truncation point is flagged with EDOM, although the buffer
   itself is still usable.

   @return a malloc()ed write buffer; always release it with
           nbw_release_write_buffer(), even if its error is set.
*/
static stasis_write_buffer_t * nbw_write_buffer(stasis_handle_t * h,
                                                lsn_t off, lsn_t len) {
  nbw_impl * impl = h->impl;

  // Pin (or create) the fast handle and borrow a buffer from it.
  const tree_node * node = allocFastHandle(impl, off, len);
  stasis_write_buffer_t * inner = node->h->write_buffer(node->h, off, len);

  write_buffer_impl * wrapper = malloc(sizeof *wrapper);
  wrapper->w = inner;
  wrapper->n = node;

  stasis_write_buffer_t * ret = malloc(sizeof *ret);
  ret->h = h;
  ret->impl = wrapper;
  ret->buf = inner->buf;
  ret->off = inner->off;
  ret->len = inner->len;
  ret->error = inner->error;

  if(ret->error) {
    return ret;
  }

  pthread_mutex_lock(&impl->mut);
  assert(impl->start_pos <= impl->end_pos);
  if(off < impl->start_pos) {
    // The caller still receives a valid buffer that points at space
    // before the truncation point; only the error code tells them.
    ret->error = EDOM;
  } else if(off + len > impl->end_pos) {
    impl->end_pos = off+len;
  }
  impl->requested_bytes_written += len;
  pthread_mutex_unlock(&impl->mut);

  return ret;
}
|
|
|
|
static stasis_write_buffer_t * nbw_append_buffer(stasis_handle_t * h,
|
|
|
|
lsn_t len) {
|
2006-11-21 06:50:12 +00:00
|
|
|
nbw_impl * impl = h->impl;
|
|
|
|
|
|
|
|
pthread_mutex_lock(&impl->mut);
|
2006-10-26 05:48:30 +00:00
|
|
|
lsn_t off = impl->end_pos;
|
2006-11-21 06:50:12 +00:00
|
|
|
impl->end_pos += len;
|
2007-08-20 16:23:57 +00:00
|
|
|
impl->requested_bytes_written += len;
|
2006-11-21 06:50:12 +00:00
|
|
|
pthread_mutex_unlock(&impl->mut);
|
2006-10-28 03:33:02 +00:00
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
return nbw_write_buffer(h, off, len);
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|
|
|
|
/**
   Release a buffer obtained from nbw_write_buffer() or
   nbw_append_buffer().

   The wrapped buffer is handed back to the handle that produced it,
   the tree node is unpinned and marked dirty, and the wrapper and w
   are freed.

   @return 0.
*/
static int nbw_release_write_buffer(stasis_write_buffer_t * w) {
  nbw_impl * impl = w->h->impl;
  write_buffer_impl * wrapper = w->impl;
  stasis_write_buffer_t * inner = wrapper->w;
  const tree_node * node = wrapper->n;

  inner->h->release_write_buffer(inner);
  // 1: the caller may have written into the buffer.
  releaseFastHandle(impl, node, 1);

  free(wrapper);
  free(w);
  return 0;
}
|
|
|
|
/**
   Obtain a read buffer for [off, off+len).

   The read is served from a fast handle if one covers the range, and
   from a pooled slow handle otherwise.  The returned buffer mirrors
   the underlying one and records which tree node (if any) was pinned.

   @return a malloc()ed read buffer; release it with
           nbw_release_read_buffer().
*/
static stasis_read_buffer_t * nbw_read_buffer(stasis_handle_t * h,
                                              lsn_t off, lsn_t len) {
  nbw_impl * impl = h->impl;
  const tree_node * node = findFastHandle(impl, off, len);

  stasis_handle_t * source;
  if(node) {
    source = node->h;
  } else {
    // Nothing buffered for this range; go to a slow handle.
    source = getSlowHandle(impl);
  }
  stasis_read_buffer_t * inner = source->read_buffer(source, off, len);

  read_buffer_impl * wrapper = malloc(sizeof *wrapper);
  wrapper->r = inner;
  wrapper->n = node;

  stasis_read_buffer_t * ret = malloc(sizeof *ret);
  ret->h = h;
  ret->impl = wrapper;
  ret->buf = inner->buf;
  ret->off = inner->off;
  ret->len = inner->len;
  ret->error = inner->error;

  return ret;
}
|
|
|
|
/**
   Release a buffer obtained from nbw_read_buffer().

   The wrapped buffer is handed back to the handle that produced it.
   If it came from a fast handle, the tree node is unpinned; otherwise
   the slow handle is returned to the pool.  The wrapper and r are
   freed.

   @return 0.
*/
static int nbw_release_read_buffer(stasis_read_buffer_t * r) {
  nbw_impl * impl = r->h->impl;
  read_buffer_impl * wrapper = r->impl;
  const tree_node * node = wrapper->n;

  // Remember the producing handle before the inner buffer is given
  // back; it is needed below on the slow path.
  stasis_handle_t * source = wrapper->r->h;
  source->release_read_buffer(wrapper->r);

  // XXX shouldn't need to distinguish the two cases here.
  if(!node) {
    assert(source);
    releaseSlowHandle(impl, source);
  } else {
    // 0: reads never dirty the handle.
    releaseFastHandle(impl, node, 0);
  }

  free(wrapper);
  free(r);
  return 0;
}
|
|
|
|
static int nbw_write(stasis_handle_t * h, lsn_t off,
|
|
|
|
const byte * dat, lsn_t len) {
|
2006-11-21 06:50:12 +00:00
|
|
|
nbw_impl * impl = h->impl;
|
|
|
|
const tree_node * n = allocFastHandle(impl, off, len);
|
|
|
|
int ret = n->h->write(n->h, off, dat, len);
|
|
|
|
releaseFastHandle(impl, n, 1);
|
|
|
|
if(!ret) {
|
|
|
|
pthread_mutex_lock(&impl->mut);
|
|
|
|
assert(impl->start_pos <= impl->end_pos);
|
|
|
|
if(off < impl->start_pos) {
|
|
|
|
ret = EDOM;
|
|
|
|
} else if(off + len > impl->end_pos) {
|
|
|
|
impl->end_pos = off+len;
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|
2007-08-20 16:23:57 +00:00
|
|
|
impl->requested_bytes_written += len;
|
2006-11-21 06:50:12 +00:00
|
|
|
pthread_mutex_unlock(&impl->mut);
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|
2006-11-21 06:50:12 +00:00
|
|
|
return ret;
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|
|
|
|
static int nbw_append(stasis_handle_t * h, lsn_t * off,
|
|
|
|
const byte * dat, lsn_t len) {
|
2006-11-21 06:50:12 +00:00
|
|
|
nbw_impl * impl = h->impl;
|
|
|
|
|
2006-10-26 05:48:30 +00:00
|
|
|
pthread_mutex_lock(&impl->mut);
|
|
|
|
*off = impl->end_pos;
|
2006-11-21 06:50:12 +00:00
|
|
|
impl->end_pos+= len;
|
2007-08-20 16:23:57 +00:00
|
|
|
impl->requested_bytes_written += len;
|
2006-10-26 05:48:30 +00:00
|
|
|
pthread_mutex_unlock(&impl->mut);
|
2006-11-21 06:50:12 +00:00
|
|
|
|
|
|
|
int ret = nbw_write(h, *off, dat, len);
|
|
|
|
return ret;
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|
|
|
|
/**
   Read len bytes starting at off into buf.

   The read is served from a fast handle if one covers the range, and
   from a pooled slow handle otherwise.

   @return whatever the underlying handle's read method returns.
*/
static int nbw_read(stasis_handle_t * h,
                    lsn_t off, byte * buf, lsn_t len) {
  nbw_impl * impl = h->impl;
  const tree_node * node = findFastHandle(impl, off, len);

  // XXX the fast/slow distinction should be handled by
  // releaseFastHandle.
  if(!node) {
    stasis_handle_t * slow = getSlowHandle(impl);
    int status = slow->read(slow, off, buf, len);
    releaseSlowHandle(impl, slow);
    return status;
  }

  int status = node->h->read(node->h, off, buf, len);
  releaseFastHandle(impl, node, 0);
  return status;
}
|
|
|
|
/**
   Move the handle's start position forward to new_start.

   Only the bookkeeping is updated; the slow handles are not truncated
   yet.

   @return 0 on success, or EDOM unless
           start_pos < new_start <= end_pos.
*/
static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
  nbw_impl * impl = h->impl;

  pthread_mutex_lock(&impl->mut);
  int in_range = (new_start > impl->start_pos && new_start <= impl->end_pos);
  if(in_range) {
    impl->start_pos = new_start;
  }
  pthread_mutex_unlock(&impl->mut);

  if(in_range) {
    // XXX close all slow handles; truncate of them. (ie: implement truncate)
    return 0;
  }
  return EDOM;
}
|
|
|
|
|
|
|
|
struct stasis_handle_t nbw_func = {
|
|
|
|
.num_copies = nbw_num_copies,
|
|
|
|
.num_copies_buffer = nbw_num_copies_buffer,
|
|
|
|
.close = nbw_close,
|
|
|
|
.start_position = nbw_start_position,
|
|
|
|
.end_position = nbw_end_position,
|
|
|
|
.write = nbw_write,
|
|
|
|
.append = nbw_append,
|
|
|
|
.write_buffer = nbw_write_buffer,
|
|
|
|
.append_buffer = nbw_append_buffer,
|
|
|
|
.release_write_buffer = nbw_release_write_buffer,
|
|
|
|
.read = nbw_read,
|
|
|
|
.read_buffer = nbw_read_buffer,
|
|
|
|
.release_read_buffer = nbw_release_read_buffer,
|
|
|
|
.truncate_start = nbw_truncate_start,
|
|
|
|
.error = 0
|
|
|
|
};
|
|
|
|
|
2007-08-20 16:23:57 +00:00
|
|
|
/**
|
|
|
|
This worker thread simulates asynchrnous I/O by handling writeback
|
|
|
|
on behalf of the application. Multiple workers may be spawned for
|
|
|
|
a non-blocking handle.
|
|
|
|
|
|
|
|
This function walks the list of fast handles, writing back dirty
|
|
|
|
ones, and freeing clean ones. It (almost) never performs a write
|
|
|
|
while holding the mutex.
|
|
|
|
|
|
|
|
@todo Non-blocking handle should not memcpy() buffers while holding
|
|
|
|
the mutex.
|
|
|
|
|
|
|
|
*/
|
2006-11-21 06:50:12 +00:00
|
|
|
static void * nbw_worker(void * handle) {
|
|
|
|
stasis_handle_t * h = handle;
|
|
|
|
nbw_impl * impl = h->impl;
|
2006-10-28 03:33:02 +00:00
|
|
|
|
|
|
|
pthread_mutex_lock(&impl->mut);
|
2006-11-21 06:50:12 +00:00
|
|
|
while(1) {
|
|
|
|
// cast strips const.
|
|
|
|
tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
|
|
|
|
int writes = 0;
|
2007-08-20 16:23:57 +00:00
|
|
|
while(node) {
|
|
|
|
if(node->dirty && !node->pin_count) {
|
2006-11-21 06:50:12 +00:00
|
|
|
node->dirty = 0;
|
|
|
|
node->pin_count++;
|
|
|
|
pthread_mutex_unlock(&impl->mut);
|
|
|
|
writes++;
|
|
|
|
stasis_handle_t * slow = getSlowHandle(impl);
|
|
|
|
stasis_handle_t * fast = node->h;
|
|
|
|
lsn_t off = fast->start_position(fast);
|
|
|
|
lsn_t len = fast->end_position(fast) - off;
|
|
|
|
stasis_read_buffer_t * r = fast->read_buffer(fast, off, len);
|
2007-08-20 16:23:57 +00:00
|
|
|
|
|
|
|
// cast strips const
|
|
|
|
byte *buf = (byte*)r->buf;
|
|
|
|
|
|
|
|
pthread_mutex_lock(&impl->mut);
|
|
|
|
|
|
|
|
int first = 1;
|
|
|
|
off_t buf_off = 0;
|
|
|
|
tree_node dummy;
|
|
|
|
dummy.start_pos = node->end_pos;
|
|
|
|
dummy.end_pos = node->end_pos+1;
|
|
|
|
tree_node * np;
|
|
|
|
while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles))
|
|
|
|
&& np->dirty && !np->pin_count) {
|
|
|
|
lsn_t np_len = np->end_pos - np->start_pos;
|
|
|
|
len += np_len;
|
|
|
|
|
|
|
|
if(first) {
|
|
|
|
buf = malloc(r->len + len);
|
|
|
|
memcpy(buf, r->buf, r->len);
|
|
|
|
buf_off += r->len;
|
|
|
|
first = 0;
|
|
|
|
} else {
|
|
|
|
buf = realloc(buf, len);
|
|
|
|
}
|
|
|
|
|
|
|
|
stasis_handle_t * fast2 = np->h;
|
|
|
|
stasis_read_buffer_t * r2 = fast2->read_buffer(fast2,np->start_pos, np_len);
|
|
|
|
memcpy(buf + buf_off, r2->buf, np_len);
|
|
|
|
buf_off += np_len;
|
|
|
|
r2->h->release_read_buffer(r2);
|
|
|
|
np->dirty = 0;
|
|
|
|
dummy.start_pos = np->end_pos;
|
|
|
|
dummy.end_pos = np->end_pos+1;
|
|
|
|
}
|
|
|
|
|
|
|
|
impl->total_bytes_written += len;
|
|
|
|
|
|
|
|
pthread_mutex_unlock(&impl->mut);
|
|
|
|
|
|
|
|
if(len != PAGE_SIZE) {
|
|
|
|
DEBUG("merged %lld pages at %lld into single write\n", len/PAGE_SIZE, off/PAGE_SIZE);
|
|
|
|
}
|
|
|
|
slow->write(slow, off, buf, len);
|
|
|
|
|
|
|
|
if(!first) {
|
|
|
|
free(buf);
|
|
|
|
}
|
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
r->h->release_read_buffer(r);
|
|
|
|
releaseSlowHandle(impl, slow);
|
|
|
|
pthread_mutex_lock(&impl->mut);
|
|
|
|
node->pin_count--;
|
2007-08-20 16:23:57 +00:00
|
|
|
}
|
|
|
|
tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node, impl->fast_handles);
|
2006-11-21 06:50:12 +00:00
|
|
|
if(!node->dirty && !node->pin_count) {
|
2007-08-20 16:23:57 +00:00
|
|
|
impl->fast_handle_count -= node->write_count;
|
|
|
|
impl->used_buffer_size -= (node->end_pos - node->start_pos);
|
|
|
|
freeFastHandle(impl, node);
|
2006-10-28 03:33:02 +00:00
|
|
|
}
|
2007-04-21 07:51:33 +00:00
|
|
|
node = new_node;
|
2006-10-28 03:33:02 +00:00
|
|
|
}
|
2006-11-21 06:50:12 +00:00
|
|
|
if(!impl->fast_handle_count || !writes) {
|
|
|
|
if(impl->still_open) {
|
|
|
|
pthread_cond_wait(&impl->pending_writes_cond, &impl->mut);
|
|
|
|
} else {
|
|
|
|
break;
|
|
|
|
}
|
2006-10-28 03:33:02 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
pthread_mutex_unlock(&impl->mut);
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
stasis_handle_t * stasis_handle(open_non_blocking)(stasis_handle_t * (*slow_factory)(void * arg),
|
|
|
|
void * slow_factory_arg,
|
2006-10-26 05:48:30 +00:00
|
|
|
stasis_handle_t * (*fast_factory)(lsn_t, lsn_t, void *),
|
2006-10-28 03:33:02 +00:00
|
|
|
void * fast_factory_arg,
|
|
|
|
int worker_thread_count,
|
2006-11-21 06:50:12 +00:00
|
|
|
lsn_t buffer_size, int max_fast_handles) {
|
2006-10-26 05:48:30 +00:00
|
|
|
nbw_impl * impl = malloc(sizeof(nbw_impl));
|
|
|
|
pthread_mutex_init(&impl->mut, 0);
|
2006-11-21 06:50:12 +00:00
|
|
|
|
|
|
|
impl->start_pos = 0;
|
|
|
|
impl->end_pos = 0;
|
|
|
|
|
2006-10-28 03:33:02 +00:00
|
|
|
impl->slow_factory = slow_factory;
|
|
|
|
impl->slow_factory_arg = slow_factory_arg;
|
2006-11-21 06:50:12 +00:00
|
|
|
|
|
|
|
impl->slow_handles = 0;
|
|
|
|
impl->slow_handle_count = 0;
|
|
|
|
|
2007-08-20 16:23:57 +00:00
|
|
|
impl->requested_bytes_written = 0;
|
|
|
|
impl->total_bytes_written = 0;
|
|
|
|
|
2006-10-26 05:48:30 +00:00
|
|
|
impl->fast_factory = fast_factory;
|
|
|
|
impl->fast_factory_arg = fast_factory_arg;
|
2006-11-21 06:50:12 +00:00
|
|
|
|
2006-10-26 05:48:30 +00:00
|
|
|
impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0);
|
2006-11-21 06:50:12 +00:00
|
|
|
impl->fast_handle_count = 0;
|
|
|
|
impl->max_fast_handles = max_fast_handles;
|
2007-08-20 16:23:57 +00:00
|
|
|
impl->min_fast_handles = max_fast_handles / 2;
|
2006-11-21 06:50:12 +00:00
|
|
|
impl->max_buffer_size = buffer_size;
|
2006-10-28 03:33:02 +00:00
|
|
|
impl->used_buffer_size = 0;
|
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
impl->workers = malloc(worker_thread_count * sizeof(pthread_t));
|
|
|
|
impl->worker_count = worker_thread_count;
|
2006-10-28 03:33:02 +00:00
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
pthread_cond_init(&impl->pending_writes_cond, 0);
|
2006-10-28 03:33:02 +00:00
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
impl->still_open = 1;
|
2006-10-26 05:48:30 +00:00
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
stasis_handle_t *h = malloc(sizeof(stasis_handle_t));
|
|
|
|
*h = nbw_func;
|
|
|
|
h->impl = impl;
|
2006-10-28 03:33:02 +00:00
|
|
|
|
|
|
|
for(int i = 0; i < impl->worker_count; i++) {
|
2006-11-21 06:50:12 +00:00
|
|
|
int err = pthread_create(&(impl->workers[i]), 0, nbw_worker, h);
|
|
|
|
if(err) {
|
|
|
|
perror("Coudln't spawn worker thread for non_blocking io");
|
|
|
|
}
|
2006-10-28 03:33:02 +00:00
|
|
|
}
|
2007-08-20 16:23:57 +00:00
|
|
|
|
|
|
|
DEBUG("Opened non blocking I/O handle; buffer size = %lldmb max outstanding writes = %d\n",
|
|
|
|
impl->max_buffer_size / (1024 * 1024), impl->max_fast_handles);
|
|
|
|
|
2006-11-21 06:50:12 +00:00
|
|
|
return h;
|
2006-10-26 05:48:30 +00:00
|
|
|
}
|