diff --git a/c_src/async_nif.h b/c_src/async_nif.h index ff76364..6627152 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -26,6 +26,7 @@ extern "C" { #include #include "fifo_q.h" +#include "queue.h" #include "stats.h" #ifndef UNUSED @@ -33,11 +34,9 @@ extern "C" { #endif #define ASYNC_NIF_MAX_WORKERS 1024 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 2000 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 1000 #define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS -STAT_DECL(qwait, 1000); - struct async_nif_req_entry { ERL_NIF_TERM ref; ErlNifEnv *env; @@ -45,12 +44,12 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); - const char *func; }; DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); struct async_nif_work_queue { STAT_DEF(qwait); + unsigned int workers; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; FIFO_QUEUE_TYPE(reqs) reqs; @@ -61,13 +60,15 @@ struct async_nif_worker_entry { unsigned int worker_id; struct async_nif_state *async_nif; struct async_nif_work_queue *q; + SLIST_ENTRY(async_nif_worker_entry) entries; }; struct async_nif_state { STAT_DEF(qwait); unsigned int shutdown; - unsigned int num_workers; - struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; + ErlNifMutex *we_mutex; + unsigned int we_active; + SLIST_HEAD(joining, async_nif_worker_entry) we_joining; unsigned int num_queues; unsigned int next_q; FIFO_QUEUE_TYPE(reqs) recycled_reqs; @@ -107,7 +108,6 @@ struct async_nif_state { enif_make_atom(env, "shutdown")); \ req = async_nif_reuse_req(async_nif); \ if (!req) { \ - async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "eagain")); \ } \ @@ -128,7 +128,6 @@ struct async_nif_state { req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \ - req->func = __func__; \ int h = -1; \ if (affinity) \ h = affinity % async_nif->num_queues; \ @@ -195,12 +194,12 @@ async_nif_reuse_req(struct async_nif_state *async_nif) if (req) { memset(req, 0, sizeof(struct async_nif_req_entry)); env = enif_alloc_env(); - if (!env) { - enif_free(req); - req = NULL; - } else { + if (env) { req->env = env; async_nif->num_reqs++; + } else { + enif_free(req); + req = NULL; } } } @@ -208,7 +207,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif) req = fifo_q_get(reqs, async_nif->recycled_reqs); } enif_mutex_unlock(async_nif->recycled_req_mutex); - STAT_TICK(async_nif, qwait); + __stat_tick(async_nif->qwait_stat); return req; } @@ -223,16 +222,61 @@ void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { ErlNifEnv *env = NULL; - STAT_TOCK(async_nif, qwait); + __stat_tock(async_nif->qwait_stat); enif_mutex_lock(async_nif->recycled_req_mutex); + enif_clear_env(req->env); env = req->env; - enif_clear_env(env); memset(req, 0, sizeof(struct async_nif_req_entry)); req->env = env; fifo_q_put(reqs, async_nif->recycled_reqs, req); enif_mutex_unlock(async_nif->recycled_req_mutex); } +static void *async_nif_worker_fn(void *); + +/** + * Start up a worker thread. + */ +static int +async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q) +{ + struct async_nif_worker_entry *we; + + if (0 == q) + return EINVAL; + + enif_mutex_lock(async_nif->we_mutex); + + we = SLIST_FIRST(&async_nif->we_joining); + while(we != NULL) { + struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); + SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); + void *exit_value = 0; /* We ignore the thread_join's exit value. */ + enif_thread_join(we->tid, &exit_value); + enif_free(we); + async_nif->we_active--; + we = n; + } + + if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { + enif_mutex_unlock(async_nif->we_mutex); + return EAGAIN; + } + + we = enif_alloc(sizeof(struct async_nif_worker_entry)); + if (!we) { + enif_mutex_unlock(async_nif->we_mutex); + return ENOMEM; + } + memset(we, 0, sizeof(struct async_nif_worker_entry)); + we->worker_id = async_nif->we_active++; + we->async_nif = async_nif; + we->q = q; + + enif_mutex_unlock(async_nif->we_mutex); + return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0); +} + /** * Enqueue a request for processing by a worker thread. * @@ -244,7 +288,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en { /* Identify the most appropriate worker for this request. */ unsigned int qid = 0; + unsigned int n = async_nif->num_queues; struct async_nif_work_queue *q = NULL; + double await = 0; + double await_inthisq = 0; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that @@ -257,12 +304,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en async_nif->next_q = qid; } - q = &async_nif->queues[qid]; - enif_mutex_lock(q->reqs_mutex); - -#if 0 // stats aren't yet thread safe, so this can go haywire... TODO: fix. - unsigned int n = async_nif->num_queues; - /* Now we inspect and interate across the set of queues trying to select one that isn't too full or too slow. */ do { @@ -277,8 +318,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en return 0; } if (!fifo_q_full(reqs, q->reqs)) { - double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait); - double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait); + await = __stat_mean_log2(async_nif->qwait_stat); + await_inthisq = __stat_mean_log2(q->qwait_stat); if (await_inthisq > await) { enif_mutex_unlock(q->reqs_mutex); qid = (qid + 1) % async_nif->num_queues; @@ -288,13 +329,18 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en break; } } - // TODO: at some point add in work sheading/stealing } while(n-- > 0); -#endif /* We hold the queue's lock, and we've seletect a reasonable queue for this - new request so add the request. */ - STAT_TICK(q, qwait); + new request now check to make sure there are enough workers actively + processing requests on this queue. */ + if (q->workers < 2 || await_inthisq > await) { + if (async_nif_start_worker(async_nif, q) == 0) + q->workers++; + } + + /* And finally add the request to the queue. */ + __stat_tick(q->qwait_stat); fifo_q_put(reqs, q->reqs, req); /* Build the term before releasing the lock so as not to race on the use of @@ -331,9 +377,14 @@ async_nif_worker_fn(void *arg) } if (fifo_q_empty(reqs, q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ - STAT_RESET(q, qwait); - enif_cond_wait(q->reqs_cnd, q->reqs_mutex); - goto check_again_for_work; + __stat_reset(q->qwait_stat); + if (q->workers > 2) { + enif_mutex_unlock(q->reqs_mutex); + break; + } else { + enif_cond_wait(q->reqs_cnd, q->reqs_mutex); + goto check_again_for_work; + } } else { assert(fifo_q_size(reqs, q->reqs) > 0); assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs)); @@ -348,7 +399,7 @@ async_nif_worker_fn(void *arg) /* Perform the work. */ req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); - STAT_TOCK(q, qwait); + __stat_tock(q->qwait_stat); /* Now call the post-work cleanup function. */ req->fn_post(req->args); @@ -363,6 +414,10 @@ async_nif_worker_fn(void *arg) req = NULL; } } + enif_mutex_lock(async_nif->we_mutex); + SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); + enif_mutex_unlock(async_nif->we_mutex); + q->workers--; enif_thread_exit(0); return 0; } @@ -374,9 +429,10 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; struct async_nif_req_entry *req = NULL; + struct async_nif_worker_entry *we = NULL; UNUSED(env); - STAT_PRINT(async_nif, qwait, "wterl"); + __stat_print_histogram(async_nif->qwait_stat, "wterl"); /* Signal the worker threads, stop what you're doing and exit. To ensure that we don't race with the enqueue() process we first @@ -393,19 +449,29 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) executing requests. */ async_nif->shutdown = 1; - /* Make sure to wake up all worker threads sitting on conditional - wait for work so that they can see it's time to exit. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; - enif_cond_broadcast(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); } /* Join for the now exiting worker threads. */ - for (i = 0; i < async_nif->num_workers; ++i) { - void *exit_value = 0; /* We ignore the thread_join's exit value. */ - enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); + while(async_nif->we_active > 0) { + + for (i = 0; i < num_queues; i++) + enif_cond_broadcast(async_nif->queues[i].reqs_cnd); + + we = SLIST_FIRST(&async_nif->we_joining); + while(we != NULL) { + struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); + SLIST_REMOVE_HEAD(&async_nif->we_joining, entries); + void *exit_value = 0; /* We ignore the thread_join's exit value. */ + enif_thread_join(we->tid, &exit_value); + enif_free(we); + async_nif->we_active--; + we = n; + } } + enif_mutex_destroy(async_nif->we_mutex); /* Cleanup in-flight requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { @@ -447,7 +513,7 @@ static void * async_nif_load() { static int has_init = 0; - unsigned int i, j, num_queues; + unsigned int i, num_queues; ErlNifSysInfo info; struct async_nif_state *async_nif; @@ -477,57 +543,24 @@ async_nif_load() if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * num_queues); + sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; - async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; + async_nif->we_active = 0; async_nif->next_q = 0; async_nif->shutdown = 0; async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); async_nif->recycled_req_mutex = enif_mutex_create(NULL); - STAT_INIT(async_nif, qwait); + async_nif->qwait_stat = __stat_init(1000); + async_nif->we_mutex = enif_mutex_create(NULL); + SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - STAT_INIT(q, qwait); - } - - /* Setup the thread pool management. */ - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - - /* Start the worker threads. */ - for (i = 0; i < async_nif->num_workers; i++) { - struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; - we->async_nif = async_nif; - we->worker_id = i; - we->q = &async_nif->queues[i % async_nif->num_queues]; - if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, - &async_nif_worker_fn, (void*)we, NULL) != 0) { - async_nif->shutdown = 1; - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_cond_broadcast(q->reqs_cnd); - } - - while(i-- > 0) { - void *exit_value = 0; /* Ignore this. */ - enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); - } - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_mutex_destroy(q->reqs_mutex); - enif_cond_destroy(q->reqs_cnd); - } - - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - enif_free(async_nif); - return NULL; - } + q->qwait_stat = __stat_init(1000); } return async_nif; } diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index cc0e807..15608ef 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -36,13 +36,12 @@ get_wt () (cd $BASEDIR/$WT_DIR && git pull -u) || exit 1 else if [ "X$WT_REF" != "X" ]; then - git clone ${WT_REPO} && \ - (cd $BASEDIR/wiredtiger && git checkout refs/$WT_REF || exit 1) + git clone ${WT_REPO} ${WT_DIR} && \ + (cd $BASEDIR/$WT_DIR && git checkout refs/$WT_REF || exit 1) else - git clone ${WT_REPO} && \ - (cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1) + git clone ${WT_REPO} ${WT_DIR} && \ + (cd $BASEDIR/$WT_DIR && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1) fi - mv wiredtiger $WT_DIR || exit 1 fi [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) (cd $BASEDIR/$WT_DIR diff --git a/c_src/cas.h b/c_src/cas.h index ea81dbf..61c1f61 100644 --- a/c_src/cas.h +++ b/c_src/cas.h @@ -69,9 +69,9 @@ do { \ __val = __newval; \ } while ( 0 ) -#define ALIGNED_ENIF_ALLOC(_s) \ - ((void *)(((unsigned long)enif_alloc((_s)+CACHE_LINE_SIZE*2) + \ - CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))) \ +#define CACHE_ALIGNED_SIZEOF(_s) \ + ((sizeof(_s)) + CACHE_LINE_SIZE*2) + \ + CACHE_LINE_SIZE - 1) & ~(CACHE_LINE_SIZE-1))) \ /* * I. Compare-and-swap. diff --git a/c_src/duration.h b/c_src/duration.h index 1404f41..2d86385 100644 --- a/c_src/duration.h +++ b/c_src/duration.h @@ -19,7 +19,7 @@ #endif -void current_utc_time(struct timespec *ts) +static inline void current_utc_time(struct timespec *ts) { #ifdef __MACH__ // OS X does not have clock_gettime, use clock_get_time clock_serv_t cclock; diff --git a/c_src/stats.c b/c_src/stats.c new file mode 100644 index 0000000..9d56f9e --- /dev/null +++ b/c_src/stats.c @@ -0,0 +1,260 @@ +/* + * stats: + * + * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. + * Author: Gregory Burd + * + * This file is provided to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file + * except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include + +#include "erl_nif.h" +#include "erl_driver.h" + +#include "common.h" +#include "duration.h" +#include "stats.h" + +/** + * Calculate the log2 of 64bit unsigned integers. + */ +#ifdef __GCC__ +#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) +#else +static unsigned int __log2_64(uint64_t x) { + static const int tab64[64] = { + 63, 0, 58, 1, 59, 47, 53, 2, + 60, 39, 48, 27, 54, 33, 42, 3, + 61, 51, 37, 40, 49, 18, 28, 20, + 55, 30, 34, 11, 43, 14, 22, 4, + 62, 57, 46, 52, 38, 26, 32, 41, + 50, 36, 17, 19, 29, 10, 13, 21, + 56, 45, 25, 31, 35, 16, 9, 12, + 44, 24, 15, 8, 23, 7, 6, 5}; + if (x == 0) return 0; + uint64_t v = x; + v |= v >> 1; + v |= v >> 2; + v |= v >> 4; + v |= v >> 8; + v |= v >> 16; + v |= v >> 32; + return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; +} +#define LOG2(X) __log2_64(X) +#endif + +double +__stat_mean(struct stat *s) +{ + uint32_t t, h; + double mean; + + if (!s) + return 0.0; + + t = s->h; + h = (s->h + 1) % s->num_samples; + mean = 0; + + while (h != t) { + mean += s->samples[h]; + h = (h + 1) % s->num_samples; + } + if (mean > 0) + mean /= (double)(s->n < s->num_samples ? s->n : s->num_samples); + return mean; +} + +double +__stat_mean_log2(struct stat *s) +{ + uint32_t i; + double mean; + + if (!s) + return 0.0; + + mean = 0; + for (i = 0; i < 64; i++) + mean += (s->histogram[i] * i); + if (mean > 0) + mean /= (double)s->n; + return mean; +} + +uint64_t +__stat_tick(struct stat *s) +{ + duration_t *d; + uint64_t t; + + if (!s) + return 0.0; + + d = (duration_t*)erl_drv_tsd_get(s->duration_key); + if (!d) { + if ((d = enif_alloc(sizeof(duration_t))) == NULL) + return 0; + memset(d, 0, sizeof(duration_t)); + erl_drv_tsd_set(s->duration_key, d); + } + t = ts(d->unit); + d->then = t; + return t; +} + +void +__stat_reset(struct stat *s) +{ + duration_t *d; + + if (!s) + return; + + s->min = ~0; + s->max = 0; + s->h = 0; + memset(s->histogram, 0, sizeof(uint64_t) * 64); + memset(s->samples, 0, sizeof(uint64_t) * s->num_samples); + d = (duration_t*)erl_drv_tsd_get(s->duration_key); + if (d) + d->then = 0; +} + +uint64_t +__stat_tock(struct stat *s) +{ + uint64_t now; + uint64_t elapsed; + uint32_t i; + duration_t *d; + + if (!s) + return 0.0; + + d = (duration_t*)erl_drv_tsd_get(s->duration_key); + if (!d) + return 0; + + now = ts(d->unit); + elapsed = now - d->then; + i = s->h; + if (s->n == s->num_samples) { + s->mean = (s->mean + __stat_mean(s)) / 2.0; + if (s->n >= 4294967295) + __stat_reset(s); + } + s->h = (s->h + 1) % s->num_samples; + s->samples[i] = elapsed; + if (elapsed < s->min) + s->min = elapsed; + if (elapsed > s->max) + s->max = elapsed; + s->histogram[LOG2(elapsed)]++; + s->n++; + d->then = ts(d->unit); + return elapsed; +} + +void +__stat_print_histogram(struct stat *s, const char *mod) +{ + uint8_t logs[64]; + uint8_t i, j, max_log = 0; + double m; + + if (!s) + return; + + m = (s->mean + __stat_mean(s) / 2.0); + + fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); + for (i = 0; i < 64; i++) { + logs[i] = LOG2(s->histogram[i]); + if (logs[i] > max_log) + max_log = logs[i]; + } + for (i = max_log; i > 0; i--) { + if (!(i % 10)) + fprintf(stderr, "2^%2d ", i); + else + fprintf(stderr, " "); + for(j = 0; j < 64; j++) + fprintf(stderr, logs[j] >= i ? "•" : " "); + fprintf(stderr, "\n"); + } + if (max_log == 0) { + fprintf(stderr, "[empty]\n"); + } else { + fprintf(stderr, " ns μs ms s ks\n"); + fprintf(stderr, "min: "); + if (s->min < 1000) + fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); + else if (s->min < 1000000) + fprintf(stderr, "%.2f (μs)", s->min / 1000.0); + else if (s->min < 1000000000) + fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); + else if (s->min < 1000000000000) + fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); + fprintf(stderr, " max: "); + if (s->max < 1000) + fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); + else if (s->max < 1000000) + fprintf(stderr, "%.2f (μs)", s->max / 1000.0); + else if (s->max < 1000000000) + fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); + else if (s->max < 1000000000000) + fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); + fprintf(stderr, " mean: "); + if (m < 1000) + fprintf(stderr, "%.2f (ns)", m); + else if (m < 1000000) + fprintf(stderr, "%.2f (μs)", m / 1000.0); + else if (m < 1000000000) + fprintf(stderr, "%.2f (ms)", m / 1000000.0); + else if (m < 1000000000000) + fprintf(stderr, "%.2f (s)", m / 1000000000.0); + fprintf(stderr, "\n"); + } + fflush(stderr); +} + +void +__stat_free(struct stat *s) +{ + if (!s) + return; + + enif_free(s->samples); + enif_free(s); +} + +struct stat * +__stat_init(uint32_t n) +{ + struct stat *s = enif_alloc(sizeof(struct stat) + (sizeof(uint64_t) * n)); + if (!s) + return NULL; + memset(s, 0, sizeof(struct stat) + (sizeof(uint64_t) * n)); + s->min = ~0; + s->max = 0; + s->mean = 0.0; + s->h = 0; + s->num_samples = n; + erl_drv_tsd_key_create(NULL, &(s->duration_key)); + return s; +} diff --git a/c_src/stats.h b/c_src/stats.h index 35192ec..6d7f983 100644 --- a/c_src/stats.h +++ b/c_src/stats.h @@ -27,185 +27,25 @@ extern "C" { #endif -#include "duration.h" +#define STAT_DEF(name) struct stat *name ## _stat; -/** - * Calculate the log2 of 64bit unsigned integers. - */ -#ifdef __GCC__ -#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) -#else -static unsigned int __log2_64(uint64_t x) { - static const int tab64[64] = { - 63, 0, 58, 1, 59, 47, 53, 2, - 60, 39, 48, 27, 54, 33, 42, 3, - 61, 51, 37, 40, 49, 18, 28, 20, - 55, 30, 34, 11, 43, 14, 22, 4, - 62, 57, 46, 52, 38, 26, 32, 41, - 50, 36, 17, 19, 29, 10, 13, 21, - 56, 45, 25, 31, 35, 16, 9, 12, - 44, 24, 15, 8, 23, 7, 6, 5}; - if (x == 0) return 0; - uint64_t v = x; - v |= v >> 1; - v |= v >> 2; - v |= v >> 4; - v |= v >> 8; - v |= v >> 16; - v |= v >> 32; - return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; -} -#define LOG2(X) __log2_64(X) -#endif - -#define STAT_DEF(name) struct name ## _stat name ## _stat; - -#define STAT_DECL(name, nsamples) \ - struct name ## _stat { \ - duration_t d; \ - uint64_t histogram[64]; \ - uint32_t h, n; \ - uint64_t samples[nsamples]; \ - uint64_t min, max; \ - double mean; \ - }; \ - static inline double name ## _stat_mean(struct name ## _stat *s) { \ - uint32_t t = s->h; \ - uint32_t h = (s->h + 1) % nsamples; \ - double mean = 0; \ - while (h != t) { \ - mean += s->samples[h]; \ - h = (h + 1) % nsamples; \ - } \ - if (mean > 0) \ - mean /= (double)(s->n < nsamples ? s->n : nsamples); \ - return mean; \ - } \ - static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \ - uint32_t i; \ - double mean = 0; \ - for (i = 0; i < 64; i++) \ - mean += (s->histogram[i] * i); \ - if (mean > 0) \ - mean /= (double)s->n; \ - return mean; \ - } \ - static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \ - { \ - uint64_t t = ts(s->d.unit); \ - s->d.then = t; \ - return t; \ - } \ - static inline void name ## _stat_reset(struct name ## _stat *s) \ - { \ - s->min = ~0; \ - s->max = 0; \ - s->h = 0; \ - memset(&s->histogram, 0, sizeof(uint64_t) * 64); \ - memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \ - } \ - static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \ - { \ - uint64_t now = ts(s->d.unit); \ - uint64_t elapsed = now - s->d.then; \ - uint32_t i = s->h; \ - if (s->n == nsamples) { \ - s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \ - if (s->n >= 4294967295) \ - name ## _stat_reset(s); \ - } \ - s->h = (s->h + 1) % nsamples; \ - s->samples[i] = elapsed; \ - if (elapsed < s->min) \ - s->min = elapsed; \ - if (elapsed > s->max) \ - s->max = elapsed; \ - s->histogram[LOG2(elapsed)]++; \ - s->n++; \ - s->d.then = ts(s->d.unit); \ - return elapsed; \ - } \ - static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \ - { \ - uint8_t logs[64]; \ - uint8_t i, j, max_log = 0; \ - double m = (s->mean + name ## _stat_mean(s) / 2.0); \ - \ - fprintf(stderr, "%s:async_nif request latency histogram:\n", mod); \ - for (i = 0; i < 64; i++) { \ - logs[i] = LOG2(s->histogram[i]); \ - if (logs[i] > max_log) \ - max_log = logs[i]; \ - } \ - for (i = max_log; i > 0; i--) { \ - if (!(i % 10)) \ - fprintf(stderr, "2^%2d ", i); \ - else \ - fprintf(stderr, " "); \ - for(j = 0; j < 64; j++) \ - fprintf(stderr, logs[j] >= i ? "•" : " "); \ - fprintf(stderr, "\n"); \ - } \ - if (max_log == 0) { \ - fprintf(stderr, "[empty]\n"); \ - } else { \ - fprintf(stderr, " ns μs ms s ks\n"); \ - fprintf(stderr, "min: "); \ - if (s->min < 1000) \ - fprintf(stderr, "%llu (ns)", PRIuint64(s->min)); \ - else if (s->min < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \ - else if (s->min < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \ - else if (s->min < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \ - fprintf(stderr, " max: "); \ - if (s->max < 1000) \ - fprintf(stderr, "%llu (ns)", PRIuint64(s->max)); \ - else if (s->max < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \ - else if (s->max < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \ - else if (s->max < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \ - fprintf(stderr, " mean: "); \ - if (m < 1000) \ - fprintf(stderr, "%.2f (ns)", m); \ - else if (m < 1000000) \ - fprintf(stderr, "%.2f (μs)", m / 1000.0); \ - else if (m < 1000000000) \ - fprintf(stderr, "%.2f (ms)", m / 1000000.0); \ - else if (m < 1000000000000) \ - fprintf(stderr, "%.2f (s)", m / 1000000000.0); \ - fprintf(stderr, "\n"); \ - } \ - fflush(stderr); \ - } - - -#define STAT_INIT(var, name) \ - (var)->name ## _stat.min = ~0; \ - (var)->name ## _stat.max = 0; \ - (var)->name ## _stat.mean = 0.0; \ - (var)->name ## _stat.h = 0; \ - (var)->name ## _stat.d.then = 0; \ - (var)->name ## _stat.d.unit = ns; - -#define STAT_TICK(var, name) name ## _stat_tick(&(var)->name ## _stat) - -#define STAT_TOCK(var, name) name ## _stat_tock(&(var)->name ## _stat) - -#define STAT_RESET(var, name) name ## _stat_reset(&(var)->name ## _stat) - -#define STAT_MEAN_LOG2_SAMPLE(var, name) \ - name ## _stat_mean_lg2(&(var)->name ## _stat) - -#define STAT_MEAN_SAMPLE(var, name) \ - name ## _stat_mean(&(var)->name ## _stat) - -#define STAT_PRINT(var, name, mod) \ - name ## _stat_print_histogram(&(var)->name ## _stat, mod) +struct stat { + ErlDrvTSDKey duration_key; + uint32_t h, n, num_samples; + uint64_t min, max; + double mean; + uint64_t histogram[64]; + uint64_t samples[]; +}; +extern double __stat_mean(struct stat *s); +extern double __stat_mean_log2(struct stat *s); +extern uint64_t __stat_tick(struct stat *s); +extern void __stat_reset(struct stat *s); +extern uint64_t __stat_tock(struct stat *s); +extern void __stat_print_histogram(struct stat *s, const char *mod); +extern void __stat_free(struct stat *s); +extern struct stat *__stat_init(uint32_t n); #if defined(__cplusplus) } diff --git a/c_src/wterl.c b/c_src/wterl.c index 824224d..5f44ae7 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -26,8 +26,10 @@ #include #include -#include "common.h" #include "wiredtiger.h" + +#include "common.h" +#include "duration.h" #include "stats.h" #include "async_nif.h" #include "queue.h" @@ -191,16 +193,14 @@ static inline uint32_t __log2(uint64_t x) { static int __ctx_cache_evict(WterlConnHandle *conn_handle) { - uint32_t num_evicted = 0; - struct wterl_ctx *c; - - if (conn_handle->cache_size < MAX_CACHE_SIZE) - return 0; - -#if 0 // TODO: fixme once stats work again + static uint16_t ncalls = 0; uint32_t mean, log, num_evicted, i; uint64_t now, elapsed; struct wterl_ctx *c, *n; + + if (conn_handle->cache_size < MAX_CACHE_SIZE && ++ncalls != 0) + return 0; + now = cpu_clock_ticks(); // Find the mean of the recorded times that items stayed in cache. @@ -233,16 +233,6 @@ __ctx_cache_evict(WterlConnHandle *conn_handle) } c = n; } -#else - c = STAILQ_FIRST(&conn_handle->cache); - if (c) { - STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries); - DPRINTF("evicting: %llu", PRIuint64(c->sig)); - c->session->close(c->session, NULL); - enif_free(c); - num_evicted++; - } -#endif conn_handle->cache_size -= num_evicted; return num_evicted; } @@ -295,57 +285,6 @@ __ctx_cache_add(WterlConnHandle *conn_handle, struct wterl_ctx *c) enif_mutex_unlock(conn_handle->cache_mutex); } -/** - * Create a signature for the operation we're about to perform. - * - * Create a 64-bit hash signature for this a combination of session - * configuration some number of cursors open on tables each potentially with a - * different configuration. "session_config, [{table_name, cursor_config}, - * ...]" - * - * session_config the string used to configure the WT_SESSION - * ... each pair of items in the varargs array is a table name, - * cursor config pair - * -> number of variable arguments processed - */ -static uint64_t -__ctx_cache_sig(const char *c, va_list ap, int count, size_t *len) -{ - int i = 0; - uint32_t hash = 0; - uint32_t crc = 0; - uint64_t sig = 0; - const char *arg; - size_t l = 0; - - *len = 0; - - if (c) { - l = __strlen(c); - hash = __str_hash(hash, c, l); - crc = __crc32(crc, c, l); - *len += l + 1; - } else { - *len += 1; - } - - for (i = 0; i < (2 * count); i++) { - arg = va_arg(ap, const char *); - if (arg) { - l = __strlen(arg); - hash = __str_hash(hash, arg, l); - crc = __crc32(crc, arg, __strlen(arg)); - *len += l + 1; - } else { - *len += 1; - } - } - - sig = (uint64_t)crc << 32 | hash; - //DPRINTF("sig %llu [%u:%u]", PRIuint64(sig), crc, hash); - return sig; -} - static inline char * __copy_str_into(char **p, const char *s) { @@ -366,42 +305,63 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx **ctx, int count, const char *session_config, ...) { - int i = 3; - size_t sig_len = 0; + int i = 0; + uint32_t hash = 0; + uint32_t crc = 0; + uint64_t sig = 0; + size_t l, sig_len = 0; va_list ap; - uint64_t sig; const char *arg; struct wterl_ctx *c; arg = session_config; va_start(ap, session_config); - sig = __ctx_cache_sig(session_config, ap, count, &sig_len); + if (session_config) { + l = __strlen(session_config); + hash = __str_hash(hash, session_config, l); + crc = __crc32(crc, session_config, l); + sig_len += l + 1; + DPRINTF("sig/1: %s", session_config); + } else { + sig_len += 1; + } + for (i = 0; i < (2 * count); i++) { + arg = va_arg(ap, const char *); + if (arg) { + l = __strlen(arg); + DPRINTF("sig/args: %s", arg); + hash = __str_hash(hash, arg, l); + crc = __crc32(crc, arg, l); + sig_len += l + 1; + } else { + sig_len += 1; + } + } + sig = (uint64_t)crc << 32 | hash; + DPRINTF("sig %llu [%u:%u]", PRIuint64(sig), crc, hash); va_end(ap); *ctx = NULL; - do { - c = conn_handle->mru_ctx[worker_id]; - if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { - if (c == 0) { - // mru miss: - DPRINTF("[%.4u] mru miss, empty", worker_id); - *ctx = NULL; - } else { - if (c->sig == sig) { - // mru hit: - DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig)); - *ctx = c; - break; - } else { - // mru mismatch: - DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); - __ctx_cache_add(conn_handle, c); - *ctx = NULL; - } - } + + c = conn_handle->mru_ctx[worker_id]; + if (CASPO(&conn_handle->mru_ctx[worker_id], c, 0) == c) { + if (c == 0) { + // mru miss: + DPRINTF("[%.4u] mru miss, empty", worker_id); + *ctx = NULL; + } else { + if (c->sig == sig) { + // mru hit: + DPRINTF("[%.4u] mru hit: %llu found", worker_id, PRIuint64(sig)); + *ctx = c; + } else { + // mru mismatch: + DPRINTF("[%.4u] mru miss: %llu != %llu", worker_id, PRIuint64(sig), PRIuint64(c->sig)); + __ctx_cache_add(conn_handle, c); + *ctx = NULL; + } } - // CAS failed, retry up to 3 times - } while(i--); + } if (*ctx == NULL) { // check the cache @@ -474,9 +434,9 @@ __release_ctx(WterlConnHandle *conn_handle, uint32_t worker_id, struct wterl_ctx } else { if (c != NULL) { __ctx_cache_add(conn_handle, c); - DPRINTF("[%.4u] reset %d cursors, returnd ctx to cache", worker_id, ctx->num_cursors); + DPRINTF("[%.4u] reset %d cursors, returned ctx to cache", worker_id, ctx->num_cursors); } else { - DPRINTF("[%.4u] reset %d cursors, returnd ctx to mru", worker_id, ctx->num_cursors); + DPRINTF("[%.4u] reset %d cursors, returned ctx to mru", worker_id, ctx->num_cursors); } } }