Return 'eagain' when request queue is full and then try the request again.
In the worst case is the request queue remains full and we loop between the NIF and Erlang forever trying over and over to enqueue the request. If that happens we shouldn't take schedulers offline as the NIF calls are fast and we shouldn't run out of memory as that is bounded. CPU will show a lot of activity, but progress will continue in Erlang.
This commit is contained in:
parent
f043a99ccb
commit
422dcfda89
3 changed files with 24 additions and 22 deletions
|
@ -96,7 +96,7 @@ struct async_nif_state {
|
||||||
new_env = req->env; \
|
new_env = req->env; \
|
||||||
if (!req) \
|
if (!req) \
|
||||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||||
enif_make_atom(env, "enomem")); \
|
enif_make_atom(env, "eagain")); \
|
||||||
do pre_block while(0); \
|
do pre_block while(0); \
|
||||||
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
|
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
|
||||||
if (!copy_of_args) { \
|
if (!copy_of_args) { \
|
||||||
|
|
|
@ -21,22 +21,25 @@
|
||||||
%%
|
%%
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
-define(ASYNC_NIF_CALL(Fun, Args),
|
-spec async_nif_enqueue(reference(), function(), [term()]) -> term() | {error, term()}.
|
||||||
begin
|
async_nif_enqueue(R, F, A) ->
|
||||||
NIFRef = erlang:make_ref(),
|
case erlang:apply(F, [R|A]) of
|
||||||
case erlang:apply(Fun, [NIFRef|Args]) of
|
{ok, enqueued} ->
|
||||||
{ok, enqueued} ->
|
receive
|
||||||
receive
|
{R, {error, eagain}} ->
|
||||||
{NIFRef, {error, shutdown}=Error} ->
|
%% Work unit was not queued, try again.
|
||||||
%% Work unit was queued, but not executed.
|
async_nif_enqueue(R, F, A);
|
||||||
Error;
|
{R, {error, shutdown}=Error} ->
|
||||||
{NIFRef, {error, _Reason}=Error} ->
|
%% Work unit was queued, but not executed.
|
||||||
%% Work unit returned an error.
|
Error;
|
||||||
Error;
|
{R, {error, _Reason}=Error} ->
|
||||||
{NIFRef, Reply} ->
|
%% Work unit returned an error.
|
||||||
Reply
|
Error;
|
||||||
end;
|
{R, Reply} ->
|
||||||
Other ->
|
Reply
|
||||||
Other
|
end;
|
||||||
end
|
Other ->
|
||||||
end).
|
Other
|
||||||
|
end.
|
||||||
|
|
||||||
|
-define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(erlang:make_ref(), Fun, Args)).
|
||||||
|
|
|
@ -69,8 +69,6 @@
|
||||||
|
|
||||||
-export([set_event_handler_pid/1]).
|
-export([set_event_handler_pid/1]).
|
||||||
|
|
||||||
-include("async_nif.hrl").
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-ifdef(EQC).
|
-ifdef(EQC).
|
||||||
-include_lib("eqc/include/eqc.hrl").
|
-include_lib("eqc/include/eqc.hrl").
|
||||||
|
@ -90,6 +88,7 @@
|
||||||
|
|
||||||
-on_load(init/0).
|
-on_load(init/0).
|
||||||
|
|
||||||
|
-include("async_nif.hrl").
|
||||||
-define(nif_stub, nif_stub_error(?LINE)).
|
-define(nif_stub, nif_stub_error(?LINE)).
|
||||||
nif_stub_error(Line) ->
|
nif_stub_error(Line) ->
|
||||||
erlang:nif_error({nif_not_loaded,module,?MODULE,line,Line}).
|
erlang:nif_error({nif_not_loaded,module,?MODULE,line,Line}).
|
||||||
|
|
Loading…
Reference in a new issue