diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 3a4a5aa..bb80a7c 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -35,9 +35,18 @@ extern "C" { #define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MIN_WORKERS 2 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 256 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS +/* Atoms (initialized in on_load) */ +static ERL_NIF_TERM ATOM_EAGAIN; +static ERL_NIF_TERM ATOM_ENOMEM; +static ERL_NIF_TERM ATOM_ENQUEUED; +static ERL_NIF_TERM ATOM_ERROR; +static ERL_NIF_TERM ATOM_OK; +static ERL_NIF_TERM ATOM_SHUTDOWN; + + struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; @@ -105,15 +114,11 @@ struct async_nif_state { argc -= 1; \ /* Note: !!! this assumes that the first element of priv_data is ours */ \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ - if (async_nif->shutdown) { \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "shutdown")); \ - } \ + if (async_nif->shutdown) \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \ req = async_nif_reuse_req(async_nif); \ - if (!req) { \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ - } \ + if (!req) \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \ new_env = req->env; \ DPRINTF("async_nif: calling \"%s\"", __func__); \ do pre_block while(0); \ @@ -122,8 +127,7 @@ struct async_nif_state { if (!copy_of_args) { \ fn_post_ ## decl (args); \ async_nif_recycle_req(req, async_nif); \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "enomem")); \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \ } \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ req->ref = enif_make_copy(new_env, argv_in[0]); \ @@ -139,8 +143,7 @@ struct async_nif_state { fn_post_ ## decl (args); \ async_nif_recycle_req(req, async_nif); \ free(copy_of_args); \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ + return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \ } \ return reply; \ } @@ -353,8 +356,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ - ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), - enif_make_atom(req->env, "enqueued")); + double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE; + ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK, + enif_make_tuple2(req->env, ATOM_ENQUEUED, + enif_make_double(req->env, pct_full))); pthread_cond_signal(&q->reqs_cnd); pthread_mutex_unlock(&q->reqs_mutex); return reply; @@ -496,8 +501,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, - enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), - enif_make_atom(req->env, "shutdown"))); + enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN)); req->fn_post(req->args); enif_free_env(req->env); free(req->args); @@ -526,7 +530,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) } static void * -async_nif_load() +async_nif_load(ErlNifEnv *env) { static int has_init = 0; unsigned int i, num_queues; @@ -537,6 +541,14 @@ async_nif_load() if (has_init) return 0; else has_init = 1; + /* Init some static references to commonly used atoms. */ + ATOM_EAGAIN = enif_make_atom(env, "eagain"); + ATOM_ENOMEM = enif_make_atom(env, "enomem"); + ATOM_ENQUEUED = enif_make_atom(env, "enqueued"); + ATOM_ERROR = enif_make_atom(env, "error"); + ATOM_OK = enif_make_atom(env, "ok"); + ATOM_SHUTDOWN = enif_make_atom(env, "shutdown"); + /* Find out how many schedulers there are. */ enif_system_info(&info, sizeof(ErlNifSysInfo)); diff --git a/c_src/lmdb_nif.c b/c_src/lmdb_nif.c index 5b1811a..262178a 100644 --- a/c_src/lmdb_nif.c +++ b/c_src/lmdb_nif.c @@ -1096,7 +1096,7 @@ lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) /* Note: !!! the first element of our priv_data struct *must* be the pointer to the async_nif's private data which we set here. */ - ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv); + ASYNC_NIF_LOAD(lmdb, env, priv->async_nif_priv); if (!priv) FAIL_ERR(ENOMEM, err2); *priv_data = priv; diff --git a/src/async_nif.hrl b/src/async_nif.hrl index b8f7be3..5492e97 100644 --- a/src/async_nif.hrl +++ b/src/async_nif.hrl @@ -24,8 +24,14 @@ -define(ASYNC_NIF_CALL(Fun, Args), F = fun(F, T) -> R = erlang:make_ref(), - case erlang:apply(F, [R|Args]) of - {ok, enqueued} -> + case erlang:apply(Fun, [R|Args]) of + {ok, {enqueued, PctBusy}} -> + if + PctBusy > 0.25 andalso PctBusy =< 1.0 -> + erlang:bump_reductions(erlang:trunc(2000 * PctBusy)); + true -> + ok + end, receive {R, {error, shutdown}=Error} -> %% Work unit was queued, but not executed. @@ -37,11 +43,12 @@ Reply end; {error, eagain} -> - SleepyTime = min(30, (T+1)*2), - timer:sleep(SleepyTime), - F(F, SleepyTime); + case T of + 3 -> not_found; + _ -> F(F, T + 1) + end; Other -> Other end end, - F(Fun, 0)). + F(F, 1)).