Compare commits

..

1 commit

Author SHA1 Message Date
Gregory Burd
8772d39663 WIP: use atoms to speed up hash lookup of shared cursors. 2013-04-24 10:15:46 -04:00
21 changed files with 1684 additions and 2297 deletions

4
.gitignore vendored
View file

@ -7,8 +7,6 @@ c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/wt
priv/*.so*
priv/*.dylib*
priv/
log/
*~

187
Makefile
View file

@ -1,70 +1,31 @@
# Copyright 2012 Erlware, LLC. All Rights Reserved.
#
# This file is provided to you under the Apache License,
# Version 2.0 (the "License"); you may not use this file
# except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
TARGET= wterl
# Source: https://gist.github.com/ericbmerritt/5706091
REBAR= ./rebar
#REBAR= /usr/bin/env rebar
ERL= /usr/bin/env erl
ERLEXEC= ${ERL_ROOTDIR}/lib/erlang/erts-5.9.1/bin/erlexec
DIALYZER= /usr/bin/env dialyzer
ARCHIVETAG?= $(shell git describe --always --long --tags)
ARCHIVE?= $(shell basename $(CURDIR))-$(ARCHIVETAG)
WT_ARCHIVETAG?= $(shell cd c_src/wiredtiger-basho; git describe --always --long --tags)
.PHONY: plt analyze all deps compile get-deps clean
ERLFLAGS= -pa $(CURDIR)/.eunit -pa $(CURDIR)/ebin -pa $(CURDIR)/deps/*/ebin
all: compile
DEPS_PLT=$(CURDIR)/.deps_plt
DEPS=erts kernel stdlib
archive:
@rm -f $(ARCHIVE).tar.gz
git archive --format=tar --prefix=$(ARCHIVE)/ $(ARCHIVETAG) | gzip >$(ARCHIVE).tar.gz
# =============================================================================
# Verify that the programs we need to run are installed on this system
# =============================================================================
ERL = $(shell which erl)
deps: get-deps
ifeq ($(ERL),)
$(error "Erlang not available on this system")
endif
REBAR=$(shell which rebar)
ifeq ($(REBAR),)
$(error "Rebar not available on this system")
endif
DIALYZER=$(shell which dialyzer)
ifeq ($(DIALYZER),)
$(error "Dialyzer not available on this system")
endif
TYPER=$(shell which typer)
ifeq ($(TYPER),)
$(error "Typer not available on this system")
endif
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
update-deps clean-common-test-data rebuild
all: deps compile
# =============================================================================
# Rules to build the system
# =============================================================================
deps:
$(REBAR) get-deps
$(REBAR) compile
get-deps:
c_src/build_deps.sh get-deps
@$(REBAR) get-deps
update-deps:
$(REBAR) update-deps
$(REBAR) compile
c_src/build_deps.sh update-deps
@$(REBAR) update-deps
c_src/wterl.o: c_src/async_nif.h
touch c_src/wterl.c
@ -75,52 +36,70 @@ ebin/app_helper.beam:
@/bin/false
compile: c_src/wterl.o ebin/app_helper.beam
$(REBAR) skip_deps=true compile
doc:
$(REBAR) skip_deps=true doc
eunit: compile clean-common-test-data
$(REBAR) skip_deps=true eunit
test: compile eunit
$(DEPS_PLT):
@echo Building local plt at $(DEPS_PLT)
@echo
dialyzer --output_plt $(DEPS_PLT) --build_plt \
--apps $(DEPS) -r deps
dialyzer: $(DEPS_PLT)
$(DIALYZER) --fullpath --plt $(DEPS_PLT) -Wrace_conditions -r ./ebin
typer:
$(TYPER) --plt $(DEPS_PLT) -r ./src
xref:
$(REBAR) xref skip_deps=true
# You often want *rebuilt* rebar tests to be available to the shell you have to
# call eunit (to get the tests rebuilt). However, eunit runs the tests, which
# probably fails (thats probably why You want them in the shell). This
# (prefixing the command with "-") runs eunit but tells make to ignore the
# result.
shell: deps compile
- @$(REBAR) skip_deps=true eunit
@$(ERL) $(ERLFLAGS)
pdf:
pandoc README.md -o README.pdf
@$(REBAR) compile
clean:
- c_src/build_deps.sh clean
- rm -rf $(CURDIR)/test/*.beam
- rm -rf $(CURDIR)/logs
- rm -rf $(CURDIR)/ebin
$(REBAR) skip_deps=true clean
@rm -f $(ARCHIVE).tar.gz
@$(REBAR) clean
distclean: clean
- rm -rf $(DEPS_PLT)
- rm -rvf $(CURDIR)/deps
xref:
@$(REBAR) xref skip_deps=true
rebuild: distclean deps compile escript dialyzer test
test: eunit
eunit: compile-for-eunit
@$(REBAR) eunit skip_deps=true
eqc: compile-for-eqc
@$(REBAR) eqc skip_deps=true
proper: compile-for-proper
@echo "rebar does not implement a 'proper' command" && false
triq: compile-for-triq
@$(REBAR) triq skip_deps=true
compile-for-eunit:
@$(REBAR) compile eunit compile_only=true
compile-for-eqc:
@$(REBAR) -D QC -D QC_EQC compile eqc compile_only=true
compile-for-eqcmini:
@$(REBAR) -D QC -D QC_EQCMINI compile eqc compile_only=true
compile-for-proper:
@$(REBAR) -D QC -D QC_PROPER compile eqc compile_only=true
compile-for-triq:
@$(REBAR) -D QC -D QC_TRIQ compile triq compile_only=true
plt: compile
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/lager/ebin --apps kernel stdlib
analyze: compile
@$(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin
repl:
@$(ERL) -pa ebin -pz deps/lager/ebin
eunit-repl:
@$(ERL) -pa .eunit deps/lager/ebin
ERL_TOP= /home/gburd/eng/otp_R15B01
CERL= ${ERL_TOP}/bin/cerl
VALGRIND_MISC_FLAGS= "--verbose --leak-check=full --show-reachable=yes --trace-children=yes --track-origins=yes --suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard --show-possibly-lost=no --malloc-fill=AB --free-fill=CD"
helgrind:
valgrind --verbose --tool=helgrind \
--leak-check=full
--show-reachable=yes \
--trace-children=yes \
--track-origins=yes \
--suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard \
--show-possibly-lost=no \
--malloc-fill=AB \
--free-fill=CD ${ERLEXEC} -pz deps/lager/ebin -pa ebin -pa .eunit
valgrind:
${CERL} -valgrind ${VALGRIND_FLAGS} --log-file=${ROOTDIR}/valgrind_log-beam.smp.%p -- -pz deps/lager/ebin -pa ebin -pa .eunit -exec 'eunit:test(wterl).'

View file

@ -1,54 +1,26 @@
`wterl` is an Erlang interface to the WiredTiger database, and is written to
support a Riak storage backend that uses WiredTiger.
`wterl` is an Erlang interface to the WiredTiger database, and is written
to support a Riak storage backend that uses WiredTiger.
This backend currently supports only key-value storage and retrieval.
Remaining work includes:
TODO:
* Find/fix any code marked "TODO:"
* Why do we see {error, {eperm, _}} result on wterl:cursor_close/1 during
fold_objects/4?
* Why do we see {error, {eperm, _}} result on wterl:cursor_close/1?
* Why do we see {error, {eperm, _}} result on wterl:cursor_next/1 during
is_empty/1?
* Why do we see {error, {eperm, _}} result on wterl:cursor_next_value/1
during status/1?
* Why do we see {error, {ebusy, _}} result on wterl:drop/2?
* Determine a better way to estimate the number of sessions we should
configure WT for at startup in riak_kv_wterl_backend:max_sessions/1.
* Make sure Erlang is optimizing for selective receive in async_nif_enqueue/3
because in the eLevelDB driver there is a comment: "This cannot be a separate
function. Code must be inline to trigger Erlang compiler's use of optimized
selective receive."
* Provide a way to configure the cursor options, right now they are
always "raw,overwrite".
* Add support for Riak/KV 2i indexes using the same design pattern
as eLevelDB (in a future version consider alternate schema)
* If an operation using a shared cursor results in a non-normal error
then it should be closed/discarded from the recycled pool
* Cache cursors based on hash(table/config) rather than just table.
* Finish NIF unload/reload functions and test.
* Test an upgrade, include a format/schema/WT change.
* When WT_PANIC is returned first try to unload/reload then driver
and reset all state, if that fails then exit gracefully.
* The `wterl:session_create` function currently returns an error under
certain circumstances, so we currently ignore its return value.
* The `riak_kv_wterl_backend` module is currently designed to rely on the
fact that it runs in just a single Erlang scheduler thread, which is
necessary because WiredTiger doesn't allow a session to be used
concurrently by different threads. If the KV node design ever changes to
involve concurrency across scheduler threads, this current design will no
longer work correctly.
* Currently the `riak_kv_wterl_backend` module is stored in this
repository, but it really belongs in the `riak_kv` repository.
* wterl:truncate/5 can segv, and its tests are commented out
* Add async_nif and wterl NIF stats to the results provided by the
stats API
* Longer term ideas/changes to consider:
* More testing, especially pulse/qc
* Riak/KV integration
* Store 2i indexes in separate tables
* Store buckets, in separate tables and keep a <<bucket/key>> index
to ensure that folds across a vnode are easy
* Provide a drop bucket API call
* Support key expirey
* An ets API (like the LevelDB's lets project)
* Use mime-type to inform WT's schema for key value encoding
* Other use cases within Riak
* An AAE driver using WT
* An ability to store the ring file via WT
* There are currently some stability issues with WiredTiger that can
sometimes cause errors when restarting KV nodes with non-empty WiredTiger
storage.
Future support for secondary indexes requires WiredTiger features that are
under development but are not yet available.
Deploying
---------

View file

@ -4,16 +4,18 @@
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@ -25,45 +27,24 @@ extern "C" {
#endif
#include <assert.h>
#include "queue.h"
#ifndef UNUSED
#define UNUSED(v) ((void)(v))
#include "fifo_q.h"
#ifdef ASYNC_NIF_STATS
#include "stats.h" // TODO: measure, measure... measure again
#endif
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
STAILQ_ENTRY(async_nif_req_entry) entries;
};
#define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue {
unsigned int num_workers;
unsigned int depth;
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
FIFO_QUEUE_TYPE(reqs) reqs;
};
struct async_nif_worker_entry {
@ -71,64 +52,65 @@ struct async_nif_worker_entry {
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
SLIST_ENTRY(async_nif_worker_entry) entries;
};
struct async_nif_state {
unsigned int shutdown;
ErlNifMutex *we_mutex;
unsigned int we_active;
SLIST_HEAD(joining, async_nif_worker_entry) we_joining;
unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
unsigned int num_queues;
unsigned int next_q;
STAILQ_HEAD(recycled_reqs, async_nif_req_entry) recycled_reqs;
unsigned int num_reqs;
ErlNifMutex *recycled_req_mutex;
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) { \
UNUSED(worker_id); \
DPRINTF("async_nif: calling \"%s\"", __func__); \
__UNUSED(worker_id); \
do work_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
} \
static void fn_post_ ## decl (struct decl ## _args *args) { \
UNUSED(args); \
DPRINTF("async_nif: calling \"fn_post_%s\"", #decl); \
__UNUSED(args); \
do post_block while(0); \
DPRINTF("async_nif: returned from \"fn_post_%s\"", #decl); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
unsigned int affinity = 0; \
const char *affinity = NULL; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
req = async_nif_reuse_req(async_nif); \
if (!req) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
if (!(new_env = enif_alloc_env())) { /*TODO: cache, enif_clear();*/ \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
if (!req) { \
fn_post_ ## decl (args); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memset(req, 0, sizeof(struct async_nif_req_entry)); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
enif_free(req); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->env = new_env; \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
@ -136,13 +118,15 @@ struct async_nif_state {
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \
if (affinity) \
h = ((unsigned int)affinity) % async_nif->num_queues; \
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
enif_free(req); \
enif_free_env(new_env); \
enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
return reply; \
}
@ -150,16 +134,16 @@ struct async_nif_state {
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, env, priv) do { \
#define ASYNC_NIF_LOAD(name, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(env); \
priv = async_nif_load(); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord unload"); \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \
@ -168,210 +152,72 @@ struct async_nif_state {
} while(0);
#define ASYNC_NIF_UPGRADE(name, env) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord upgrade"); \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_upgrade(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_RETURN_BADARG() do { \
async_nif_recycle_req(req, async_nif); \
return enif_make_badarg(env); \
} while(0);
#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env);
#define ASYNC_NIF_WORK_ENV new_env
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
/**
* Return a request structure from the recycled req queue if one exists,
* otherwise create one.
* TODO:
*/
struct async_nif_req_entry *
async_nif_reuse_req(struct async_nif_state *async_nif)
static inline unsigned int async_nif_str_hash_func(const char *s)
{
struct async_nif_req_entry *req = NULL;
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = malloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (env) {
req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1);
} else {
free(req);
req = NULL;
}
}
}
} else {
req = STAILQ_FIRST(&async_nif->recycled_reqs);
STAILQ_REMOVE(&async_nif->recycled_reqs, req, async_nif_req_entry, entries);
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
return req;
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h;
}
/**
* Store the request for future re-use.
*
* req a request entry with an ErlNifEnv* which will be cleared
* before reuse, but not until then.
* async_nif a handle to our state so that we can find and use the mutex
*/
void
async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *async_nif)
{
ErlNifEnv *env = NULL;
enif_mutex_lock(async_nif->recycled_req_mutex);
enif_clear_env(req->env);
env = req->env;
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
STAILQ_INSERT_TAIL(&async_nif->recycled_reqs, req, entries);
enif_mutex_unlock(async_nif->recycled_req_mutex);
}
static void *async_nif_worker_fn(void *);
/**
* Start up a worker thread.
*/
static int
async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_queue *q)
{
struct async_nif_worker_entry *we;
if (0 == q)
return EINVAL;
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
if (async_nif->we_active == ASYNC_NIF_MAX_WORKERS) {
enif_mutex_unlock(async_nif->we_mutex);
return EAGAIN;
}
we = malloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
}
memset(we, 0, sizeof(struct async_nif_worker_entry));
we->worker_id = async_nif->we_active++;
we->async_nif = async_nif;
we->q = q;
enif_mutex_unlock(async_nif->we_mutex);
return enif_thread_create(NULL,&we->tid, &async_nif_worker_fn, (void*)we, 0);
}
/**
* Enqueue a request for processing by a worker thread.
*
* Places the request into a work queue determined either by the
* provided affinity or by iterating through the available queues.
* TODO:
*/
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{
/* Identify the most appropriate worker for this request. */
unsigned int i, last_qid, qid = 0;
unsigned int qid = (hint >= 0) ? (unsigned int)hint : async_nif->next_q;
struct async_nif_work_queue *q = NULL;
double avg_depth = 0.0;
/* Either we're choosing a queue based on some affinity/hinted value or we
need to select the next queue in the rotation and atomically update that
global value (next_q is shared across worker threads) . */
if (hint >= 0) {
qid = (unsigned int)hint;
} else {
do {
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
}
/* Now we inspect and interate across the set of queues trying to select one
that isn't too full or too slow. */
for (i = 0; i < async_nif->num_queues; i++) {
/* Compute the average queue depth not counting queues which are empty or
the queue we're considering right now. */
unsigned int j, n = 0;
for (j = 0; j < async_nif->num_queues; j++) {
if (j != qid && async_nif->queues[j].depth != 0) {
n++;
avg_depth += async_nif->queues[j].depth;
}
}
if (avg_depth) avg_depth /= n;
/* Lock this queue under consideration, then check for shutdown. While
we hold this lock either a) we're shutting down so exit now or b) this
queue will be valid until we release the lock. */
do {
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
if (q->depth <= avg_depth) break;
else {
/* Now that we hold the lock, check for shutdown. As long as we
hold this lock either a) we're shutting down so exit now or
b) this queue will be valid until we release the lock. */
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
if (fifo_q_full(reqs, q->reqs)) { // TODO: || (q->avg_latency > median_latency)
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
} else {
break;
}
}
// TODO: at some point add in work sheading/stealing
} while(1);
/* If the for loop finished then we didn't find a suitable queue for this
request, meaning we're backed up so trigger eagain. Note that if we left
the loop in this way we hold no lock. */
if (i == async_nif->num_queues) return 0;
/* Add the request to the queue. */
STAILQ_INSERT_TAIL(&q->reqs, req, entries);
__sync_fetch_and_add(&q->depth, 1);
/* We've selected a queue for this new request now check to make sure there are
enough workers actively processing requests on this queue. */
while (q->depth > q->num_workers) {
switch(async_nif_start_worker(async_nif, q)) {
case EINVAL: case ENOMEM: default: return 0;
case EAGAIN: continue;
case 0: __sync_fetch_and_add(&q->num_workers, 1); goto done;
}
}done:;
/* We hold the queue's lock, and we've seletect a reasonable queue for this
new request so add the request. */
fifo_q_put(reqs, q->reqs, req);
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd);
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued"));
enif_mutex_unlock(q->reqs_mutex);
enif_cond_signal(q->reqs_cnd);
return reply;
}
/**
* Worker threads execute this function. Here each worker pulls requests of
* their respective queues, executes that work and continues doing that until
* they see the shutdown flag is set at which point they exit.
*/
static void *
async_nif_worker_fn(void *arg)
{
@ -380,7 +226,6 @@ async_nif_worker_fn(void *arg)
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL;
unsigned int tries = async_nif->num_queues;
for(;;) {
/* Examine the request queue, are there things to be done? */
@ -390,58 +235,35 @@ async_nif_worker_fn(void *arg)
enif_mutex_unlock(q->reqs_mutex);
break;
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leaving this loop (break) which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
if (fifo_q_empty(reqs, q->reqs)) {
/* Queue is empty, wait for work */
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
} else {
assert(fifo_q_size(reqs, q->reqs) > 0);
assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = STAILQ_FIRST(&q->reqs);
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Wake up other worker thread watching this queue to help process work. */
enif_cond_signal(q->reqs_cnd);
req = fifo_q_get(reqs, q->reqs);
enif_mutex_unlock(q->reqs_mutex);
/* Ensure that there is at least one other worker thread watching this
queue. */
enif_cond_signal(q->reqs_cnd);
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Clean up req for reuse. */
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif);
/* Free resources allocated for this async request. */
enif_free_env(req->env);
enif_free(req->args);
enif_free(req);
req = NULL;
}
}
enif_mutex_lock(async_nif->we_mutex);
SLIST_INSERT_HEAD(&async_nif->we_joining, we, entries);
enif_mutex_unlock(async_nif->we_mutex);
__sync_fetch_and_add(&q->num_workers, -1);
enif_thread_exit(0);
return 0;
}
@ -452,90 +274,60 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
unsigned int i;
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
struct async_nif_req_entry *req = NULL;
struct async_nif_worker_entry *we = NULL;
UNUSED(env);
__UNUSED(env);
/* Signal the worker threads, stop what you're doing and exit. To ensure
that we don't race with the enqueue() process we first lock all the worker
queues, then set shutdown to true, then unlock. The enqueue function will
take the queue mutex, then test for shutdown condition, then enqueue only
if not shutting down. */
/* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first
lock all the worker queues, then set shutdown to true, then
unlock. The enqueue function will take the queue mutex, then
test for shutdown condition, then enqueue only if not shutting
down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
/* Set the shutdown flag so that worker threads will no continue
executing requests. */
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_cond_broadcast(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
}
/* Join for the now exiting worker threads. */
while(async_nif->we_active > 0) {
for (i = 0; i < num_queues; i++)
enif_cond_broadcast(async_nif->queues[i].reqs_cnd);
enif_mutex_lock(async_nif->we_mutex);
we = SLIST_FIRST(&async_nif->we_joining);
while(we != NULL) {
struct async_nif_worker_entry *n = SLIST_NEXT(we, entries);
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
async_nif->we_active--;
we = n;
}
enif_mutex_unlock(async_nif->we_mutex);
for (i = 0; i < async_nif->num_workers; ++i) {
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
enif_mutex_destroy(async_nif->we_mutex);
/* Cleanup in-flight requests, mutexes and conditions in each work queue. */
/* Cleanup requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */
req = NULL;
req = STAILQ_FIRST(&q->reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
struct async_nif_req_entry *req = NULL;
fifo_q_foreach(reqs, q->reqs, req, {
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
req->fn_post(req->args);
enif_free_env(req->env);
free(req->args);
free(req);
req = n;
}
enif_free(req->args);
enif_free(req);
});
fifo_q_free(reqs, q->reqs);
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
/* Free any req structures sitting unused on the recycle queue. */
enif_mutex_lock(async_nif->recycled_req_mutex);
req = NULL;
req = STAILQ_FIRST(&async_nif->recycled_reqs);
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
free(req);
req = n;
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free(async_nif);
enif_free(async_nif);
}
static void *
async_nif_load(ErlNifEnv *env)
async_nif_load()
{
static int has_init = 0;
unsigned int i, num_queues;
unsigned int i, j, num_queues;
ErlNifSysInfo info;
struct async_nif_state *async_nif;
@ -543,14 +335,6 @@ async_nif_load(ErlNifEnv *env)
if (has_init) return 0;
else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
@ -568,28 +352,58 @@ async_nif_load(ErlNifEnv *env)
}
/* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif = enif_alloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->we_active = 0;
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
async_nif->next_q = 0;
async_nif->shutdown = 0;
STAILQ_INIT(&async_nif->recycled_reqs);
async_nif->recycled_req_mutex = enif_mutex_create("recycled_req");
async_nif->we_mutex = enif_mutex_create("we");
SLIST_INIT(&async_nif->we_joining);
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
STAILQ_INIT(&q->reqs);
q->reqs_mutex = enif_mutex_create("reqs");
q->reqs_cnd = enif_cond_create("reqs");
q->next = &async_nif->queues[(i + 1) % num_queues];
q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
}
/* Setup the thread pool management. */
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */
for (i = 0; i < async_nif->num_workers; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif;
we->worker_id = i;
we->q = &async_nif->queues[i % async_nif->num_queues];
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)we, NULL) != 0) {
async_nif->shutdown = 1;
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd);
}
while(i-- > 0) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_free(async_nif);
return NULL;
}
}
return async_nif;
}
@ -597,7 +411,7 @@ async_nif_load(ErlNifEnv *env)
static void
async_nif_upgrade(ErlNifEnv *env)
{
UNUSED(env);
__UNUSED(env);
// TODO:
}

