From e67da86a9b8f1a37d3d91ab96aa19540c526acfe Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Mon, 19 Aug 2013 13:32:58 -0400 Subject: [PATCH] Change backpressure method from EAGAIN to bump_reductions so as not to block Riak/KV vnode processes when queues backup. --- c_src/async_nif.h | 9 ++++++--- src/async_nif.hrl | 38 +++++++++++++++++--------------------- src/wterl.erl | 2 +- 3 files changed, 24 insertions(+), 25 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 539bc75..b1800be 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -34,7 +34,7 @@ extern "C" { #define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MIN_WORKERS 2 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 256 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS struct async_nif_req_entry { @@ -111,7 +111,7 @@ struct async_nif_state { req = async_nif_reuse_req(async_nif); \ if (!req) { \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ + enif_make_atom(env, "enomem")); \ } \ new_env = req->env; \ DPRINTF("async_nif: calling \"%s\"", __func__); \ @@ -359,8 +359,11 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ + double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE; ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), - enif_make_atom(req->env, "enqueued")); + enif_make_tuple2(req->env, + enif_make_atom(req->env, "enqueued"), + enif_make_double(req->env, pct_full))); enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); return reply; diff --git a/src/async_nif.hrl b/src/async_nif.hrl index be61b54..c225f16 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -22,24 +22,20 @@ %% ------------------------------------------------------------------- -define(ASYNC_NIF_CALL(Fun, Args), - F = fun(F) -> - R = erlang:make_ref(), - case erlang:apply(F, [R|Args]) of - {ok, enqueued} -> - receive - {R, {error, shutdown}=Error} -> - %% Work unit was queued, but not executed. - Error; - {R, {error, _Reason}=Error} -> - %% Work unit returned an error. - Error; - {R, Reply} -> - Reply - end; - {error, eagain} -> - F(F); - Other -> - Other - end - end, - F(Fun)). + R = erlang:make_ref(), + case erlang:apply(Fun, [R|Args]) of + {ok, {enqueued, PercentFull}} -> + erlang:bump_reductions(erlang:trunc(2000 * PercentFull)), + receive + {R, {error, shutdown}=Error} -> + %% Work unit was queued, but not executed. + Error; + {R, {error, _Reason}=Error} -> + %% Work unit returned an error. + Error; + {R, Reply} -> + Reply + end; + Other -> + Other + end). diff --git a/src/wterl.erl b/src/wterl.erl index 906a167..952bb61 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -613,7 +613,7 @@ many_open_tables_test_() -> DataDir = ?TEST_DATA_DIR, KeyGen = fun(X) -> - crypto:sha(<>) + crypto:hash(sha, <>) end, ValGen = fun() ->