Execute NIF calls on non-scheduler threads asynchronously #6

Merged
gburd merged 72 commits from gsb-async-nifs3 into master 2013-04-23 00:54:56 +00:00
22 changed files with 4122 additions and 1102 deletions

4
.gdbinit Normal file
View file

@ -0,0 +1,4 @@
handle SIGPIPE nostop noprint pass
#b erl_nif.c:1203
#b sys/unix/erl_unix_sys_ddll.c:234

6
.gitignore vendored
View file

@ -1,8 +1,12 @@
*.beam
.eunit
ebin
priv/*.so
c_src/system
c_src/wiredtiger*/
c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/
log/
*~

2
AUTHORS Normal file
View file

@ -0,0 +1,2 @@
-author('Steve Vinoski <steve@basho.com>').
-author('Gregory Burd <steve@basho.com>'). % greg@burd.me @gregburd

View file

@ -3,6 +3,7 @@ TARGET= wterl
REBAR= ./rebar
#REBAR= /usr/bin/env rebar
ERL= /usr/bin/env erl
ERLEXEC= ${ERL_ROOTDIR}/lib/erlang/erts-5.9.1/bin/erlexec
DIALYZER= /usr/bin/env dialyzer
@ -13,12 +14,14 @@ all: compile
deps: get-deps
get-deps:
c_src/build_deps.sh get-deps
@$(REBAR) get-deps
update-deps:
c_src/build_deps.sh update-deps
@$(REBAR) update-deps
c_src/wterl.o:
c_src/wterl.o: c_src/async_nif.h
touch c_src/wterl.c
ebin/app_helper.beam:
@ -38,22 +41,41 @@ eunit: compile
@$(REBAR) eunit skip_deps=true
eunit_console:
@$(ERL) -pa .eunit deps/*/ebin
@$(ERL) -pa .eunit deps/lager/ebin
plt: compile
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/*/ebin --apps kernel stdlib
@$(DIALYZER) --build_plt --output_plt .$(TARGET).plt -pa deps/lager/ebin --apps kernel stdlib
analyze: compile
$(DIALYZER) --plt .$(TARGET).plt -pa deps/*/ebin ebin
$(DIALYZER) --plt .$(TARGET).plt -pa deps/lager/ebin ebin
repl:
$(ERL) -pz deps/*/ebin -pa ebin
$(ERL) -pz deps/lager/ebin -pa ebin
gdb-repl:
USE_GDB=1 $(ERL) -pz deps/*/ebin -pa ebin
USE_GDB=1 $(ERL) -pz deps/lager/ebin -pa ebin
eunit-repl:
$(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'
$(ERL) -pz deps/lager/ebin -pa ebin -pa .eunit
gdb-eunit-repl:
USE_GDB=1 $(ERL) -pa .eunit -pz deps/*/ebin -pz ebin -exec 'cd(".eunit").'
USE_GDB=1 $(ERL) -pa .eunit -pz deps/lager/ebin -pz ebin -exec 'cd(".eunit").'
ERL_TOP= /home/gburd/eng/otp_R15B01
CERL= ${ERL_TOP}/bin/cerl
VALGRIND_MISC_FLAGS= "--verbose --leak-check=full --show-reachable=yes --trace-children=yes --track-origins=yes --suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard --show-possibly-lost=no --malloc-fill=AB --free-fill=CD"
helgrind:
valgrind --verbose --tool=helgrind \
--leak-check=full
--show-reachable=yes \
--trace-children=yes \
--track-origins=yes \
--suppressions=${ERL_TOP}/erts/emulator/valgrind/suppress.standard \
--show-possibly-lost=no \
--malloc-fill=AB \
--free-fill=CD ${ERLEXEC} -pz deps/lager/ebin -pa ebin -pa .eunit
valgrind:
${CERL} -valgrind ${VALGRIND_FLAGS} --log-file=${ROOTDIR}/valgrind_log-beam.smp.%p -- -pz deps/lager/ebin -pa ebin -pa .eunit -exec 'eunit:test(wterl).'

411
c_src/async_nif.h Normal file
View file

