Add a bit of statistics tracking for two reasons, a) to help inform

where a request should be enqueded and b) to track request latency.
This commit is contained in:
Gregory Burd 2013-05-01 22:02:37 -04:00
parent ae64a5e26f
commit 786142ce73
3 changed files with 348 additions and 9 deletions

View file

@ -28,9 +28,7 @@ extern "C" {
#include <assert.h>
#include "fifo_q.h"
#ifdef ASYNC_NIF_STATS
#include "stats.h" // TODO: measure, measure... measure again
#endif
#include "stats.h"
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
@ -53,6 +51,7 @@ struct async_nif_req_entry {
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue {
STAT_DEF(qwait);
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
FIFO_QUEUE_TYPE(reqs) reqs;
@ -66,6 +65,7 @@ struct async_nif_worker_entry {
};
struct async_nif_state {
STAT_DEF(qwait);
unsigned int shutdown;
unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
@ -198,22 +198,34 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
req = fifo_q_get(reqs, async_nif->recycled_reqs);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
STAT_TICK(async_nif, qwait);
return req;
}
/**
* Store the request for future re-use.
*
* req a request entry with an ErlNifEnv* which will be cleared
* before reuse, but not until then.
* async_nif a handle to our state so that we can find and use the mutex
*/
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
STAT_TOCK(async_nif, qwait);
enif_mutex_lock(async_nif->recycled_req_mutex);
fifo_q_put(reqs, async_nif->recycled_reqs, req);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
/**
* TODO:
* A string hash function.
*
* A basic hash function for strings of characters used during the
* affinity association.
*
* s a NULL terminated set of bytes to be hashed
* -> an integer hash encoding of the bytes
*/
static inline unsigned int
async_nif_str_hash_func(const char *s)
@ -224,7 +236,10 @@ async_nif_str_hash_func(const char *s)
}
/**
* TODO:
* Enqueue a request for processing by a worker thread.
*
* Places the request into a work queue determined either by the
* provided affinity or by iterating through the available queues.
*/
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
@ -232,6 +247,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Identify the most appropriate worker for this request. */
unsigned int qid = 0;
struct async_nif_work_queue *q = NULL;
unsigned int n = async_nif->num_queues;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
@ -257,8 +273,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
if (fifo_q_full(reqs, q->reqs)) { // TODO: || (q->avg_latency > median_latency)
double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait);
double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait);
if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) {
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
@ -266,10 +283,11 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
break;
}
// TODO: at some point add in work sheading/stealing
} while(1);
} while(n-- > 0);
/* We hold the queue's lock, and we've seletect a reasonable queue for this
new request so add the request. */
STAT_TICK(q, qwait);
fifo_q_put(reqs, q->reqs, req);
/* Build the term before releasing the lock so as not to race on the use of
@ -282,6 +300,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
return reply;
}
/**
* TODO:
*/
static void *
async_nif_worker_fn(void *arg)
{
@ -300,7 +321,8 @@ async_nif_worker_fn(void *arg)
break;
}
if (fifo_q_empty(reqs, q->reqs)) {
/* Queue is empty, wait for work */
/* Queue is empty so we wait for more work to arrive. */
STAT_RESET(q, qwait);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
} else {
@ -317,6 +339,7 @@ async_nif_worker_fn(void *arg)
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
STAT_TOCK(q, qwait);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
@ -345,6 +368,8 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
struct async_nif_req_entry *req = NULL;
__UNUSED(env);
STAT_PRINT(async_nif, qwait, "wterl");
/* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first
lock all the worker queues, then set shutdown to true, then
@ -446,12 +471,14 @@ async_nif_load()
async_nif->shutdown = 0;
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
async_nif->recycled_req_mutex = enif_mutex_create(NULL);
STAT_INIT(async_nif, qwait);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
STAT_INIT(q, qwait);
}
/* Setup the thread pool management. */

98
c_src/duration.h Normal file
View file

@ -0,0 +1,98 @@
/*
* Copyright (C) 2013, all rights reserved by Gregory Burd <greg@burd.me>
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* version 2 (MPLv2). If a copy of the MPL was not distributed with this file,
* you can obtain one at: http://mozilla.org/MPL/2.0/
*
* NOTES:
* - on some platforms this will require -lrt
*/
#include <stdio.h>
#include <stdint.h>
#include <time.h>
#include <sys/timeb.h>
typedef enum { ns = 0, mcs, ms, s } time_scale;
struct scale_time {
const char *abbreviation;
const char *name;
uint64_t mul, div, overhead, ticks_per;
};
static const struct scale_time scale[] = {
{ "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL },
{ "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL },
{ "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL },
{ "sec", "second", 1LL, 1000000000LL, 10, 2300LL } };
static uint64_t ts(time_scale unit)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (((uint64_t)ts.tv_sec * scale[unit].mul) +
((uint64_t)ts.tv_nsec / scale[unit].div));
}
#if 0
//if defined(__i386__) || defined(__x86_64__)
/**
* cpu_clock_ticks()
*
* A measure provided by Intel x86 CPUs which provides the number of cycles
* (aka "ticks") executed as a counter using the RDTSC instruction.
*/
static inline uint64_t cpu_clock_ticks()
{
uint32_t lo, hi;
__asm__ __volatile__ (
"xorl %%eax, %%eax\n"
"cpuid\n"
"rdtsc\n"
: "=a" (lo), "=d" (hi)
:
: "%ebx", "%ecx" );
return (uint64_t)hi << 32 | lo;
}
/**
* cpu_clock_ticks()
*
* An approximation of elapsed [ns, mcs, ms, s] from CPU clock ticks.
*/
static uint64_t elapsed_cpu_clock_ticks(uint64_t start, time_scale unit)
{
return (cpu_clock_ticks() - start - scale[unit].overhead) * scale[unit].ticks_per;
}
#endif
typedef struct {
uint64_t then;
time_scale unit;
} duration_t;
static inline uint64_t elapsed(duration_t *d)
{
uint64_t now = ts(d->unit);
uint64_t elapsed = now - d->then;
d->then = now;
return elapsed;
}
#define DURATION(name, resolution) duration_t name = \
{ts(resolution), resolution}
#define ELAPSED_DURING(result, resolution, block) \
do { \
DURATION(__x, resolution); \
do block while(0); \
*result = elapsed(&__x); \
} while(0);
#define CYCLES_DURING(result, block) \
do { \
uint64_t __begin = cpu_clock_ticks(); \
do block while(0); \
*result = cpu_clock_ticks() - __begin; \
} while(0);

214
c_src/stats.h Normal file
View file

@ -0,0 +1,214 @@
/*
* stats:
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __STATS_H__
#define __STATS_H__
#if defined(__cplusplus)
extern "C" {
#endif
#include "duration.h"
/**
* Calculate the log2 of 64bit unsigned integers.
*/
#ifdef __GCC__
#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X))))
#else
static unsigned int __log2_64(uint64_t x) {
static const int tab64[64] = {
63, 0, 58, 1, 59, 47, 53, 2,
60, 39, 48, 27, 54, 33, 42, 3,
61, 51, 37, 40, 49, 18, 28, 20,
55, 30, 34, 11, 43, 14, 22, 4,
62, 57, 46, 52, 38, 26, 32, 41,
50, 36, 17, 19, 29, 10, 13, 21,
56, 45, 25, 31, 35, 16, 9, 12,
44, 24, 15, 8, 23, 7, 6, 5};
if (x == 0) return 0;
uint64_t v = x;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58];
}
#define LOG2(X) __log2_64(X)
#endif
#define STAT_DEF(name) struct name ## _stat name ## _stat;
#define STAT_DECL(name, nsamples) \
struct name ## _stat { \
duration_t d; \
uint64_t histogram[64]; \
uint32_t h, n; \
uint64_t samples[nsamples]; \
uint64_t min, max; \
double mean; \
}; \
static inline double name ## _stat_mean(struct name ## _stat *s) { \
uint32_t t = s->h; \
uint32_t h = (s->h + 1) % nsamples; \
double mean = 0; \
while (h != t) { \
mean += s->samples[h]; \
h = (h + 1) % nsamples; \
} \
if (mean > 0) \
mean /= (double)(s->n < nsamples ? s->n : nsamples); \
return mean; \
} \
static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \
uint32_t i; \
double mean = 0; \
for (i = 0; i < 64; i++) \
mean += (s->histogram[i] * i); \
if (mean > 0) \
mean /= (double)s->n; \
return mean; \
} \
static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \
{ \
uint64_t t = ts(s->d.unit); \
s->d.then = t; \
return t; \
} \
static inline void name ## _stat_reset(struct name ## _stat *s) \
{ \
s->min = ~0; \
s->max = 0; \
s->h = 0; \
memset(&s->histogram, 0, sizeof(uint64_t) * 64); \
memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \
} \
static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \
{ \
uint64_t now = ts(s->d.unit); \
uint64_t elapsed = now - s->d.then; \
uint32_t i = s->h; \
if (s->n == nsamples) { \
s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \
if (s->n >= 4294967295) \
name ## _stat_reset(s); \
} \
s->h = (s->h + 1) % nsamples; \
s->samples[i] = elapsed; \
if (elapsed < s->min) \
s->min = elapsed; \
if (elapsed > s->max) \
s->max = elapsed; \
s->histogram[LOG2(elapsed)]++; \
s->n++; \
s->d.then = ts(s->d.unit); \
return elapsed; \
} \
static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \
{ \
uint8_t logs[64]; \
uint8_t i, j, max_log = 0; \
double m = (s->mean + name ## _stat_mean(s) / 2.0); \
\
fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); \
for (i = 0; i < 64; i++) { \
logs[i] = LOG2(s->histogram[i]); \
if (logs[i] > max_log) \
max_log = logs[i]; \
} \
for (i = max_log; i > 0; i--) { \
if (!(i % 10)) \
fprintf(stderr, "2^%2d ", i); \
else \
fprintf(stderr, " "); \
for(j = 0; j < 64; j++) \
fprintf(stderr, logs[j] >= i ? "" : " "); \
fprintf(stderr, "\n"); \
} \
if (max_log == 0) { \
fprintf(stderr, "[empty]\n"); \
} else { \
fprintf(stderr, " ns μs ms s ks\n"); \
fprintf(stderr, "min: "); \
if (s->min < 1000) \
fprintf(stderr, "%lu (ns)", s->min); \
else if (s->min < 1000000) \
fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \
else if (s->min < 1000000000) \
fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \
else if (s->min < 1000000000000) \
fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \
fprintf(stderr, " max: "); \
if (s->max < 1000) \
fprintf(stderr, "%lu (ns)", s->max); \
else if (s->max < 1000000) \
fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \
else if (s->max < 1000000000) \
fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \
else if (s->max < 1000000000000) \
fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \
fprintf(stderr, " mean: "); \
if (m < 1000) \
fprintf(stderr, "%.2f (ns)", m); \
else if (m < 1000000) \
fprintf(stderr, "%.2f (μs)", m / 1000.0); \
else if (m < 1000000000) \
fprintf(stderr, "%.2f (ms)", m / 1000000.0); \
else if (m < 1000000000000) \
fprintf(stderr, "%.2f (s)", m / 1000000000.0); \
fprintf(stderr, "\n"); \
} \
fflush(stderr); \
}
#define STAT_INIT(var, name) \
var->name ## _stat.min = ~0; \
var->name ## _stat.max = 0; \
var->name ## _stat.mean = 0.0; \
var->name ## _stat.h = 0; \
var->name ## _stat.d.then = 0; \
var->name ## _stat.d.unit = ns;
#define STAT_TICK(var, name) name ## _stat_tick(&var->name ## _stat)
#define STAT_TOCK(var, name) name ## _stat_tock(&var->name ## _stat)
#define STAT_RESET(var, name) name ## _stat_reset(&var->name ## _stat)
#define STAT_MEAN_LOG2_SAMPLE(var, name) \
name ## _stat_mean_lg2(&var->name ## _stat)
#define STAT_MEAN_SAMPLE(var, name) \
name ## _stat_mean(&var->name ## _stat)
#define STAT_PRINT(var, name, mod) \
name ## _stat_print_histogram(&var->name ## _stat, mod)
#if defined(__cplusplus)
}
#endif
#endif // __STATS_H__