View file

@ -11,14 +11,16 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
set -e
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.6"
#WT_DIR=wiredtiger-`basename $WT_REF`
WT_BRANCH=basho
WT_VSN=""
WT_DIR=wiredtiger-$WT_BRANCH
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
BZIP2_VSN="1.0.6"
BZIP2_DIR=bzip2-$BZIP2_VSN
[ `basename $PWD` != "c_src" ] && cd c_src
export BASEDIR="$PWD"
@ -26,7 +28,8 @@ export BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make}
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native"
export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
@ -35,31 +38,28 @@ get_wt ()
if [ -d $BASEDIR/$WT_DIR/.git ]; then
(cd $BASEDIR/$WT_DIR && git pull -u) || exit 1
else
if [ "X$WT_REF" != "X" ]; then
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout refs/$WT_REF || exit 1)
if [ "X$WT_VSN" == "X" ]; then
git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
else
git clone ${WT_REPO} ${WT_DIR} && \
(cd $BASEDIR/$WT_DIR && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
git clone -b ${WT_BRANCH} --single-branch ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
fi
mv wiredtiger $WT_DIR || exit 1
fi
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$WT_DIR && git cherry-pick a3c8c2a13758ae9c44edabcc1a780984a7882904 || exit 1)
(cd $BASEDIR/$WT_DIR
[ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
./autogen.sh || exit 1
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
wt_configure;
)
}
wt_configure ()
{
(cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \
cd ./build_posix || exit 1
[ -e Makefile ] && $MAKE distclean
../configure --with-pic \
--enable-snappy \
--prefix=${BASEDIR}/system || exit 1)
--enable-bzip2 \
--prefix=${BASEDIR}/system || exit 1
)
}
get_snappy ()
@ -73,10 +73,20 @@ get_snappy ()
./configure --with-pic --prefix=$BASEDIR/system || exit 1)
}
get_bzip2 ()
{
[ -e bzip2-$BZIP2_VSN.tar.gz ] || (echo "Missing bzip2 ($BZIP2_VSN) source package" && exit 1)
[ -d $BASEDIR/$BZIP2_DIR ] || tar -xzf bzip2-$BZIP2_VSN.tar.gz
[ -e $BASEDIR/bzip2-build.patch ] && \
(cd $BASEDIR/$BZIP2_DIR
patch -p1 --forward < $BASEDIR/bzip2-build.patch || exit 1)
}
get_deps ()
{
get_snappy;
get_wt;
get_snappy;
get_bzip2;
}
update_deps ()
@ -94,8 +104,8 @@ update_deps ()
build_wt ()
{
wt_configure;
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE -j && $MAKE install)
(cd $BASEDIR/$WT_DIR/build_posix && \
$MAKE -j && $MAKE install)
}
build_snappy ()
@ -106,14 +116,26 @@ build_snappy ()
)
}
build_bzip2 ()
{
(cd $BASEDIR/$BZIP2_DIR && \
$MAKE -j -f Makefile-libbz2_so && \
mkdir -p $BASEDIR/system/lib && \
cp -f bzlib.h $BASEDIR/system/include && \
cp -f libbz2.so.1.0.6 $BASEDIR/system/lib && \
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2.so && \
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2-1.so && \
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2-1.0.so
)
}
case "$1" in
clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
rm -rf system $SNAPPY_DIR
rm -rf system $WT_DIR $SNAPPY_DIR $BZIP2_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
rm -f ${BASEDIR}/../priv/libwiredtiger_*.so
rm -f ${BASEDIR}/../priv/libbz2.so.*
rm -f ${BASEDIR}/../priv/libsnappy.so.*
;;
@ -130,23 +152,34 @@ case "$1" in
;;
*)
shopt -s extglob
SUFFIXES='@(so|dylib)'
[ -d $WT_DIR ] || get_wt;
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BZIP2_DIR ] || get_bzip2;
# Build Snappy
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy;
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build BZIP2
[ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1)
test -f $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
# Build WiredTiger
[ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt;
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so \
-a -f $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_bzip2.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
(cd ${BASEDIR}/../priv
[ -L libbz2.so ] || ln -s libbz2.so.1.0.6 libbz2.so
[ -L libbz2.so.1 ] || ln -s libbz2.so.1.0.6 libbz2.so.1
[ -L libbz2.so.1.0 ] || ln -s libbz2.so.1.0.6 libbz2.so.1.0)
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
;;
esac

BIN
c_src/bzip2-1.0.6.tar.gz Normal file

Binary file not shown.

0
c_src/bzip2-build.patch Normal file
View file

View file

@ -1,66 +0,0 @@
/*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __COMMON_H__
#define __COMMON_H__
#if defined(__cplusplus)
extern "C" {
#endif
#if !(__STDC_VERSION__ >= 199901L || defined(__GNUC__))
# undef DEBUG
# define DEBUG 0
# define DPRINTF (void) /* Vararg macros may be unsupported */
#elif DEBUG
#include <stdio.h>
#include <stdarg.h>
#define DPRINTF(fmt, ...) \
do { \
fprintf(stderr, "%s:%d " fmt "\n", __FILE__, __LINE__, __VA_ARGS__); \
fflush(stderr); \
} while(0)
#define DPUTS(arg) DPRINTF("%s", arg)
#else
#define DPRINTF(fmt, ...) ((void) 0)
#define DPUTS(arg) ((void) 0)
#endif
#ifndef __UNUSED
#define __UNUSED(v) ((void)(v))
#endif
#ifndef COMPQUIET
#define COMPQUIET(n, v) do { \
(n) = (v); \
(n) = (n); \
} while (0)
#endif
#ifdef __APPLE__
#define PRIuint64(x) (x)
#else
#define PRIuint64(x) (unsigned long long)(x)
#endif
#if defined(__cplusplus)
}
#endif
#endif // __COMMON_H__