@ -0,0 +1,411 @@
/*
* async_nif: An async thread-pool layer for Erlang's NIF API
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __ASYNC_NIF_H__
#define __ASYNC_NIF_H__
#if defined(__cplusplus)
extern "C" {
#endif
#include <assert.h>
#include "fifo_q.h"
#ifdef ASYNC_NIF_STATS
#include "stats.h" // TODO: measure, measure... measure again
#endif
#define ASYNC_NIF_MAX_WORKERS 128
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
DECL_FIFO_QUEUE(reqs, struct async_nif_req_entry);
struct async_nif_work_queue {
ErlNifMutex *reqs_mutex;
ErlNifCond *reqs_cnd;
FIFO_QUEUE_TYPE(reqs) reqs;
};
struct async_nif_worker_entry {
ErlNifTid tid;
unsigned int worker_id;
struct async_nif_state *async_nif;
struct async_nif_work_queue *q;
};
struct async_nif_state {
unsigned int shutdown;
unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
unsigned int num_queues;
unsigned int next_q;
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
struct decl ## _args frame; \
static void fn_work_ ## decl (ErlNifEnv *env, ERL_NIF_TERM ref, ErlNifPid *pid, unsigned int worker_id, struct decl ## _args *args) work_block \
static void fn_post_ ## decl (struct decl ## _args *args) { \
do post_block while(0); \
} \
static ERL_NIF_TERM decl(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv_in[]) { \
struct decl ## _args on_stack_args; \
struct decl ## _args *args = &on_stack_args; \
struct decl ## _args *copy_of_args; \
struct async_nif_req_entry *req = NULL; \
const char *affinity = NULL; \
ErlNifEnv *new_env = NULL; \
/* argv[0] is a ref used for selective recv */ \
const ERL_NIF_TERM *argv = argv_in + 1; \
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
if (!(new_env = enif_alloc_env())) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
do pre_block while(0); \
req = (struct async_nif_req_entry*)enif_alloc(sizeof(struct async_nif_req_entry)); \
if (!req) { \
fn_post_ ## decl (args); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memset(req, 0, sizeof(struct async_nif_req_entry)); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
enif_free(req); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->env = new_env; \
req->ref = enif_make_copy(new_env, argv_in[0]); \
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \
if (affinity) \
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
if (!reply) { \
fn_post_ ## decl (args); \
enif_free(req); \
enif_free_env(new_env); \
enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
return reply; \
}
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_unload(env, priv); \
enif_mutex_unlock(name##_async_nif_coord); \
enif_mutex_destroy(name##_async_nif_coord); \
name##_async_nif_coord = NULL; \
} while(0);
#define ASYNC_NIF_UPGRADE(name, env) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create(NULL); \
enif_mutex_lock(name##_async_nif_coord); \
async_nif_upgrade(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_RETURN_BADARG() return enif_make_badarg(env);
#define ASYNC_NIF_WORK_ENV new_env
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
/**
* TODO:
*/
static inline unsigned int async_nif_str_hash_func(const char *s)
{
unsigned int h = (unsigned int)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (unsigned int)*s;
return h;
}
/**
* TODO:
*/
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{
/* Identify the most appropriate worker for this request. */
unsigned int qid = (hint != -1) ? hint : async_nif->next_q;
struct async_nif_work_queue *q = NULL;
do {
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
/* Now that we hold the lock, check for shutdown. As long as we
hold this lock either a) we're shutting down so exit now or
b) this queue will be valid until we release the lock. */
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
if (fifo_q_full(reqs, q->reqs)) { // TODO: || (q->avg_latency > median_latency)
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
} else {
break;
}
} while(1);
/* We hold the queue's lock, and we've seletect a reasonable queue for this
new request so add the request. */
fifo_q_put(reqs, q->reqs, req);
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued"));
enif_mutex_unlock(q->reqs_mutex);
enif_cond_signal(q->reqs_cnd);
return reply;
}
static void *
async_nif_worker_fn(void *arg)
{
struct async_nif_worker_entry *we = (struct async_nif_worker_entry *)arg;
unsigned int worker_id = we->worker_id;
struct async_nif_state *async_nif = we->async_nif;
struct async_nif_work_queue *q = we->q;
struct async_nif_req_entry *req = NULL;
for(;;) {
/* Examine the request queue, are there things to be done? */
enif_mutex_lock(q->reqs_mutex);
check_again_for_work:
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
break;
}
if (fifo_q_empty(reqs, q->reqs)) {
/* Queue is empty, wait for work */
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
} else {
assert(fifo_q_size(reqs, q->reqs) > 0);
assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
req = fifo_q_get(reqs, q->reqs);
enif_mutex_unlock(q->reqs_mutex);
/* Ensure that there is at least one other worker thread watching this
queue. */
enif_cond_signal(q->reqs_cnd);
/* Perform the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* Now call the post-work cleanup function. */
req->fn_post(req->args);
/* Free resources allocated for this async request. */
enif_free_env(req->env);
enif_free(req->args);
enif_free(req);
req = NULL;
}
}
enif_thread_exit(0);
return 0;
}
static void
async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
{
unsigned int i;
unsigned int num_queues = async_nif->num_queues;
struct async_nif_work_queue *q = NULL;
/* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first
lock all the worker queues, then set shutdown to true, then
unlock. The enqueue function will take the queue mutex, then
test for shutdown condition, then enqueue only if not shutting
down. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_mutex_lock(q->reqs_mutex);
}
async_nif->shutdown = 1;
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
enif_cond_broadcast(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
}
/* Join for the now exiting worker threads. */
for (i = 0; i < async_nif->num_workers; ++i) {
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
/* Cleanup requests, mutexes and conditions in each work queue. */
for (i = 0; i < num_queues; i++) {
q = &async_nif->queues[i];
/* Worker threads are stopped, now toss anything left in the queue. */
struct async_nif_req_entry *req = NULL;
fifo_q_foreach(reqs, q->reqs, req, {
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
req->fn_post(req->args);
enif_free_env(req->env);
enif_free(req->args);
enif_free(req);
});
fifo_q_free(reqs, q->reqs);
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
enif_free(async_nif);
}
static void *
async_nif_load()
{
static int has_init = 0;
unsigned int i, j, num_queues;
ErlNifSysInfo info;
struct async_nif_state *async_nif;
/* Don't init more than once. */
if (has_init) return 0;
else has_init = 1;
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
/* Size the number of work queues according to schedulers. */
if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
num_queues = ASYNC_NIF_MAX_WORKERS / 2;
} else {
int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
if (remainder != 0)
num_queues = info.scheduler_threads - remainder;
else
num_queues = info.scheduler_threads;
if (num_queues < 2)
num_queues = 2;
}
/* Init our portion of priv_data's module-specific state. */
async_nif = enif_alloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues;
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
async_nif->next_q = 0;
async_nif->shutdown = 0;
for (i = 0; i < async_nif->num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
q->reqs = fifo_q_new(reqs, ASYNC_NIF_WORKER_QUEUE_SIZE);
q->reqs_mutex = enif_mutex_create(NULL);
q->reqs_cnd = enif_cond_create(NULL);
}
/* Setup the thread pool management. */
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */
for (i = 0; i < async_nif->num_workers; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif;
we->worker_id = i;
we->q = &async_nif->queues[i % async_nif->num_queues];
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)we, NULL) != 0) {
async_nif->shutdown = 1;
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd);
}
while(i-- > 0) {
void *exit_value = 0; /* Ignore this. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_free(async_nif);
return NULL;
}
}
return async_nif;
}
static void
async_nif_upgrade(ErlNifEnv *env)
{
// TODO:
}
#if defined(__cplusplus)
}
#endif
#endif // __ASYNC_NIF_H__

View file

@ -1,40 +1,185 @@
#!/bin/bash
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
POSIX_SHELL="true"
export POSIX_SHELL
exec /usr/bin/ksh $0 $@
fi
unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as well
set -e
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
WT_BRANCH=basho
WT_REMOTE_REPO=http://github.com/wiredtiger/wiredtiger.git
WT_VSN=""
WT_DIR=wiredtiger-$WT_BRANCH
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
BZIP2_VSN="1.0.6"
BZIP2_DIR=bzip2-$BZIP2_VSN
[ `basename $PWD` != "c_src" ] && cd c_src
BASEDIR="$PWD"
export BASEDIR="$PWD"
case "$1" in
clean)
rm -rf system wiredtiger
;;
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make}
*)
test -f system/lib/libwiredtiger.a && exit 0
export CFLAGS="$CFLAGS -g -I $BASEDIR/system/include"
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
if [ -d wiredtiger/.git ]; then
(cd wiredtiger && \
git fetch && \
git merge origin/$WT_BRANCH)
get_wt ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
(cd $BASEDIR/$WT_DIR && git pull -u) || exit 1
else
git clone -b $WT_BRANCH --single-branch $WT_REMOTE_REPO && \
(cd wiredtiger && \
patch -p1 < ../wiredtiger-extension-link.patch && \
./autogen.sh)
if [ "X$WT_VSN" == "X" ]; then
git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
else
git clone -b ${WT_BRANCH} --single-branch ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
fi
(cd wiredtiger/build_posix && \
mv wiredtiger $WT_DIR || exit 1
fi
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$WT_DIR && git cherry-pick a3c8c2a13758ae9c44edabcc1a780984a7882904 || exit 1)
(cd $BASEDIR/$WT_DIR
[ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
./autogen.sh || exit 1
cd ./build_posix || exit 1
[ -e Makefile ] && $MAKE distclean
../configure --with-pic \
--enable-snappy \
--enable-bzip2 \
--prefix=$BASEDIR/system && \
make -j && make install)
[ -d $BASEDIR/../priv ] || mkdir $BASEDIR/../priv
cp $BASEDIR/system/bin/wt $BASEDIR/../priv
cp $BASEDIR/system/lib/*.so $BASEDIR/../priv
--prefix=${BASEDIR}/system || exit 1
)
}
get_snappy ()
{
[ -e snappy-$SNAPPY_VSN.tar.gz ] || (echo "Missing Snappy ($SNAPPY_VSN) source package" && exit 1)
[ -d $BASEDIR/$SNAPPY_DIR ] || tar -xzf snappy-$SNAPPY_VSN.tar.gz
[ -e $BASEDIR/snappy-build.patch ] && \
(cd $BASEDIR/$SNAPPY_DIR
patch -p1 --forward < $BASEDIR/snappy-build.patch || exit 1)
(cd $BASEDIR/$SNAPPY_DIR
./configure --with-pic --prefix=$BASEDIR/system || exit 1)
}
get_bzip2 ()
{
[ -e bzip2-$BZIP2_VSN.tar.gz ] || (echo "Missing bzip2 ($BZIP2_VSN) source package" && exit 1)
[ -d $BASEDIR/$BZIP2_DIR ] || tar -xzf bzip2-$BZIP2_VSN.tar.gz
[ -e $BASEDIR/bzip2-build.patch ] && \
(cd $BASEDIR/$BZIP2_DIR
patch -p1 --forward < $BASEDIR/bzip2-build.patch || exit 1)
}
get_deps ()
{
get_wt;
get_snappy;
get_bzip2;
}
update_deps ()
{
if [ -d $BASEDIR/$WT_DIR/.git ]; then
(cd $BASEDIR/$WT_DIR
if [ "X$WT_VSN" == "X" ]; then
git pull -u || exit 1
else
git checkout $WT_VSN || exit 1
fi
)
fi
}
build_wt ()
{
(cd $BASEDIR/$WT_DIR/build_posix && \
$MAKE -j && $MAKE install)
}
build_snappy ()
{
(cd $BASEDIR/$SNAPPY_DIR && \
$MAKE -j && \
$MAKE install
)
}
build_bzip2 ()
{
(cd $BASEDIR/$BZIP2_DIR && \
$MAKE -j -f Makefile-libbz2_so && \
mkdir -p $BASEDIR/system/lib && \
cp -f bzlib.h $BASEDIR/system/include && \
cp -f libbz2.so.1.0.6 $BASEDIR/system/lib && \
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2.so && \
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2-1.so && \
ln -s $BASEDIR/system/lib/libbz2.so.1.0.6 $BASEDIR/system/lib/libbz2-1.0.so
)
}
case "$1" in
clean)
rm -rf system $WT_DIR $SNAPPY_DIR $BZIP2_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
rm -f ${BASEDIR}/../priv/libwiredtiger_*.so
rm -f ${BASEDIR}/../priv/libbz2.so.*
rm -f ${BASEDIR}/../priv/libsnappy.so.*
;;
test)
(cd $BASEDIR/$WT_DIR && $MAKE -j test)
;;
update-deps)
update-deps;
;;
get-deps)
get_deps;
;;
*)
[ -d $WT_DIR ] || get_wt;
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BZIP2_DIR ] || get_bzip2;
# Build Snappy
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build BZIP2
[ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1)
test -f $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
# Build WiredTiger
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so \
-a -f $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_bzip2.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
(cd ${BASEDIR}/../priv
[ -L libbz2.so ] || ln -s libbz2.so.1.0.6 libbz2.so
[ -L libbz2.so.1 ] || ln -s libbz2.so.1.0.6 libbz2.so.1
[ -L libbz2.so.1.0 ] || ln -s libbz2.so.1.0.6 libbz2.so.1.0)
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
;;
esac

BIN
c_src/bzip2-1.0.6.tar.gz Normal file

Binary file not shown.

0
c_src/bzip2-build.patch Normal file
View file

102
c_src/fifo_q.h Normal file
View file

@ -0,0 +1,102 @@
/*
* fifo_q: a macro-based implementation of a FIFO Queue
*
* Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
* Author: Gregory Burd <greg@basho.com> <greg@burd.me>
*
* This file is provided to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain
* a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef __FIFO_Q_H__
#define __FIFO_Q_H__
#if defined(__cplusplus)
extern "C" {
#endif
#define FIFO_QUEUE_TYPE(name) \
struct fifo_q__ ## name *
#define DECL_FIFO_QUEUE(name, type) \
struct fifo_q__ ## name { \
unsigned int h, t, s; \
type *items[]; \
}; \
static struct fifo_q__ ## name *fifo_q_ ## name ## _new(unsigned int n) { \
int sz = sizeof(struct fifo_q__ ## name) + ((n+1) * sizeof(type *));\
struct fifo_q__ ## name *q = enif_alloc(sz); \
if (!q) \
return 0; \
memset(q, 0, sz); \
q->s = n + 1; \
return q; \
} \
static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
enif_free(q); \
} \
static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
q->items[q->h] = n; \
q->h = (q->h + 1) % q->s; \
return n; \
} \
static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
type *n = q->items[q->t]; \
q->items[q->t] = 0; \
q->t = (q->t + 1) % q->s; \
return n; \
} \
static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
return (q->h - q->t + q->s) % q->s; \
} \
static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
return q->s - 1; \
} \
static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
return (q->t == q->h); \
} \
static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
return ((q->h + 1) % q->s) == q->t; \
}
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
#define fifo_q_free(name, queue) fifo_q_ ## name ## _free(queue)
#define fifo_q_get(name, queue) fifo_q_ ## name ## _get(queue)
#define fifo_q_put(name, queue, item) fifo_q_ ## name ## _put(queue, item)
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
#define fifo_q_foreach(name, queue, item, task) do { \
while(!fifo_q_ ## name ## _empty(queue)) { \
item = fifo_q_ ## name ## _get(queue); \
do task while(0); \
} \
} while(0);
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
ErlNifPid pid;
void *args;
void (*fn_work)(ErlNifEnv*, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *);
void (*fn_post)(void *);
};
#if defined(__cplusplus)
}
#endif
#endif // __FIFO_Q_H__

643
c_src/khash.h Normal file
View file

@ -0,0 +1,643 @@
/* The MIT License
Copyright (c) 2008, 2009, 2011 by Attractive Chaos <attractor@live.co.uk>
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
/*
An example:
#include "khash.h"
KHASH_MAP_INIT_INT(32, char)
int main() {
int ret, is_missing;
khiter_t k;
khash_t(32) *h = kh_init(32);
k = kh_put(32, h, 5, &ret);
kh_value(h, k) = 10;
k = kh_get(32, h, 10);
is_missing = (k == kh_end(h));
k = kh_get(32, h, 5);
kh_del(32, h, k);
for (k = kh_begin(h); k != kh_end(h); ++k)
if (kh_exist(h, k)) kh_value(h, k) = 1;
kh_destroy(32, h);
return 0;
}
*/
/*
2011-12-29 (0.2.7):
* Minor code clean up; no actual effect.
2011-09-16 (0.2.6):
* The capacity is a power of 2. This seems to dramatically improve the
speed for simple keys. Thank Zilong Tan for the suggestion. Reference:
- http://code.google.com/p/ulib/
- http://nothings.org/computer/judy/
* Allow to optionally use linear probing which usually has better
performance for random input. Double hashing is still the default as it
is more robust to certain non-random input.
* Added Wang's integer hash function (not used by default). This hash
function is more robust to certain non-random input.
2011-02-14 (0.2.5):
* Allow to declare global functions.
2009-09-26 (0.2.4):
* Improve portability
2008-09-19 (0.2.3):
* Corrected the example
* Improved interfaces
2008-09-11 (0.2.2):
* Improved speed a little in kh_put()
2008-09-10 (0.2.1):
* Added kh_clear()
* Fixed a compiling error
2008-09-02 (0.2.0):
* Changed to token concatenation which increases flexibility.
2008-08-31 (0.1.2):
* Fixed a bug in kh_get(), which has not been tested previously.
2008-08-31 (0.1.1):
* Added destructor
*/
#ifndef __AC_KHASH_H
#define __AC_KHASH_H
/*!
@header
Generic hash table library.
*/
#define AC_VERSION_KHASH_H "0.2.6"
#include <stdlib.h>
#include <string.h>
#include <limits.h>
/* compiler specific configuration */
#if UINT_MAX == 0xffffffffu
typedef unsigned int khint32_t;
#elif ULONG_MAX == 0xffffffffu
typedef unsigned long khint32_t;
#endif
#if ULONG_MAX == ULLONG_MAX
typedef unsigned long khint64_t;
#else
typedef unsigned long long khint64_t;
#endif
#ifdef _MSC_VER
#define kh_inline __inline
#else
#define kh_inline inline
#endif
typedef khint32_t khint_t;
typedef khint_t khiter_t;
#define __ac_isempty(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&2)
#define __ac_isdel(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&1)
#define __ac_iseither(flag, i) ((flag[i>>4]>>((i&0xfU)<<1))&3)
#define __ac_set_isdel_false(flag, i) (flag[i>>4]&=~(1ul<<((i&0xfU)<<1)))
#define __ac_set_isempty_false(flag, i) (flag[i>>4]&=~(2ul<<((i&0xfU)<<1)))
#define __ac_set_isboth_false(flag, i) (flag[i>>4]&=~(3ul<<((i&0xfU)<<1)))
#define __ac_set_isdel_true(flag, i) (flag[i>>4]|=1ul<<((i&0xfU)<<1))
#ifdef KHASH_LINEAR
#define __ac_inc(k, m) 1
#else
#define __ac_inc(k, m) (((k)>>3 ^ (k)<<3) | 1) & (m)
#endif
#define __ac_fsize(m) ((m) < 16? 1 : (m)>>4)
#ifndef kroundup32
#define kroundup32(x) (--(x), (x)|=(x)>>1, (x)|=(x)>>2, (x)|=(x)>>4, (x)|=(x)>>8, (x)|=(x)>>16, ++(x))
#endif
#ifndef kcalloc
#define kcalloc(N,Z) calloc(N,Z)
#endif
#ifndef kmalloc
#define kmalloc(Z) malloc(Z)
#endif
#ifndef krealloc
#define krealloc(P,Z) realloc(P,Z)
#endif
#ifndef kfree
#define kfree(P) free(P)
#endif
static const double __ac_HASH_UPPER = 0.77;
#define __KHASH_TYPE(name, khkey_t, khval_t) \
typedef struct { \
khint_t n_buckets, size, n_occupied, upper_bound; \
khint32_t *flags; \
khkey_t *keys; \
khval_t *vals; \
} kh_##name##_t;
#define __KHASH_PROTOTYPES(name, khkey_t, khval_t) \
extern kh_##name##_t *kh_init_##name(void); \
extern void kh_destroy_##name(kh_##name##_t *h); \
extern void kh_clear_##name(kh_##name##_t *h); \
extern khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key); \
extern int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets); \
extern khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret); \
extern void kh_del_##name(kh_##name##_t *h, khint_t x);
#define __KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
SCOPE kh_##name##_t *kh_init_##name(void) { \
return (kh_##name##_t*)kcalloc(1, sizeof(kh_##name##_t)); \
} \
SCOPE void kh_destroy_##name(kh_##name##_t *h) \
{ \
if (h) { \
kfree((void *)h->keys); kfree(h->flags); \
kfree((void *)h->vals); \
kfree(h); \
} \
} \
SCOPE void kh_clear_##name(kh_##name##_t *h) \
{ \
if (h && h->flags) { \
memset(h->flags, 0xaa, __ac_fsize(h->n_buckets) * sizeof(khint32_t)); \
h->size = h->n_occupied = 0; \
} \
} \
SCOPE khint_t kh_get_##name(const kh_##name##_t *h, khkey_t key) \
{ \
if (h->n_buckets) { \
khint_t inc, k, i, last, mask; \
mask = h->n_buckets - 1; \
k = __hash_func(key); i = k & mask; \
inc = __ac_inc(k, mask); last = i; /* inc==1 for linear probing */ \
while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \
i = (i + inc) & mask; \
if (i == last) return h->n_buckets; \
} \
return __ac_iseither(h->flags, i)? h->n_buckets : i; \
} else return 0; \
} \
SCOPE int kh_resize_##name(kh_##name##_t *h, khint_t new_n_buckets) \
{ /* This function uses 0.25*n_buckets bytes of working space instead of [sizeof(key_t+val_t)+.25]*n_buckets. */ \
khint32_t *new_flags = 0; \
khint_t j = 1; \
{ \
kroundup32(new_n_buckets); \
if (new_n_buckets < 4) new_n_buckets = 4; \
if (h->size >= (khint_t)(new_n_buckets * __ac_HASH_UPPER + 0.5)) j = 0; /* requested size is too small */ \
else { /* hash table size to be changed (shrink or expand); rehash */ \
new_flags = (khint32_t*)kmalloc(__ac_fsize(new_n_buckets) * sizeof(khint32_t)); \
if (!new_flags) return -1; \
memset(new_flags, 0xaa, __ac_fsize(new_n_buckets) * sizeof(khint32_t)); \
if (h->n_buckets < new_n_buckets) { /* expand */ \
khkey_t *new_keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \
if (!new_keys) return -1; \
h->keys = new_keys; \
if (kh_is_map) { \
khval_t *new_vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \
if (!new_vals) return -1; \
h->vals = new_vals; \
} \
} /* otherwise shrink */ \
} \
} \
if (j) { /* rehashing is needed */ \
for (j = 0; j != h->n_buckets; ++j) { \
if (__ac_iseither(h->flags, j) == 0) { \
khkey_t key = h->keys[j]; \
khval_t val; \
khint_t new_mask; \
new_mask = new_n_buckets - 1; \
if (kh_is_map) val = h->vals[j]; \
__ac_set_isdel_true(h->flags, j); \
while (1) { /* kick-out process; sort of like in Cuckoo hashing */ \
khint_t inc, k, i; \
k = __hash_func(key); \
i = k & new_mask; \
inc = __ac_inc(k, new_mask); \
while (!__ac_isempty(new_flags, i)) i = (i + inc) & new_mask; \
__ac_set_isempty_false(new_flags, i); \
if (i < h->n_buckets && __ac_iseither(h->flags, i) == 0) { /* kick out the existing element */ \
{ khkey_t tmp = h->keys[i]; h->keys[i] = key; key = tmp; } \
if (kh_is_map) { khval_t tmp = h->vals[i]; h->vals[i] = val; val = tmp; } \
__ac_set_isdel_true(h->flags, i); /* mark it as deleted in the old hash table */ \
} else { /* write the element and jump out of the loop */ \
h->keys[i] = key; \
if (kh_is_map) h->vals[i] = val; \
break; \
} \
} \
} \
} \
if (h->n_buckets > new_n_buckets) { /* shrink the hash table */ \
h->keys = (khkey_t*)krealloc((void *)h->keys, new_n_buckets * sizeof(khkey_t)); \
if (kh_is_map) h->vals = (khval_t*)krealloc((void *)h->vals, new_n_buckets * sizeof(khval_t)); \
} \
kfree(h->flags); /* free the working space */ \
h->flags = new_flags; \
h->n_buckets = new_n_buckets; \
h->n_occupied = h->size; \
h->upper_bound = (khint_t)(h->n_buckets * __ac_HASH_UPPER + 0.5); \
} \
return 0; \
} \
SCOPE khint_t kh_put_##name(kh_##name##_t *h, khkey_t key, int *ret) \
{ \
khint_t x; \
if (h->n_occupied >= h->upper_bound) { /* update the hash table */ \
if (h->n_buckets > (h->size<<1)) { \
if (kh_resize_##name(h, h->n_buckets - 1) < 0) { /* clear "deleted" elements */ \
*ret = -1; return h->n_buckets; \
} \
} else if (kh_resize_##name(h, h->n_buckets + 1) < 0) { /* expand the hash table */ \
*ret = -1; return h->n_buckets; \
} \
} /* TODO: to implement automatically shrinking; resize() already support shrinking */ \
{ \
khint_t inc, k, i, site, last, mask = h->n_buckets - 1; \
x = site = h->n_buckets; k = __hash_func(key); i = k & mask; \
if (__ac_isempty(h->flags, i)) x = i; /* for speed up */ \
else { \
inc = __ac_inc(k, mask); last = i; \
while (!__ac_isempty(h->flags, i) && (__ac_isdel(h->flags, i) || !__hash_equal(h->keys[i], key))) { \
if (__ac_isdel(h->flags, i)) site = i; \
i = (i + inc) & mask; \
if (i == last) { x = site; break; } \
} \
if (x == h->n_buckets) { \
if (__ac_isempty(h->flags, i) && site != h->n_buckets) x = site; \
else x = i; \
} \
} \
} \
if (__ac_isempty(h->flags, x)) { /* not present at all */ \
h->keys[x] = key; \
__ac_set_isboth_false(h->flags, x); \
++h->size; ++h->n_occupied; \
*ret = 1; \
} else if (__ac_isdel(h->flags, x)) { /* deleted */ \
h->keys[x] = key; \
__ac_set_isboth_false(h->flags, x); \
++h->size; \
*ret = 2; \
} else *ret = 0; /* Don't touch h->keys[x] if present and not deleted */ \
return x; \
} \
SCOPE void kh_del_##name(kh_##name##_t *h, khint_t x) \
{ \
if (x != h->n_buckets && !__ac_iseither(h->flags, x)) { \
__ac_set_isdel_true(h->flags, x); \
--h->size; \
} \
}
#define KHASH_DECLARE(name, khkey_t, khval_t) \
__KHASH_TYPE(name, khkey_t, khval_t) \
__KHASH_PROTOTYPES(name, khkey_t, khval_t)
#define KHASH_INIT2(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
__KHASH_TYPE(name, khkey_t, khval_t) \
__KHASH_IMPL(name, SCOPE, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal)
#define KHASH_INIT(name, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal) \
KHASH_INIT2(name, static kh_inline, khkey_t, khval_t, kh_is_map, __hash_func, __hash_equal)
/* --- BEGIN OF HASH FUNCTIONS --- */
/*! @function
@abstract Integer hash function
@param key The integer [khint32_t]
@return The hash value [khint_t]
*/
#define kh_int_hash_func(key) (khint32_t)(key)
/*! @function
@abstract Integer comparison function
*/
#define kh_int_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract 64-bit integer hash function
@param key The integer [khint64_t]
@return The hash value [khint_t]
*/
#define kh_int64_hash_func(key) (khint32_t)((key)>>33^(key)^(key)<<11)
/*! @function
@abstract 64-bit integer comparison function
*/
#define kh_int64_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract Pointer hash function
@param key The integer void *
@return The hash value [khint_t]
*/
#define kh_ptr_hash_func(key) (khint32_t)(key)
/*! @function
@abstract Pointer comparison function
*/
#define kh_ptr_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract 64-bit pointer hash function
@param key The integer void *
@return The hash value [khint_t]
*/
#define kh_ptr64_hash_func(key) (khint32_t)(((khint64_t)key)>>33^((khint64_t)key)^((khint64_t)key)<<11)
/*! @function
@abstract 64-bit pointer comparison function
*/
#define kh_ptr64_hash_equal(a, b) ((a) == (b))
/*! @function
@abstract const char* hash function
@param s Pointer to a null terminated string
@return The hash value
*/
static kh_inline khint_t __ac_X31_hash_string(const char *s)
{
khint_t h = (khint_t)*s;
if (h) for (++s ; *s; ++s) h = (h << 5) - h + (khint_t)*s;
return h;
}
/*! @function
@abstract Another interface to const char* hash function
@param key Pointer to a null terminated string [const char*]
@return The hash value [khint_t]
*/
#define kh_str_hash_func(key) __ac_X31_hash_string(key)
/*! @function
@abstract Const char* comparison function
*/
#define kh_str_hash_equal(a, b) (strcmp(a, b) == 0)
static kh_inline khint_t __ac_Wang_hash(khint_t key)
{
key += ~(key << 15);
key ^= (key >> 10);
key += (key << 3);
key ^= (key >> 6);
key += ~(key << 11);
key ^= (key >> 16);
return key;
}
#define kh_int_hash_func2(k) __ac_Wang_hash((khint_t)key)
/* --- END OF HASH FUNCTIONS --- */
/* Other convenient macros... */
/*!
@abstract Type of the hash table.
@param name Name of the hash table [symbol]
*/
#define khash_t(name) kh_##name##_t
/*! @function
@abstract Initiate a hash table.
@param name Name of the hash table [symbol]
@return Pointer to the hash table [khash_t(name)*]
*/
#define kh_init(name) kh_init_##name()
/*! @function
@abstract Destroy a hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
*/
#define kh_destroy(name, h) kh_destroy_##name(h)
/*! @function
@abstract Reset a hash table without deallocating memory.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
*/
#define kh_clear(name, h) kh_clear_##name(h)
/*! @function
@abstract Resize a hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param s New size [khint_t]
*/
#define kh_resize(name, h, s) kh_resize_##name(h, s)
/*! @function
@abstract Insert a key to the hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param k Key [type of keys]
@param r Extra return code: 0 if the key is present in the hash table;
1 if the bucket is empty (never used); 2 if the element in
the bucket has been deleted [int*]
@return Iterator to the inserted element [khint_t]
*/
#define kh_put(name, h, k, r) kh_put_##name(h, k, r)
/*! @function
@abstract Retrieve a key from the hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param k Key [type of keys]
@return Iterator to the found element, or kh_end(h) if the element is absent [khint_t]
*/
#define kh_get(name, h, k) kh_get_##name(h, k)
/*! @function
@abstract Remove a key from the hash table.
@param name Name of the hash table [symbol]
@param h Pointer to the hash table [khash_t(name)*]
@param k Iterator to the element to be deleted [khint_t]
*/
#define kh_del(name, h, k) kh_del_##name(h, k)
/*! @function
@abstract Test whether a bucket contains data.
@param h Pointer to the hash table [khash_t(name)*]
@param x Iterator to the bucket [khint_t]
@return 1 if containing data; 0 otherwise [int]
*/
#define kh_exist(h, x) (!__ac_iseither((h)->flags, (x)))
/*! @function
@abstract Get key given an iterator
@param h Pointer to the hash table [khash_t(name)*]
@param x Iterator to the bucket [khint_t]
@return Key [type of keys]
*/
#define kh_key(h, x) ((h)->keys[x])
/*! @function
@abstract Get value given an iterator
@param h Pointer to the hash table [khash_t(name)*]
@param x Iterator to the bucket [khint_t]
@return Value [type of values]
@discussion For hash sets, calling this results in segfault.
*/
#define kh_val(h, x) ((h)->vals[x])
/*! @function
@abstract Alias of kh_val()
*/
#define kh_value(h, x) ((h)->vals[x])
/*! @function
@abstract Get the start iterator
@param h Pointer to the hash table [khash_t(name)*]
@return The start iterator [khint_t]
*/
#define kh_begin(h) (khint_t)(0)
/*! @function
@abstract Get the end iterator
@param h Pointer to the hash table [khash_t(name)*]
@return The end iterator [khint_t]
*/
#define kh_end(h) ((h)->n_buckets)
/*! @function
@abstract Get the number of elements in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@return Number of elements in the hash table [khint_t]
*/
#define kh_size(h) ((h)->size)
/*! @function
@abstract Get the number of buckets in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@return Number of buckets in the hash table [khint_t]
*/
#define kh_n_buckets(h) ((h)->n_buckets)
/*! @function
@abstract Iterate over the entries in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@param kvar Variable to which key will be assigned
@param vvar Variable to which value will be assigned
@param code Block of code to execute
*/
#define kh_foreach(h, kvar, vvar, code) { khint_t __i; \
for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \
if (!kh_exist(h,__i)) continue; \
(kvar) = kh_key(h,__i); \
(vvar) = kh_val(h,__i); \
code; \
} }
/*! @function
@abstract Iterate over the values in the hash table
@param h Pointer to the hash table [khash_t(name)*]
@param vvar Variable to which value will be assigned
@param code Block of code to execute
*/
#define kh_foreach_value(h, vvar, code) { khint_t __i; \
for (__i = kh_begin(h); __i != kh_end(h); ++__i) { \
if (!kh_exist(h,__i)) continue; \
(vvar) = kh_val(h,__i); \
code; \
} }
/* More conenient interfaces */
/*! @function
@abstract Instantiate a hash map containing (void *) keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#ifdef __x86_64__
#define KHASH_MAP_INIT_PTR(name, khval_t) \
KHASH_INIT(name, void*, khval_t, 1, kh_ptr64_hash_func, kh_ptr64_hash_equal)
#else
#define KHASH_MAP_INIT_PTR(name, khval_t) \
KHASH_INIT(name, void*, khval_t, 1, kh_ptr_hash_func, kh_ptr_hash_equal)
#endif
/*! @function
@abstract Instantiate a hash set containing integer keys
@param name Name of the hash table [symbol]
*/
#define KHASH_SET_INIT_INT(name) \
KHASH_INIT(name, khint32_t, char, 0, kh_int_hash_func, kh_int_hash_equal)
/*! @function
@abstract Instantiate a hash map containing integer keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#define KHASH_MAP_INIT_INT(name, khval_t) \
KHASH_INIT(name, khint32_t, khval_t, 1, kh_int_hash_func, kh_int_hash_equal)
/*! @function
@abstract Instantiate a hash map containing 64-bit integer keys
@param name Name of the hash table [symbol]
*/
#define KHASH_SET_INIT_INT64(name) \
KHASH_INIT(name, khint64_t, char, 0, kh_int64_hash_func, kh_int64_hash_equal)
/*! @function
@abstract Instantiate a hash map containing 64-bit integer keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#define KHASH_MAP_INIT_INT64(name, khval_t) \
KHASH_INIT(name, khint64_t, khval_t, 1, kh_int64_hash_func, kh_int64_hash_equal)
typedef const char *kh_cstr_t;
/*! @function
@abstract Instantiate a hash map containing const char* keys
@param name Name of the hash table [symbol]
*/
#define KHASH_SET_INIT_STR(name) \
KHASH_INIT(name, kh_cstr_t, char, 0, kh_str_hash_func, kh_str_hash_equal)
/*! @function
@abstract Instantiate a hash map containing const char* keys
@param name Name of the hash table [symbol]
@param khval_t Type of values [type]
*/
#define KHASH_MAP_INIT_STR(name, khval_t) \
KHASH_INIT(name, kh_cstr_t, khval_t, 1, kh_str_hash_func, kh_str_hash_equal)
#endif /* __AC_KHASH_H */

