diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 008b5e0..539bc75 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -34,7 +34,7 @@ extern "C" { #define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MIN_WORKERS 2 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 4096 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 256 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS struct async_nif_req_entry { @@ -391,20 +391,25 @@ async_nif_worker_fn(void *arg) } if (STAILQ_EMPTY(&q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ - if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { - enif_mutex_unlock(q->reqs_mutex); - if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit - else { - tries--; - __sync_fetch_and_add(&q->num_workers, -1); - q = q->next; - __sync_fetch_and_add(&q->num_workers, 1); - continue; // try next queue - } - } else { - enif_cond_wait(q->reqs_cnd, q->reqs_mutex); - goto check_again_for_work; - } + enif_mutex_unlock(q->reqs_mutex); + if (tries == 0 && q == we->q) { + if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { + /* At this point we've tried to find/execute work on all queues + * and there are at least MIN_WORKERS on this queue so we + * leaving this loop (break) which leads to a thread exit/join. */ + break; + } else { + enif_mutex_lock(q->reqs_mutex); + enif_cond_wait(q->reqs_cnd, q->reqs_mutex); + goto check_again_for_work; + } + } else { + tries--; + __sync_fetch_and_add(&q->num_workers, -1); + q = q->next; + __sync_fetch_and_add(&q->num_workers, 1); + continue; // try next queue + } } else { /* At this point the next req is ours to process and we hold the reqs_mutex lock. Take the request off the queue. */ @@ -412,8 +417,7 @@ async_nif_worker_fn(void *arg) STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); __sync_fetch_and_add(&q->depth, -1); - /* Ensure that there is at least one other worker thread watching this - queue. */ + /* Wake up other worker thread watching this queue to help process work. */ enif_cond_signal(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); @@ -554,7 +558,6 @@ async_nif_load() if (num_queues < 2) num_queues = 2; } - num_queues *= 32; /* Init our portion of priv_data's module-specific state. */ async_nif = enif_alloc(sizeof(struct async_nif_state) +