Reuse req and ErlNifEnv rather than re-alloc/use/free them for every request. #8
1 changed files with 100 additions and 27 deletions
|
@ -36,8 +36,9 @@ extern "C" {
|
|||
#define __UNUSED(v) ((void)(v))
|
||||
#endif
|
||||
|
||||
#define ASYNC_NIF_MAX_WORKERS 128
|
||||
#define ASYNC_NIF_MAX_WORKERS 1024
|
||||
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
|
||||
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
|
||||
|
||||
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
|
||||
|
||||
|
@ -60,6 +61,9 @@ struct async_nif_state {
|
|||
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
|
||||
unsigned int num_queues;
|
||||
unsigned int next_q;
|
||||
FIFO_QUEUE_TYPE(reqs) recycled_reqs;
|
||||
unsigned int num_reqs;
|
||||
ErlNifMutex *recycled_req_mutex;
|
||||
struct async_nif_work_queue queues[];
|
||||
};
|
||||
|
||||
|
@ -88,29 +92,19 @@ struct async_nif_state {
|
|||
if (async_nif->shutdown) \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "shutdown")); \
|
||||
if (!(new_env = enif_alloc_env())) { /*TODO: cache, enif_clear();*/ \
|
||||
req = async_nif_reuse_req(async_nif); \
|
||||
new_env = req->env; \
|
||||
if (!req) \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "enomem")); \
|
||||
} \
|
||||
do pre_block while(0); \
|
||||
req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
|
||||
if (!req) { \
|
||||
fn_post_ ## decl (args); \
|
||||
enif_free_env(new_env); \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "enomem")); \
|
||||
} \
|
||||
memset(req, 0, sizeof(struct async_nif_req_entry)); \
|
||||
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
|
||||
if (!copy_of_args) { \
|
||||
fn_post_ ## decl (args); \
|
||||
enif_free(req); \
|
||||
enif_free_env(new_env); \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "enomem")); \
|
||||
} \
|
||||
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
|
||||
req->env = new_env; \
|
||||
req->ref = enif_make_copy(new_env, argv_in[0]); \
|
||||
enif_self(env, &req->pid); \
|
||||
req->args = (void*)copy_of_args; \
|
||||
|
@ -122,8 +116,6 @@ struct async_nif_state {
|
|||
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
|
||||
if (!reply) { \
|
||||
fn_post_ ## decl (args); \
|
||||
enif_free(req); \
|
||||
enif_free_env(new_env); \
|
||||
enif_free(copy_of_args); \
|
||||
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
|
||||
enif_make_atom(env, "shutdown")); \
|
||||
|
@ -158,15 +150,63 @@ struct async_nif_state {
|
|||
enif_mutex_unlock(name##_async_nif_coord); \
|
||||
} while(0);
|
||||
|
||||
#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env);
|
||||
#define ASYNC_NIF_RETURN_BADARG() do { \
|
||||
async_nif_recycle_req(req, async_nif); \
|
||||
return enif_make_badarg(env); \
|
||||
} while(0);
|
||||
#define ASYNC_NIF_WORK_ENV new_env
|
||||
|
||||
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
|
||||
|
||||
/**
|
||||
* Return a request structure from the recycled req queue if one exists,
|
||||
* otherwise create one.
|
||||
*/
|
||||
struct async_nif_req_entry *
|
||||
async_nif_reuse_req(struct async_nif_state *async_nif)
|
||||
{
|
||||
struct async_nif_req_entry *req = NULL;
|
||||
ErlNifEnv *env = NULL;
|
||||
|
||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
||||
if (fifo_q_empty(reqs, async_nif->recycled_reqs)) {
|
||||
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
|
||||
req = enif_alloc(sizeof(struct async_nif_req_entry));
|
||||
if (req) {
|
||||
memset(req, 0, sizeof(struct async_nif_req_entry));
|
||||
env = enif_alloc_env();
|
||||
if (!env) {
|
||||
enif_free(req);
|
||||
req = NULL;
|
||||
} else {
|
||||
req->env = env;
|
||||
async_nif->num_reqs++;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
req = fifo_q_get(reqs, async_nif->recycled_reqs);
|
||||
}
|
||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
||||
return req;
|
||||
}
|
||||
|
||||
/**
|
||||
* Store the request for future re-use.
|
||||
*/
|
||||
void
|
||||
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
|
||||
{
|
||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
||||
fifo_q_put(reqs, async_nif->recycled_reqs, req);
|
||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
||||
}
|
||||
|
||||
/**
|
||||
* TODO:
|
||||
*/
|
||||
static inline unsigned int async_nif_str_hash_func(const char *s)
|
||||
static inline unsigned int
|
||||
async_nif_str_hash_func(const char *s)
|
||||
{
|
||||
unsigned int h = (unsigned int)*s;
|
||||
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
|
||||
|
@ -180,15 +220,29 @@ static ERL_NIF_TERM
|
|||
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
|
||||
{
|
||||
/* Identify the most appropriate worker for this request. */
|
||||
unsigned int qid = (hint >= 0) ? (unsigned int)hint : async_nif->next_q;
|
||||
unsigned int qid = 0;
|
||||
struct async_nif_work_queue *q = NULL;
|
||||
|
||||
/* Either we're choosing a queue based on some affinity/hinted value or we
|
||||
need to select the next queue in the rotation and atomically update that
|
||||
global value (next_q is shared across worker threads) . */
|
||||
if (hint >= 0) {
|
||||
qid = (unsigned int)hint;
|
||||
} else {
|
||||
qid = async_nif->next_q;
|
||||
qid = (qid + 1) % async_nif->num_queues;
|
||||
async_nif->next_q = qid;
|
||||
}
|
||||
|
||||
/* Now we inspect and interate across the set of queues trying to select one
|
||||
that isn't too full or too slow. */
|
||||
do {
|
||||
q = &async_nif->queues[qid];
|
||||
enif_mutex_lock(q->reqs_mutex);
|
||||
|
||||
/* Now that we hold the lock, check for shutdown. As long as we
|
||||
hold this lock either a) we're shutting down so exit now or
|
||||
b) this queue will be valid until we release the lock. */
|
||||
/* Now that we hold the lock, check for shutdown. As long as we hold
|
||||
this lock either a) we're shutting down so exit now or b) this queue
|
||||
will be valid until we release the lock. */
|
||||
if (async_nif->shutdown) {
|
||||
enif_mutex_unlock(q->reqs_mutex);
|
||||
return 0;
|
||||
|
@ -257,10 +311,14 @@ async_nif_worker_fn(void *arg)
|
|||
/* Now call the post-work cleanup function. */
|
||||
req->fn_post(req->args);
|
||||
|
||||
/* Free resources allocated for this async request. */
|
||||
enif_free_env(req->env);
|
||||
/* Clean up req for reuse. */
|
||||
req->ref = 0;
|
||||
req->fn_work = 0;
|
||||
req->fn_post = 0;
|
||||
enif_free(req->args);
|
||||
enif_free(req);
|
||||
req->args = NULL;
|
||||
enif_clear_env(req->env);
|
||||
async_nif_recycle_req(req, async_nif);
|
||||
req = NULL;
|
||||
}
|
||||
}
|
||||
|
@ -274,6 +332,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
|||
unsigned int i;
|
||||
unsigned int num_queues = async_nif->num_queues;
|
||||
struct async_nif_work_queue *q = NULL;
|
||||
struct async_nif_req_entry *req = NULL;
|
||||
__UNUSED(env);
|
||||
|
||||
/* Signal the worker threads, stop what you're doing and exit. To
|
||||
|
@ -299,12 +358,21 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
|||
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
|
||||
}
|
||||
|
||||
/* Cleanup requests, mutexes and conditions in each work queue. */
|
||||
/* Free req structres sitting on the recycle queue. */
|
||||
enif_mutex_lock(async_nif->recycled_req_mutex);
|
||||
req = NULL;
|
||||
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
|
||||
enif_free_env(req->env);
|
||||
enif_free(req);
|
||||
});
|
||||
fifo_q_free(reqs, async_nif->recycled_reqs);
|
||||
|
||||
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
|
||||
for (i = 0; i < num_queues; i++) {
|
||||
q = &async_nif->queues[i];
|
||||
|
||||
/* Worker threads are stopped, now toss anything left in the queue. */
|
||||
struct async_nif_req_entry *req = NULL;
|
||||
req = NULL;
|
||||
fifo_q_foreach(reqs, q->reqs, req, {
|
||||
enif_clear_env(req->env);
|
||||
enif_send(NULL, &req->pid, req->env,
|
||||
|
@ -319,6 +387,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
|||
enif_mutex_destroy(q->reqs_mutex);
|
||||
enif_cond_destroy(q->reqs_cnd);
|
||||
}
|
||||
|
||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
||||
enif_mutex_destroy(async_nif->recycled_req_mutex);
|
||||
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
||||
enif_free(async_nif);
|
||||
}
|
||||
|
@ -363,6 +434,8 @@ async_nif_load()
|
|||
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
|
||||
async_nif->next_q = 0;
|
||||
async_nif->shutdown = 0;
|
||||
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
|
||||
async_nif->recycled_req_mutex = enif_mutex_create(NULL);
|
||||
|
||||
for (i = 0; i < async_nif->num_queues; i++) {
|
||||
struct async_nif_work_queue *q = &async_nif->queues[i];
|
||||
|
|
Loading…
Reference in a new issue