diff --git a/c_src/async_nif.h b/c_src/async_nif.h
index b1800be..87bd3a6 100644
--- a/c_src/async_nif.h
+++ b/c_src/async_nif.h
@@ -37,6 +37,15 @@ extern "C" {
 #define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
 
+/* Atoms (initialized in on_load) */
+static ERL_NIF_TERM ATOM_EAGAIN;
+static ERL_NIF_TERM ATOM_ENOMEM;
+static ERL_NIF_TERM ATOM_ENQUEUED;
+static ERL_NIF_TERM ATOM_ERROR;
+static ERL_NIF_TERM ATOM_OK;
+static ERL_NIF_TERM ATOM_SHUTDOWN;
+
+
 struct async_nif_req_entry {
   ERL_NIF_TERM ref;
   ErlNifEnv *env;
@@ -104,25 +113,20 @@ struct async_nif_state {
     argc -= 1; \
     /* Note: !!! this assumes that the first element of priv_data is ours */ \
     struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
-    if (async_nif->shutdown) { \
-      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
-                              enif_make_atom(env, "shutdown")); \
-    } \
+    if (async_nif->shutdown) \
+      return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
     req = async_nif_reuse_req(async_nif); \
-    if (!req) { \
-      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
-                              enif_make_atom(env, "enomem")); \
-    } \
+    if (!req) \
+      return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
     new_env = req->env; \
     DPRINTF("async_nif: calling \"%s\"", __func__); \
     do pre_block while(0); \
     DPRINTF("async_nif: returned from \"%s\"", __func__); \
-    copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
+    copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
     if (!copy_of_args) { \
       fn_post_ ## decl (args); \
       async_nif_recycle_req(req, async_nif); \
-      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
-                              enif_make_atom(env, "enomem")); \
+      return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
     } \
     memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
     req->ref = enif_make_copy(new_env, argv_in[0]); \
@@ -137,9 +141,8 @@ struct async_nif_state {
     if (!reply) { \
       fn_post_ ## decl (args); \
       async_nif_recycle_req(req, async_nif); \
-      enif_free(copy_of_args); \
-      return enif_make_tuple2(env, enif_make_atom(env, "error"), \
-                              enif_make_atom(env, "eagain")); \
+      free(copy_of_args); \
+      return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
     } \
     return reply; \
   }
@@ -147,11 +150,11 @@ struct async_nif_state {
 #define ASYNC_NIF_INIT(name) \
   static ErlNifMutex *name##_async_nif_coord = NULL;
 
-#define ASYNC_NIF_LOAD(name, priv) do { \
+#define ASYNC_NIF_LOAD(name, env, priv) do { \
     if (!name##_async_nif_coord) \
       name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
     enif_mutex_lock(name##_async_nif_coord); \
-    priv = async_nif_load(); \
+    priv = async_nif_load(env); \
     enif_mutex_unlock(name##_async_nif_coord); \
   } while(0);
 #define ASYNC_NIF_UNLOAD(name, env, priv) do { \
@@ -192,7 +195,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
   enif_mutex_lock(async_nif->recycled_req_mutex);
   if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
     if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
-      req = enif_alloc(sizeof(struct async_nif_req_entry));
+      req = malloc(sizeof(struct async_nif_req_entry));
       if (req) {
         memset(req, 0, sizeof(struct async_nif_req_entry));
         env = enif_alloc_env();
@@ -200,7 +203,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
           req->env = env;
           __sync_fetch_and_add(&async_nif->num_reqs, 1);
         } else {
-          enif_free(req);
+          free(req);
           req = NULL;
         }
       }
@@ -254,7 +257,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
       SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
       void *exit_value = 0; /* We ignore the thread_join's exit value. */
       enif_thread_join(we->tid, &exit_value);
-      enif_free(we);
+      free(we);
       async_nif->we_active--;
       we = n;
     }
@@ -264,7 +267,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
     return EAGAIN;
   }
 
-  we = enif_alloc(sizeof(struct async_nif_worker_entry));
+  we = malloc(sizeof(struct async_nif_worker_entry));
   if (!we) {
     enif_mutex_unlock(async_nif->we_mutex);
     return ENOMEM;
@@ -323,10 +326,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
      queue will be valid until we release the lock. */
   q = &async_nif->queues[qid];
   enif_mutex_lock(q->reqs_mutex);
-  if (async_nif->shutdown) {
-    enif_mutex_unlock(q->reqs_mutex);
-    return 0;
-  }
 
   /* Try not to enqueue a request into a queue that isn't keeping up with
      the request volume. */
@@ -360,9 +359,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
      the req pointer (which will soon become invalid in another thread
      performing the request). */
   double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
-  ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
-                                        enif_make_tuple2(req->env,
-                                                         enif_make_atom(req->env, "enqueued"),
+  ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
+                                        enif_make_tuple2(req->env, ATOM_ENQUEUED,
                                                          enif_make_double(req->env, pct_full)));
   enif_cond_signal(q->reqs_cnd);
   enif_mutex_unlock(q->reqs_mutex);
@@ -434,7 +432,7 @@ async_nif_worker_fn(void *arg)
       req->ref = 0;
       req->fn_work = 0;
       req->fn_post = 0;
-      enif_free(req->args);
+      free(req->args);
       req->args = NULL;
       async_nif_recycle_req(req, async_nif);
       req = NULL;
@@ -486,7 +484,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
     SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
     void *exit_value = 0; /* We ignore the thread_join's exit value. */
     enif_thread_join(we->tid, &exit_value);
-    enif_free(we);
+    free(we);
     async_nif->we_active--;
     we = n;
   }
@@ -505,12 +503,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
       struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
       enif_clear_env(req->env);
       enif_send(NULL, &req->pid, req->env,
-                enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
-                                 enif_make_atom(req->env, "shutdown")));
+                enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
       req->fn_post(req->args);
       enif_free_env(req->env);
-      enif_free(req->args);
-      enif_free(req);
+      free(req->args);
+      free(req);
       req = n;
     }
     enif_mutex_destroy(q->reqs_mutex);
@@ -524,18 +521,18 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
   while(req != NULL) {
     struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
     enif_free_env(req->env);
-    enif_free(req);
+    free(req);
     req = n;
   }
   enif_mutex_unlock(async_nif->recycled_req_mutex);
   enif_mutex_destroy(async_nif->recycled_req_mutex);
   memset(async_nif, 0, sizeof(struct async_nif_state) +
          (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
-  enif_free(async_nif);
+  free(async_nif);
 }
 
 static void *
-async_nif_load()
+async_nif_load(ErlNifEnv *env)
 {
   static int has_init = 0;
   unsigned int i, num_queues;
@@ -546,6 +543,14 @@ async_nif_load()
   if (has_init) return 0;
   else has_init = 1;
 
+  /* Init some static references to commonly used atoms. */
+  ATOM_EAGAIN = enif_make_atom(env, "eagain");
+  ATOM_ENOMEM = enif_make_atom(env, "enomem");
+  ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
+  ATOM_ERROR = enif_make_atom(env, "error");
+  ATOM_OK = enif_make_atom(env, "ok");
+  ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
+
   /* Find out how many schedulers there are. */
   enif_system_info(&info, sizeof(ErlNifSysInfo));
 
@@ -563,8 +568,8 @@ async_nif_load()
   }
 
   /* Init our portion of priv_data's module-specific state. */
-  async_nif = enif_alloc(sizeof(struct async_nif_state) +
-                         sizeof(struct async_nif_work_queue) * num_queues);
+  async_nif = malloc(sizeof(struct async_nif_state) +
+                     sizeof(struct async_nif_work_queue) * num_queues);
   if (!async_nif)
       return NULL;
   memset(async_nif, 0, sizeof(struct async_nif_state) +
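
Note for callers of this header: ASYNC_NIF_LOAD now takes the caller's ErlNifEnv so async_nif_load() can intern the ATOM_* terms once at load time; since atoms are never garbage collected, the cached terms remain valid for the life of the VM and can be reused from any environment. A minimal sketch of how a NIF's on_load callback might be updated follows; the names my_nif, my_nif_priv, and my_nif_on_load are illustrative only and are not part of this patch.

/* Sketch only: the "my_nif" names are hypothetical, not defined by async_nif.h. */
#include <string.h>
#include "erl_nif.h"
#include "async_nif.h"

ASYNC_NIF_INIT(my_nif);

struct my_nif_priv {
  struct async_nif_state *async_nif; /* must stay the first member, see ASYNC_NIF_DECL */
  /* ... other module-specific state ... */
};

static int
my_nif_on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
{
  (void)load_info;

  struct my_nif_priv *priv = enif_alloc(sizeof(struct my_nif_priv));
  if (!priv)
    return -1;
  memset(priv, 0, sizeof(struct my_nif_priv));

  /* New signature: pass env through so async_nif_load() can create the atoms. */
  ASYNC_NIF_LOAD(my_nif, env, priv->async_nif);
  if (!priv->async_nif) {
    enif_free(priv);
    return -1;
  }

  *priv_data = priv;
  return 0;
}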