Merge pull request #8 from basho-labs/gsb-recycle-reqs

Reuse req and ErlNifEnv rather than re-alloc/use/free them for every request.
This commit is contained in:
Gregory Burd 2013-04-25 10:26:16 -07:00
commit f043a99ccb

View file

@ -36,8 +36,9 @@ extern "C" {
#define __UNUSED(v) ((void)(v)) #define __UNUSED(v) ((void)(v))
#endif #endif
#define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
@ -60,6 +61,9 @@ struct async_nif_state {
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
unsigned int num_queues; unsigned int num_queues;
unsigned int next_q; unsigned int next_q;
FIFO_QUEUE_TYPE(reqs) recycled_reqs;
unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[]; struct async_nif_work_queue queues[];
}; };
@ -88,29 +92,19 @@ struct async_nif_state {
if (async_nif->shutdown) \ if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
if (!(new_env = enif_alloc_env())) { /*TODO: cache, enif_clear();*/ \ req = async_nif_reuse_req(async_nif); \
new_env = req->env; \
if (!req) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \
do pre_block while(0); \ do pre_block while(0); \
req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
if (!req) { \
fn_post_ ## decl (args); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memset(req, 0, sizeof(struct async_nif_req_entry)); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \ if (!copy_of_args) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
enif_free(req); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \ } \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \ memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->env = new_env; \
req->ref = enif_make_copy(new_env, argv_in[0]); \ req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \ enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \ req->args = (void*)copy_of_args; \
@ -122,8 +116,6 @@ struct async_nif_state {
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \ if (!reply) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
enif_free(req); \
enif_free_env(new_env); \
enif_free(copy_of_args); \ enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
@ -158,15 +150,63 @@ struct async_nif_state {
enif_mutex_unlock(name##_async_nif_coord); \ enif_mutex_unlock(name##_async_nif_coord); \
} while(0); } while(0);
#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env); #define ASYNC_NIF_RETURN_BADARG() do { \
async_nif_recycle_req(req, async_nif); \
return enif_make_badarg(env); \
} while(0);
#define ASYNC_NIF_WORK_ENV new_env #define ASYNC_NIF_WORK_ENV new_env
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
/**
* Return a request structure from the recycled req queue if one exists,
* otherwise create one.
*/
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
{
struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
if (fifo_q_empty(reqs, async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = enif_alloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (!env) {
enif_free(req);
req = NULL;
} else {
req->env = env;
async_nif->num_reqs++;
}
}
}
} else {
req = fifo_q_get(reqs, async_nif->recycled_reqs);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
return req;
}
/**
* Store the request for future re-use.
*/
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
enif_mutex_lock(async_nif->recycled_req_mutex);
fifo_q_put(reqs, async_nif->recycled_reqs, req);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
/** /**
* TODO: * TODO:
*/ */
static inline unsigned int async_nif_str_hash_func(const char *s) static inline unsigned int
async_nif_str_hash_func(const char *s)
{ {
unsigned int h = (unsigned int)*s; unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
@ -180,15 +220,29 @@ static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{ {
/* Identify the most appropriate worker for this request. */ /* Identify the most appropriate worker for this request. */
unsigned int qid = (hint >= 0) ? (unsigned int)hint : async_nif->next_q; unsigned int qid = 0;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
global value (next_q is shared across worker threads) . */
if (hint >= 0) {
qid = (unsigned int)hint;
} else {
qid = async_nif->next_q;
qid = (qid + 1) % async_nif->num_queues;
async_nif->next_q = qid;
}
/* Now we inspect and interate across the set of queues trying to select one
that isn't too full or too slow. */
do { do {
q = &async_nif->queues[qid]; q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex); enif_mutex_lock(q->reqs_mutex);
/* Now that we hold the lock, check for shutdown. As long as we /* Now that we hold the lock, check for shutdown. As long as we hold
hold this lock either a) we're shutting down so exit now or this lock either a) we're shutting down so exit now or b) this queue
b) this queue will be valid until we release the lock. */ will be valid until we release the lock. */
if (async_nif->shutdown) { if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
return 0; return 0;
@ -257,10 +311,14 @@ async_nif_worker_fn(void *arg)
/* Now call the post-work cleanup function. */ /* Now call the post-work cleanup function. */
req->fn_post(req->args); req->fn_post(req->args);
/* Free resources allocated for this async request. */ /* Clean up req for reuse. */
enif_free_env(req->env); req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
enif_free(req->args); enif_free(req->args);
enif_free(req); req->args = NULL;
enif_clear_env(req->env);
async_nif_recycle_req(req, async_nif);
req = NULL; req = NULL;
} }
} }
@ -274,6 +332,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
unsigned int i; unsigned int i;
unsigned int num_queues = async_nif->num_queues; unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
__UNUSED(env); __UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To /* Signal the worker threads, stop what you're doing and exit. To
@ -299,12 +358,21 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
} }
/* Cleanup requests, mutexes and conditions in each work queue. */ /* Free req structres sitting on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
enif_free_env(req->env);
enif_free(req);
});
fifo_q_free(reqs, async_nif->recycled_reqs);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) { for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i]; q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */ /* Worker threads are stopped, now toss anything left in the queue. */
struct async_nif_req_entry *req = NULL; req = NULL;
fifo_q_foreach(reqs, q->reqs, req, { fifo_q_foreach(reqs, q->reqs, req, {
enif_clear_env(req->env); enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env, enif_send(NULL, &req->pid, req->env,
@ -319,6 +387,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
enif_mutex_destroy(q->reqs_mutex); enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd); enif_cond_destroy(q->reqs_cnd);
} }
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
enif_free(async_nif); enif_free(async_nif);
} }
@ -363,6 +434,8 @@ async_nif_load()
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
async_nif->next_q = 0; async_nif->next_q = 0;
async_nif->shutdown = 0; async_nif->shutdown = 0;
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
async_nif->recycled_req_mutex = enif_mutex_create(NULL);
for (i = 0; i < async_nif->num_queues; i++) { for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i]; struct async_nif_work_queue *q = &async_nif->queues[i];