port to new rbtree implementation (still buggy); performance tweaks for sequential throughput (this commit only changes non_blocking, which is disabled by default and in all sane runtime configurations)
This commit is contained in:
parent 9ffc06c824
commit 9a46882ae3
2 changed files with 37 additions and 22 deletions
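The bulk of the diff below is a mechanical rename: the RB_ENTRY(...) macro wrappers become direct calls into the new red-black tree implementation (rbinit, rbsearch, rbfind, rblookup, rbdelete, rbmin, rbdestroy). As a reading aid, here is a minimal sketch of that interface driving an (offset, length) tree like the fast-handle tree, assuming classic libredblack-style entry points; the range_node type and comparator are illustrative, not code from this commit.

/* Sketch only: assumes the libredblack-style entry points this commit
 * calls directly; range_node and cmp_range are illustrative, not from
 * the commit.  rbmin may be specific to the bundled tree. */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <redblack.h>

typedef struct range_node {
  long start_pos;   /* first byte of the range */
  long end_pos;     /* one past the last byte */
} range_node;

/* Order nodes by start offset.  The third argument is rbinit's config
 * pointer; the commit passes 0 for it, and so does this sketch. */
static int cmp_range(const void * ap, const void * bp, const void * config) {
  const range_node * a = ap;
  const range_node * b = bp;
  (void)config;
  return (a->start_pos < b->start_pos) ? -1 : (a->start_pos > b->start_pos);
}

int main(void) {
  struct rbtree * t = rbinit(cmp_range, 0);

  range_node * n = malloc(sizeof(*n));
  n->start_pos = 4096; n->end_pos = 8192;
  rbsearch(n, t);                       /* insert, as allocFastHandle does */

  /* Exact-match lookup, as findFastHandle does. */
  range_node key = { 4096, 0 };
  assert(rbfind(&key, t) == (const void*)n);

  /* Greatest node with start_pos <= 5000: how allocFastHandle finds the
   * handle covering an offset (RB_LULTEQ). */
  key.start_pos = 5000;
  const range_node * cover = rblookup(RB_LULTEQ, &key, t);
  printf("covering range starts at %ld\n", cover->start_pos);

  /* In-order scan, as nbw_force_range_impl does: min, then RB_LUNEXT. */
  for(const range_node * i = rbmin(t); i; i = rblookup(RB_LUNEXT, i, t)) {
    printf("[%ld, %ld)\n", i->start_pos, i->end_pos);
  }

  rbdelete(n, t);
  free(n);
  rbdestroy(t);
  return 0;
}

With the macro layer gone, the RB_LULTEQ/RB_LUGTEQ lookups in the hunks below read directly as ordered-tree range queries.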
@@ -74,7 +74,7 @@ stasis_handle_t * stasis_handle(open)(const char * path) {
   slow_arg->filePerm = FILE_PERM;
   // Allow 4MB of outstanding writes.
   // @todo Where / how should we open storefile?
-  int worker_thread_count = 4;
+  int worker_thread_count = 1;
   if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) {
     // printf("\nusing pread()/pwrite()\n");
     stasis_handle_t * slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm);
@@ -1,8 +1,6 @@
 
 #include <stasis/common.h>
 
-#undef STLSEARCH // XXX
-
 #include <stasis/constants.h>
 #include <stasis/io/handle.h>
 #include <stasis/linkedlist.h>
@@ -167,7 +165,7 @@ typedef struct nbw_impl {
   stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg);
   void * fast_factory_arg;
 
-  struct RB_ENTRY(tree) * fast_handles;
+  struct rbtree * fast_handles;
   int fast_handle_count;
   int max_fast_handles;
   int min_fast_handles;
@@ -228,11 +226,14 @@ static tree_node * allocTreeNode(lsn_t off, lsn_t len) {
 static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
                                                 lsn_t len) {
   tree_node * np = allocTreeNode(off, len);
 
+ hack:
+
   pthread_mutex_lock(&impl->mut);
 
   DEBUG("allocFastHandle(%lld)\n", off/PAGE_SIZE);
-  const tree_node * n = RB_ENTRY(lookup)(RB_LULTEQ, np, impl->fast_handles);
+
+  const tree_node * n = rblookup(RB_LULTEQ, np, impl->fast_handles);
   // this code only works when writes / reads are aligned to immutable
   // boundaries, and never cross boundaries.
   if((!n) ||
@@ -254,11 +255,16 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
 
     pthread_mutex_unlock(&impl->mut);
 
-    np->dirty = INVALID_NODE;
-    // @todo should non_blocking fall back on slow handles for backpressure?
-    np->h = getSlowHandle(impl);
+    struct timespec ts = { 0, 1000000 /*ns*/ };
+    nanosleep(&ts,0);
 
-    return np;
+    goto hack;
+
+    // np->dirty = INVALID_NODE;
+    // // @todo should non_blocking fall back on slow handles for backpressure?
+    // np->h = getSlowHandle(impl);
+
+    // return np;
 
   } else {
     impl->fast_handle_count++;
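Besides the rename, the hunk above changes the out-of-budget path: rather than marking the node invalid and falling back to a slow handle, allocFastHandle now sleeps about 1 ms and retries from the hack: label (the worker thread gains a matching throttle in a later hunk). A minimal sketch of this sleep-and-retry backpressure pattern, with hypothetical names budget_available and try_alloc:

/* Sketch of the sleep-and-retry backpressure used above; budget_available
 * and try_alloc are hypothetical names, not from the commit. */
#include <pthread.h>
#include <time.h>

static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
static int budget_available = 1;   /* stands in for free fast-handle slots */

static void * try_alloc(void) {
 retry:                            /* plays the role of the 'hack:' label */
  pthread_mutex_lock(&mut);
  if(!budget_available) {
    pthread_mutex_unlock(&mut);    /* drop the lock while sleeping */
    struct timespec ts = { 0, 1000000 /*ns*/ };   /* ~1 ms, as in the commit */
    nanosleep(&ts, 0);
    goto retry;                    /* poll until a slot frees up */
  }
  budget_available--;
  pthread_mutex_unlock(&mut);
  return (void*)1;                 /* a real version returns a fresh handle */
}

int main(void) { return try_alloc() ? 0 : 1; }

A condition variable signalled when a handle is freed would avoid the 1 ms polling; the label's name suggests this is a deliberate stopgap.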
@@ -266,7 +272,7 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off,
     impl->used_buffer_size += len;
 
     np->h = impl->fast_factory(off,len,impl->fast_factory_arg);
-    RB_ENTRY(search)(np, impl->fast_handles);
+    rbsearch(np, impl->fast_handles);
     n = np;
   }
 } else {
@@ -283,7 +289,7 @@ static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off,
   tree_node * np = allocTreeNode(off, len);
 
   pthread_mutex_lock(&impl->mut);
-  const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles);
+  const tree_node * n = rbfind(np, impl->fast_handles);
   if(n) ((tree_node*)n)->pin_count++;
   pthread_mutex_unlock(&impl->mut);
 
@@ -296,7 +302,7 @@ static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) {
   assert(impl->fast_handle_count>=0);
   impl->fast_handle_count--;
   DEBUG("dec fast handle count %d", impl->fast_handle_count);
-  RB_ENTRY(delete)(n, impl->fast_handles);
+  rbdelete(n, impl->fast_handles);
   n->h->close(n->h);
   free((void*)n);
 }
@@ -363,7 +369,7 @@ static int nbw_close(stasis_handle_t * h) {
   assert(impl->fast_handle_count == 0);
   assert(impl->used_buffer_size == 0);
 
-  RB_ENTRY(destroy)(impl->fast_handles);
+  rbdestroy(impl->fast_handles);
   pthread_mutex_destroy(&impl->mut);
   stasis_handle_t * slow;
   while(-1 != (long)(slow = (stasis_handle_t*)popMaxVal(&impl->available_slow_handles))) {
@@ -555,7 +561,7 @@ static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) {
   scratch.start_pos = start;
   scratch.end_pos = start+1;
   if(!stop) stop = impl->end_pos;
-  const tree_node * n = RB_ENTRY(lookup)(RB_LUGTEQ,&scratch,impl->fast_handles); // min)(impl->fast_handles);
+  const tree_node * n = rblookup(RB_LUGTEQ,&scratch,impl->fast_handles); // min)(impl->fast_handles);
   int blocked = 0;
   while(n) {
     if(n->start_pos >= stop) { break; }
@@ -564,18 +570,18 @@ static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) {
       ((tree_node*)n)->dirty = NEEDS_FORCE;
       blocked = 1;
     }
-    n = RB_ENTRY(lookup)(RB_LUNEXT,n,impl->fast_handles);
+    n = rblookup(RB_LUNEXT,n,impl->fast_handles);
   }
   pthread_cond_broadcast(&impl->pending_writes_cond);
   while(blocked) {
     pthread_cond_wait(&impl->force_completed_cond,&impl->mut);
     blocked = 0;
-    n = RB_ENTRY(min)(impl->fast_handles);
+    n = rbmin(impl->fast_handles);
     while(n) {
       if(n->dirty == NEEDS_FORCE) {
         blocked = 1;
       }
-      n = RB_ENTRY(lookup)(RB_LUNEXT,n,impl->fast_handles);
+      n = rblookup(RB_LUNEXT,n,impl->fast_handles);
     }
   }
   int ret = 0;
@@ -670,7 +676,7 @@ static void * nbw_worker(void * handle) {
   pthread_mutex_lock(&impl->mut);
   while(1) {
     // cast strips const.
-    tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles);
+    tree_node * node = (tree_node*)rbmin(impl->fast_handles);
     int writes = 0;
     int contributed_to_force = 0;
     while(node) {
@@ -701,7 +707,7 @@ static void * nbw_worker(void * handle) {
       tree_node * np;
       tree_node * dummies = 0;
       int dummy_count = 0;
-      while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles))
+      while((np = (tree_node*)rbfind(&dummy, impl->fast_handles))
             && np->dirty && !np->pin_count) {
         lsn_t np_len = np->end_pos - np->start_pos;
         len += np_len;
@@ -756,9 +762,10 @@ static void * nbw_worker(void * handle) {
 #endif
       r->h->release_read_buffer(r);
       pthread_mutex_lock(&impl->mut);
+
 #ifdef MERGE_WRITES
       for(int i = 0; i < dummy_count; i++) {
-        np = (tree_node*)RB_ENTRY(find)(&dummies[i], impl->fast_handles);
+        np = (tree_node*)rbfind(&dummies[i], impl->fast_handles);
         assert(np);
         assert(np->pin_count);
         assert(np->dirty != INVALID_NODE);
@@ -777,7 +784,15 @@ static void * nbw_worker(void * handle) {
         assert(node->pin_count);
         node->pin_count--;
       }
-      tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, &next_node,
+
+      if(impl->used_buffer_size < (1 * impl->max_buffer_size) / 5) {
+        pthread_mutex_unlock(&impl->mut);
+        struct timespec ts = {0, 1000000};
+        nanosleep(&ts, 0);
+        pthread_mutex_lock(&impl->mut);
+      }
+
+      tree_node *new_node = (tree_node*)rblookup(RB_LUGREAT, &next_node,
                                                  impl->fast_handles);
       if(!node->dirty && !node->pin_count) {
         impl->used_buffer_size -= (node->end_pos - node->start_pos);
@@ -834,7 +849,7 @@ stasis_handle_t * stasis_handle(open_non_blocking)
   impl->fast_factory = fast_factory;
   impl->fast_factory_arg = fast_factory_arg;
 
-  impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0);
+  impl->fast_handles = rbinit(cmp_handle, 0);
   impl->fast_handle_count = 0;
   impl->max_fast_handles = max_fast_handles;
   impl->min_fast_handles = max_fast_handles / 2;