From c41e411a92bdf61e3b0a0bdc0a09a0a88ae30e46 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Tue, 25 Jun 2013 13:31:43 -0400 Subject: [PATCH] Worker threads come and go as needed with a lower bound of 2 and an upper bound of ASYNC_NIF_MAX_WORKERS. Stats were improved to use thread local storage for measures. With stats working again wterl uses them to determine who to evict. Wterl's signature calculation for an operation wasn't correct and so the cache wasn't efficient at all, this has been fixed. --- c_src/async_nif.h | 189 +++++++++++++++++++------------- c_src/build_deps.sh | 9 +- c_src/cas.h | 6 +- c_src/duration.h | 2 +- c_src/stats.c | 260 ++++++++++++++++++++++++++++++++++++++++++++ c_src/stats.h | 194 +++------------------------------ c_src/wterl.c | 154 ++++++++++---------------- 7 files changed, 453 insertions(+), 361 deletions(-) create mode 100644 c_src/stats.c diff --git a/c_src/async_nif.h b/c_src/async_nif.h index ff76364..6627152 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -26,6 +26,7 @@ extern "C" { #include #include "fifo_q.h" +#include "queue.h" #include "stats.h" #ifndef UNUSED @@ -33,11 +34,9 @@ extern "C" { #endif #define ASYNC_NIF_MAX_WORKERS 1024 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 2000 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 1000 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS -STAT_DECL(qwait, 1000); - struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; @@ -45,12 +44,12 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); - const char *func; }; DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); struct async_nif_work_queue { STAT_DEF(qwait); + unsigned int workers; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; FIFO_QUEUE_TYPE(reqs) reqs; @@ -61,13 +60,15 @@ struct async_nif_worker_entry { unsigned int worker_id; struct async_nif_state *async_nif; struct async_nif_work_queue *q; + SLIST_ENTRY(async_nif_worker_entry) entries; }; struct async_nif_state { STAT_DEF(qwait); unsigned int shutdown; - unsigned int num_workers; - struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; + ErlNifMutex *we_mutex; + unsigned int we_active; + SLIST_HEAD(joining, async_nif_worker_entry) we_joining; unsigned int num_queues; unsigned int next_q; FIFO_QUEUE_TYPE(reqs) recycled_reqs; @@ -107,7 +108,6 @@ struct async_nif_state { enif_make_atom(env, "shutdown")); \ req = async_nif_reuse_req(async_nif); \ if (!req) { \ - async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "eagain")); \ } \ @@ -128,7 +128,6 @@ struct async_nif_state { req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \ - req->func = __func__; \ int h = -1; \ if (affinity) \ h = affinity % async_nif->num_queues; \ @@ -195,12 +194,12 @@ async_nif_reuse_req(struct async_nif_state *async_nif) if (req) { memset(req, 0, sizeof(struct async_nif_req_entry)); env = enif_alloc_env(); - if (!env) { - enif_free(req); - req = NULL; - } else { + if (env) { req->env = env; async_nif->num_reqs++; + } else { + enif_free(req); + req = NULL; } } } @@ -208,7 +207,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif) req = fifo_q_get(reqs, async_nif->recycled_reqs); } enif_mutex_unlock(async_nif->recycled_req_mutex); - STAT_TICK(async_nif, qwait); + __stat_tick(async_nif->qwait_stat); return req; } @@ -223,16 +222,61 @@ void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { ErlNifEnv *env = NULL; - STAT_TOCK(async_nif, qwait); + __stat_tock(async_nif->qwait_stat); enif_mutex_lock(async_nif->recycled_req_mutex); + enif_clear_env(req->env); env = req->env; - enif_clear_env(env); memset(req, 0, sizeof(struct async_nif_req_entry)); req->env = env; fifo_q_put(reqs, async_nif->recycled_reqs, req); enif_mutex_unlock(async_nif->recycled_req_mutex); } +static void *async_nif_worker_fn(void *); + +/** + * Start up a worker thread. + */ +static int +async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q) +{ + struct async_nif_worker_entry *we; + + if (0 == q) + return EINVAL; + + enif_mutex_lock(async_nif->we_mutex); + + we = SLIST_FIRST(&async_nif->we_joining); + while(we != NULL) { + struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); + SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); + void *exit_value = 0; /* We ignore the thread_join's exit value. */ + enif_thread_join(we->tid, &exit_value); + enif_free(we); + async_nif->we_active--; + we = n; + } + + if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { + enif_mutex_unlock(async_nif->we_mutex); + return EAGAIN; + } + + we = enif_alloc(sizeof(struct async_nif_worker_entry)); + if (!we) { + enif_mutex_unlock(async_nif->we_mutex); + return ENOMEM; + } + memset(we, 0, sizeof(struct async_nif_worker_entry)); + we->worker_id = async_nif->we_active++; + we->async_nif = async_nif; + we->q = q; + + enif_mutex_unlock(async_nif->we_mutex); + return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0); +} + /** * Enqueue a request for processing by a worker thread. * @@ -244,7 +288,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en { /* Identify the most appropriate worker for this request. */ unsigned int qid = 0; + unsigned int n = async_nif->num_queues; struct async_nif_work_queue *q = NULL; + double await = 0; + double await_inthisq = 0; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that @@ -257,12 +304,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en async_nif->next_q = qid; } - q = &async_nif->queues[qid]; - enif_mutex_lock(q->reqs_mutex); - -#if 0 // stats aren't yet thread safe, so this can go haywire... TODO: fix. - unsigned int n = async_nif->num_queues; - /* Now we inspect and interate across the set of queues trying to select one that isn't too full or too slow. */ do { @@ -277,8 +318,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en return 0; } if (!fifo_q_full(reqs, q->reqs)) { - double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait); - double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait); + await = __stat_mean_log2(async_nif->qwait_stat); + await_inthisq = __stat_mean_log2(q->qwait_stat); if (await_inthisq > await) { enif_mutex_unlock(q->reqs_mutex); qid = (qid + 1) % async_nif->num_queues; @@ -288,13 +329,18 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en break; } } - // TODO: at some point add in work sheading/stealing } while(n-- > 0); -#endif /* We hold the queue's lock, and we've seletect a reasonable queue for this - new request so add the request. */ - STAT_TICK(q, qwait); + new request now check to make sure there are enough workers actively + processing requests on this queue. */ + if (q->workers < 2 || await_inthisq > await) { + if (async_nif_start_worker(async_nif, q) == 0) + q->workers++; + } + + /* And finally add the request to the queue. */ + __stat_tick(q->qwait_stat); fifo_q_put(reqs, q->reqs, req); /* Build the term before releasing the lock so as not to race on the use of @@ -331,9 +377,14 @@ async_nif_worker_fn(void *arg) } if (fifo_q_empty(reqs, q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ - STAT_RESET(q, qwait); - enif_cond_wait(q->reqs_cnd, q->reqs_mutex); - goto check_again_for_work; + __stat_reset(q->qwait_stat); + if (q->workers > 2) { + enif_mutex_unlock(q->reqs_mutex); + break; + } else { + enif_cond_wait(q->reqs_cnd, q->reqs_mutex); + goto check_again_for_work; + } } else { assert(fifo_q_size(reqs, q->reqs) > 0); assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs)); @@ -348,7 +399,7 @@ async_nif_worker_fn(void *arg) /* Perform the work. */ req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); - STAT_TOCK(q, qwait); + __stat_tock(q->qwait_stat); /* Now call the post-work cleanup function. */ req->fn_post(req->args); @@ -363,6 +414,10 @@ async_nif_worker_fn(void *arg) req = NULL; } } + enif_mutex_lock(async_nif->we_mutex); + SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); + enif_mutex_unlock(async_nif->we_mutex); + q->workers--; enif_thread_exit(0); return 0; } @@ -374,9 +429,10 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; struct async_nif_req_entry *req = NULL; + struct async_nif_worker_entry *we = NULL; UNUSED(env); - STAT_PRINT(async_nif, qwait, "wterl"); + __stat_print_histogram(async_nif->qwait_stat, "wterl"); /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first @@ -393,19 +449,29 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) executing requests. */ async_nif->shutdown = 1; - /* Make sure to wake up all worker threads sitting on conditional - wait for work so that they can see it's time to exit. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; - enif_cond_broadcast(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); } /* Join for the now exiting worker threads. */ - for (i = 0; i < async_nif->num_workers; ++i) { - void *exit_value = 0; /* We ignore the thread_join's exit value. */ - enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); + while(async_nif->we_active > 0) { + + for (i = 0; i < num_queues; i++) + enif_cond_broadcast(async_nif->queues[i].reqs_cnd); + + we = SLIST_FIRST(&async_nif->we_joining); + while(we != NULL) { + struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); + SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); + void *exit_value = 0; /* We ignore the thread_join's exit value. */ + enif_thread_join(we->tid, &exit_value); + enif_free(we); + async_nif->we_active--; + we = n; + } } + enif_mutex_destroy(async_nif->we_mutex); /* Cleanup in-flight requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { @@ -447,7 +513,7 @@ static void * async_nif_load() { static int has_init = 0; - unsigned int i, j, num_queues; + unsigned int i, num_queues; ErlNifSysInfo info; struct async_nif_state *async_nif; @@ -477,57 +543,24 @@ async_nif_load() if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * num_queues); + sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; - async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; + async_nif->we_active = 0; async_nif->next_q = 0; async_nif->shutdown = 0; async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); async_nif->recycled_req_mutex = enif_mutex_create(NULL); - STAT_INIT(async_nif, qwait); + async_nif->qwait_stat = __stat_init(1000); + async_nif->we_mutex = enif_mutex_create(NULL); + SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - STAT_INIT(q, qwait); - } - - /* Setup the thread pool management. */ - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - - /* Start the worker threads. */ - for (i = 0; i < async_nif->num_workers; i++) { - struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; - we->async_nif = async_nif; - we->worker_id = i; - we->q = &async_nif->queues[i % async_nif->num_queues]; - if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, - &async_nif_worker_fn, (void*)we, NULL) != 0) { - async_nif->shutdown = 1; - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_cond_broadcast(q->reqs_cnd); - } - - while(i-- > 0) { - void *exit_value = 0; /* Ignore this. */ - enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); - } - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_mutex_destroy(q->reqs_mutex); - enif_cond_destroy(q->reqs_cnd); - } - - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - enif_free(async_nif); - return NULL; - } + q->qwait_stat = __stat_init(1000); } return async_nif; } diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index cc0e807..15608ef 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -36,13 +36,12 @@ get_wt () (cd $BASEDIR/$WT_DIR && git pull -u) || exit 1 else if [ "X$WT_REF" != "X" ]; then - git clone ${WT_REPO} && \ - (cd $BASEDIR/wiredtiger && git checkout refs/$WT_REF || exit 1) + git clone ${WT_REPO} ${WT_DIR} && \ + (cd $BASEDIR/$WT_DIR && git checkout refs/$WT_REF || exit 1) else - git clone ${WT_REPO} && \ - (cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1) + git clone ${WT_REPO} ${WT_DIR} && \ + (cd $BASEDIR/$WT_DIR && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1) fi - mv wiredtiger $WT_DIR || exit 1 fi [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) (cd $BASEDIR/$WT_DIR diff --git a/c_src/cas.h b/c_src/cas.h index ea81dbf..61c1f61 100644 --- a/c_src/cas.h +++ b/c_src/cas.h @@ -69,9 +69,9 @@ do { \ __val = __newval; \ } while ( 0 ) -#define ALIGNED_ENIF_ALLOC(_s) \ - ((void *)(((unsigned long)enif_alloc((_s)+CACHE_LINE_SIZE*2) + \ - CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))) \ +#define CACHE_ALIGNED_SIZEOF(_s) \ + ((sizeof(_s)) + CACHE_LINE_SIZE*2) + \ + CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))) \ /* * I. Compare-and-swap. diff --git a/c_src/duration.h b/c_src/duration.h index 1404f41..2d86385 100644 --- a/c_src/duration.h +++ b/c_src/duration.h @@ -19,7 +19,7 @@ #endif -void current_utc_time(struct timespec *ts) +static inline void current_utc_time(struct timespec *ts) { #ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time clock_serv_t cclock; diff --git a/c_src/stats.c b/c_src/stats.c new file mode 100644 index 0000000..9d56f9e --- /dev/null +++ b/c_src/stats.c @@ -0,0 +1,260 @@ +/* + * stats: + * + * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. + * Author: Gregory Burd + * + * This file is provided to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "erl_nif.h" +#include "erl_driver.h" + +#include "common.h" +#include "duration.h" +#include "stats.h" + +/** + * Calculate the log2 of 64bit unsigned integers. + */ +#ifdef __GCC__ +#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) +#else +static unsigned int __log2_64(uint64_t x) { + static const int tab64[64] = { + 63, 0, 58, 1, 59, 47, 53, 2, + 60, 39, 48, 27, 54, 33, 42, 3, + 61, 51, 37, 40, 49, 18, 28, 20, + 55, 30, 34, 11, 43, 14, 22, 4, + 62, 57, 46, 52, 38, 26, 32, 41, + 50, 36, 17, 19, 29, 10, 13, 21, + 56, 45, 25, 31, 35, 16, 9, 12, + 44, 24, 15, 8, 23, 7, 6, 5}; + if (x == 0) return 0; + uint64_t v = x; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; +} +#define LOG2(X) __log2_64(X) +#endif + +double +__stat_mean(struct stat *s) +{ + uint32_t t, h; + double mean; + + if (!s) + return 0.0; + + t = s->h; + h = (s->h + 1) % s->num_samples; + mean = 0; + + while (h != t) { + mean += s->samples[h]; + h = (h + 1) % s->num_samples; + } + if (mean > 0) + mean /= (double)(s->n < s->num_samples ? s->n : s->num_samples); + return mean; +} + +double +__stat_mean_log2(struct stat *s) +{ + uint32_t i; + double mean; + + if (!s) + return 0.0; + + mean = 0; + for (i = 0; i < 64; i++) + mean += (s->histogram[i] * i); + if (mean > 0) + mean /= (double)s->n; + return mean; +} + +uint64_t +__stat_tick(struct stat *s) +{ + duration_t *d; + uint64_t t; + + if (!s) + return 0.0; + + d = (duration_t*)erl_drv_tsd_get(s->duration_key); + if (!d) { + if ((d = enif_alloc(sizeof(duration_t))) == NULL) + return 0; + memset(d, 0, sizeof(duration_t)); + erl_drv_tsd_set(s->duration_key, d); + } + t = ts(d->unit); + d->then = t; + return t; +} + +void +__stat_reset(struct stat *s) +{ + duration_t *d; + + if (!s) + return; + + s->min = ~0; + s->max = 0; + s->h = 0; + memset(s->histogram, 0, sizeof(uint64_t) * 64); + memset(s->samples, 0, sizeof(uint64_t) * s->num_samples); + d = (duration_t*)erl_drv_tsd_get(s->duration_key); + if (d) + d->then = 0; +} + +uint64_t +__stat_tock(struct stat *s) +{ + uint64_t now; + uint64_t elapsed; + uint32_t i; + duration_t *d; + + if (!s) + return 0.0; + + d = (duration_t*)erl_drv_tsd_get(s->duration_key); + if (!d) + return 0; + + now = ts(d->unit); + elapsed = now - d->then; + i = s->h; + if (s->n == s->num_samples) { + s->mean = (s->mean + __stat_mean(s)) / 2.0; + if (s->n >= 4294967295) + __stat_reset(s); + } + s->h = (s->h + 1) % s->num_samples; + s->samples[i] = elapsed; + if (elapsed < s->min) + s->min = elapsed; + if (elapsed > s->max) + s->max = elapsed; + s->histogram[LOG2(elapsed)]++; + s->n++; + d->then = ts(d->unit); + return elapsed; +} + +void +__stat_print_histogram(struct stat *s, const char *mod) +{ + uint8_t logs[64]; + uint8_t i, j, max_log = 0; + double m; + + if (!s) + return; + + m = (s->mean + __stat_mean(s) / 2.0); + + fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); + for (i = 0; i < 64; i++) { + logs[i] = LOG2(s->histogram[i]); + if (logs[i] > max_log) + max_log = logs[i]; + } + for (i = max_log; i > 0; i--) { + if (!(i % 10)) + fprintf(stderr, "2^%2d ", i); + else + fprintf(stderr, " "); + for(j = 0; j < 64; j++) + fprintf(stderr, logs[j] >= i ? "•" : " "); + fprintf(stderr, "\n"); + } + if (max_log == 0) { + fprintf(stderr, "[empty]\n"); + } else { + fprintf(stderr, " ns μs ms s ks\n"); + fprintf(stderr, "min: "); + if (s->min < 1000) + fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); + else if (s->min < 1000000) + fprintf(stderr, "%.2f (μs)", s->min / 1000.0); + else if (s->min < 1000000000) + fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); + else if (s->min < 1000000000000) + fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); + fprintf(stderr, " max: "); + if (s->max < 1000) + fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); + else if (s->max < 1000000) + fprintf(stderr, "%.2f (μs)", s->max / 1000.0); + else if (s->max < 1000000000) + fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); + else if (s->max < 1000000000000) + fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); + fprintf(stderr, " mean: "); + if (m < 1000) + fprintf(stderr, "%.2f (ns)", m); + else if (m < 1000000) + fprintf(stderr, "%.2f (μs)", m / 1000.0); + else if (m < 1000000000) + fprintf(stderr, "%.2f (ms)", m / 1000000.0); + else if (m < 1000000000000) + fprintf(stderr, "%.2f (s)", m / 1000000000.0); + fprintf(stderr, "\n"); + } + fflush(stderr); +} + +void +__stat_free(struct stat *s) +{ + if (!s) + return; + + enif_free(s->samples); + enif_free(s); +} + +struct stat * +__stat_init(uint32_t n) +{ + struct stat *s = enif_alloc(sizeof(struct stat) + (sizeof(uint64_t) * n)); + if (!s) + return NULL; + memset(s, 0, sizeof(struct stat) + (sizeof(uint64_t) * n)); + s->min = ~0; + s->max = 0; + s->mean = 0.0; + s->h = 0; + s->num_samples = n; + erl_drv_tsd_key_create(NULL, &(s->duration_key)); + return s; +} diff --git a/c_src/stats.h b/c_src/stats.h index 35192ec..6d7f983 100644 --- a/c_src/stats.h +++ b/c_src/stats.h @@ -27,185 +27,25 @@ extern "C" { #endif -#include "duration.h" +#define STAT_DEF(name) struct stat *name ## _stat; -/** - * Calculate the log2 of 64bit unsigned integers. - */ -#ifdef __GCC__ -#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) -#else -static unsigned int __log2_64(uint64_t x) { - static const int tab64[64] = { - 63, 0, 58, 1, 59, 47, 53, 2, - 60, 39, 48, 27, 54, 33, 42, 3, - 61, 51, 37, 40, 49, 18, 28, 20, - 55, 30, 34, 11, 43, 14, 22, 4, - 62, 57, 46, 52, 38, 26, 32, 41, - 50, 36, 17, 19, 29, 10, 13, 21, - 56, 45, 25, 31, 35, 16, 9, 12, - 44, 24, 15, 8, 23, 7, 6, 5}; - if (x == 0) return 0; - uint64_t v = x; - v |= v >> 1; - v |= v >> 2; - v |= v >> 4; - v |= v >> 8; - v |= v >> 16; - v |= v >> 32; - return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; -} -#define LOG2(X) __log2_64(X) -#endif - -#define STAT_DEF(name) struct name ## _stat name ## _stat; - -#define STAT_DECL(name, nsamples) \ - struct name ## _stat { \ - duration_t d; \ - uint64_t histogram[64]; \ - uint32_t h, n; \ - uint64_t samples[nsamples]; \ - uint64_t min, max; \ - double mean; \ - }; \ - static inline double name ## _stat_mean(struct name ## _stat *s) { \ - uint32_t t = s->h; \ - uint32_t h = (s->h + 1) % nsamples; \ - double mean = 0; \ - while (h != t) { \ - mean += s->samples[h]; \ - h = (h + 1) % nsamples; \ - } \ - if (mean > 0) \ - mean /= (double)(s->n < nsamples ? s->n : nsamples); \ - return mean; \ - } \ - static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \ - uint32_t i; \ - double mean = 0; \ - for (i = 0; i < 64; i++) \ - mean += (s->histogram[i] * i); \ - if (mean > 0) \ - mean /= (double)s->n; \ - return mean; \ - } \ - static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \ - { \ - uint64_t t = ts(s->d.unit); \ - s->d.then = t; \ - return t; \ - } \ - static inline void name ## _stat_reset(struct name ## _stat *s) \ - { \ - s->min = ~0; \ - s->max = 0; \ - s->h = 0; \ - memset(&s->histogram, 0, sizeof(uint64_t) * 64); \ - memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \ - } \ - static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \ - { \ - uint64_t now = ts(s->d.unit); \ - uint64_t elapsed = now - s->d.then; \ - uint32_t i = s->h; \ - if (s->n == nsamples) { \ - s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \ - if (s->n >= 4294967295) \ - name ## _stat_reset(s); \ - } \ - s->h = (s->h + 1) % nsamples; \ - s->samples[i] = elapsed; \ - if (elapsed < s->min) \ - s->min = elapsed; \ - if (elapsed > s->max) \ - s->max = elapsed; \ - s->histogram[LOG2(elapsed)]++; \ - s->n++; \ - s->d.then = ts(s->d.unit); \ - return elapsed; \ - } \ - static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \ - { \ - uint8_t logs[64]; \ - uint8_t i, j, max_log = 0; \ - double m = (s->mean + name ## _stat_mean(s) / 2.0); \ - \ - fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); \ - for (i = 0; i < 64; i++) { \ - logs[i] = LOG2(s->histogram[i]); \ - if (logs[i] > max_log) \ - max_log = logs[i]; \ - } \ - for (i = max_log; i > 0; i--) { \ - if (!(i % 10)) \ - fprintf(stderr, "2^%2d ", i); \ - else \ - fprintf(stderr, " "); \ - for(j = 0; j < 64; j++) \ - fprintf(stderr, logs[j] >= i ? "•" : " "); \ - fprintf(stderr, "\n"); \ - } \ - if (max_log == 0) { \ - fprintf(stderr, "[empty]\n"); \ - } else { \ - fprintf(stderr, " ns μs ms s ks\n"); \ - fprintf(stderr, "min: "); \ - if (s->min < 1000) \ - fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); \ - else if (s->min < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \ - else if (s->min < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \ - else if (s->min < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \ - fprintf(stderr, " max: "); \ - if (s->max < 1000) \ - fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); \ - else if (s->max < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \ - else if (s->max < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \ - else if (s->max < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \ - fprintf(stderr, " mean: "); \ - if (m < 1000) \ - fprintf(stderr, "%.2f (ns)", m); \ - else if (m < 1000000) \ - fprintf(stderr, "%.2f (μs)", m / 1000.0); \ - else if (m < 1000000000) \ - fprintf(stderr, "%.2f (ms)", m / 1000000.0); \ - else if (m < 1000000000000) \ - fprintf(stderr, "%.2f (s)", m / 1000000000.0); \ - fprintf(stderr, "\n"); \ - } \ - fflush(stderr); \ - } - - -#define STAT_INIT(var, name) \ - (var)->name ## _stat.min = ~0; \ - (var)->name ## _stat.max = 0; \ - (var)->name ## _stat.mean = 0.0; \ - (var)->name ## _stat.h = 0; \ - (var)->name ## _stat.d.then = 0; \ - (var)->name ## _stat.d.unit = ns; - -#define STAT_TICK(var, name) name ## _stat_tick(&(var)->name ## _stat) - -#define STAT_TOCK(var, name) name ## _stat_tock(&(var)->name ## _stat) - -#define STAT_RESET(var, name) name ## _stat_reset(&(var)->name ## _stat) - -#define STAT_MEAN_LOG2_SAMPLE(var, name) \ - name ## _stat_mean_lg2(&(var)->name ## _stat) - -#define STAT_MEAN_SAMPLE(var, name) \ - name ## _stat_mean(&(var)->name ## _stat) - -#define STAT_PRINT(var, name, mod) \ - name ## _stat_print_histogram(&(var)->name ## _stat, mod) +struct stat { + ErlDrvTSDKey duration_key; + uint32_t h, n, num_samples; + uint64_t min, max; + double mean; + uint64_t histogram[64]; + uint64_t samples[]; +}; +extern double __stat_mean(struct stat *s); +extern double __stat_mean_log2(struct stat *s); +extern uint64_t __stat_tick(struct stat *s); +extern void __stat_reset(struct stat *s); +extern uint64_t __stat_tock(struct stat *s); +extern void __stat_print_histogram(struct stat *s, const char *mod); +extern void __stat_free(struct stat *s); +extern struct stat *__stat_init(uint32_t n); #if defined(__cplusplus) } diff --git a/c_src/wterl.c b/c_src/wterl.c index 824224d..5f44ae7 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -26,8 +26,10 @@ #include #include -#include "common.h" #include "wiredtiger.h" + +#include "common.h" +#include "duration.h" #include "stats.h" #include "async_nif.h" #include "queue.h" @@ -191,16 +193,14 @@ static inline uint32_t __log2(uint64_t x) { static int __ctx_cache_evict(WterlConnHandle *conn_handle) { - uint32_t num_evicted = 0; - struct wterl_ctx *c; - - if (conn_handle->cache_size < MAX_CACHE_SIZE) - return 0; - -#if 0 // TODO: fixme once stats work again + static uint16_t ncalls = 0; uint32_t mean, log, num_evicted, i; uint64_t now, elapsed; struct wterl_ctx *c, *n; + + if (conn_handle->cache_size < MAX_CACHE_SIZE && ++ncalls != 0) + return 0; + now = cpu_clock_ticks(); // Find the mean of the recorded times that items stayed in cache. @@ -233,16 +233,6 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) } c = n; } -#else - c = STAILQ_FIRST(&conn_handle->cache); - if (c) { - STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); - DPRINTF("evicting: %llu", PRIuint64(c->sig)); - c->session->close(c->session, NULL); - enif_free(c); - num_evicted++; - } -#endif conn_handle->cache_size -= num_evicted; return num_evicted; } @@ -295,57 +285,6 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) enif_mutex_unlock(conn_handle->cache_mutex); } -/** - * Create a signature for the operation we're about to perform. - * - * Create a 64-bit hash signature for this a combination of session - * configuration some number of cursors open on tables each potentially with a - * different configuration. "session_config, [{table_name, cursor_config}, - * ...]" - * - * session_config the string used to configure the WT_SESSION - * ... each pair of items in the varargs array is a table name, - * cursor config pair - * -> number of variable arguments processed - */ -static uint64_t -__ctx_cache_sig(const char *c, va_list ap, int count, size_t *len) -{ - int i = 0; - uint32_t hash = 0; - uint32_t crc = 0; - uint64_t sig = 0; - const char *arg; - size_t l = 0; - - *len = 0; - - if (c) { - l = __strlen(c); - hash = __str_hash(hash, c, l); - crc = __crc32(crc, c, l); - *len += l + 1; - } else { - *len += 1; - } - - for (i = 0; i < (2 * count); i++) { - arg = va_arg(ap, const char *); - if (arg) { - l = __strlen(arg); - hash = __str_hash(hash, arg, l); - crc = __crc32(crc, arg, __strlen(arg)); - *len += l + 1; - } else { - *len += 1; - } - } - - sig = (uint64_t)crc << 32 | hash; - //DPRINTF("sig %llu [%u:%u]", PRIuint64(sig), crc, hash); - return sig; -} - static inline char * __copy_str_into(char **p, const char *s) { @@ -366,42 +305,63 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx **ctx, int count, const char *session_config, ...) { - int i = 3; - size_t sig_len = 0; + int i = 0; + uint32_t hash = 0; + uint32_t crc = 0; + uint64_t sig = 0; + size_t l, sig_len = 0; va_list ap; - uint64_t sig; const char *arg; struct wterl_ctx *c; arg = session_config; va_start(ap, session_config); - sig = __ctx_cache_sig(session_config, ap, count, &sig_len); + if (session_config) { + l = __strlen(session_config); + hash = __str_hash(hash, session_config, l); + crc = __crc32(crc, session_config, l); + sig_len += l + 1; + DPRINTF("sig/1: %s", session_config); + } else { + sig_len += 1; + } + for (i = 0; i < (2 * count); i++) { + arg = va_arg(ap, const char *); + if (arg) { + l = __strlen(arg); + DPRINTF("sig/args: %s", arg); + hash = __str_hash(hash, arg, l); + crc = __crc32(crc, arg, l); + sig_len += l + 1; + } else { + sig_len += 1; + } + } + sig = (uint64_t)crc << 32 | hash; + DPRINTF("sig %llu [%u:%u]", PRIuint64(sig), crc, hash); va_end(ap); *ctx = NULL; - do { - c = conn_handle->mru_ctx[worker_id]; - if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { - if (c == 0) { - // mru miss: - DPRINTF("[%.4u] mru miss, empty", worker_id); - *ctx = NULL; - } else { - if (c->sig == sig) { - // mru hit: - DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig)); - *ctx = c; - break; - } else { - // mru mismatch: - DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); - __ctx_cache_add(conn_handle, c); - *ctx = NULL; - } - } + + c = conn_handle->mru_ctx[worker_id]; + if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { + if (c == 0) { + // mru miss: + DPRINTF("[%.4u] mru miss, empty", worker_id); + *ctx = NULL; + } else { + if (c->sig == sig) { + // mru hit: + DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig)); + *ctx = c; + } else { + // mru mismatch: + DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); + __ctx_cache_add(conn_handle, c); + *ctx = NULL; + } } - // CAS failed, retry up to 3 times - } while(i--); + } if (*ctx == NULL) { // check the cache @@ -474,9 +434,9 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx } else { if (c != NULL) { __ctx_cache_add(conn_handle, c); - DPRINTF("[%.4u] reset %d cursors, returnd ctx to cache", worker_id, ctx->num_cursors); + DPRINTF("[%.4u] reset %d cursors, returned ctx to cache", worker_id, ctx->num_cursors); } else { - DPRINTF("[%.4u] reset %d cursors, returnd ctx to mru", worker_id, ctx->num_cursors); + DPRINTF("[%.4u] reset %d cursors, returned ctx to mru", worker_id, ctx->num_cursors); } } }