From 1ae8e5698fc4434797a47e788257dcbb8358af92 Mon Sep 17 00:00:00 2001
From: Gregory Burd
Date: Wed, 17 Apr 2013 16:48:23 -0400
Subject: [PATCH] Ensure that the ratio of workers to queues is 2:1 and that
 there are at least 2 queues regardless. Fix a few race conditions (h/t Sue
 from WiredTiger for some nice work) and cherry-pick (for now) a commit that
 fixes a bug I triggered and Keith fixed (in < 10min from report) related to
 WiredTiger stats. Ensure that my guesstimate for session_max is no larger
 than WiredTiger can manage. Continue to fiddle with the build script.

---
 c_src/async_nif.h             | 60 ++++++++++++++++++++++++-----------
 c_src/build_deps.sh           |  6 ++--
 src/riak_kv_wterl_backend.erl |  6 +++-
 3 files changed, 51 insertions(+), 21 deletions(-)

diff --git a/c_src/async_nif.h b/c_src/async_nif.h
index 9d63c52..abe6ed9 100644
--- a/c_src/async_nif.h
+++ b/c_src/async_nif.h
@@ -26,6 +26,7 @@
 extern "C" {
 #endif
 
+#include <assert.h>
 #ifdef ASYNC_NIF_STATS
 #include "stats.h" // TODO: measure, measure... measure again
 #endif
@@ -171,12 +172,13 @@ struct async_nif_state {
     enif_self(env, &req->pid);                                        \
     req->args = (void*)copy_of_args;                                  \
     req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
+    req->fn_post = (void (*)(void *))fn_post_ ## decl;                \
     int h = -1;                                                       \
     if (affinity)                                                     \
       h = async_nif_str_hash_func(affinity) % async_nif->num_queues;  \
     ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h);    \
-    req->fn_post = (void (*)(void *))fn_post_ ## decl;                \
     if (!reply) {                                                     \
+      fn_post_ ## decl (args);                                        \
       enif_free(req);                                                 \
       enif_free_env(new_env);                                         \
       enif_free(copy_of_args);                                        \
@@ -293,23 +295,30 @@ async_nif_worker_fn(void *arg)
       enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
       goto check_again_for_work;
     } else {
+      assert(fifo_q_size(reqs, q->reqs) > 0);
+      assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
       /* At this point the next req is ours to process and we hold the
          reqs_mutex lock.  Take the request off the queue. */
       req = fifo_q_get(reqs, q->reqs);
       enif_mutex_unlock(q->reqs_mutex);
 
-      /* Wake up another thread working on this queue. */
+      /* Ensure that there is at least one other worker watching this queue. */
       enif_cond_signal(q->reqs_cnd);
 
-      /* Finally, do the work. */
+      /* Finally, do the work, */
       req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
+
+      /* and then call the post-work cleanup function. */
       req->fn_post(req->args);
-      /* Note: we don't call enif_free_env(req->env) because it has called
-         enif_send() which invalidates it (free'ing it for us). If a work
-         block doesn't call ASYNC_NIF_REPLY() at some point then it must
-         call ASYNC_NIF_NOREPLY() to free this env. */
+
+      /* Note: we don't call enif_free_env(req->env) because somewhere in the
+         work block there has been a call to enif_send() which invalidates the
+         environment we created (which in theory will cause the next GC to free
+         it for us). If a work block doesn't call ASYNC_NIF_REPLY() at some
+         point then it must call ASYNC_NIF_NOREPLY() to free this env. */
       enif_free(req->args);
       enif_free(req);
+      req = NULL;
     }
   }
   enif_thread_exit(0);
@@ -321,6 +330,7 @@ async_nif_unload(ErlNifEnv *env)
 {
   unsigned int i;
   struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
+  unsigned int num_queues = async_nif->num_queues;
 
   /* Signal the worker threads, stop what you're doing and exit.  To
      ensure that we don't race with the enqueue() process we first
@@ -328,14 +338,14 @@ async_nif_unload(ErlNifEnv *env)
      unlock.
      The enqueue function will take the queue mutex, then test for
      shutdown condition, then enqueue only if not shutting down. */
-  for (i = 0; i < async_nif->num_queues; i++)
+  for (i = 0; i < num_queues; i++)
     enif_mutex_lock(async_nif->queues[i].reqs_mutex);
   async_nif->shutdown = 1;
-  for (i = 0; i < async_nif->num_queues; i++)
+  for (i = 0; i < num_queues; i++)
     enif_mutex_unlock(async_nif->queues[i].reqs_mutex);
 
   /* Wake up any waiting worker threads. */
-  for (i = 0; i < async_nif->num_queues; i++) {
+  for (i = 0; i < num_queues; i++) {
     struct async_nif_work_queue *q = &async_nif->queues[i];
     enif_cond_broadcast(q->reqs_cnd);
   }
@@ -347,7 +357,6 @@ async_nif_unload(ErlNifEnv *env)
   }
 
   /* Cleanup requests, mutexes and conditions in each work queue. */
-  unsigned int num_queues = async_nif->num_queues;
   for (i = 0; i < num_queues; i++) {
     struct async_nif_work_queue *q = &async_nif->queues[i];
     enif_mutex_destroy(q->reqs_mutex);
@@ -394,7 +403,20 @@ async_nif_load(void)
   memset(async_nif, 0, sizeof(struct async_nif_state) +
          sizeof(struct async_nif_work_queue) * info.scheduler_threads);
 
-  async_nif->num_queues = info.scheduler_threads;
+  if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
+    async_nif->num_queues = ASYNC_NIF_MAX_WORKERS / 2;
+  } else {
+    int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
+    if (remainder != 0) {
+      async_nif->num_queues = info.scheduler_threads - remainder;
+    } else {
+      async_nif->num_queues = info.scheduler_threads;
+    }
+    if (async_nif->num_queues < 2) {
+      async_nif->num_queues = 2;
+    }
+  }
+
   async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
   async_nif->next_q = 0;
   async_nif->shutdown = 0;
@@ -409,9 +431,7 @@ async_nif_load(void)
   memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
 
   /* Start the worker threads. */
-  unsigned int num_workers = async_nif->num_queues;
-
-  for (i = 0; i < num_workers; i++) {
+  for (i = 0; i < async_nif->num_workers; i++) {
     struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
     we->async_nif = async_nif;
     we->worker_id = i;
@@ -423,8 +443,6 @@ async_nif_load(void)
       for (j = 0; j < async_nif->num_queues; j++) {
         struct async_nif_work_queue *q = &async_nif->queues[j];
         enif_cond_broadcast(q->reqs_cnd);
-        enif_mutex_destroy(q->reqs_mutex);
-        enif_cond_destroy(q->reqs_cnd);
       }
 
       while(i-- > 0) {
@@ -432,11 +450,17 @@ async_nif_load(void)
         enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
       }
 
+      for (j = 0; j < async_nif->num_queues; j++) {
+        struct async_nif_work_queue *q = &async_nif->queues[j];
+        enif_mutex_destroy(q->reqs_mutex);
+        enif_cond_destroy(q->reqs_cnd);
+      }
+
       memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
+      enif_free(async_nif);
       return NULL;
     }
   }
-  async_nif->num_workers = i;
 
   return async_nif;
 }
diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh
index c07f10b..19658eb 100755
--- a/c_src/build_deps.sh
+++ b/c_src/build_deps.sh
@@ -48,6 +48,7 @@ get_wt ()
 	mv wiredtiger $WT_DIR || exit 1
     fi
     [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
+    (cd $BASEDIR/$WT_DIR && git cherry-pick a3c8c2a13758ae9c44edabcc1a780984a7882904 || exit 1)
     (cd $BASEDIR/$WT_DIR
 	[ -e $BASEDIR/wiredtiger-build.patch ] && \
 	    (patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
@@ -167,12 +168,13 @@ case "$1" in
 	[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
 	test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
 	     -a -f system/lib/libwiredtiger_snappy.so \
-	     -a -f system/lib/libwiredtiger_bzip2.so || build_wt;
+	     -a -f system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
 	[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
 	cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv
 	cp $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
-	cp $BASEDIR/system/lib/libwiredtiger_*.so ${BASEDIR}/../priv
+	cp $BASEDIR/system/lib/libwiredtiger_snappy.so ${BASEDIR}/../priv
+	cp $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
 	cp $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
 	cp $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
 	;;
diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl
index 8477a79..d6282b2 100644
--- a/src/riak_kv_wterl_backend.erl
+++ b/src/riak_kv_wterl_backend.erl
@@ -355,7 +355,11 @@ max_sessions(Config) ->
                    undefined -> 1024;
                    Size -> Size
                end,
-    1000 * (RingSize * erlang:system_info(schedulers)). % TODO: review/fix this logic
+    Est = 1000 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
+    case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
+        true -> 1000000000;
+        false -> Est
+    end.
 
 %% @private
 establish_utility_cursors(Connection, Table) ->
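
Note (illustration only, not part of the patch to apply): the queue-sizing rule
added to async_nif_load() above can be exercised on its own. The standalone
sketch below assumes ASYNC_NIF_MAX_WORKERS is 64 and introduces a hypothetical
helper, num_queues_for(); the real constant and the surrounding state live in
async_nif.h.

#include <stdio.h>

/* Assumed value for illustration; async_nif.h defines the real constant. */
#define ASYNC_NIF_MAX_WORKERS 64

/* Mirrors the sizing logic in async_nif_load(): cap the queue count at half
   the worker pool (at most a 2:1 worker-to-queue ratio), otherwise start from
   the scheduler-thread count trimmed by the remainder of
   ASYNC_NIF_MAX_WORKERS / scheduler_threads, and never use fewer than 2
   queues.  Assumes scheduler_threads > 0. */
static unsigned int
num_queues_for(unsigned int scheduler_threads)
{
    unsigned int num_queues;

    if (scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
        num_queues = ASYNC_NIF_MAX_WORKERS / 2;
    } else {
        unsigned int remainder = ASYNC_NIF_MAX_WORKERS % scheduler_threads;
        if (remainder != 0)
            num_queues = scheduler_threads - remainder;
        else
            num_queues = scheduler_threads;
        if (num_queues < 2)
            num_queues = 2;
    }
    return num_queues;
}

int
main(void)
{
    unsigned int schedulers[] = { 1, 2, 4, 8, 16, 64 };
    for (unsigned int i = 0; i < sizeof(schedulers) / sizeof(schedulers[0]); i++)
        printf("%2u scheduler threads -> %2u queues, %d workers\n",
               schedulers[i], num_queues_for(schedulers[i]), ASYNC_NIF_MAX_WORKERS);
    return 0;
}

With the full worker pool always started, this keeps every queue watched by at
least two workers and avoids degenerate single-queue layouts on small scheduler
counts.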