Lower the per-queue size to reduce the time requests can sit in a queue accumulating latency. Drop the earlier idea that more queues would lead to more even worker progress; instead use one queue per Erlang scheduler thread (generally, one per available CPU core). Also change how worker threads decide when to cond_wait versus migrate to other queues looking for work.
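
For illustration only, a minimal standalone pthread sketch of the worker policy this commit moves to (not the code in the diff below): a worker scans every queue round-robin, and only after checking all of them without finding work does it either exit (when its current queue already has more than the minimum number of workers) or block on that queue's condition variable. The struct work_queue type, try_dequeue() helper and NUM_QUEUES constant are invented for the sketch, and the bookkeeping uses a simple visited counter instead of the diff's tries/home-queue check; the real code uses the enif_* primitives and STAILQ shown in the diff.

/* Hypothetical sketch of the new worker policy, using plain pthreads
 * instead of the enif_* primitives. */
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define NUM_QUEUES  4   /* illustrative; the real code uses one per scheduler */
#define MIN_WORKERS 2   /* mirrors ASYNC_NIF_MIN_WORKERS */

struct work_queue {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    int pending;                  /* stand-in for the STAILQ of requests */
    int num_workers;              /* workers currently serving this queue */
    struct work_queue *next;      /* circular list of queues */
};

/* Pop one unit of work if any is queued; signal another waiter so more
 * than one worker helps drain a busy queue (as the diff's
 * enif_cond_signal() does). */
static bool try_dequeue(struct work_queue *q)
{
    bool found = false;
    pthread_mutex_lock(&q->mutex);
    if (q->pending > 0) {
        q->pending--;
        found = true;
        pthread_cond_signal(&q->cond);
    }
    pthread_mutex_unlock(&q->mutex);
    return found;
}

void *worker_fn(void *arg)
{
    struct work_queue *q = arg;   /* the worker's starting queue */
    int visited = 0;              /* queues checked since work was last found */

    for (;;) {
        if (try_dequeue(q)) {
            /* ... process the request here ... */
            visited = 0;
            continue;
        }
        if (++visited < NUM_QUEUES) {
            /* This queue is empty: migrate to the next one and keep looking. */
            __sync_fetch_and_add(&q->num_workers, -1);
            q = q->next;
            __sync_fetch_and_add(&q->num_workers, 1);
            continue;
        }
        /* Every queue was empty. Either this queue has surplus workers and we
         * exit (to be joined), or we are one of the minimum workers and we
         * block until new work is signalled. */
        visited = 0;
        if (q->num_workers > MIN_WORKERS) {
            __sync_fetch_and_add(&q->num_workers, -1);
            break;
        }
        pthread_mutex_lock(&q->mutex);
        while (q->pending == 0)
            pthread_cond_wait(&q->cond, &q->mutex);
        pthread_mutex_unlock(&q->mutex);
    }
    return NULL;
}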

Gregory Burd 2013-07-31 15:06:28 -04:00
parent c9a4ab8325
commit ee904b4769


@@ -34,7 +34,7 @@ extern "C" {
 #define ASYNC_NIF_MAX_WORKERS 1024
 #define ASYNC_NIF_MIN_WORKERS 2
-#define ASYNC_NIF_WORKER_QUEUE_SIZE 4096
+#define ASYNC_NIF_WORKER_QUEUE_SIZE 256
 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS

 struct async_nif_req_entry {
@@ -391,20 +391,25 @@ async_nif_worker_fn(void *arg)
     }
     if (STAILQ_EMPTY(&q->reqs)) {
       /* Queue is empty so we wait for more work to arrive. */
-      if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
-        enif_mutex_unlock(q->reqs_mutex);
-        if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
-        else {
-          tries--;
-          __sync_fetch_and_add(&q->num_workers, -1);
-          q = q->next;
-          __sync_fetch_and_add(&q->num_workers, 1);
-          continue; // try next queue
-        }
-      } else {
-        enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
-        goto check_again_for_work;
-      }
+      enif_mutex_unlock(q->reqs_mutex);
+      if (tries == 0 && q == we->q) {
+        if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
+          /* At this point we've tried to find/execute work on all queues
+           * and there are at least MIN_WORKERS on this queue, so we leave
+           * this loop (break), which leads to a thread exit/join. */
+          break;
+        } else {
+          enif_mutex_lock(q->reqs_mutex);
+          enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
+          goto check_again_for_work;
+        }
+      } else {
+        tries--;
+        __sync_fetch_and_add(&q->num_workers, -1);
+        q = q->next;
+        __sync_fetch_and_add(&q->num_workers, 1);
+        continue; // try next queue
+      }
     } else {
       /* At this point the next req is ours to process and we hold the
          reqs_mutex lock.  Take the request off the queue. */
@@ -412,8 +417,7 @@ async_nif_worker_fn(void *arg)
       STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
       __sync_fetch_and_add(&q->depth, -1);

-      /* Ensure that there is at least one other worker thread watching this
-         queue. */
+      /* Wake up another worker thread watching this queue to help process work. */
       enif_cond_signal(q->reqs_cnd);
       enif_mutex_unlock(q->reqs_mutex);
@@ -554,7 +558,6 @@ async_nif_load()
       if (num_queues < 2)
         num_queues = 2;
   }
-  num_queues *= 32;

   /* Init our portion of priv_data's module-specific state. */
   async_nif = enif_alloc(sizeof(struct async_nif_state) +
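
As a rough sketch of what the last hunk implies (one queue per Erlang scheduler thread, with the num_queues *= 32 multiplier removed): the scheduler count is available from the NIF API via enif_system_info(). The helper name below is invented; only the enif_system_info() call, the scheduler_threads field, and the minimum-of-two clamp come from the NIF API and the diff, and this is not the full async_nif_load().

#include "erl_nif.h"

/* Hypothetical helper: one work queue per Erlang scheduler thread
 * (generally one per CPU core), never fewer than two. */
static unsigned int async_nif_num_queues(void)
{
    ErlNifSysInfo info;
    enif_system_info(&info, sizeof(ErlNifSysInfo));

    unsigned int num_queues = (unsigned int)info.scheduler_threads;
    if (num_queues < 2)
        num_queues = 2;
    return num_queues;
}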