diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 6f17495..d25b35f 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -36,8 +36,9 @@ extern "C" { #define __UNUSED(v) ((void)(v)) #endif -#define ASYNC_NIF_MAX_WORKERS 128 +#define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500 +#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); @@ -60,6 +61,9 @@ struct async_nif_state { struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; unsigned int num_queues; unsigned int next_q; + FIFO_QUEUE_TYPE(reqs) recycled_reqs; + unsigned int num_reqs; + ErlNifMutex *recycled_req_mutex; struct async_nif_work_queue queues[]; }; @@ -88,29 +92,19 @@ struct async_nif_state { if (async_nif->shutdown) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ - if (!(new_env = enif_alloc_env())) { /*TODO: cache, enif_clear();*/ \ + req = async_nif_reuse_req(async_nif); \ + new_env = req->env; \ + if (!req) \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ - } \ do pre_block while(0); \ - req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \ - if (!req) { \ - fn_post_ ## decl (args); \ - enif_free_env(new_env); \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "enomem")); \ - } \ - memset(req, 0, sizeof(struct async_nif_req_entry)); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ - enif_free(req); \ - enif_free_env(new_env); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ } \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ - req->env = new_env; \ req->ref = enif_make_copy(new_env, argv_in[0]); \ enif_self(env, &req->pid); \ req->args = (void*)copy_of_args; \ @@ -122,8 +116,6 @@ struct async_nif_state { ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ fn_post_ ## decl (args); \ - enif_free(req); \ - enif_free_env(new_env); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ @@ -158,15 +150,63 @@ struct async_nif_state { enif_mutex_unlock(name##_async_nif_coord); \ } while(0); -#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env); +#define ASYNC_NIF_RETURN_BADARG() do { \ + async_nif_recycle_req(req, async_nif); \ + return enif_make_badarg(env); \ + } while(0); #define ASYNC_NIF_WORK_ENV new_env #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) +/** + * Return a request structure from the recycled req queue if one exists, + * otherwise create one. + */ +struct async_nif_req_entry * +async_nif_reuse_req(struct async_nif_state *async_nif) +{ + struct async_nif_req_entry *req = NULL; + ErlNifEnv *env = NULL; + + enif_mutex_lock(async_nif->recycled_req_mutex); + if (fifo_q_empty(reqs, async_nif->recycled_reqs)) { + if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { + req = enif_alloc(sizeof(struct async_nif_req_entry)); + if (req) { + memset(req, 0, sizeof(struct async_nif_req_entry)); + env = enif_alloc_env(); + if (!env) { + enif_free(req); + req = NULL; + } else { + req->env = env; + async_nif->num_reqs++; + } + } + } + } else { + req = fifo_q_get(reqs, async_nif->recycled_reqs); + } + enif_mutex_unlock(async_nif->recycled_req_mutex); + return req; +} + +/** + * Store the request for future re-use. + */ +void +async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) +{ + enif_mutex_lock(async_nif->recycled_req_mutex); + fifo_q_put(reqs, async_nif->recycled_reqs, req); + enif_mutex_unlock(async_nif->recycled_req_mutex); +} + /** * TODO: */ -static inline unsigned int async_nif_str_hash_func(const char *s) +static inline unsigned int +async_nif_str_hash_func(const char *s) { unsigned int h = (unsigned int)*s; if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; @@ -180,15 +220,29 @@ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { /* Identify the most appropriate worker for this request. */ - unsigned int qid = (hint >= 0) ? (unsigned int)hint : async_nif->next_q; + unsigned int qid = 0; struct async_nif_work_queue *q = NULL; + + /* Either we're choosing a queue based on some affinity/hinted value or we + need to select the next queue in the rotation and atomically update that + global value (next_q is shared across worker threads) . */ + if (hint >= 0) { + qid = (unsigned int)hint; + } else { + qid = async_nif->next_q; + qid = (qid + 1) % async_nif->num_queues; + async_nif->next_q = qid; + } + + /* Now we inspect and interate across the set of queues trying to select one + that isn't too full or too slow. */ do { q = &async_nif->queues[qid]; enif_mutex_lock(q->reqs_mutex); - /* Now that we hold the lock, check for shutdown. As long as we - hold this lock either a) we're shutting down so exit now or - b) this queue will be valid until we release the lock. */ + /* Now that we hold the lock, check for shutdown. As long as we hold + this lock either a) we're shutting down so exit now or b) this queue + will be valid until we release the lock. */ if (async_nif->shutdown) { enif_mutex_unlock(q->reqs_mutex); return 0; @@ -257,10 +311,14 @@ async_nif_worker_fn(void *arg) /* Now call the post-work cleanup function. */ req->fn_post(req->args); - /* Free resources allocated for this async request. */ - enif_free_env(req->env); + /* Clean up req for reuse. */ + req->ref = 0; + req->fn_work = 0; + req->fn_post = 0; enif_free(req->args); - enif_free(req); + req->args = NULL; + enif_clear_env(req->env); + async_nif_recycle_req(req, async_nif); req = NULL; } } @@ -274,6 +332,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) unsigned int i; unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; + struct async_nif_req_entry *req = NULL; __UNUSED(env); /* Signal the worker threads, stop what you're doing and exit. To @@ -299,12 +358,21 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); } - /* Cleanup requests, mutexes and conditions in each work queue. */ + /* Free req structres sitting on the recycle queue. */ + enif_mutex_lock(async_nif->recycled_req_mutex); + req = NULL; + fifo_q_foreach(reqs, async_nif->recycled_reqs, req, { + enif_free_env(req->env); + enif_free(req); + }); + fifo_q_free(reqs, async_nif->recycled_reqs); + + /* Cleanup in-flight requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; /* Worker threads are stopped, now toss anything left in the queue. */ - struct async_nif_req_entry *req = NULL; + req = NULL; fifo_q_foreach(reqs, q->reqs, req, { enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, @@ -319,6 +387,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) enif_mutex_destroy(q->reqs_mutex); enif_cond_destroy(q->reqs_cnd); } + + enif_mutex_unlock(async_nif->recycled_req_mutex); + enif_mutex_destroy(async_nif->recycled_req_mutex); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); enif_free(async_nif); } @@ -363,6 +434,8 @@ async_nif_load() async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed async_nif->next_q = 0; async_nif->shutdown = 0; + async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); + async_nif->recycled_req_mutex = enif_mutex_create(NULL); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i];