* Fixed a race condition. Pages that were merged during writeback were
  set clean and unpinned while the slow handle was still writing them
  back, which allowed other threads to come in and read the stale
  version of the page from disk (see the sketch below).

* Fixed a performance bug: when considering pages for writeback, start
  scanning after the end of the last flushed page (not after the first
  flushed page).

* Honor "max_fast_handles".
commit 44a766a7f6 (parent ebc6258fea)
Author: Sears Russell
Date:   2007-10-22 20:45:48 +00:00


@@ -50,16 +50,12 @@
*/
#define INVALID_NODE 2
/* If defined, merge writes immediately (not recommended, as doing so
decreases the granularity of the "dirty" bit, causing clean data to
be written back). Whether or not this is defined, writes will be
merged by nbw_worker at flush. */
/*
This should probably be enabled by default. It causes the writeback
threads to coalesce write requests before passing them to the
kernel.
*/
//#define EAGER_MERGE
/* If EAGER_MERGE is defined, this limits the number of pages it will
coalesce into a single write.
*/
//#define MAX_MERGE 4
#define MERGE_WRITES
/** @return a read buffer indicating an error has occurred */
static inline stasis_read_buffer_t* alloc_read_buffer_error(stasis_handle_t *h,
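
For reference, the coalescing that the MERGE_WRITES comment describes
can be shown with a self-contained, hypothetical sketch (none of these
names come from the tree; pwrite() stands in for the slow handle's
write path):

    /* Illustrative only: merge runs of adjacent dirty ranges into a
     * single buffer and issue one pwrite() per run. */
    #include <stdlib.h>
    #include <string.h>
    #include <sys/types.h>
    #include <unistd.h>

    struct range { off_t off; size_t len; const char *buf; };

    static int write_coalesced(int fd, const struct range *r, size_t n) {
      size_t i = 0;
      while (i < n) {
        /* extend the run while each range starts where the last ended */
        size_t j = i + 1, total = r[i].len;
        while (j < n && r[j].off == r[j-1].off + (off_t)r[j-1].len) {
          total += r[j].len;
          j++;
        }
        char *buf = malloc(total);
        if (!buf) return -1;
        size_t pos = 0;
        for (size_t k = i; k < j; k++) {
          memcpy(buf + pos, r[k].buf, r[k].len);
          pos += r[k].len;
        }
        ssize_t w = pwrite(fd, buf, total, r[i].off);
        free(buf);
        if (w < 0 || (size_t)w != total) return -1;
        i = j;
      }
      return 0;
    }

nbw_worker below performs the equivalent walk over the red-black tree
of dirty nodes, appending each adjacent node's buffer before issuing a
single write through the slow handle.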
@@ -91,8 +87,6 @@ typedef struct tree_node {
lsn_t start_pos;
lsn_t end_pos;
stasis_handle_t * h;
/** The number of I/O requests this node corresponds to. */
int write_count;
/** The number of threads accessing this handle. The handle cannot
be deallocated unless this is zero. */
int pin_count;
@@ -203,7 +197,6 @@ static tree_node * allocTreeNode(lsn_t off, lsn_t len) {
tree_node * ret = malloc(sizeof(tree_node));
ret->start_pos = off;
ret->end_pos = off + len;
ret->write_count = 1;
ret->dirty = 0;
ret->pin_count = 1;
return ret;
@@ -224,80 +217,40 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
n->end_pos >= off + len)) {
// no completely overlapping range found; allocate space in np.
if(0 && (impl->fast_handle_count >= impl->max_fast_handles ||
impl->used_buffer_size + len > impl->max_buffer_size)) {
assert(n->end_pos <= off);
if(impl->fast_handle_count >= impl->max_fast_handles ||
impl->used_buffer_size + len > impl->max_buffer_size) {
if(impl->fast_handle_count >= impl->max_fast_handles) {
printf("Blocking on write. %d handles (%d max)\n",
impl->fast_handle_count, impl->max_fast_handles);
DEBUG("Blocking on write. %d handles (%d max)\n",
impl->fast_handle_count, impl->max_fast_handles);
}
if(impl->used_buffer_size + len > impl->max_buffer_size) {
printf("Blocking on write. %lld bytes (%lld max)\n",
impl->used_buffer_size, impl->max_buffer_size);
DEBUG("Blocking on write. %lld bytes (%lld max)\n",
impl->used_buffer_size, impl->max_buffer_size);
}
pthread_mutex_unlock(&impl->mut);
np->dirty = INVALID_NODE;
// @todo should non_blocking fall back on slow handles for backpressure?
np->h = getSlowHandle(impl);
return np;
} else {
impl->fast_handle_count++;
DEBUG("inc fast handle count %d", impl->fast_handle_count);
impl->used_buffer_size += len;
#ifdef EAGER_MERGE
if(n && n->end_pos == off && n->write_count + 1 < MAX_MERGE) {
DEBUG("Did merge.\n");
((tree_node*)n)->pin_count++;
((tree_node*)n)->write_count++;
((tree_node*)n)->end_pos += len;
} else {
#endif
RB_ENTRY(search)(np, impl->fast_handles);
np->h = impl->fast_factory(off,len,impl->fast_factory_arg);
n = np;
#ifdef EAGER_MERGE
}
#endif
np->h = impl->fast_factory(off,len,impl->fast_factory_arg);
RB_ENTRY(search)(np, impl->fast_handles);
n = np;
}
} else {
((tree_node*)n)->pin_count++;
free(np);
}
#ifdef EAGER_MERGE
// check for a mergeable range immediately after the point we're
// interested in. (DEAD CODE)
tree_node dummy;
dummy.start_pos = n->end_pos;
dummy.end_pos = n->end_pos+1;
while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles))
&& np->dirty
&& !np->pin_count
&& np->write_count + n->write_count < MAX_MERGE) {
DEBUG("Did post-merge of page %lld-%lld (%d) and %lld-%lld (%d) "
"outstanding = %d\n", n->start_pos/PAGE_SIZE,
-1+n->end_pos/PAGE_SIZE, n->write_count, np->start_pos/PAGE_SIZE,
-1+np->end_pos/PAGE_SIZE, np->write_count, impl->fast_handle_count);
lsn_t appendLen = np->end_pos - np->start_pos;
stasis_read_buffer_t * r= np->h->read_buffer(np->h,np->start_pos,
appendLen);
int ret = n->h->write(n->h,np->start_pos,r->buf, appendLen);
assert(!ret);
ret = r->h->release_read_buffer(r);
assert(!ret);
np->dirty = 0;
((tree_node*)n)->write_count += np->write_count;
freeFastHandle(impl,np);
RB_ENTRY(delete)(n,impl->fast_handles);
((tree_node*)n)->end_pos += appendLen;
RB_ENTRY(search)(n,impl->fast_handles);
}
#endif
pthread_mutex_unlock(&impl->mut);
return n;
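
This hunk is where "max_fast_handles" is now honored: the old
`if(0 && ...)` guard that disabled the check is gone. A condensed,
illustrative reading of the decision (mutex and rbtree bookkeeping
elided; not the literal code):

    /* Sketch: once the fast-handle or buffer budget is exhausted, mark
     * the node INVALID_NODE and fall back to a slow (direct) handle. */
    static stasis_handle_t *pick_handle(nbw_impl *impl, tree_node *np,
                                        lsn_t off, lsn_t len) {
      if (impl->fast_handle_count >= impl->max_fast_handles ||
          impl->used_buffer_size + len > impl->max_buffer_size) {
        np->dirty = INVALID_NODE;   /* node is not tracked in the tree */
        return getSlowHandle(impl); /* caller blocks on real disk I/O  */
      }
      impl->fast_handle_count++;
      impl->used_buffer_size += len;
      return impl->fast_factory(off, len, impl->fast_factory_arg);
    }

Falling back to a slow handle makes the writing thread wait on actual
I/O, which is the backpressure the @todo above alludes to.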
@@ -317,6 +270,9 @@ static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off,
/** Unlike all of the other fastHandle functions, the caller
should hold the mutex when calling freeFastHandle. */
static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) {
assert(impl->fast_handle_count>=0);
impl->fast_handle_count--;
DEBUG("dec fast handle count %d", impl->fast_handle_count);
RB_ENTRY(delete)(n, impl->fast_handles);
n->h->close(n->h);
free((void*)n);
@@ -562,7 +518,7 @@ static int nbw_truncate_start(stasis_handle_t * h, lsn_t new_start) {
}
pthread_mutex_unlock(&impl->mut);
if(!error) {
// XXX close all slow handles; truncate of them. (ie: implement truncate)
// @todo close all slow handles; truncate them. (ie: implement truncate)
}
return error;
}
@@ -602,34 +558,37 @@ static void * nbw_worker(void * handle) {
stasis_handle_t * h = handle;
nbw_impl * impl = h->impl;
stasis_handle_t * slow = getSlowHandle(impl);
pthread_mutex_lock(&impl->mut);
while(1) {
// cast strips const.
tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
int writes = 0;
while(node) {
tree_node next_node = *node;
if(node->dirty && !node->pin_count) {
node->dirty = 0;
node->pin_count++;
pthread_mutex_unlock(&impl->mut);
writes++;
stasis_handle_t * slow = getSlowHandle(impl);
stasis_handle_t * fast = node->h;
lsn_t off = fast->start_position(fast);
lsn_t len = fast->end_position(fast) - off;
stasis_read_buffer_t * r = fast->read_buffer(fast, off, len);
#ifdef MERGE_WRITES
// cast strips const
byte *buf = (byte*)r->buf;
pthread_mutex_lock(&impl->mut);
int first = 1;
off_t buf_off = 0;
tree_node dummy;
dummy.start_pos = node->end_pos;
dummy.end_pos = node->end_pos+1;
tree_node * np;
tree_node * dummies = 0;
int dummy_count = 0;
while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles))
&& np->dirty && !np->pin_count) {
lsn_t np_len = np->end_pos - np->start_pos;
@@ -639,9 +598,17 @@ static void * nbw_worker(void * handle) {
buf = malloc(r->len + len);
memcpy(buf, r->buf, r->len);
buf_off += r->len;
dummies = malloc(sizeof(tree_node));
dummies[0] = dummy;
dummy_count = 1;
first = 0;
} else {
buf = realloc(buf, len);
dummies = realloc(dummies, sizeof(tree_node) * (dummy_count+1));
dummies[dummy_count] = dummy;
dummy_count++;
}
stasis_handle_t * fast2 = np->h;
@@ -651,6 +618,7 @@ static void * nbw_worker(void * handle) {
buf_off += np_len;
r2->h->release_read_buffer(r2);
np->dirty = 0;
np->pin_count++;
dummy.start_pos = np->end_pos;
dummy.end_pos = np->end_pos+1;
}
@@ -668,16 +636,32 @@ static void * nbw_worker(void * handle) {
if(!first) {
free(buf);
}
#else
pthread_mutex_unlock(&impl->mut);
slow->write(slow, off, r->buf, len);
#endif
r->h->release_read_buffer(r);
releaseSlowHandle(impl, slow);
pthread_mutex_lock(&impl->mut);
pthread_mutex_lock(&impl->mut);
#ifdef MERGE_WRITES
for(int i = 0; i < dummy_count; i++) {
np = (tree_node*)RB_ENTRY(find)(&dummies[i], impl->fast_handles);
assert(np->pin_count);
np->pin_count--;
if(!np->dirty && !np->pin_count) {
impl->used_buffer_size -= (np->end_pos - np->start_pos);
freeFastHandle(impl, np);
}
}
if(dummies) {
next_node = dummies[dummy_count-1];
free(dummies);
}
#endif
node->pin_count--;
}
tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, node,
tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, &next_node,
impl->fast_handles);
if(!node->dirty && !node->pin_count) {
impl->fast_handle_count -= node->write_count;
impl->used_buffer_size -= (node->end_pos - node->start_pos);
freeFastHandle(impl, node);
}
@@ -692,6 +676,9 @@ static void * nbw_worker(void * handle) {
}
}
pthread_mutex_unlock(&impl->mut);
releaseSlowHandle(impl, slow);
return 0;
}
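
Taken together, the worker changes condense to the following
hypothetical outline of the flush loop (locking, coalescing, and
buffer management elided). The two key points are the pin held across
the write and the scan resuming after the last flushed page:

    /* Illustrative condensation of nbw_worker's scan after this commit. */
    static void scan_once(nbw_impl *impl) {
      tree_node *node = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
      while (node) {
        tree_node next_node = *node;   /* default restart point */
        if (node->dirty && !node->pin_count) {
          node->dirty = 0;
          node->pin_count++;           /* pinned for the whole write */
          /* ...coalesce adjacent dirty nodes into one buffer, pinning
             each and recording it in dummies[]; write the buffer out
             through the shared slow handle...
             next_node = dummies[dummy_count-1];  resume after the LAST
             flushed page, not the first (the performance fix above). */
          node->pin_count--;
        }
        node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, &next_node,
                                            impl->fast_handles);
      }
    }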