BIN
c_src/snappy-1.0.4.tar.gz Normal file

Binary file not shown.

View file

@ -1,22 +1,24 @@
diff --git a/ext/compressors/bzip2/Makefile.am b/ext/compressors/bzip2/Makefile.am
index 0aedc2e..1cc4cf6 100644
index 0aedc2e..a70ae2e 100644
--- a/ext/compressors/bzip2/Makefile.am
+++ b/ext/compressors/bzip2/Makefile.am
@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_bzip2.la
libwiredtiger_bzip2_la_SOURCES = bzip2_compress.c
-libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module
+libwiredtiger_bzip2_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_bzip2_la_LIBADD = -lbz2
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..7d35777 100644
index 6d78823..2122cf8 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,5 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -Wl,-rpath,lib/wterl/priv:priv:/usr/local/lib
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy

File diff suppressed because it is too large Load diff

View file

@ -37,9 +37,8 @@
{port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lpriv -lwiredtiger"}
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"}
]}.
{pre_hooks, [{compile, "c_src/build_deps.sh"}]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.
{post_hooks, [{clean, "c_src/build_deps.sh clean"}]}.

42
src/async_nif.hrl Normal file
View file

@ -0,0 +1,42 @@
%% -------------------------------------------------------------------
%%
%% async_nif: An async thread-pool layer for Erlang's NIF API
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%% Author: Gregory Burd <greg@basho.com> <greg@burd.me>
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
begin
NIFRef = erlang:make_ref(),
case erlang:apply(Fun, [NIFRef|Args]) of
{ok, enqueued} ->
receive
{NIFRef, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{NIFRef, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{NIFRef, Reply} ->
Reply
end;
Other ->
Other
end
end).

View file

@ -22,7 +22,6 @@
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
-author('Steve Vinoski <steve@basho.com>').
%% KV Backend API
-export([api_version/0,
@ -51,13 +50,11 @@
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
-record(pass, {session :: wterl:session(),
cursor :: wterl:cursor()}).
-type pass() :: #pass{}.
-record(state, {table :: string(),
type :: string(),
connection :: wterl:connection(),
passes :: [pass()]}).
is_empty_cursor :: wterl:cursor(),
status_cursor :: wterl:cursor()}).
-type state() :: #state{}.
-type config() :: [{atom(), term()}].
@ -97,33 +94,74 @@ start(Partition, Config) ->
end,
case AppStart of
ok ->
Table = "lsm:wt" ++ integer_to_list(Partition),
{ok, Connection} = establish_connection(Config),
Passes = establish_passes(erlang:system_info(schedulers), Connection, Table),
{ok, #state{table=Table, connection=Connection, passes=Passes}};
Type =
case wterl:config_value(type, Config, "lsm") of
{type, "lsm"} -> "lsm";
{type, "table"} -> "table";
{type, "btree"} -> "table";
{type, BadType} ->
lager:info("wterl:start ignoring unknown type ~p, using lsm instead", [BadType]),
"lsm";
_ ->
lager:info("wterl:start ignoring mistaken setting defaulting to lsm"),
"lsm"
end,
{ok, Connection} = establish_connection(Config, Type),
Table = Type ++ ":" ++ integer_to_list(Partition),
Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of
{block_compressor, "snappy"}=C -> [C];
{block_compressor, "bzip2"}=C -> [C];
{block_compressor, "none"} -> [];
{block_compressor, none} -> [];
{block_compressor, _} -> [{block_compressor, "snappy"}];
_ -> [{block_compressor, "snappy"}]
end,
TableOpts =
case Type of
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
] ++ Compressor;
"table" ->
Compressor
end,
case wterl:create(Connection, Table, TableOpts) of
ok ->
case establish_utility_cursors(Connection, Table) of
{ok, IsEmptyCursor, StatusCursor} ->
{ok, #state{table=Table, type=Type,
connection=Connection,
is_empty_cursor=IsEmptyCursor,
status_cursor=StatusCursor}};
{error, Reason2} ->
{error, Reason2}
end;
{error, Reason3} ->
{error, Reason3}
end
end.
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
stop(#state{passes=Passes}) ->
lists:foreach(fun(Elem) ->
{Session, Cursor} = Elem,
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
end, Passes),
ok.
stop(_State) ->
ok. %% The connection is closed by wterl_conn:stop()
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} |
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, #state{passes=Passes}=State) ->
get(Bucket, Key, #state{connection=Connection, table=Table}=State) ->
WTKey = to_object_key(Bucket, Key),
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_search(Cursor, WTKey) of
case wterl:get(Connection, Table, WTKey) of
{ok, Value} ->
{ok, Value, State};
not_found ->
@ -140,10 +178,8 @@ get(Bucket, Key, #state{passes=Passes}=State) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey),
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_insert(Cursor, WTKey, Val) of
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{connection=Connection, table=Table}=State) ->
case wterl:put(Connection, Table, to_object_key(Bucket, PrimaryKey), Val) of
ok ->
{ok, State};
{error, Reason} ->
@ -157,10 +193,8 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
WTKey = to_object_key(Bucket, Key),
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_remove(Cursor, WTKey) of
delete(Bucket, Key, _IndexSpecs, #state{connection=Connection, table=Table}=State) ->
case wterl:delete(Connection, Table, to_object_key(Bucket, Key)) of
ok ->
{ok, State};
{error, Reason} ->
@ -172,14 +206,12 @@ delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
any(),
[],
state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
{ok, Connection} = wterl_conn:get(),
fold_buckets(FoldBucketsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder =
fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
@ -190,8 +222,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
ok = wterl:cursor_close(Cursor)
end
end
end,
@ -207,7 +238,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
fold_keys(FoldKeysFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts),
@ -220,15 +251,12 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
true -> undefined
end,
{ok, Connection} = wterl_conn:get(),
%% Set up the fold...
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder =
fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
@ -237,8 +265,7 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
ok = wterl:cursor_close(Cursor)
end
end
end,
@ -254,15 +281,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
any(),
[{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
{ok, Connection} = wterl_conn:get(),
fold_objects(FoldObjectsFun, Acc, Opts, #state{connection=Connection, table=Table}) ->
Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder =
fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
case wterl:cursor_open(Connection, Table) of
{error, {enoent, _Message}} ->
Acc;
{ok, Cursor} ->
try
@ -271,8 +296,14 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
case wterl:cursor_close(Cursor) of
ok ->
ok;
{error, {eperm, _}} -> %% TODO: review/fix
ok;
{error, _}=E ->
E
end
end
end
end,
@ -285,11 +316,12 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
%% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{passes=Passes, table=Table}=State) ->
{Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:session_truncate(Session, Table) of
drop(#state{connection=Connection, table=Table}=State) ->
case wterl:drop(Connection, Table) of
ok ->
{ok, State};
{error, {ebusy, _}} -> %% TODO: review/fix
{ok, State};
Error ->
{error, Error, State}
end.
@ -297,24 +329,25 @@ drop(#state{passes=Passes, table=Table}=State) ->
%% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
is_empty(#state{passes=Passes}) ->
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
is_empty(#state{is_empty_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
try
not_found =:= wterl:cursor_next(Cursor)
after
ok = wterl:cursor_close(Cursor)
case wterl:cursor_next(Cursor) of
not_found -> true;
{error, {eperm, _}} -> false; % TODO: review/fix this logic
_ -> false
end.
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{passes=Passes}) ->
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
try
Stats = fetch_status(Cursor),
[{stats, Stats}]
after
ok = wterl:cursor_close(Cursor)
status(#state{status_cursor=Cursor}) ->
wterl:cursor_reset(Cursor),
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end.
%% @doc Register an asynchronous callback
@ -327,16 +360,35 @@ callback(_Ref, _Msg, State) ->
%% Internal functions
%% ===================================================================
%% @private
max_sessions(Config) ->
RingSize =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
Size -> Size
end,
2 * (RingSize * erlang:system_info(schedulers)).
Est = 100 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
true -> 1000000000;
false -> Est
end.
%% @private
establish_connection(Config) ->
establish_utility_cursors(Connection, Table) ->
case wterl:cursor_open(Connection, Table) of
{ok, IsEmptyCursor} ->
case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
{ok, StatusCursor} ->
{ok, IsEmptyCursor, StatusCursor};
{error, Reason1} ->
{error, Reason1}
end;
{error, Reason2} ->
{error, Reason2}
end.
%% @private
establish_connection(Config, Type) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
@ -344,8 +396,18 @@ establish_connection(Config) ->
{error, data_root_unset};
DataRoot ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
%% NOTE: LSM auto-checkpoints, so we don't have too.
CheckpointSetting =
case Type =:= "lsm" of
true ->
[];
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
Opts =
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false),
@ -354,14 +416,18 @@ establish_connection(Config) ->
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
%% NOTE: LSM auto-checkpoints, so we don't have too.
%% wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec
wterl:config_value(verbose, Config, [
%"ckpt" "block", "shared_cache", "evictserver", "fileops",
%"hazard", "mutex", "read", "readserver", "reconcile",
%"salvage", "verify", "write", "evict", "lsm"
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
case wterl_conn:open(DataRoot, Opts) of
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
case wterl_conn:open(DataRoot, ConnectionOpts, SessionOpts) of
{ok, Connection} ->
{ok, Connection};
{error, Reason2} ->
@ -370,40 +436,6 @@ establish_connection(Config) ->
end
end.
establish_passes(Count, Connection, Table)
when is_number(Count), Count > 0 ->
lists:map(fun(_Elem) ->
{ok, Session} = establish_session(Connection, Table),
{ok, Cursor} = wterl:cursor_open(Session, Table),
{Session, Cursor}
end, lists:seq(1, Count)).
%% @private
establish_session(Connection, Table) ->
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
{ok, Session} ->
SessionOpts =
[{block_compressor, "snappy"},
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
{lsm_bloom_hash_count, 64},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]} ],
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
ok ->
{ok, Session};
{error, Reason} ->
lager:error("Failed to start wterl backend: ~p\n", [Reason]),
{error, Reason}
end;
{error, Reason} ->
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
{error, Reason}
end.
%% @private
%% Return a function to fold over the buckets on this backend
fold_buckets_fun(FoldBucketsFun) ->
@ -512,7 +544,9 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
fetch_status(Cursor) ->
fetch_status(Cursor, wterl:cursor_next_value(Cursor), []).
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
@ -563,12 +597,14 @@ size_cache(RequestedSize) ->
-ifdef(TEST).
simple_test_() ->
?assertCmd("rm -rf test/wterl-backend"),
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, "test/wterl-backend"),
temp_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() ->
?assertCmd("rm -rf test/wterl-backend"),
{ok, CWD} = file:get_cwd(),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, ""),
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]).

26
src/rmdir.erl Normal file
View file

@ -0,0 +1,26 @@
-module(rmdir).
-export([path/1]).
-include_lib("kernel/include/file.hrl").
path(Dir) ->
remove_all_files(".", [Dir]).
remove_all_files(Dir, Files) ->
lists:foreach(fun(File) ->
FilePath = filename:join([Dir, File]),
case file:read_file_info(FilePath) of
{ok, FileInfo} ->
case FileInfo#file_info.type of
directory ->
{ok, DirFiles} = file:list_dir(FilePath),
remove_all_files(FilePath, DirFiles),
file:del_dir(FilePath);
_ ->
file:delete(FilePath)
end;
{error, _Reason} ->
ok
end
end, Files).

View file

@ -272,13 +272,16 @@ empty_check({Backend, State}) ->
}.
setup({BackendMod, Config}) ->
lager:start(),
application:start(lager),
application:start(sasl),
application:start(os_mon),
{ok, S} = BackendMod:start(42, Config),
{BackendMod, S}.
cleanup({BackendMod, S}) ->
ok = BackendMod:stop(S).
ok = BackendMod:stop(S),
application:stop(lager),
application:stop(sasl),
application:stop(os_mon).
-endif. % TEST

View file

@ -1,6 +1,6 @@
{application, wterl,
[
{description, "Erlang Wrapper for WiredTiger"},
{description, "Erlang NIF Wrapper for WiredTiger"},
{vsn, "0.9.0"},
{registered, []},
{applications, [

File diff suppressed because it is too large Load diff

View file

@ -30,7 +30,7 @@
%% API
-export([start_link/0, stop/0,
open/1, open/2, is_open/0, get/0, close/1]).
open/1, open/2, open/3, is_open/0, get/0, close/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -53,12 +53,14 @@ stop() ->
gen_server:cast(?MODULE, stop).
-spec open(string()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir) ->
open(Dir, []).
-spec open(string(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir, Config) ->
gen_server:call(?MODULE, {open, Dir, Config, self()}, infinity).
-spec open(string(), config_list(), config_list()) -> {ok, wterl:connection()} | {error, term()}.
open(Dir) ->
open(Dir, [], []).
open(Dir, ConnectionConfig) ->
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, [], self()}, infinity).
open(Dir, ConnectionConfig, SessionConfig) ->
gen_server:call(?MODULE, {open, Dir, ConnectionConfig, SessionConfig, self()}, infinity).
-spec is_open() -> boolean().
is_open() ->
@ -80,9 +82,9 @@ init([]) ->
true = wterl_ets:table_ready(),
{ok, #state{}}.
handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{conn=undefined}=State) ->
{Reply, NState} =
case wterl:connection_open(Dir, wterl:config_to_bin(Config)) of
case wterl:connection_open(Dir, ConnectionConfig, SessionConfig) of
{ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}),
@ -91,7 +93,7 @@ handle_call({open, Dir, Config, Caller}, _From, #state{conn=undefined}=State) ->
{Error, State}
end,
{reply, Reply, NState};
handle_call({open, _Dir, _Config, Caller}, _From,#state{conn=ConnRef}=State) ->
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}),
{reply, {ok, ConnRef}, State};

11
update-version.sh Executable file
View file

@ -0,0 +1,11 @@
#!/bin/sh -
# Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!!
wterl=`git log -n 1 --pretty=format:"%H"`
wiredtiger0=`(cd c_src/wiredtiger && git log -n 1 --pretty=format:"%H")`
wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`
echo $wterl
echo $wiredtiger