Cache session/[{cursor, config}] for reuse and spawn threads when needed. #9

Merged
gburd merged 30 commits from gsb-ctx-cache into master 2013-07-03 12:31:15 +00:00
3 changed files with 7 additions and 51 deletions
Showing only changes of commit 2672bab3ea


@@ -27,7 +27,6 @@ extern "C" {
#include <assert.h>
#include "queue.h"
#include "stats.h"
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
@@ -35,7 +34,7 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 1000
#define ASYNC_NIF_WORKER_QUEUE_SIZE 100
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
struct async_nif_req_entry {
@@ -56,7 +55,6 @@ struct async_nif_work_queue {
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
STAT_DEF(work);
};
struct async_nif_worker_entry {
@@ -68,8 +66,6 @@ struct async_nif_worker_entry {
};
struct async_nif_state {
STAT_DEF(wait);
STAT_DEF(service);
unsigned int shutdown;
ErlNifMutex *we_mutex;
unsigned int we_active;
@@ -349,26 +345,15 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
return 0;
}
/* We've selected queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */
if (q->num_workers < ASYNC_NIF_MIN_WORKERS) {
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
} else {
/* If more than 1/4 of the time it takes to complete a request is spent
waiting to be serviced, then we need more worker threads to service
requests. */
double m, n;
m = __stat_mean(async_nif->wait_stat);
n = __stat_mean(q->work_stat);
DPRINTF("w: %f\ts: %f\t%f", m, n, n/(m+n));
if (m && n && n / (m+n) < 0.75)
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
}
/* And finally add the request to the queue. */
/* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
q->depth++;
/* We've selected a queue for this new request; now check to make sure there are
enough workers actively processing requests on this queue. */
if (q->depth > q->num_workers)
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
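The branch deleted above scaled the worker pool on a latency ratio rather than on queue depth. For reference, a minimal standalone sketch of that test, built only from the __stat_mean(), wait_stat, and work_stat pieces this commit removes (the helper name is hypothetical):

/* If the mean time a request spends waiting (m) is more than 1/4 of the
 * total wait + service time (m + n), the serviced fraction n / (m + n)
 * falls below 0.75 and another worker should be started. */
static int
queue_needs_worker(struct async_nif_state *async_nif,
                   struct async_nif_work_queue *q)
{
    double m = __stat_mean(async_nif->wait_stat); /* mean time spent queued */
    double n = __stat_mean(q->work_stat);         /* mean time spent in fn_work */
    if (m == 0.0 || n == 0.0)
        return 0;                                 /* not enough samples yet */
    return (n / (m + n)) < 0.75;                  /* waiting > 25% of the total */
}

The q->depth > q->num_workers check that replaces it trades this feedback loop for a simple backlog test that needs no timing state.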
@@ -423,13 +408,7 @@ async_nif_worker_fn(void *arg)
enif_mutex_unlock(q->reqs_mutex);
/* Perform the work. */
uint64_t then = ts(q->work_stat->d.unit);
uint64_t wait = then - req->submitted;
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
uint64_t work = ts(q->work_stat->d.unit) - then;
__stat_add(async_nif->wait_stat, wait);
__stat_add(q->work_stat, work);
__stat_add(async_nif->service_stat, wait + work);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
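The instrumentation dropped from the worker loop split each request's latency into a queued portion and a service portion. A commented sketch of that accounting, assuming req->submitted holds a ts() timestamp taken when the request was enqueued:

uint64_t then = ts(q->work_stat->d.unit);         /* clock just before servicing */
uint64_t wait = then - req->submitted;            /* time the request sat queued */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
uint64_t work = ts(q->work_stat->d.unit) - then;  /* time spent doing the work */

__stat_add(async_nif->wait_stat, wait);           /* global queueing latency */
__stat_add(q->work_stat, work);                   /* per-queue service time */
__stat_add(async_nif->service_stat, wait + work); /* end-to-end request latency */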
@@ -481,7 +460,6 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
q = &async_nif->queues[i];
enif_mutex_unlock(q->reqs_mutex);
}
__stat_print_histogram(async_nif->service_stat, "wterl");
/* Join for the now exiting worker threads. */
while(async_nif->we_active > 0) {
@@ -586,15 +564,12 @@ async_nif_load()
async_nif->recycled_req_mutex = enif_mutex_create(NULL);
async_nif->we_mutex = enif_mutex_create(NULL);
SLIST_INIT(&async_nif->we_joining);
async_nif->wait_stat = __stat_init(100000);
async_nif->service_stat = __stat_init(100000);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
q->work_stat = __stat_init(100000);
}
return async_nif;
}


@@ -66,7 +66,6 @@ __stat_mean(struct stat *s)
if (!s)
return 0.0;
enif_mutex_lock(s->mutex);
t = s->h;
h = (s->h + 1) % s->num_samples;
mean = 0;
@@ -77,7 +76,6 @@ __stat_mean(struct stat *s)
}
if (mean > 0)
mean /= (s->n < s->num_samples ? (double)s->n : (double)s->num_samples);
enif_mutex_unlock(s->mutex);
return mean;
}
@@ -90,12 +88,10 @@ __stat_mean_log2(struct stat *s)
if (!s)
return 0.0;
enif_mutex_lock(s->mutex);
for (i = 0; i < 64; i++)
mean += (s->histogram[i] * i);
if (mean > 0)
mean /= (s->n < s->num_samples ? s->n : s->num_samples);
enif_mutex_unlock(s->mutex);
return mean;
}
@@ -107,10 +103,8 @@ __stat_tick(struct stat *s)
if (!s)
return 0.0;
enif_mutex_lock(s->mutex);
t = ts(s->d.unit);
s->d.then = t;
enif_mutex_unlock(s->mutex);
return t;
}
@@ -120,13 +114,11 @@ __stat_reset(struct stat *s)
if (!s)
return;
enif_mutex_lock(s->mutex);
s->h = 0;
s->d.unit = ns;
s->d.then = 0;
memset(s->histogram, 0, sizeof(uint64_t) * 64);
memset(s->samples, 0, sizeof(uint64_t) * s->num_samples);
enif_mutex_unlock(s->mutex);
}
void
@@ -134,13 +126,10 @@ __stat_add(struct stat *s, uint64_t elapsed)
{
uint32_t i;
enif_mutex_lock(s->mutex);
if (s->n == s->num_samples) {
s->mean = (s->mean + __stat_mean(s)) / 2.0;
if (s->n >= 4294967295) {
enif_mutex_unlock(s->mutex);
__stat_reset(s);
enif_mutex_lock(s->mutex);
}
}
i = s->h;
@@ -152,7 +141,6 @@ __stat_add(struct stat *s, uint64_t elapsed)
s->max = elapsed;
s->histogram[LOG2(elapsed)]++;
s->n++;
enif_mutex_unlock(s->mutex);
}
void
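Each sample recorded by __stat_add() also lands in a 64-bucket histogram indexed by the integer log2 of its elapsed time. A small worked sketch, assuming the LOG2 macro (defined elsewhere in stats.h) evaluates to floor(log2(x)); the helper below is a stand-in, not the real macro:

#include <stdint.h>

/* Index of the highest set bit, i.e. floor(log2(x)). */
static unsigned log2_bucket(uint64_t x)
{
    return x ? 63u - (unsigned)__builtin_clzll(x) : 0u;
}

/* Example: a 1,500 ns sample satisfies 2^10 <= 1500 < 2^11, so it lands in
 * bucket 10; __stat_add() does the equivalent of histogram[10]++. */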
@@ -172,7 +160,6 @@ __stat_print_histogram(struct stat *s, const char *mod)
if (!s)
return;
enif_mutex_lock(s->mutex);
m = (s->mean + __stat_mean(s) / 2.0);
fprintf(stderr, "%s:async_nif request latency histogram:\n", mod);
@@ -224,7 +211,6 @@ __stat_print_histogram(struct stat *s, const char *mod)
fprintf(stderr, "\n");
}
fflush(stderr);
enif_mutex_unlock(s->mutex);
}
void
@@ -233,10 +219,7 @@ __stat_free(struct stat *s)
if (!s)
return;
enif_mutex_lock(s->mutex);
enif_free(s->samples);
enif_mutex_unlock(s->mutex);
enif_mutex_destroy(s->mutex);
enif_free(s);
}
@@ -253,6 +236,5 @@ __stat_init(uint32_t n)
s->h = 0;
s->num_samples = n;
s->d.unit = ns;
s->mutex = enif_mutex_create(NULL);
return s;
}
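Taken together, the stats module that async_nif.h no longer drives has a small lifecycle: allocate a sample ring, feed it elapsed times, then query or print it. A minimal usage sketch, assuming ts() returns the current time in the given unit and that do_request() is a hypothetical unit of work:

static void
measure_one_request(void)
{
    struct stat *lat = __stat_init(100000);   /* ring of 100,000 samples */

    uint64_t start = ts(lat->d.unit);         /* stamp before the work */
    do_request();                             /* hypothetical request */
    __stat_add(lat, ts(lat->d.unit) - start); /* record elapsed; updates ring + histogram */

    double mean = __stat_mean(lat);           /* mean over the current sample ring */
    (void)mean;
    __stat_print_histogram(lat, "example");   /* log2-bucketed latency table */
    __stat_free(lat);
}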


@@ -30,7 +30,6 @@ extern "C" {
#define STAT_DEF(name) struct stat *name ## _stat;
struct stat {
ErlNifMutex *mutex;
duration_t d;
uint32_t h, n, num_samples;
uint64_t min, max;
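For context, STAT_DEF is the macro that gave the structs in async_nif.h their wait_stat, service_stat, and work_stat members; this commit deletes those uses while the macro and struct stat themselves remain. A short expansion sketch (the container struct is hypothetical):

#define STAT_DEF(name) struct stat *name ## _stat;

struct example_holder {   /* hypothetical container */
    STAT_DEF(work)        /* expands to: struct stat *work_stat; */
    STAT_DEF(wait)        /* expands to: struct stat *wait_stat; */
};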