Compare commits


No commits in common. "master" and "gsb-workers-migrate" have entirely different histories.

12 changed files with 278 additions and 240 deletions

.gitignore
View file

@@ -7,8 +7,6 @@ c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/wt
priv/*.so*
priv/*.dylib*
priv/
log/
*~

View file

@@ -52,17 +52,19 @@ endif
.PHONY: all compile doc clean test dialyzer typer shell distclean pdf \
update-deps clean-common-test-data rebuild
all: deps compile
all: deps compile test
# =============================================================================
# Rules to build the system
# =============================================================================
deps:
c_src/build_deps.sh get-deps
$(REBAR) get-deps
$(REBAR) compile
update-deps:
c_src/build_deps.sh update-deps
$(REBAR) update-deps
$(REBAR) compile

View file

@@ -34,18 +34,9 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_WORKER_QUEUE_SIZE 100
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
@@ -113,20 +104,25 @@ struct async_nif_state {
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (async_nif->shutdown) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
if (async_nif->shutdown) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
req = async_nif_reuse_req(async_nif); \
if (!req) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
if (!req) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
} \
new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
} \
memcpy(copy_of_args, args, sizeof(struct decl ## _args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
@@ -141,8 +137,9 @@ struct async_nif_state {
if (!reply) { \
fn_post_ ## decl (args); \
async_nif_recycle_req(req, async_nif); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
} \
return reply; \
}
@@ -150,11 +147,11 @@ struct async_nif_state {
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, env, priv) do { \
#define ASYNC_NIF_LOAD(name, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(env); \
priv = async_nif_load(); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
@@ -195,15 +192,15 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
enif_mutex_lock(async_nif->recycled_req_mutex);
if (STAILQ_EMPTY(&async_nif->recycled_reqs)) {
if (async_nif->num_reqs < ASYNC_NIF_MAX_QUEUED_REQS) {
req = malloc(sizeof(struct async_nif_req_entry));
req = enif_alloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
if (env) {
req->env = env;
__sync_fetch_and_add(&async_nif->num_reqs, 1);
__sync_fetch_and_add(&async_nif->num_reqs, 1);
} else {
free(req);
enif_free(req);
req = NULL;
}
}
@@ -257,7 +254,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
enif_free(we);
async_nif->we_active--;
we = n;
}
@@ -267,7 +264,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
return EAGAIN;
}
we = malloc(sizeof(struct async_nif_worker_entry));
we = enif_alloc(sizeof(struct async_nif_worker_entry));
if (!we) {
enif_mutex_unlock(async_nif->we_mutex);
return ENOMEM;
@@ -302,8 +299,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
qid = (unsigned int)hint;
} else {
do {
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
last_qid = __sync_fetch_and_add(&async_nif->next_q, 0);
qid = (last_qid + 1) % async_nif->num_queues;
} while (!__sync_bool_compare_and_swap(&async_nif->next_q, last_qid, qid));
}
@@ -326,6 +323,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
queue will be valid until we release the lock. */
q = &async_nif->queues[qid];
enif_mutex_lock(q->reqs_mutex);
if (async_nif->shutdown) {
enif_mutex_unlock(q->reqs_mutex);
return 0;
}
/* Try not to enqueue a request into a queue that isn't keeping up with
the request volume. */
@@ -358,10 +359,8 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued"));
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
return reply;
@@ -392,25 +391,20 @@ async_nif_worker_fn(void *arg)
}
if (STAILQ_EMPTY(&q->reqs)) {
/* Queue is empty so we wait for more work to arrive. */
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) {
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
/* At this point we've tried to find/execute work on all queues
* and there are at least MIN_WORKERS on this queue so we
* leaving this loop (break) which leads to a thread exit/join. */
break;
} else {
enif_mutex_lock(q->reqs_mutex);
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try next queue
}
if (q->num_workers > ASYNC_NIF_MIN_WORKERS) {
enif_mutex_unlock(q->reqs_mutex);
if (tries == 0 && q == we->q) break; // we've tried all queues, thread exit
else {
tries--;
__sync_fetch_and_add(&q->num_workers, -1);
q = q->next;
__sync_fetch_and_add(&q->num_workers, 1);
continue; // try another queue
}
} else {
enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work;
}
} else {
/* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */
@@ -418,7 +412,8 @@ async_nif_worker_fn(void *arg)
STAILQ_REMOVE(&q->reqs, req, async_nif_req_entry, entries);
__sync_fetch_and_add(&q->depth, -1);
/* Wake up other worker thread watching this queue to help process work. */
/* Ensure that there is at least one other worker thread watching this
queue. */
enif_cond_signal(q->reqs_cnd);
enif_mutex_unlock(q->reqs_mutex);
@@ -432,7 +427,7 @@ async_nif_worker_fn(void *arg)
req->ref = 0;
req->fn_work = 0;
req->fn_post = 0;
free(req->args);
enif_free(req->args);
req->args = NULL;
async_nif_recycle_req(req, async_nif);
req = NULL;
@@ -484,7 +479,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
SLIST_REMOVE(&async_nif->we_joining, we, async_nif_worker_entry, entries);
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(we->tid, &exit_value);
free(we);
enif_free(we);
async_nif->we_active--;
we = n;
}
@@ -503,11 +498,12 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
req->fn_post(req->args);
enif_free_env(req->env);
free(req->args);
free(req);
enif_free(req->args);
enif_free(req);
req = n;
}
enif_mutex_destroy(q->reqs_mutex);
@@ -521,18 +517,18 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
while(req != NULL) {
struct async_nif_req_entry *n = STAILQ_NEXT(req, entries);
enif_free_env(req->env);
free(req);
enif_free(req);
req = n;
}
enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free(async_nif);
enif_free(async_nif);
}
static void *
async_nif_load(ErlNifEnv *env)
async_nif_load()
{
static int has_init = 0;
unsigned int i, num_queues;
@@ -543,14 +539,6 @@ async_nif_load(ErlNifEnv *env)
if (has_init) return 0;
else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Find out how many schedulers there are. */
enif_system_info(&info, sizeof(ErlNifSysInfo));
@@ -568,8 +556,8 @@ async_nif_load(ErlNifEnv *env)
}
/* Init our portion of priv_data's module-specific state. */
async_nif = malloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif = enif_alloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +

View file

@@ -11,10 +11,10 @@ unset POSIX_SHELL # clear it so if we invoke other scripts, they run as ksh as w
set -e
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
WT_BRANCH=develop
WT_DIR=wiredtiger-`basename $WT_BRANCH`
#WT_REF="tags/1.6.6"
#WT_DIR=wiredtiger-`basename $WT_REF`
#WT_BRANCH=develop
#WT_DIR=wiredtiger-`basename $WT_BRANCH`
WT_REF="tags/1.6.3"
WT_DIR=wiredtiger-`basename $WT_REF`
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
@@ -26,7 +26,8 @@ export BASEDIR="$PWD"
which gmake 1>/dev/null 2>/dev/null && MAKE=gmake
MAKE=${MAKE:-make}
export CPPFLAGS="$CPPLAGS -I $BASEDIR/system/include -O3 -mtune=native -march=native"
export CFLAGS="$CFLAGS -I $BASEDIR/system/include"
export CXXFLAGS="$CXXFLAGS -I $BASEDIR/system/include"
export LDFLAGS="$LDFLAGS -L$BASEDIR/system/lib"
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$BASEDIR/system/lib:$LD_LIBRARY_PATH"
@@ -57,7 +58,7 @@ get_wt ()
wt_configure ()
{
(cd $BASEDIR/$WT_DIR/build_posix
CFLAGS+=-g $BASEDIR/$WT_DIR/configure --with-pic \
CFLAGS+=-g ../configure --with-pic \
--enable-snappy \
--prefix=${BASEDIR}/system || exit 1)
}
@@ -75,8 +76,8 @@ get_snappy ()
get_deps ()
{
get_snappy;
get_wt;
get_snappy;
}
update_deps ()
@@ -109,7 +110,7 @@ build_snappy ()
case "$1" in
clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
@@ -130,23 +131,22 @@ case "$1" in
;;
*)
shopt -s extglob
SUFFIXES='@(so|dylib)'
[ -d $WT_DIR ] || get_wt;
[ -d $SNAPPY_DIR ] || get_snappy;
# Build Snappy
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9].* || build_snappy;
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build WiredTiger
[ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} -a \
-f $BASEDIR/system/lib/libwiredtiger_snappy.${SUFFIXES} || build_wt;
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger-[0-9].[0-9].[0-9].${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libwiredtiger_snappy.${SUFFIXES} ${BASEDIR}/../priv
cp -p -P ${BASEDIR}/system/lib/libsnappy.${SUFFIXES}* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libwiredtiger_snappy.so* ${BASEDIR}/../priv
cp -p -P $BASEDIR/system/lib/libsnappy.so* ${BASEDIR}/../priv
;;
esac

View file

@@ -1,5 +1,5 @@
diff --git a/ext/compressors/snappy/Makefile.am b/ext/compressors/snappy/Makefile.am
index 6d78823..c423590 100644
index 6d78823..2122cf8 100644
--- a/ext/compressors/snappy/Makefile.am
+++ b/ext/compressors/snappy/Makefile.am
@@ -2,5 +2,6 @@ AM_CPPFLAGS = -I$(top_builddir) -I$(top_srcdir)/src/include
@@ -7,6 +7,100 @@ index 6d78823..c423590 100644
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy
diff --git a/src/support/cksum.c b/src/support/cksum.c
index 7e9befe..b924db7 100644
--- a/src/support/cksum.c
+++ b/src/support/cksum.c
@@ -27,6 +27,13 @@
#include "wt_internal.h"
+#if defined(__amd64) || defined(__x86_64)
+#define USE_HARDWARE_CRC32 1
+#else
+#undef USE_HARDWARE_CRC32
+#endif
+
+#ifdef USE_HARDWARE_CRC32
static const uint32_t g_crc_slicing[8][256] = {
#ifdef WORDS_BIGENDIAN
/*
@@ -1078,6 +1085,7 @@ static const uint32_t g_crc_slicing[8][256] = {
}
#endif
};
+#endif /* USE_HARDWARE_CRC32 */
/*
* __wt_cksum --
@@ -1106,15 +1114,29 @@ __wt_cksum(const void *chunk, size_t len)
/* Checksum one byte at a time to the first 4B boundary. */
for (p = chunk;
((uintptr_t)p & (sizeof(uint32_t) - 1)) != 0 &&
- len > 0; ++p, --len)
+ len > 0; ++p, --len) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
#else
crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
+#endif
+ }
/* Checksum in 8B chunks. */
for (nqwords = len / sizeof(uint64_t); nqwords; nqwords--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__ (
+ ".byte 0xf2, 0x48, 0x0f, 0x38, 0xf0, 0xf1;"
+ : "=S"(crc)
+ : "S"(crc), "c"(*p));
+#else
crc ^= *(uint32_t *)p;
p += sizeof(uint32_t);
next = *(uint32_t *)p;
@@ -1139,22 +1161,32 @@ __wt_cksum(const void *chunk, size_t len)
g_crc_slicing[1][(next >> 16) & 0xFF] ^
g_crc_slicing[0][(next >> 24)];
#endif
+#endif
}
/* Checksum trailing bytes one byte at a time. */
+ for (len &= 0x7; len > 0; ++p, len--) {
+#ifdef USE_HARDWARE_CRC32
+ __asm__ __volatile__(
+ ".byte 0xF2, 0x0F, 0x38, 0xF0, 0xF1"
+ : "=S" (crc)
+ : "0" (crc), "c" (*p));
+#else
#ifdef WORDS_BIGENDIAN
- for (len &= 0x7; len > 0; ++p, len--)
crc = g_crc_slicing[0][((crc >> 24) ^ *p) & 0xFF] ^ (crc << 8);
+#else
+ crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
+#endif
+#endif
+ }
+#ifdef WORDS_BIGENDIAN
/* Do final byte swap to produce a result identical to little endian */
crc =
((crc << 24) & 0xFF000000) |
((crc << 8) & 0x00FF0000) |
((crc >> 8) & 0x0000FF00) |
((crc >> 24) & 0x000000FF);
-#else
- for (len &= 0x7; len > 0; ++p, len--)
- crc = g_crc_slicing[0][(crc ^ *p) & 0xFF] ^ (crc >> 8);
#endif
return (~crc);
}

View file

@@ -204,7 +204,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
if (c->session)
c->session->close(c->session, NULL);
free(c);
enif_free(c);
num_evicted++;
}
}
@@ -333,7 +333,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
int rc = conn->open_session(conn, NULL, session_config, &session);
if (rc != 0) return rc;
size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len;
c = malloc(s); // TODO: enif_alloc_resource()
c = enif_alloc(s); // TODO: enif_alloc_resource()
if (c == NULL) {
session->close(session, NULL);
return ENOMEM;
@@ -355,7 +355,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
c->ci[i].config = __copy_str_into(&p, config);
rc = session->open_cursor(session, uri, NULL, config, &c->ci[i].cursor);
if (rc != 0) {
free(c);
enif_free(c);
session->close(session, NULL); // this will free the cursors too
va_end(ap);
return rc;
@@ -405,7 +405,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
free(c);
enif_free(c);
c = n;
}
}
@@ -431,7 +431,7 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
free(c);
enif_free(c);
break;
}
}
@@ -440,7 +440,6 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
return;
}
/**
* Callback to handle error messages.
*
@@ -455,15 +454,13 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
* operation or library failure.
*/
int
__wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session,
int error, const char *message)
__wterl_error_handler(WT_EVENT_HANDLER *handler, int error, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->error_mutex);
msg_env = eh->msg_env_error;
to_pid = &eh->to_pid;
@@ -495,14 +492,13 @@ __wterl_error_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session,
* operation or library failure.
*/
int
__wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *message)
__wterl_message_handler(WT_EVENT_HANDLER *handler, const char *message)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->message_mutex);
msg_env = eh->msg_env_message;
to_pid = &eh->to_pid;
@@ -533,14 +529,13 @@ __wterl_message_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const ch
* operation or library failure.
*/
int
__wterl_progress_handler(WT_EVENT_HANDLER *handler, WT_SESSION *session, const char *operation, uint64_t counter)
__wterl_progress_handler(WT_EVENT_HANDLER *handler, const char *operation, uint64_t counter)
{
struct wterl_event_handlers *eh = (struct wterl_event_handlers *)handler;
ErlNifEnv *msg_env;
ErlNifPid *to_pid;
int rc = 0;
UNUSED(session);
enif_mutex_lock(eh->progress_mutex);
msg_env = eh->msg_env_progress;
to_pid = &eh->to_pid;
@@ -642,7 +637,7 @@ ASYNC_NIF_DECL(
return;
}
if (session_config.size > 1) {
char *sc = malloc(session_config.size);
char *sc = enif_alloc(session_config.size);
if (!sc) {
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM));
@@ -703,7 +698,7 @@ ASYNC_NIF_DECL(
enif_mutex_lock(args->conn_handle->cache_mutex);
__close_all_sessions(args->conn_handle);
if (args->conn_handle->session_config) {
free((char *)args->conn_handle->session_config);
enif_free((char *)args->conn_handle->session_config);
args->conn_handle->session_config = NULL;
}
WT_CONNECTION* conn = args->conn_handle->conn;
@@ -2270,7 +2265,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn");
ATOM_MSG_PID = enif_make_atom(env, "message_pid");
struct wterl_priv_data *priv = malloc(sizeof(struct wterl_priv_data));
struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
if (!priv)
return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data));
@@ -2298,17 +2293,17 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
/* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, env, priv->async_nif_priv);
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
if (!priv->async_nif_priv) {
memset(priv, 0, sizeof(struct wterl_priv_data));
free(priv);
enif_free(priv);
return ENOMEM;
}
*priv_data = priv;
char msg[1024];
snprintf(msg, 1024, "NIF on_load complete (wterl version: %s, wiredtiger version: %s)", priv->wterl_vsn, priv->wiredtiger_vsn);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, NULL, msg);
__wterl_message_handler((WT_EVENT_HANDLER *)&priv->eh, msg);
return 0;
}
@@ -2347,7 +2342,7 @@ on_unload(ErlNifEnv *env, void *priv_data)
enif_free_env(eh->msg_env_progress);
memset(priv, 0, sizeof(struct wterl_priv_data));
free(priv);
enif_free(priv);
priv_data = NULL;
}

View file

@@ -1,6 +0,0 @@
%%%% This is the WiredTiger section
%% @doc wiredtiger data_root
{mapping, "wiredtiger.data_root", "wterl.data_root", [
{default, "{{platform_data_dir}}/wiredtiger"}
]}.

View file

@@ -38,8 +38,8 @@
{port_specs, [{"priv/wterl.so", ["c_src/*.c"]}]}.
{port_env, [
{"DRV_CFLAGS", "$DRV_CFLAGS -O3 -mtune=native -march=native -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:lib/wterl-0.9.0/priv:priv -Lc_src/system/lib -lwiredtiger"}
{"DRV_CFLAGS", "$DRV_CFLAGS -fPIC -Wall -Wextra -Werror -I c_src/system/include"},
{"DRV_LDFLAGS", "$DRV_LDFLAGS -Wl,-rpath,lib/wterl/priv:priv -Lc_src/system/lib -lwiredtiger"}
]}.
{pre_hooks, [{compile, "c_src/build_deps.sh compile"}]}.

View file

@@ -21,34 +21,27 @@
%%
%% -------------------------------------------------------------------
-define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F, T) ->
R = erlang:make_ref(),
case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
case T of
3 -> not_found;
_ -> F(F, T + 1)
end;
Other ->
Other
end
end,
F(F, 1)).
-spec async_nif_enqueue(reference(), function(), [term()]) -> term() | {error, term()}.
async_nif_enqueue(R, F, A) ->
case erlang:apply(F, [R|A]) of
{ok, enqueued} ->
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
%% Work unit was not queued, try again.
async_nif_enqueue(R, F, A);
%{error, enomem} ->
%{error, shutdown} ->
Other ->
Other
end.
-define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(erlang:make_ref(), Fun, Args)).
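For reference, the calling convention this header now assumes looks roughly like the sketch below; the module and function names (async_nif_example, example/1, example_nif/2) are made up for illustration and are not functions from this repository — the real stubs live in wterl.erl. The NIF stub takes the caller-created reference as its first argument, async_nif_enqueue/3 retries on {error, eagain} until the request is accepted with {ok, enqueued}, and the worker thread later sends {Ref, Reply} back to the calling process, which the selective receive picks up.

%% Hypothetical usage sketch of the new ?ASYNC_NIF_CALL convention.
-module(async_nif_example).
-export([example/1]).
-include("async_nif.hrl").

%% NIF stub, replaced by its C implementation at load time; it expects
%% the caller's reference as its first argument and answers
%% {ok, enqueued} or {error, eagain | shutdown}.
example_nif(_Ref, _Arg) ->
    erlang:nif_error({error, not_loaded}).

%% Public wrapper: make a reference, enqueue the work unit, and block
%% until the worker thread sends {Ref, Reply} back to this process.
example(Arg) ->
    ?ASYNC_NIF_CALL(fun example_nif/2, [Arg]).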

View file

@ -22,7 +22,6 @@
-module(riak_kv_wterl_backend).
-behavior(temp_riak_kv_backend).
-compile([{parse_transform, lager_transform}]).
%% KV Backend API
-export([api_version/0,
@@ -43,7 +42,7 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-compiel(export_all).
-endif.
-define(API_VERSION, 1).
@@ -120,14 +119,14 @@ start(Partition, Config) ->
"lsm" ->
[{internal_page_max, "128K"},
{leaf_page_max, "16K"},
{lsm, [
{bloom_config, [{leaf_page_max, "8MB"}]},
{bloom_bit_count, 28},
{bloom_hash_count, 19},
{bloom_oldest, true},
{chunk_size, "100MB"},
{merge_threads, 2}
]}
{lsm_chunk_size, "100MB"},
{lsm_merge_threads, 2},
{prefix_compression, true},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 28},
{lsm_bloom_hash_count, 19},
{lsm_bloom_config, [{leaf_page_max, "8MB"}]}
] ++ Compressor;
"table" ->
Compressor
@@ -342,23 +341,22 @@ is_empty(#state{connection=Connection, table=Table}) ->
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{connection=Connection, table=Table}) ->
[].
%% case wterl:cursor_open(Connection, "statistics:" ++ Table, [{statistics_fast, true}]) of
%% {ok, Cursor} ->
%% TheStats =
%% case fetch_status(Cursor) of
%% {ok, Stats} ->
%% Stats;
%% {error, {eperm, _}} -> % TODO: review/fix this logic
%% {ok, []};
%% _ ->
%% {ok, []}
%% end,
%% wterl:cursor_close(Cursor),
%% TheStats;
%% {error, Reason2} ->
%% {error, Reason2}
%% end.
case wterl:cursor_open(Connection, Table) of
{ok, Cursor} ->
TheStats =
case fetch_status(Cursor) of
{ok, Stats} ->
Stats;
{error, {eperm, _}} -> % TODO: review/fix this logic
{ok, []};
_ ->
{ok, []}
end,
wterl:cursor_close(Cursor),
TheStats;
{error, Reason2} ->
{error, Reason2}
end.
%% @doc Register an asynchronous callback
-spec callback(reference(), any(), state()) -> {ok, state()}.
@@ -401,41 +399,30 @@ establish_connection(Config, Type) ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
%% WT Connection Options:
LogSetting = app_helper:get_prop_or_env(log, Config, wterl, false),
%% NOTE: LSM auto-checkpoints, so we don't have too.
CheckpointSetting =
case Type =:= "lsm" of
true ->
case LogSetting of
true ->
%% Turn checkpoints on if logging is on, checkpoints enable log archival.
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}]); % in seconds
_ ->
[]
end;
[];
false ->
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 30}])
app_helper:get_prop_or_env(checkpoint, Config, wterl, [{wait, 10}])
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
ConnectionOpts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(checkpoint_sync, Config, false),
wterl:config_value(transaction_sync, Config, "none"),
wterl:config_value(log, Config, [{enabled, LogSetting}]),
wterl:config_value(mmap, Config, false),
wterl:config_value(checkpoint, Config, CheckpointSetting),
wterl:config_value(sync, Config, false),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics, Config, [ "fast", "clear"]),
wterl:config_value(statistics_log, Config, [{wait, 600}]), % in seconds
wterl:config_value(verbose, Config, [ "salvage", "verify"
% Note: for some unknown reason, if you add these additional
% verbose flags Erlang SEGV's "size_object: bad tag for 0x80"
% no idea why... you've been warned.
% no idea why... yet... you've been warned.
%"block", "shared_cache", "reconcile", "evict", "lsm",
%"fileops", "read", "write", "readserver", "evictserver",
%"hazard", "mutex", "ckpt"
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
]) ] ++ CheckpointSetting ++ proplists:get_value(wterl, Config, [])), % sec
%% WT Session Options:
SessionOpts = [{isolation, "snapshot"}],
@@ -556,15 +543,15 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
%% fetch_status(Cursor) ->
%% {ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
%% fetch_status(_Cursor, {error, _}, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(_Cursor, not_found, Acc) ->
%% lists:reverse(Acc);
%% fetch_status(Cursor, {ok, Stat}, Acc) ->
%% [What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
%% fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->
lists:reverse(Acc);
fetch_status(Cursor, {ok, Stat}, Acc) ->
[What,Val|_] = [binary_to_list(B) || B <- binary:split(Stat, [<<0>>], [global])],
fetch_status(Cursor, wterl:cursor_next_value(Cursor), [{What,Val}|Acc]).
size_cache(RequestedSize) ->
Size =

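One note on the fetch_status/3 clauses shown above: they assume each value read from the statistics cursor via wterl:cursor_next_value/1 is a binary whose statistic description and value are separated by NUL bytes. A minimal worked example of that parsing step, suitable for pasting into an Erlang shell (the sample binary is made up for illustration):

%% Assumes a stat value packed as description and value separated by
%% NUL bytes; the sample binary below is illustrative only.
Stat = <<"cache: pages held", 0, "37", 0>>.
[What, Val | _] = [binary_to_list(B)
                   || B <- binary:split(Stat, [<<0>>], [global])].
%% What is now "cache: pages held" and Val is "37", i.e. the
%% {What, Val} pair that fetch_status/3 accumulates.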
View file

@@ -96,8 +96,8 @@ nif_stub_error(Line) ->
-spec init() -> ok | {error, any()}.
init() ->
erlang:load_nif(filename:join([priv_dir(), atom_to_list(?MODULE)]),
[{wterl_vsn, "942e51b"},
{wiredtiger_vsn, "1.6.4-275-g9c44420"}]). %% TODO automate these
[{wterl_vsn, "53307e8"},
{wiredtiger_vsn, "1.6.2-0-g07cb0a5"}]).
-spec connection_open(string(), config_list()) -> {ok, connection()} | {error, term()}.
-spec connection_open(string(), config_list(), config_list()) -> {ok, connection()} | {error, term()}.
@@ -454,26 +454,17 @@ config_to_bin([], Acc) ->
config_to_bin([{Key, Value} | Rest], Acc) ->
ConfigTypes =
[{block_compressor, {string, quoted}},
{bloom_bit_count, integer},
{bloom_config, config},
{bloom_hash_count, integer},
{bloom_newest, bool},
{bloom_oldest, bool},
{cache_size, string},
{checkpoint, config},
{checkpoint_sync, bool},
{checksum, string},
{chunk_size, string},
{create, bool},
{direct_io, list},
{drop, list},
{enabled, bool},
{error_prefix, string},
{eviction_target, integer},
{eviction_trigger, integer},
{extensions, {list, quoted}},
{statistics_fast, bool},
{file_max, string},
{force, bool},
{from, string},
{hazard_max, integer},
@@ -483,21 +474,24 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
{isolation, string},
{key_type, string},
{leaf_page_max, string},
{log, config},
{lsm, config},
{mmap, bool},
{merge_threads, integer},
{logging, bool},
{lsm_bloom_bit_count, integer},
{lsm_bloom_config, config},
{lsm_bloom_hash_count, integer},
{lsm_bloom_newest, bool},
{lsm_bloom_oldest, bool},
{lsm_chunk_size, string},
{prefix_compression, bool},
{lsm_merge_threads, integer},
{multiprocess, bool},
{name, string},
{overwrite, bool},
{prefix_compression, bool},
{raw, bool},
{session_max, integer},
{statistics, list},
{statistics_log, config},
{sync, bool},
{target, {list, quoted}},
{to, string},
{transaction_sync, string},
{transactional, bool},
{verbose, list},
{wait, integer}],
@@ -619,7 +613,7 @@ many_open_tables_test_() ->
DataDir = ?TEST_DATA_DIR,
KeyGen =
fun(X) ->
crypto:hash(sha, <<X>>)
crypto:sha(<<X>>)
end,
ValGen =
fun() ->

View file

@@ -25,13 +25,7 @@
{mode, max}.
{duration, 10}.
{concurrent, 16}.
{report_interval, 1}.
{pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}.
%{pb_timeout_write, ?}.
%{pb_timeout_listkeys, ?}.
%{pb_timeout_mapreduce, ?}.
{concurrent, 4}.
{driver, basho_bench_driver_wterl}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
@@ -43,9 +37,7 @@
{wterl, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{sync, false},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
@@ -60,11 +52,11 @@
]},
{session, [ {isolation, "snapshot"} ]},
{table_uri, "lsm:test"},
{lsm_merge_threads, 2},
{table, [
{internal_page_max, "128K"},
{leaf_page_max, "128K"},
{lsm_chunk_size, "25MB"},
{prefix_compression, false},
{lsm_bloom_newest, true},
{lsm_bloom_oldest, true} ,
{lsm_bloom_bit_count, 128},
@@ -78,9 +70,9 @@
{wterl_, [
{connection, [
{create, true},
{session_sync, false},
{transaction_sync, "none"},
{log, [{enabled, false}]},
{sync, false},
{logging, true},
{transactional, true},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},
@@ -97,6 +89,7 @@
{session, [ {isolation, "snapshot"} ]},
{table_uri, "table:test"},
{table, [
{prefix_compression, false},
{block_compressor, "snappy"} % bzip2
]}
]}.