diff --git a/c_src/async_nif.h b/c_src/async_nif.h index c18106c..425c0f1 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -27,7 +27,6 @@ extern "C" { #include #include "queue.h" -#include "stats.h" #ifndef UNUSED #define UNUSED(v) ((void)(v)) @@ -35,7 +34,7 @@ extern "C" { #define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MIN_WORKERS 2 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 1000 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 100 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS struct async_nif_req_entry { @@ -56,7 +55,6 @@ struct async_nif_work_queue { ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; STAILQ_HEAD(reqs, async_nif_req_entry) reqs; - STAT_DEF(work); }; struct async_nif_worker_entry { @@ -68,8 +66,6 @@ struct async_nif_worker_entry { }; struct async_nif_state { - STAT_DEF(wait); - STAT_DEF(service); unsigned int shutdown; ErlNifMutex *we_mutex; unsigned int we_active; @@ -349,26 +345,15 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en return 0; } - /* We've selected queue for this new request now check to make sure there are - enough workers actively processing requests on this queue. */ - if (q->num_workers < ASYNC_NIF_MIN_WORKERS) { - if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; - } else { - /* If more the 1/4 of the time it takes to complete a request is spent in - waiting to be serviced then we need more worker threads to service - requests. */ - double m, n; - m = __stat_mean(async_nif->wait_stat); - n = __stat_mean(q->work_stat); - DPRINTF("w: %f\ts: %f\t%f", m, n, n/(m+n)); - if (m && n && n / (m+n) < 0.75) - if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; - } - - /* And finally add the request to the queue. */ + /* Add the request to the queue. */ STAILQ_INSERT_TAIL(&q->reqs, req, entries); q->depth++; + /* We've selected a queue for this new request now check to make sure there are + enough workers actively processing requests on this queue. */ + if (q->depth > q->num_workers) + if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; + /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ @@ -423,13 +408,7 @@ async_nif_worker_fn(void *arg) enif_mutex_unlock(q->reqs_mutex); /* Perform the work. */ - uint64_t then = ts(q->work_stat->d.unit); - uint64_t wait = then - req->submitted; req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); - uint64_t work = ts(q->work_stat->d.unit) - then; - __stat_add(async_nif->wait_stat, wait); - __stat_add(q->work_stat, work); - __stat_add(async_nif->service_stat, wait + work); /* Now call the post-work cleanup function. */ req->fn_post(req->args); @@ -481,7 +460,6 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) q = &async_nif->queues[i]; enif_mutex_unlock(q->reqs_mutex); } - __stat_print_histogram(async_nif->service_stat, "wterl"); /* Join for the now exiting worker threads. */ while(async_nif->we_active > 0) { @@ -586,15 +564,12 @@ async_nif_load() async_nif->recycled_req_mutex = enif_mutex_create(NULL); async_nif->we_mutex = enif_mutex_create(NULL); SLIST_INIT(&async_nif->we_joining); - async_nif->wait_stat = __stat_init(100000); - async_nif->service_stat = __stat_init(100000); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; STAILQ_INIT(&q->reqs); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - q->work_stat = __stat_init(100000); } return async_nif; } diff --git a/c_src/stats.c b/c_src/stats.c index d018c76..5583374 100644 --- a/c_src/stats.c +++ b/c_src/stats.c @@ -66,7 +66,6 @@ __stat_mean(struct stat *s) if (!s) return 0.0; - enif_mutex_lock(s->mutex); t = s->h; h = (s->h + 1) % s->num_samples; mean = 0; @@ -77,7 +76,6 @@ __stat_mean(struct stat *s) } if (mean > 0) mean /= (s->n < s->num_samples ? (double)s->n : (double)s->num_samples); - enif_mutex_unlock(s->mutex); return mean; } @@ -90,12 +88,10 @@ __stat_mean_log2(struct stat *s) if (!s) return 0.0; - enif_mutex_lock(s->mutex); for (i = 0; i < 64; i++) mean += (s->histogram[i] * i); if (mean > 0) mean /= (s->n < s->num_samples ? s->n : s->num_samples); - enif_mutex_unlock(s->mutex); return mean; } @@ -107,10 +103,8 @@ __stat_tick(struct stat *s) if (!s) return 0.0; - enif_mutex_lock(s->mutex); t = ts(s->d.unit); s->d.then = t; - enif_mutex_unlock(s->mutex); return t; } @@ -120,13 +114,11 @@ __stat_reset(struct stat *s) if (!s) return; - enif_mutex_lock(s->mutex); s->h = 0; s->d.unit = ns; s->d.then = 0; memset(s->histogram, 0, sizeof(uint64_t) * 64); memset(s->samples, 0, sizeof(uint64_t) * s->num_samples); - enif_mutex_unlock(s->mutex); } void @@ -134,13 +126,10 @@ __stat_add(struct stat *s, uint64_t elapsed) { uint32_t i; - enif_mutex_lock(s->mutex); if (s->n == s->num_samples) { s->mean = (s->mean + __stat_mean(s)) / 2.0; if (s->n >= 4294967295) { - enif_mutex_unlock(s->mutex); __stat_reset(s); - enif_mutex_lock(s->mutex); } } i = s->h; @@ -152,7 +141,6 @@ __stat_add(struct stat *s, uint64_t elapsed) s->max = elapsed; s->histogram[LOG2(elapsed)]++; s->n++; - enif_mutex_unlock(s->mutex); } void @@ -172,7 +160,6 @@ __stat_print_histogram(struct stat *s, const char *mod) if (!s) return; - enif_mutex_lock(s->mutex); m = (s->mean + __stat_mean(s) / 2.0); fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); @@ -224,7 +211,6 @@ __stat_print_histogram(struct stat *s, const char *mod) fprintf(stderr, "\n"); } fflush(stderr); - enif_mutex_unlock(s->mutex); } void @@ -233,10 +219,7 @@ __stat_free(struct stat *s) if (!s) return; - enif_mutex_lock(s->mutex); enif_free(s->samples); - enif_mutex_unlock(s->mutex); - enif_mutex_destroy(s->mutex); enif_free(s); } @@ -253,6 +236,5 @@ __stat_init(uint32_t n) s->h = 0; s->num_samples = n; s->d.unit = ns; - s->mutex = enif_mutex_create(NULL); return s; } diff --git a/c_src/stats.h b/c_src/stats.h index f0e550f..c563491 100644 --- a/c_src/stats.h +++ b/c_src/stats.h @@ -30,7 +30,6 @@ extern "C" { #define STAT_DEF(name) struct stat *name ## _stat; struct stat { - ErlNifMutex *mutex; duration_t d; uint32_t h, n, num_samples; uint64_t min, max;