From c9a4ab8325706fdf9ac68ba525866001a304c642 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Wed, 31 Jul 2013 09:41:36 -0400 Subject: [PATCH] Revert changes to async_nif and re-enable stats. Fixed selective recv. --- c_src/async_nif.h | 24 +++++++-------- src/async_nif.hrl | 44 ++++++++++++++++------------ src/riak_kv_wterl_backend.erl | 55 +++++++++++++++++------------------ 3 files changed, 62 insertions(+), 61 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 25d407c..008b5e0 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -198,7 +198,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif) env = enif_alloc_env(); if (env) { req->env = env; - __sync_fetch_and_add(&async_nif->num_reqs, 1); + __sync_fetch_and_add(&async_nif->num_reqs, 1); } else { enif_free(req); req = NULL; @@ -299,8 +299,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en qid = (unsigned int)hint; } else { do { - last_qid = __sync_fetch_and_add(&async_nif->next_q, 0); - qid = (last_qid + 1) % async_nif->num_queues; + last_qid = __sync_fetch_and_add(&async_nif->next_q, 0); + qid = (last_qid + 1) % async_nif->num_queues; } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid)); } @@ -393,16 +393,14 @@ async_nif_worker_fn(void *arg) /* Queue is empty so we wait for more work to arrive. */ if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { enif_mutex_unlock(q->reqs_mutex); - if (tries == 0 && q == we->q) { - tries = async_nif->num_queues; - continue; - } else { - tries--; - __sync_fetch_and_add(&q->num_workers, -1); - q = q->next; - __sync_fetch_and_add(&q->num_workers, 1); - continue; // try next queue - } + if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit + else { + tries--; + __sync_fetch_and_add(&q->num_workers, -1); + q = q->next; + __sync_fetch_and_add(&q->num_workers, 1); + continue; // try next queue + } } else { enif_cond_wait(q->reqs_cnd, q->reqs_mutex); goto check_again_for_work; diff --git a/src/async_nif.hrl b/src/async_nif.hrl index 5110fa2..44b7a2a 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -21,22 +21,28 @@ %% %% ------------------------------------------------------------------- --define(ASYNC_NIF_CALL(Fun, Args), - begin - NIFRef = erlang:make_ref(), - case erlang:apply(Fun, [NIFRef|Args]) of - {ok, enqueued} -> - receive - {NIFRef, {error, shutdown}=Error} -> - %% Work unit was queued, but not executed. - Error; - {NIFRef, {error, _Reason}=Error} -> - %% Work unit returned an error. - Error; - {NIFRef, Reply} -> - Reply - end; - Other -> - Other - end - end). +-spec async_nif_enqueue(function(), [term()]) -> term() | {error, term()}. +async_nif_enqueue(F, A) -> + R = erlang:make_ref(), + case erlang:apply(F, [R|A]) of + {ok, enqueued} -> + receive + {R, {error, shutdown}=Error} -> + %% Work unit was queued, but not executed. + Error; + {R, {error, _Reason}=Error} -> + %% Work unit returned an error. + Error; + {R, Reply} -> + Reply + end; + {error, eagain} -> + %% Work unit was not queued, try again. + async_nif_enqueue(F, A); + %{error, enomem} -> + %{error, shutdown} -> + Other -> + Other + end. + +-define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(Fun, Args)). diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index dcadbc2..c075317 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -340,26 +340,23 @@ is_empty(#state{connection=Connection, table=Table}) -> %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. -status(_) -> - []. - -%% status(#state{connection=Connection, table=Table}) -> -%% case wterl:cursor_open(Connection, Table) of -%% {ok, Cursor} -> -%% TheStats = -%% case fetch_status(Cursor) of -%% {ok, Stats} -> -%% Stats; -%% {error, {eperm, _}} -> % TODO: review/fix this logic -%% {ok, []}; -%% _ -> -%% {ok, []} -%% end, -%% wterl:cursor_close(Cursor), -%% TheStats; -%% {error, Reason2} -> -%% {error, Reason2} -%% end. +status(#state{connection=Connection, table=Table}) -> + case wterl:cursor_open(Connection, Table) of + {ok, Cursor} -> + TheStats = + case fetch_status(Cursor) of + {ok, Stats} -> + Stats; + {error, {eperm, _}} -> % TODO: review/fix this logic + {ok, []}; + _ -> + {ok, []} + end, + wterl:cursor_close(Cursor), + TheStats; + {error, Reason2} -> + {error, Reason2} + end. %% @doc Register an asynchronous callback -spec callback(reference(), any(), state()) -> {ok, state()}. @@ -546,15 +543,15 @@ from_index_key(LKey) -> %% @private %% Return all status from wterl statistics cursor -%% fetch_status(Cursor) -> -%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}. -%% fetch_status(_Cursor, {error, _}, Acc) -> -%% lists:reverse(Acc); -%% fetch_status(_Cursor, not_found, Acc) -> -%% lists:reverse(Acc); -%% fetch_status(Cursor, {ok, Stat}, Acc) -> -%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])], -%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]). +fetch_status(Cursor) -> + {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}. +fetch_status(_Cursor, {error, _}, Acc) -> + lists:reverse(Acc); +fetch_status(_Cursor, not_found, Acc) -> + lists:reverse(Acc); +fetch_status(Cursor, {ok, Stat}, Acc) -> + [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])], + fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]). size_cache(RequestedSize) -> Size =