102
c_src/fifo_q.h Normal file
View file

@ -0,0 +1,102 @@
/*
* fifo_q: a macro-based implementation of a FIFO Queue
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __FIFO_Q_H__
#define __FIFO_Q_H__
#if defined(__cplusplus)
extern "C" {
#endif
#define FIFO_QUEUE_TYPE(name) \
struct fifo_q__ ## name *
#define DECL_FIFO_QUEUE(name, type) \
struct fifo_q__ ## name { \
unsigned int h, t, s; \
type *items[]; \
}; \
static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
struct fifo_q__ ## name *q = enif_alloc(sz); \
if (!q) \
return 0; \
memset(q, 0, sz); \
q->s = n + 1; \
return q; \
} \
static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
enif_free(q); \
} \
static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
q->items[q->h] = n; \
q->h = (q->h + 1) % q->s; \
return n; \
} \
static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
type *n = q->items[q->t]; \
q->items[q->t] = 0; \
q->t = (q->t + 1) % q->s; \
return n; \
} \
static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
return (q->h - q->t + q->s) % q->s; \
} \
static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
return q->s - 1; \
} \
static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
return (q->t == q->h); \
} \
static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
return ((q->h + 1) % q->s) == q->t; \
}
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
#define fifo_q_foreach(name, queue, item, task) do { \
while(!fifo_q_ ## name ## _empty(queue)) { \
item = fifo_q_ ## name ## _get(queue); \
do task while(0); \
} \
} while(0);
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
};
#if defined(__cplusplus)
}
#endif
#endif // __FIFO_Q_H__

643
c_src/khash.h Normal file
View file

@ -0,0 +1,643 @@
/* The MIT License
Copyright (c) 2008, 2009, 2011 by Attractive Chaos <attractor@live.co.uk>
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
An example:
#include "khash.h"
KHASH_MAP_INIT_INT(32, char)
int main() {
int ret, is_missing;
khiter_t k;
khash_t(32) *h = kh_init(32);
k = kh_put(32, h, 5, &ret);
kh_value(h, k) = 10;
k = kh_get(32, h, 10);
is_missing = (k == kh_end(h));
k = kh_get(32, h, 5);
kh_del(32, h, k);
for (k = kh_begin(h); k != kh_end(h); ++k)
if (kh_exist(h, k)) kh_value(h, k) = 1;
kh_destroy(32, h);
return 0;
}
*/
/*
2011-12-29 (0.2.7):
* Minor code clean up; no actual effect.
2011-09-16 (0.2.6):
* The capacity is a power of 2. This seems to dramatically improve the
speed for simple keys. Thank Zilong Tan for the suggestion. Reference:
- http://code.google.com/p/ulib/
- http://nothings.org/computer/judy/
* Allow to optionally use linear probing which usually has better
performance for random input. Double hashing is still the default as it
is more robust to certain non-random input.
* Added Wang's integer hash function (not used by default). This hash
function is more robust to certain non-random input.
2011-02-14 (0.2.5):
* Allow to declare global functions.
2009-09-26 (0.2.4):
* Improve portability
2008-09-19 (0.2.3):
* Corrected the example
* Improved interfaces
2008-09-11 (0.2.2):
* Improved speed a little in kh_put()
2008-09-10 (0.2.1):
* Added kh_clear()
* Fixed a compiling error
2008-09-02 (0.2.0):
* Changed to token concatenation which increases flexibility.
2008-08-31 (0.1.2):
* Fixed a bug in kh_get(), which has not been tested previously.
2008-08-31 (0.1.1):
* Added destructor
*/
#ifndef __AC_KHASH_H
#define __AC_KHASH_H
/*!
@header
Generic hash table library.
*/
#define AC_VERSION_KHASH_H "0.2.6"
#include <stdlib.h>
#include <string.h>
#include <limits.h>
/* compiler specific configuration */
#if UINT_MAX == 0xffffffffu
typedef unsigned int khint32_t;
#elif ULONG_MAX == 0xffffffffu
typedef unsigned long khint32_t;
#endif
#if ULONG_MAX == ULLONG_MAX
typedef unsigned long khint64_t;
#else
typedef unsigned long long khint64_t;
#endif
#ifdef _MSC_VER
#define kh_inline __inline
#else
#define kh_inline inline
#endif
typedef khint32_t khint_t;
typedef khint_t khiter_t;
#define __ac_isempty(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&2)
#define __ac_isdel(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&1)
#define __ac_iseither(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&3)
#define __ac_set_isdel_false(flag, i) (flag[i>>4]&=~(1ul<<((i&0xfU)<<1)))
#define __ac_set_isempty_false(flag, i) (flag[i>>4]&=~(2ul<<((i&0xfU)<<1)))
#define __ac_set_isboth_false(flag, i) (flag[i>>4]&=~(3ul<<((i&0xfU)<<1)))
#define __ac_set_isdel_true(flag, i) (flag[i>>4]|=1ul<<((i&0xfU)<<1))
#ifdef KHASH_LINEAR
#define __ac_inc(k, m) 1
#else
#define __ac_inc(k, m) (((k)>>3 ^ (k)<<3) | 1) & (m)
#endif
#define __ac_fsize(m) ((m) < 16? 1 : (m)>>4)
#ifndef kroundup32
#define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
#endif
#ifndef kcalloc
#define kcalloc(N,Z) calloc(N,Z)
#endif
#ifndef kmalloc
#define kmalloc(Z) malloc(Z)
#endif
#ifndef krealloc
#define krealloc(P,Z) realloc(P,Z)
#endif
#ifndef kfree
#define kfree(P) free(P)
#endif
static const double __ac_HASH_UPPER = 0.77;
#define __KHASH_TYPE(name, khkey_t, khval_t) \
typedef struct { \
khint_t n_buckets, size, n_occupied, upper_bound; \
khint32_t *flags; \
khkey_t *keys; \
khval_t *vals; \
} kh_##name##_t;
#define __KHASH_PROTOTYPES(name, khkey_t, khval_t) \
extern kh_##name##_t *kh_init_##name(void); \
extern void kh_destroy_##name(kh_##name##_t *h); \
extern void kh_clear_##name(kh_##name##_t *h); \
extern khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key); \
extern int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets); \
extern khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret); \
extern void kh_del_##name(kh_##name##_t *h, khint_t x);
#define __KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
SCOPE kh_##name##_t *kh_init_##name(void) { \
return (kh_##name##_t*)kcalloc(1, sizeof(kh_##name##_t)); \
} \
SCOPE void kh_destroy_##name(kh_##name##_t *h) \
{ \
if (h) { \
kfree((void *)h->keys); kfree(h->flags); \
kfree((void *)h->vals); \
kfree(h); \
} \
} \
SCOPE void kh_clear_##name(kh_##name##_t *h) \
{ \
if (h && h->flags) { \
memset(h->flags, 0xaa, __ac_fsize(h->n_buckets) * sizeof(khint32_t)); \
h->size = h->n_occupied = 0; \
} \
} \
SCOPE khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key) \
{ \
if (h->n_buckets) { \
khint_t inc, k, i, last, mask; \
mask = h->n_buckets - 1; \
k = __hash_func(key); i = k & mask; \
inc = __ac_inc(k, mask); last = i; /* inc==1 for linear probing */ \
while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \
i = (i + inc) & mask; \
if (i == last) return h->n_buckets; \
} \
return __ac_iseither(h->flags, i)? h->n_buckets : i; \
} else return 0; \
} \
SCOPE int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets) \
{ /* This function uses 0.25*n_buckets bytes of working space instead of [sizeof(key_t+val_t)+.25]*n_buckets. */ \
khint32_t *new_flags = 0; \
khint_t j = 1; \
{ \
kroundup32(new_n_buckets); \
if (new_n_buckets < 4) new_n_buckets = 4; \
if (h->size >= (khint_t)(new_n_buckets * __ac_HASH_UPPER + 0.5)) j = 0; /* requested size is too small */ \
else { /* hash table size to be changed (shrink or expand); rehash */ \
new_flags = (khint32_t*)kmalloc(__ac_fsize(new_n_buckets) * sizeof(khint32_t)); \
if (!new_flags) return -1; \
memset(new_flags, 0xaa, __ac_fsize(new_n_buckets) * sizeof(khint32_t)); \
if (h->n_buckets < new_n_buckets) { /* expand */ \
khkey_t *new_keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \
if (!new_keys) return -1; \
h->keys = new_keys; \
if (kh_is_map) { \
khval_t *new_vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \
if (!new_vals) return -1; \
h->vals = new_vals; \
} \
} /* otherwise shrink */ \
} \
} \
if (j) { /* rehashing is needed */ \
for (j = 0; j != h->n_buckets; ++j) { \
if (__ac_iseither(h->flags, j) == 0) { \
khkey_t key = h->keys[j]; \
khval_t val; \
khint_t new_mask; \
new_mask = new_n_buckets - 1; \
if (kh_is_map) val = h->vals[j]; \
__ac_set_isdel_true(h->flags, j); \
while (1) { /* kick-out process; sort of like in Cuckoo hashing */ \
khint_t inc, k, i; \
k = __hash_func(key); \
i = k & new_mask; \
inc = __ac_inc(k, new_mask); \
while (!__ac_isempty(new_flags, i)) i = (i + inc) & new_mask; \
__ac_set_isempty_false(new_flags, i); \
if (i < h->n_buckets && __ac_iseither(h->flags, i) == 0) { /* kick out the existing element */ \
{ khkey_t tmp = h->keys[i]; h->keys[i] = key; key = tmp; } \
if (kh_is_map) { khval_t tmp = h->vals[i]; h->vals[i] = val; val = tmp; } \
__ac_set_isdel_true(h->flags, i); /* mark it as deleted in the old hash table */ \
} else { /* write the element and jump out of the loop */ \
h->keys[i] = key; \
if (kh_is_map) h->vals[i] = val; \
break; \
} \
} \
} \
} \
if (h->n_buckets > new_n_buckets) { /* shrink the hash table */ \
h->keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \
if (kh_is_map) h->vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \
} \
kfree(h->flags); /* free the working space */ \
h->flags = new_flags; \
h->n_buckets = new_n_buckets; \
h->n_occupied = h->size; \
h->upper_bound = (khint_t)(h->n_buckets * __ac_HASH_UPPER + 0.5); \
} \
return 0; \
} \
SCOPE khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret) \
{ \
khint_t x; \
if (h->n_occupied >= h->upper_bound) { /* update the hash table */ \
if (h->n_buckets > (h->size<<1)) { \
if (kh_resize_##name(h, h->n_buckets - 1) < 0) { /* clear "deleted" elements */ \
*ret = -1; return h->n_buckets; \
} \
} else if (kh_resize_##name(h, h->n_buckets + 1) < 0) { /* expand the hash table */ \
*ret = -1; return h->n_buckets; \
} \
} /* TODO: to implement automatically shrinking; resize() already support shrinking */ \
{ \
khint_t inc, k, i, site, last, mask = h->n_buckets - 1; \
x = site = h->n_buckets; k = __hash_func(key); i = k & mask; \
if (__ac_isempty(h->flags, i)) x = i; /* for speed up */ \
else { \
inc = __ac_inc(k, mask); last = i; \
while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \
if (__ac_isdel(h->flags, i)) site = i; \
i = (i + inc) & mask; \
if (i == last) { x = site; break; } \
} \
if (x == h->n_buckets) { \
if (__ac_isempty(h->flags, i) && site != h->n_buckets) x = site; \
else x = i; \
} \
} \
} \
if (__ac_isempty(h->flags, x)) { /* not present at all */ \
h->keys[x] = key; \
__ac_set_isboth_false(h->flags, x); \
++h->size; ++h->n_occupied; \
*ret = 1; \
} else if (__ac_isdel(h->flags, x)) { /* deleted */ \
h->keys[x] = key; \
__ac_set_isboth_false(h->flags, x); \
++h->size; \
*ret = 2; \
} else *ret = 0; /* Don't touch h->keys[x] if present and not deleted */ \
return x; \
} \
SCOPE void kh_del_##name(kh_##name##_t *h, khint_t x) \
{ \
if (x != h->n_buckets && !__ac_iseither(h->flags, x)) { \
__ac_set_isdel_true(h->flags, x); \
--h->size; \
} \
}
#define KHASH_DECLARE(name, khkey_t, khval_t) \
__KHASH_TYPE(name, khkey_t, khval_t) \
__KHASH_PROTOTYPES(name, khkey_t, khval_t)
#define KHASH_INIT2(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
__KHASH_TYPE(name, khkey_t, khval_t) \
__KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal)
#define KHASH_INIT(name, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
KHASH_INIT2(name, static kh_inline, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal)
/* --- BEGIN OF HASH FUNCTIONS --- */
/*! @function
@abstract Integer hash function
@param key The integer [khint32_t]
@return The hash value [khint_t]
*/
#define kh_int_hash_func(key) (khint32_t)(key)
/*! @function
@abstract Integer comparison function
*/
#define kh_int_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract 64-bit integer hash function
@param key The integer [khint64_t]
@return The hash value [khint_t]
*/
#define kh_int64_hash_func(key) (khint32_t)((key)>>33^(key)^(key)<<11)
/*! @function
@abstract 64-bit integer comparison function
*/
#define kh_int64_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract Pointer hash function
@param key The integer void *
@return The hash value [khint_t]
*/
#define kh_ptr_hash_func(key) (khint32_t)(key)
/*! @function
@abstract Pointer comparison function
*/
#define kh_ptr_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract 64-bit pointer hash function
@param key The integer void *
@return The hash value [khint_t]
*/
#define kh_ptr64_hash_func(key) (khint32_t)(((khint64_t)key)>>33^((khint64_t)key)^((khint64_t)key)<<11)
/*! @function
@abstract 64-bit pointer comparison function
*/
#define kh_ptr64_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract const char* hash function
@param s Pointer to a null terminated string
@return The hash value
*/
static kh_inline khint_t __ac_X31_hash_string(const char *s)
{
khint_t h = (khint_t)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (khint_t)*s;
return h;
}
/*! @function
@abstract Another interface to const char* hash function
@param key Pointer to a null terminated string [const char*]
@return The hash value [khint_t]
*/
#define kh_str_hash_func(key) __ac_X31_hash_string(key)
/*! @function
@abstract Const char* comparison function
*/
#define kh_str_hash_equal(a, b) (strcmp(a, b) == 0)
static kh_inline khint_t __ac_Wang_hash(khint_t key)
{
key += ~(key << 15);
key ^= (key >> 10);
key += (key << 3);
key ^= (key >> 6);
key += ~(key << 11);
key ^= (key >> 16);
return key;
}
#define kh_int_hash_func2(k) __ac_Wang_hash((khint_t)key)
/* --- END OF HASH FUNCTIONS --- */
/* Other convenient macros... */
/*!
@abstract Type of the hash table.
@param name Name of the hash table [symbol]
*/
#define khash_t(name) kh_##name##_t
/*! @function
@abstract Initiate a hash table.
@param name Name of the hash table [symbol]
@return Pointer to the hash table [khash_t(name)*]
*/
#define kh_init(name) kh_init_##name()
/*! @function
@abstract Destroy a hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
*/
#define kh_destroy(name, h) kh_destroy_##name(h)
/*! @function
@abstract Reset a hash table without deallocating memory.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
*/
#define kh_clear(name, h) kh_clear_##name(h)
/*! @function
@abstract Resize a hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param s New size [khint_t]
*/
#define kh_resize(name, h, s) kh_resize_##name(h, s)
/*! @function
@abstract Insert a key to the hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param k Key [type of keys]
@param r Extra return code: 0 if the key is present in the hash table;
1 if the bucket is empty (never used); 2 if the element in
the bucket has been deleted [int*]
@return Iterator to the inserted element [khint_t]
*/
#define kh_put(name, h, k, r) kh_put_##name(h, k, r)
/*! @function
@abstract Retrieve a key from the hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param k Key [type of keys]
@return Iterator to the found element, or kh_end(h) if the element is absent [khint_t]
*/
#define kh_get(name, h, k) kh_get_##name(h, k)
/*! @function
@abstract Remove a key from the hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param k Iterator to the element to be deleted [khint_t]
*/
#define kh_del(name, h, k) kh_del_##name(h, k)
/*! @function
@abstract Test whether a bucket contains data.
@param h Pointer to the hash table [khash_t(name)*]
@param x Iterator to the bucket [khint_t]
@return 1 if containing data; 0 otherwise [int]
*/
#define kh_exist(h, x) (!__ac_iseither((h)->flags, (x)))
/*! @function
@abstract Get key given an iterator
@param h Pointer to the hash table [khash_t(name)*]
@param x Iterator to the bucket [khint_t]
@return Key [type of keys]
*/
#define kh_key(h, x) ((h)->keys[x])
/*! @function
@abstract Get value given an iterator
@param h Pointer to the hash table [khash_t(name)*]
@param x Iterator to the bucket [khint_t]
@return Value [type of values]
@discussion For hash sets, calling this results in segfault.
*/
#define kh_val(h, x) ((h)->vals[x])
/*! @function
@abstract Alias of kh_val()
*/
#define kh_value(h, x) ((h)->vals[x])
/*! @function
@abstract Get the start iterator
@param h Pointer to the hash table [khash_t(name)*]
@return The start iterator [khint_t]
*/
#define kh_begin(h) (khint_t)(0)
/*! @function
@abstract Get the end iterator
@param h Pointer to the hash table [khash_t(name)*]
@return The end iterator [khint_t]
*/
#define kh_end(h) ((h)->n_buckets)
/*! @function
@abstract Get the number of elements in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@return Number of elements in the hash table [khint_t]
*/
#define kh_size(h) ((h)->size)
/*! @function
@abstract Get the number of buckets in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@return Number of buckets in the hash table [khint_t]
*/
#define kh_n_buckets(h) ((h)->n_buckets)
/*! @function
@abstract Iterate over the entries in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@param kvar Variable to which key will be assigned
@param vvar Variable to which value will be assigned
@param code Block of code to execute
*/
#define kh_foreach(h, kvar, vvar, code) { khint_t __i; \
for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \
if (!kh_exist(h,__i)) continue; \
(kvar) = kh_key(h,__i); \
(vvar) = kh_val(h,__i); \
code; \
} }
/*! @function
@abstract Iterate over the values in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@param vvar Variable to which value will be assigned
@param code Block of code to execute
*/
#define kh_foreach_value(h, vvar, code) { khint_t __i; \
for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \
if (!kh_exist(h,__i)) continue; \
(vvar) = kh_val(h,__i); \
code; \
} }
/* More conenient interfaces */
/*! @function
@abstract Instantiate a hash map containing (void *) keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#ifdef __x86_64__
#define KHASH_MAP_INIT_PTR(name, khval_t) \
KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal)
#else
#define KHASH_MAP_INIT_PTR(name, khval_t) \
KHASH_INIT(name, void*, khval_t, 1, kh_ptr_hash_func, kh_ptr_hash_equal)
#endif
/*! @function
@abstract Instantiate a hash set containing integer keys
@param name Name of the hash table [symbol]
*/
#define KHASH_SET_INIT_INT(name) \
KHASH_INIT(name, khint32_t, char, 0, kh_int_hash_func, kh_int_hash_equal)
/*! @function
@abstract Instantiate a hash map containing integer keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#define KHASH_MAP_INIT_INT(name, khval_t) \
KHASH_INIT(name, khint32_t, khval_t, 1, kh_int_hash_func, kh_int_hash_equal)
/*! @function
@abstract Instantiate a hash map containing 64-bit integer keys
@param name Name of the hash table [symbol]
*/
#define KHASH_SET_INIT_INT64(name) \
KHASH_INIT(name, khint64_t, char, 0, kh_int64_hash_func, kh_int64_hash_equal)
/*! @function
@abstract Instantiate a hash map containing 64-bit integer keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#define KHASH_MAP_INIT_INT64(name, khval_t) \
KHASH_INIT(name, khint64_t, khval_t, 1, kh_int64_hash_func, kh_int64_hash_equal)
typedef const char *kh_cstr_t;
/*! @function
@abstract Instantiate a hash map containing const char* keys
@param name Name of the hash table [symbol]
*/
#define KHASH_SET_INIT_STR(name) \
KHASH_INIT(name, kh_cstr_t, char, 0, kh_str_hash_func, kh_str_hash_equal)
/*! @function
@abstract Instantiate a hash map containing const char* keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#define KHASH_MAP_INIT_STR(name, khval_t) \
KHASH_INIT(name, kh_cstr_t, khval_t, 1, kh_str_hash_func, kh_str_hash_equal)
#endif /* __AC_KHASH_H */

