Change backpressure method from EAGAIN to bump_reductions so as not to block Riak/KV vnode processes when queues backup.

This commit is contained in:
Gregory Burd 2013-08-19 13:32:58 -04:00
parent 2047104cda
commit e67da86a9b
3 changed files with 24 additions and 25 deletions

View file

@ -34,7 +34,7 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2 #define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 256 #define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
struct async_nif_req_entry { struct async_nif_req_entry {
@ -111,7 +111,7 @@ struct async_nif_state {
req = async_nif_reuse_req(async_nif); \ req = async_nif_reuse_req(async_nif); \
if (!req) { \ if (!req) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \ enif_make_atom(env, "enomem")); \
} \ } \
new_env = req->env; \ new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \ DPRINTF("async_nif: calling \"%s\"", __func__); \
@ -359,8 +359,11 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Build the term before releasing the lock so as not to race on the use of /* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread the req pointer (which will soon become invalid in another thread
performing the request). */ performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued")); enif_make_tuple2(req->env,
enif_make_atom(req->env, "enqueued"),
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd); enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
return reply; return reply;

View file

@ -22,24 +22,20 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args), -define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F) -> R = erlang:make_ref(),
R = erlang:make_ref(), case erlang:apply(Fun, [R|Args]) of
case erlang:apply(F, [R|Args]) of {ok, {enqueued, PercentFull}} ->
{ok, enqueued} -> erlang:bump_reductions(erlang:trunc(2000 * PercentFull)),
receive receive
{R, {error, shutdown}=Error} -> {R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed. %% Work unit was queued, but not executed.
Error; Error;
{R, {error, _Reason}=Error} -> {R, {error, _Reason}=Error} ->
%% Work unit returned an error. %% Work unit returned an error.
Error; Error;
{R, Reply} -> {R, Reply} ->
Reply Reply
end; end;
{error, eagain} -> Other ->
F(F); Other
Other -> end).
Other
end
end,
F(Fun)).

View file

@ -613,7 +613,7 @@ many_open_tables_test_() ->
DataDir = ?TEST_DATA_DIR, DataDir = ?TEST_DATA_DIR,
KeyGen = KeyGen =
fun(X) -> fun(X) ->
crypto:sha(<<X>>) crypto:hash(sha, <<X>>)
end, end,
ValGen = ValGen =
fun() -> fun() ->