diff --git a/src/stasis/io/handle.c b/src/stasis/io/handle.c index 58bbabd..6b5277b 100644 --- a/src/stasis/io/handle.c +++ b/src/stasis/io/handle.c @@ -74,7 +74,7 @@ stasis_handle_t * stasis_handle(open)(const char * path) { slow_arg->filePerm = FILE_PERM; // Allow 4MB of outstanding writes. // @todo Where / how should we open storefile? - int worker_thread_count = 4; + int worker_thread_count = 1; if(bufferManagerNonBlockingSlowHandleType == IO_HANDLE_PFILE) { // printf("\nusing pread()/pwrite()\n"); stasis_handle_t * slow_pfile = stasis_handle_open_pfile(0, slow_arg->filename, slow_arg->openMode, slow_arg->filePerm); diff --git a/src/stasis/io/non_blocking.c b/src/stasis/io/non_blocking.c index 077386e..92ff2ec 100644 --- a/src/stasis/io/non_blocking.c +++ b/src/stasis/io/non_blocking.c @@ -1,8 +1,6 @@ #include -#undef STLSEARCH // XXX - #include #include #include @@ -167,7 +165,7 @@ typedef struct nbw_impl { stasis_handle_t * (*fast_factory)(lsn_t off, lsn_t len, void * arg); void * fast_factory_arg; - struct RB_ENTRY(tree) * fast_handles; + struct rbtree * fast_handles; int fast_handle_count; int max_fast_handles; int min_fast_handles; @@ -228,11 +226,14 @@ static tree_node * allocTreeNode(lsn_t off, lsn_t len) { static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, lsn_t len) { tree_node * np = allocTreeNode(off, len); + +hack: + pthread_mutex_lock(&impl->mut); DEBUG("allocFastHandle(%lld)\n", off/PAGE_SIZE); - const tree_node * n = RB_ENTRY(lookup)(RB_LULTEQ, np, impl->fast_handles); + const tree_node * n = rblookup(RB_LULTEQ, np, impl->fast_handles); // this code only works when writes / reads are aligned to immutable // boundaries, and never cross boundaries. if((!n) || @@ -254,11 +255,16 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, pthread_mutex_unlock(&impl->mut); - np->dirty = INVALID_NODE; - // @todo should non_blocking fall back on slow handles for backpressure? - np->h = getSlowHandle(impl); + struct timespec ts = { 0, 1000000 /*ns*/ }; + nanosleep(&ts,0); - return np; + goto hack; + +// np->dirty = INVALID_NODE; +// // @todo should non_blocking fall back on slow handles for backpressure? +// np->h = getSlowHandle(impl); + +// return np; } else { impl->fast_handle_count++; @@ -266,7 +272,7 @@ static inline const tree_node * allocFastHandle(nbw_impl * impl, lsn_t off, impl->used_buffer_size += len; np->h = impl->fast_factory(off,len,impl->fast_factory_arg); - RB_ENTRY(search)(np, impl->fast_handles); + rbsearch(np, impl->fast_handles); n = np; } } else { @@ -283,7 +289,7 @@ static inline const tree_node * findFastHandle(nbw_impl * impl, lsn_t off, tree_node * np = allocTreeNode(off, len); pthread_mutex_lock(&impl->mut); - const tree_node * n = RB_ENTRY(find)(np, impl->fast_handles); + const tree_node * n = rbfind(np, impl->fast_handles); if(n) ((tree_node*)n)->pin_count++; pthread_mutex_unlock(&impl->mut); @@ -296,7 +302,7 @@ static inline void freeFastHandle(nbw_impl * impl, const tree_node * n) { assert(impl->fast_handle_count>=0); impl->fast_handle_count--; DEBUG("dec fast handle count %d", impl->fast_handle_count); - RB_ENTRY(delete)(n, impl->fast_handles); + rbdelete(n, impl->fast_handles); n->h->close(n->h); free((void*)n); } @@ -363,7 +369,7 @@ static int nbw_close(stasis_handle_t * h) { assert(impl->fast_handle_count == 0); assert(impl->used_buffer_size == 0); - RB_ENTRY(destroy)(impl->fast_handles); + rbdestroy(impl->fast_handles); pthread_mutex_destroy(&impl->mut); stasis_handle_t * slow; while(-1 != (long)(slow = (stasis_handle_t*)popMaxVal(&impl->available_slow_handles))) { @@ -555,7 +561,7 @@ static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) { scratch.start_pos = start; scratch.end_pos = start+1; if(!stop) stop = impl->end_pos; - const tree_node * n = RB_ENTRY(lookup)(RB_LUGTEQ,&scratch,impl->fast_handles); // min)(impl->fast_handles); + const tree_node * n = rblookup(RB_LUGTEQ,&scratch,impl->fast_handles); // min)(impl->fast_handles); int blocked = 0; while(n) { if(n->start_pos >= stop) { break; } @@ -564,18 +570,18 @@ static int nbw_force_range_impl(stasis_handle_t * h, lsn_t start, lsn_t stop) { ((tree_node*)n)->dirty = NEEDS_FORCE; blocked = 1; } - n = RB_ENTRY(lookup)(RB_LUNEXT,n,impl->fast_handles); + n = rblookup(RB_LUNEXT,n,impl->fast_handles); } pthread_cond_broadcast(&impl->pending_writes_cond); while(blocked) { pthread_cond_wait(&impl->force_completed_cond,&impl->mut); blocked = 0; - n = RB_ENTRY(min)(impl->fast_handles); + n = rbmin(impl->fast_handles); while(n) { if(n->dirty == NEEDS_FORCE) { blocked = 1; } - n = RB_ENTRY(lookup)(RB_LUNEXT,n,impl->fast_handles); + n = rblookup(RB_LUNEXT,n,impl->fast_handles); } } int ret = 0; @@ -670,7 +676,7 @@ static void * nbw_worker(void * handle) { pthread_mutex_lock(&impl->mut); while(1) { // cast strips const. - tree_node * node = (tree_node*)RB_ENTRY(min)(impl->fast_handles); + tree_node * node = (tree_node*)rbmin(impl->fast_handles); int writes = 0; int contributed_to_force = 0; while(node) { @@ -701,7 +707,7 @@ static void * nbw_worker(void * handle) { tree_node * np; tree_node * dummies = 0; int dummy_count = 0; - while((np = (tree_node*)RB_ENTRY(find)(&dummy, impl->fast_handles)) + while((np = (tree_node*)rbfind(&dummy, impl->fast_handles)) && np->dirty && !np->pin_count) { lsn_t np_len = np->end_pos - np->start_pos; len += np_len; @@ -756,9 +762,10 @@ static void * nbw_worker(void * handle) { #endif r->h->release_read_buffer(r); pthread_mutex_lock(&impl->mut); + #ifdef MERGE_WRITES for(int i = 0; i < dummy_count; i++) { - np = (tree_node*)RB_ENTRY(find)(&dummies[i], impl->fast_handles); + np = (tree_node*)rbfind(&dummies[i], impl->fast_handles); assert(np); assert(np->pin_count); assert(np->dirty != INVALID_NODE); @@ -777,7 +784,15 @@ static void * nbw_worker(void * handle) { assert(node->pin_count); node->pin_count--; } - tree_node *new_node = (tree_node*)RB_ENTRY(lookup)(RB_LUGREAT, &next_node, + + if(impl->used_buffer_size < (1 * impl->max_buffer_size) / 5) { + pthread_mutex_unlock(&impl->mut); + struct timespec ts = {0, 1000000}; + nanosleep(&ts, 0); + pthread_mutex_lock(&impl->mut); + } + + tree_node *new_node = (tree_node*)rblookup(RB_LUGREAT, &next_node, impl->fast_handles); if(!node->dirty && !node->pin_count) { impl->used_buffer_size -= (node->end_pos - node->start_pos); @@ -834,7 +849,7 @@ stasis_handle_t * stasis_handle(open_non_blocking) impl->fast_factory = fast_factory; impl->fast_factory_arg = fast_factory_arg; - impl->fast_handles = RB_ENTRY(init)(cmp_handle, 0); + impl->fast_handles = rbinit(cmp_handle, 0); impl->fast_handle_count = 0; impl->max_fast_handles = max_fast_handles; impl->min_fast_handles = max_fast_handles / 2;