Changed conditions for worker thread creation.

Gregory Burd 2013-07-02 16:46:04 -04:00
parent 4300b3036f
commit 00e5889ac9
6 changed files with 228 additions and 185 deletions
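The heart of this commit is a new per-queue spawn heuristic in async_nif_enqueue_req(): every queue keeps at least ASYNC_NIF_MIN_WORKERS workers, and additional workers are only started when queued requests spend a meaningful share of their total service time waiting. A minimal standalone sketch of that condition, using the 0.75 ratio that appears in the diff below (the function and parameter names here are illustrative, not part of async_nif):

#include <stdbool.h>

#define ASYNC_NIF_MIN_WORKERS 2

/* Spawn another worker when the queue is under-staffed, or when the mean
 * work time is less than 75% of the mean total (wait + work) time, i.e.
 * when waiting accounts for more than ~25% of a request's lifetime. */
static bool
should_spawn_worker(unsigned int num_workers, double mean_wait, double mean_work)
{
    if (num_workers < ASYNC_NIF_MIN_WORKERS)
        return true;
    if (mean_wait == 0.0 || mean_work == 0.0)
        return false;               /* not enough samples yet to judge */
    return mean_work / (mean_wait + mean_work) < 0.75;
}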

View file

@@ -25,6 +25,7 @@ extern "C" {
#endif
#include <assert.h>
#include "queue.h"
#include "stats.h"
@@ -33,6 +34,7 @@ extern "C" {
#endif
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 1000
#define ASYNC_NIF_MAX_QUEUED_REQS (ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS)
@@ -43,17 +45,18 @@ struct async_nif_req_entry {
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
uint64_t submitted;
STAILQ_ENTRY(async_nif_req_entry) entries;
};
struct async_nif_work_queue {
unsigned int num_workers;
unsigned int depth;
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
STAT_DEF(qwait);
STAT_DEF(wt);
STAT_DEF(work);
};
struct async_nif_worker_entry {
@@ -65,6 +68,8 @@ struct async_nif_worker_entry {
};
struct async_nif_state {
STAT_DEF(wait);
STAT_DEF(service);
unsigned int shutdown;
ErlNifMutex *we_mutex;
unsigned int we_active;
@@ -103,10 +108,12 @@ struct async_nif_state {
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
if (async_nif->shutdown) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
req = async_nif_reuse_req(async_nif); \
if (!req) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
} \
req->submitted = ts(ns); /* only stamp the request once we know it is valid */ \
@@ -246,7 +253,6 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
enif_mutex_lock(async_nif->we_mutex);
#if 0 // TODO:
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
@@ -257,7 +263,6 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
async_nif->we_active--;
we = n;
}
#endif
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
@@ -289,9 +294,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
{
/* Identify the most appropriate worker for this request. */
unsigned int i, qid = 0;
unsigned int n = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
double avg_wait_across_q, avg_wt_service_time, avg_wait_this_q = 0;
double avg_depth;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
@@ -304,52 +308,66 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
async_nif->next_q = qid;
}
avg_wait_across_q = 0;
for (i = 0; i < async_nif->num_queues; i++) {
avg_wait_across_q += __stat_mean(async_nif->queues[i].qwait_stat);
}
if (avg_wait_across_q != 0)
avg_wait_across_q /= async_nif->num_queues;
/* Now we inspect and iterate across the set of queues trying to select one
that isn't too full or too slow. */
do {
for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */
unsigned int j, n = 0;
avg_depth = 0;
for (j = 0; j < async_nif->num_queues; j++) {
if (j != qid && async_nif->queues[j].depth != 0) {
n++;
avg_depth += async_nif->queues[j].depth;
}
}
if (avg_depth != 0)
avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Now that we hold the lock, check for shutdown. As long as we hold
this lock either a) we're shutting down so exit now or b) this queue
will be valid until we release the lock. */
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
avg_wait_this_q = __stat_mean(q->qwait_stat);
avg_wt_service_time = __stat_mean(q->wt_stat);
DPRINTF("q:%d w:%u %f/%f(%f) %c", qid, q->num_workers, avg_wait_this_q, avg_wait_across_q, avg_wt_service_time, avg_wait_this_q <= avg_wait_across_q ? 't' : 'f');
if (avg_wait_this_q <= avg_wait_across_q) break;
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
if (q->depth <= avg_depth) break;
else {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
}
} while(n-- > 0);
}
if (n == 0) return 0; // All queues are full, trigger eagain
/* If the for loop finished then we didn't find a suitable queue for this
request, meaning we're backed up so trigger eagain. */
if (i == async_nif->num_queues) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
/* We hold the queue's lock, and we've selected a reasonable queue for this
new request; now check to make sure there are enough workers actively
processing requests on this queue. */
if (q->num_workers == 0 || avg_wait_this_q >= avg_wt_service_time) {
if (async_nif_start_worker(async_nif, q) == 0)
q->num_workers++;
/* We've selected a queue for this new request; now check to make sure there are
enough workers actively processing requests on this queue. */
if (q->num_workers < ASYNC_NIF_MIN_WORKERS) {
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
} else {
/* If more than 1/4 of the time it takes to complete a request is spent
waiting to be serviced, then we need more worker threads to service
requests. */
double m, n;
m = __stat_mean(async_nif->wait_stat);
n = __stat_mean(q->work_stat);
DPRINTF("w: %f\ts: %f\t%f", m, n, n/(m+n));
if (m && n && n / (m+n) < 0.75)
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
}
/* And finally add the request to the queue. */
__stat_tick(q->qwait_stat);
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
q->depth++;
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
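As a reading aid, the queue-selection loop above reduces to the following self-contained sketch, written against a plain array of queue depths instead of the locked queue structs (pick_queue() and its parameters are illustrative, not the real async_nif API):

/* Take the first queue, starting from the hinted slot, whose depth is no
 * worse than the average depth of the other busy queues; after one full
 * pass with no winner the caller reports eagain. */
static int
pick_queue(const unsigned int *depth, unsigned int num_queues, unsigned int start)
{
    unsigned int i, qid = start % num_queues;

    for (i = 0; i < num_queues; i++) {
        unsigned int j, busy = 0;
        double avg_depth = 0.0;

        for (j = 0; j < num_queues; j++) {
            if (j != qid && depth[j] != 0) {
                busy++;
                avg_depth += depth[j];
            }
        }
        if (busy)
            avg_depth /= busy;

        if (depth[qid] <= avg_depth)
            return (int)qid;
        qid = (qid + 1) % num_queues;
    }
    return -1;   /* every queue is backed up */
}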
@@ -385,7 +403,7 @@ async_nif_worker_fn(void *arg)
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
if (q->num_workers > 2) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
enif_mutex_unlock(q->reqs_mutex);
break;
} else {
@@ -397,24 +415,28 @@ async_nif_worker_fn(void *arg)
reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
q->depth--;
/* Ensure that there is at least one other worker thread watching this
queue. */
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
__stat_tock(q->qwait_stat);
/* Perform the work. */
__stat_tick(q->wt_stat);
uint64_t then = ts(q->work_stat->d.unit);
uint64_t wait = then - req->submitted;
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
__stat_tock(q->wt_stat);
uint64_t work = ts(q->work_stat->d.unit) - then;
__stat_add(async_nif->wait_stat, wait);
__stat_add(q->work_stat, work);
__stat_add(async_nif->service_stat, wait + work);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->submitted = 0;
req->fn_work = 0;
req->fn_post = 0;
enif_free(req->args);
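To make the new timing fields concrete: a request is stamped once at enqueue time (req->submitted), and the worker takes two more timestamps around fn_work(), yielding the wait, work, and service durations fed into the three stats above. A worked example with made-up nanosecond values:

#include <stdint.h>
#include <stdio.h>

int main(void)
{
    uint64_t submitted = 1000;           /* stamped in ASYNC_NIF_DECL at enqueue */
    uint64_t then      = 1400;           /* taken just before fn_work() runs     */
    uint64_t now       = 2400;           /* taken right after fn_work() returns  */

    uint64_t wait    = then - submitted; /*  400 ns queued   -> wait_stat        */
    uint64_t work    = now - then;       /* 1000 ns working  -> work_stat        */
    uint64_t service = wait + work;      /* 1400 ns total    -> service_stat     */

    printf("wait=%llu work=%llu service=%llu\n",
           (unsigned long long)wait,
           (unsigned long long)work,
           (unsigned long long)service);
    return 0;
}

With these numbers the wait fraction is 400/1400, roughly 29%, which is above the 25% threshold, so the enqueue path would try to start another worker for this queue.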
@@ -427,10 +449,6 @@ async_nif_worker_fn(void *arg)
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
q->num_workers--;
if (q->num_workers == 0) {
__stat_reset(q->qwait_stat);
__stat_reset(q->wt_stat);
}
enif_thread_exit(0);
return 0;
}
@@ -445,12 +463,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
struct async_nif_worker_entry *we = NULL;
UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first
lock all the worker queues, then set shutdown to true, then
unlock. The enqueue function will take the queue mutex, then
test for shutdown condition, then enqueue only if not shutting
down. */
/* Signal the worker threads to stop what they're doing and exit. To ensure
that we don't race with the enqueue() process we first lock all the worker
queues, then set shutdown to true, then unlock. The enqueue function will
take the queue mutex, then test for shutdown condition, then enqueue only
if not shutting down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
@@ -462,10 +479,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
__stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl q-wait");
enif_mutex_unlock(q->reqs_mutex);
}
__stat_print_histogram(async_nif->queues[i].qwait_stat, "wterl service time");
__stat_print_histogram(async_nif->service_stat, "wterl");
/* Join the now-exiting worker threads. */
while(async_nif->we_active > 0) {
@@ -570,14 +586,15 @@ async_nif_load()
async_nif->recycled_req_mutex = enif_mutex_create(NULL);
async_nif->we_mutex = enif_mutex_create(NULL);
SLIST_INIT(&async_nif->we_joining);
async_nif->wait_stat = __stat_init(100000);
async_nif->service_stat = __stat_init(100000);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
q->qwait_stat = __stat_init(100000);
q->wt_stat = __stat_init(100000);
q->work_stat = __stat_init(100000);
}
return async_nif;
}
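For reference, the shutdown handshake described in the async_nif_unload() comment above boils down to the pattern sketched here (a simplified sketch assuming the struct layout from this header; shutdown_queues() is an illustrative name, and the real function also drains queued requests, prints histograms, and joins the worker threads):

#include "erl_nif.h"   /* enif_mutex_lock/unlock, enif_cond_broadcast */

static void
shutdown_queues(struct async_nif_state *async_nif)
{
    unsigned int i;

    /* Lock every queue first so async_nif_enqueue_req() can never observe
     * shutdown == 0 and still insert a request afterwards. */
    for (i = 0; i < async_nif->num_queues; i++)
        enif_mutex_lock(async_nif->queues[i].reqs_mutex);

    async_nif->shutdown = 1;

    /* Wake any sleeping workers so they notice the flag, then unlock. */
    for (i = 0; i < async_nif->num_queues; i++) {
        enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
        enif_mutex_unlock(async_nif->queues[i].reqs_mutex);
    }
}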

View file

@@ -66,6 +66,7 @@ __stat_mean(struct stat *s)
if (!s)
return 0.0;
enif_mutex_lock(s->mutex);
t = s->h;
h = (s->h + 1) % s->num_samples;
mean = 0;
@@ -76,6 +77,7 @@ __stat_mean(struct stat *s)
}
if (mean > 0)
mean /= (s->n < s->num_samples ? (double)s->n : (double)s->num_samples);
enif_mutex_unlock(s->mutex);
return mean;
}
@@ -88,11 +90,12 @@ __stat_mean_log2(struct stat *s)
if (!s)
return 0.0;
enif_mutex_lock(s->mutex);
for (i = 0; i < 64; i++)
mean += (s->histogram[i] * i);
if (mean > 0)
mean /= (s->n < s->num_samples ? s->n : s->num_samples);
DPRINTF("n: %u %llu %f", s->n < 64 ? s->n : 64, PRIuint64(s), mean);
enif_mutex_unlock(s->mutex);
return mean;
}
@@ -104,8 +107,10 @@ __stat_tick(struct stat *s)
if (!s)
return 0.0;
enif_mutex_lock(s->mutex);
t = ts(s->d.unit);
s->d.then = t;
enif_mutex_unlock(s->mutex);
return t;
}
@@ -115,33 +120,30 @@ __stat_reset(struct stat *s)
if (!s)
return;
enif_mutex_lock(s->mutex);
s->h = 0;
s->d.unit = ns;
s->d.then = 0;
memset(s->histogram, 0, sizeof(uint64_t) * 64);
memset(s->samples, 0, sizeof(uint64_t) * s->num_samples);
enif_mutex_unlock(s->mutex);
}
uint64_t
__stat_tock(struct stat *s)
void
__stat_add(struct stat *s, uint64_t elapsed)
{
uint64_t now;
uint64_t elapsed;
uint32_t i;
duration_t *d;
if (!s)
return;
d = &s->d;
now = ts(d->unit);
elapsed = now - d->then;
i = s->h;
enif_mutex_lock(s->mutex);
if (s->n == s->num_samples) {
enif_mutex_unlock(s->mutex);   /* __stat_mean() now takes s->mutex itself */
double prior_mean = __stat_mean(s);
enif_mutex_lock(s->mutex);
s->mean = (s->mean + prior_mean) / 2.0;
if (s->n >= 4294967295)
if (s->n >= 4294967295) {
enif_mutex_unlock(s->mutex);
__stat_reset(s);
enif_mutex_lock(s->mutex);
}
}
i = s->h;
s->h = (s->h + 1) % s->num_samples;
s->samples[i] = elapsed;
if (elapsed != 0 && elapsed < s->min)
@@ -150,8 +152,14 @@ __stat_tock(struct stat *s)
s->max = elapsed;
s->histogram[LOG2(elapsed)]++;
s->n++;
d->then = ts(d->unit);
return elapsed;
enif_mutex_unlock(s->mutex);
}
void
__stat_tock(struct stat *s)
{
if (s)
__stat_add(s, ts(s->d.unit) - s->d.then);   /* elapsed since the matching __stat_tick() */
}
void
@@ -164,6 +172,7 @@ __stat_print_histogram(struct stat *s, const char *mod)
if (!s)
return;
m = (s->mean + __stat_mean(s)) / 2.0;   /* compute before locking: __stat_mean() takes s->mutex */
enif_mutex_lock(s->mutex);
fprintf(stderr, "%s:async_nif request latency histogram:\n", mod);
@@ -215,6 +224,7 @@ __stat_print_histogram(struct stat *s, const char *mod)
fprintf(stderr, "\n");
}
fflush(stderr);
enif_mutex_unlock(s->mutex);
}
void
@@ -223,7 +233,10 @@ __stat_free(struct stat *s)
if (!s)
return;
enif_mutex_lock(s->mutex);
enif_free(s->samples);
enif_mutex_unlock(s->mutex);
enif_mutex_destroy(s->mutex);
enif_free(s);
}
@@ -240,5 +253,6 @@ __stat_init(uint32_t n)
s->h = 0;
s->num_samples = n;
s->d.unit = ns;
s->mutex = enif_mutex_create(NULL);
return s;
}

View file

@@ -30,6 +30,7 @@ extern "C" {
#define STAT_DEF(name) struct stat *name ## _stat;
struct stat {
ErlNifMutex *mutex;
duration_t d;
uint32_t h, n, num_samples;
uint64_t min, max;
@@ -41,8 +42,9 @@ struct stat {
extern double __stat_mean(struct stat *s);
extern double __stat_mean_log2(struct stat *s);
extern uint64_t __stat_tick(struct stat *s);
extern void __stat_add(struct stat *s, uint64_t d);
extern void __stat_reset(struct stat *s);
extern uint64_t __stat_tock(struct stat *s);
extern void __stat_tock(struct stat *s);
extern void __stat_print_histogram(struct stat *s, const char *mod);
extern void __stat_free(struct stat *s);
extern struct stat *__stat_init(uint32_t n);
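Putting the revised API together, a typical caller now looks roughly like this (a minimal sketch; do_some_work() is a hypothetical stand-in and the snippet assumes this repository's stats.h is on the include path). __stat_tick()/__stat_tock() bracket a measured section, while __stat_add() records an elapsed time measured elsewhere, which is how async_nif.h now feeds the wait/work/service stats:

#include "stats.h"

static void do_some_work(void) { /* stand-in for servicing one request */ }

static void
stats_example(void)
{
    struct stat *lat = __stat_init(100000);   /* ring buffer of 100k samples */

    __stat_tick(lat);                         /* mark the start time          */
    do_some_work();
    __stat_tock(lat);                         /* record elapsed since tick    */

    __stat_print_histogram(lat, "example");   /* dump the histogram to stderr */
    __stat_free(lat);
}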

View file

@@ -197,10 +197,12 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
uint64_t now, elapsed;
struct wterl_ctx *c, *n;
#ifndef DEBUG
if (conn_handle->cache_size < MAX_CACHE_SIZE)
return 0;
#endif
now = cpu_clock_ticks();
now = ts(ns);
// Find the mean of the recorded times that items stayed in cache.
mean = 0;
@@ -226,6 +228,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
if (log > mean) {
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
DPRINTF("evicting: %llu", PRIuint64(c->sig));
if (c->session)
c->session->close(c->session, NULL);
enif_free(c);
num_evicted++;
@@ -256,22 +259,15 @@ __ctx_cache_find(WterlConnHandle *conn_handle, const uint64_t sig)
if (c->sig == sig) { // TODO: hash collisions *will* lead to SEGVs
// cache hit:
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->histogram[__log2(cpu_clock_ticks() - c->tstamp)]++;
conn_handle->histogram[__log2(ts(ns) - c->tstamp)]++;
conn_handle->histogram_count++;
conn_handle->cache_size -= 1;
break;
}
c = STAILQ_NEXT(c, entries);
}
#ifdef DEBUG
uint32_t sz = 0;
struct wterl_ctx *f;
STAILQ_FOREACH(f, &conn_handle->cache, entries) {
sz++;
}
#endif
enif_mutex_unlock(conn_handle->cache_mutex);
DPRINTF("cache_find: [%u:%u] %s (%p)", sz, conn_handle->cache_size, c ? "hit" : "miss", c);
DPRINTF("cache_find: [%u] %s (%p)", conn_handle->cache_size, c ? "hit" : "miss", c);
return c;
}
@@ -286,7 +282,7 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c)
{
enif_mutex_lock(conn_handle->cache_mutex);
__ctx_cache_evict(conn_handle);
c->tstamp = cpu_clock_ticks();
c->tstamp = ts(ns);
STAILQ_INSERT_TAIL(&conn_handle->cache, c, entries);
conn_handle->cache_size += 1;
#ifdef DEBUG
@@ -2309,7 +2305,7 @@ static void __wterl_conn_dtor(ErlNifEnv* env, void* obj)
WterlConnHandle *conn_handle = (WterlConnHandle *)obj;
if (conn_handle->cache_mutex) {
DPRINTF("Non-NULL conn_handle (%p) to free", obj);
DPRINTF("conn_handle dtor free'ing (%p)", obj);
enif_mutex_lock(conn_handle->cache_mutex);
__close_all_sessions(conn_handle);
conn_handle->conn->close(conn_handle->conn, NULL);
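The wterl.c hunks above switch the context-cache timestamps from cpu_clock_ticks() to ts(ns) and bucket cache-residency times by their base-2 logarithm. A tiny standalone illustration of that bucketing (log2_u64() stands in for wterl's __log2(), and the timestamps are made up):

#include <stdint.h>
#include <stdio.h>

/* Hypothetical stand-in for __log2(): index of the highest set bit. */
static unsigned int
log2_u64(uint64_t v)
{
    unsigned int i = 0;
    while (v >>= 1)
        i++;
    return i;
}

int main(void)
{
    uint64_t added_at = 1000000;            /* ts(ns) when the ctx was cached */
    uint64_t hit_at   = 5000000;            /* ts(ns) when it was reused      */
    uint64_t resident = hit_at - added_at;  /* 4,000,000 ns in the cache      */

    /* 2^21 = 2,097,152 <= 4,000,000 < 2^22, so this lands in bucket 21. */
    printf("histogram bucket: %u\n", log2_u64(resident));
    return 0;
}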

View file

@@ -26,9 +26,6 @@ async_nif_enqueue(R, F, A) ->
case erlang:apply(F, [R|A]) of
{ok, enqueued} ->
receive
{R, {error, eagain}} ->
%% Work unit was not queued, try again.
async_nif_enqueue(R, F, A);
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
@@ -38,6 +35,11 @@ async_nif_enqueue(R, F, A) ->
{R, Reply} ->
Reply
end;
{error, eagain} ->
%% Work unit was not queued, try again.
async_nif_enqueue(R, F, A);
%{error, enomem} ->
%{error, shutdown} ->
Other ->
Other
end.

View file

@@ -524,7 +524,7 @@ set_event_handler_pid(Pid)
-define(TEST_DATA_DIR, "test/wterl.basic").
open_test_conn(DataDir) ->
open_test_conn(DataDir, [{create,true},{cache_size,"100MB"},{session_max, 8192}]).
open_test_conn(DataDir, [{create,true},{cache_size,"1GB"},{session_max, 8192}]).
open_test_conn(DataDir, OpenConfig) ->
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
@@ -606,7 +606,7 @@ insert_delete_test() ->
many_open_tables_test_() ->
{timeout, 120,
fun() ->
ConnOpts = [{create,true},{cache_size,"100MB"},{session_max, 8192}],
ConnOpts = [{create,true},{cache_size,"1GB"},{session_max, 8192}],
DataDir = ?TEST_DATA_DIR,
KeyGen =
fun(X) ->
@@ -620,19 +620,31 @@ many_open_tables_test_() ->
fun(X) ->
"lsm:" ++ integer_to_list(X)
end,
N = 1000,
NumTables = 16, N = 100,
ConnRef = open_test_conn(DataDir, ConnOpts),
Parent = self(),
[wterl:create(ConnRef, TableNameGen(X), [{checksum, "uncompressed"}]) || X <- lists:seq(0, 128)],
[ok = wterl:create(ConnRef, TableNameGen(X), [{checksum, "uncompressed"}]) || X <- lists:seq(0, NumTables)],
[spawn(fun() ->
TableName = TableNameGen(X),
[wterl:put(ConnRef, TableName, KeyGen(P), ValGen()) || P <- lists:seq(1, N)],
[wterl:get(ConnRef, TableName, KeyGen(P)) || P <- lists:seq(1, N)],
[wterl:delete(ConnRef, TableName, KeyGen(P)) || P <- lists:seq(1, N)],
[case wterl:put(ConnRef, TableName, KeyGen(P), ValGen()) of
ok -> ok;
{error, {enoent, _}} -> io:format("put failed, table missing ~p~n", [TableName])
end || P <- lists:seq(1, N)],
[case wterl:get(ConnRef, TableName, KeyGen(P)) of
{ok, _} -> ok;
{error, {enoent, _}} -> io:format("get failed, table missing ~p~n", [TableName])
end || P <- lists:seq(1, N)],
[case wterl:delete(ConnRef, TableName, KeyGen(P)) of
ok -> ok;
{error, {enoent, _}} -> io:format("delete failed, table missing ~p~n", [TableName])
end || P <- lists:seq(1, N)],
Parent ! done
end) || X <- lists:seq(0, 128)],
[wterl:drop(ConnRef, TableNameGen(X)) || X <- lists:seq(0, 128)],
[receive done -> ok end || _ <- lists:seq(0, 128)],
end) || X <- lists:seq(0, NumTables)],
[receive done -> ok end || _ <- lists:seq(0, NumTables)],
[case wterl:drop(ConnRef, TableNameGen(X)) of
ok -> ok;
{error, {enoent, _}} -> io:format("drop failed, table missing ~p~n", [TableNameGen(X)])
end || X <- lists:seq(0, NumTables)],
ok = wterl:connection_close(ConnRef)
end}.