Use malloc/free rather than enif_alloc/enif_free so as to avoid BEAM allocator
overhead (bytes and time). Create static references to commonly used Erlang atoms to avoid overhead re-creating them on each request cycle.
This commit is contained in:
parent
83c3faf74f
commit
2ddf0da53e
1 changed files with 43 additions and 38 deletions
|
@ -37,6 +37,15 @@ extern "C" {
|
||||||
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
|
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
|
||||||
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
|
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
|
||||||
|
|
||||||
|
/* Atoms (initialized in on_load) */
|
||||||
|
static ERL_NIF_TERM ATOM_EAGAIN;
|
||||||
|
static ERL_NIF_TERM ATOM_ENOMEM;
|
||||||
|
static ERL_NIF_TERM ATOM_ENQUEUED;
|
||||||
|
static ERL_NIF_TERM ATOM_ERROR;
|
||||||
|
static ERL_NIF_TERM ATOM_OK;
|
||||||
|
static ERL_NIF_TERM ATOM_SHUTDOWN;
|
||||||
|
|
||||||
|
|
||||||
struct async_nif_req_entry {
|
struct async_nif_req_entry {
|
||||||
ERL_NIF_TERM ref;
|
ERL_NIF_TERM ref;
|
||||||
ErlNifEnv *env;
|
ErlNifEnv *env;
|
||||||
|
@ -104,25 +113,20 @@ struct async_nif_state {
|
||||||
argc -= 1; \
|
argc -= 1; \
|
||||||
/* Note: !!! this assumes that the first element of priv_data is ours */ \
|
/* Note: !!! this assumes that the first element of priv_data is ours */ \
|
||||||
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
|
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
|
||||||
if (async_nif->shutdown) { \
|
if (async_nif->shutdown) \
|
||||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
|
||||||
enif_make_atom(env, "shutdown")); \
|
|
||||||
} \
|
|
||||||
req = async_nif_reuse_req(async_nif); \
|
req = async_nif_reuse_req(async_nif); \
|
||||||
if (!req) { \
|
if (!req) \
|
||||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
|
||||||
enif_make_atom(env, "enomem")); \
|
|
||||||
} \
|
|
||||||
new_env = req->env; \
|
new_env = req->env; \
|
||||||
DPRINTF("async_nif: calling \"%s\"", __func__); \
|
DPRINTF("async_nif: calling \"%s\"", __func__); \
|
||||||
do pre_block while(0); \
|
do pre_block while(0); \
|
||||||
DPRINTF("async_nif: returned from \"%s\"", __func__); \
|
DPRINTF("async_nif: returned from \"%s\"", __func__); \
|
||||||
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
|
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
|
||||||
if (!copy_of_args) { \
|
if (!copy_of_args) { \
|
||||||
fn_post_ ## decl (args); \
|
fn_post_ ## decl (args); \
|
||||||
async_nif_recycle_req(req, async_nif); \
|
async_nif_recycle_req(req, async_nif); \
|
||||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
|
||||||
enif_make_atom(env, "enomem")); \
|
|
||||||
} \
|
} \
|
||||||
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
|
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
|
||||||
req->ref = enif_make_copy(new_env, argv_in[0]); \
|
req->ref = enif_make_copy(new_env, argv_in[0]); \
|
||||||
|
@ -137,9 +141,8 @@ struct async_nif_state {
|
||||||
if (!reply) { \
|
if (!reply) { \
|
||||||
fn_post_ ## decl (args); \
|
fn_post_ ## decl (args); \
|
||||||
async_nif_recycle_req(req, async_nif); \
|
async_nif_recycle_req(req, async_nif); \
|
||||||
enif_free(copy_of_args); \
|
free(copy_of_args); \
|
||||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
|
||||||
enif_make_atom(env, "eagain")); \
|
|
||||||
} \
|
} \
|
||||||
return reply; \
|
return reply; \
|
||||||
}
|
}
|
||||||
|
@ -147,11 +150,11 @@ struct async_nif_state {
|
||||||
#define ASYNC_NIF_INIT(name) \
|
#define ASYNC_NIF_INIT(name) \
|
||||||
static ErlNifMutex *name##_async_nif_coord = NULL;
|
static ErlNifMutex *name##_async_nif_coord = NULL;
|
||||||
|
|
||||||
#define ASYNC_NIF_LOAD(name, priv) do { \
|
#define ASYNC_NIF_LOAD(name, env, priv) do { \
|
||||||
if (!name##_async_nif_coord) \
|
if (!name##_async_nif_coord) \
|
||||||
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
|
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
|
||||||
enif_mutex_lock(name##_async_nif_coord); \
|
enif_mutex_lock(name##_async_nif_coord); \
|
||||||
priv = async_nif_load(); \
|
priv = async_nif_load(env); \
|
||||||
enif_mutex_unlock(name##_async_nif_coord); \
|
enif_mutex_unlock(name##_async_nif_coord); \
|
||||||
} while(0);
|
} while(0);
|
||||||
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
|
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
|
||||||
|
@ -192,7 +195,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
|
||||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
enif_mutex_lock(async_nif->recycled_req_mutex);
|
||||||
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
|
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
|
||||||
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
|
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
|
||||||
req = enif_alloc(sizeof(struct async_nif_req_entry));
|
req = malloc(sizeof(struct async_nif_req_entry));
|
||||||
if (req) {
|
if (req) {
|
||||||
memset(req, 0, sizeof(struct async_nif_req_entry));
|
memset(req, 0, sizeof(struct async_nif_req_entry));
|
||||||
env = enif_alloc_env();
|
env = enif_alloc_env();
|
||||||
|
@ -200,7 +203,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
|
||||||
req->env = env;
|
req->env = env;
|
||||||
__sync_fetch_and_add(&async_nif->num_reqs, 1);
|
__sync_fetch_and_add(&async_nif->num_reqs, 1);
|
||||||
} else {
|
} else {
|
||||||
enif_free(req);
|
free(req);
|
||||||
req = NULL;
|
req = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -254,7 +257,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
|
||||||
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
||||||
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
||||||
enif_thread_join(we->tid, &exit_value);
|
enif_thread_join(we->tid, &exit_value);
|
||||||
enif_free(we);
|
free(we);
|
||||||
async_nif->we_active--;
|
async_nif->we_active--;
|
||||||
we = n;
|
we = n;
|
||||||
}
|
}
|
||||||
|
@ -264,7 +267,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
|
||||||
return EAGAIN;
|
return EAGAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
we = enif_alloc(sizeof(struct async_nif_worker_entry));
|
we = malloc(sizeof(struct async_nif_worker_entry));
|
||||||
if (!we) {
|
if (!we) {
|
||||||
enif_mutex_unlock(async_nif->we_mutex);
|
enif_mutex_unlock(async_nif->we_mutex);
|
||||||
return ENOMEM;
|
return ENOMEM;
|
||||||
|
@ -323,10 +326,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
queue will be valid until we release the lock. */
|
queue will be valid until we release the lock. */
|
||||||
q = &async_nif->queues[qid];
|
q = &async_nif->queues[qid];
|
||||||
enif_mutex_lock(q->reqs_mutex);
|
enif_mutex_lock(q->reqs_mutex);
|
||||||
if (async_nif->shutdown) {
|
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Try not to enqueue a request into a queue that isn't keeping up with
|
/* Try not to enqueue a request into a queue that isn't keeping up with
|
||||||
the request volume. */
|
the request volume. */
|
||||||
|
@ -360,9 +359,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
|
||||||
the req pointer (which will soon become invalid in another thread
|
the req pointer (which will soon become invalid in another thread
|
||||||
performing the request). */
|
performing the request). */
|
||||||
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
|
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
|
||||||
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
|
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
|
||||||
enif_make_tuple2(req->env,
|
enif_make_tuple2(req->env, ATOM_ENQUEUED,
|
||||||
enif_make_atom(req->env, "enqueued"),
|
|
||||||
enif_make_double(req->env, pct_full)));
|
enif_make_double(req->env, pct_full)));
|
||||||
enif_cond_signal(q->reqs_cnd);
|
enif_cond_signal(q->reqs_cnd);
|
||||||
enif_mutex_unlock(q->reqs_mutex);
|
enif_mutex_unlock(q->reqs_mutex);
|
||||||
|
@ -434,7 +432,7 @@ async_nif_worker_fn(void *arg)
|
||||||
req->ref = 0;
|
req->ref = 0;
|
||||||
req->fn_work = 0;
|
req->fn_work = 0;
|
||||||
req->fn_post = 0;
|
req->fn_post = 0;
|
||||||
enif_free(req->args);
|
free(req->args);
|
||||||
req->args = NULL;
|
req->args = NULL;
|
||||||
async_nif_recycle_req(req, async_nif);
|
async_nif_recycle_req(req, async_nif);
|
||||||
req = NULL;
|
req = NULL;
|
||||||
|
@ -486,7 +484,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||||
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
|
||||||
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
void *exit_value = 0; /* We ignore the thread_join's exit value. */
|
||||||
enif_thread_join(we->tid, &exit_value);
|
enif_thread_join(we->tid, &exit_value);
|
||||||
enif_free(we);
|
free(we);
|
||||||
async_nif->we_active--;
|
async_nif->we_active--;
|
||||||
we = n;
|
we = n;
|
||||||
}
|
}
|
||||||
|
@ -505,12 +503,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||||
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
|
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
|
||||||
enif_clear_env(req->env);
|
enif_clear_env(req->env);
|
||||||
enif_send(NULL, &req->pid, req->env,
|
enif_send(NULL, &req->pid, req->env,
|
||||||
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
|
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
|
||||||
enif_make_atom(req->env, "shutdown")));
|
|
||||||
req->fn_post(req->args);
|
req->fn_post(req->args);
|
||||||
enif_free_env(req->env);
|
enif_free_env(req->env);
|
||||||
enif_free(req->args);
|
free(req->args);
|
||||||
enif_free(req);
|
free(req);
|
||||||
req = n;
|
req = n;
|
||||||
}
|
}
|
||||||
enif_mutex_destroy(q->reqs_mutex);
|
enif_mutex_destroy(q->reqs_mutex);
|
||||||
|
@ -524,18 +521,18 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||||
while(req != NULL) {
|
while(req != NULL) {
|
||||||
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
|
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
|
||||||
enif_free_env(req->env);
|
enif_free_env(req->env);
|
||||||
enif_free(req);
|
free(req);
|
||||||
req = n;
|
req = n;
|
||||||
}
|
}
|
||||||
|
|
||||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
||||||
enif_mutex_destroy(async_nif->recycled_req_mutex);
|
enif_mutex_destroy(async_nif->recycled_req_mutex);
|
||||||
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
||||||
enif_free(async_nif);
|
free(async_nif);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *
|
static void *
|
||||||
async_nif_load()
|
async_nif_load(ErlNifEnv *env)
|
||||||
{
|
{
|
||||||
static int has_init = 0;
|
static int has_init = 0;
|
||||||
unsigned int i, num_queues;
|
unsigned int i, num_queues;
|
||||||
|
@ -546,6 +543,14 @@ async_nif_load()
|
||||||
if (has_init) return 0;
|
if (has_init) return 0;
|
||||||
else has_init = 1;
|
else has_init = 1;
|
||||||
|
|
||||||
|
/* Init some static references to commonly used atoms. */
|
||||||
|
ATOM_EAGAIN = enif_make_atom(env, "eagain");
|
||||||
|
ATOM_ENOMEM = enif_make_atom(env, "enomem");
|
||||||
|
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
|
||||||
|
ATOM_ERROR = enif_make_atom(env, "error");
|
||||||
|
ATOM_OK = enif_make_atom(env, "ok");
|
||||||
|
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
|
||||||
|
|
||||||
/* Find out how many schedulers there are. */
|
/* Find out how many schedulers there are. */
|
||||||
enif_system_info(&info, sizeof(ErlNifSysInfo));
|
enif_system_info(&info, sizeof(ErlNifSysInfo));
|
||||||
|
|
||||||
|
@ -563,8 +568,8 @@ async_nif_load()
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Init our portion of priv_data's module-specific state. */
|
/* Init our portion of priv_data's module-specific state. */
|
||||||
async_nif = enif_alloc(sizeof(struct async_nif_state) +
|
async_nif = malloc(sizeof(struct async_nif_state) +
|
||||||
sizeof(struct async_nif_work_queue) * num_queues);
|
sizeof(struct async_nif_work_queue) * num_queues);
|
||||||
if (!async_nif)
|
if (!async_nif)
|
||||||
return NULL;
|
return NULL;
|
||||||
memset(async_nif, 0, sizeof(struct async_nif_state) +
|
memset(async_nif, 0, sizeof(struct async_nif_state) +
|
||||||
|
|
Loading…
Reference in a new issue