From 85295c78904bd3a5a5043ce7c21291db98a924cf Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Thu, 4 Jul 2013 20:52:20 -0400 Subject: [PATCH] Update to latest async_nif code. --- c_src/async_nif.h | 359 +++++++++++++----------- c_src/common.h | 25 +- c_src/duration.h | 98 ------- c_src/fifo_q.h | 93 ------- c_src/lmdb_nif.c | 34 +-- c_src/queue.h | 678 ++++++++++++++++++++++++++++++++++++++++++++++ c_src/stats.h | 217 --------------- src/lmdb.erl | 2 +- tools/lmdb.config | 13 +- 9 files changed, 908 insertions(+), 611 deletions(-) delete mode 100644 c_src/duration.h delete mode 100644 c_src/fifo_q.h create mode 100644 c_src/queue.h delete mode 100644 c_src/stats.h diff --git a/c_src/async_nif.h b/c_src/async_nif.h index e7a9670..8f13913 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -4,18 +4,16 @@ * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. * Author: Gregory Burd * - * This file is provided to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain - * a copy of the License at + * This file is provided to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations * under the License. */ @@ -27,18 +25,17 @@ extern "C" { #endif #include -#include "fifo_q.h" -#include "stats.h" -#ifndef __UNUSED -#define __UNUSED(v) ((void)(v)) +#include "queue.h" + +#ifndef UNUSED +#define UNUSED(v) ((void)(v)) #endif -#define ASYNC_NIF_MAX_WORKERS 128 -#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 -#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS - -STAT_DECL(qwait, 1000); +#define ASYNC_NIF_MAX_WORKERS 1024 +#define ASYNC_NIF_MIN_WORKERS 2 +#define ASYNC_NIF_WORKER_QUEUE_SIZE 100 +#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS struct async_nif_req_entry { ERL_NIF_TERM ref; @@ -47,14 +44,16 @@ struct async_nif_req_entry { void *args; void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_post)(void *); + STAILQ_ENTRY(async_nif_req_entry) entries; }; -DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry); + struct async_nif_work_queue { - STAT_DEF(qwait); + unsigned int num_workers; + unsigned int depth; ErlNifMutex *reqs_mutex; ErlNifCond *reqs_cnd; - FIFO_QUEUE_TYPE(reqs) reqs; + STAILQ_HEAD(reqs, async_nif_req_entry) reqs; }; struct async_nif_worker_entry { @@ -62,16 +61,17 @@ struct async_nif_worker_entry { unsigned int worker_id; struct async_nif_state *async_nif; struct async_nif_work_queue *q; + SLIST_ENTRY(async_nif_worker_entry) entries; }; struct async_nif_state { - STAT_DEF(qwait); unsigned int shutdown; - unsigned int num_workers; - struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; + ErlNifMutex *we_mutex; + unsigned int we_active; + SLIST_HEAD(joining, async_nif_worker_entry) we_joining; unsigned int num_queues; unsigned int next_q; - FIFO_QUEUE_TYPE(reqs) recycled_reqs; + STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs; unsigned int num_reqs; ErlNifMutex *recycled_req_mutex; struct async_nif_work_queue queues[]; @@ -80,37 +80,46 @@ struct async_nif_state { #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ struct decl ## _args frame; \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ - __UNUSED(worker_id); \ + UNUSED(worker_id); \ + DPRINTF("async_nif: calling \"%s\"", __func__); \ do work_block while(0); \ + DPRINTF("async_nif: returned from \"%s\"", __func__); \ } \ static void fn_post_ ## decl (struct decl ## _args *args) { \ - __UNUSED(args); \ + UNUSED(args); \ + DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \ do post_block while(0); \ + DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \ } \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ struct decl ## _args on_stack_args; \ struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *copy_of_args; \ struct async_nif_req_entry *req = NULL; \ - const char *affinity = NULL; \ + unsigned int affinity = 0; \ ErlNifEnv *new_env = NULL; \ /* argv[0] is a ref used for selective recv */ \ const ERL_NIF_TERM *argv = argv_in + 1; \ argc -= 1; \ /* Note: !!! this assumes that the first element of priv_data is ours */ \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ - if (async_nif->shutdown) \ + if (async_nif->shutdown) { \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "shutdown")); \ + } \ req = async_nif_reuse_req(async_nif); \ + if (!req) { \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "eagain")); \ + } \ new_env = req->env; \ - if (!req) \ - return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "eagain")); \ + DPRINTF("async_nif: calling \"%s\"", __func__); \ do pre_block while(0); \ + DPRINTF("async_nif: returned from \"%s\"", __func__); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ + async_nif_recycle_req(req, async_nif); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ } \ @@ -122,13 +131,14 @@ struct async_nif_state { req->fn_post = (void (*)(void *))fn_post_ ## decl; \ int h = -1; \ if (affinity) \ - h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \ + h = ((unsigned int)affinity) % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ if (!reply) { \ fn_post_ ## decl (args); \ + async_nif_recycle_req(req, async_nif); \ enif_free(copy_of_args); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ - enif_make_atom(env, "shutdown")); \ + enif_make_atom(env, "eagain")); \ } \ return reply; \ } @@ -179,26 +189,26 @@ async_nif_reuse_req(struct async_nif_state *async_nif) ErlNifEnv *env = NULL; enif_mutex_lock(async_nif->recycled_req_mutex); - if (fifo_q_empty(reqs, async_nif->recycled_reqs)) { + if (STAILQ_EMPTY(&async_nif->recycled_reqs)) { if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { req = enif_alloc(sizeof(struct async_nif_req_entry)); if (req) { memset(req, 0, sizeof(struct async_nif_req_entry)); env = enif_alloc_env(); - if (!env) { - enif_free(req); - req = NULL; - } else { + if (env) { req->env = env; async_nif->num_reqs++; + } else { + enif_free(req); + req = NULL; } } } } else { - req = fifo_q_get(reqs, async_nif->recycled_reqs); + req = STAILQ_FIRST(&async_nif->recycled_reqs); + STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries); } enif_mutex_unlock(async_nif->recycled_req_mutex); - STAT_TICK(async_nif, qwait); return req; } @@ -212,27 +222,59 @@ async_nif_reuse_req(struct async_nif_state *async_nif) void async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) { - STAT_TOCK(async_nif, qwait); + ErlNifEnv *env = NULL; enif_mutex_lock(async_nif->recycled_req_mutex); - fifo_q_put(reqs, async_nif->recycled_reqs, req); + enif_clear_env(req->env); + env = req->env; + memset(req, 0, sizeof(struct async_nif_req_entry)); + req->env = env; + STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries); enif_mutex_unlock(async_nif->recycled_req_mutex); } +static void *async_nif_worker_fn(void *); + /** - * A string hash function. - * - * A basic hash function for strings of characters used during the - * affinity association. - * - * s a NULL terminated set of bytes to be hashed - * -> an integer hash encoding of the bytes + * Start up a worker thread. */ -static inline unsigned int -async_nif_str_hash_func(const char *s) +static int +async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q) { - unsigned int h = (unsigned int)*s; - if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s; - return h; + struct async_nif_worker_entry *we; + + if (0 == q) + return EINVAL; + + enif_mutex_lock(async_nif->we_mutex); + + we = SLIST_FIRST(&async_nif->we_joining); + while(we != NULL) { + struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); + SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); + void *exit_value = 0; /* We ignore the thread_join's exit value. */ + enif_thread_join(we->tid, &exit_value); + enif_free(we); + async_nif->we_active--; + we = n; + } + + if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) { + enif_mutex_unlock(async_nif->we_mutex); + return EAGAIN; + } + + we = enif_alloc(sizeof(struct async_nif_worker_entry)); + if (!we) { + enif_mutex_unlock(async_nif->we_mutex); + return ENOMEM; + } + memset(we, 0, sizeof(struct async_nif_worker_entry)); + we->worker_id = async_nif->we_active++; + we->async_nif = async_nif; + we->q = q; + + enif_mutex_unlock(async_nif->we_mutex); + return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0); } /** @@ -245,9 +287,9 @@ static ERL_NIF_TERM async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { /* Identify the most appropriate worker for this request. */ - unsigned int qid = 0; + unsigned int i, qid = 0; struct async_nif_work_queue *q = NULL; - unsigned int n = async_nif->num_queues; + double avg_depth = 0.0; /* Either we're choosing a queue based on some affinity/hinted value or we need to select the next queue in the rotation and atomically update that @@ -262,46 +304,68 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en /* Now we inspect and interate across the set of queues trying to select one that isn't too full or too slow. */ - do { + for (i = 0; i < async_nif->num_queues; i++) { + /* Compute the average queue depth not counting queues which are empty or + the queue we're considering right now. */ + unsigned int j, n = 0; + for (j = 0; j < async_nif->num_queues; j++) { + if (j != qid && async_nif->queues[j].depth != 0) { + n++; + avg_depth += async_nif->queues[j].depth; + } + } + if (avg_depth != 0) + avg_depth /= n; + + /* Lock this queue under consideration, then check for shutdown. While + we hold this lock either a) we're shutting down so exit now or b) this + queue will be valid until we release the lock. */ q = &async_nif->queues[qid]; enif_mutex_lock(q->reqs_mutex); - - /* Now that we hold the lock, check for shutdown. As long as we hold - this lock either a) we're shutting down so exit now or b) this queue - will be valid until we release the lock. */ if (async_nif->shutdown) { enif_mutex_unlock(q->reqs_mutex); return 0; } - double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait); - double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait); - if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) { + + /* Try not to enqueue a request into a queue that isn't keeping up with + the request volume. */ + if (q->depth <= avg_depth) break; + else { enif_mutex_unlock(q->reqs_mutex); qid = (qid + 1) % async_nif->num_queues; - q = &async_nif->queues[qid]; - } else { - break; } - // TODO: at some point add in work sheading/stealing - } while(n-- > 0); + } - /* We hold the queue's lock, and we've seletect a reasonable queue for this - new request so add the request. */ - STAT_TICK(q, qwait); - fifo_q_put(reqs, q->reqs, req); + /* If the for loop finished then we didn't find a suitable queue for this + request, meaning we're backed up so trigger eagain. */ + if (i == async_nif->num_queues) { + enif_mutex_unlock(q->reqs_mutex); + return 0; + } + + /* Add the request to the queue. */ + STAILQ_INSERT_TAIL(&q->reqs, req, entries); + q->depth++; + + /* We've selected a queue for this new request now check to make sure there are + enough workers actively processing requests on this queue. */ + if (q->depth > q->num_workers) + if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++; /* Build the term before releasing the lock so as not to race on the use of the req pointer (which will soon become invalid in another thread performing the request). */ ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), enif_make_atom(req->env, "enqueued")); - enif_mutex_unlock(q->reqs_mutex); enif_cond_signal(q->reqs_cnd); + enif_mutex_unlock(q->reqs_mutex); return reply; } /** - * TODO: + * Worker threads execute this function. Here each worker pulls requests of + * their respective queues, executes that work and continues doing that until + * they see the shutdown flag is set at which point they exit. */ static void * async_nif_worker_fn(void *arg) @@ -320,26 +384,29 @@ async_nif_worker_fn(void *arg) enif_mutex_unlock(q->reqs_mutex); break; } - if (fifo_q_empty(reqs, q->reqs)) { + if (STAILQ_EMPTY(&q->reqs)) { /* Queue is empty so we wait for more work to arrive. */ - STAT_RESET(q, qwait); - enif_cond_wait(q->reqs_cnd, q->reqs_mutex); - goto check_again_for_work; + if (q->num_workers > ASYNC_NIF_MIN_WORKERS) { + enif_mutex_unlock(q->reqs_mutex); + break; + } else { + enif_cond_wait(q->reqs_cnd, q->reqs_mutex); + goto check_again_for_work; + } } else { - assert(fifo_q_size(reqs, q->reqs) > 0); - assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs)); /* At this point the next req is ours to process and we hold the reqs_mutex lock. Take the request off the queue. */ - req = fifo_q_get(reqs, q->reqs); - enif_mutex_unlock(q->reqs_mutex); + req = STAILQ_FIRST(&q->reqs); + STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries); + q->depth--; /* Ensure that there is at least one other worker thread watching this queue. */ enif_cond_signal(q->reqs_cnd); + enif_mutex_unlock(q->reqs_mutex); /* Perform the work. */ req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); - STAT_TOCK(q, qwait); /* Now call the post-work cleanup function. */ req->fn_post(req->args); @@ -350,11 +417,14 @@ async_nif_worker_fn(void *arg) req->fn_post = 0; enif_free(req->args); req->args = NULL; - enif_clear_env(req->env); async_nif_recycle_req(req, async_nif); req = NULL; } } + enif_mutex_lock(async_nif->we_mutex); + SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries); + enif_mutex_unlock(async_nif->we_mutex); + q->num_workers--; enif_thread_exit(0); return 0; } @@ -366,41 +436,44 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) unsigned int num_queues = async_nif->num_queues; struct async_nif_work_queue *q = NULL; struct async_nif_req_entry *req = NULL; - __UNUSED(env); + struct async_nif_worker_entry *we = NULL; + UNUSED(env); - STAT_PRINT(async_nif, qwait, "wterl"); - - /* Signal the worker threads, stop what you're doing and exit. To - ensure that we don't race with the enqueue() process we first - lock all the worker queues, then set shutdown to true, then - unlock. The enqueue function will take the queue mutex, then - test for shutdown condition, then enqueue only if not shutting - down. */ + /* Signal the worker threads, stop what you're doing and exit. To ensure + that we don't race with the enqueue() process we first lock all the worker + queues, then set shutdown to true, then unlock. The enqueue function will + take the queue mutex, then test for shutdown condition, then enqueue only + if not shutting down. */ for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; enif_mutex_lock(q->reqs_mutex); } + /* Set the shutdown flag so that worker threads will no continue + executing requests. */ async_nif->shutdown = 1; for (i = 0; i < num_queues; i++) { q = &async_nif->queues[i]; - enif_cond_broadcast(q->reqs_cnd); enif_mutex_unlock(q->reqs_mutex); } /* Join for the now exiting worker threads. */ - for (i = 0; i < async_nif->num_workers; ++i) { - void *exit_value = 0; /* We ignore the thread_join's exit value. */ - enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); + while(async_nif->we_active > 0) { + for (i = 0; i < num_queues; i++) + enif_cond_broadcast(async_nif->queues[i].reqs_cnd); + enif_mutex_lock(async_nif->we_mutex); + we = SLIST_FIRST(&async_nif->we_joining); + while(we != NULL) { + struct async_nif_worker_entry *n = SLIST_NEXT(we, entries); + SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries); + void *exit_value = 0; /* We ignore the thread_join's exit value. */ + enif_thread_join(we->tid, &exit_value); + enif_free(we); + async_nif->we_active--; + we = n; + } + enif_mutex_unlock(async_nif->we_mutex); } - - /* Free req structres sitting on the recycle queue. */ - enif_mutex_lock(async_nif->recycled_req_mutex); - req = NULL; - fifo_q_foreach(reqs, async_nif->recycled_reqs, req, { - enif_free_env(req->env); - enif_free(req); - }); - fifo_q_free(reqs, async_nif->recycled_reqs); + enif_mutex_destroy(async_nif->we_mutex); /* Cleanup in-flight requests, mutexes and conditions in each work queue. */ for (i = 0; i < num_queues; i++) { @@ -408,7 +481,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) /* Worker threads are stopped, now toss anything left in the queue. */ req = NULL; - fifo_q_foreach(reqs, q->reqs, req, { + req = STAILQ_FIRST(&q->reqs); + while(req != NULL) { + struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); enif_clear_env(req->env); enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), @@ -417,12 +492,23 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) enif_free_env(req->env); enif_free(req->args); enif_free(req); - }); - fifo_q_free(reqs, q->reqs); + req = n; + } enif_mutex_destroy(q->reqs_mutex); enif_cond_destroy(q->reqs_cnd); } + /* Free any req structures sitting unused on the recycle queue. */ + enif_mutex_lock(async_nif->recycled_req_mutex); + req = NULL; + req = STAILQ_FIRST(&async_nif->recycled_reqs); + while(req != NULL) { + struct async_nif_req_entry *n = STAILQ_NEXT(req, entries); + enif_free_env(req->env); + enif_free(req); + req = n; + } + enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); @@ -433,7 +519,7 @@ static void * async_nif_load() { static int has_init = 0; - unsigned int i, j, num_queues; + unsigned int i, num_queues; ErlNifSysInfo info; struct async_nif_state *async_nif; @@ -463,57 +549,22 @@ async_nif_load() if (!async_nif) return NULL; memset(async_nif, 0, sizeof(struct async_nif_state) + - sizeof(struct async_nif_work_queue) * num_queues); + sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; - async_nif->num_workers = 2 * num_queues; + async_nif->we_active = 0; async_nif->next_q = 0; async_nif->shutdown = 0; - async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); + STAILQ_INIT(&async_nif->recycled_reqs); async_nif->recycled_req_mutex = enif_mutex_create(NULL); - STAT_INIT(async_nif, qwait); + async_nif->we_mutex = enif_mutex_create(NULL); + SLIST_INIT(&async_nif->we_joining); for (i = 0; i < async_nif->num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; - q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE); + STAILQ_INIT(&q->reqs); q->reqs_mutex = enif_mutex_create(NULL); q->reqs_cnd = enif_cond_create(NULL); - STAT_INIT(q, qwait); - } - - /* Setup the thread pool management. */ - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - - /* Start the worker threads. */ - for (i = 0; i < async_nif->num_workers; i++) { - struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; - we->async_nif = async_nif; - we->worker_id = i; - we->q = &async_nif->queues[i % async_nif->num_queues]; - if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, - &async_nif_worker_fn, (void*)we, NULL) != 0) { - async_nif->shutdown = 1; - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_cond_broadcast(q->reqs_cnd); - } - - while(i-- > 0) { - void *exit_value = 0; /* Ignore this. */ - enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); - } - - for (j = 0; j < async_nif->num_queues; j++) { - struct async_nif_work_queue *q = &async_nif->queues[j]; - enif_mutex_destroy(q->reqs_mutex); - enif_cond_destroy(q->reqs_cnd); - } - - memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); - enif_free(async_nif); - return NULL; - } } return async_nif; } @@ -521,7 +572,7 @@ async_nif_load() static void async_nif_upgrade(ErlNifEnv *env) { - __UNUSED(env); + UNUSED(env); // TODO: } diff --git a/c_src/common.h b/c_src/common.h index bbb4fdd..df2f162 100644 --- a/c_src/common.h +++ b/c_src/common.h @@ -24,24 +24,28 @@ extern "C" { #endif -#ifdef DEBUG +#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__)) +# undef DEBUG +# define DEBUG 0 +# define DPRINTF (void) /* Vararg macros may be unsupported */ +#elif DEBUG #include #include -#ifndef DPRINTF #define DPRINTF(fmt, ...) \ do { \ - fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__); \ + fprintf(stderr, "%s:%d " fmt "\n", __FILE__, __LINE__, __VA_ARGS__); \ fflush(stderr); \ } while(0) -#endif -#ifndef DPUTS -#define DPUTS(arg) DPRINTF("%s", arg) -#endif +#define DPUTS(arg) DPRINTF("%s", arg) #else #define DPRINTF(fmt, ...) ((void) 0) #define DPUTS(arg) ((void) 0) #endif +#ifndef __UNUSED +#define __UNUSED(v) ((void)(v)) +#endif + #ifndef COMPQUIET #define COMPQUIET(n, v) do { \ (n) = (v); \ @@ -49,11 +53,12 @@ extern "C" { } while (0) #endif -#ifndef __UNUSED -#define __UNUSED(v) ((void)(v)) +#ifdef __APPLE__ +#define PRIuint64(x) (x) +#else +#define PRIuint64(x) (unsigned long long)(x) #endif - #if defined(__cplusplus) } #endif diff --git a/c_src/duration.h b/c_src/duration.h deleted file mode 100644 index 635d0fd..0000000 --- a/c_src/duration.h +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright (C) 2013, all rights reserved by Gregory Burd - * - * This Source Code Form is subject to the terms of the Mozilla Public License, - * version 2 (MPLv2). If a copy of the MPL was not distributed with this file, - * you can obtain one at: http://mozilla.org/MPL/2.0/ - * - * NOTES: - * - on some platforms this will require -lrt - */ -#include -#include -#include -#include - -typedef enum { ns = 0, mcs, ms, s } time_scale; -struct scale_time { - const char *abbreviation; - const char *name; - uint64_t mul, div, overhead, ticks_per; -}; -static const struct scale_time scale[] = { - { "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL }, - { "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL }, - { "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL }, - { "sec", "second", 1LL, 1000000000LL, 10, 2300LL } }; - -static uint64_t ts(time_scale unit) -{ - struct timespec ts; - clock_gettime(CLOCK_REALTIME, &ts); - return (((uint64_t)ts.tv_sec * scale[unit].mul) + - ((uint64_t)ts.tv_nsec / scale[unit].div)); -} - -#if 0 -//if defined(__i386__) || defined(__x86_64__) - -/** - * cpu_clock_ticks() - * - * A measure provided by Intel x86 CPUs which provides the number of cycles - * (aka "ticks") executed as a counter using the RDTSC instruction. - */ -static inline uint64_t cpu_clock_ticks() -{ - uint32_t lo, hi; - __asm__ __volatile__ ( - "xorl %%eax, %%eax\n" - "cpuid\n" - "rdtsc\n" - : "=a" (lo), "=d" (hi) - : - : "%ebx", "%ecx" ); - return (uint64_t)hi << 32 | lo; -} - -/** - * cpu_clock_ticks() - * - * An approximation of elapsed [ns, mcs, ms, s] from CPU clock ticks. - */ -static uint64_t elapsed_cpu_clock_ticks(uint64_t start, time_scale unit) -{ - return (cpu_clock_ticks() - start - scale[unit].overhead) * scale[unit].ticks_per; -} - -#endif - -typedef struct { - uint64_t then; - time_scale unit; -} duration_t; - -static inline uint64_t elapsed(duration_t *d) -{ - uint64_t now = ts(d->unit); - uint64_t elapsed = now - d->then; - d->then = now; - return elapsed; -} - -#define DURATION(name, resolution) duration_t name = \ - {ts(resolution), resolution} - -#define ELAPSED_DURING(result, resolution, block) \ - do { \ - DURATION(__x, resolution); \ - do block while(0); \ - *result = elapsed(&__x); \ - } while(0); - -#define CYCLES_DURING(result, block) \ - do { \ - uint64_t __begin = cpu_clock_ticks(); \ - do block while(0); \ - *result = cpu_clock_ticks() - __begin; \ - } while(0); diff --git a/c_src/fifo_q.h b/c_src/fifo_q.h deleted file mode 100644 index f37bf67..0000000 --- a/c_src/fifo_q.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * fifo_q: a macro-based implementation of a FIFO Queue - * - * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. - * Author: Gregory Burd - * - * This file is provided to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef __FIFO_Q_H__ -#define __FIFO_Q_H__ - -#if defined(__cplusplus) -extern "C" { -#endif - -#define FIFO_QUEUE_TYPE(name) \ - struct fifo_q__ ## name * -#define DECL_FIFO_QUEUE(name, type) \ - struct fifo_q__ ## name { \ - unsigned int h, t, s; \ - type *items[]; \ - }; \ - static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \ - int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\ - struct fifo_q__ ## name *q = enif_alloc(sz); \ - if (!q) \ - return 0; \ - memset(q, 0, sz); \ - q->s = n + 1; \ - return q; \ - } \ - static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ - enif_free(q); \ - } \ - static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ - q->items[q->h] = n; \ - q->h = (q->h + 1) % q->s; \ - return n; \ - } \ - static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ - type *n = q->items[q->t]; \ - q->items[q->t] = 0; \ - q->t = (q->t + 1) % q->s; \ - return n; \ - } \ - static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ - return (q->h - q->t + q->s) % q->s; \ - } \ - static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ - return q->s - 1; \ - } \ - static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ - return (q->t == q->h); \ - } \ - static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \ - return ((q->h + 1) % q->s) == q->t; \ - } - -#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) -#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue) -#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue) -#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item) -#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) -#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) -#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) -#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue) -#define fifo_q_foreach(name, queue, item, task) do { \ - while(!fifo_q_ ## name ## _empty(queue)) { \ - item = fifo_q_ ## name ## _get(queue); \ - do task while(0); \ - } \ - } while(0); - - -#if defined(__cplusplus) -} -#endif - -#endif // __FIFO_Q_H__ diff --git a/c_src/lmdb_nif.c b/c_src/lmdb_nif.c index 2d7253f..a24afe3 100644 --- a/c_src/lmdb_nif.c +++ b/c_src/lmdb_nif.c @@ -2,7 +2,7 @@ * This file is part of LMDB - Erlang Lightning MDB API * * Copyright (c) 2012 by Aleph Archives. All rights reserved. -%% Copyright (c) 2013 by Basho Technologies, Inc. All rights reserved. + * Copyright (c) 2013 by Basho Technologies, Inc. All rights reserved. * * ------------------------------------------------------------------------- * Redistribution and use in source and binary forms, with or without @@ -28,6 +28,7 @@ #include #include +#include #include #include #include @@ -35,22 +36,12 @@ #include "common.h" #include "async_nif.h" -#include "stats.h" #include "lmdb.h" -STAT_DECL(lmdb_get, 1000); -STAT_DECL(lmdb_put, 1000); -STAT_DECL(lmdb_del, 1000); -STAT_DECL(lmdb_upd, 1000); - static ErlNifResourceType *lmdb_RESOURCE; struct lmdb { MDB_env *env; MDB_dbi dbi; - STAT_DEF(lmdb_get); - STAT_DEF(lmdb_put); - STAT_DEF(lmdb_del); - STAT_DEF(lmdb_upd); }; struct lmdb_priv_data { @@ -213,11 +204,6 @@ ASYNC_NIF_DECL( if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL) FAIL_ERR(ENOMEM, err3); - STAT_INIT(handle, lmdb_get); - STAT_INIT(handle, lmdb_put); - STAT_INIT(handle, lmdb_upd); - STAT_INIT(handle, lmdb_del); - CHECK(mdb_env_create(&(handle->env)), err2); if (mdb_env_set_mapsize(handle->env, args->mapsize)) { @@ -271,15 +257,7 @@ ASYNC_NIF_DECL( }, { // work - STAT_PRINT(args->handle, lmdb_get, "lmdb"); - STAT_PRINT(args->handle, lmdb_put, "lmdb"); - STAT_PRINT(args->handle, lmdb_del, "lmdb"); - STAT_PRINT(args->handle, lmdb_upd, "lmdb"); mdb_env_close(args->handle->env); - STAT_RESET(args->handle, lmdb_get); - STAT_RESET(args->handle, lmdb_put); - STAT_RESET(args->handle, lmdb_del); - STAT_RESET(args->handle, lmdb_upd); args->handle->env = NULL; ASYNC_NIF_REPLY(ATOM_OK); return; @@ -315,7 +293,6 @@ ASYNC_NIF_DECL( } if (!args->handle->env) ASYNC_NIF_RETURN_BADARG(); - STAT_TICK(args->handle, lmdb_put); enif_keep_resource((void*)args->handle); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); @@ -354,7 +331,6 @@ ASYNC_NIF_DECL( FAIL_ERR(ret, err1); CHECK(mdb_txn_commit(txn), err1); - STAT_TOCK(args->handle, lmdb_put); ASYNC_NIF_REPLY(ATOM_OK); return; @@ -395,7 +371,6 @@ ASYNC_NIF_DECL( } if (!args->handle->env) ASYNC_NIF_RETURN_BADARG(); - STAT_TICK(args->handle, lmdb_upd); enif_keep_resource((void*)args->handle); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); @@ -427,7 +402,6 @@ ASYNC_NIF_DECL( CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); CHECK(mdb_put(txn, args->handle->dbi, &mkey, &mdata, 0), err1); CHECK(mdb_txn_commit(txn), err1); - STAT_TOCK(args->handle, lmdb_upd); ASYNC_NIF_REPLY(ATOM_OK); return; @@ -465,7 +439,6 @@ ASYNC_NIF_DECL( } if (!args->handle->env) ASYNC_NIF_RETURN_BADARG(); - STAT_TICK(args->handle, lmdb_get); enif_keep_resource((void*)args->handle); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); }, @@ -505,7 +478,6 @@ ASYNC_NIF_DECL( FAIL_ERR(ENOMEM, err); memcpy(bin, mdata.mv_data, mdata.mv_size); - STAT_TOCK(args->handle, lmdb_get); ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, val)); return; @@ -541,7 +513,6 @@ ASYNC_NIF_DECL( } if (!args->handle->env) ASYNC_NIF_RETURN_BADARG(); - STAT_TICK(args->handle, lmdb_del); enif_keep_resource((void*)args->handle); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); }, @@ -571,7 +542,6 @@ ASYNC_NIF_DECL( } CHECK(mdb_txn_commit(txn), err); - STAT_TOCK(args->handle, lmdb_del); ASYNC_NIF_REPLY(ATOM_OK); return; diff --git a/c_src/queue.h b/c_src/queue.h new file mode 100644 index 0000000..4c6a153 --- /dev/null +++ b/c_src/queue.h @@ -0,0 +1,678 @@ +/* + * Copyright (c) 1991, 1993 + * The Regents of the University of California. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 4. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS + * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) + * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + * + * @(#)queue.h 8.5 (Berkeley) 8/20/94 + * $FreeBSD: src/sys/sys/queue.h,v 1.54 2002/08/05 05:18:43 alfred Exp $ + */ + +#ifndef _DB_QUEUE_H_ +#define _DB_QUEUE_H_ + +#ifndef __offsetof +#define __offsetof(st, m) \ + ((size_t) ( (char *)&((st *)0)->m - (char *)0 )) +#endif + +#ifndef __containerof +#define __containerof(ptr, type, member) ({ \ + const typeof( ((type *)0)->member ) *__mptr = (ptr); \ + (type *)( (char *)__mptr - __offsetof(type,member) );}) +#endif + +#if defined(__cplusplus) +extern "C" { +#endif + +/* + * This file defines four types of data structures: singly-linked lists, + * singly-linked tail queues, lists and tail queues. + * + * A singly-linked list is headed by a single forward pointer. The elements + * are singly linked for minimum space and pointer manipulation overhead at + * the expense of O(n) removal for arbitrary elements. New elements can be + * added to the list after an existing element or at the head of the list. + * Elements being removed from the head of the list should use the explicit + * macro for this purpose for optimum efficiency. A singly-linked list may + * only be traversed in the forward direction. Singly-linked lists are ideal + * for applications with large datasets and few or no removals or for + * implementing a LIFO queue. + * + * A singly-linked tail queue is headed by a pair of pointers, one to the + * head of the list and the other to the tail of the list. The elements are + * singly linked for minimum space and pointer manipulation overhead at the + * expense of O(n) removal for arbitrary elements. New elements can be added + * to the list after an existing element, at the head of the list, or at the + * end of the list. Elements being removed from the head of the tail queue + * should use the explicit macro for this purpose for optimum efficiency. + * A singly-linked tail queue may only be traversed in the forward direction. + * Singly-linked tail queues are ideal for applications with large datasets + * and few or no removals or for implementing a FIFO queue. + * + * A list is headed by a single forward pointer (or an array of forward + * pointers for a hash table header). The elements are doubly linked + * so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before + * or after an existing element or at the head of the list. A list + * may only be traversed in the forward direction. + * + * A tail queue is headed by a pair of pointers, one to the head of the + * list and the other to the tail of the list. The elements are doubly + * linked so that an arbitrary element can be removed without a need to + * traverse the list. New elements can be added to the list before or + * after an existing element, at the head of the list, or at the end of + * the list. A tail queue may be traversed in either direction. + * + * For details on the use of these macros, see the queue(3) manual page. + * + * + * SLIST LIST STAILQ TAILQ + * _HEAD + + + + + * _HEAD_INITIALIZER + + + + + * _ENTRY + + + + + * _INIT + + + + + * _EMPTY + + + + + * _FIRST + + + + + * _NEXT + + + + + * _PREV - - - + + * _LAST - - + + + * _FOREACH + + + + + * _FOREACH_REVERSE - - - + + * _INSERT_HEAD + + + + + * _INSERT_BEFORE - + - + + * _INSERT_AFTER + + + + + * _INSERT_TAIL - - + + + * _CONCAT - - + + + * _REMOVE_HEAD + - + - + * _REMOVE + + + + + * + */ + +/* + * XXX + * We #undef all of the macros because there are incompatible versions of this + * file and these macros on various systems. What makes the problem worse is + * they are included and/or defined by system include files which we may have + * already loaded into Berkeley DB before getting here. For example, FreeBSD's + * includes its system , and VxWorks UnixLib.h defines + * several of the LIST_XXX macros. Visual C.NET 7.0 also defines some of these + * same macros in Vc7\PlatformSDK\Include\WinNT.h. Make sure we use ours. + */ +#undef LIST_EMPTY +#undef LIST_ENTRY +#undef LIST_FIRST +#undef LIST_FOREACH +#undef LIST_HEAD +#undef LIST_HEAD_INITIALIZER +#undef LIST_INIT +#undef LIST_INSERT_AFTER +#undef LIST_INSERT_BEFORE +#undef LIST_INSERT_HEAD +#undef LIST_NEXT +#undef LIST_REMOVE +#undef QMD_TRACE_ELEM +#undef QMD_TRACE_HEAD +#undef QUEUE_MACRO_DEBUG +#undef SLIST_EMPTY +#undef SLIST_ENTRY +#undef SLIST_FIRST +#undef SLIST_FOREACH +#undef SLIST_FOREACH_PREVPTR +#undef SLIST_HEAD +#undef SLIST_HEAD_INITIALIZER +#undef SLIST_INIT +#undef SLIST_INSERT_AFTER +#undef SLIST_INSERT_HEAD +#undef SLIST_NEXT +#undef SLIST_REMOVE +#undef SLIST_REMOVE_HEAD +#undef STAILQ_CONCAT +#undef STAILQ_EMPTY +#undef STAILQ_ENTRY +#undef STAILQ_FIRST +#undef STAILQ_FOREACH +#undef STAILQ_HEAD +#undef STAILQ_HEAD_INITIALIZER +#undef STAILQ_INIT +#undef STAILQ_INSERT_AFTER +#undef STAILQ_INSERT_HEAD +#undef STAILQ_INSERT_TAIL +#undef STAILQ_LAST +#undef STAILQ_NEXT +#undef STAILQ_REMOVE +#undef STAILQ_REMOVE_HEAD +#undef STAILQ_REMOVE_HEAD_UNTIL +#undef TAILQ_CONCAT +#undef TAILQ_EMPTY +#undef TAILQ_ENTRY +#undef TAILQ_FIRST +#undef TAILQ_FOREACH +#undef TAILQ_FOREACH_REVERSE +#undef TAILQ_HEAD +#undef TAILQ_HEAD_INITIALIZER +#undef TAILQ_INIT +#undef TAILQ_INSERT_AFTER +#undef TAILQ_INSERT_BEFORE +#undef TAILQ_INSERT_HEAD +#undef TAILQ_INSERT_TAIL +#undef TAILQ_LAST +#undef TAILQ_NEXT +#undef TAILQ_PREV +#undef TAILQ_REMOVE +#undef TRACEBUF +#undef TRASHIT + +#define QUEUE_MACRO_DEBUG 0 +#if QUEUE_MACRO_DEBUG +/* Store the last 2 places the queue element or head was altered */ +struct qm_trace { + char * lastfile; + int lastline; + char * prevfile; + int prevline; +}; + +#define TRACEBUF struct qm_trace trace; +#define TRASHIT(x) do {(x) = (void *)-1;} while (0) + +#define QMD_TRACE_HEAD(head) do { \ + (head)->trace.prevline = (head)->trace.lastline; \ + (head)->trace.prevfile = (head)->trace.lastfile; \ + (head)->trace.lastline = __LINE__; \ + (head)->trace.lastfile = __FILE__; \ +} while (0) + +#define QMD_TRACE_ELEM(elem) do { \ + (elem)->trace.prevline = (elem)->trace.lastline; \ + (elem)->trace.prevfile = (elem)->trace.lastfile; \ + (elem)->trace.lastline = __LINE__; \ + (elem)->trace.lastfile = __FILE__; \ +} while (0) + +#else +#define QMD_TRACE_ELEM(elem) +#define QMD_TRACE_HEAD(head) +#define TRACEBUF +#define TRASHIT(x) +#endif /* QUEUE_MACRO_DEBUG */ + +/* + * Singly-linked List declarations. + */ +#define SLIST_HEAD(name, type) \ +struct name { \ + struct type *slh_first; /* first element */ \ +} + +#define SLIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define SLIST_ENTRY(type) \ +struct { \ + struct type *sle_next; /* next element */ \ +} + +/* + * Singly-linked List functions. + */ +#define SLIST_EMPTY(head) ((head)->slh_first == NULL) + +#define SLIST_FIRST(head) ((head)->slh_first) + +#define SLIST_FOREACH(var, head, field) \ + for ((var) = SLIST_FIRST((head)); \ + (var); \ + (var) = SLIST_NEXT((var), field)) + +#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \ + for ((varp) = &SLIST_FIRST((head)); \ + ((var) = *(varp)) != NULL; \ + (varp) = &SLIST_NEXT((var), field)) + +#define SLIST_INIT(head) do { \ + SLIST_FIRST((head)) = NULL; \ +} while (0) + +#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \ + SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \ + SLIST_NEXT((slistelm), field) = (elm); \ +} while (0) + +#define SLIST_INSERT_HEAD(head, elm, field) do { \ + SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \ + SLIST_FIRST((head)) = (elm); \ +} while (0) + +#define SLIST_NEXT(elm, field) ((elm)->field.sle_next) + +#define SLIST_REMOVE(head, elm, type, field) do { \ + if (SLIST_FIRST((head)) == (elm)) { \ + SLIST_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = SLIST_FIRST((head)); \ + while (SLIST_NEXT(curelm, field) != (elm)) \ + curelm = SLIST_NEXT(curelm, field); \ + SLIST_NEXT(curelm, field) = \ + SLIST_NEXT(SLIST_NEXT(curelm, field), field); \ + } \ +} while (0) + +#define SLIST_REMOVE_HEAD(head, field) do { \ + SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \ +} while (0) + +/* + * Singly-linked Tail queue declarations. + */ +#define STAILQ_HEAD(name, type) \ +struct name { \ + struct type *stqh_first;/* first element */ \ + struct type **stqh_last;/* addr of last next element */ \ +} + +#define STAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).stqh_first } + +#define STAILQ_ENTRY(type) \ +struct { \ + struct type *stqe_next; /* next element */ \ +} + +/* + * Singly-linked Tail queue functions. + */ +#define STAILQ_CONCAT(head1, head2) do { \ + if (!STAILQ_EMPTY((head2))) { \ + *(head1)->stqh_last = (head2)->stqh_first; \ + (head1)->stqh_last = (head2)->stqh_last; \ + STAILQ_INIT((head2)); \ + } \ +} while (0) + +#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL) + +#define STAILQ_FIRST(head) ((head)->stqh_first) + +#define STAILQ_FOREACH(var, head, field) \ + for ((var) = STAILQ_FIRST((head)); \ + (var); \ + (var) = STAILQ_NEXT((var), field)) + +#define STAILQ_INIT(head) do { \ + STAILQ_FIRST((head)) = NULL; \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_NEXT((tqelm), field) = (elm); \ +} while (0) + +#define STAILQ_INSERT_HEAD(head, elm, field) do { \ + if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ + STAILQ_FIRST((head)) = (elm); \ +} while (0) + +#define STAILQ_INSERT_TAIL(head, elm, field) do { \ + STAILQ_NEXT((elm), field) = NULL; \ + *(head)->stqh_last = (elm); \ + (head)->stqh_last = &STAILQ_NEXT((elm), field); \ +} while (0) + +#define STAILQ_LAST(head, type, field) \ + (STAILQ_EMPTY((head)) ? \ + NULL : \ + ((struct type *) \ + ((char *)((head)->stqh_last) - __offsetof(struct type, field)))) + +#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next) + +#define STAILQ_REMOVE(head, elm, type, field) do { \ + if (STAILQ_FIRST((head)) == (elm)) { \ + STAILQ_REMOVE_HEAD((head), field); \ + } \ + else { \ + struct type *curelm = STAILQ_FIRST((head)); \ + while (STAILQ_NEXT(curelm, field) != (elm)) \ + curelm = STAILQ_NEXT(curelm, field); \ + if ((STAILQ_NEXT(curelm, field) = \ + STAILQ_NEXT(STAILQ_NEXT(curelm, field), field)) == NULL)\ + (head)->stqh_last = &STAILQ_NEXT((curelm), field);\ + } \ +} while (0) + +#define STAILQ_REMOVE_HEAD(head, field) do { \ + if ((STAILQ_FIRST((head)) = \ + STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +#define STAILQ_REMOVE_HEAD_UNTIL(head, elm, field) do { \ + if ((STAILQ_FIRST((head)) = STAILQ_NEXT((elm), field)) == NULL) \ + (head)->stqh_last = &STAILQ_FIRST((head)); \ +} while (0) + +/* + * List declarations. + */ +#define LIST_HEAD(name, type) \ +struct name { \ + struct type *lh_first; /* first element */ \ +} + +#define LIST_HEAD_INITIALIZER(head) \ + { NULL } + +#define LIST_ENTRY(type) \ +struct { \ + struct type *le_next; /* next element */ \ + struct type **le_prev; /* address of previous next element */ \ +} + +/* + * List functions. + */ + +#define LIST_EMPTY(head) ((head)->lh_first == NULL) + +#define LIST_FIRST(head) ((head)->lh_first) + +#define LIST_FOREACH(var, head, field) \ + for ((var) = LIST_FIRST((head)); \ + (var); \ + (var) = LIST_NEXT((var), field)) + +#define LIST_INIT(head) do { \ + LIST_FIRST((head)) = NULL; \ +} while (0) + +#define LIST_INSERT_AFTER(listelm, elm, field) do { \ + if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\ + LIST_NEXT((listelm), field)->field.le_prev = \ + &LIST_NEXT((elm), field); \ + LIST_NEXT((listelm), field) = (elm); \ + (elm)->field.le_prev = &LIST_NEXT((listelm), field); \ +} while (0) + +#define LIST_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.le_prev = (listelm)->field.le_prev; \ + LIST_NEXT((elm), field) = (listelm); \ + *(listelm)->field.le_prev = (elm); \ + (listelm)->field.le_prev = &LIST_NEXT((elm), field); \ +} while (0) + +#define LIST_INSERT_HEAD(head, elm, field) do { \ + if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \ + LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\ + LIST_FIRST((head)) = (elm); \ + (elm)->field.le_prev = &LIST_FIRST((head)); \ +} while (0) + +#define LIST_NEXT(elm, field) ((elm)->field.le_next) + +#define LIST_REMOVE(elm, field) do { \ + if (LIST_NEXT((elm), field) != NULL) \ + LIST_NEXT((elm), field)->field.le_prev = \ + (elm)->field.le_prev; \ + *(elm)->field.le_prev = LIST_NEXT((elm), field); \ +} while (0) + +/* + * Tail queue declarations. + */ +#define TAILQ_HEAD(name, type) \ +struct name { \ + struct type *tqh_first; /* first element */ \ + struct type **tqh_last; /* addr of last next element */ \ + TRACEBUF \ +} + +#define TAILQ_HEAD_INITIALIZER(head) \ + { NULL, &(head).tqh_first } + +#define TAILQ_ENTRY(type) \ +struct { \ + struct type *tqe_next; /* next element */ \ + struct type **tqe_prev; /* address of previous next element */ \ + TRACEBUF \ +} + +/* + * Tail queue functions. + */ +#define TAILQ_CONCAT(head1, head2, field) do { \ + if (!TAILQ_EMPTY(head2)) { \ + *(head1)->tqh_last = (head2)->tqh_first; \ + (head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \ + (head1)->tqh_last = (head2)->tqh_last; \ + TAILQ_INIT((head2)); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_HEAD(head2); \ + } \ +} while (0) + +#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL) + +#define TAILQ_FIRST(head) ((head)->tqh_first) + +#define TAILQ_FOREACH(var, head, field) \ + for ((var) = TAILQ_FIRST((head)); \ + (var); \ + (var) = TAILQ_NEXT((var), field)) + +#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \ + for ((var) = TAILQ_LAST((head), headname); \ + (var); \ + (var) = TAILQ_PREV((var), headname, field)) + +#define TAILQ_INIT(head) do { \ + TAILQ_FIRST((head)) = NULL; \ + (head)->tqh_last = &TAILQ_FIRST((head)); \ + QMD_TRACE_HEAD(head); \ +} while (0) + +#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \ + if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\ + TAILQ_NEXT((elm), field)->field.tqe_prev = \ + &TAILQ_NEXT((elm), field); \ + else { \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_HEAD(head); \ + } \ + TAILQ_NEXT((listelm), field) = (elm); \ + (elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \ + QMD_TRACE_ELEM(&(elm)->field); \ + QMD_TRACE_ELEM(&listelm->field); \ +} while (0) + +#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \ + (elm)->field.tqe_prev = (listelm)->field.tqe_prev; \ + TAILQ_NEXT((elm), field) = (listelm); \ + *(listelm)->field.tqe_prev = (elm); \ + (listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_ELEM(&(elm)->field); \ + QMD_TRACE_ELEM(&listelm->field); \ +} while (0) + +#define TAILQ_INSERT_HEAD(head, elm, field) do { \ + if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \ + TAILQ_FIRST((head))->field.tqe_prev = \ + &TAILQ_NEXT((elm), field); \ + else \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + TAILQ_FIRST((head)) = (elm); \ + (elm)->field.tqe_prev = &TAILQ_FIRST((head)); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +#define TAILQ_INSERT_TAIL(head, elm, field) do { \ + TAILQ_NEXT((elm), field) = NULL; \ + (elm)->field.tqe_prev = (head)->tqh_last; \ + *(head)->tqh_last = (elm); \ + (head)->tqh_last = &TAILQ_NEXT((elm), field); \ + QMD_TRACE_HEAD(head); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +#define TAILQ_LAST(head, headname) \ + (*(((struct headname *)((head)->tqh_last))->tqh_last)) + +#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next) + +#define TAILQ_PREV(elm, headname, field) \ + (*(((struct headname *)((elm)->field.tqe_prev))->tqh_last)) + +#define TAILQ_REMOVE(head, elm, field) do { \ + if ((TAILQ_NEXT((elm), field)) != NULL) \ + TAILQ_NEXT((elm), field)->field.tqe_prev = \ + (elm)->field.tqe_prev; \ + else { \ + (head)->tqh_last = (elm)->field.tqe_prev; \ + QMD_TRACE_HEAD(head); \ + } \ + *(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \ + TRASHIT((elm)->field.tqe_next); \ + TRASHIT((elm)->field.tqe_prev); \ + QMD_TRACE_ELEM(&(elm)->field); \ +} while (0) + +/* + * Circular queue definitions. + */ +#define CIRCLEQ_HEAD(name, type) \ +struct name { \ + struct type *cqh_first; /* first element */ \ + struct type *cqh_last; /* last element */ \ +} + +#define CIRCLEQ_HEAD_INITIALIZER(head) \ + { (void *)&head, (void *)&head } + +#define CIRCLEQ_ENTRY(type) \ +struct { \ + struct type *cqe_next; /* next element */ \ + struct type *cqe_prev; /* previous element */ \ +} + +/* + * Circular queue functions. + */ +#define CIRCLEQ_INIT(head) do { \ + (head)->cqh_first = (void *)(head); \ + (head)->cqh_last = (void *)(head); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_AFTER(head, listelm, elm, field) do { \ + (elm)->field.cqe_next = (listelm)->field.cqe_next; \ + (elm)->field.cqe_prev = (listelm); \ + if ((listelm)->field.cqe_next == (void *)(head)) \ + (head)->cqh_last = (elm); \ + else \ + (listelm)->field.cqe_next->field.cqe_prev = (elm); \ + (listelm)->field.cqe_next = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_BEFORE(head, listelm, elm, field) do { \ + (elm)->field.cqe_next = (listelm); \ + (elm)->field.cqe_prev = (listelm)->field.cqe_prev; \ + if ((listelm)->field.cqe_prev == (void *)(head)) \ + (head)->cqh_first = (elm); \ + else \ + (listelm)->field.cqe_prev->field.cqe_next = (elm); \ + (listelm)->field.cqe_prev = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_HEAD(head, elm, field) do { \ + (elm)->field.cqe_next = (head)->cqh_first; \ + (elm)->field.cqe_prev = (void *)(head); \ + if ((head)->cqh_last == (void *)(head)) \ + (head)->cqh_last = (elm); \ + else \ + (head)->cqh_first->field.cqe_prev = (elm); \ + (head)->cqh_first = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_INSERT_TAIL(head, elm, field) do { \ + (elm)->field.cqe_next = (void *)(head); \ + (elm)->field.cqe_prev = (head)->cqh_last; \ + if ((head)->cqh_first == (void *)(head)) \ + (head)->cqh_first = (elm); \ + else \ + (head)->cqh_last->field.cqe_next = (elm); \ + (head)->cqh_last = (elm); \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_REMOVE(head, elm, field) do { \ + if ((elm)->field.cqe_next == (void *)(head)) \ + (head)->cqh_last = (elm)->field.cqe_prev; \ + else \ + (elm)->field.cqe_next->field.cqe_prev = \ + (elm)->field.cqe_prev; \ + if ((elm)->field.cqe_prev == (void *)(head)) \ + (head)->cqh_first = (elm)->field.cqe_next; \ + else \ + (elm)->field.cqe_prev->field.cqe_next = \ + (elm)->field.cqe_next; \ +} while (/*CONSTCOND*/0) + +#define CIRCLEQ_FOREACH(var, head, field) \ + for ((var) = ((head)->cqh_first); \ + (var) != (const void *)(head); \ + (var) = ((var)->field.cqe_next)) + +#define CIRCLEQ_FOREACH_REVERSE(var, head, field) \ + for ((var) = ((head)->cqh_last); \ + (var) != (const void *)(head); \ + (var) = ((var)->field.cqe_prev)) + +/* + * Circular queue access methods. + */ +#define CIRCLEQ_EMPTY(head) ((head)->cqh_first == (void *)(head)) +#define CIRCLEQ_FIRST(head) ((head)->cqh_first) +#define CIRCLEQ_LAST(head) ((head)->cqh_last) +#define CIRCLEQ_NEXT(elm, field) ((elm)->field.cqe_next) +#define CIRCLEQ_PREV(elm, field) ((elm)->field.cqe_prev) + +#define CIRCLEQ_LOOP_NEXT(head, elm, field) \ + (((elm)->field.cqe_next == (void *)(head)) \ + ? ((head)->cqh_first) \ + : (elm->field.cqe_next)) +#define CIRCLEQ_LOOP_PREV(head, elm, field) \ + (((elm)->field.cqe_prev == (void *)(head)) \ + ? ((head)->cqh_last) \ + : (elm->field.cqe_prev)) + + +#if defined(__cplusplus) +} +#endif +#endif /* !_DB_QUEUE_H_ */ diff --git a/c_src/stats.h b/c_src/stats.h deleted file mode 100644 index 4fa574b..0000000 --- a/c_src/stats.h +++ /dev/null @@ -1,217 +0,0 @@ -/* - * stats: measure all the things - * - * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. - * Author: Gregory Burd - * - * This file is provided to you under the Apache License, - * Version 2.0 (the "License"); you may not use this file - * except in compliance with the License. You may obtain - * a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef __STATS_H__ -#define __STATS_H__ - -#if defined(__cplusplus) -extern "C" { -#endif - -#include "duration.h" - -/** - * Calculate the log2 of 64bit unsigned integers. - */ -#ifdef __GCC__ -#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X)))) -#else -static unsigned int __log2_64(uint64_t x) { - static const int tab64[64] = { - 63, 0, 58, 1, 59, 47, 53, 2, - 60, 39, 48, 27, 54, 33, 42, 3, - 61, 51, 37, 40, 49, 18, 28, 20, - 55, 30, 34, 11, 43, 14, 22, 4, - 62, 57, 46, 52, 38, 26, 32, 41, - 50, 36, 17, 19, 29, 10, 13, 21, - 56, 45, 25, 31, 35, 16, 9, 12, - 44, 24, 15, 8, 23, 7, 6, 5}; - if (x == 0) return 0; - uint64_t v = x; - v |= v >> 1; - v |= v >> 2; - v |= v >> 4; - v |= v >> 8; - v |= v >> 16; - v |= v >> 32; - return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58]; -} -#define LOG2(X) __log2_64(X) -#endif - -#define STAT_DEF(name) struct name ## _stat name ## _stat; - -#define STAT_DECL(name, nsamples) \ - struct name ## _stat { \ - duration_t d; \ - uint64_t histogram[64]; \ - uint32_t h, n; \ - uint64_t samples[nsamples]; \ - uint64_t min, max; \ - double mean; \ - }; \ - static inline double name ## _stat_mean(struct name ## _stat *s) { \ - uint32_t t = s->h; \ - uint32_t h = (s->h + 1) % nsamples; \ - double mean = 0; \ - while (h != t) { \ - mean += s->samples[h]; \ - h = (h + 1) % nsamples; \ - } \ - if (mean > 0) \ - mean /= (double)(s->n < nsamples ? s->n : nsamples); \ - return mean; \ - } \ - static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \ - uint32_t i; \ - double mean = 0; \ - for (i = 0; i < 64; i++) \ - mean += (s->histogram[i] * i); \ - if (mean > 0) \ - mean /= (double)s->n; \ - return mean; \ - } \ - static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \ - { \ - uint64_t t = ts(s->d.unit); \ - s->d.then = t; \ - return t; \ - } \ - static inline void name ## _stat_reset(struct name ## _stat *s) \ - { \ - s->min = ~0; \ - s->max = 0; \ - s->h = 0; \ - memset(&s->histogram, 0, sizeof(uint64_t) * 64); \ - memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \ - } \ - static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \ - { \ - uint64_t now = ts(s->d.unit); \ - uint64_t elapsed = now - s->d.then; \ - uint32_t i = s->h; \ - if (s->n == nsamples) { \ - s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \ - if (s->n >= 4294967295) \ - name ## _stat_reset(s); \ - } \ - s->h = (s->h + 1) % nsamples; \ - s->samples[i] = elapsed; \ - if (elapsed < s->min) \ - s->min = elapsed; \ - if (elapsed > s->max) \ - s->max = elapsed; \ - s->histogram[LOG2(elapsed)]++; \ - s->n++; \ - s->d.then = ts(s->d.unit); \ - return elapsed; \ - } \ - static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \ - { \ - uint8_t logs[64]; \ - uint8_t i, j, max_log = 0; \ - double m = 0.0; \ - \ - if (s->n < nsamples) \ - return; \ - \ - fprintf(stderr, "\n%s:async_nif request latency histogram:\n", mod); \ - m = (s->mean + name ## _stat_mean(s) / 2.0); \ - for (i = 0; i < 64; i++) { \ - logs[i] = LOG2(s->histogram[i]); \ - if (logs[i] > max_log) \ - max_log = logs[i]; \ - } \ - for (i = max_log; i > 0; i--) { \ - if (!(i % 10)) \ - fprintf(stderr, "2^%2d ", i); \ - else \ - fprintf(stderr, " "); \ - for(j = 0; j < 64; j++) \ - fprintf(stderr, logs[j] >= i ? "•" : " "); \ - fprintf(stderr, "\n"); \ - } \ - if (max_log == 100) { \ - fprintf(stderr, "[empty]\n"); \ - } else { \ - fprintf(stderr, " ns μs ms s ks\n"); \ - fprintf(stderr, "min: "); \ - if (s->min < 1000) \ - fprintf(stderr, "%lu (ns)", s->min); \ - else if (s->min < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \ - else if (s->min < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \ - else if (s->min < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \ - fprintf(stderr, " max: "); \ - if (s->max < 1000) \ - fprintf(stderr, "%lu (ns)", s->max); \ - else if (s->max < 1000000) \ - fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \ - else if (s->max < 1000000000) \ - fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \ - else if (s->max < 1000000000000) \ - fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \ - fprintf(stderr, " mean: "); \ - if (m < 1000) \ - fprintf(stderr, "%.2f (ns)", m); \ - else if (m < 1000000) \ - fprintf(stderr, "%.2f (μs)", m / 1000.0); \ - else if (m < 1000000000) \ - fprintf(stderr, "%.2f (ms)", m / 1000000.0); \ - else if (m < 1000000000000) \ - fprintf(stderr, "%.2f (s)", m / 1000000000.0); \ - fprintf(stderr, "\n"); \ - } \ - fflush(stderr); \ - } - - -#define STAT_INIT(var, name) \ - var->name ## _stat.min = ~0; \ - var->name ## _stat.max = 0; \ - var->name ## _stat.mean = 0.0; \ - var->name ## _stat.h = 0; \ - var->name ## _stat.d.then = 0; \ - var->name ## _stat.d.unit = ns; - -#define STAT_TICK(var, name) name ## _stat_tick(&var->name ## _stat) - -#define STAT_TOCK(var, name) name ## _stat_tock(&var->name ## _stat) - -#define STAT_RESET(var, name) name ## _stat_reset(&var->name ## _stat) - -#define STAT_MEAN_LOG2_SAMPLE(var, name) \ - name ## _stat_mean_lg2(&var->name ## _stat) - -#define STAT_MEAN_SAMPLE(var, name) \ - name ## _stat_mean(&var->name ## _stat) - -#define STAT_PRINT(var, name, mod) \ - name ## _stat_print_histogram(&var->name ## _stat, mod) - - -#if defined(__cplusplus) -} -#endif - -#endif // __STATS_H__ diff --git a/src/lmdb.erl b/src/lmdb.erl index afebbcb..e769c28 100644 --- a/src/lmdb.erl +++ b/src/lmdb.erl @@ -179,7 +179,7 @@ open_test_db() -> {ok, Handle} = ?MODULE:open(DataDir, 2147483648), [?MODULE:upd(Handle, crypto:sha(<>), crypto:rand_bytes(crypto:rand_uniform(128, 4096))) || - X <- lists:seq(1, 100)], + X <- lists:seq(1, 10)], Handle. basics_test_() -> diff --git a/tools/lmdb.config b/tools/lmdb.config index c073acd..ef63aa7 100644 --- a/tools/lmdb.config +++ b/tools/lmdb.config @@ -24,13 +24,14 @@ %% adding a "_" to the name and take the "_" out of the other's name). {mode, max}. -{duration, 10}. -{concurrent, 8}. +{duration, 480}. +{concurrent, 32}. {driver, basho_bench_driver_lmdb}. -{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}. -{value_generator, {fixed_bin, 1024}}. -%{operations, [{get, 9}, {put, 9}, {delete, 2}]}. -{operations, [{put, 1}]}. +{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000000}}}. +{value_generator, {highly_compressible_bin, 2048}}. +%{value_generator, {fixed_bin, 1024}}. +{operations, [{get, 25}, {put, 70}, {delete, 5}]}. +%{operations, [{put, 1}]}. {code_paths, ["../lmdb"]}. {lmdb_dir, "/home/gburd/ws/basho_bench/data"}.