Update to latest async work.

Gregory Burd 2013-07-26 14:33:40 -04:00
parent 69a850737e
commit 73e071eabe


@@ -53,6 +53,7 @@ struct async_nif_work_queue {
   unsigned int depth;
   ErlNifMutex *reqs_mutex;
   ErlNifCond *reqs_cnd;
+  struct async_nif_work_queue *next;
   STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
 };
@@ -148,14 +149,14 @@ struct async_nif_state {
 #define ASYNC_NIF_LOAD(name, priv) do { \
   if (!name##_async_nif_coord) \
-    name##_async_nif_coord = enif_mutex_create(NULL); \
+    name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
   enif_mutex_lock(name##_async_nif_coord); \
   priv = async_nif_load(); \
   enif_mutex_unlock(name##_async_nif_coord); \
  } while(0);
 #define ASYNC_NIF_UNLOAD(name, env, priv) do { \
   if (!name##_async_nif_coord) \
-    name##_async_nif_coord = enif_mutex_create(NULL); \
+    name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
   enif_mutex_lock(name##_async_nif_coord); \
   async_nif_unload(env, priv); \
   enif_mutex_unlock(name##_async_nif_coord); \
@@ -164,7 +165,7 @@ struct async_nif_state {
  } while(0);
 #define ASYNC_NIF_UPGRADE(name, env) do { \
   if (!name##_async_nif_coord) \
-    name##_async_nif_coord = enif_mutex_create(NULL); \
+    name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
   enif_mutex_lock(name##_async_nif_coord); \
   async_nif_upgrade(env); \
   enif_mutex_unlock(name##_async_nif_coord); \
@@ -197,7 +198,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
     env = enif_alloc_env();
     if (env) {
       req->env = env;
-      async_nif->num_reqs++;
+      __sync_fetch_and_add(&async_nif->num_reqs, 1);
     } else {
       enif_free(req);
       req = NULL;
@@ -287,9 +288,9 @@ static ERL_NIF_TERM
 async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
 {
   /* Identify the most appropriate worker for this request. */
-  unsigned int i, qid = 0;
+  unsigned int i, last_qid, qid = 0;
   struct async_nif_work_queue *q = NULL;
-  double avg_depth = 0.0;
+  double avg_depth;
 
   /* Either we're choosing a queue based on some affinity/hinted value or we
      need to select the next queue in the rotation and atomically update that
@@ -297,9 +298,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   if (hint >= 0) {
     qid = (unsigned int)hint;
   } else {
-    qid = async_nif->next_q;
-    qid = (qid + 1) % async_nif->num_queues;
-    async_nif->next_q = qid;
+    do {
+      last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
+      qid = (last_qid + 1) % async_nif->num_queues;
+    } while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
   }
 
   /* Now we inspect and interate across the set of queues trying to select one
@@ -314,8 +316,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
         avg_depth += async_nif->queues[j].depth;
       }
     }
-    if (avg_depth != 0)
-      avg_depth /= n;
+    if (avg_depth) avg_depth /= n;
 
     /* Lock this queue under consideration, then check for shutdown. While
        we hold this lock either a) we're shutting down so exit now or b) this
@@ -337,20 +338,23 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
   }
 
   /* If the for loop finished then we didn't find a suitable queue for this
-     request, meaning we're backed up so trigger eagain. */
-  if (i == async_nif->num_queues) {
-    enif_mutex_unlock(q->reqs_mutex);
-    return 0;
-  }
+     request, meaning we're backed up so trigger eagain. Note that if we left
+     the loop in this way we hold no lock. */
+  if (i == async_nif->num_queues) return 0;
 
   /* Add the request to the queue. */
   STAILQ_INSERT_TAIL(&q->reqs, req, entries);
-  q->depth++;
+  __sync_fetch_and_add(&q->depth, 1);
 
   /* We've selected a queue for this new request now check to make sure there are
      enough workers actively processing requests on this queue. */
-  if (q->depth > q->num_workers)
-    if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
+  while (q->depth > q->num_workers) {
+    switch(async_nif_start_worker(async_nif, q)) {
+    case EINVAL: case ENOMEM: default: return 0;
+    case EAGAIN: continue;
+    case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
+    }
+  }done:;
 
   /* Build the term before releasing the lock so as not to race on the use of
      the req pointer (which will soon become invalid in another thread
@@ -375,6 +379,7 @@ async_nif_worker_fn(void *arg)
   struct async_nif_state *async_nif = we->async_nif;
   struct async_nif_work_queue *q = we->q;
   struct async_nif_req_entry *req = NULL;
+  unsigned int tries = async_nif->num_queues;
 
   for(;;) {
     /* Examine the request queue, are there things to be done? */
@@ -388,7 +393,14 @@ async_nif_worker_fn(void *arg)
       /* Queue is empty so we wait for more work to arrive. */
       if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
         enif_mutex_unlock(q->reqs_mutex);
-        break;
+        if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
+        else {
+          tries--;
+          __sync_fetch_and_add(&q->num_workers, -1);
+          q = q->next;
+          __sync_fetch_and_add(&q->num_workers, 1);
+          continue; // try another queue
+        }
       } else {
         enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
         goto check_again_for_work;
@@ -398,7 +410,7 @@ async_nif_worker_fn(void *arg)
          reqs_mutex lock. Take the request off the queue. */
       req = STAILQ_FIRST(&q->reqs);
       STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
-      q->depth--;
+      __sync_fetch_and_add(&q->depth, -1);
 
       /* Ensure that there is at least one other worker thread watching this
          queue. */
@@ -424,7 +436,7 @@ async_nif_worker_fn(void *arg)
   enif_mutex_lock(async_nif->we_mutex);
   SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
   enif_mutex_unlock(async_nif->we_mutex);
-  q->num_workers--;
+  __sync_fetch_and_add(&q->num_workers, -1);
   enif_thread_exit(0);
   return 0;
 }
@@ -556,15 +568,16 @@ async_nif_load()
   async_nif->next_q = 0;
   async_nif->shutdown = 0;
   STAILQ_INIT(&async_nif->recycled_reqs);
-  async_nif->recycled_req_mutex = enif_mutex_create(NULL);
-  async_nif->we_mutex = enif_mutex_create(NULL);
+  async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
+  async_nif->we_mutex = enif_mutex_create("we");
   SLIST_INIT(&async_nif->we_joining);
 
   for (i = 0; i < async_nif->num_queues; i++) {
     struct async_nif_work_queue *q = &async_nif->queues[i];
     STAILQ_INIT(&q->reqs);
-    q->reqs_mutex = enif_mutex_create(NULL);
-    q->reqs_cnd = enif_cond_create(NULL);
+    q->reqs_mutex = enif_mutex_create("reqs");
+    q->reqs_cnd = enif_cond_create("reqs");
+    q->next = &async_nif->queues[(i + 1) % num_queues];
   }
   return async_nif;
 }
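
Note on the queue-rotation change: the diff replaces a mutex-guarded next_q counter with the GCC/Clang __sync builtins. Below is a minimal standalone sketch of that same pattern, not code from this file; the names ring_state and next_queue_index are hypothetical, and it assumes only that the __sync builtins are available.

/* Minimal sketch, assuming GCC/Clang __sync builtins: advance a shared
   round-robin counter with compare-and-swap instead of a mutex. */
#include <stdio.h>

struct ring_state {              /* hypothetical stand-in for async_nif_state */
  volatile unsigned int next_q;  /* last queue index handed out */
  unsigned int num_queues;
};

static unsigned int
next_queue_index(struct ring_state *s)
{
  unsigned int last_qid, qid;
  do {
    /* atomic read of the current value (adding 0 returns the prior value) */
    last_qid = __sync_fetch_and_add(&s->next_q, 0);
    qid = (last_qid + 1) % s->num_queues;
    /* publish qid only if no other thread advanced next_q in the meantime */
  } while (!__sync_bool_compare_and_swap(&s->next_q, last_qid, qid));
  return qid;
}

int main(void)
{
  struct ring_state s = { .next_q = 0, .num_queues = 4 };
  for (int i = 0; i < 6; i++)
    printf("%u\n", next_queue_index(&s)); /* prints 1 2 3 0 1 2 */
  return 0;
}

The add-of-zero is just an atomic read; the compare-and-swap publishes the new index only if nobody else moved the counter first, otherwise the loop retries, so no lock is held while choosing a queue.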
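Similarly, the new next pointer links the per-scheduler work queues into a ring, which the reworked worker loop uses to let an idle worker probe neighbouring queues before exiting. A hedged sketch of just that linkage and a one-lap walk, again with hypothetical names (work_queue, link_queues) and assuming an array-backed set of queues:

/* Minimal sketch: link each queue to its successor so the last one wraps
   back to the first, as the commit does with
   q->next = &async_nif->queues[(i + 1) % num_queues]. */
#include <stdio.h>

#define NUM_QUEUES 4

struct work_queue {
  unsigned int id;
  struct work_queue *next;   /* successor in the ring */
};

static void
link_queues(struct work_queue *queues, unsigned int num_queues)
{
  for (unsigned int i = 0; i < num_queues; i++) {
    queues[i].id = i;
    queues[i].next = &queues[(i + 1) % num_queues];
  }
}

int main(void)
{
  struct work_queue queues[NUM_QUEUES];
  link_queues(queues, NUM_QUEUES);

  /* An idle worker can walk the ring once and stop when it is back where it
     started, mirroring the tries counter and q = q->next in the worker loop. */
  struct work_queue *start = &queues[0], *q = start;
  unsigned int tries = NUM_QUEUES;
  do {
    printf("checking queue %u\n", q->id);
    q = q->next;
    tries--;
  } while (tries > 0 && q != start);
  return 0;
}

Bounding the walk to num_queues steps is what lets a worker thread conclude that every queue is idle and exit safely.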