Revert changes to async_nif and re-enable stats. Fixed selective recv.

This commit is contained in:
Gregory Burd 2013-07-31 09:41:36 -04:00
parent 2393257bef
commit c9a4ab8325
3 changed files with 62 additions and 61 deletions

View file

@ -198,7 +198,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
env = enif_alloc_env();
if (env) {
req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1);
__sync_fetch_and_add(&async_nif->num_reqs, 1);
} else {
enif_free(req);
req = NULL;
@ -299,8 +299,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
qid = (unsigned int)hint;
} else {
do {
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
}
@ -393,16 +393,14 @@ async_nif_worker_fn(void *arg)
/* Queue is empty so we wait for more work to arrive. */
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
tries = async_nif->num_queues;
continue;
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else {
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;

View file

@ -21,22 +21,28 @@
%%
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
begin
NIFRef = erlang:make_ref(),
case erlang:apply(Fun, [NIFRef|Args]) of
{ok, enqueued} ->
receive
{NIFRef, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{NIFRef, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{NIFRef, Reply} ->
Reply
end;
Other ->
Other
end
end).
-spec async_nif_enqueue(function(), [term()]) -> term() | {error, term()}.
async_nif_enqueue(F, A) ->
R = erlang:make_ref(),
case erlang:apply(F, [R|A]) of
{ok, enqueued} ->
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
%% Work unit was not queued, try again.
async_nif_enqueue(F, A);
%{error, enomem} ->
%{error, shutdown} ->
Other ->
Other
end.
-define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(Fun, Args)).

View file

@ -340,26 +340,23 @@ is_empty(#state{connection=Connection, table=Table}) ->
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(_) ->
[].
%% status(#state{connection=Connection, table=Table}) ->
%% case wterl:cursor_open(Connection, Table) of
%% {ok, Cursor} ->
%% TheStats =
%% case fetch_status(Cursor) of
%% {ok, Stats} ->
%% Stats;
%% {error, {eperm, _}} -> % TODO: review/fix this logic
%% {ok, []};
%% _ ->
%% {ok, []}
%% end,
%% wterl:cursor_close(Cursor),
%% TheStats;
%% {error, Reason2} ->
%% {error, Reason2}
%% end.
status(#state{connection=Connection, table=Table}) ->
case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
TheStats =
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end,
wterl:cursor_close(Cursor),
TheStats;
{error, Reason2} ->
{error, Reason2}
end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@ -546,15 +543,15 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
%% fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
%% fetch_status(_Cursor, {error, _}, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(Cursor, {ok, Stat}, Acc) ->
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =