Worker threads come and go as needed with a lower bound of 2 and an

upper bound of ASYNC_NIF_MAX_WORKERS.  Stats were improved to use
thread local storage for measures.  With stats working again wterl
uses them to determine who to evict.  Wterl's signature calculation
for an operation wasn't correct and so the cache wasn't efficient at
all, this has been fixed.
This commit is contained in:
Gregory Burd 2013-06-25 13:31:43 -04:00
parent a3c54b1610
commit c41e411a92
7 changed files with 453 additions and 361 deletions

View file

@ -26,6 +26,7 @@ extern "C" {
#include <assert.h>
#include "fifo_q.h"
#include "queue.h"
#include "stats.h"
#ifndef UNUSED
@ -33,11 +34,9 @@ extern "C" {
#endif
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_WORKER_QUEUE_SIZE 2000
#define ASYNC_NIF_WORKER_QUEUE_SIZE 1000
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
STAT_DECL(qwait, 1000);
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
@ -45,12 +44,12 @@ struct async_nif_req_entry {
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
const char *func;
};
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue {
STAT_DEF(qwait);
unsigned int workers;
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
FIFO_QUEUE_TYPE(reqs) reqs;
@ -61,13 +60,15 @@ struct async_nif_worker_entry {
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
SLIST_ENTRY(async_nif_worker_entry) entries;
};
struct async_nif_state {
STAT_DEF(qwait);
unsigned int shutdown;
unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
ErlNifMutex *we_mutex;
unsigned int we_active;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_queues;
unsigned int next_q;
FIFO_QUEUE_TYPE(reqs) recycled_reqs;
@ -107,7 +108,6 @@ struct async_nif_state {
enif_make_atom(env, "shutdown")); \
req = async_nif_reuse_req(async_nif); \
if (!req) { \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
} \
@ -128,7 +128,6 @@ struct async_nif_state {
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
req->func = __func__; \
int h = -1; \
if (affinity) \
h = affinity % async_nif->num_queues; \
@ -195,12 +194,12 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (!env) {
enif_free(req);
req = NULL;
} else {
if (env) {
req->env = env;
async_nif->num_reqs++;
} else {
enif_free(req);
req = NULL;
}
}
}
@ -208,7 +207,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
req = fifo_q_get(reqs, async_nif->recycled_reqs);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
STAT_TICK(async_nif, qwait);
__stat_tick(async_nif->qwait_stat);
return req;
}
@ -223,16 +222,61 @@ void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
ErlNifEnv *env = NULL;
STAT_TOCK(async_nif, qwait);
__stat_tock(async_nif->qwait_stat);
enif_mutex_lock(async_nif->recycled_req_mutex);
enif_clear_env(req->env);
env = req->env;
enif_clear_env(env);
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
fifo_q_put(reqs, async_nif->recycled_reqs, req);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
static void *async_nif_worker_fn(void *);
/**
* Start up a worker thread.
*/
static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{
struct async_nif_worker_entry *we;
if (0 == q)
return EINVAL;
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE_HEAD(&async_nif->we_joining, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
enif_free(we);
async_nif->we_active--;
we = n;
}
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
return EAGAIN;
}
we = enif_alloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
}
memset(we, 0, sizeof(struct async_nif_worker_entry));
we->worker_id = async_nif->we_active++;
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
}
/**
* Enqueue a request for processing by a worker thread.
*
@ -244,7 +288,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
{
/* Identify the most appropriate worker for this request. */
unsigned int qid = 0;
unsigned int n = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
double await = 0;
double await_inthisq = 0;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
@ -257,12 +304,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
async_nif->next_q = qid;
}
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
#if 0 // stats aren't yet thread safe, so this can go haywire... TODO: fix.
unsigned int n = async_nif->num_queues;
/* Now we inspect and interate across the set of queues trying to select one
that isn't too full or too slow. */
do {
@ -277,8 +318,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
return 0;
}
if (!fifo_q_full(reqs, q->reqs)) {
double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait);
double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait);
await = __stat_mean_log2(async_nif->qwait_stat);
await_inthisq = __stat_mean_log2(q->qwait_stat);
if (await_inthisq > await) {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
@ -288,13 +329,18 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
break;
}
}
// TODO: at some point add in work sheading/stealing
} while(n-- > 0);
#endif
/* We hold the queue's lock, and we've seletect a reasonable queue for this
new request so add the request. */
STAT_TICK(q, qwait);
new request now check to make sure there are enough workers actively
processing requests on this queue. */
if (q->workers < 2 || await_inthisq > await) {
if (async_nif_start_worker(async_nif, q) == 0)
q->workers++;
}
/* And finally add the request to the queue. */
__stat_tick(q->qwait_stat);
fifo_q_put(reqs, q->reqs, req);
/* Build the term before releasing the lock so as not to race on the use of
@ -331,9 +377,14 @@ async_nif_worker_fn(void *arg)
}
if (fifo_q_empty(reqs, q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
STAT_RESET(q, qwait);
__stat_reset(q->qwait_stat);
if (q->workers > 2) {
enif_mutex_unlock(q->reqs_mutex);
break;
} else {
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
assert(fifo_q_size(reqs, q->reqs) > 0);
assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
@ -348,7 +399,7 @@ async_nif_worker_fn(void *arg)
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
STAT_TOCK(q, qwait);
__stat_tock(q->qwait_stat);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
@ -363,6 +414,10 @@ async_nif_worker_fn(void *arg)
req = NULL;
}
}
enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
q->workers--;
enif_thread_exit(0);
return 0;
}
@ -374,9 +429,10 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
struct async_nif_worker_entry *we = NULL;
UNUSED(env);
STAT_PRINT(async_nif, qwait, "wterl");
__stat_print_histogram(async_nif->qwait_stat, "wterl");
/* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first
@ -393,19 +449,29 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
executing requests. */
async_nif->shutdown = 1;
/* Make sure to wake up all worker threads sitting on conditional
wait for work so that they can see it's time to exit. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_cond_broadcast(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
}
/* Join for the now exiting worker threads. */
for (i = 0; i < async_nif->num_workers; ++i) {
while(async_nif->we_active > 0) {
for (i = 0; i < num_queues; i++)
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE_HEAD(&async_nif->we_joining, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
enif_thread_join(we->tid, &exit_value);
enif_free(we);
async_nif->we_active--;
we = n;
}
}
enif_mutex_destroy(async_nif->we_mutex);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
@ -447,7 +513,7 @@ static void *
async_nif_load()
{
static int has_init = 0;
unsigned int i, j, num_queues;
unsigned int i, num_queues;
ErlNifSysInfo info;
struct async_nif_state *async_nif;
@ -480,54 +546,21 @@ async_nif_load()
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS;
async_nif->we_active = 0;
async_nif->next_q = 0;
async_nif->shutdown = 0;
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
async_nif->recycled_req_mutex = enif_mutex_create(NULL);
STAT_INIT(async_nif, qwait);
async_nif->qwait_stat = __stat_init(1000);
async_nif->we_mutex = enif_mutex_create(NULL);
SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
STAT_INIT(q, qwait);
}
/* Setup the thread pool management. */
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */
for (i = 0; i < async_nif->num_workers; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif;
we->worker_id = i;
we->q = &async_nif->queues[i % async_nif->num_queues];
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)we, NULL) != 0) {
async_nif->shutdown = 1;
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd);
}
while(i-- > 0) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_free(async_nif);
return NULL;
}
q->qwait_stat = __stat_init(1000);
}
return async_nif;
}

View file

@ -36,13 +36,12 @@ get_wt ()
(cd $BASEDIR/$WT_DIR && git pull -u) || exit 1
else
if [ "X$WT_REF" != "X" ]; then
git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout refs/$WT_REF || exit 1)
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout refs/$WT_REF || exit 1)
else
git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
fi
mv wiredtiger $WT_DIR || exit 1
fi
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$WT_DIR

View file

@ -69,8 +69,8 @@ do { \
__val = __newval; \
} while ( 0 )
#define ALIGNED_ENIF_ALLOC(_s) \
((void *)(((unsigned long)enif_alloc((_s)+CACHE_LINE_SIZE*2) + \
#define CACHE_ALIGNED_SIZEOF(_s) \
((sizeof(_s)) + CACHE_LINE_SIZE*2) + \
CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))) \
/*

View file

@ -19,7 +19,7 @@
#endif
void current_utc_time(struct timespec *ts)
static inline void current_utc_time(struct timespec *ts)
{
#ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time
clock_serv_t cclock;

260
c_src/stats.c Normal file
View file

@ -0,0 +1,260 @@
/*
* stats:
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <inttypes.h>
#include "erl_nif.h"
#include "erl_driver.h"
#include "common.h"
#include "duration.h"
#include "stats.h"
/**
* Calculate the log2 of 64bit unsigned integers.
*/
#ifdef __GCC__
#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X))))
#else
static unsigned int __log2_64(uint64_t x) {
static const int tab64[64] = {
63, 0, 58, 1, 59, 47, 53, 2,
60, 39, 48, 27, 54, 33, 42, 3,
61, 51, 37, 40, 49, 18, 28, 20,
55, 30, 34, 11, 43, 14, 22, 4,
62, 57, 46, 52, 38, 26, 32, 41,
50, 36, 17, 19, 29, 10, 13, 21,
56, 45, 25, 31, 35, 16, 9, 12,
44, 24, 15, 8, 23, 7, 6, 5};
if (x == 0) return 0;
uint64_t v = x;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58];
}
#define LOG2(X) __log2_64(X)
#endif
double
__stat_mean(struct stat *s)
{
uint32_t t, h;
double mean;
if (!s)
return 0.0;
t = s->h;
h = (s->h + 1) % s->num_samples;
mean = 0;
while (h != t) {
mean += s->samples[h];
h = (h + 1) % s->num_samples;
}
if (mean > 0)
mean /= (double)(s->n < s->num_samples ? s->n : s->num_samples);
return mean;
}
double
__stat_mean_log2(struct stat *s)
{
uint32_t i;
double mean;
if (!s)
return 0.0;
mean = 0;
for (i = 0; i < 64; i++)
mean += (s->histogram[i] * i);
if (mean > 0)
mean /= (double)s->n;
return mean;
}
uint64_t
__stat_tick(struct stat *s)
{
duration_t *d;
uint64_t t;
if (!s)
return 0.0;
d = (duration_t*)erl_drv_tsd_get(s->duration_key);
if (!d) {
if ((d = enif_alloc(sizeof(duration_t))) == NULL)
return 0;
memset(d, 0, sizeof(duration_t));
erl_drv_tsd_set(s->duration_key, d);
}
t = ts(d->unit);
d->then = t;
return t;
}
void
__stat_reset(struct stat *s)
{
duration_t *d;
if (!s)
return;
s->min = ~0;
s->max = 0;
s->h = 0;
memset(s->histogram, 0, sizeof(uint64_t) * 64);
memset(s->samples, 0, sizeof(uint64_t) * s->num_samples);
d = (duration_t*)erl_drv_tsd_get(s->duration_key);
if (d)
d->then = 0;
}
uint64_t
__stat_tock(struct stat *s)
{
uint64_t now;
uint64_t elapsed;
uint32_t i;
duration_t *d;
if (!s)
return 0.0;
d = (duration_t*)erl_drv_tsd_get(s->duration_key);
if (!d)
return 0;
now = ts(d->unit);
elapsed = now - d->then;
i = s->h;
if (s->n == s->num_samples) {
s->mean = (s->mean + __stat_mean(s)) / 2.0;
if (s->n >= 4294967295)
__stat_reset(s);
}
s->h = (s->h + 1) % s->num_samples;
s->samples[i] = elapsed;
if (elapsed < s->min)
s->min = elapsed;
if (elapsed > s->max)
s->max = elapsed;
s->histogram[LOG2(elapsed)]++;
s->n++;
d->then = ts(d->unit);
return elapsed;
}
void
__stat_print_histogram(struct stat *s, const char *mod)
{
uint8_t logs[64];
uint8_t i, j, max_log = 0;
double m;
if (!s)
return;
m = (s->mean + __stat_mean(s) / 2.0);
fprintf(stderr, "%s:async_nif request latency histogram:\n", mod);
for (i = 0; i < 64; i++) {
logs[i] = LOG2(s->histogram[i]);
if (logs[i] > max_log)
max_log = logs[i];
}
for (i = max_log; i > 0; i--) {
if (!(i % 10))
fprintf(stderr, "2^%2d ", i);
else
fprintf(stderr, " ");
for(j = 0; j < 64; j++)
fprintf(stderr, logs[j] >= i ? "" : " ");
fprintf(stderr, "\n");
}
if (max_log == 0) {
fprintf(stderr, "[empty]\n");
} else {
fprintf(stderr, " ns μs ms s ks\n");
fprintf(stderr, "min: ");
if (s->min < 1000)
fprintf(stderr, "%llu (ns)", PRIuint64(s->min));
else if (s->min < 1000000)
fprintf(stderr, "%.2f (μs)", s->min / 1000.0);
else if (s->min < 1000000000)
fprintf(stderr, "%.2f (ms)", s->min / 1000000.0);
else if (s->min < 1000000000000)
fprintf(stderr, "%.2f (s)", s->min / 1000000000.0);
fprintf(stderr, " max: ");
if (s->max < 1000)
fprintf(stderr, "%llu (ns)", PRIuint64(s->max));
else if (s->max < 1000000)
fprintf(stderr, "%.2f (μs)", s->max / 1000.0);
else if (s->max < 1000000000)
fprintf(stderr, "%.2f (ms)", s->max / 1000000.0);
else if (s->max < 1000000000000)
fprintf(stderr, "%.2f (s)", s->max / 1000000000.0);
fprintf(stderr, " mean: ");
if (m < 1000)
fprintf(stderr, "%.2f (ns)", m);
else if (m < 1000000)
fprintf(stderr, "%.2f (μs)", m / 1000.0);
else if (m < 1000000000)
fprintf(stderr, "%.2f (ms)", m / 1000000.0);
else if (m < 1000000000000)
fprintf(stderr, "%.2f (s)", m / 1000000000.0);
fprintf(stderr, "\n");
}
fflush(stderr);
}
void
__stat_free(struct stat *s)
{
if (!s)
return;
enif_free(s->samples);
enif_free(s);
}
struct stat *
__stat_init(uint32_t n)
{
struct stat *s = enif_alloc(sizeof(struct stat) + (sizeof(uint64_t) * n));
if (!s)
return NULL;
memset(s, 0, sizeof(struct stat) + (sizeof(uint64_t) * n));
s->min = ~0;
s->max = 0;
s->mean = 0.0;
s->h = 0;
s->num_samples = n;
erl_drv_tsd_key_create(NULL, &(s->duration_key));
return s;
}

View file

@ -27,185 +27,25 @@
extern "C" {
#endif
#include "duration.h"
#define STAT_DEF(name) struct stat *name ## _stat;
/**
* Calculate the log2 of 64bit unsigned integers.
*/
#ifdef __GCC__
#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X))))
#else
static unsigned int __log2_64(uint64_t x) {
static const int tab64[64] = {
63, 0, 58, 1, 59, 47, 53, 2,
60, 39, 48, 27, 54, 33, 42, 3,
61, 51, 37, 40, 49, 18, 28, 20,
55, 30, 34, 11, 43, 14, 22, 4,
62, 57, 46, 52, 38, 26, 32, 41,
50, 36, 17, 19, 29, 10, 13, 21,
56, 45, 25, 31, 35, 16, 9, 12,
44, 24, 15, 8, 23, 7, 6, 5};
if (x == 0) return 0;
uint64_t v = x;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58];
}
#define LOG2(X) __log2_64(X)
#endif
#define STAT_DEF(name) struct name ## _stat name ## _stat;
#define STAT_DECL(name, nsamples) \
struct name ## _stat { \
duration_t d; \
uint64_t histogram[64]; \
uint32_t h, n; \
uint64_t samples[nsamples]; \
uint64_t min, max; \
double mean; \
}; \
static inline double name ## _stat_mean(struct name ## _stat *s) { \
uint32_t t = s->h; \
uint32_t h = (s->h + 1) % nsamples; \
double mean = 0; \
while (h != t) { \
mean += s->samples[h]; \
h = (h + 1) % nsamples; \
} \
if (mean > 0) \
mean /= (double)(s->n < nsamples ? s->n : nsamples); \
return mean; \
} \
static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \
uint32_t i; \
double mean = 0; \
for (i = 0; i < 64; i++) \
mean += (s->histogram[i] * i); \
if (mean > 0) \
mean /= (double)s->n; \
return mean; \
} \
static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \
{ \
uint64_t t = ts(s->d.unit); \
s->d.then = t; \
return t; \
} \
static inline void name ## _stat_reset(struct name ## _stat *s) \
{ \
s->min = ~0; \
s->max = 0; \
s->h = 0; \
memset(&s->histogram, 0, sizeof(uint64_t) * 64); \
memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \
} \
static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \
{ \
uint64_t now = ts(s->d.unit); \
uint64_t elapsed = now - s->d.then; \
uint32_t i = s->h; \
if (s->n == nsamples) { \
s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \
if (s->n >= 4294967295) \
name ## _stat_reset(s); \
} \
s->h = (s->h + 1) % nsamples; \
s->samples[i] = elapsed; \
if (elapsed < s->min) \
s->min = elapsed; \
if (elapsed > s->max) \
s->max = elapsed; \
s->histogram[LOG2(elapsed)]++; \
s->n++; \
s->d.then = ts(s->d.unit); \
return elapsed; \
} \
static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \
{ \
uint8_t logs[64]; \
uint8_t i, j, max_log = 0; \
double m = (s->mean + name ## _stat_mean(s) / 2.0); \
\
fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); \
for (i = 0; i < 64; i++) { \
logs[i] = LOG2(s->histogram[i]); \
if (logs[i] > max_log) \
max_log = logs[i]; \
} \
for (i = max_log; i > 0; i--) { \
if (!(i % 10)) \
fprintf(stderr, "2^%2d ", i); \
else \
fprintf(stderr, " "); \
for(j = 0; j < 64; j++) \
fprintf(stderr, logs[j] >= i ? "" : " "); \
fprintf(stderr, "\n"); \
} \
if (max_log == 0) { \
fprintf(stderr, "[empty]\n"); \
} else { \
fprintf(stderr, " ns μs ms s ks\n"); \
fprintf(stderr, "min: "); \
if (s->min < 1000) \
fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); \
else if (s->min < 1000000) \
fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \
else if (s->min < 1000000000) \
fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \
else if (s->min < 1000000000000) \
fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \
fprintf(stderr, " max: "); \
if (s->max < 1000) \
fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); \
else if (s->max < 1000000) \
fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \
else if (s->max < 1000000000) \
fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \
else if (s->max < 1000000000000) \
fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \
fprintf(stderr, " mean: "); \
if (m < 1000) \
fprintf(stderr, "%.2f (ns)", m); \
else if (m < 1000000) \
fprintf(stderr, "%.2f (μs)", m / 1000.0); \
else if (m < 1000000000) \
fprintf(stderr, "%.2f (ms)", m / 1000000.0); \
else if (m < 1000000000000) \
fprintf(stderr, "%.2f (s)", m / 1000000000.0); \
fprintf(stderr, "\n"); \
} \
fflush(stderr); \
}
#define STAT_INIT(var, name) \
(var)->name ## _stat.min = ~0; \
(var)->name ## _stat.max = 0; \
(var)->name ## _stat.mean = 0.0; \
(var)->name ## _stat.h = 0; \
(var)->name ## _stat.d.then = 0; \
(var)->name ## _stat.d.unit = ns;
#define STAT_TICK(var, name) name ## _stat_tick(&(var)->name ## _stat)
#define STAT_TOCK(var, name) name ## _stat_tock(&(var)->name ## _stat)
#define STAT_RESET(var, name) name ## _stat_reset(&(var)->name ## _stat)
#define STAT_MEAN_LOG2_SAMPLE(var, name) \
name ## _stat_mean_lg2(&(var)->name ## _stat)
#define STAT_MEAN_SAMPLE(var, name) \
name ## _stat_mean(&(var)->name ## _stat)
#define STAT_PRINT(var, name, mod) \
name ## _stat_print_histogram(&(var)->name ## _stat, mod)
struct stat {
ErlDrvTSDKey duration_key;
uint32_t h, n, num_samples;
uint64_t min, max;
double mean;
uint64_t histogram[64];
uint64_t samples[];
};
extern double __stat_mean(struct stat *s);
extern double __stat_mean_log2(struct stat *s);
extern uint64_t __stat_tick(struct stat *s);
extern void __stat_reset(struct stat *s);
extern uint64_t __stat_tock(struct stat *s);
extern void __stat_print_histogram(struct stat *s, const char *mod);
extern void __stat_free(struct stat *s);
extern struct stat *__stat_init(uint32_t n);
#if defined(__cplusplus)
}