View file

@ -1,678 +0,0 @@
/*
* Copyright (c) 1991, 1993
* The Regents of the University of California. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 4. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* @(#)queue.h 8.5 (Berkeley) 8/20/94
* $FreeBSD: src/sys/sys/queue.h,v 1.54 2002/08/05 05:18:43 alfred Exp $
*/
#ifndef _DB_QUEUE_H_
#define _DB_QUEUE_H_
#ifndef __offsetof
#define __offsetof(st, m) \
((size_t) ( (char *)&((st *)0)->m - (char *)0 ))
#endif
#ifndef __containerof
#define __containerof(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - __offsetof(type,member) );})
#endif
#if defined(__cplusplus)
extern "C" {
#endif
/*
* This file defines four types of data structures: singly-linked lists,
* singly-linked tail queues, lists and tail queues.
*
* A singly-linked list is headed by a single forward pointer. The elements
* are singly linked for minimum space and pointer manipulation overhead at
* the expense of O(n) removal for arbitrary elements. New elements can be
* added to the list after an existing element or at the head of the list.
* Elements being removed from the head of the list should use the explicit
* macro for this purpose for optimum efficiency. A singly-linked list may
* only be traversed in the forward direction. Singly-linked lists are ideal
* for applications with large datasets and few or no removals or for
* implementing a LIFO queue.
*
* A singly-linked tail queue is headed by a pair of pointers, one to the
* head of the list and the other to the tail of the list. The elements are
* singly linked for minimum space and pointer manipulation overhead at the
* expense of O(n) removal for arbitrary elements. New elements can be added
* to the list after an existing element, at the head of the list, or at the
* end of the list. Elements being removed from the head of the tail queue
* should use the explicit macro for this purpose for optimum efficiency.
* A singly-linked tail queue may only be traversed in the forward direction.
* Singly-linked tail queues are ideal for applications with large datasets
* and few or no removals or for implementing a FIFO queue.
*
* A list is headed by a single forward pointer (or an array of forward
* pointers for a hash table header). The elements are doubly linked
* so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before
* or after an existing element or at the head of the list. A list
* may only be traversed in the forward direction.
*
* A tail queue is headed by a pair of pointers, one to the head of the
* list and the other to the tail of the list. The elements are doubly
* linked so that an arbitrary element can be removed without a need to
* traverse the list. New elements can be added to the list before or
* after an existing element, at the head of the list, or at the end of
* the list. A tail queue may be traversed in either direction.
*
* For details on the use of these macros, see the queue(3) manual page.
*
*
* SLIST LIST STAILQ TAILQ
* _HEAD + + + +
* _HEAD_INITIALIZER + + + +
* _ENTRY + + + +
* _INIT + + + +
* _EMPTY + + + +
* _FIRST + + + +
* _NEXT + + + +
* _PREV - - - +
* _LAST - - + +
* _FOREACH + + + +
* _FOREACH_REVERSE - - - +
* _INSERT_HEAD + + + +
* _INSERT_BEFORE - + - +
* _INSERT_AFTER + + + +
* _INSERT_TAIL - - + +
* _CONCAT - - + +
* _REMOVE_HEAD + - + -
* _REMOVE + + + +
*
*/
/*
* XXX
* We #undef all of the macros because there are incompatible versions of this
* file and these macros on various systems. What makes the problem worse is
* they are included and/or defined by system include files which we may have
* already loaded into Berkeley DB before getting here. For example, FreeBSD's
* <rpc/rpc.h> includes its system <sys/queue.h>, and VxWorks UnixLib.h defines
* several of the LIST_XXX macros. Visual C.NET 7.0 also defines some of these
* same macros in Vc7\PlatformSDK\Include\WinNT.h. Make sure we use ours.
*/
#undef LIST_EMPTY
#undef LIST_ENTRY
#undef LIST_FIRST
#undef LIST_FOREACH
#undef LIST_HEAD
#undef LIST_HEAD_INITIALIZER
#undef LIST_INIT
#undef LIST_INSERT_AFTER
#undef LIST_INSERT_BEFORE
#undef LIST_INSERT_HEAD
#undef LIST_NEXT
#undef LIST_REMOVE
#undef QMD_TRACE_ELEM
#undef QMD_TRACE_HEAD
#undef QUEUE_MACRO_DEBUG
#undef SLIST_EMPTY
#undef SLIST_ENTRY
#undef SLIST_FIRST
#undef SLIST_FOREACH
#undef SLIST_FOREACH_PREVPTR
#undef SLIST_HEAD
#undef SLIST_HEAD_INITIALIZER
#undef SLIST_INIT
#undef SLIST_INSERT_AFTER
#undef SLIST_INSERT_HEAD
#undef SLIST_NEXT
#undef SLIST_REMOVE
#undef SLIST_REMOVE_HEAD
#undef STAILQ_CONCAT
#undef STAILQ_EMPTY
#undef STAILQ_ENTRY
#undef STAILQ_FIRST
#undef STAILQ_FOREACH
#undef STAILQ_HEAD
#undef STAILQ_HEAD_INITIALIZER
#undef STAILQ_INIT
#undef STAILQ_INSERT_AFTER
#undef STAILQ_INSERT_HEAD
#undef STAILQ_INSERT_TAIL
#undef STAILQ_LAST
#undef STAILQ_NEXT
#undef STAILQ_REMOVE
#undef STAILQ_REMOVE_HEAD
#undef STAILQ_REMOVE_HEAD_UNTIL
#undef TAILQ_CONCAT
#undef TAILQ_EMPTY
#undef TAILQ_ENTRY
#undef TAILQ_FIRST
#undef TAILQ_FOREACH
#undef TAILQ_FOREACH_REVERSE
#undef TAILQ_HEAD
#undef TAILQ_HEAD_INITIALIZER
#undef TAILQ_INIT
#undef TAILQ_INSERT_AFTER
#undef TAILQ_INSERT_BEFORE
#undef TAILQ_INSERT_HEAD
#undef TAILQ_INSERT_TAIL
#undef TAILQ_LAST
#undef TAILQ_NEXT
#undef TAILQ_PREV
#undef TAILQ_REMOVE
#undef TRACEBUF
#undef TRASHIT
#define QUEUE_MACRO_DEBUG 0
#if QUEUE_MACRO_DEBUG
/* Store the last 2 places the queue element or head was altered */
struct qm_trace {
char * lastfile;
int lastline;
char * prevfile;
int prevline;
};
#define TRACEBUF struct qm_trace trace;
#define TRASHIT(x) do {(x) = (void *)-1;} while (0)
#define QMD_TRACE_HEAD(head) do { \
(head)->trace.prevline = (head)->trace.lastline; \
(head)->trace.prevfile = (head)->trace.lastfile; \
(head)->trace.lastline = __LINE__; \
(head)->trace.lastfile = __FILE__; \
} while (0)
#define QMD_TRACE_ELEM(elem) do { \
(elem)->trace.prevline = (elem)->trace.lastline; \
(elem)->trace.prevfile = (elem)->trace.lastfile; \
(elem)->trace.lastline = __LINE__; \
(elem)->trace.lastfile = __FILE__; \
} while (0)
#else
#define QMD_TRACE_ELEM(elem)
#define QMD_TRACE_HEAD(head)
#define TRACEBUF
#define TRASHIT(x)
#endif /* QUEUE_MACRO_DEBUG */
/*
* Singly-linked List declarations.
*/
#define SLIST_HEAD(name, type) \
struct name { \
struct type *slh_first; /* first element */ \
}
#define SLIST_HEAD_INITIALIZER(head) \
{ NULL }
#define SLIST_ENTRY(type) \
struct { \
struct type *sle_next; /* next element */ \
}
/*
* Singly-linked List functions.
*/
#define SLIST_EMPTY(head) ((head)->slh_first == NULL)
#define SLIST_FIRST(head) ((head)->slh_first)
#define SLIST_FOREACH(var, head, field) \
for ((var) = SLIST_FIRST((head)); \
(var); \
(var) = SLIST_NEXT((var), field))
#define SLIST_FOREACH_PREVPTR(var, varp, head, field) \
for ((varp) = &SLIST_FIRST((head)); \
((var) = *(varp)) != NULL; \
(varp) = &SLIST_NEXT((var), field))
#define SLIST_INIT(head) do { \
SLIST_FIRST((head)) = NULL; \
} while (0)
#define SLIST_INSERT_AFTER(slistelm, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_NEXT((slistelm), field); \
SLIST_NEXT((slistelm), field) = (elm); \
} while (0)
#define SLIST_INSERT_HEAD(head, elm, field) do { \
SLIST_NEXT((elm), field) = SLIST_FIRST((head)); \
SLIST_FIRST((head)) = (elm); \
} while (0)
#define SLIST_NEXT(elm, field) ((elm)->field.sle_next)
#define SLIST_REMOVE(head, elm, type, field) do { \
if (SLIST_FIRST((head)) == (elm)) { \
SLIST_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = SLIST_FIRST((head)); \
while (SLIST_NEXT(curelm, field) != (elm)) \
curelm = SLIST_NEXT(curelm, field); \
SLIST_NEXT(curelm, field) = \
SLIST_NEXT(SLIST_NEXT(curelm, field), field); \
} \
} while (0)
#define SLIST_REMOVE_HEAD(head, field) do { \
SLIST_FIRST((head)) = SLIST_NEXT(SLIST_FIRST((head)), field); \
} while (0)
/*
* Singly-linked Tail queue declarations.
*/
#define STAILQ_HEAD(name, type) \
struct name { \
struct type *stqh_first;/* first element */ \
struct type **stqh_last;/* addr of last next element */ \
}
#define STAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).stqh_first }
#define STAILQ_ENTRY(type) \
struct { \
struct type *stqe_next; /* next element */ \
}
/*
* Singly-linked Tail queue functions.
*/
#define STAILQ_CONCAT(head1, head2) do { \
if (!STAILQ_EMPTY((head2))) { \
*(head1)->stqh_last = (head2)->stqh_first; \
(head1)->stqh_last = (head2)->stqh_last; \
STAILQ_INIT((head2)); \
} \
} while (0)
#define STAILQ_EMPTY(head) ((head)->stqh_first == NULL)
#define STAILQ_FIRST(head) ((head)->stqh_first)
#define STAILQ_FOREACH(var, head, field) \
for ((var) = STAILQ_FIRST((head)); \
(var); \
(var) = STAILQ_NEXT((var), field))
#define STAILQ_INIT(head) do { \
STAILQ_FIRST((head)) = NULL; \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_INSERT_AFTER(head, tqelm, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_NEXT((tqelm), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_NEXT((tqelm), field) = (elm); \
} while (0)
#define STAILQ_INSERT_HEAD(head, elm, field) do { \
if ((STAILQ_NEXT((elm), field) = STAILQ_FIRST((head))) == NULL) \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
STAILQ_FIRST((head)) = (elm); \
} while (0)
#define STAILQ_INSERT_TAIL(head, elm, field) do { \
STAILQ_NEXT((elm), field) = NULL; \
*(head)->stqh_last = (elm); \
(head)->stqh_last = &STAILQ_NEXT((elm), field); \
} while (0)
#define STAILQ_LAST(head, type, field) \
(STAILQ_EMPTY((head)) ? \
NULL : \
((struct type *) \
((char *)((head)->stqh_last) - __offsetof(struct type, field))))
#define STAILQ_NEXT(elm, field) ((elm)->field.stqe_next)
#define STAILQ_REMOVE(head, elm, type, field) do { \
if (STAILQ_FIRST((head)) == (elm)) { \
STAILQ_REMOVE_HEAD((head), field); \
} \
else { \
struct type *curelm = STAILQ_FIRST((head)); \
while (STAILQ_NEXT(curelm, field) != (elm)) \
curelm = STAILQ_NEXT(curelm, field); \
if ((STAILQ_NEXT(curelm, field) = \
STAILQ_NEXT(STAILQ_NEXT(curelm, field), field)) == NULL)\
(head)->stqh_last = &STAILQ_NEXT((curelm), field);\
} \
} while (0)
#define STAILQ_REMOVE_HEAD(head, field) do { \
if ((STAILQ_FIRST((head)) = \
STAILQ_NEXT(STAILQ_FIRST((head)), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
#define STAILQ_REMOVE_HEAD_UNTIL(head, elm, field) do { \
if ((STAILQ_FIRST((head)) = STAILQ_NEXT((elm), field)) == NULL) \
(head)->stqh_last = &STAILQ_FIRST((head)); \
} while (0)
/*
* List declarations.
*/
#define LIST_HEAD(name, type) \
struct name { \
struct type *lh_first; /* first element */ \
}
#define LIST_HEAD_INITIALIZER(head) \
{ NULL }
#define LIST_ENTRY(type) \
struct { \
struct type *le_next; /* next element */ \
struct type **le_prev; /* address of previous next element */ \
}
/*
* List functions.
*/
#define LIST_EMPTY(head) ((head)->lh_first == NULL)
#define LIST_FIRST(head) ((head)->lh_first)
#define LIST_FOREACH(var, head, field) \
for ((var) = LIST_FIRST((head)); \
(var); \
(var) = LIST_NEXT((var), field))
#define LIST_INIT(head) do { \
LIST_FIRST((head)) = NULL; \
} while (0)
#define LIST_INSERT_AFTER(listelm, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_NEXT((listelm), field)) != NULL)\
LIST_NEXT((listelm), field)->field.le_prev = \
&LIST_NEXT((elm), field); \
LIST_NEXT((listelm), field) = (elm); \
(elm)->field.le_prev = &LIST_NEXT((listelm), field); \
} while (0)
#define LIST_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.le_prev = (listelm)->field.le_prev; \
LIST_NEXT((elm), field) = (listelm); \
*(listelm)->field.le_prev = (elm); \
(listelm)->field.le_prev = &LIST_NEXT((elm), field); \
} while (0)
#define LIST_INSERT_HEAD(head, elm, field) do { \
if ((LIST_NEXT((elm), field) = LIST_FIRST((head))) != NULL) \
LIST_FIRST((head))->field.le_prev = &LIST_NEXT((elm), field);\
LIST_FIRST((head)) = (elm); \
(elm)->field.le_prev = &LIST_FIRST((head)); \
} while (0)
#define LIST_NEXT(elm, field) ((elm)->field.le_next)
#define LIST_REMOVE(elm, field) do { \
if (LIST_NEXT((elm), field) != NULL) \
LIST_NEXT((elm), field)->field.le_prev = \
(elm)->field.le_prev; \
*(elm)->field.le_prev = LIST_NEXT((elm), field); \
} while (0)
/*
* Tail queue declarations.
*/
#define TAILQ_HEAD(name, type) \
struct name { \
struct type *tqh_first; /* first element */ \
struct type **tqh_last; /* addr of last next element */ \
TRACEBUF \
}
#define TAILQ_HEAD_INITIALIZER(head) \
{ NULL, &(head).tqh_first }
#define TAILQ_ENTRY(type) \
struct { \
struct type *tqe_next; /* next element */ \
struct type **tqe_prev; /* address of previous next element */ \
TRACEBUF \
}
/*
* Tail queue functions.
*/
#define TAILQ_CONCAT(head1, head2, field) do { \
if (!TAILQ_EMPTY(head2)) { \
*(head1)->tqh_last = (head2)->tqh_first; \
(head2)->tqh_first->field.tqe_prev = (head1)->tqh_last; \
(head1)->tqh_last = (head2)->tqh_last; \
TAILQ_INIT((head2)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_HEAD(head2); \
} \
} while (0)
#define TAILQ_EMPTY(head) ((head)->tqh_first == NULL)
#define TAILQ_FIRST(head) ((head)->tqh_first)
#define TAILQ_FOREACH(var, head, field) \
for ((var) = TAILQ_FIRST((head)); \
(var); \
(var) = TAILQ_NEXT((var), field))
#define TAILQ_FOREACH_REVERSE(var, head, headname, field) \
for ((var) = TAILQ_LAST((head), headname); \
(var); \
(var) = TAILQ_PREV((var), headname, field))
#define TAILQ_INIT(head) do { \
TAILQ_FIRST((head)) = NULL; \
(head)->tqh_last = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
} while (0)
#define TAILQ_INSERT_AFTER(head, listelm, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_NEXT((listelm), field)) != NULL)\
TAILQ_NEXT((elm), field)->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else { \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
} \
TAILQ_NEXT((listelm), field) = (elm); \
(elm)->field.tqe_prev = &TAILQ_NEXT((listelm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_BEFORE(listelm, elm, field) do { \
(elm)->field.tqe_prev = (listelm)->field.tqe_prev; \
TAILQ_NEXT((elm), field) = (listelm); \
*(listelm)->field.tqe_prev = (elm); \
(listelm)->field.tqe_prev = &TAILQ_NEXT((elm), field); \
QMD_TRACE_ELEM(&(elm)->field); \
QMD_TRACE_ELEM(&listelm->field); \
} while (0)
#define TAILQ_INSERT_HEAD(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field) = TAILQ_FIRST((head))) != NULL) \
TAILQ_FIRST((head))->field.tqe_prev = \
&TAILQ_NEXT((elm), field); \
else \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
TAILQ_FIRST((head)) = (elm); \
(elm)->field.tqe_prev = &TAILQ_FIRST((head)); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_INSERT_TAIL(head, elm, field) do { \
TAILQ_NEXT((elm), field) = NULL; \
(elm)->field.tqe_prev = (head)->tqh_last; \
*(head)->tqh_last = (elm); \
(head)->tqh_last = &TAILQ_NEXT((elm), field); \
QMD_TRACE_HEAD(head); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_LAST(head, headname) \
(*(((struct headname *)((head)->tqh_last))->tqh_last))
#define TAILQ_NEXT(elm, field) ((elm)->field.tqe_next)
#define TAILQ_PREV(elm, headname, field) \
(*(((struct headname *)((elm)->field.tqe_prev))->tqh_last))
#define TAILQ_REMOVE(head, elm, field) do { \
if ((TAILQ_NEXT((elm), field)) != NULL) \
TAILQ_NEXT((elm), field)->field.tqe_prev = \
(elm)->field.tqe_prev; \
else { \
(head)->tqh_last = (elm)->field.tqe_prev; \
QMD_TRACE_HEAD(head); \
} \
*(elm)->field.tqe_prev = TAILQ_NEXT((elm), field); \
TRASHIT((elm)->field.tqe_next); \
TRASHIT((elm)->field.tqe_prev); \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
/*
* Circular queue definitions.
*/
#define CIRCLEQ_HEAD(name, type) \
struct name { \
struct type *cqh_first; /* first element */ \
struct type *cqh_last; /* last element */ \
}
#define CIRCLEQ_HEAD_INITIALIZER(head) \
{ (void *)&head, (void *)&head }
#define CIRCLEQ_ENTRY(type) \
struct { \
struct type *cqe_next; /* next element */ \
struct type *cqe_prev; /* previous element */ \
}
/*
* Circular queue functions.
*/
#define CIRCLEQ_INIT(head) do { \
(head)->cqh_first = (void *)(head); \
(head)->cqh_last = (void *)(head); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_AFTER(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm)->field.cqe_next; \
(elm)->field.cqe_prev = (listelm); \
if ((listelm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(listelm)->field.cqe_next->field.cqe_prev = (elm); \
(listelm)->field.cqe_next = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_BEFORE(head, listelm, elm, field) do { \
(elm)->field.cqe_next = (listelm); \
(elm)->field.cqe_prev = (listelm)->field.cqe_prev; \
if ((listelm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(listelm)->field.cqe_prev->field.cqe_next = (elm); \
(listelm)->field.cqe_prev = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_HEAD(head, elm, field) do { \
(elm)->field.cqe_next = (head)->cqh_first; \
(elm)->field.cqe_prev = (void *)(head); \
if ((head)->cqh_last == (void *)(head)) \
(head)->cqh_last = (elm); \
else \
(head)->cqh_first->field.cqe_prev = (elm); \
(head)->cqh_first = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_INSERT_TAIL(head, elm, field) do { \
(elm)->field.cqe_next = (void *)(head); \
(elm)->field.cqe_prev = (head)->cqh_last; \
if ((head)->cqh_first == (void *)(head)) \
(head)->cqh_first = (elm); \
else \
(head)->cqh_last->field.cqe_next = (elm); \
(head)->cqh_last = (elm); \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_REMOVE(head, elm, field) do { \
if ((elm)->field.cqe_next == (void *)(head)) \
(head)->cqh_last = (elm)->field.cqe_prev; \
else \
(elm)->field.cqe_next->field.cqe_prev = \
(elm)->field.cqe_prev; \
if ((elm)->field.cqe_prev == (void *)(head)) \
(head)->cqh_first = (elm)->field.cqe_next; \
else \
(elm)->field.cqe_prev->field.cqe_next = \
(elm)->field.cqe_next; \
} while (/*CONSTCOND*/0)
#define CIRCLEQ_FOREACH(var, head, field) \
for ((var) = ((head)->cqh_first); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_next))
#define CIRCLEQ_FOREACH_REVERSE(var, head, field) \
for ((var) = ((head)->cqh_last); \
(var) != (const void *)(head); \
(var) = ((var)->field.cqe_prev))
/*
* Circular queue access methods.
*/
#define CIRCLEQ_EMPTY(head) ((head)->cqh_first == (void *)(head))
#define CIRCLEQ_FIRST(head) ((head)->cqh_first)
#define CIRCLEQ_LAST(head) ((head)->cqh_last)
#define CIRCLEQ_NEXT(elm, field) ((elm)->field.cqe_next)
#define CIRCLEQ_PREV(elm, field) ((elm)->field.cqe_prev)
#define CIRCLEQ_LOOP_NEXT(head, elm, field) \
(((elm)->field.cqe_next == (void *)(head)) \
? ((head)->cqh_first) \
: (elm->field.cqe_next))
#define CIRCLEQ_LOOP_PREV(head, elm, field) \
(((elm)->field.cqe_prev == (void *)(head)) \
? ((head)->cqh_last) \
: (elm->field.cqe_prev))
#if defined(__cplusplus)
}
#endif
#endif /* !_DB_QUEUE_H_ */

View file

@ -1,5 +1,17 @@
diff --git a/ext/compressors/bzip2/Makefile.am b/ext/compressors/bzip2/Makefile.am
index 0aedc2e..a70ae2e 100644
--- a/ext/compressors/bzip2/Makefile.am
+++ b/ext/compressors/bzip2/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_bzip2.la
libwiredtiger_bzip2_la_SOURCES = bzip2_compress.c
-libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_bzip2_la_LIBADD = -lbz2
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..c423590 100644
index 6d78823..2122cf8 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
@ -7,6 +19,6 @@ index 6d78823..c423590 100644
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy

File diff suppressed because it is too large Load diff

View file

@ -1,6 +0,0 @@
%%%% This is the WiredTiger section
%% @doc wiredtiger data_root
{mapping, "wiredtiger.data_root", "wterl.data_root", [
{default, "{{platform_data_dir}}/wiredtiger"}
]}.

View file

@ -5,28 +5,26 @@
{cover_enabled, true}.
%{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [
%native, {hipe, [o3,verbose]}, inline, {inline_size, 1024},
{parse_transform, lager_transform},
{erl_opts, [%{d,'DEBUG',true},
debug_info,
{d,'DEBUG',true},
strict_validation,
fail_on_warning,
%warn_missing_spec,
warn_bif_clash,
warn_deprecated_function,
warn_unused_vars,
warn_export_all,
warn_shadow_vars,
warn_unused_import,
warn_unused_function,
warn_bif_clash,
warn_unused_record,
warn_deprecated_function,
warn_obsolete_guard,
warn_export_vars,
warn_exported_vars,
warn_obsolete_guard,
warn_shadow_vars,
warn_untyped_record,
warn_unused_function,
warn_unused_import,
warn_unused_record,
warn_unused_vars
{parse_transform, lager_transform}
%warn_missing_spec,
%strict_validation
]}.
{xref_checks, [undefined_function_calls, deprecated_function_calls]}.
@ -38,8 +36,8 @@
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"}
{"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"}
]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.

View file

@ -22,33 +22,21 @@
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F, T) ->
R = erlang:make_ref(),
case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
case T of
3 -> not_found;
_ -> F(F, T + 1)
end;
Other ->
Other
end
end,
F(F, 1)).
begin
NIFRef = erlang:make_ref(),
case erlang:apply(Fun, [NIFRef|Args]) of
{ok, enqueued} ->
receive
{NIFRef, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{NIFRef, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{NIFRef, Reply} ->
Reply
end;
Other ->
Other
end
end).

View file

@ -22,7 +22,6 @@
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
-compile([{parse_transform, lager_transform}]).
%% KV Backend API
-export([api_version/0,
@ -43,7 +42,7 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compiel(export_all).
-endif.
-define(API_VERSION, 1).
@ -51,9 +50,11 @@
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
-record(state, {table :: string(),
-record(state, {table :: atom(), % Atoms aka int's are easier to hash
type :: string(),
connection :: wterl:connection()}).
connection :: wterl:connection(),
is_empty_cursor :: wterl:cursor(),
status_cursor :: wterl:cursor()}).
-type state() :: #state{}.
-type config() :: [{atom(), term()}].
@ -106,36 +107,44 @@ start(Partition, Config) ->
"lsm"
end,
{ok, Connection} = establish_connection(Config, Type),
Table = Type ++ ":" ++ integer_to_list(Partition),
TableUri = Type ++ ":" ++ integer_to_list(Partition),
Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of
{block_compressor, "snappy"}=C -> [C];
{block_compressor, "none"} -> [];
{block_compressor, none} -> [];
{block_compressor, _} -> [{block_compressor, "snappy"}];
_ -> [{block_compressor, "snappy"}]
{block_compressor, "bzip2"}=C -> [C];
{block_compressor, "none"} -> [];
{block_compressor, none} -> [];
{block_compressor, _} -> [{block_compressor, "snappy"}];
_ -> [{block_compressor, "snappy"}]
end,
TableOpts =
case Type of
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "16K"},
{lsm, [
{bloom_config, [{leaf_page_max, "8MB"}]},
{bloom_bit_count, 28},
{bloom_hash_count, 19},
{bloom_oldest, true},
{chunk_size, "100MB"},
{merge_threads, 2}
]}
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
] ++ Compressor;
"table" ->
Compressor
end,
case wterl:create(Connection, Table, TableOpts) of
Table = list_to_atom(TableUri),
case wterl:create(Connection, TableUri, TableOpts) of
ok ->
{ok, #state{table=Table, type=Type,
connection=Connection}};
case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} ->
{ok, #state{table=Table, type=Type,
connection=Connection,
is_empty_cursor=IsEmptyCursor,
status_cursor=StatusCursor}};
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason3} ->
{error, Reason3}
end
@ -321,44 +330,26 @@ drop(#state{connection=Connection, table=Table}=State) ->
%% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
is_empty(#state{connection=Connection, table=Table}) ->
case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
IsEmpty =
case wterl:cursor_next(Cursor) of
not_found ->
true;
{error, {eperm, _}} ->
false; % TODO: review/fix this logic
_ ->
false
end,
wterl:cursor_close(Cursor),
IsEmpty;
{error, Reason2} ->
{error, Reason2}
is_empty(#state{is_empty_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
case wterl:cursor_next(Cursor) of
not_found -> true;
{error, {eperm, _}} -> false; % TODO: review/fix this logic
_ -> false
end.
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{connection=Connection, table=Table}) ->
[].
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
%% {ok, Cursor} ->
%% TheStats =
%% case fetch_status(Cursor) of
%% {ok, Stats} ->
%% Stats;
%% {error, {eperm, _}} -> % TODO: review/fix this logic
%% {ok, []};
%% _ ->
%% {ok, []}
%% end,
%% wterl:cursor_close(Cursor),
%% TheStats;
%% {error, Reason2} ->
%% {error, Reason2}
%% end.
status(#state{status_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@ -377,17 +368,24 @@ max_sessions(Config) ->
undefined -> 1024;
Size -> Size
end,
Est = RingSize * erlang:system_info(schedulers),
case Est > 8192 of
true ->
8192;
false ->
case Est < 1024 of
true ->
1024;
false ->
Est
end
Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
true -> 1000000000;
false -> Est
end.
%% @private
establish_utility_cursors(Connection, Table) ->
case wterl:cursor_open(Connection, Table) of
{ok, IsEmptyCursor} ->
case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
{ok, StatusCursor} ->
{ok, IsEmptyCursor, StatusCursor};
{error, Reason1} ->
{error, Reason1}
end;
{error, Reason2} ->
{error, Reason2}
end.
%% @private
@ -401,41 +399,35 @@ establish_connection(Config, Type) ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false),
%% NOTE: LSM auto-checkpoints, so we don't have too.
CheckpointSetting =
case Type =:= "lsm" of
true ->
case LogSetting of
true ->
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
_ ->
[]
end;
[];
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}])
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(checkpoint_sync, Config, false),
wterl:config_value(transaction_sync, Config, "none"),
wterl:config_value(log, Config, [{enabled, LogSetting}]),
wterl:config_value(mmap, Config, false),
wterl:config_value(checkpoint, Config, CheckpointSetting),
wterl:config_value(sync, Config, false),
wterl:config_value(logging, Config, true),
wterl:config_value(transactional, Config, true),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics, Config, [ "fast", "clear"]),
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... you've been warned.
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
@ -556,15 +548,15 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
%% fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
%% fetch_status(_Cursor, {error, _}, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(Cursor, {ok, Stat}, Acc) ->
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =
@ -579,7 +571,7 @@ size_cache(RequestedSize) ->
TotalRAM = proplists:get_value(system_total_memory, Memory),
FreeRAM = proplists:get_value(free_memory, Memory),
UsedByBeam = proplists:get_value(total, erlang:memory()),
Target = ((TotalRAM - UsedByBeam) div 3),
Target = ((TotalRAM - UsedByBeam) div 4),
FirstGuess = (Target - (Target rem (1024 * 1024))),
SecondGuess =
case FirstGuess > FreeRAM of

View file

@ -69,6 +69,8 @@
-export([set_event_handler_pid/1]).
-include("async_nif.hrl").
-ifdef(TEST).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
@ -88,7 +90,6 @@
-on_load(init/0).
-include("async_nif.hrl").
-define(nif_stub, nif_stub_error(?LINE)).
nif_stub_error(Line) ->
erlang:nif_error({nif_not_loaded,module,?MODULE,line,Line}).
@ -96,8 +97,7 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}.
init() ->
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "942e51b"},
{wiredtiger_vsn, "1.6.4-275-g9c44420"}]). %% TODO automate these
[{wterl_vsn, "a1459ce"}, {wiredtiger_vsn, "1.5.2-2-g8f2685b"}]).
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -134,71 +134,71 @@ connection_close(ConnRef) ->
conn_close_nif(_AsyncRef, _ConnRef) ->
?nif_stub.
-spec create(connection(), string()) -> ok | {error, term()}.
-spec create(connection(), string(), config_list()) -> ok | {error, term()}.
-spec create(connection(), atom()) -> ok | {error, term()}.
-spec create(connection(), atom(), config_list()) -> ok | {error, term()}.
create(Ref, Name) ->
create(Ref, Name, []).
create(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun create_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec create_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec create_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
create_nif(_AsyncNif, _Ref, _Name, _Config) ->
?nif_stub.
-spec drop(connection(), string()) -> ok | {error, term()}.
-spec drop(connection(), string(), config_list()) -> ok | {error, term()}.
-spec drop(connection(), atom()) -> ok | {error, term()}.
-spec drop(connection(), atom(), config_list()) -> ok | {error, term()}.
drop(Ref, Name) ->
drop(Ref, Name, [{force, true}]).
drop(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun drop_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec drop_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec drop_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
drop_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec delete(connection(), string(), key()) -> ok | {error, term()}.
-spec delete(connection(), atom(), key()) -> ok | {error, term()}.
delete(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun delete_nif/4, [Ref, Table, Key]).
-spec delete_nif(reference(), connection(), string(), key()) -> ok | {error, term()}.
-spec delete_nif(reference(), connection(), atom(), key()) -> ok | {error, term()}.
delete_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub.
-spec get(connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
-spec get(connection(), atom(), key()) -> {ok, value()} | not_found | {error, term()}.
get(Ref, Table, Key) ->
?ASYNC_NIF_CALL(fun get_nif/4, [Ref, Table, Key]).
-spec get_nif(reference(), connection(), string(), key()) -> {ok, value()} | not_found | {error, term()}.
-spec get_nif(reference(), connection(), atom(), key()) -> {ok, value()} | not_found | {error, term()}.
get_nif(_AsyncRef, _Ref, _Table, _Key) ->
?nif_stub.
-spec put(connection(), string(), key(), value()) -> ok | {error, term()}.
-spec put(connection(), atom(), key(), value()) -> ok | {error, term()}.
put(Ref, Table, Key, Value) ->
?ASYNC_NIF_CALL(fun put_nif/5, [Ref, Table, Key, Value]).
-spec put_nif(reference(), connection(), string(), key(), value()) -> ok | {error, term()}.
-spec put_nif(reference(), connection(), atom(), key(), value()) -> ok | {error, term()}.
put_nif(_AsyncRef, _Ref, _Table, _Key, _Value) ->
?nif_stub.
-spec rename(connection(), string(), string()) -> ok | {error, term()}.
-spec rename(connection(), string(), string(), config_list()) -> ok | {error, term()}.
-spec rename(connection(), atom(), string()) -> ok | {error, term()}.
-spec rename(connection(), atom(), string(), config_list()) -> ok | {error, term()}.
rename(Ref, OldName, NewName) ->
rename(Ref, OldName, NewName, []).
rename(Ref, OldName, NewName, Config) ->
?ASYNC_NIF_CALL(fun rename_nif/5, [Ref, OldName, NewName, config_to_bin(Config)]).
-spec rename_nif(reference(), connection(), string(), string(), config()) -> ok | {error, term()}.
-spec rename_nif(reference(), connection(), atom(), string(), config()) -> ok | {error, term()}.
rename_nif(_AsyncRef, _Ref, _OldName, _NewName, _Config) ->
?nif_stub.
-spec salvage(connection(), string()) -> ok | {error, term()}.
-spec salvage(connection(), string(), config_list()) -> ok | {error, term()}.
-spec salvage(connection(), atom()) -> ok | {error, term()}.
-spec salvage(connection(), atom(), config_list()) -> ok | {error, term()}.
salvage(Ref, Name) ->
salvage(Ref, Name, []).
salvage(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun salvage_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec salvage_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec salvage_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
salvage_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
@ -213,10 +213,10 @@ checkpoint(Ref, Config) ->
checkpoint_nif(_AsyncRef, _Ref, _Config) ->
?nif_stub.
-spec truncate(connection(), string()) -> ok | {error, term()}.
-spec truncate(connection(), string(), config_list()) -> ok | {error, term()}.
-spec truncate(connection(), string(), binary() | first, binary() | last) -> ok | {error, term()}.
-spec truncate(connection(), string(), binary() | first, binary() | last, config()) -> ok | {error, term()}.
-spec truncate(connection(), atom()) -> ok | {error, term()}.
-spec truncate(connection(), atom(), config_list()) -> ok | {error, term()}.
-spec truncate(connection(), atom(), binary() | first, binary() | last) -> ok | {error, term()}.
-spec truncate(connection(), atom(), binary() | first, binary() | last, config()) -> ok | {error, term()}.
truncate(Ref, Name) ->
truncate(Ref, Name, first, last, []).
truncate(Ref, Name, Config) ->
@ -226,41 +226,40 @@ truncate(Ref, Name, Start, Stop) ->
truncate(Ref, Name, Start, Stop, Config) ->
?ASYNC_NIF_CALL(fun truncate_nif/6, [Ref, Name, Start, Stop, config_to_bin(Config)]).
-spec truncate_nif(reference(), connection(), string(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
-spec truncate_nif(reference(), connection(), atom(), cursor() | first, cursor() | last, config()) -> ok | {error, term()}.
truncate_nif(_AsyncRef, _Ref, _Name, _Start, _Stop, _Config) ->
?nif_stub.
-spec upgrade(connection(), string()) -> ok | {error, term()}.
-spec upgrade(connection(), string(), config_list()) -> ok | {error, term()}.
-spec upgrade(connection(), atom()) -> ok | {error, term()}.
-spec upgrade(connection(), atom(), config_list()) -> ok | {error, term()}.
upgrade(Ref, Name) ->
upgrade(Ref, Name, []).
upgrade(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun upgrade_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec upgrade_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec upgrade_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
upgrade_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec verify(connection(), string()) -> ok | {error, term()}.
-spec verify(connection(), string(), config_list()) -> ok | {error, term()}.
-spec verify(connection(), atom()) -> ok | {error, term()}.
-spec verify(connection(), atom(), config_list()) -> ok | {error, term()}.
verify(Ref, Name) ->
verify(Ref, Name, []).
verify(Ref, Name, Config) ->
?ASYNC_NIF_CALL(fun verify_nif/4, [Ref, Name, config_to_bin(Config)]).
-spec verify_nif(reference(), connection(), string(), config()) -> ok | {error, term()}.
-spec verify_nif(reference(), connection(), atom(), config()) -> ok | {error, term()}.
verify_nif(_AsyncRef, _Ref, _Name, _Config) ->
?nif_stub.
-spec cursor_open(connection(), string()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), string(), config_list()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), atom()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open(connection(), atom(), config_list()) -> {ok, cursor()} | {error, term()}.
cursor_open(Ref, Table) ->
cursor_open(Ref, Table, []).
cursor_open(Ref, Table, Config) ->
?ASYNC_NIF_CALL(fun cursor_open_nif/4, [Ref, Table, config_to_bin(Config)]).
-spec cursor_open_nif(reference(), connection(), string(), config()) -> {ok, cursor()} | {error, term()}.
-spec cursor_open_nif(reference(), connection(), atom(), config()) -> {ok, cursor()} | {error, term()}.
cursor_open_nif(_AsyncRef, _Ref, _Table, _Config) ->
?nif_stub.
@ -454,26 +453,16 @@ config_to_bin([], Acc) ->
config_to_bin([{Key, Value} | Rest], Acc) ->
ConfigTypes =
[{block_compressor, {string, quoted}},
{bloom_bit_count, integer},
{bloom_config, config},
{bloom_hash_count, integer},
{bloom_newest, bool},
{bloom_oldest, bool},
{cache_size, string},
{checkpoint, config},
{checkpoint_sync, bool},
{checksum, string},
{chunk_size, string},
{create, bool},
{direct_io, list},
{drop, list},
{enabled, bool},
{error_prefix, string},
{eviction_target, integer},
{eviction_trigger, integer},
{extensions, {list, quoted}},
{statistics_fast, bool},
{file_max, string},
{force, bool},
{from, string},
{hazard_max, integer},
@ -483,21 +472,22 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
{isolation, string},
{key_type, string},
{leaf_page_max, string},
{log, config},
{lsm, config},
{mmap, bool},
{merge_threads, integer},
{logging, bool},
{lsm_bloom_bit_count, integer},
{lsm_bloom_config, config},
{lsm_bloom_hash_count, integer},
{lsm_bloom_newest, bool},
{lsm_bloom_oldest, bool},
{lsm_chunk_size, string},
{prefix_compression, bool},
{lsm_merge_threads, integer},
{multiprocess, bool},
{name, string},
{overwrite, bool},
{prefix_compression, bool},
{raw, bool},
{session_max, integer},
{statistics, list},
{statistics_log, config},
{sync, bool},
{target, {list, quoted}},
{to, string},
{transaction_sync, string},
{transactional, bool},
{verbose, list},
{wait, integer}],
@ -533,7 +523,7 @@ set_event_handler_pid(Pid)
-define(TEST_DATA_DIR, "test/wterl.basic").
open_test_conn(DataDir) ->
open_test_conn(DataDir, [{create,true},{cache_size,"1GB"},{session_max, 8192}]).
open_test_conn(DataDir, [{create,true},{cache_size,"100MB"},{session_max, 8192}]).
open_test_conn(DataDir, OpenConfig) ->
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
@ -582,6 +572,12 @@ conn_test_() ->
ConnRef = open_test_table(ConnRef, "table", [{block_compressor, "snappy"}]),
?assertMatch(ok, verify(ConnRef, "table:test")),
?assertMatch(ok, drop(ConnRef, "table:test"))
end},
{"create, verify, drop a table(btree, bzip2)",
fun() ->
ConnRef = open_test_table(ConnRef, "table", [{block_compressor, "bzip2"}]),
?assertMatch(ok, verify(ConnRef, "table:test")),
?assertMatch(ok, drop(ConnRef, "table:test"))
end}
]}
end}.
@ -595,68 +591,6 @@ insert_delete_test() ->
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>)),
ok = connection_close(ConnRef).
%% cursor_fold_keys_test() ->
%% ConnRef = open_test_conn(?TEST_DATA_DIR),
%% ConnRef = open_test_table(ConnRef),
%% [wterl:put(ConnRef, "table:test-fold", crypto:sha(<<X>>),
%% crypto:rand_bytes(crypto:rand_uniform(128, 4096)))
%% || X <- lists:seq(1, 2000)],
%% Cursor = wterl:cursor_open(ConnRef, "table:test-fold"),
%% try
%% {Result, _} = wterl:fold_keys(Cursor, fun(Key, Acc) -> [Key | Acc] end, [])
%% catch
%% _:_ -> wterl:cursor_close(Cursor)
%% after
%% ok = connection_close(ConnRef)
%% end.
%% ?assertMatch(lists:sort(Result),
%% lists:sort([crypto:sha(<<X>>) || X <- lists:seq(1, 2000)])).
many_open_tables_test_() ->
{timeout, 120,
fun() ->
ConnOpts = [{create,true},{cache_size,"1GB"},{session_max, 8192}],
DataDir = ?TEST_DATA_DIR,
KeyGen =
fun(X) ->
crypto:hash(sha, <<X>>)
end,
ValGen =
fun() ->
crypto:rand_bytes(crypto:rand_uniform(128, 4096))
end,
TableNameGen =
fun(X) ->
"lsm:" ++ integer_to_list(X)
end,
NumTables = 16, N = 100,
ConnRef = open_test_conn(DataDir, ConnOpts),
Parent = self(),
[ok = wterl:create(ConnRef, TableNameGen(X), [{checksum, "uncompressed"}]) || X <- lists:seq(0, NumTables)],
[spawn(fun() ->
TableName = TableNameGen(X),
[case wterl:put(ConnRef, TableName, KeyGen(P), ValGen()) of
ok -> ok;
{error, {enoent, _}} -> io:format("put failed, table missing ~p~n", [TableName])
end || P <- lists:seq(1, N)],
[case wterl:get(ConnRef, TableName, KeyGen(P)) of
{ok, _} -> ok;
{error, {enoent, _}} -> io:format("get failed, table missing ~p~n", [TableName])
end || P <- lists:seq(1, N)],
[case wterl:delete(ConnRef, TableName, KeyGen(P)) of
ok -> ok;
{error, {enoent, _}} -> io:format("delete failed, table missing ~p~n", [TableName])
end || P <- lists:seq(1, N)],
Parent ! done
end) || X <- lists:seq(0, NumTables)],
[receive done -> ok end || _ <- lists:seq(0, NumTables)],
[case wterl:drop(ConnRef, TableNameGen(X)) of
ok -> ok;
{error, {enoent, _}} -> io:format("drop failed, table missing ~p~n", [TableNameGen(X)])
end || X <- lists:seq(0, NumTables)],
ok = wterl:connection_close(ConnRef)
end}.
init_test_table() ->
ConnRef = open_test_conn(?TEST_DATA_DIR),
ConnRef = open_test_table(ConnRef),
@ -689,7 +623,7 @@ various_online_test_() ->
end},
{"truncate entire table",
fun() ->
?assertMatch(ok, truncate(ConnRef, "table:test")),
?assertMatch(ok, truncate(ConnRef, "table:test")),
?assertMatch(not_found, get(ConnRef, "table:test", <<"a">>))
end},
%% {"truncate range [<<b>>..last], ensure value outside range is found after",
@ -737,7 +671,7 @@ various_maintenance_test_() ->
fun () ->
{ok, CWD} = file:get_cwd(),
?assertMatch(ok, filelib:ensure_dir(filename:join([?TEST_DATA_DIR, "x"]))),
{ok, ConnRef} = connection_open(filename:join([CWD, ?TEST_DATA_DIR]), [{create,true}]),
{ok, ConnRef} = connection_open(filename:join([CWD, ?TEST_DATA_DIR]), []),
ConnRef
end,
fun (ConnRef) ->
@ -881,7 +815,7 @@ various_cursor_test_() ->
end},
{"update an item using a cursor",
fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]),
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
?assertMatch(ok, cursor_update(Cursor, <<"g">>, <<"goji berries">>)),
?assertMatch(not_found, cursor_update(Cursor, <<"k">>, <<"kumquat">>)),
?assertMatch(ok, cursor_close(Cursor)),
@ -889,7 +823,7 @@ various_cursor_test_() ->
end},
{"remove an item using a cursor",
fun() ->
{ok, Cursor} = cursor_open(ConnRef, "table:test", [{overwrite, false}, {raw,true}]),
{ok, Cursor} = cursor_open(ConnRef, "table:test"),
?assertMatch(ok, cursor_remove(Cursor, <<"g">>)),
?assertMatch(not_found, cursor_remove(Cursor, <<"l">>)),
?assertMatch(ok, cursor_close(Cursor)),
@ -934,7 +868,7 @@ prop_put_delete() ->
DataDir = "test/wterl.putdelete.qc",
Table = "table:eqc",
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
rmdir(filename:join([CWD, DataDir])), % ?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
ok = filelib:ensure_dir(filename:join([DataDir, "x"])),
{ok, ConnRef} = wterl:connection_open(DataDir, [{create,true}]),
try

View file

@ -1,91 +0,0 @@
-module(basho_bench_driver_wterl).
-record(state, { connection, uri }).
-export([new/1,
run/4]).
-include_lib("basho_bench/include/basho_bench.hrl").
%% ====================================================================
%% API
%% ====================================================================
new(1) ->
%% Make sure wterl is available
case code:which(wterl) of
non_existing ->
?FAIL_MSG("~s requires wterl to be available on code path.\n",
[?MODULE]);
_ ->
ok
end,
{ok, _} = wterl_sup:start_link(),
setup(1);
new(Id) ->
setup(Id).
setup(Id) ->
%% Get the target directory
Dir = basho_bench_config:get(wterl_dir, "/tmp"),
Config = basho_bench_config:get(wterl, []),
Uri = config_value(table_uri, Config, "lsm:test"),
ConnectionOpts = config_value(connection, Config, [{create,true},{session_max, 8192}]),
SessionOpts = config_value(session, Config, []),
TableOpts = config_value(table, Config, []),
%% Start WiredTiger
Connection =
case wterl_conn:is_open() of
false ->
case wterl_conn:open(Dir, ConnectionOpts, SessionOpts) of
{ok, Conn} ->
Conn;
{error, Reason0} ->
?FAIL_MSG("Failed to establish a WiredTiger connection for ~p, wterl backend unable to start: ~p\n", [Id, Reason0])
end;
true ->
{ok, Conn} = wterl_conn:get(),
Conn
end,
case wterl:create(Connection, Uri, TableOpts) of
ok ->
{ok, #state{connection=Connection, uri=Uri}};
{error, Reason} ->
{error, Reason}
end.
run(get, KeyGen, _ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:get(Connection, Uri, KeyGen()) of
{ok, _Value} ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(put, KeyGen, ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:put(Connection, Uri, KeyGen(), ValueGen()) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason}
end;
run(delete, KeyGen, _ValueGen, #state{connection=Connection, uri=Uri}=State) ->
case wterl:delete(Connection, Uri, KeyGen()) of
ok ->
{ok, State};
not_found ->
{ok, State};
{error, Reason} ->
{error, Reason}
end.
config_value(Key, Config, Default) ->
case proplists:get_value(Key, Config) of
undefined ->
Default;
Value ->
Value
end.

View file

@ -1,102 +0,0 @@
%%-*- mode: erlang -*-
%% ex: ft=erlang ts=4 sw=4 et
%% How to:
%% * put the wterl-b_b.config file into basho_bench/examples
%% * put the basho_bench_driver_wterl.erl into basho_bench/src
%% * make clean in basho_bench, then make
%% * edit examples/wterl-b_b.config
%% - change {code_paths, ["../wterl"]}. to be a relative path to your
%% wterl directory
%% - change {wterl_dir, "/home/gburd/ws/basho_bench/data"}. to a fully
%% qualified location for your test data files (mkdir that directory
%% yourself, if it doesn't exist the test will fail 'enoent')
%% * to run, replace this path with the proper path on your system:
%% LD_LIBRARY_PATH=/home/you/wterl/priv ./basho_bench examples/wterl-b_b.config
%% * the test should run for 10 minutes (as it is configured right now)
%% with 4 concurrent workers accessing the same table
%%
%% Note:
%% There are two config sections in wt.config {wterl, [ ... ]}. and
%% {wterl_, [ ... ]}. The one being used is named "wterl" the other
%% config is ignored. I setup an LSM and BTREE config and to choose
%% which is run you just rename those two sections (turn one off by
%% adding a "_" to the name and take the "_" out of the other's name).
{mode, max}.
{duration, 10}.
{concurrent, 16}.
{report_interval, 1}.
{pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}.
%{pb_timeout_write, ?}.
%{pb_timeout_listkeys, ?}.
%{pb_timeout_mapreduce, ?}.
{driver, basho_bench_driver_wterl}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
{operations, [{get, 4}, {put, 4}, {delete, 2}]}.
{code_paths, ["../wterl"]}.
{wterl_dir, "/home/gburd/ws/basho_bench/data"}.
%% lsm
{wterl, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
% "salvage", "verify" are okay, however...
% for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
{statistics_log, [{wait, 30}]}
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "lsm:test"},
{lsm_merge_threads, 2},
{table, [
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]},
{block_compressor, "snappy"} % bzip2
]}
]}.
%% btree
{wterl_, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
% "salvage", "verify" are okay, however...
% for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
{statistics_log, [{wait, 30}]},
{checkpoint, [{await, 10}]}
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "table:test"},
{table, [
{block_compressor, "snappy"} % bzip2
]}
]}.

View file

@ -2,9 +2,10 @@
# Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!!
wterl=`git describe --always --long --tags`
wiredtiger0=`(cd c_src/wiredtiger-[0-9.]* && git describe --always --long --tags)`
wterl=`git log -n 1 --pretty=format:"%H"`
wiredtiger0=`(cd c_src/wiredtiger && git log -n 1 --pretty=format:"%H")`
wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`
echo $wterl
echo $wiredtiger