Compare commits

40 commits

Author SHA1 Message Date
Gregory Burd
cc807a97d6 Add new mmap config option. 2014-01-15 10:45:04 -05:00
Gregory Burd
ea99493ea3 Compensate for LSM config API changes 2013-12-09 12:54:58 -05:00
Gregory Burd
08b2d18463 Fix checkpoint config. 2013-11-20 13:10:28 -05:00
Gregory Burd
3302ab26ed Use the develop branch for now. 2013-11-19 14:16:26 -05:00
Gregory Burd
db2daf99b2 Default logging off. 2013-11-19 14:16:01 -05:00
Gregory Burd
68d9ed942b Update to WiredTiger 1.6.6 2013-11-18 20:54:59 -05:00
Gregory Burd
36faa4e713 Forgot to remove second use of checkpoint setting. 2013-11-18 20:50:26 -05:00
Gregory Burd
e560185420 When logging enable checkpoints, even when using LSM. 2013-11-18 20:46:22 -05:00
Gregory Burd
448c0b555c Update config to match latest available options. 2013-10-30 15:00:41 -04:00
Gregory Burd
634bcd188a Integrate new configuration options available in WiredTiger. 2013-10-30 14:50:14 -04:00
Gregory Burd
1664fdcf8c API for handlers in WiredTiger changed to include session state, update our use of the API to match that change. 2013-10-30 13:11:14 -04:00
Gregory Burd
95515f111c Merge pull request #11 from basho-labs/gsb-2.0-fixes
Changes related to Riak 2.0 and an issue with how statistics were gathered from the backend
2013-10-30 08:53:49 -07:00
Gregory Burd
ac2c5caeff Change a few default configs and comment out the stats gathering for now. 2013-10-30 11:50:20 -04:00
Gregory Burd
75305dae94 Minor updates. 2013-10-12 21:48:05 -04:00
Gregory Burd
7d0ad2dce1 Update the version strings and a few config values which changed names. 2013-10-02 14:42:26 -04:00
Gregory Burd
84a85bbe38 Open a *statistics* cursor when gathering statistics. 2013-10-02 14:41:36 -04:00
Gregory Burd
9d2896016b A few build automation changes/fixes. 2013-10-02 14:41:06 -04:00
Gregory Burd
17585a99b1 priv now has the schema file in it, so be more specific with what we ignore in that dir 2013-10-02 14:38:41 -04:00
Gregory Burd
942e51b753 OS/X uses ".dylib" rather than ".so" for shared libraries (because it's
special) so I've worked around that.  Also tightened up some tests so that
we're not rebuilding the libraries when not necessary.
2013-09-06 09:54:55 -04:00
Gregory Burd
c60fa22422 Retry three times, then bail out and return not found. 2013-09-04 13:12:37 -04:00
Gregory Burd
48419ce4d0 Start the penalty after queues are 25% full because a) that makes sense, and b)
that avoids some odd badarith errors when PctBusy is very small.
2013-08-21 14:19:52 -04:00
Gregory Burd
2ddf0da53e Use malloc/free rather than enif_alloc/enif_free so as to avoid BEAM allocator
overhead (bytes and time).  Create static references to commonly used Erlang
atoms to avoid overhead re-creating them on each request cycle.
2013-08-21 12:20:19 -04:00
Gregory Burd
83c3faf74f Use malloc/free rather than enif_alloc/enif_free so as to avoid BEAM allocator
overhead (bytes and time).
2013-08-21 12:18:24 -04:00
Gregory Burd
2043e8ccc6 Because the build descends into the ext/compressors/snappy directory the
relative paths won't find system/include, so use the absolute paths instead.
2013-08-21 12:17:18 -04:00
Gregory Burd
33c8e53ccf Update to latest release of WiredTiger. Also, make sure Snappy builds before WiredTiger. 2013-08-21 12:16:24 -04:00
Gregory Burd
1bf66ae960 Every enqueued request now includes a hint as to how much work is pending in
the lower C-code.  We use that to scale the reduction count penalty so that we
can (hopefully) signal to the Erlang scheduler enough information for it to
properly throttle work.  'eagain' should only happen when queues are full, we
have no choice but to keep this calling proc busy in a recursive loop trying
the request over and over if we're going to preserve request ordering.
2013-08-21 12:15:34 -04:00
Gregory Burd
e67da86a9b Change backpressure method from EAGAIN to bump_reductions so as not to block Riak/KV vnode processes when queues backup. 2013-08-19 13:32:58 -04:00
Gregory Burd
2047104cda Remove the sleep from async_nif's EAGAIN path because it doesn't seem to have a positive effect. 2013-08-19 12:20:36 -04:00
Gregory Burd
96d43d5d17 Re-use the unchanging value of 'Args' rather than including it in every recursive call. 2013-08-02 14:22:30 -04:00
Gregory Burd
05c8c615ef I think the make_ref() needs to be within the fun()'s context to trigger selective receive optimization in the beam's runtime. 2013-08-01 10:02:21 -04:00
Gregory Burd
f153509409 With some input from Jon I've managed to reduce this back into a macro rather than a fun and a macro calling a fun. He also suggested that on eagain I sleep a small amount of time so as to allow other work to catch up a bit. 2013-07-31 15:39:55 -04:00
Gregory Burd
ee904b4769 Lower the queue size to shrink potential for latency in queue. Remove earlier idea that more queues would lead to more even worker progress, just have 1 queue per Erlang-scheduler thread (generally, 1 per CPU core available). Also change the way worker threads decide when to cond_wait or migrate to other queues looking for work. 2013-07-31 15:06:28 -04:00
Gregory Burd
c9a4ab8325 Revert changes to async_nif and re-enable stats. Fixed selective recv. 2013-07-31 09:41:36 -04:00
Gregory Burd
2393257bef Really disable stats. 2013-07-30 14:34:04 -04:00
Gregory Burd
211ffd884c Ignore requests for stats for right now. 2013-07-30 14:30:04 -04:00
Gregory Burd
4418a74183 Increase the number of queues for work to reside. Worker threads, once started, don't exit until shutdown. 2013-07-30 14:21:26 -04:00
Gregory Burd
1623d5293c Increase the max queue size. 2013-07-30 13:30:43 -04:00
Gregory Burd
56c2ac27c2 Revert to a macro-only, non-recursive on eagain method for managing requests. 2013-07-30 13:27:13 -04:00
Gregory Burd
27dba903ef The ref needs to be in-scope of the receive for it to be optimized. 2013-07-30 13:20:49 -04:00
Gregory Burd
8f415df69c Merge pull request #10 from basho-labs/gsb-workers-migrate
Worker threads should check for work in other queues before exiting.
2013-07-26 17:12:15 -07:00
12 changed files with 240 additions and 278 deletions

.gitignore

@ -7,6 +7,8 @@ c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/
priv/wt
priv/*.so*
priv/*.dylib*
log/
*~


@ -52,19 +52,17 @@ endif
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
update-deps clean-common-test-data rebuild
all: deps compile test
all: deps compile
# =============================================================================
# Rules to build the system
# =============================================================================
deps:
c_src/build_deps.sh get-deps
$(REBAR) get-deps
$(REBAR) compile
update-deps:
c_src/build_deps.sh update-deps
$(REBAR) update-deps
$(REBAR) compile


@ -34,9 +34,18 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 100
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
@ -104,25 +113,20 @@ struct async_nif_state {
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
if (async_nif->shutdown) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
req = async_nif_reuse_req(async_nif); \
if (!req) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
} \
if (!req) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
@ -137,9 +141,8 @@ struct async_nif_state {
if (!reply) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
} \
return reply; \
}
@ -147,11 +150,11 @@ struct async_nif_state {
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, priv) do { \
#define ASYNC_NIF_LOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(); \
priv = async_nif_load(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
@ -192,15 +195,15 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = enif_alloc(sizeof(struct async_nif_req_entry));
req = malloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (env) {
req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1);
__sync_fetch_and_add(&async_nif->num_reqs, 1);
} else {
enif_free(req);
free(req);
req = NULL;
}
}
@ -254,7 +257,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
enif_free(we);
free(we);
async_nif->we_active--;
we = n;
}
@ -264,7 +267,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
return EAGAIN;
}
we = enif_alloc(sizeof(struct async_nif_worker_entry));
we = malloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
@ -299,8 +302,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
qid = (unsigned int)hint;
} else {
do {
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
}
@ -323,10 +326,6 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
queue will be valid until we release the lock. */
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
@ -359,8 +358,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued"));
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply;
@ -391,20 +392,25 @@ async_nif_worker_fn(void *arg)
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try another queue
}
} else {
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leaving this loop (break) which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
} else {
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
@ -412,8 +418,7 @@ async_nif_worker_fn(void *arg)
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Ensure that there is at least one other worker thread watching this
queue. */
/* Wake up other worker thread watching this queue to help process work. */
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
@ -427,7 +432,7 @@ async_nif_worker_fn(void *arg)
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
enif_free(req->args);
free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif);
req = NULL;
@ -479,7 +484,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
enif_free(we);
free(we);
async_nif->we_active--;
we = n;
}
@ -498,12 +503,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args);
enif_free_env(req->env);
enif_free(req->args);
enif_free(req);
free(req->args);
free(req);
req = n;
}
enif_mutex_destroy(q->reqs_mutex);
@ -517,18 +521,18 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
enif_free(req);
free(req);
req = n;
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
enif_free(async_nif);
free(async_nif);
}
static void *
async_nif_load()
async_nif_load(ErlNifEnv *env)
{
static int has_init = 0;
unsigned int i, num_queues;
@ -539,6 +543,14 @@ async_nif_load()
if (has_init) return 0;
else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
@ -556,8 +568,8 @@ async_nif_load()
}
/* Init our portion of priv_data's module-specific state. */
async_nif = enif_alloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif = malloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +
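The queue-selection loop in async_nif_enqueue_req above advances a shared cursor with a compare-and-swap retry. A minimal standalone sketch of that pattern follows, assuming the GCC/Clang __sync builtins the diff already uses. The name pick_queue and its parameters are hypothetical and chosen for illustration; they are not part of async_nif.h.

```c
#include <assert.h>

/* Hypothetical helper: advance a shared round-robin cursor and return the
 * queue id chosen for this caller. `next_q` plays the role of
 * async_nif->next_q and `num_queues` of async_nif->num_queues. */
unsigned int
pick_queue(unsigned int *next_q, unsigned int num_queues)
{
    unsigned int last_qid, qid;

    assert(next_q != 0);
    assert(num_queues > 0);
    do {
        /* Adding zero is used here as an atomic read of the cursor. */
        last_qid = __sync_fetch_and_add(next_q, 0);
        qid = (last_qid + 1) % num_queues;
        /* Retry if another thread moved the cursor in the meantime. */
    } while (!__sync_bool_compare_and_swap(next_q, last_qid, qid));
    return qid;
}
```

Starting from a cursor of 0 with four queues, successive calls return 1, 2, 3, 0.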


@ -11,10 +11,10 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
set -e
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
#WT_BRANCH=develop
#WT_DIR=wiredtiger-`basename $WT_BRANCH`
WT_REF="tags/1.6.3"
WT_DIR=wiredtiger-`basename $WT_REF`
WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.6"
#WT_DIR=wiredtiger-`basename $WT_REF`
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
@ -26,8 +26,7 @@ export BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make}
export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
@ -58,7 +57,7 @@ get_wt ()
wt_configure ()
{
(cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g ../configure --with-pic \
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \
--enable-snappy \
--prefix=${BASEDIR}/system || exit 1)
}
@ -76,8 +75,8 @@ get_snappy ()
get_deps ()
{
get_wt;
get_snappy;
get_wt;
}
update_deps ()
@ -110,7 +109,7 @@ build_snappy ()
case "$1" in
clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
@ -131,22 +130,23 @@ case "$1" in
;;
*)
[ -d $WT_DIR ] || get_wt;
[ -d $SNAPPY_DIR ] || get_snappy;
shopt -s extglob
SUFFIXES='@(so|dylib)'
# Build Snappy
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy;
# Build WiredTiger
[ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so || build_wt;
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv
;;
esac


@ -1,5 +1,5 @@
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..2122cf8 100644
index 6d78823..c423590 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
@ -7,100 +7,6 @@ index 6d78823..2122cf8 100644
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy
diff --git a/src/support/cksum.c b/src/support/cksum.c
index 7e9befe..b924db7 100644
--- a/src/support/cksum.c
+++ b/src/support/cksum.c
@@ -27,6 +27,13 @@
#include "wt_internal.h"
+#if defined(__amd64) || defined(__x86_64)
+#define USE_HARDWARE_CRC32 1
+#else
+#undef USE_HARDWARE_CRC32
+#endif
+
+#ifdef USE_HARDWARE_CRC32
static const uint32_t g_crc_slicing[8][256] = {
#ifdef WORDS_BIGENDIAN
/*
@@ -1078,6 +1085,7 @@ static const uint32_t g_crc_slicing[8][256] = {
}
#endif
};
+#endif /* USE_HARDWARE_CRC32 */
/*
* __wt_cksum --
@@ -1106,15 +1114,29 @@ __wt_cksum(const void *chunk, size_t len)
/* Checksum one byte at a time to the first 4B boundary. */
for (p = chunk;
((uintptr_t)p & (sizeof(uint32_t) - 1)) != 0 &&
- len > 0; ++p, --len)
+ len > 0; ++p, --len) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
#else
crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
+#endif
+ }
/* Checksum in 8B chunks. */
for (nqwords = len / sizeof(uint64_t); nqwords; nqwords--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__ (
+ ".byte 0xf2, 0x48, 0x0f, 0x38, 0xf0, 0xf1;"
+ : "=S"(crc)
+ : "S"(crc), "c"(*p));
+#else
crc ^= *(uint32_t *)p;
p += sizeof(uint32_t);
next = *(uint32_t *)p;
@@ -1139,22 +1161,32 @@ __wt_cksum(const void *chunk, size_t len)
g_crc_slicing[1][(next >> 16) & 0xFF] ^
g_crc_slicing[0][(next >> 24)];
#endif
+#endif
}
/* Checksum trailing bytes one byte at a time. */
+ for (len &= 0x7; len > 0; ++p, len--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
- for (len &= 0x7; len > 0; ++p, len--)
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
+#else
+ crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
+#endif
+#endif
+ }
+#ifdef WORDS_BIGENDIAN
/* Do final byte swap to produce a result identical to little endian */
crc =
((crc << 24) & 0xFF000000) |
((crc << 8) & 0x00FF0000) |
((crc >> 8) & 0x0000FF00) |
((crc >> 24) & 0x000000FF);
-#else
- for (len &= 0x7; len > 0; ++p, len--)
- crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
return (~crc);
}
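The crc32 instruction that the inline assembly above encodes computes CRC-32C (the Castagnoli polynomial). As a reference point when comparing a hardware path against a software one, here is a minimal bitwise sketch. It assumes the same conventions as __wt_cksum in the patch (initial value 0xffffffff, final complement). The name crc32c_bitwise is hypothetical, and this is not WiredTiger's table-driven code.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical reference implementation: bit-at-a-time CRC-32C using the
 * reflected Castagnoli polynomial 0x82F63B78. Slow, but easy to audit. */
uint32_t
crc32c_bitwise(const void *chunk, size_t len)
{
    const uint8_t *p = (const uint8_t *)chunk;
    uint32_t crc = 0xffffffffu;
    int bit;

    assert(chunk != NULL || len == 0);
    while (len-- > 0) {
        crc ^= *p++;
        for (bit = 0; bit < 8; bit++) {
            /* Shift right; XOR in the polynomial if the low bit was set. */
            crc = (crc >> 1) ^ (0x82F63B78u & (0u - (crc & 1u)));
        }
    }
    return ~crc;
}
```

For the nine ASCII bytes "123456789" this returns 0xE3069283, the standard CRC-32C check value, which makes it a convenient test vector for any faster variant.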


@ -204,7 +204,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
if (c->session)
c->session->close(c->session, NULL);
enif_free(c);
free(c);
num_evicted++;
}
}
@ -333,7 +333,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
int rc = conn->open_session(conn, NULL, session_config, &session);
if (rc != 0) return rc;
size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len;
c = enif_alloc(s); // TODO: enif_alloc_resource()
c = malloc(s); // TODO: enif_alloc_resource()
if (c == NULL) {
session->close(session, NULL);
return ENOMEM;
@ -355,7 +355,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
c->ci[i].config = __copy_str_into(&p, config);
rc = session->open_cursor(session, uri, NULL, config, &c->ci[i].cursor);
if (rc != 0) {
enif_free(c);
free(c);
session->close(session, NULL); // this will free the cursors too
va_end(ap);
return rc;
@ -405,7 +405,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
enif_free(c);
free(c);
c = n;
}
}
@ -431,7 +431,7 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
enif_free(c);
free(c);
break;
}
}
@ -440,6 +440,7 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
return;
}
/**
* Callback to handle error messages.
*
@ -454,13 +455,15 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
* operation or library failure.
*/
int
__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
__wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session,
int error, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->error_mutex);
msg_env = eh->msg_env_error;
to_pid = &eh->to_pid;
@ -492,13 +495,14 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
* operation or library failure.
*/
int
__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
__wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->message_mutex);
msg_env = eh->msg_env_message;
to_pid = &eh->to_pid;
@ -529,13 +533,14 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
* operation or library failure.
*/
int
__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
__wterl_progress_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *operation, uint64_t counter)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->progress_mutex);
msg_env = eh->msg_env_progress;
to_pid = &eh->to_pid;
@ -637,7 +642,7 @@ ASYNC_NIF_DECL(
return;
}
if (session_config.size > 1) {
char *sc = enif_alloc(session_config.size);
char *sc = malloc(session_config.size);
if (!sc) {
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM));
@ -698,7 +703,7 @@ ASYNC_NIF_DECL(
enif_mutex_lock(args->conn_handle->cache_mutex);
__close_all_sessions(args->conn_handle);
if (args->conn_handle->session_config) {
enif_free((char *)args->conn_handle->session_config);
free((char *)args->conn_handle->session_config);
args->conn_handle->session_config = NULL;
}
WT_CONNECTION* conn = args->conn_handle->conn;
@ -2265,7 +2270,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn");
ATOM_MSG_PID = enif_make_atom(env, "message_pid");
struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
struct wterl_priv_data *priv = malloc(sizeof(struct wterl_priv_data));
if (!priv)
return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data));
@ -2293,17 +2298,17 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
/* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
ASYNC_NIF_LOAD(wterl, env, priv->async_nif_priv);
if (!priv->async_nif_priv) {
memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv);
free(priv);
return ENOMEM;
}
*priv_data = priv;
char msg[1024];
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, NULL, msg);
return 0;
}
@ -2342,7 +2347,7 @@ on_unload(ErlNifEnv *env, void *priv_data)
enif_free_env(eh->msg_env_progress);
memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv);
free(priv);
priv_data = NULL;
}

priv/wterl.schema Normal file

@ -0,0 +1,6 @@
%%%% This is the WiredTiger section
%% @doc wiredtiger data_root
{mapping, "wiredtiger.data_root", "wterl.data_root", [
{default, "{{platform_data_dir}}/wiredtiger"}
]}.


@ -38,8 +38,8 @@
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"}
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"}
]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.


@ -21,27 +21,34 @@
%%
%% -------------------------------------------------------------------
-spec async_nif_enqueue(reference(), function(), [term()]) -> term() | {error, term()}.
async_nif_enqueue(R, F, A) ->
case erlang:apply(F, [R|A]) of
{ok, enqueued} ->
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
%% Work unit was not queued, try again.
async_nif_enqueue(R, F, A);
%{error, enomem} ->
%{error, shutdown} ->
Other ->
Other
end.
-define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(erlang:make_ref(), Fun, Args)).
-define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F, T) ->
R = erlang:make_ref(),
case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
case T of
3 -> not_found;
_ -> F(F, T + 1)
end;
Other ->
Other
end
end,
F(F, 1)).
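The penalty arithmetic in the macro above is easy to check in isolation. The sketch below restates it in C purely for illustration; the real logic is the Erlang macro. The names queue_pct_full and reduction_penalty are hypothetical. The 0.25 threshold and the 2000 multiplier are taken from the macro, and the depth/capacity ratio mirrors the pct_full computation in async_nif_enqueue_req.

```c
#include <assert.h>

/* Hypothetical helper: fraction of a queue in use, as reported back to
 * Erlang in the {ok, {enqueued, PctBusy}} reply. */
double
queue_pct_full(unsigned int avg_depth, unsigned int queue_size)
{
    assert(queue_size > 0);
    return (double)avg_depth / (double)queue_size;
}

/* Hypothetical helper: reductions to charge the calling process. Nothing
 * is charged until the queue is more than 25% full, and values above 1.0
 * are ignored, matching the guard in the macro. */
int
reduction_penalty(double pct_busy)
{
    if (pct_busy > 0.25 && pct_busy <= 1.0)
        return (int)(2000 * pct_busy);  /* same as erlang:trunc/1 here */
    return 0;
}
```

A half-full queue yields a penalty of 1000 reductions and a completely full one yields 2000, while a queue at or below one quarter full yields none.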


@ -22,6 +22,7 @@
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
-compile([{parse_transform, lager_transform}]).
%% KV Backend API
-export([api_version/0,
@ -42,7 +43,7 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compiel(export_all).
-compile(export_all).
-endif.
-define(API_VERSION, 1).
@ -119,14 +120,14 @@ start(Partition, Config) ->
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "16K"},
{lsm_chunk_size, "100MB"},
{lsm_merge_threads, 2},
{prefix_compression, true},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 28},
{lsm_bloom_hash_count, 19},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
{lsm, [
{bloom_config, [{leaf_page_max, "8MB"}]},
{bloom_bit_count, 28},
{bloom_hash_count, 19},
{bloom_oldest, true},
{chunk_size, "100MB"},
{merge_threads, 2}
]}
] ++ Compressor;
"table" ->
Compressor
@ -341,22 +342,23 @@ is_empty(#state{connection=Connection, table=Table}) ->
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{connection=Connection, table=Table}) ->
case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
TheStats =
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end,
wterl:cursor_close(Cursor),
TheStats;
{error, Reason2} ->
{error, Reason2}
end.
[].
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
%% {ok, Cursor} ->
%% TheStats =
%% case fetch_status(Cursor) of
%% {ok, Stats} ->
%% Stats;
%% {error, {eperm, _}} -> % TODO: review/fix this logic
%% {ok, []};
%% _ ->
%% {ok, []}
%% end,
%% wterl:cursor_close(Cursor),
%% TheStats;
%% {error, Reason2} ->
%% {error, Reason2}
%% end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@ -399,30 +401,41 @@ establish_connection(Config, Type) ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
%% NOTE: LSM auto-checkpoints, so we don't have too.
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false),
CheckpointSetting =
case Type =:= "lsm" of
true ->
[];
case LogSetting of
true ->
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
_ ->
[]
end;
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false),
wterl:config_value(checkpoint_sync, Config, false),
wterl:config_value(transaction_sync, Config, "none"),
wterl:config_value(log, Config, [{enabled, LogSetting}]),
wterl:config_value(mmap, Config, false),
wterl:config_value(checkpoint, Config, CheckpointSetting),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics, Config, [ "fast", "clear"]),
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... yet... you've been warned.
% no idea why... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
@ -543,15 +556,15 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
%% fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
%% fetch_status(_Cursor, {error, _}, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(Cursor, {ok, Stat}, Acc) ->
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =


@ -96,8 +96,8 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}.
init() ->
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "53307e8"},
{wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).
[{wterl_vsn, "942e51b"},
{wiredtiger_vsn, "1.6.4-275-g9c44420"}]). %% TODO automate these
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@ -454,17 +454,26 @@ config_to_bin([], Acc) ->
config_to_bin([{Key, Value} | Rest], Acc) ->
ConfigTypes =
[{block_compressor, {string, quoted}},
{bloom_bit_count, integer},
{bloom_config, config},
{bloom_hash_count, integer},
{bloom_newest, bool},
{bloom_oldest, bool},
{cache_size, string},
{checkpoint, config},
{checkpoint_sync, bool},
{checksum, string},
{chunk_size, string},
{create, bool},
{direct_io, list},
{drop, list},
{enabled, bool},
{error_prefix, string},
{eviction_target, integer},
{eviction_trigger, integer},
{extensions, {list, quoted}},
{statistics_fast, bool},
{file_max, string},
{force, bool},
{from, string},
{hazard_max, integer},
@ -474,24 +483,21 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
{isolation, string},
{key_type, string},
{leaf_page_max, string},
{logging, bool},
{lsm_bloom_bit_count, integer},
{lsm_bloom_config, config},
{lsm_bloom_hash_count, integer},
{lsm_bloom_newest, bool},
{lsm_bloom_oldest, bool},
{lsm_chunk_size, string},
{prefix_compression, bool},
{lsm_merge_threads, integer},
{log, config},
{lsm, config},
{mmap, bool},
{merge_threads, integer},
{multiprocess, bool},
{name, string},
{overwrite, bool},
{prefix_compression, bool},
{raw, bool},
{session_max, integer},
{statistics, list},
{statistics_log, config},
{sync, bool},
{target, {list, quoted}},
{to, string},
{transaction_sync, string},
{transactional, bool},
{verbose, list},
{wait, integer}],
@ -613,7 +619,7 @@ many_open_tables_test_() ->
DataDir = ?TEST_DATA_DIR,
KeyGen =
fun(X) ->
crypto:sha(<<X>>)
crypto:hash(sha, <<X>>)
end,
ValGen =
fun() ->


@ -25,7 +25,13 @@
{mode, max}.
{duration, 10}.
{concurrent, 4}.
{concurrent, 16}.
{report_interval, 1}.
{pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}.
%{pb_timeout_write, ?}.
%{pb_timeout_listkeys, ?}.
%{pb_timeout_mapreduce, ?}.
{driver, basho_bench_driver_wterl}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
@ -37,7 +43,9 @@
{wterl, [
{connection, [
{create, true},
{sync, false},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
@ -52,11 +60,11 @@
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "lsm:test"},
{lsm_merge_threads, 2},
{table, [
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
@ -70,9 +78,9 @@
{wterl_, [
{connection, [
{create, true},
{sync, false},
{logging, true},
{transactional, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
@ -89,7 +97,6 @@
{session, [ {isolation, "snapshot"} ]},
{table_uri, "table:test"},
{table, [
{prefix_compression, false},
{block_compressor, "snappy"} % bzip2
]}
]}.
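
The first benchmark section in this file still carries the flat `lsm_*` table options as context lines, while the backend hunk moves the same settings into a nested `lsm` group and the `ConfigTypes` hunk drops the `lsm_`-prefixed keys. A minimal sketch of how that `table` section might look after migration follows. It assumes the benchmark driver forwards these options to wterl unchanged, which this diff does not show, and it carries over only the values visible in the hunk.

```erlang
%% Hypothetical rewrite of the benchmark's table options using the nested
%% `lsm` group. Values are copied from the flat keys visible in the hunk;
%% this is a config fragment, not a complete file.
{table, [
         {internal_page_max, "128K"},
         {leaf_page_max, "128K"},
         {lsm, [
                {chunk_size, "25MB"},
                {bloom_newest, true},
                {bloom_oldest, true},
                {bloom_bit_count, 128}
                %% any further bloom settings are cut off by the hunk
                %% boundary and are deliberately not guessed here
               ]}
        ]}
```

The key names inside the group (`chunk_size`, `bloom_newest`, `bloom_oldest`, `bloom_bit_count`) are the ones listed in the updated `ConfigTypes` table, so they should be accepted by `config_to_bin/2` as typed there.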