Update async_nif to latest version.

This commit is contained in:
Gregory Burd 2013-09-17 13:23:31 -04:00
parent 85295c7890
commit 0c98a25ade
3 changed files with 140 additions and 104 deletions

View file

@ -34,9 +34,18 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2 #define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 100 #define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry { struct async_nif_req_entry {
ERL_NIF_TERM ref; ERL_NIF_TERM ref;
ErlNifEnv *env; ErlNifEnv *env;
@ -53,6 +62,7 @@ struct async_nif_work_queue {
unsigned int depth; unsigned int depth;
ErlNifMutex *reqs_mutex; ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd; ErlNifCond *reqs_cnd;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs; STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
}; };
@ -103,25 +113,20 @@ struct async_nif_state {
argc -= 1; \ argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \ /* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) { \ if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
enif_make_atom(env, "shutdown")); \
} \
req = async_nif_reuse_req(async_nif); \ req = async_nif_reuse_req(async_nif); \
if (!req) { \ if (!req) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
enif_make_atom(env, "eagain")); \
} \
new_env = req->env; \ new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \ DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \ do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \ DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \ if (!copy_of_args) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \ async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
enif_make_atom(env, "enomem")); \
} \ } \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \ req->ref = enif_make_copy(new_env, argv_in[0]); \
@ -136,9 +141,8 @@ struct async_nif_state {
if (!reply) { \ if (!reply) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \ async_nif_recycle_req(req, async_nif); \
enif_free(copy_of_args); \ free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
enif_make_atom(env, "eagain")); \
} \ } \
return reply; \ return reply; \
} }
@ -146,16 +150,16 @@ struct async_nif_state {
#define ASYNC_NIF_INIT(name) \ #define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL; static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, priv) do { \ #define ASYNC_NIF_LOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \ if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \ name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
enif_mutex_lock(name##_async_nif_coord); \ enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(); \ priv = async_nif_load(env); \
enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_unlock(name##_async_nif_coord); \
} while(0); } while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \ #define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \ if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \ name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
enif_mutex_lock(name##_async_nif_coord); \ enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env, priv); \ async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_unlock(name##_async_nif_coord); \
@ -164,7 +168,7 @@ struct async_nif_state {
} while(0); } while(0);
#define ASYNC_NIF_UPGRADE(name, env) do { \ #define ASYNC_NIF_UPGRADE(name, env) do { \
if (!name##_async_nif_coord) \ if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \ name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
enif_mutex_lock(name##_async_nif_coord); \ enif_mutex_lock(name##_async_nif_coord); \
async_nif_upgrade(env); \ async_nif_upgrade(env); \
enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_unlock(name##_async_nif_coord); \
@ -191,15 +195,15 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
enif_mutex_lock(async_nif->recycled_req_mutex); enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = enif_alloc(sizeof(struct async_nif_req_entry)); req = malloc(sizeof(struct async_nif_req_entry));
if (req) { if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry)); memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env(); env = enif_alloc_env();
if (env) { if (env) {
req->env = env; req->env = env;
async_nif->num_reqs++; __sync_fetch_and_add(&async_nif->num_reqs, 1);
} else { } else {
enif_free(req); free(req);
req = NULL; req = NULL;
} }
} }
@ -253,7 +257,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */ void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value); enif_thread_join(we->tid, &exit_value);
enif_free(we); free(we);
async_nif->we_active--; async_nif->we_active--;
we = n; we = n;
} }
@ -263,7 +267,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
return EAGAIN; return EAGAIN;
} }
we = enif_alloc(sizeof(struct async_nif_worker_entry)); we = malloc(sizeof(struct async_nif_worker_entry));
if (!we) { if (!we) {
enif_mutex_unlock(async_nif->we_mutex); enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM; return ENOMEM;
@ -287,7 +291,7 @@ static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{ {
/* Identify the most appropriate worker for this request. */ /* Identify the most appropriate worker for this request. */
unsigned int i, qid = 0; unsigned int i, last_qid, qid = 0;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0; double avg_depth = 0.0;
@ -297,9 +301,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
if (hint >= 0) { if (hint >= 0) {
qid = (unsigned int)hint; qid = (unsigned int)hint;
} else { } else {
qid = async_nif->next_q; do {
qid = (qid + 1) % async_nif->num_queues; last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
async_nif->next_q = qid; qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
} }
/* Now we inspect and interate across the set of queues trying to select one /* Now we inspect and interate across the set of queues trying to select one
@ -314,18 +319,13 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
avg_depth += async_nif->queues[j].depth; avg_depth += async_nif->queues[j].depth;
} }
} }
if (avg_depth != 0) if (avg_depth) avg_depth /= n;
avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While /* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */ queue will be valid until we release the lock. */
q = &async_nif->queues[qid]; q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex); enif_mutex_lock(q->reqs_mutex);
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
/* Try not to enqueue a request into a queue that isn't keeping up with /* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */ the request volume. */
@ -337,26 +337,31 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
} }
/* If the for loop finished then we didn't find a suitable queue for this /* If the for loop finished then we didn't find a suitable queue for this
request, meaning we're backed up so trigger eagain. */ request, meaning we're backed up so trigger eagain. Note that if we left
if (i == async_nif->num_queues) { the loop in this way we hold no lock. */
enif_mutex_unlock(q->reqs_mutex); if (i == async_nif->num_queues) return 0;
return 0;
}
/* Add the request to the queue. */ /* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries); STAILQ_INSERT_TAIL(&q->reqs, req, entries);
q->depth++; __sync_fetch_and_add(&q->depth, 1);
/* We've selected a queue for this new request now check to make sure there are /* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */ enough workers actively processing requests on this queue. */
if (q->depth > q->num_workers) while (q->depth > q->num_workers) {
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; switch(async_nif_start_worker(async_nif, q)) {
case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue;
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
}
}done:;
/* Build the term before releasing the lock so as not to race on the use of /* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread the req pointer (which will soon become invalid in another thread
performing the request). */ performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
enif_make_atom(req->env, "enqueued")); ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd); enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
return reply; return reply;
@ -375,6 +380,7 @@ async_nif_worker_fn(void *arg)
struct async_nif_state *async_nif = we->async_nif; struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q; struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL; struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
for(;;) { for(;;) {
/* Examine the request queue, are there things to be done? */ /* Examine the request queue, are there things to be done? */
@ -386,22 +392,33 @@ async_nif_worker_fn(void *arg)
} }
if (STAILQ_EMPTY(&q->reqs)) { if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */ /* Queue is empty so we wait for more work to arrive. */
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leaving this loop (break) which leads to a thread exit/join. */
break; break;
} else { } else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex); enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work; goto check_again_for_work;
} }
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else { } else {
/* At this point the next req is ours to process and we hold the /* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */ reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs); req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
q->depth--; __sync_fetch_and_add(&q->depth, -1);
/* Ensure that there is at least one other worker thread watching this /* Wake up other worker thread watching this queue to help process work. */
queue. */
enif_cond_signal(q->reqs_cnd); enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
@ -415,7 +432,7 @@ async_nif_worker_fn(void *arg)
req->ref = 0; req->ref = 0;
req->fn_work = 0; req->fn_work = 0;
req->fn_post = 0; req->fn_post = 0;
enif_free(req->args); free(req->args);
req->args = NULL; req->args = NULL;
async_nif_recycle_req(req, async_nif); async_nif_recycle_req(req, async_nif);
req = NULL; req = NULL;
@ -424,7 +441,7 @@ async_nif_worker_fn(void *arg)
enif_mutex_lock(async_nif->we_mutex); enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex); enif_mutex_unlock(async_nif->we_mutex);
q->num_workers--; __sync_fetch_and_add(&q->num_workers, -1);
enif_thread_exit(0); enif_thread_exit(0);
return 0; return 0;
} }
@ -467,7 +484,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */ void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value); enif_thread_join(we->tid, &exit_value);
enif_free(we); free(we);
async_nif->we_active--; async_nif->we_active--;
we = n; we = n;
} }
@ -486,12 +503,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env); enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env, enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
enif_make_atom(req->env, "shutdown")));
req->fn_post(req->args); req->fn_post(req->args);
enif_free_env(req->env); enif_free_env(req->env);
enif_free(req->args); free(req->args);
enif_free(req); free(req);
req = n; req = n;
} }
enif_mutex_destroy(q->reqs_mutex); enif_mutex_destroy(q->reqs_mutex);
@ -505,18 +521,18 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
while(req != NULL) { while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env); enif_free_env(req->env);
enif_free(req); free(req);
req = n; req = n;
} }
enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
enif_free(async_nif); free(async_nif);
} }
static void * static void *
async_nif_load() async_nif_load(ErlNifEnv *env)
{ {
static int has_init = 0; static int has_init = 0;
unsigned int i, num_queues; unsigned int i, num_queues;
@ -527,6 +543,14 @@ async_nif_load()
if (has_init) return 0; if (has_init) return 0;
else has_init = 1; else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Find out how many schedulers there are. */ /* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo)); enif_system_info(&info, sizeof(ErlNifSysInfo));
@ -544,7 +568,7 @@ async_nif_load()
} }
/* Init our portion of priv_data's module-specific state. */ /* Init our portion of priv_data's module-specific state. */
async_nif = enif_alloc(sizeof(struct async_nif_state) + async_nif = malloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues); sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif) if (!async_nif)
return NULL; return NULL;
@ -556,15 +580,16 @@ async_nif_load()
async_nif->next_q = 0; async_nif->next_q = 0;
async_nif->shutdown = 0; async_nif->shutdown = 0;
STAILQ_INIT(&async_nif->recycled_reqs); STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create(NULL); async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
async_nif->we_mutex = enif_mutex_create(NULL); async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining); SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) { for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i]; struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs); STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create(NULL); q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create(NULL); q->reqs_cnd = enif_cond_create("reqs");
q->next = &async_nif->queues[(i + 1) % num_queues];
} }
return async_nif; return async_nif;
} }

