Merge remote-tracking branch 'origin/master' into gsb-finish-lmdb-api

Conflicts:
	c_src/async_nif.h
	src/async_nif.hrl
This commit is contained in:
Gregory Burd 2013-09-17 14:40:36 -04:00
commit 04784be0df
3 changed files with 44 additions and 25 deletions

View file

@ -35,9 +35,18 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2 #define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 256 #define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry { struct async_nif_req_entry {
ERL_NIF_TERM ref; ERL_NIF_TERM ref;
ErlNifEnv *env; ErlNifEnv *env;
@ -105,15 +114,11 @@ struct async_nif_state {
argc -= 1; \ argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \ /* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) { \ if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
enif_make_atom(env, "shutdown")); \
} \
req = async_nif_reuse_req(async_nif); \ req = async_nif_reuse_req(async_nif); \
if (!req) { \ if (!req) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
enif_make_atom(env, "eagain")); \
} \
new_env = req->env; \ new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \ DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \ do pre_block while(0); \
@ -122,8 +127,7 @@ struct async_nif_state {
if (!copy_of_args) { \ if (!copy_of_args) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \ async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
enif_make_atom(env, "enomem")); \
} \ } \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \ req->ref = enif_make_copy(new_env, argv_in[0]); \
@ -139,8 +143,7 @@ struct async_nif_state {
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \ async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \ free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
enif_make_atom(env, "eagain")); \
} \ } \
return reply; \ return reply; \
} }
@ -353,8 +356,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Build the term before releasing the lock so as not to race on the use of /* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread the req pointer (which will soon become invalid in another thread
performing the request). */ performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
enif_make_atom(req->env, "enqueued")); ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
pthread_cond_signal(&q->reqs_cnd); pthread_cond_signal(&q->reqs_cnd);
pthread_mutex_unlock(&q->reqs_mutex); pthread_mutex_unlock(&q->reqs_mutex);
return reply; return reply;
@ -496,8 +501,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env); enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env, enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
enif_make_atom(req->env, "shutdown")));
req->fn_post(req->args); req->fn_post(req->args);
enif_free_env(req->env); enif_free_env(req->env);
free(req->args); free(req->args);
@ -526,7 +530,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
} }
static void * static void *
async_nif_load() async_nif_load(ErlNifEnv *env)
{ {
static int has_init = 0; static int has_init = 0;
unsigned int i, num_queues; unsigned int i, num_queues;
@ -537,6 +541,14 @@ async_nif_load()
if (has_init) return 0; if (has_init) return 0;
else has_init = 1; else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Find out how many schedulers there are. */ /* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo)); enif_system_info(&info, sizeof(ErlNifSysInfo));

View file

@ -1096,7 +1096,7 @@ lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
/* Note: !!! the first element of our priv_data struct *must* be the /* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */ pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv); ASYNC_NIF_LOAD(lmdb, env, priv->async_nif_priv);
if (!priv) if (!priv)
FAIL_ERR(ENOMEM, err2); FAIL_ERR(ENOMEM, err2);
*priv_data = priv; *priv_data = priv;

View file

@ -24,8 +24,14 @@
-define(ASYNC_NIF_CALL(Fun, Args), -define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F, T) -> F = fun(F, T) ->
R = erlang:make_ref(), R = erlang:make_ref(),
case erlang:apply(F, [R|Args]) of case erlang:apply(Fun, [R|Args]) of
{ok, enqueued} -> {ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive receive
{R, {error, shutdown}=Error} -> {R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed. %% Work unit was queued, but not executed.
@ -37,11 +43,12 @@
Reply Reply
end; end;
{error, eagain} -> {error, eagain} ->
SleepyTime = min(30, (T+1)*2), case T of
timer:sleep(SleepyTime), 3 -> not_found;
F(F, SleepyTime); _ -> F(F, T + 1)
end;
Other -> Other ->
Other Other
end end
end, end,
F(Fun, 0)). F(F, 1)).