From 2672bab3ea0ae2c311373abd598397bdde0200ac Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Tue, 2 Jul 2013 19:58:00 -0400 Subject: [PATCH] Stats overhead due to hitting the clock and pulling a mutex caused a massive slowdown so now work is assigned to a new queue only when the candidate queue is deeper than the average of the other queues and threads are created only when the depth of the queue is larger than the number of threads working on that queue. --- c_src/async_nif.h | 39 +++++++-------------------------------- c_src/stats.c | 18 ------------------ c_src/stats.h | 1 - 3 files changed, 7 insertions(+), 51 deletions(-) diff --git a/c_src/async_nif.h b/c_src/async_nif.h index c18106c..425c0f1 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -27,7 +27,6 @@ extern "C" { #include #include "queue.h" -#include "stats.h" #ifndef UNUSED #define UNUSED(v) ((void)(v)) @@ -35,7 +34,7 @@ extern "C" { #define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_MIN_WORKERS 2 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 1000 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 100 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS struct async_nif_req_entry { @@ -56,7 +55,6 @@ struct async_nif_work_queue { ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; STAILQ_HEAD(reqs, async_nif_req_entry) reqs; - STAT_DEF(work); }; struct async_nif_worker_entry { @@ -68,8 +66,6 @@ struct async_nif_worker_entry { }; struct async_nif_state { - STAT_DEF(wait); - STAT_DEF(service); unsigned int shutdown; ErlNifMutex *we_mutex; unsigned int we_active; @@ -349,26 +345,15 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en return 0; } - /* We've selected queue for this new request now check to make sure there are - enough workers actively processing requests on this queue. */ - if (q->num_workers < ASYNC_NIF_MIN_WORKERS) { - if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; - } else { - /* If more the 1/4 of the time it takes to complete a request is spent in - waiting to be serviced then we need more worker threads to service - requests. */ - double m, n; - m = __stat_mean(async_nif->wait_stat); - n = __stat_mean(q->work_stat); - DPRINTF("w: %f\ts: %f\t%f", m, n, n/(m+n)); - if (m && n && n / (m+n) < 0.75) - if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; - } - - /* And finally add the request to the queue. */ + /* Add the request to the queue. */ STAILQ_INSERT_TAIL(&q->reqs, req, entries); q->depth++; + /* We've selected a queue for this new request now check to make sure there are + enough workers actively processing requests on this queue. */ + if (q->depth > q->num_workers) + if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; + /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ @@ -423,13 +408,7 @@ async_nif_worker_fn(void *arg) enif_mutex_unlock(q->reqs_mutex); /* Perform the work. */ - uint64_t then = ts(q->work_stat->d.unit); - uint64_t wait = then - req->submitted; req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); - uint64_t work = ts(q->work_stat->d.unit) - then; - __stat_add(async_nif->wait_stat, wait); - __stat_add(q->work_stat, work); - __stat_add(async_nif->service_stat, wait + work); /* Now call the post-work cleanup function. */ req->fn_post(req->args); @@ -481,7 +460,6 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) q = &async_nif->queues[i]; enif_mutex_unlock(q->reqs_mutex); } - __stat_print_histogram(async_nif->service_stat, "wterl"); /* Join for the now exiting worker threads. */ while(async_nif->we_active > 0) { @@ -586,15 +564,12 @@ async_nif_load() async_nif->recycled_req_mutex = enif_mutex_create(NULL); async_nif->we_mutex = enif_mutex_create(NULL); SLIST_INIT(&async_nif->we_joining); - async_nif->wait_stat = __stat_init(100000); - async_nif->service_stat = __stat_init(100000); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; STAILQ_INIT(&q->reqs); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - q->work_stat = __stat_init(100000); } return async_nif; } diff --git a/c_src/stats.c b/c_src/stats.c index d018c76..5583374 100644 --- a/c_src/stats.c +++ b/c_src/stats.c @@ -66,7 +66,6 @@ __stat_mean(struct stat *s) if (!s) return 0.0; - enif_mutex_lock(s->mutex); t = s->h; h = (s->h + 1) % s->num_samples; mean = 0; @@ -77,7 +76,6 @@ __stat_mean(struct stat *s) } if (mean > 0) mean /= (s->n < s->num_samples ? (double)s->n : (double)s->num_samples); - enif_mutex_unlock(s->mutex); return mean; } @@ -90,12 +88,10 @@ __stat_mean_log2(struct stat *s) if (!s) return 0.0; - enif_mutex_lock(s->mutex); for (i = 0; i < 64; i++) mean += (s->histogram[i] * i); if (mean > 0) mean /= (s->n < s->num_samples ? s->n : s->num_samples); - enif_mutex_unlock(s->mutex); return mean; } @@ -107,10 +103,8 @@ __stat_tick(struct stat *s) if (!s) return 0.0; - enif_mutex_lock(s->mutex); t = ts(s->d.unit); s->d.then = t; - enif_mutex_unlock(s->mutex); return t; } @@ -120,13 +114,11 @@ __stat_reset(struct stat *s) if (!s) return; - enif_mutex_lock(s->mutex); s->h = 0; s->d.unit = ns; s->d.then = 0; memset(s->histogram, 0, sizeof(uint64_t) * 64); memset(s->samples, 0, sizeof(uint64_t) * s->num_samples); - enif_mutex_unlock(s->mutex); } void @@ -134,13 +126,10 @@ __stat_add(struct stat *s, uint64_t elapsed) { uint32_t i; - enif_mutex_lock(s->mutex); if (s->n == s->num_samples) { s->mean = (s->mean + __stat_mean(s)) / 2.0; if (s->n >= 4294967295) { - enif_mutex_unlock(s->mutex); __stat_reset(s); - enif_mutex_lock(s->mutex); } } i = s->h; @@ -152,7 +141,6 @@ __stat_add(struct stat *s, uint64_t elapsed) s->max = elapsed; s->histogram[LOG2(elapsed)]++; s->n++; - enif_mutex_unlock(s->mutex); } void @@ -172,7 +160,6 @@ __stat_print_histogram(struct stat *s, const char *mod) if (!s) return; - enif_mutex_lock(s->mutex); m = (s->mean + __stat_mean(s) / 2.0); fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); @@ -224,7 +211,6 @@ __stat_print_histogram(struct stat *s, const char *mod) fprintf(stderr, "\n"); } fflush(stderr); - enif_mutex_unlock(s->mutex); } void @@ -233,10 +219,7 @@ __stat_free(struct stat *s) if (!s) return; - enif_mutex_lock(s->mutex); enif_free(s->samples); - enif_mutex_unlock(s->mutex); - enif_mutex_destroy(s->mutex); enif_free(s); } @@ -253,6 +236,5 @@ __stat_init(uint32_t n) s->h = 0; s->num_samples = n; s->d.unit = ns; - s->mutex = enif_mutex_create(NULL); return s; } diff --git a/c_src/stats.h b/c_src/stats.h index f0e550f..c563491 100644 --- a/c_src/stats.h +++ b/c_src/stats.h @@ -30,7 +30,6 @@ extern "C" { #define STAT_DEF(name) struct stat *name ## _stat; struct stat { - ErlNifMutex *mutex; duration_t d; uint32_t h, n, num_samples; uint64_t min, max;