View file

@ -615,7 +615,7 @@ static int lmdb_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
/* Note: !!! the first element of our priv_data struct *must* be the /* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */ pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(lmdb, priv->async_nif_priv); ASYNC_NIF_LOAD(lmdb, env, priv->async_nif_priv);
if (!priv) if (!priv)
return ENOMEM; return ENOMEM;
*priv_data = priv; *priv_data = priv;

View file

@ -1,32 +1,38 @@
%% --------------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% async_nif: An async thread-pool layer for Erlang's NIF API %% async_nif: An async thread-pool layer for Erlang's NIF API
%% %%
%% Copyright (c) 2012-2013 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% Author: Gregory Burd <greg@basho.com> <greg@burd.me> %% Author: Gregory Burd <greg@basho.com> <greg@burd.me>
%% %%
%% This file is provided to you under the Apache License, Version 2.0 (the %% This file is provided to you under the Apache License,
%% "License"); you may not use this file except in compliance with the License. %% Version 2.0 (the "License"); you may not use this file
%% You may obtain a copy of the License at: %% except in compliance with the License. You may obtain
%% a copy of the License at
%% %%
%% http://www.apache.org/licenses/LICENSE-2.0 %% http://www.apache.org/licenses/LICENSE-2.0
%% %%
%% Unless required by applicable law or agreed to in writing, software %% Unless required by applicable law or agreed to in writing,
%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT %% software distributed under the License is distributed on an
%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% License for the specific language governing permissions and limitations %% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License. %% under the License.
%% %%
%% --------------------------------------------------------------------------- %% -------------------------------------------------------------------
-spec async_nif_enqueue(reference(), function(), [term()]) -> term() | {error, term()}. -define(ASYNC_NIF_CALL(Fun, Args),
async_nif_enqueue(R, F, A) -> F = fun(F, T) ->
case erlang:apply(F, [R|A]) of R = erlang:make_ref(),
{ok, enqueued} -> case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive receive
{R, {error, eagain}} ->
%% Work unit was not queued, try again.
async_nif_enqueue(R, F, A);
{R, {error, shutdown}=Error} -> {R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed. %% Work unit was queued, but not executed.
Error; Error;
@ -36,8 +42,13 @@ async_nif_enqueue(R, F, A) ->
{R, Reply} -> {R, Reply} ->
Reply Reply
end; end;
{error, eagain} ->
case T of
3 -> not_found;
_ -> F(F, T + 1)
end;
Other -> Other ->
Other Other
end. end
end,
-define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(erlang:make_ref(), Fun, Args)). F(F, 1)).