From 786142ce7358f13034798b53f1a2d7db05f8ddc5 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Wed, 1 May 2013 22:02:37 -0400 Subject: [PATCH] Add a bit of statistics tracking for two reasons, a) to help inform where a request should be enqueded and b) to track request latency. --- c_src/async_nif.h | 45 ++++++++-- c_src/duration.h | 98 +++++++++++++++++++++ c_src/stats.h | 214 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 348 insertions(+), 9 deletions(-) create mode 100644 c_src/duration.h create mode 100644 c_src/stats.h diff --git a/c_src/async_nif.h b/c_src/async_nif.h index bc12373..e7a9670 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -28,9 +28,7 @@ extern "C" { #include #include "fifo_q.h" -#ifdef ASYNC_NIF_STATS -#include "stats.h" // TODO: measure, measure... measure again -#endif +#include "stats.h" #ifndef __UNUSED #define __UNUSED(v) ((void)(v)) @@ -53,6 +51,7 @@ struct async_nif_req_entry { DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); struct async_nif_work_queue { + STAT_DEF(qwait); ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; FIFO_QUEUE_TYPE(reqs) reqs; @@ -66,6 +65,7 @@ struct async_nif_worker_entry { }; struct async_nif_state { + STAT_DEF(qwait); unsigned int shutdown; unsigned int num_workers; struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; @@ -198,22 +198,34 @@ async_nif_reuse_req(struct async_nif_state *async_nif) req = fifo_q_get(reqs, async_nif->recycled_reqs); } enif_mutex_unlock(async_nif->recycled_req_mutex); + STAT_TICK(async_nif, qwait); return req; } /** * Store the request for future re-use. + * + * req a request entry with an ErlNifEnv* which will be cleared + * before reuse, but not until then. + * async_nif a handle to our state so that we can find and use the mutex */ void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { + STAT_TOCK(async_nif, qwait); enif_mutex_lock(async_nif->recycled_req_mutex); fifo_q_put(reqs, async_nif->recycled_reqs, req); enif_mutex_unlock(async_nif->recycled_req_mutex); } /** - * TODO: + * A string hash function. + * + * A basic hash function for strings of characters used during the + * affinity association. + * + * s a NULL terminated set of bytes to be hashed + * -> an integer hash encoding of the bytes */ static inline unsigned int async_nif_str_hash_func(const char *s) @@ -224,7 +236,10 @@ async_nif_str_hash_func(const char *s) } /** - * TODO: + * Enqueue a request for processing by a worker thread. + * + * Places the request into a work queue determined either by the + * provided affinity or by iterating through the available queues. */ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) @@ -232,6 +247,7 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* Identify the most appropriate worker for this request. */ unsigned int qid = 0; struct async_nif_work_queue *q = NULL; + unsigned int n = async_nif->num_queues; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that @@ -257,8 +273,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en enif_mutex_unlock(q->reqs_mutex); return 0; } - - if (fifo_q_full(reqs, q->reqs)) { // TODO: || (q->avg_latency > median_latency) + double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait); + double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait); + if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) { enif_mutex_unlock(q->reqs_mutex); qid = (qid + 1) % async_nif->num_queues; q = &async_nif->queues[qid]; @@ -266,10 +283,11 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en break; } // TODO: at some point add in work sheading/stealing - } while(1); + } while(n-- > 0); /* We hold the queue's lock, and we've seletect a reasonable queue for this new request so add the request. */ + STAT_TICK(q, qwait); fifo_q_put(reqs, q->reqs, req); /* Build the term before releasing the lock so as not to race on the use of @@ -282,6 +300,9 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en return reply; } +/** + * TODO: + */ static void * async_nif_worker_fn(void *arg) { @@ -300,7 +321,8 @@ async_nif_worker_fn(void *arg) break; } if (fifo_q_empty(reqs, q->reqs)) { - /* Queue is empty, wait for work */ + /* Queue is empty so we wait for more work to arrive. */ + STAT_RESET(q, qwait); enif_cond_wait(q->reqs_cnd, q->reqs_mutex); goto check_again_for_work; } else { @@ -317,6 +339,7 @@ async_nif_worker_fn(void *arg) /* Perform the work. */ req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); + STAT_TOCK(q, qwait); /* Now call the post-work cleanup function. */ req->fn_post(req->args); @@ -345,6 +368,8 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) struct async_nif_req_entry *req = NULL; __UNUSED(env); + STAT_PRINT(async_nif, qwait, "wterl"); + /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first lock all the worker queues, then set shutdown to true, then @@ -446,12 +471,14 @@ async_nif_load() async_nif->shutdown = 0; async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); async_nif->recycled_req_mutex = enif_mutex_create(NULL); + STAT_INIT(async_nif, qwait); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); + STAT_INIT(q, qwait); } /* Setup the thread pool management. */ diff --git a/c_src/duration.h b/c_src/duration.h new file mode 100644 index 0000000..635d0fd --- /dev/null +++ b/c_src/duration.h @@ -0,0 +1,98 @@ +/* + * Copyright (C) 2013, all rights reserved by Gregory Burd + * + * This Source Code Form is subject to the terms of the Mozilla Public License, + * version 2 (MPLv2). If a copy of the MPL was not distributed with this file, + * you can obtain one at: http://mozilla.org/MPL/2.0/ + * + * NOTES: + * - on some platforms this will require -lrt + */ +#include +#include +#include +#include + +typedef enum { ns = 0, mcs, ms, s } time_scale; +struct scale_time { + const char *abbreviation; + const char *name; + uint64_t mul, div, overhead, ticks_per; +}; +static const struct scale_time scale[] = { + { "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL }, + { "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL }, + { "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL }, + { "sec", "second", 1LL, 1000000000LL, 10, 2300LL } }; + +static uint64_t ts(time_scale unit) +{ + struct timespec ts; + clock_gettime(CLOCK_REALTIME, &ts); + return (((uint64_t)ts.tv_sec * scale[unit].mul) + + ((uint64_t)ts.tv_nsec / scale[unit].div)); +} + +#if 0 +//if defined(__i386__) || defined(__x86_64__) + +/** + * cpu_clock_ticks() + * + * A measure provided by Intel x86 CPUs which provides the number of cycles + * (aka "ticks") executed as a counter using the RDTSC instruction. + */ +static inline uint64_t cpu_clock_ticks() +{ + uint32_t lo, hi; + __asm__ __volatile__ ( + "xorl %%eax, %%eax\n" + "cpuid\n" + "rdtsc\n" + : "=a" (lo), "=d" (hi) + : + : "%ebx", "%ecx" ); + return (uint64_t)hi << 32 | lo; +} + +/** + * cpu_clock_ticks() + * + * An approximation of elapsed [ns, mcs, ms, s] from CPU clock ticks. + */ +static uint64_t elapsed_cpu_clock_ticks(uint64_t start, time_scale unit) +{ + return (cpu_clock_ticks() - start - scale[unit].overhead) * scale[unit].ticks_per; +} + +#endif + +typedef struct { + uint64_t then; + time_scale unit; +} duration_t; + +static inline uint64_t elapsed(duration_t *d) +{ + uint64_t now = ts(d->unit); + uint64_t elapsed = now - d->then; + d->then = now; + return elapsed; +} + +#define DURATION(name, resolution) duration_t name = \ + {ts(resolution), resolution} + +#define ELAPSED_DURING(result, resolution, block) \ + do { \ + DURATION(__x, resolution); \ + do block while(0); \ + *result = elapsed(&__x); \ + } while(0); + +#define CYCLES_DURING(result, block) \ + do { \ + uint64_t __begin = cpu_clock_ticks(); \ + do block while(0); \ + *result = cpu_clock_ticks() - __begin; \ + } while(0); diff --git a/c_src/stats.h b/c_src/stats.h new file mode 100644 index 0000000..2f465be --- /dev/null +++ b/c_src/stats.h @@ -0,0 +1,214 @@ +/* + * stats: + * + * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. + * Author: Gregory Burd + * + * This file is provided to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +#ifndef __STATS_H__ +#define __STATS_H__ + +#if defined(__cplusplus) +extern "C" { +#endif + +#include "duration.h" + +/** + * Calculate the log2 of 64bit unsigned integers. + */ +#ifdef __GCC__ +#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) +#else +static unsigned int __log2_64(uint64_t x) { + static const int tab64[64] = { + 63, 0, 58, 1, 59, 47, 53, 2, + 60, 39, 48, 27, 54, 33, 42, 3, + 61, 51, 37, 40, 49, 18, 28, 20, + 55, 30, 34, 11, 43, 14, 22, 4, + 62, 57, 46, 52, 38, 26, 32, 41, + 50, 36, 17, 19, 29, 10, 13, 21, + 56, 45, 25, 31, 35, 16, 9, 12, + 44, 24, 15, 8, 23, 7, 6, 5}; + if (x == 0) return 0; + uint64_t v = x; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; +} +#define LOG2(X) __log2_64(X) +#endif + +#define STAT_DEF(name) struct name ## _stat name ## _stat; + +#define STAT_DECL(name, nsamples) \ + struct name ## _stat { \ + duration_t d; \ + uint64_t histogram[64]; \ + uint32_t h, n; \ + uint64_t samples[nsamples]; \ + uint64_t min, max; \ + double mean; \ + }; \ + static inline double name ## _stat_mean(struct name ## _stat *s) { \ + uint32_t t = s->h; \ + uint32_t h = (s->h + 1) % nsamples; \ + double mean = 0; \ + while (h != t) { \ + mean += s->samples[h]; \ + h = (h + 1) % nsamples; \ + } \ + if (mean > 0) \ + mean /= (double)(s->n < nsamples ? s->n : nsamples); \ + return mean; \ + } \ + static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \ + uint32_t i; \ + double mean = 0; \ + for (i = 0; i < 64; i++) \ + mean += (s->histogram[i] * i); \ + if (mean > 0) \ + mean /= (double)s->n; \ + return mean; \ + } \ + static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \ + { \ + uint64_t t = ts(s->d.unit); \ + s->d.then = t; \ + return t; \ + } \ + static inline void name ## _stat_reset(struct name ## _stat *s) \ + { \ + s->min = ~0; \ + s->max = 0; \ + s->h = 0; \ + memset(&s->histogram, 0, sizeof(uint64_t) * 64); \ + memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \ + } \ + static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \ + { \ + uint64_t now = ts(s->d.unit); \ + uint64_t elapsed = now - s->d.then; \ + uint32_t i = s->h; \ + if (s->n == nsamples) { \ + s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \ + if (s->n >= 4294967295) \ + name ## _stat_reset(s); \ + } \ + s->h = (s->h + 1) % nsamples; \ + s->samples[i] = elapsed; \ + if (elapsed < s->min) \ + s->min = elapsed; \ + if (elapsed > s->max) \ + s->max = elapsed; \ + s->histogram[LOG2(elapsed)]++; \ + s->n++; \ + s->d.then = ts(s->d.unit); \ + return elapsed; \ + } \ + static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \ + { \ + uint8_t logs[64]; \ + uint8_t i, j, max_log = 0; \ + double m = (s->mean + name ## _stat_mean(s) / 2.0); \ + \ + fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); \ + for (i = 0; i < 64; i++) { \ + logs[i] = LOG2(s->histogram[i]); \ + if (logs[i] > max_log) \ + max_log = logs[i]; \ + } \ + for (i = max_log; i > 0; i--) { \ + if (!(i % 10)) \ + fprintf(stderr, "2^%2d ", i); \ + else \ + fprintf(stderr, " "); \ + for(j = 0; j < 64; j++) \ + fprintf(stderr, logs[j] >= i ? "•" : " "); \ + fprintf(stderr, "\n"); \ + } \ + if (max_log == 0) { \ + fprintf(stderr, "[empty]\n"); \ + } else { \ + fprintf(stderr, " ns μs ms s ks\n"); \ + fprintf(stderr, "min: "); \ + if (s->min < 1000) \ + fprintf(stderr, "%lu (ns)", s->min); \ + else if (s->min < 1000000) \ + fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \ + else if (s->min < 1000000000) \ + fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \ + else if (s->min < 1000000000000) \ + fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \ + fprintf(stderr, " max: "); \ + if (s->max < 1000) \ + fprintf(stderr, "%lu (ns)", s->max); \ + else if (s->max < 1000000) \ + fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \ + else if (s->max < 1000000000) \ + fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \ + else if (s->max < 1000000000000) \ + fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \ + fprintf(stderr, " mean: "); \ + if (m < 1000) \ + fprintf(stderr, "%.2f (ns)", m); \ + else if (m < 1000000) \ + fprintf(stderr, "%.2f (μs)", m / 1000.0); \ + else if (m < 1000000000) \ + fprintf(stderr, "%.2f (ms)", m / 1000000.0); \ + else if (m < 1000000000000) \ + fprintf(stderr, "%.2f (s)", m / 1000000000.0); \ + fprintf(stderr, "\n"); \ + } \ + fflush(stderr); \ + } + + +#define STAT_INIT(var, name) \ + var->name ## _stat.min = ~0; \ + var->name ## _stat.max = 0; \ + var->name ## _stat.mean = 0.0; \ + var->name ## _stat.h = 0; \ + var->name ## _stat.d.then = 0; \ + var->name ## _stat.d.unit = ns; + +#define STAT_TICK(var, name) name ## _stat_tick(&var->name ## _stat) + +#define STAT_TOCK(var, name) name ## _stat_tock(&var->name ## _stat) + +#define STAT_RESET(var, name) name ## _stat_reset(&var->name ## _stat) + +#define STAT_MEAN_LOG2_SAMPLE(var, name) \ + name ## _stat_mean_lg2(&var->name ## _stat) + +#define STAT_MEAN_SAMPLE(var, name) \ + name ## _stat_mean(&var->name ## _stat) + +#define STAT_PRINT(var, name, mod) \ + name ## _stat_print_histogram(&var->name ## _stat, mod) + + +#if defined(__cplusplus) +} +#endif + +#endif // __STATS_H__