Integrate current version of async_nif which avoids some enif overhead.
parent 194e1b3c87
commit 6d5daab80d
1 changed file with 86 additions and 88 deletions
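The change is mechanical but broad: ErlNifMutex/ErlNifCond and enif_alloc/enif_free become pthread_mutex_t/pthread_cond_t and malloc/free, dropping the enif wrapper layer from every lock, signal, and allocation on the request path. A minimal sketch of the substitution pattern, using a hypothetical guarded counter (none of these names appear in the diff):

    #include <pthread.h>

    /* After this commit the mutex is embedded and statically initialized,
       so there is no enif_mutex_create()/enif_mutex_destroy() lifecycle to
       manage and no pointer indirection on each lock/unlock. */
    static pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
    static unsigned long counter = 0;

    static void counter_bump(void)
    {
        pthread_mutex_lock(&counter_mutex);   /* was: enif_mutex_lock(m) */
        counter++;
        pthread_mutex_unlock(&counter_mutex); /* was: enif_mutex_unlock(m) */
    }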
@@ -25,6 +25,7 @@ extern "C" {
 #endif
 
 #include <assert.h>
+#include <pthread.h>
 
 #include "queue.h"
 
@@ -34,7 +35,7 @@ extern "C" {
 
 #define ASYNC_NIF_MAX_WORKERS 1024
 #define ASYNC_NIF_MIN_WORKERS 2
-#define ASYNC_NIF_WORKER_QUEUE_SIZE 100
+#define ASYNC_NIF_WORKER_QUEUE_SIZE 256
 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
 
 struct async_nif_req_entry {
@@ -51,8 +52,8 @@ struct async_nif_req_entry {
 struct async_nif_work_queue {
   unsigned int num_workers;
   unsigned int depth;
-  ErlNifMutex *reqs_mutex;
-  ErlNifCond *reqs_cnd;
+  pthread_mutex_t reqs_mutex;
+  pthread_cond_t reqs_cnd;
   struct async_nif_work_queue *next;
   STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
 };
@@ -67,14 +68,14 @@ struct async_nif_worker_entry {
 
 struct async_nif_state {
   unsigned int shutdown;
-  ErlNifMutex *we_mutex;
+  pthread_mutex_t we_mutex;
   unsigned int we_active;
   SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
   unsigned int num_queues;
   unsigned int next_q;
   STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
   unsigned int num_reqs;
-  ErlNifMutex *recycled_req_mutex;
+  pthread_mutex_t recycled_req_mutex;
   struct async_nif_work_queue queues[];
 };
 
@@ -117,7 +118,7 @@ struct async_nif_state {
     DPRINTF("async_nif: calling \"%s\"", __func__); \
     do pre_block while(0); \
     DPRINTF("async_nif: returned from \"%s\"", __func__); \
-    copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
+    copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
     if (!copy_of_args) { \
       fn_post_ ## decl (args); \
      async_nif_recycle_req(req, async_nif); \
@@ -137,7 +138,7 @@ struct async_nif_state {
     if (!reply) { \
       fn_post_ ## decl (args); \
       async_nif_recycle_req(req, async_nif); \
-      enif_free(copy_of_args); \
+      free(copy_of_args); \
       return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                               enif_make_atom(env, "eagain")); \
     } \
@@ -145,30 +146,23 @@ struct async_nif_state {
 }
 
 #define ASYNC_NIF_INIT(name) \
-    static ErlNifMutex *name##_async_nif_coord = NULL;
+    static pthread_mutex_t name##_async_nif_coord = PTHREAD_MUTEX_INITIALIZER;
 
 #define ASYNC_NIF_LOAD(name, priv) do { \
-    if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
-    enif_mutex_lock(name##_async_nif_coord); \
+    pthread_mutex_lock(&name##_async_nif_coord); \
     priv = async_nif_load(); \
-    enif_mutex_unlock(name##_async_nif_coord); \
+    pthread_mutex_unlock(&name##_async_nif_coord); \
   } while(0);
 #define ASYNC_NIF_UNLOAD(name, env, priv) do { \
-    if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
-    enif_mutex_lock(name##_async_nif_coord); \
+    pthread_mutex_lock(&name##_async_nif_coord); \
     async_nif_unload(env, priv); \
-    enif_mutex_unlock(name##_async_nif_coord); \
-    enif_mutex_destroy(name##_async_nif_coord); \
-    name##_async_nif_coord = NULL; \
+    pthread_mutex_unlock(&name##_async_nif_coord); \
+    pthread_mutex_destroy(&name##_async_nif_coord); \
   } while(0);
 #define ASYNC_NIF_UPGRADE(name, env) do { \
-    if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
-    enif_mutex_lock(name##_async_nif_coord); \
+    pthread_mutex_lock(&name##_async_nif_coord); \
     async_nif_upgrade(env); \
-    enif_mutex_unlock(name##_async_nif_coord); \
+    pthread_mutex_unlock(&name##_async_nif_coord); \
   } while(0);
 
 #define ASYNC_NIF_RETURN_BADARG() do { \
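The rewritten coordination macros lean on PTHREAD_MUTEX_INITIALIZER, which removes the racy create-on-first-use dance (`if (!coord) coord = enif_mutex_create(...)`) from every entry point. A hedged sketch of how a NIF module might invoke them; `my_nif` and `on_load` are illustrative names, not taken from this diff:

    /* Illustrative usage only; the NIF module that consumes this header is
       not shown in the commit. */
    ASYNC_NIF_INIT(my_nif);  /* defines my_nif_async_nif_coord, pre-initialized */

    static int
    on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
    {
        void *state = NULL;
        ASYNC_NIF_LOAD(my_nif, state);  /* serialized on the coord mutex */
        if (!state)
            return -1;
        *priv_data = state;
        return 0;
    }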
@@ -189,18 +183,18 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
   struct async_nif_req_entry *req = NULL;
   ErlNifEnv *env = NULL;
 
-  enif_mutex_lock(async_nif->recycled_req_mutex);
+  pthread_mutex_lock(&async_nif->recycled_req_mutex);
   if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
     if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
-      req = enif_alloc(sizeof(struct async_nif_req_entry));
+      req = malloc(sizeof(struct async_nif_req_entry));
       if (req) {
         memset(req, 0, sizeof(struct async_nif_req_entry));
         env = enif_alloc_env();
         if (env) {
           req->env = env;
           __sync_fetch_and_add(&async_nif->num_reqs, 1);
         } else {
-          enif_free(req);
+          free(req);
           req = NULL;
         }
       }
@@ -209,7 +203,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
     req = STAILQ_FIRST(&async_nif->recycled_reqs);
     STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
   }
-  enif_mutex_unlock(async_nif->recycled_req_mutex);
+  pthread_mutex_unlock(&async_nif->recycled_req_mutex);
   return req;
 }
 
@@ -224,13 +218,13 @@ void
 async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
 {
   ErlNifEnv *env = NULL;
-  enif_mutex_lock(async_nif->recycled_req_mutex);
+  pthread_mutex_lock(&async_nif->recycled_req_mutex);
   enif_clear_env(req->env);
   env = req->env;
   memset(req, 0, sizeof(struct async_nif_req_entry));
   req->env = env;
   STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
-  enif_mutex_unlock(async_nif->recycled_req_mutex);
+  pthread_mutex_unlock(&async_nif->recycled_req_mutex);
 }
 
 static void *async_nif_worker_fn(void *);
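The reuse/recycle pair above keeps a request struct and its ErlNifEnv alive on a free list instead of freeing them, so steady-state traffic skips malloc() and enif_alloc_env() entirely. A generic, self-contained form of the same pattern (illustrative names; the real code parks whole async_nif_req_entry structs):

    #include <pthread.h>
    #include <stdlib.h>
    #include <string.h>

    /* The first bytes of a parked object are reused as the freelist link,
       so objects must be at least sizeof(struct node) bytes. */
    struct node { struct node *next; };

    static pthread_mutex_t free_mutex = PTHREAD_MUTEX_INITIALIZER;
    static struct node *free_list = NULL;

    static void *obj_get(size_t size)
    {
        pthread_mutex_lock(&free_mutex);
        struct node *n = free_list;
        if (n)
            free_list = n->next;
        pthread_mutex_unlock(&free_mutex);
        if (n) {
            memset(n, 0, size);   /* cf. the memset() before reuse above */
            return n;
        }
        return calloc(1, size);   /* freelist empty: allocate fresh */
    }

    static void obj_put(void *p)
    {
        pthread_mutex_lock(&free_mutex);
        ((struct node *)p)->next = free_list;
        free_list = (struct node *)p;
        pthread_mutex_unlock(&free_mutex);
    }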
@@ -246,7 +240,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
   if (0 == q)
     return EINVAL;
 
-  enif_mutex_lock(async_nif->we_mutex);
+  pthread_mutex_lock(&async_nif->we_mutex);
 
   we = SLIST_FIRST(&async_nif->we_joining);
   while(we != NULL) {
@@ -254,19 +248,19 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
     SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
     void *exit_value = 0; /* We ignore the thread_join's exit value. */
     enif_thread_join(we->tid, &exit_value);
-    enif_free(we);
+    free(we);
     async_nif->we_active--;
     we = n;
   }
 
   if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
-    enif_mutex_unlock(async_nif->we_mutex);
+    pthread_mutex_unlock(&async_nif->we_mutex);
     return EAGAIN;
   }
 
-  we = enif_alloc(sizeof(struct async_nif_worker_entry));
+  we = malloc(sizeof(struct async_nif_worker_entry));
   if (!we) {
-    enif_mutex_unlock(async_nif->we_mutex);
+    pthread_mutex_unlock(&async_nif->we_mutex);
     return ENOMEM;
   }
   memset(we, 0, sizeof(struct async_nif_worker_entry));
@@ -274,7 +268,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
   we->async_nif = async_nif;
   we->q = q;
 
-  enif_mutex_unlock(async_nif->we_mutex);
+  pthread_mutex_unlock(&async_nif->we_mutex);
   return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
 }
 
@@ -290,7 +284,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   /* Identify the most appropriate worker for this request. */
   unsigned int i, last_qid, qid = 0;
   struct async_nif_work_queue *q = NULL;
-  double avg_depth;
+  double avg_depth = 0.0;
 
   /* Either we're choosing a queue based on some affinity/hinted value or we
      need to select the next queue in the rotation and atomically update that
@@ -299,8 +293,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
     qid = (unsigned int)hint;
   } else {
     do {
-       last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
-       qid = (last_qid + 1) % async_nif->num_queues;
+      last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
+      qid = (last_qid + 1) % async_nif->num_queues;
     } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
   }
 
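The do/while above is a lock-free round-robin: racing enqueuers may compute the same successor value, but only one compare-and-swap wins, and losers re-read next_q and retry. A self-contained restatement of the same rotation:

    /* Advance a shared rotation counter by one, atomically, using the same
       GCC __sync builtins as the diff. Exactly one winner per increment. */
    static unsigned int
    pick_next_queue(volatile unsigned int *next_q, unsigned int num_queues)
    {
        unsigned int last_qid, qid;
        do {
            last_qid = __sync_fetch_and_add(next_q, 0);  /* atomic read */
            qid = (last_qid + 1) % num_queues;
        } while (!__sync_bool_compare_and_swap(next_q, last_qid, qid));
        return qid;
    }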
@@ -322,9 +316,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
        we hold this lock either a) we're shutting down so exit now or b) this
        queue will be valid until we release the lock. */
     q = &async_nif->queues[qid];
-    enif_mutex_lock(q->reqs_mutex);
+    pthread_mutex_lock(&q->reqs_mutex);
     if (async_nif->shutdown) {
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_mutex_unlock(&q->reqs_mutex);
       return 0;
     }
 
@@ -332,7 +326,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
        the request volume. */
     if (q->depth <= avg_depth) break;
     else {
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_mutex_unlock(&q->reqs_mutex);
       qid = (qid + 1) % async_nif->num_queues;
     }
   }
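The placement rule above scans the ring starting at the rotation point and enqueues at the first queue whose depth is at or below average, spreading load without a global lock. An illustrative stand-alone version; the diff does not show how avg_depth is computed, so the simple mean here is an assumption based on the variable's name:

    /* Pick the first queue, starting at `start`, whose depth does not
       exceed the mean depth. Always terminates within n steps because at
       least one queue is at or below the mean. */
    static unsigned int
    pick_queue_by_depth(const unsigned int *depth, unsigned int n, unsigned int start)
    {
        double avg_depth = 0.0;
        for (unsigned int i = 0; i < n; i++)
            avg_depth += depth[i];
        avg_depth /= n;

        unsigned int qid = start;
        for (unsigned int i = 0; i < n; i++) {
            if (depth[qid] <= avg_depth)
                break;                 /* first queue at/below average wins */
            qid = (qid + 1) % n;
        }
        return qid;
    }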
@@ -361,8 +355,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
      performing the request). */
   ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
                                         enif_make_atom(req->env, "enqueued"));
-  enif_cond_signal(q->reqs_cnd);
-  enif_mutex_unlock(q->reqs_mutex);
+  pthread_cond_signal(&q->reqs_cnd);
+  pthread_mutex_unlock(&q->reqs_mutex);
   return reply;
 }
 
@@ -383,28 +377,33 @@ async_nif_worker_fn(void *arg)
 
   for(;;) {
     /* Examine the request queue, are there things to be done? */
-    enif_mutex_lock(q->reqs_mutex);
+    pthread_mutex_lock(&q->reqs_mutex);
   check_again_for_work:
     if (async_nif->shutdown) {
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_mutex_unlock(&q->reqs_mutex);
       break;
     }
     if (STAILQ_EMPTY(&q->reqs)) {
       /* Queue is empty so we wait for more work to arrive. */
-      if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
-        enif_mutex_unlock(q->reqs_mutex);
-        if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
-        else {
-          tries--;
-          __sync_fetch_and_add(&q->num_workers, -1);
-          q = q->next;
-          __sync_fetch_and_add(&q->num_workers, 1);
-          continue; // try another queue
-        }
-      } else {
-        enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
-        goto check_again_for_work;
-      }
+      pthread_mutex_unlock(&q->reqs_mutex);
+      if (tries == 0 && q == we->q) {
+        if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
+          /* At this point we've tried to find/execute work on all queues
+           * and there are at least MIN_WORKERS on this queue, so we leave
+           * this loop (break), which leads to a thread exit/join. */
+          break;
+        } else {
+          pthread_mutex_lock(&q->reqs_mutex);
+          pthread_cond_wait(&q->reqs_cnd, &q->reqs_mutex);
+          goto check_again_for_work;
+        }
+      } else {
+        tries--;
+        __sync_fetch_and_add(&q->num_workers, -1);
+        q = q->next;
+        __sync_fetch_and_add(&q->num_workers, 1);
+        continue; // try next queue
+      }
     } else {
       /* At this point the next req is ours to process and we hold the
          reqs_mutex lock. Take the request off the queue. */
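The goto check_again_for_work pattern above is the standard condition-variable discipline: the predicate is re-examined after every wakeup because pthread_cond_wait may return spuriously or after the work has already been claimed. A minimal sketch of the same discipline in isolation (illustrative names):

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t work_mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  work_cnd   = PTHREAD_COND_INITIALIZER;
    static bool work_ready = false;

    static void wait_for_work(void)
    {
        pthread_mutex_lock(&work_mutex);
        while (!work_ready)                           /* recheck after every wakeup */
            pthread_cond_wait(&work_cnd, &work_mutex); /* unlocks; relocks on wake */
        work_ready = false;
        pthread_mutex_unlock(&work_mutex);
    }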
@@ -412,10 +411,9 @@ async_nif_worker_fn(void *arg)
       STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
       __sync_fetch_and_add(&q->depth, -1);
 
-      /* Ensure that there is at least one other worker thread watching this
-         queue. */
-      enif_cond_signal(q->reqs_cnd);
-      enif_mutex_unlock(q->reqs_mutex);
+      /* Wake up other worker thread watching this queue to help process work. */
+      pthread_cond_signal(&q->reqs_cnd);
+      pthread_mutex_unlock(&q->reqs_mutex);
 
       /* Perform the work. */
       req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
@@ -427,15 +425,15 @@ async_nif_worker_fn(void *arg)
       req->ref = 0;
       req->fn_work = 0;
       req->fn_post = 0;
-      enif_free(req->args);
+      free(req->args);
       req->args = NULL;
       async_nif_recycle_req(req, async_nif);
       req = NULL;
     }
   }
-  enif_mutex_lock(async_nif->we_mutex);
+  pthread_mutex_lock(&async_nif->we_mutex);
   SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
-  enif_mutex_unlock(async_nif->we_mutex);
+  pthread_mutex_unlock(&async_nif->we_mutex);
   __sync_fetch_and_add(&q->num_workers, -1);
   enif_thread_exit(0);
   return 0;
@@ -458,34 +456,34 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
      if not shutting down. */
   for (i = 0; i < num_queues; i++) {
     q = &async_nif->queues[i];
-    enif_mutex_lock(q->reqs_mutex);
+    pthread_mutex_lock(&q->reqs_mutex);
   }
   /* Set the shutdown flag so that worker threads will not continue
      executing requests. */
   async_nif->shutdown = 1;
   for (i = 0; i < num_queues; i++) {
     q = &async_nif->queues[i];
-    enif_mutex_unlock(q->reqs_mutex);
+    pthread_mutex_unlock(&q->reqs_mutex);
   }
 
   /* Join for the now exiting worker threads. */
   while(async_nif->we_active > 0) {
     for (i = 0; i < num_queues; i++)
-      enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
-    enif_mutex_lock(async_nif->we_mutex);
+      pthread_cond_broadcast(&async_nif->queues[i].reqs_cnd);
+    pthread_mutex_lock(&async_nif->we_mutex);
     we = SLIST_FIRST(&async_nif->we_joining);
     while(we != NULL) {
       struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
       SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
       void *exit_value = 0; /* We ignore the thread_join's exit value. */
       enif_thread_join(we->tid, &exit_value);
-      enif_free(we);
+      free(we);
       async_nif->we_active--;
       we = n;
     }
-    enif_mutex_unlock(async_nif->we_mutex);
+    pthread_mutex_unlock(&async_nif->we_mutex);
   }
-  enif_mutex_destroy(async_nif->we_mutex);
+  pthread_mutex_destroy(&async_nif->we_mutex);
 
   /* Cleanup in-flight requests, mutexes and conditions in each work queue. */
   for (i = 0; i < num_queues; i++) {
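The shutdown handshake above has two parts: the flag is set while every queue mutex is held, so no enqueue can observe a half-set state, and the broadcast then flushes blocked workers out of pthread_cond_wait so each one re-checks shutdown and exits to be joined. A reduced sketch of the same broadcast-and-join shape (illustrative, single mutex instead of one per queue):

    #include <pthread.h>

    static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t  c = PTHREAD_COND_INITIALIZER;
    static int shutting_down = 0;

    /* Called once by the unloading thread. */
    static void request_shutdown(void)
    {
        pthread_mutex_lock(&m);      /* no worker sits between its flag check and wait */
        shutting_down = 1;
        pthread_mutex_unlock(&m);
        pthread_cond_broadcast(&c);  /* wake every blocked worker, not just one */
    }

    /* Worker side: wakes, sees the flag, and returns so it can be joined. */
    static void *worker_main(void *arg)
    {
        (void)arg;
        pthread_mutex_lock(&m);
        while (!shutting_down)
            pthread_cond_wait(&c, &m);
        pthread_mutex_unlock(&m);
        return NULL;
    }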
@@ -502,29 +500,29 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
                                              enif_make_atom(req->env, "shutdown")));
       req->fn_post(req->args);
       enif_free_env(req->env);
-      enif_free(req->args);
-      enif_free(req);
+      free(req->args);
+      free(req);
       req = n;
     }
-    enif_mutex_destroy(q->reqs_mutex);
-    enif_cond_destroy(q->reqs_cnd);
+    pthread_mutex_destroy(&q->reqs_mutex);
+    pthread_cond_destroy(&q->reqs_cnd);
   }
 
   /* Free any req structures sitting unused on the recycle queue. */
-  enif_mutex_lock(async_nif->recycled_req_mutex);
+  pthread_mutex_lock(&async_nif->recycled_req_mutex);
   req = NULL;
   req = STAILQ_FIRST(&async_nif->recycled_reqs);
   while(req != NULL) {
     struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
     enif_free_env(req->env);
-    enif_free(req);
+    free(req);
     req = n;
   }
 
-  enif_mutex_unlock(async_nif->recycled_req_mutex);
-  enif_mutex_destroy(async_nif->recycled_req_mutex);
+  pthread_mutex_unlock(&async_nif->recycled_req_mutex);
+  pthread_mutex_destroy(&async_nif->recycled_req_mutex);
   memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
-  enif_free(async_nif);
+  free(async_nif);
 }
@@ -556,8 +554,8 @@ async_nif_load()
   }
 
   /* Init our portion of priv_data's module-specific state. */
-  async_nif = enif_alloc(sizeof(struct async_nif_state) +
-                         sizeof(struct async_nif_work_queue) * num_queues);
+  async_nif = malloc(sizeof(struct async_nif_state) +
+                     sizeof(struct async_nif_work_queue) * num_queues);
   if (!async_nif)
     return NULL;
   memset(async_nif, 0, sizeof(struct async_nif_state) +
@@ -568,15 +566,15 @@ async_nif_load()
   async_nif->next_q = 0;
   async_nif->shutdown = 0;
   STAILQ_INIT(&async_nif->recycled_reqs);
-  async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
-  async_nif->we_mutex = enif_mutex_create("we");
+  pthread_mutex_init(&async_nif->recycled_req_mutex, 0);
+  pthread_mutex_init(&async_nif->we_mutex, 0);
   SLIST_INIT(&async_nif->we_joining);
 
   for (i = 0; i < async_nif->num_queues; i++) {
     struct async_nif_work_queue *q = &async_nif->queues[i];
     STAILQ_INIT(&q->reqs);
-    q->reqs_mutex = enif_mutex_create("reqs");
-    q->reqs_cnd = enif_cond_create("reqs");
+    pthread_mutex_init(&q->reqs_mutex, 0);
+    pthread_cond_init(&q->reqs_cnd, 0);
     q->next = &async_nif->queues[(i + 1) % num_queues];
   }
   return async_nif;
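The q->next assignment in the loop above closes the queues into a ring, which is what lets an idle worker walk "all queues" by repeatedly following next until it arrives back at its home queue. An illustrative traversal; `struct ring` is a stand-in for the queue struct, not a name from the diff:

    struct ring { struct ring *next; };

    /* Counts the queues by walking the cycle once, starting from any node. */
    static unsigned int ring_size(const struct ring *home)
    {
        unsigned int count = 1;
        for (const struct ring *r = home->next; r != home; r = r->next)
            count++;             /* terminates because the links form a cycle */
        return count;
    }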