Replace all enif implementations of mutexes and condition variables with their POSIX pthread equivalents, on the theory that we don't want to be bumping heads with the Erlang runtime.

Gregory Burd 2013-08-02 14:19:44 -04:00
parent 6919515de5
commit e9b1a9ea0b

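The change below applies one pattern throughout: heap-allocated ErlNifMutex / ErlNifCond handles created with enif_mutex_create() / enif_cond_create() become pthread_mutex_t / pthread_cond_t values embedded directly in the owning structs and passed by address, and enif_alloc()/enif_free() of request and worker structs becomes plain malloc()/free(). A minimal sketch of the mutex/condition mapping, using hypothetical struct and function names that are not part of this commit:

#include <pthread.h>

/* Illustrative only: stands in for the reqs_mutex/reqs_cnd pair in
 * struct async_nif_work_queue. */
struct sketch_queue {
    pthread_mutex_t lock;   /* was an ErlNifMutex * created with enif_mutex_create("reqs") */
    pthread_cond_t  cond;   /* was an ErlNifCond * created with enif_cond_create("reqs")   */
};

static void sketch_queue_init(struct sketch_queue *q)
{
    pthread_mutex_init(&q->lock, NULL);   /* replaces enif_mutex_create()  */
    pthread_cond_init(&q->cond, NULL);    /* replaces enif_cond_create()   */
}

static void sketch_queue_destroy(struct sketch_queue *q)
{
    pthread_mutex_destroy(&q->lock);      /* replaces enif_mutex_destroy() */
    pthread_cond_destroy(&q->cond);       /* replaces enif_cond_destroy()  */
}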

@@ -25,6 +25,7 @@ extern "C" {
 #endif
 
 #include <assert.h>
+#include <pthread.h>
 #include "queue.h"
@@ -51,8 +52,8 @@ struct async_nif_req_entry {
 struct async_nif_work_queue {
   unsigned int num_workers;
   unsigned int depth;
-  ErlNifMutex *reqs_mutex;
-  ErlNifCond *reqs_cnd;
+  pthread_mutex_t reqs_mutex;
+  pthread_cond_t reqs_cnd;
   struct async_nif_work_queue *next;
   STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
 };
@@ -67,14 +68,14 @@ struct async_nif_worker_entry {
 struct async_nif_state {
   unsigned int shutdown;
-  ErlNifMutex *we_mutex;
+  pthread_mutex_t we_mutex;
   unsigned int we_active;
   SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
   unsigned int num_queues;
   unsigned int next_q;
   STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
   unsigned int num_reqs;
-  ErlNifMutex *recycled_req_mutex;
+  pthread_mutex_t recycled_req_mutex;
   struct async_nif_work_queue queues[];
 };
@@ -117,7 +118,7 @@ struct async_nif_state {
     DPRINTF("async_nif: calling \"%s\"", __func__); \
     do pre_block while(0); \
     DPRINTF("async_nif: returned from \"%s\"", __func__); \
-    copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
+    copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
     if (!copy_of_args) { \
       fn_post_ ## decl (args); \
       async_nif_recycle_req(req, async_nif); \
@@ -137,7 +138,7 @@ struct async_nif_state {
     if (!reply) { \
       fn_post_ ## decl (args); \
       async_nif_recycle_req(req, async_nif); \
-      enif_free(copy_of_args); \
+      free(copy_of_args); \
       return enif_make_tuple2(env, enif_make_atom(env, "error"), \
                               enif_make_atom(env, "eagain")); \
     } \
@@ -145,30 +146,23 @@ struct async_nif_state {
   }
 
 #define ASYNC_NIF_INIT(name) \
-  static ErlNifMutex *name##_async_nif_coord = NULL;
+  static pthread_mutex_t name##_async_nif_coord = PTHREAD_MUTEX_INITIALIZER;
 
 #define ASYNC_NIF_LOAD(name, priv) do { \
-    if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
-    enif_mutex_lock(name##_async_nif_coord); \
+    pthread_mutex_lock(&name##_async_nif_coord); \
     priv = async_nif_load(); \
-    enif_mutex_unlock(name##_async_nif_coord); \
+    pthread_mutex_unlock(&name##_async_nif_coord); \
   } while(0);
 
 #define ASYNC_NIF_UNLOAD(name, env, priv) do { \
-    if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
-    enif_mutex_lock(name##_async_nif_coord); \
+    pthread_mutex_lock(&name##_async_nif_coord); \
     async_nif_unload(env, priv); \
-    enif_mutex_unlock(name##_async_nif_coord); \
-    enif_mutex_destroy(name##_async_nif_coord); \
-    name##_async_nif_coord = NULL; \
+    pthread_mutex_unlock(&name##_async_nif_coord); \
+    pthread_mutex_destroy(&name##_async_nif_coord); \
   } while(0);
 
 #define ASYNC_NIF_UPGRADE(name, env) do { \
-    if (!name##_async_nif_coord) \
-      name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
-    enif_mutex_lock(name##_async_nif_coord); \
+    pthread_mutex_lock(&name##_async_nif_coord); \
    async_nif_upgrade(env); \
-    enif_mutex_unlock(name##_async_nif_coord); \
+    pthread_mutex_unlock(&name##_async_nif_coord); \
   } while(0);
 
 #define ASYNC_NIF_RETURN_BADARG() do { \
@@ -189,10 +183,10 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
   struct async_nif_req_entry *req = NULL;
   ErlNifEnv *env = NULL;
 
-  enif_mutex_lock(async_nif->recycled_req_mutex);
+  pthread_mutex_lock(&async_nif->recycled_req_mutex);
   if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
     if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
-      req = enif_alloc(sizeof(struct async_nif_req_entry));
+      req = malloc(sizeof(struct async_nif_req_entry));
       if (req) {
         memset(req, 0, sizeof(struct async_nif_req_entry));
         env = enif_alloc_env();
@@ -200,7 +194,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
           req->env = env;
           __sync_fetch_and_add(&async_nif->num_reqs, 1);
         } else {
-          enif_free(req);
+          free(req);
           req = NULL;
         }
       }
@@ -209,7 +203,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
     req = STAILQ_FIRST(&async_nif->recycled_reqs);
     STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
   }
-  enif_mutex_unlock(async_nif->recycled_req_mutex);
+  pthread_mutex_unlock(&async_nif->recycled_req_mutex);
   return req;
 }
 
@@ -224,13 +218,13 @@ void
 async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
 {
   ErlNifEnv *env = NULL;
-  enif_mutex_lock(async_nif->recycled_req_mutex);
+  pthread_mutex_lock(&async_nif->recycled_req_mutex);
   enif_clear_env(req->env);
   env = req->env;
   memset(req, 0, sizeof(struct async_nif_req_entry));
   req->env = env;
   STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
-  enif_mutex_unlock(async_nif->recycled_req_mutex);
+  pthread_mutex_unlock(&async_nif->recycled_req_mutex);
 }
 
 static void *async_nif_worker_fn(void *);
@@ -246,7 +240,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
   if (0 == q)
     return EINVAL;
 
-  enif_mutex_lock(async_nif->we_mutex);
+  pthread_mutex_lock(&async_nif->we_mutex);
 
   we = SLIST_FIRST(&async_nif->we_joining);
   while(we != NULL) {
@@ -254,19 +248,19 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
     SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
     void *exit_value = 0; /* We ignore the thread_join's exit value. */
     enif_thread_join(we->tid, &exit_value);
-    enif_free(we);
+    free(we);
     async_nif->we_active--;
     we = n;
   }
 
   if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
-    enif_mutex_unlock(async_nif->we_mutex);
+    pthread_mutex_unlock(&async_nif->we_mutex);
     return EAGAIN;
   }
 
-  we = enif_alloc(sizeof(struct async_nif_worker_entry));
+  we = malloc(sizeof(struct async_nif_worker_entry));
   if (!we) {
-    enif_mutex_unlock(async_nif->we_mutex);
+    pthread_mutex_unlock(&async_nif->we_mutex);
     return ENOMEM;
   }
   memset(we, 0, sizeof(struct async_nif_worker_entry));
@@ -274,7 +268,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
   we->async_nif = async_nif;
   we->q = q;
 
-  enif_mutex_unlock(async_nif->we_mutex);
+  pthread_mutex_unlock(&async_nif->we_mutex);
   return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
 }
@@ -322,9 +316,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
        we hold this lock either a) we're shutting down so exit now or b) this
        queue will be valid until we release the lock. */
     q = &async_nif->queues[qid];
-    enif_mutex_lock(q->reqs_mutex);
+    pthread_mutex_lock(&q->reqs_mutex);
 
     if (async_nif->shutdown) {
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_mutex_unlock(&q->reqs_mutex);
       return 0;
     }
@@ -332,7 +326,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
        the request volume. */
     if (q->depth <= avg_depth) break;
     else {
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_mutex_unlock(&q->reqs_mutex);
       qid = (qid + 1) % async_nif->num_queues;
     }
   }
@@ -361,8 +355,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
      performing the request). */
   ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
                                         enif_make_atom(req->env, "enqueued"));
-  enif_cond_signal(q->reqs_cnd);
-  enif_mutex_unlock(q->reqs_mutex);
+  pthread_cond_signal(&q->reqs_cnd);
+  pthread_mutex_unlock(&q->reqs_mutex);
   return reply;
 }
@@ -383,15 +377,15 @@ async_nif_worker_fn(void *arg)
   for(;;) {
     /* Examine the request queue, are there things to be done? */
-    enif_mutex_lock(q->reqs_mutex);
+    pthread_mutex_lock(&q->reqs_mutex);
     check_again_for_work:
     if (async_nif->shutdown) {
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_mutex_unlock(&q->reqs_mutex);
       break;
     }
     if (STAILQ_EMPTY(&q->reqs)) {
       /* Queue is empty so we wait for more work to arrive. */
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_mutex_unlock(&q->reqs_mutex);
       if (tries == 0 && q == we->q) {
         if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
           /* At this point we've tried to find/execute work on all queues
@@ -399,8 +393,8 @@ async_nif_worker_fn(void *arg)
            * leaving this loop (break) which leads to a thread exit/join. */
           break;
         } else {
-          enif_mutex_lock(q->reqs_mutex);
-          enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
+          pthread_mutex_lock(&q->reqs_mutex);
+          pthread_cond_wait(&q->reqs_cnd, &q->reqs_mutex);
           goto check_again_for_work;
         }
       } else {
@@ -418,8 +412,8 @@ async_nif_worker_fn(void *arg)
       __sync_fetch_and_add(&q->depth, -1);
 
       /* Wake up other worker thread watching this queue to help process work. */
-      enif_cond_signal(q->reqs_cnd);
-      enif_mutex_unlock(q->reqs_mutex);
+      pthread_cond_signal(&q->reqs_cnd);
+      pthread_mutex_unlock(&q->reqs_mutex);
 
       /* Perform the work. */
       req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
@@ -431,15 +425,15 @@ async_nif_worker_fn(void *arg)
       req->ref = 0;
       req->fn_work = 0;
       req->fn_post = 0;
-      enif_free(req->args);
+      free(req->args);
      req->args = NULL;
       async_nif_recycle_req(req, async_nif);
       req = NULL;
     }
   }
-  enif_mutex_lock(async_nif->we_mutex);
+  pthread_mutex_lock(&async_nif->we_mutex);
   SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
-  enif_mutex_unlock(async_nif->we_mutex);
+  pthread_mutex_unlock(&async_nif->we_mutex);
   __sync_fetch_and_add(&q->num_workers, -1);
   enif_thread_exit(0);
   return 0;
@@ -462,34 +456,34 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
      if not shutting down. */
   for (i = 0; i < num_queues; i++) {
     q = &async_nif->queues[i];
-    enif_mutex_lock(q->reqs_mutex);
+    pthread_mutex_lock(&q->reqs_mutex);
   }
 
   /* Set the shutdown flag so that worker threads will no continue
      executing requests. */
   async_nif->shutdown = 1;
   for (i = 0; i < num_queues; i++) {
     q = &async_nif->queues[i];
-    enif_mutex_unlock(q->reqs_mutex);
+    pthread_mutex_unlock(&q->reqs_mutex);
   }
 
   /* Join for the now exiting worker threads. */
   while(async_nif->we_active > 0) {
     for (i = 0; i < num_queues; i++)
-      enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
+      pthread_cond_broadcast(&async_nif->queues[i].reqs_cnd);
-    enif_mutex_lock(async_nif->we_mutex);
+    pthread_mutex_lock(&async_nif->we_mutex);
     we = SLIST_FIRST(&async_nif->we_joining);
     while(we != NULL) {
       struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
       SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
       void *exit_value = 0; /* We ignore the thread_join's exit value. */
       enif_thread_join(we->tid, &exit_value);
-      enif_free(we);
+      free(we);
       async_nif->we_active--;
       we = n;
     }
-    enif_mutex_unlock(async_nif->we_mutex);
+    pthread_mutex_unlock(&async_nif->we_mutex);
   }
-  enif_mutex_destroy(async_nif->we_mutex);
+  pthread_mutex_destroy(&async_nif->we_mutex);
 
   /* Cleanup in-flight requests, mutexes and conditions in each work queue. */
   for (i = 0; i < num_queues; i++) {
@@ -506,29 +500,29 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
                                                enif_make_atom(req->env, "shutdown")));
       req->fn_post(req->args);
       enif_free_env(req->env);
-      enif_free(req->args);
-      enif_free(req);
+      free(req->args);
+      free(req);
       req = n;
     }
-    enif_mutex_destroy(q->reqs_mutex);
-    enif_cond_destroy(q->reqs_cnd);
+    pthread_mutex_destroy(&q->reqs_mutex);
+    pthread_cond_destroy(&q->reqs_cnd);
   }
 
   /* Free any req structures sitting unused on the recycle queue. */
-  enif_mutex_lock(async_nif->recycled_req_mutex);
+  pthread_mutex_lock(&async_nif->recycled_req_mutex);
   req = NULL;
   req = STAILQ_FIRST(&async_nif->recycled_reqs);
   while(req != NULL) {
     struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
     enif_free_env(req->env);
-    enif_free(req);
+    free(req);
     req = n;
   }
-  enif_mutex_unlock(async_nif->recycled_req_mutex);
-  enif_mutex_destroy(async_nif->recycled_req_mutex);
+  pthread_mutex_unlock(&async_nif->recycled_req_mutex);
+  pthread_mutex_destroy(&async_nif->recycled_req_mutex);
 
   memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
-  enif_free(async_nif);
+  free(async_nif);
 }
 
 static void *
@@ -560,7 +554,7 @@ async_nif_load()
   }
 
   /* Init our portion of priv_data's module-specific state. */
-  async_nif = enif_alloc(sizeof(struct async_nif_state) +
+  async_nif = malloc(sizeof(struct async_nif_state) +
                          sizeof(struct async_nif_work_queue) * num_queues);
   if (!async_nif)
     return NULL;
@@ -572,15 +566,15 @@ async_nif_load()
   async_nif->next_q = 0;
   async_nif->shutdown = 0;
   STAILQ_INIT(&async_nif->recycled_reqs);
-  async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
-  async_nif->we_mutex = enif_mutex_create("we");
+  pthread_mutex_init(&async_nif->recycled_req_mutex, 0);
+  pthread_mutex_init(&async_nif->we_mutex, 0);
   SLIST_INIT(&async_nif->we_joining);
 
   for (i = 0; i < async_nif->num_queues; i++) {
     struct async_nif_work_queue *q = &async_nif->queues[i];
     STAILQ_INIT(&q->reqs);
-    q->reqs_mutex = enif_mutex_create("reqs");
-    q->reqs_cnd = enif_cond_create("reqs");
+    pthread_mutex_init(&q->reqs_mutex, 0);
+    pthread_cond_init(&q->reqs_cnd, 0);
     q->next = &async_nif->queues[(i + 1) % num_queues];
   }
   return async_nif;
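A design note on the ASYNC_NIF_INIT change above: a statically allocated pthread mutex can be initialized with PTHREAD_MUTEX_INITIALIZER, so the coordination mutex is usable the moment the module loads and the lazy "create on first use" checks that the enif version needed in ASYNC_NIF_LOAD/UNLOAD/UPGRADE go away. A small sketch under the same caveat (hypothetical names, not taken from the diff):

#include <pthread.h>

/* Stand-in for name##_async_nif_coord: ready at load time, no runtime create step. */
static pthread_mutex_t sketch_coord = PTHREAD_MUTEX_INITIALIZER;

static void *sketch_load(void)
{
    pthread_mutex_lock(&sketch_coord);     /* no "if (!coord) coord = create()" dance needed */
    /* ... module load work would go here ... */
    pthread_mutex_unlock(&sketch_coord);
    return NULL;
}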