View file

@ -26,8 +26,10 @@
#include <inttypes.h>
#include <errno.h>
#include "common.h"
#include "wiredtiger.h"
#include "common.h"
#include "duration.h"
#include "stats.h"
#include "async_nif.h"
#include "queue.h"
@ -191,16 +193,14 @@ static inline uint32_t __log2(uint64_t x) {
static int
__ctx_cache_evict(WterlConnHandle *conn_handle)
{
uint32_t num_evicted = 0;
struct wterl_ctx *c;
if (conn_handle->cache_size < MAX_CACHE_SIZE)
return 0;
#if 0 // TODO: fixme once stats work again
static uint16_t ncalls = 0;
uint32_t mean, log, num_evicted, i;
uint64_t now, elapsed;
struct wterl_ctx *c, *n;
if (conn_handle->cache_size < MAX_CACHE_SIZE && ++ncalls != 0)
return 0;
now = cpu_clock_ticks();
// Find the mean of the recorded times that items stayed in cache.
@ -233,16 +233,6 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
}
c = n;
}
#else
c = STAILQ_FIRST(&conn_handle->cache);
if (c) {
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
DPRINTF("evicting: %llu", PRIuint64(c->sig));
c->session->close(c->session, NULL);
enif_free(c);
num_evicted++;
}
#endif
conn_handle->cache_size -= num_evicted;
return num_evicted;
}
@ -295,57 +285,6 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c)
enif_mutex_unlock(conn_handle->cache_mutex);
}
/**
* Create a signature for the operation we're about to perform.
*
* Create a 64-bit hash signature for this a combination of session
* configuration some number of cursors open on tables each potentially with a
* different configuration. "session_config, [{table_name, cursor_config},
* ...]"
*
* session_config the string used to configure the WT_SESSION
* ... each pair of items in the varargs array is a table name,
* cursor config pair
* -> number of variable arguments processed
*/
static uint64_t
__ctx_cache_sig(const char *c, va_list ap, int count, size_t *len)
{
int i = 0;
uint32_t hash = 0;
uint32_t crc = 0;
uint64_t sig = 0;
const char *arg;
size_t l = 0;
*len = 0;
if (c) {
l = __strlen(c);
hash = __str_hash(hash, c, l);
crc = __crc32(crc, c, l);
*len += l + 1;
} else {
*len += 1;
}
for (i = 0; i < (2 * count); i++) {
arg = va_arg(ap, const char *);
if (arg) {
l = __strlen(arg);
hash = __str_hash(hash, arg, l);
crc = __crc32(crc, arg, __strlen(arg));
*len += l + 1;
} else {
*len += 1;
}
}
sig = (uint64_t)crc << 32 | hash;
//DPRINTF("sig %llu [%u:%u]", PRIuint64(sig), crc, hash);
return sig;
}
static inline char *
__copy_str_into(char **p, const char *s)
{
@ -366,20 +305,44 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
struct wterl_ctx **ctx,
int count, const char *session_config, ...)
{
int i = 3;
size_t sig_len = 0;
int i = 0;
uint32_t hash = 0;
uint32_t crc = 0;
uint64_t sig = 0;
size_t l, sig_len = 0;
va_list ap;
uint64_t sig;
const char *arg;
struct wterl_ctx *c;
arg = session_config;
va_start(ap, session_config);
sig = __ctx_cache_sig(session_config, ap, count, &sig_len);
if (session_config) {
l = __strlen(session_config);
hash = __str_hash(hash, session_config, l);
crc = __crc32(crc, session_config, l);
sig_len += l + 1;
DPRINTF("sig/1: %s", session_config);
} else {
sig_len += 1;
}
for (i = 0; i < (2 * count); i++) {
arg = va_arg(ap, const char *);
if (arg) {
l = __strlen(arg);
DPRINTF("sig/args: %s", arg);
hash = __str_hash(hash, arg, l);
crc = __crc32(crc, arg, l);
sig_len += l + 1;
} else {
sig_len += 1;
}
}
sig = (uint64_t)crc << 32 | hash;
DPRINTF("sig %llu [%u:%u]", PRIuint64(sig), crc, hash);
va_end(ap);
*ctx = NULL;
do {
c = conn_handle->mru_ctx[worker_id];
if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) {
if (c == 0) {
@ -391,7 +354,6 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
// mru hit:
DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig));
*ctx = c;
break;
} else {
// mru mismatch:
DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig));
@ -400,8 +362,6 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
}
}
}
// CAS failed, retry up to 3 times
} while(i--);
if (*ctx == NULL) {
// check the cache
@ -474,9 +434,9 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx
} else {
if (c != NULL) {
__ctx_cache_add(conn_handle, c);
DPRINTF("[%.4u] reset %d cursors, returnd ctx to cache", worker_id, ctx->num_cursors);
DPRINTF("[%.4u] reset %d cursors, returned ctx to cache", worker_id, ctx->num_cursors);
} else {
DPRINTF("[%.4u] reset %d cursors, returnd ctx to mru", worker_id, ctx->num_cursors);
DPRINTF("[%.4u] reset %d cursors, returned ctx to mru", worker_id, ctx->num_cursors);
}
}
}