Update to latest async_nif code.

Gregory Burd 2013-07-04 20:52:20 -04:00
parent af7dbbcc9a
commit 85295c7890
9 changed files with 908 additions and 611 deletions

View file

@ -4,18 +4,16 @@
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved. * Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me> * Author: Gregory Burd <greg@basho.com> <greg@burd.me>
* *
* This file is provided to you under the Apache License, * This file is provided to you under the Apache License, Version 2.0 (the
* Version 2.0 (the "License"); you may not use this file * "License"); you may not use this file except in compliance with the License.
* except in compliance with the License. You may obtain * You may obtain a copy of the License at:
* a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, * Unless required by applicable law or agreed to in writing, software
* software distributed under the License is distributed on an * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* KIND, either express or implied. See the License for the * License for the specific language governing permissions and limitations
* specific language governing permissions and limitations
* under the License. * under the License.
*/ */
@ -27,18 +25,17 @@ extern "C" {
#endif #endif
#include <assert.h> #include <assert.h>
#include "fifo_q.h"
#include "stats.h"
#ifndef __UNUSED #include "queue.h"
#define __UNUSED(v) ((void)(v))
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#endif #endif
#define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_WORKER_QUEUE_SIZE 100
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
STAT_DECL(qwait, 1000);
struct async_nif_req_entry { struct async_nif_req_entry {
ERL_NIF_TERM ref; ERL_NIF_TERM ref;
@ -47,14 +44,16 @@ struct async_nif_req_entry {
void *args; void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *); void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *); void (*fn_post)(void *);
STAILQ_ENTRY(async_nif_req_entry) entries;
}; };
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue { struct async_nif_work_queue {
STAT_DEF(qwait); unsigned int num_workers;
unsigned int depth;
ErlNifMutex *reqs_mutex; ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd; ErlNifCond *reqs_cnd;
FIFO_QUEUE_TYPE(reqs) reqs; STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
}; };
struct async_nif_worker_entry { struct async_nif_worker_entry {
@ -62,16 +61,17 @@ struct async_nif_worker_entry {
unsigned int worker_id; unsigned int worker_id;
struct async_nif_state *async_nif; struct async_nif_state *async_nif;
struct async_nif_work_queue *q; struct async_nif_work_queue *q;
SLIST_ENTRY(async_nif_worker_entry) entries;
}; };
struct async_nif_state { struct async_nif_state {
STAT_DEF(qwait);
unsigned int shutdown; unsigned int shutdown;
unsigned int num_workers; ErlNifMutex *we_mutex;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; unsigned int we_active;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_queues; unsigned int num_queues;
unsigned int next_q; unsigned int next_q;
FIFO_QUEUE_TYPE(reqs) recycled_reqs; STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
unsigned int num_reqs; unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex; ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[]; struct async_nif_work_queue queues[];
@ -80,37 +80,46 @@ struct async_nif_state {
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \ struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \ static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
__UNUSED(worker_id); \ UNUSED(worker_id); \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do work_block while(0); \ do work_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
} \ } \
static void fn_post_ ## decl (struct decl ## _args *args) { \ static void fn_post_ ## decl (struct decl ## _args *args) { \
__UNUSED(args); \ UNUSED(args); \
DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
do post_block while(0); \ do post_block while(0); \
DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
} \ } \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \ static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \ struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \ struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \ struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \ struct async_nif_req_entry *req = NULL; \
const char *affinity = NULL; \ unsigned int affinity = 0; \
ErlNifEnv *new_env = NULL; \ ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \ /* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \ const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \ argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \ /* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \ struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \ if (async_nif->shutdown) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "shutdown")); \
} \
req = async_nif_reuse_req(async_nif); \ req = async_nif_reuse_req(async_nif); \
new_env = req->env; \ if (!req) { \
if (!req) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \ enif_make_atom(env, "eagain")); \
} \
new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \ do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \ if (!copy_of_args) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \ enif_make_atom(env, "enomem")); \
} \ } \
@ -122,13 +131,14 @@ struct async_nif_state {
req->fn_post = (void (*)(void *))fn_post_ ## decl; \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \ int h = -1; \
if (affinity) \ if (affinity) \
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \ h = ((unsigned int)affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \ if (!reply) { \
fn_post_ ## decl (args); \ fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
enif_free(copy_of_args); \ enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \ enif_make_atom(env, "eagain")); \
} \ } \
return reply; \ return reply; \
} }
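For orientation, here is a rough sketch of how ASYNC_NIF_DECL is typically invoked, modeled on the lmdb_nif.c call sites later in this diff; the NIF name and the key field are purely illustrative, and ASYNC_NIF_REPLY / ASYNC_NIF_WORK_ENV are helpers defined elsewhere in async_nif.h, not in this hunk.

/* Hypothetical NIF built with ASYNC_NIF_DECL (illustrative sketch only). */
ASYNC_NIF_DECL(
  example_nif_get,
  { /* frame: becomes struct example_nif_get_args */
    ERL_NIF_TERM key;
  },
  { /* pre: runs on the Erlang scheduler thread; copy terms into the work env */
    args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[0]);
  },
  { /* work: runs later on a worker thread; reply to the caller's ref/pid */
    ASYNC_NIF_REPLY(enif_make_atom(env, "ok"));
  },
  { /* post: cleanup once the work block has completed */
  });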
@ -179,26 +189,26 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
ErlNifEnv *env = NULL; ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex); enif_mutex_lock(async_nif->recycled_req_mutex);
if (fifo_q_empty(reqs, async_nif->recycled_reqs)) { if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) { if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = enif_alloc(sizeof(struct async_nif_req_entry)); req = enif_alloc(sizeof(struct async_nif_req_entry));
if (req) { if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry)); memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env(); env = enif_alloc_env();
if (!env) { if (env) {
enif_free(req);
req = NULL;
} else {
req->env = env; req->env = env;
async_nif->num_reqs++; async_nif->num_reqs++;
} else {
enif_free(req);
req = NULL;
} }
} }
} }
} else { } else {
req = fifo_q_get(reqs, async_nif->recycled_reqs); req = STAILQ_FIRST(&async_nif->recycled_reqs);
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
} }
enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_unlock(async_nif->recycled_req_mutex);
STAT_TICK(async_nif, qwait);
return req; return req;
} }
@ -212,27 +222,59 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
void void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif) async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{ {
STAT_TOCK(async_nif, qwait); ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex); enif_mutex_lock(async_nif->recycled_req_mutex);
fifo_q_put(reqs, async_nif->recycled_reqs, req); enif_clear_env(req->env);
env = req->env;
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_unlock(async_nif->recycled_req_mutex);
} }
static void *async_nif_worker_fn(void *);
/** /**
* A string hash function. * Start up a worker thread.
*
* A basic hash function for strings of characters used during the
* affinity association.
*
* s a NULL terminated set of bytes to be hashed
* -> an integer hash encoding of the bytes
*/ */
static inline unsigned int static int
async_nif_str_hash_func(const char *s) async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{ {
unsigned int h = (unsigned int)*s; struct async_nif_worker_entry *we;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h; if (0 == q)
return EINVAL;
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
enif_free(we);
async_nif->we_active--;
we = n;
}
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
return EAGAIN;
}
we = enif_alloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
}
memset(we, 0, sizeof(struct async_nif_worker_entry));
we->worker_id = async_nif->we_active++;
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
} }
/** /**
@ -245,9 +287,9 @@ static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{ {
/* Identify the most appropriate worker for this request. */ /* Identify the most appropriate worker for this request. */
unsigned int qid = 0; unsigned int i, qid = 0;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
unsigned int n = async_nif->num_queues; double avg_depth = 0.0;
/* Either we're choosing a queue based on some affinity/hinted value or we /* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that need to select the next queue in the rotation and atomically update that
@ -262,46 +304,68 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Now we inspect and iterate across the set of queues trying to select one /* Now we inspect and iterate across the set of queues trying to select one
that isn't too full or too slow. */ that isn't too full or too slow. */
do { for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */
unsigned int j, n = 0;
for (j = 0; j < async_nif->num_queues; j++) {
if (j != qid && async_nif->queues[j].depth != 0) {
n++;
avg_depth += async_nif->queues[j].depth;
}
}
if (avg_depth != 0)
avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
q = &async_nif->queues[qid]; q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex); enif_mutex_lock(q->reqs_mutex);
/* Now that we hold the lock, check for shutdown. As long as we hold
this lock either a) we're shutting down so exit now or b) this queue
will be valid until we release the lock. */
if (async_nif->shutdown) { if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
return 0; return 0;
} }
double await = STAT_MEAN_LOG2_SAMPLE(async_nif, qwait);
double await_inthisq = STAT_MEAN_LOG2_SAMPLE(q, qwait); /* Try not to enqueue a request into a queue that isn't keeping up with
if (fifo_q_full(reqs, q->reqs) || await_inthisq > await) { the request volume. */
if (q->depth <= avg_depth) break;
else {
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues; qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
} else {
break;
} }
// TODO: at some point add in work sheading/stealing }
} while(n-- > 0);
/* We hold the queue's lock, and we've selected a reasonable queue for this /* If the for loop finished then we didn't find a suitable queue for this
new request so add the request. */ request, meaning we're backed up so trigger eagain. */
STAT_TICK(q, qwait); if (i == async_nif->num_queues) {
fifo_q_put(reqs, q->reqs, req); enif_mutex_unlock(q->reqs_mutex);
return 0;
}
/* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
q->depth++;
/* We've selected a queue for this new request; now check to make sure there are
enough workers actively processing requests on this queue. */
if (q->depth > q->num_workers)
if (async_nif_start_worker(async_nif, q) == 0) q->num_workers++;
/* Build the term before releasing the lock so as not to race on the use of /* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread the req pointer (which will soon become invalid in another thread
performing the request). */ performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued")); enif_make_atom(req->env, "enqueued"));
enif_mutex_unlock(q->reqs_mutex);
enif_cond_signal(q->reqs_cnd); enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply; return reply;
} }
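A quick worked example of the selection heuristic above (the numbers are illustrative): with four queues whose depths are 3, 0, 5 and 2 and qid = 0 under consideration, the average over the other non-empty queues is (5 + 2) / 2 = 3.5; queue 0's depth of 3 is <= 3.5, so the request is enqueued there. Had its depth been, say, 6, the loop would move on to qid = 1 and try the next queue, returning 0 (which the calling macro turns into an eagain reply) only if every queue is rejected.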
/** /**
* TODO: * Worker threads execute this function. Here each worker pulls requests off
* their respective queues, executes that work, and continues doing so until
* they see that the shutdown flag is set, at which point they exit.
*/ */
static void * static void *
async_nif_worker_fn(void *arg) async_nif_worker_fn(void *arg)
@ -320,26 +384,29 @@ async_nif_worker_fn(void *arg)
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
break; break;
} }
if (fifo_q_empty(reqs, q->reqs)) { if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */ /* Queue is empty so we wait for more work to arrive. */
STAT_RESET(q, qwait); if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
enif_mutex_unlock(q->reqs_mutex);
break;
} else {
enif_cond_wait(q->reqs_cnd, q->reqs_mutex); enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work; goto check_again_for_work;
}
} else { } else {
assert(fifo_q_size(reqs, q->reqs) > 0);
assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
/* At this point the next req is ours to process and we hold the /* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */ reqs_mutex lock. Take the request off the queue. */
req = fifo_q_get(reqs, q->reqs); req = STAILQ_FIRST(&q->reqs);
enif_mutex_unlock(q->reqs_mutex); STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
q->depth--;
/* Ensure that there is at least one other worker thread watching this /* Ensure that there is at least one other worker thread watching this
queue. */ queue. */
enif_cond_signal(q->reqs_cnd); enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
/* Perform the work. */ /* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
STAT_TOCK(q, qwait);
/* Now call the post-work cleanup function. */ /* Now call the post-work cleanup function. */
req->fn_post(req->args); req->fn_post(req->args);
@ -350,11 +417,14 @@ async_nif_worker_fn(void *arg)
req->fn_post = 0; req->fn_post = 0;
enif_free(req->args); enif_free(req->args);
req->args = NULL; req->args = NULL;
enif_clear_env(req->env);
async_nif_recycle_req(req, async_nif); async_nif_recycle_req(req, async_nif);
req = NULL; req = NULL;
} }
} }
enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
q->num_workers--;
enif_thread_exit(0); enif_thread_exit(0);
return 0; return 0;
} }
@ -366,41 +436,44 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
unsigned int num_queues = async_nif->num_queues; unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL; struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL; struct async_nif_req_entry *req = NULL;
__UNUSED(env); struct async_nif_worker_entry *we = NULL;
UNUSED(env);
STAT_PRINT(async_nif, qwait, "wterl"); /* Signal the worker threads, stop what you're doing and exit. To ensure
that we don't race with the enqueue() process we first lock all the worker
/* Signal the worker threads, stop what you're doing and exit. To queues, then set shutdown to true, then unlock. The enqueue function will
ensure that we don't race with the enqueue() process we first take the queue mutex, then test for shutdown condition, then enqueue only
lock all the worker queues, then set shutdown to true, then if not shutting down. */
unlock. The enqueue function will take the queue mutex, then
test for shutdown condition, then enqueue only if not shutting
down. */
for (i = 0; i < num_queues; i++) { for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i]; q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex); enif_mutex_lock(q->reqs_mutex);
} }
/* Set the shutdown flag so that worker threads will not continue
executing requests. */
async_nif->shutdown = 1; async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) { for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i]; q = &async_nif->queues[i];
enif_cond_broadcast(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
} }
/* Join for the now exiting worker threads. */ /* Join for the now exiting worker threads. */
for (i = 0; i < async_nif->num_workers; ++i) { while(async_nif->we_active > 0) {
for (i = 0; i < num_queues; i++)
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */ void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); enif_thread_join(we->tid, &exit_value);
enif_free(we);
async_nif->we_active--;
we = n;
} }
enif_mutex_unlock(async_nif->we_mutex);
/* Free req structures sitting on the recycle queue. */ }
enif_mutex_lock(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->we_mutex);
req = NULL;
fifo_q_foreach(reqs, async_nif->recycled_reqs, req, {
enif_free_env(req->env);
enif_free(req);
});
fifo_q_free(reqs, async_nif->recycled_reqs);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */ /* Cleanup in-flight requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) { for (i = 0; i < num_queues; i++) {
@ -408,7 +481,9 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
/* Worker threads are stopped, now toss anything left in the queue. */ /* Worker threads are stopped, now toss anything left in the queue. */
req = NULL; req = NULL;
fifo_q_foreach(reqs, q->reqs, req, { req = STAILQ_FIRST(&q->reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env); enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env, enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
@ -417,12 +492,23 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
enif_free_env(req->env); enif_free_env(req->env);
enif_free(req->args); enif_free(req->args);
enif_free(req); enif_free(req);
}); req = n;
fifo_q_free(reqs, q->reqs); }
enif_mutex_destroy(q->reqs_mutex); enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd); enif_cond_destroy(q->reqs_cnd);
} }
/* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
req = STAILQ_FIRST(&async_nif->recycled_reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
enif_free(req);
req = n;
}
enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
@ -433,7 +519,7 @@ static void *
async_nif_load() async_nif_load()
{ {
static int has_init = 0; static int has_init = 0;
unsigned int i, j, num_queues; unsigned int i, num_queues;
ErlNifSysInfo info; ErlNifSysInfo info;
struct async_nif_state *async_nif; struct async_nif_state *async_nif;
@ -466,54 +552,19 @@ async_nif_load()
sizeof(struct async_nif_work_queue) * num_queues); sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues; async_nif->num_queues = num_queues;
async_nif->num_workers = 2 * num_queues; async_nif->we_active = 0;
async_nif->next_q = 0; async_nif->next_q = 0;
async_nif->shutdown = 0; async_nif->shutdown = 0;
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create(NULL); async_nif->recycled_req_mutex = enif_mutex_create(NULL);
STAT_INIT(async_nif, qwait); async_nif->we_mutex = enif_mutex_create(NULL);
SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) { for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i]; struct async_nif_work_queue *q = &async_nif->queues[i];
q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE); STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create(NULL); q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL); q->reqs_cnd = enif_cond_create(NULL);
STAT_INIT(q, qwait);
}
/* Setup the thread pool management. */
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */
for (i = 0; i < async_nif->num_workers; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif;
we->worker_id = i;
we->q = &async_nif->queues[i % async_nif->num_queues];
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)we, NULL) != 0) {
async_nif->shutdown = 1;
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd);
}
while(i-- > 0) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_free(async_nif);
return NULL;
}
} }
return async_nif; return async_nif;
} }
@ -521,7 +572,7 @@ async_nif_load()
static void static void
async_nif_upgrade(ErlNifEnv *env) async_nif_upgrade(ErlNifEnv *env)
{ {
__UNUSED(env); UNUSED(env);
// TODO: // TODO:
} }

View file

@ -24,24 +24,28 @@
extern "C" { extern "C" {
#endif #endif
#ifdef DEBUG #if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
# undef DEBUG
# define DEBUG 0
# define DPRINTF (void) /* Vararg macros may be unsupported */
#elif DEBUG
#include <stdio.h> #include <stdio.h>
#include <stdarg.h> #include <stdarg.h>
#ifndef DPRINTF
#define DPRINTF(fmt, ...) \ #define DPRINTF(fmt, ...) \
do { \ do { \
fprintf(stderr, "%s:%d " fmt "\n", __func__, __LINE__, __VA_ARGS__); \ fprintf(stderr, "%s:%d " fmt "\n", __FILE__, __LINE__, __VA_ARGS__); \
fflush(stderr); \ fflush(stderr); \
} while(0) } while(0)
#endif
#ifndef DPUTS
#define DPUTS(arg) DPRINTF("%s", arg) #define DPUTS(arg) DPRINTF("%s", arg)
#endif
#else #else
#define DPRINTF(fmt, ...) ((void) 0) #define DPRINTF(fmt, ...) ((void) 0)
#define DPUTS(arg) ((void) 0) #define DPUTS(arg) ((void) 0)
#endif #endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#ifndef COMPQUIET #ifndef COMPQUIET
#define COMPQUIET(n, v) do { \ #define COMPQUIET(n, v) do { \
(n) = (v); \ (n) = (v); \
@ -49,11 +53,12 @@ extern "C" {
} while (0) } while (0)
#endif #endif
#ifndef __UNUSED #ifdef __APPLE__
#define __UNUSED(v) ((void)(v)) #define PRIuint64(x) (x)
#else
#define PRIuint64(x) (unsigned long long)(x)
#endif #endif
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif

View file

@ -1,98 +0,0 @@
/*
* Copyright (C) 2013, all rights reserved by Gregory Burd <greg@burd.me>
*
* This Source Code Form is subject to the terms of the Mozilla Public License,
* version 2 (MPLv2). If a copy of the MPL was not distributed with this file,
* you can obtain one at: http://mozilla.org/MPL/2.0/
*
* NOTES:
* - on some platforms this will require -lrt
*/
#include <stdio.h>
#include <stdint.h>
#include <time.h>
#include <sys/timeb.h>
typedef enum { ns = 0, mcs, ms, s } time_scale;
struct scale_time {
const char *abbreviation;
const char *name;
uint64_t mul, div, overhead, ticks_per;
};
static const struct scale_time scale[] = {
{ "ns", "nanosecond", 1000000000LL, 1LL, 10, 2300000000000LL },
{ "mcs", "microsecond", 1000000LL, 1000LL, 10, 2300000000LL },
{ "ms", "millisecond", 1000LL, 1000000LL, 10, 2300000LL },
{ "sec", "second", 1LL, 1000000000LL, 10, 2300LL } };
static uint64_t ts(time_scale unit)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
return (((uint64_t)ts.tv_sec * scale[unit].mul) +
((uint64_t)ts.tv_nsec / scale[unit].div));
}
#if 0
//if defined(__i386__) || defined(__x86_64__)
/**
* cpu_clock_ticks()
*
* A measure provided by Intel x86 CPUs which provides the number of cycles
* (aka "ticks") executed as a counter using the RDTSC instruction.
*/
static inline uint64_t cpu_clock_ticks()
{
uint32_t lo, hi;
__asm__ __volatile__ (
"xorl %%eax, %%eax\n"
"cpuid\n"
"rdtsc\n"
: "=a" (lo), "=d" (hi)
:
: "%ebx", "%ecx" );
return (uint64_t)hi << 32 | lo;
}
/**
* cpu_clock_ticks()
*
* An approximation of elapsed [ns, mcs, ms, s] from CPU clock ticks.
*/
static uint64_t elapsed_cpu_clock_ticks(uint64_t start, time_scale unit)
{
return (cpu_clock_ticks() - start - scale[unit].overhead) * scale[unit].ticks_per;
}
#endif
typedef struct {
uint64_t then;
time_scale unit;
} duration_t;
static inline uint64_t elapsed(duration_t *d)
{
uint64_t now = ts(d->unit);
uint64_t elapsed = now - d->then;
d->then = now;
return elapsed;
}
#define DURATION(name, resolution) duration_t name = \
{ts(resolution), resolution}
#define ELAPSED_DURING(result, resolution, block) \
do { \
DURATION(__x, resolution); \
do block while(0); \
*result = elapsed(&__x); \
} while(0);
#define CYCLES_DURING(result, block) \
do { \
uint64_t __begin = cpu_clock_ticks(); \
do block while(0); \
*result = cpu_clock_ticks() - __begin; \
} while(0);
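For reference, the timing macros this commit removes were used roughly like the sketch below (a minimal example; do_work is a placeholder for the code being measured):

/* Measure a block of work in microseconds using the (now removed) duration.h API. */
static uint64_t time_do_work(void)
{
    uint64_t took = 0;
    ELAPSED_DURING(&took, mcs, {
        do_work();          /* placeholder for the timed section */
    });
    return took;            /* elapsed wall-clock time in microseconds */
}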

View file

@ -1,93 +0,0 @@
/*
* fifo_q: a macro-based implementation of a FIFO Queue
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __FIFO_Q_H__
#define __FIFO_Q_H__
#if defined(__cplusplus)
extern "C" {
#endif
#define FIFO_QUEUE_TYPE(name) \
struct fifo_q__ ## name *
#define DECL_FIFO_QUEUE(name, type) \
struct fifo_q__ ## name { \
unsigned int h, t, s; \
type *items[]; \
}; \
static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
struct fifo_q__ ## name *q = enif_alloc(sz); \
if (!q) \
return 0; \
memset(q, 0, sz); \
q->s = n + 1; \
return q; \
} \
static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
enif_free(q); \
} \
static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
q->items[q->h] = n; \
q->h = (q->h + 1) % q->s; \
return n; \
} \
static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
type *n = q->items[q->t]; \
q->items[q->t] = 0; \
q->t = (q->t + 1) % q->s; \
return n; \
} \
static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
return (q->h - q->t + q->s) % q->s; \
} \
static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
return q->s - 1; \
} \
static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
return (q->t == q->h); \
} \
static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
return ((q->h + 1) % q->s) == q->t; \
}
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
#define fifo_q_foreach(name, queue, item, task) do { \
while(!fifo_q_ ## name ## _empty(queue)) { \
item = fifo_q_ ## name ## _get(queue); \
do task while(0); \
} \
} while(0);
#if defined(__cplusplus)
}
#endif
#endif // __FIFO_Q_H__

View file

@ -2,7 +2,7 @@
* This file is part of LMDB - Erlang Lightning MDB API * This file is part of LMDB - Erlang Lightning MDB API
* *
* Copyright (c) 2012 by Aleph Archives. All rights reserved. * Copyright (c) 2012 by Aleph Archives. All rights reserved.
%% Copyright (c) 2013 by Basho Technologies, Inc. All rights reserved. * Copyright (c) 2013 by Basho Technologies, Inc. All rights reserved.
* *
* ------------------------------------------------------------------------- * -------------------------------------------------------------------------
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
@ -28,6 +28,7 @@
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <inttypes.h>
#include <errno.h> #include <errno.h>
#include <sys/param.h> #include <sys/param.h>
#include <erl_nif.h> #include <erl_nif.h>
@ -35,22 +36,12 @@
#include "common.h" #include "common.h"
#include "async_nif.h" #include "async_nif.h"
#include "stats.h"
#include "lmdb.h" #include "lmdb.h"
STAT_DECL(lmdb_get, 1000);
STAT_DECL(lmdb_put, 1000);
STAT_DECL(lmdb_del, 1000);
STAT_DECL(lmdb_upd, 1000);
static ErlNifResourceType *lmdb_RESOURCE; static ErlNifResourceType *lmdb_RESOURCE;
struct lmdb { struct lmdb {
MDB_env *env; MDB_env *env;
MDB_dbi dbi; MDB_dbi dbi;
STAT_DEF(lmdb_get);
STAT_DEF(lmdb_put);
STAT_DEF(lmdb_del);
STAT_DEF(lmdb_upd);
}; };
struct lmdb_priv_data { struct lmdb_priv_data {
@ -213,11 +204,6 @@ ASYNC_NIF_DECL(
if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL) if ((handle = enif_alloc_resource(lmdb_RESOURCE, sizeof(struct lmdb))) == NULL)
FAIL_ERR(ENOMEM, err3); FAIL_ERR(ENOMEM, err3);
STAT_INIT(handle, lmdb_get);
STAT_INIT(handle, lmdb_put);
STAT_INIT(handle, lmdb_upd);
STAT_INIT(handle, lmdb_del);
CHECK(mdb_env_create(&(handle->env)), err2); CHECK(mdb_env_create(&(handle->env)), err2);
if (mdb_env_set_mapsize(handle->env, args->mapsize)) { if (mdb_env_set_mapsize(handle->env, args->mapsize)) {
@ -271,15 +257,7 @@ ASYNC_NIF_DECL(
}, },
{ // work { // work
STAT_PRINT(args->handle, lmdb_get, "lmdb");
STAT_PRINT(args->handle, lmdb_put, "lmdb");
STAT_PRINT(args->handle, lmdb_del, "lmdb");
STAT_PRINT(args->handle, lmdb_upd, "lmdb");
mdb_env_close(args->handle->env); mdb_env_close(args->handle->env);
STAT_RESET(args->handle, lmdb_get);
STAT_RESET(args->handle, lmdb_put);
STAT_RESET(args->handle, lmdb_del);
STAT_RESET(args->handle, lmdb_upd);
args->handle->env = NULL; args->handle->env = NULL;
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;
@ -315,7 +293,6 @@ ASYNC_NIF_DECL(
} }
if (!args->handle->env) if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, lmdb_put);
enif_keep_resource((void*)args->handle); enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
@ -354,7 +331,6 @@ ASYNC_NIF_DECL(
FAIL_ERR(ret, err1); FAIL_ERR(ret, err1);
CHECK(mdb_txn_commit(txn), err1); CHECK(mdb_txn_commit(txn), err1);
STAT_TOCK(args->handle, lmdb_put);
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;
@ -395,7 +371,6 @@ ASYNC_NIF_DECL(
} }
if (!args->handle->env) if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, lmdb_upd);
enif_keep_resource((void*)args->handle); enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]); args->val = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[2]);
@ -427,7 +402,6 @@ ASYNC_NIF_DECL(
CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2); CHECK(mdb_txn_begin(args->handle->env, NULL, 0, & txn), err2);
CHECK(mdb_put(txn, args->handle->dbi, &mkey, &mdata, 0), err1); CHECK(mdb_put(txn, args->handle->dbi, &mkey, &mdata, 0), err1);
CHECK(mdb_txn_commit(txn), err1); CHECK(mdb_txn_commit(txn), err1);
STAT_TOCK(args->handle, lmdb_upd);
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;
@ -465,7 +439,6 @@ ASYNC_NIF_DECL(
} }
if (!args->handle->env) if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, lmdb_get);
enif_keep_resource((void*)args->handle); enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
}, },
@ -505,7 +478,6 @@ ASYNC_NIF_DECL(
FAIL_ERR(ENOMEM, err); FAIL_ERR(ENOMEM, err);
memcpy(bin, mdata.mv_data, mdata.mv_size); memcpy(bin, mdata.mv_data, mdata.mv_size);
STAT_TOCK(args->handle, lmdb_get);
ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, val)); ASYNC_NIF_REPLY(enif_make_tuple(env, 2, ATOM_OK, val));
return; return;
@ -541,7 +513,6 @@ ASYNC_NIF_DECL(
} }
if (!args->handle->env) if (!args->handle->env)
ASYNC_NIF_RETURN_BADARG(); ASYNC_NIF_RETURN_BADARG();
STAT_TICK(args->handle, lmdb_del);
enif_keep_resource((void*)args->handle); enif_keep_resource((void*)args->handle);
args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]); args->key = enif_make_copy(ASYNC_NIF_WORK_ENV, argv[1]);
}, },
@ -571,7 +542,6 @@ ASYNC_NIF_DECL(
} }
CHECK(mdb_txn_commit(txn), err); CHECK(mdb_txn_commit(txn), err);
STAT_TOCK(args->handle, lmdb_del);
ASYNC_NIF_REPLY(ATOM_OK); ASYNC_NIF_REPLY(ATOM_OK);
return; return;

c_src/queue.h (new file, 678 lines)
View file

@ -0,0 +1,678 @@
/*
* Copyright (c) 1991, 1993
* The Regents of the University of California. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 4. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* @(#)queue.h 8.5 (Berkeley) 8/20/94
* $FreeBSD: src/sys/sys/queue.h,v 1.54 2002/08/05 05:18:43 alfred Exp $
*/
#ifndef _DB_QUEUE_H_
#define _DB_QUEUE_H_
#ifndef __offsetof
#define __offsetof(st, m) \
((size_t) ( (char *)&((st *)0)->m - (char *)0 ))
#endif
#ifndef __containerof
#define __containerof(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - __offsetof(type,member) );})
#endif
#if defined(__cplusplus)
extern "C" {
#endif
/*
* This file defines four types of data structures: singly-linked lists,
* singly-linked tail queues, lists and tail queues.
*
* A singly-linked list is headed by a single forward pointer. The elements
* are singly linked for minimum space and pointer manipulation overhead at
* the expense of O(n) removal for arbitrary elements. New elements can be
* added to the list after an existing element or at the head of the list.
* Elements being removed from the head of the list should use the explicit
* macro for this purpose for optimum efficiency. A singly-linked list may
* only be traversed in the forward direction. Singly-linked lists are ideal
* for applications with large datasets and few or no removals or for
* implementing a LIFO queue.
*
* A singly-linked tail queue is headed by a pair of pointers, one to the
* head of the list and the other to the tail of the list. The elements are
* singly linked for minimum space and pointer manipulation overhead at the
* expense of O(n) removal for arbitrary elements. New elements can be added
* to the list after an existing element, at the head of the list, or at the
* end of the list. Elements being removed from the head of the tail queue
* should use the explicit macro for this purpose for optimum efficiency.
* A singly-linked tail queue may only be traversed in the forward direction.
* Singly-linked tail queues are ideal for applications with large datasets
* and few or no removals or for implementing a FIFO queue.
*
* A list is headed by a single forward pointer (or an array of forward
* pointers for a hash table header). The elements are doubly linked
* so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before
* or after an existing element or at the head of the list. A list
* may only be traversed in the forward direction.
*
* A tail queue is headed by a pair of pointers, one to the head of the
* list and the other to the tail of the list. The elements are doubly
* linked so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before or
* after an existing element, at the head of the list, or at the end of
* the list. A tail queue may be traversed in either direction.
*
* For details on the use of these macros, see the queue(3) manual page.
*
*
* SLIST LIST STAILQ TAILQ
* _HEAD + + + +
* _HEAD_INITIALIZER + + + +
* _ENTRY + + + +
* _INIT + + + +
* _EMPTY + + + +
* _FIRST + + + +
* _NEXT + + + +
* _PREV - - - +
* _LAST - - + +
* _FOREACH + + + +
* _FOREACH_REVERSE - - - +
* _INSERT_HEAD + + + +
* _INSERT_BEFORE - + - +
* _INSERT_AFTER + + + +
* _INSERT_TAIL - - + +
* _CONCAT - - + +
* _REMOVE_HEAD + - + -
* _REMOVE + + + +
*
*/
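As a concrete illustration of the STAILQ flavor (the variant async_nif.h now uses for its request queues), a minimal FIFO of heap-allocated nodes might look like the sketch below; the node type and field names are illustrative and not part of this header.

#include <stdlib.h>

struct node {
    int value;
    STAILQ_ENTRY(node) entries;             /* embedded forward link */
};
STAILQ_HEAD(node_fifo, node);               /* declares struct node_fifo */

static void fifo_demo(void)
{
    struct node_fifo fifo;
    STAILQ_INIT(&fifo);

    struct node *n = malloc(sizeof(*n));
    if (!n)
        return;
    n->value = 42;
    STAILQ_INSERT_TAIL(&fifo, n, entries);  /* enqueue at the tail */

    while (!STAILQ_EMPTY(&fifo)) {
        struct node *head = STAILQ_FIRST(&fifo);
        STAILQ_REMOVE_HEAD(&fifo, entries); /* dequeue from the head */
        free(head);
    }
}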
/*
* XXX
* We #undef all of the macros because there are incompatible versions of this
* file and these macros on various systems. What makes the problem worse is
* they are included and/or defined by system include files which we may have
* already loaded into Berkeley DB before getting here. For example, FreeBSD's
* <rpc/rpc.h> includes its system <sys/queue.h>, and VxWorks UnixLib.h defines
* several of the LIST_XXX macros. Visual C.NET 7.0 also defines some of these
* same macros in Vc7\PlatformSDK\Include\WinNT.h. Make sure we use ours.
*/
#undef LIST_EMPTY
#undef LIST_ENTRY
#undef LIST_FIRST
#undef LIST_FOREACH
#undef LIST_HEAD
#undef LIST_HEAD_INITIALIZER
#undef LIST_INIT
#undef LIST_INSERT_AFTER
#undef LIST_INSERT_BEFORE
#undef LIST_INSERT_HEAD
#undef LIST_NEXT
#undef LIST_REMOVE
#undef QMD_TRACE_ELEM
#undef QMD_TRACE_HEAD
#undef QUEUE_MACRO_DEBUG
#undef SLIST_EMPTY
#undef SLIST_ENTRY
#undef SLIST_FIRST
#undef SLIST_FOREACH
#undef SLIST_FOREACH_PREVPTR
#undef SLIST_HEAD
#undef SLIST_HEAD_INITIALIZER
#undef SLIST_INIT
#undef SLIST_INSERT_AFTER
#undef SLIST_INSERT_HEAD
#undef SLIST_NEXT
#undef SLIST_REMOVE
#undef SLIST_REMOVE_HEAD
#undef STAILQ_CONCAT
#undef STAILQ_EMPTY
#undef STAILQ_ENTRY
#undef STAILQ_FIRST
#undef STAILQ_FOREACH
#undef STAILQ_HEAD
#undef STAILQ_HEAD_INITIALIZER
#undef STAILQ_INIT
#undef STAILQ_INSERT_AFTER
#undef STAILQ_INSERT_HEAD
#undef STAILQ_INSERT_TAIL
#undef STAILQ_LAST
#undef STAILQ_NEXT
#undef STAILQ_REMOVE
#undef STAILQ_REMOVE_HEAD
#undef STAILQ_REMOVE_HEAD_UNTIL
#undef TAILQ_CONCAT
#undef TAILQ_EMPTY
#undef TAILQ_ENTRY
#undef TAILQ_FIRST
#undef TAILQ_FOREACH
#undef TAILQ_FOREACH_REVERSE
#undef TAILQ_HEAD
#undef TAILQ_HEAD_INITIALIZER
#undef TAILQ_INIT
#undef TAILQ_INSERT_AFTER
#undef TAILQ_INSERT_BEFORE
#undef TAILQ_INSERT_HEAD
#undef TAILQ_INSERT_TAIL
#undef TAILQ_LAST
#undef TAILQ_NEXT
#undef TAILQ_PREV
#undef TAILQ_REMOVE
#undef TRACEBUF
#undef TRASHIT
#define QUEUE_MACRO_DEBUG 0
#if QUEUE_MACRO_DEBUG
/* Store the last 2 places the queue element or head was altered */
struct qm_trace {
char * lastfile;
int lastline;
char * prevfile;
int prevline;
};
#define TRACEBUF struct qm_trace trace;
#define TRASHIT(x) do {(x) = (void *)-1;} while (0)
#define QMD_TRACE_HEAD(head) do { \
(head)->trace.prevline = (head)->trace.lastline; \
(head)->trace.prevfile = (head)->trace.lastfile; \
(head)->trace.lastline = __LINE__; \
(head)->trace.lastfile = __FILE__; \
} while (0)
#define QMD_TRACE_ELEM(elem) do { \
(elem)->trace.prevline = (elem)->trace.lastline; \
(elem)->trace.prevfile = (elem)->trace.lastfile; \
(elem)->trace.lastline = __LINE__; \
(elem)->trace.lastfile = __FILE__; \
} while (0)
#else
#define QMD_TRACE_ELEM(elem)
#define QMD_TRACE_HEAD(head)
#define TRACEBUF
#define TRASHIT(x)
#endif /* QUEUE_MACRO_DEBUG */
/*
* Singly-linked List declarations.
*/
#define SLIST_HEAD(name, type) \
struct name { \
struct type *slh_first; /* first element */ \
}
#define SLIST_HEAD_INITIALIZER(head) \
{ NULL }
#define SLIST_ENTRY(type) \
struct { \
struct type *sle_next; /* next element */ \
}
/*
* Singly-linked List functions.
*/
#define SLIST_EMPTY(head) ((head)->slh_first == NULL)
#define SLIST_FIRST(head) ((head)->slh_first)
#define SLIST_FOREACH(var, head, field) \
for ((var) = SLIST_FIRST((head)); \
(var); \
(var) = SLIST_NEXT((var), field))
#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \
for ((varp) = &SLIST_FIRST((head)); \
((var) = *(varp)) != NULL; \
(varp) = &SLIST_NEXT((var), field))
#define SLIST_INIT(head) do { \
SLIST_FIRST((head)) = NULL; \
} while (0)
#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \
SLIST_NEXT((slistelm), field) = (elm); \
} while (0)
#define SLIST_INSERT_HEAD(head, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \
SLIST_FIRST((head)) = (elm); \
} while (0)
#define SLIST_NEXT(elm, field) ((elm)->field.sle_next)
#define SLIST_REMOVE(head, elm, type, field) do { \
if (SLIST_FIRST((head)) == (elm)) { \
SLIST_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = SLIST_FIRST((head)); \
while (SLIST_NEXT(curelm, field) != (elm)) \
curelm = SLIST_NEXT(curelm, field); \
SLIST_NEXT(curelm, field) = \
SLIST_NEXT(SLIST_NEXT(curelm, field), field); \
} \
} while (0)
#define SLIST_REMOVE_HEAD(head, field) do { \
SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \
} while (0)
/*
* Singly-linked Tail queue declarations.
*/
#define STAILQ_HEAD(name, type) \
struct name { \
struct type *stqh_first;/* first element */ \
struct type **stqh_last;/* addr of last next element */ \
}
#define STAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).stqh_first }
#define STAILQ_ENTRY(type) \
struct { \
struct type *stqe_next; /* next element */ \
}
/*
* Singly-linked Tail queue functions.
*/
#define STAILQ_CONCAT(head1, head2) do { \
if (!STAILQ_EMPTY((head2))) { \
*(head1)->stqh_last = (head2)->stqh_first; \
(head1)->stqh_last = (head2)->stqh_last; \
STAILQ_INIT((head2)); \
} \
} while (0)
#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL)
#define STAILQ_FIRST(head) ((head)->stqh_first)
#define STAILQ_FOREACH(var, head, field) \
for ((var) = STAILQ_FIRST((head)); \
(var); \
(var) = STAILQ_NEXT((var), field))
#define STAILQ_INIT(head) do { \
STAILQ_FIRST((head)) = NULL; \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_NEXT((tqelm), field) = (elm); \
} while (0)
#define STAILQ_INSERT_HEAD(head, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_FIRST((head)) = (elm); \
} while (0)
#define STAILQ_INSERT_TAIL(head, elm, field) do { \
STAILQ_NEXT((elm), field) = NULL; \
*(head)->stqh_last = (elm); \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
} while (0)
#define STAILQ_LAST(head, type, field) \
(STAILQ_EMPTY((head)) ? \
NULL : \
((struct type *) \
((char *)((head)->stqh_last) - __offsetof(struct type, field))))
#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
#define STAILQ_REMOVE(head, elm, type, field) do { \
if (STAILQ_FIRST((head)) == (elm)) { \
STAILQ_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = STAILQ_FIRST((head)); \
while (STAILQ_NEXT(curelm, field) != (elm)) \
curelm = STAILQ_NEXT(curelm, field); \
if ((STAILQ_NEXT(curelm, field) = \
STAILQ_NEXT(STAILQ_NEXT(curelm, field), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((curelm), field);\
} \
} while (0)
#define STAILQ_REMOVE_HEAD(head, field) do { \
if ((STAILQ_FIRST((head)) = \
STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_REMOVE_HEAD_UNTIL(head, elm, field) do { \
if ((STAILQ_FIRST((head)) = STAILQ_NEXT((elm), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
/*
* List declarations.
*/
#define LIST_HEAD(name, type) \
struct name { \
struct type *lh_first; /* first element */ \
}
#define LIST_HEAD_INITIALIZER(head) \
{ NULL }
#define LIST_ENTRY(type) \
struct { \
struct type *le_next; /* next element */ \
struct type **le_prev; /* address of previous next element */ \
}
/*
* List functions.
*/
#define LIST_EMPTY(head) ((head)->lh_first == NULL)
#define LIST_FIRST(head) ((head)->lh_first)
#define LIST_FOREACH(var, head, field) \
for ((var) = LIST_FIRST((head)); \
(var); \
(var) = LIST_NEXT((var), field))
#define LIST_INIT(head) do { \
LIST_FIRST((head)) = NULL; \
} while (0)
#define LIST_INSERT_AFTER(listelm, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\
LIST_NEXT((listelm), field)->field.le_prev = \
&LIST_NEXT((elm), field); \
LIST_NEXT((listelm), field) = (elm); \
(elm)->field.le_prev = &LIST_NEXT((listelm), field); \
} while (0)
#define LIST_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.le_prev = (listelm)->field.le_prev; \
LIST_NEXT((elm), field) = (listelm); \
*(listelm)->field.le_prev = (elm); \
(listelm)->field.le_prev = &LIST_NEXT((elm), field); \
} while (0)
#define LIST_INSERT_HEAD(head, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \
LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\
LIST_FIRST((head)) = (elm); \
(elm)->field.le_prev = &LIST_FIRST((head)); \
} while (0)
#define LIST_NEXT(elm, field) ((elm)->field.le_next)
#define LIST_REMOVE(elm, field) do { \
if (LIST_NEXT((elm), field) != NULL) \
LIST_NEXT((elm), field)->field.le_prev = \
(elm)->field.le_prev; \
*(elm)->field.le_prev = LIST_NEXT((elm), field); \
} while (0)
/*
* Tail queue declarations.
*/
#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
TRACEBUF \
}
#define TAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).tqh_first }
#define TAILQ_ENTRY(type) \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
TRACEBUF \
}
/*
* Tail queue functions.
*/
#define TAILQ_CONCAT(head1, head2, field) do { \
if (!TAILQ_EMPTY(head2)) { \
*(head1)->tqh_last = (head2)->tqh_first; \
(head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \
(head1)->tqh_last = (head2)->tqh_last; \
TAILQ_INIT((head2)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_HEAD(head2); \
} \
} while (0)
#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL)
#define TAILQ_FIRST(head) ((head)->tqh_first)
#define TAILQ_FOREACH(var, head, field) \
for ((var) = TAILQ_FIRST((head)); \
(var); \
(var) = TAILQ_NEXT((var), field))
#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \
for ((var) = TAILQ_LAST((head), headname); \
(var); \
(var) = TAILQ_PREV((var), headname, field))
#define TAILQ_INIT(head) do { \
TAILQ_FIRST((head)) = NULL; \
(head)->tqh_last = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
} while (0)
#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\
TAILQ_NEXT((elm), field)->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else { \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
} \
TAILQ_NEXT((listelm), field) = (elm); \
(elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.tqe_prev = (listelm)->field.tqe_prev; \
TAILQ_NEXT((elm), field) = (listelm); \
*(listelm)->field.tqe_prev = (elm); \
(listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_HEAD(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \
TAILQ_FIRST((head))->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
TAILQ_FIRST((head)) = (elm); \
(elm)->field.tqe_prev = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_INSERT_TAIL(head, elm, field) do { \
TAILQ_NEXT((elm), field) = NULL; \
(elm)->field.tqe_prev = (head)->tqh_last; \
*(head)->tqh_last = (elm); \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_LAST(head, headname) \
(*(((struct headname *)((head)->tqh_last))->tqh_last))
#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next)
#define TAILQ_PREV(elm, headname, field) \
(*(((struct headname *)((elm)->field.tqe_prev))->tqh_last))
#define TAILQ_REMOVE(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field)) != NULL) \
TAILQ_NEXT((elm), field)->field.tqe_prev = \
(elm)->field.tqe_prev; \
else { \
(head)->tqh_last = (elm)->field.tqe_prev; \
QMD_TRACE_HEAD(head); \
} \
*(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \
TRASHIT((elm)->field.tqe_next); \
TRASHIT((elm)->field.tqe_prev); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
/*
* Circular queue definitions.
*/
#define CIRCLEQ_HEAD(name, type) \
struct name { \
struct type *cqh_first; /* first element */ \
struct type *cqh_last; /* last element */ \
}
#define CIRCLEQ_HEAD_INITIALIZER(head) \
{ (void *)&head, (void *)&head }
#define CIRCLEQ_ENTRY(type) \
struct { \
struct type *cqe_next; /* next element */ \
struct type *cqe_prev; /* previous element */ \
}
/*
* Circular queue functions.
*/
#define CIRCLEQ_INIT(head) do { \
(head)->cqh_first = (void *)(head); \
(head)->cqh_last = (void *)(head); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_AFTER(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm)->field.cqe_next; \
(elm)->field.cqe_prev = (listelm); \
if ((listelm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(listelm)->field.cqe_next->field.cqe_prev = (elm); \
(listelm)->field.cqe_next = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_BEFORE(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm); \
(elm)->field.cqe_prev = (listelm)->field.cqe_prev; \
if ((listelm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(listelm)->field.cqe_prev->field.cqe_next = (elm); \
(listelm)->field.cqe_prev = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_HEAD(head, elm, field) do { \
(elm)->field.cqe_next = (head)->cqh_first; \
(elm)->field.cqe_prev = (void *)(head); \
if ((head)->cqh_last == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(head)->cqh_first->field.cqe_prev = (elm); \
(head)->cqh_first = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_TAIL(head, elm, field) do { \
(elm)->field.cqe_next = (void *)(head); \
(elm)->field.cqe_prev = (head)->cqh_last; \
if ((head)->cqh_first == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(head)->cqh_last->field.cqe_next = (elm); \
(head)->cqh_last = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_REMOVE(head, elm, field) do { \
if ((elm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm)->field.cqe_prev; \
else \
(elm)->field.cqe_next->field.cqe_prev = \
(elm)->field.cqe_prev; \
if ((elm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm)->field.cqe_next; \
else \
(elm)->field.cqe_prev->field.cqe_next = \
(elm)->field.cqe_next; \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_FOREACH(var, head, field) \
for ((var) = ((head)->cqh_first); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_next))
#define CIRCLEQ_FOREACH_REVERSE(var, head, field) \
for ((var) = ((head)->cqh_last); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_prev))
/*
* Circular queue access methods.
*/
#define CIRCLEQ_EMPTY(head) ((head)->cqh_first == (void *)(head))
#define CIRCLEQ_FIRST(head) ((head)->cqh_first)
#define CIRCLEQ_LAST(head) ((head)->cqh_last)
#define CIRCLEQ_NEXT(elm, field) ((elm)->field.cqe_next)
#define CIRCLEQ_PREV(elm, field) ((elm)->field.cqe_prev)
#define CIRCLEQ_LOOP_NEXT(head, elm, field) \
(((elm)->field.cqe_next == (void *)(head)) \
? ((head)->cqh_first) \
: (elm->field.cqe_next))
#define CIRCLEQ_LOOP_PREV(head, elm, field) \
(((elm)->field.cqe_prev == (void *)(head)) \
? ((head)->cqh_last) \
: (elm->field.cqe_prev))
#if defined(__cplusplus)
}
#endif
#endif /* !_DB_QUEUE_H_ */
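
A minimal sketch, not taken from this tree, of how the CIRCLEQ_* macros defined above are typically driven: declare the links inside the element with CIRCLEQ_ENTRY, declare a head type with CIRCLEQ_HEAD, then insert, iterate, and remove. The struct job element, the jobq head name, and the entries field are invented for the example; the TAILQ_* macros above follow the same declare/insert/iterate pattern.

#include <stdio.h>
#include <stdlib.h>
#include "queue.h"                      /* the header shown above */

struct job {                            /* hypothetical element type */
    int id;
    CIRCLEQ_ENTRY(job) entries;         /* prev/next links live inside the element */
};

CIRCLEQ_HEAD(jobq, job);                /* declares struct jobq { cqh_first; cqh_last; } */

int main(void)
{
    struct jobq q;
    struct job *j;
    int i;

    CIRCLEQ_INIT(&q);                   /* an empty queue points back at its own head */

    for (i = 0; i < 3; i++) {           /* enqueue three jobs at the tail */
        j = malloc(sizeof(*j));
        j->id = i;
        CIRCLEQ_INSERT_TAIL(&q, j, entries);
    }

    CIRCLEQ_FOREACH(j, &q, entries)     /* walk head -> tail */
        printf("job %d\n", j->id);

    while (!CIRCLEQ_EMPTY(&q)) {        /* dequeue from the head until empty */
        j = CIRCLEQ_FIRST(&q);
        CIRCLEQ_REMOVE(&q, j, entries);
        free(j);
    }
    return 0;
}
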


@ -1,217 +0,0 @@
/*
* stats: measure all the things
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __STATS_H__
#define __STATS_H__
#if defined(__cplusplus)
extern "C" {
#endif
#include "duration.h"
/**
* Calculate the log2 of 64bit unsigned integers.
*/
#ifdef __GCC__
#define LOG2(X) ((unsigned) ((8 * (sizeof(uint64_t) - 1)) - __builtin_clzll((X))))
#else
static unsigned int __log2_64(uint64_t x) {
static const int tab64[64] = {
63, 0, 58, 1, 59, 47, 53, 2,
60, 39, 48, 27, 54, 33, 42, 3,
61, 51, 37, 40, 49, 18, 28, 20,
55, 30, 34, 11, 43, 14, 22, 4,
62, 57, 46, 52, 38, 26, 32, 41,
50, 36, 17, 19, 29, 10, 13, 21,
56, 45, 25, 31, 35, 16, 9, 12,
44, 24, 15, 8, 23, 7, 6, 5};
if (x == 0) return 0;
uint64_t v = x;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
return tab64[((uint64_t)((v - (v >> 1)) * 0x07EDD5E59A4E28C2)) >> 58];
}
#define LOG2(X) __log2_64(X)
#endif
#define STAT_DEF(name) struct name ## _stat name ## _stat;
#define STAT_DECL(name, nsamples) \
struct name ## _stat { \
duration_t d; \
uint64_t histogram[64]; \
uint32_t h, n; \
uint64_t samples[nsamples]; \
uint64_t min, max; \
double mean; \
}; \
static inline double name ## _stat_mean(struct name ## _stat *s) { \
uint32_t t = s->h; \
uint32_t h = (s->h + 1) % nsamples; \
double mean = 0; \
while (h != t) { \
mean += s->samples[h]; \
h = (h + 1) % nsamples; \
} \
if (mean > 0) \
mean /= (double)(s->n < nsamples ? s->n : nsamples); \
return mean; \
} \
static inline double name ## _stat_mean_lg2(struct name ## _stat *s) { \
uint32_t i; \
double mean = 0; \
for (i = 0; i < 64; i++) \
mean += (s->histogram[i] * i); \
if (mean > 0) \
mean /= (double)s->n; \
return mean; \
} \
static inline uint64_t name ## _stat_tick(struct name ## _stat *s) \
{ \
uint64_t t = ts(s->d.unit); \
s->d.then = t; \
return t; \
} \
static inline void name ## _stat_reset(struct name ## _stat *s) \
{ \
s->min = ~0; \
s->max = 0; \
s->h = 0; \
memset(&s->histogram, 0, sizeof(uint64_t) * 64); \
memset(&s->samples, 0, sizeof(uint64_t) * nsamples); \
} \
static inline uint64_t name ## _stat_tock(struct name ## _stat *s) \
{ \
uint64_t now = ts(s->d.unit); \
uint64_t elapsed = now - s->d.then; \
uint32_t i = s->h; \
if (s->n == nsamples) { \
s->mean = (s->mean + name ## _stat_mean(s)) / 2.0; \
if (s->n >= 4294967295) \
name ## _stat_reset(s); \
} \
s->h = (s->h + 1) % nsamples; \
s->samples[i] = elapsed; \
if (elapsed < s->min) \
s->min = elapsed; \
if (elapsed > s->max) \
s->max = elapsed; \
s->histogram[LOG2(elapsed)]++; \
s->n++; \
s->d.then = ts(s->d.unit); \
return elapsed; \
} \
static void name ## _stat_print_histogram(struct name ## _stat *s, const char *mod) \
{ \
uint8_t logs[64]; \
uint8_t i, j, max_log = 0; \
double m = 0.0; \
\
if (s->n < nsamples) \
return; \
\
fprintf(stderr, "\n%s:async_nif request latency histogram:\n", mod); \
m = (s->mean + name ## _stat_mean(s)) / 2.0; \
for (i = 0; i < 64; i++) { \
logs[i] = LOG2(s->histogram[i]); \
if (logs[i] > max_log) \
max_log = logs[i]; \
} \
for (i = max_log; i > 0; i--) { \
if (!(i % 10)) \
fprintf(stderr, "2^%2d ", i); \
else \
fprintf(stderr, " "); \
for(j = 0; j < 64; j++) \
fprintf(stderr, logs[j] >= i ? "" : " "); \
fprintf(stderr, "\n"); \
} \
if (max_log == 0) { \
fprintf(stderr, "[empty]\n"); \
} else { \
fprintf(stderr, " ns μs ms s ks\n"); \
fprintf(stderr, "min: "); \
if (s->min < 1000) \
fprintf(stderr, "%lu (ns)", s->min); \
else if (s->min < 1000000) \
fprintf(stderr, "%.2f (μs)", s->min / 1000.0); \
else if (s->min < 1000000000) \
fprintf(stderr, "%.2f (ms)", s->min / 1000000.0); \
else if (s->min < 1000000000000) \
fprintf(stderr, "%.2f (s)", s->min / 1000000000.0); \
fprintf(stderr, " max: "); \
if (s->max < 1000) \
fprintf(stderr, "%lu (ns)", s->max); \
else if (s->max < 1000000) \
fprintf(stderr, "%.2f (μs)", s->max / 1000.0); \
else if (s->max < 1000000000) \
fprintf(stderr, "%.2f (ms)", s->max / 1000000.0); \
else if (s->max < 1000000000000) \
fprintf(stderr, "%.2f (s)", s->max / 1000000000.0); \
fprintf(stderr, " mean: "); \
if (m < 1000) \
fprintf(stderr, "%.2f (ns)", m); \
else if (m < 1000000) \
fprintf(stderr, "%.2f (μs)", m / 1000.0); \
else if (m < 1000000000) \
fprintf(stderr, "%.2f (ms)", m / 1000000.0); \
else if (m < 1000000000000) \
fprintf(stderr, "%.2f (s)", m / 1000000000.0); \
fprintf(stderr, "\n"); \
} \
fflush(stderr); \
}
#define STAT_INIT(var, name) \
var->name ## _stat.min = ~0; \
var->name ## _stat.max = 0; \
var->name ## _stat.mean = 0.0; \
var->name ## _stat.h = 0; \
var->name ## _stat.d.then = 0; \
var->name ## _stat.d.unit = ns;
#define STAT_TICK(var, name) name ## _stat_tick(&var->name ## _stat)
#define STAT_TOCK(var, name) name ## _stat_tock(&var->name ## _stat)
#define STAT_RESET(var, name) name ## _stat_reset(&var->name ## _stat)
#define STAT_MEAN_LOG2_SAMPLE(var, name) \
name ## _stat_mean_lg2(&var->name ## _stat)
#define STAT_MEAN_SAMPLE(var, name) \
name ## _stat_mean(&var->name ## _stat)
#define STAT_PRINT(var, name, mod) \
name ## _stat_print_histogram(&var->name ## _stat, mod)
#if defined(__cplusplus)
}
#endif
#endif // __STATS_H__
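
As a reference for what this commit removes, here is a rough usage sketch of the STAT_* macros above. It assumes duration.h supplies ts() and the ns unit, as the header expects; the io statistic name, struct ctx, and do_io() are invented for the example and do not appear in the tree.

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include "stats.h"                 /* the header shown above (pulls in duration.h) */

STAT_DECL(io, 1000)                /* emits struct io_stat and its helper functions;
                                      no trailing ';' needed, the macro ends in a
                                      function definition */

struct ctx {
    STAT_DEF(io)                   /* embeds a "struct io_stat io_stat;" member */
};

static void do_io(struct ctx *c)
{
    STAT_TICK(c, io);              /* stamp the start time */
    /* ... the operation being measured ... */
    STAT_TOCK(c, io);              /* record the elapsed time into samples/histogram */
}

int main(void)
{
    struct ctx ctx_store, *c = &ctx_store;
    int i;

    memset(c, 0, sizeof(*c));
    STAT_INIT(c, io);              /* reset min/max/mean and select the ns unit */
    for (i = 0; i < 2000; i++)     /* the print helper needs >= 1000 samples */
        do_io(c);
    STAT_PRINT(c, io, "example");  /* dump the latency histogram to stderr */
    return 0;
}

The point of the sketch is only the mechanics: declare the stat once, embed one instance per owner, tick/tock around the measured region, and print once enough samples have accumulated.
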


@ -179,7 +179,7 @@ open_test_db() ->
{ok, Handle} = ?MODULE:open(DataDir, 2147483648),
[?MODULE:upd(Handle, crypto:sha(<<X>>),
    crypto:rand_bytes(crypto:rand_uniform(128, 4096))) ||
-   X <- lists:seq(1, 100)],
+   X <- lists:seq(1, 10)],
Handle.

basics_test_() ->


@ -24,13 +24,14 @@
%% adding a "_" to the name and take the "_" out of the other's name). %% adding a "_" to the name and take the "_" out of the other's name).
{mode, max}. {mode, max}.
{duration, 10}. {duration, 480}.
{concurrent, 8}. {concurrent, 32}.
{driver, basho_bench_driver_lmdb}. {driver, basho_bench_driver_lmdb}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}. {key_generator, {int_to_bin_littleendian,{uniform_int, 5000000000}}}.
{value_generator, {fixed_bin, 1024}}. {value_generator, {highly_compressible_bin, 2048}}.
%{operations, [{get, 9}, {put, 9}, {delete, 2}]}. %{value_generator, {fixed_bin, 1024}}.
{operations, [{put, 1}]}. {operations, [{get, 25}, {put, 70}, {delete, 5}]}.
%{operations, [{put, 1}]}.
{code_paths, ["../lmdb"]}. {code_paths, ["../lmdb"]}.
{lmdb_dir, "/home/gburd/ws/basho_bench/data"}. {lmdb_dir, "/home/gburd/ws/basho_bench/data"}.