From 87f70d75a1ecbb81d5e3cb7e919b307563e686ac Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Wed, 17 Apr 2013 11:17:13 -0400 Subject: [PATCH] Inline the fifo_q functions to speed them up and silence compiler warnings for unused API calls. Add a fifo_q_full call to hide the details of that. Alloc work queues along with the async_nif at the end of that memory block. Fix a few places where things should be free'd and were not. Change enqueue to return 0 when shutting down. Fix a race related to shutdown. When I use gdb eunit calls ?cmd() seem to fail, so I've created rmdir:path() to replace ?cmd("rm -rf path") calls. --- .gitignore | 3 + c_src/async_nif.h | 111 ++++++++++++++++++++++------------ c_src/build_deps.sh | 26 +++----- c_src/bzip2-build.patch | 0 c_src/wterl.c | 16 ++--- src/riak_kv_wterl_backend.erl | 8 ++- src/rmdir.erl | 26 ++++++++ src/wterl.app.src | 2 +- src/wterl.erl | 28 +-------- update-version.sh | 2 + 10 files changed, 130 insertions(+), 92 deletions(-) create mode 100644 c_src/bzip2-build.patch create mode 100644 src/rmdir.erl diff --git a/.gitignore b/.gitignore index 3982b0b..b312112 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,9 @@ ebin c_src/system c_src/wiredtiger*/ c_src/*.o +c_src/bzip2-1.0.6 +c_src/snappy-1.0.4 deps/ priv/ +log/ *~ diff --git a/c_src/async_nif.h b/c_src/async_nif.h index 5493dba..eb4715a 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -49,29 +49,32 @@ extern "C" { q->s = n + 1; \ return q; \ } \ - static type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ + static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \ q->items[q->h] = n; \ q->h = (q->h + 1) % q->s; \ return n; \ } \ - static type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ + static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \ type *n = q->items[q->t]; \ q->items[q->t] = 0; \ q->t = (q->t + 1) % q->s; \ return n; \ } \ - static void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ - memset(q, 0, sizeof(*q)); \ + static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \ + memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \ enif_free(q); \ } \ - static unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ + static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \ return (q->h - q->t + q->s) % q->s; \ } \ - static unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ + static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \ return q->s - 1; \ } \ - static int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ + static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \ return (q->t == q->h); \ + } \ + static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \ + return ((q->h + 1) % q->s) == q->t; \ } #define fifo_q_new(name, size) fifo_q_ ## name ## _new(size) @@ -81,6 +84,7 @@ extern "C" { #define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue) #define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue) #define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue) +#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue) #define fifo_q_foreach(name, queue, item, task) do { \ while((item = fifo_q_ ## name ## _get(queue)) != NULL) { \ do task while(0); \ @@ -117,7 +121,7 @@ struct async_nif_state { struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; unsigned int num_queues; unsigned int next_q; - struct async_nif_work_queue queues[ASYNC_NIF_MAX_WORKERS]; // TODO: this should be alloc'ed + struct async_nif_work_queue queues[]; }; #define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \ @@ -156,6 +160,7 @@ struct async_nif_state { copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \ if (!copy_of_args) { \ fn_post_ ## decl (args); \ + enif_free(req); \ enif_free_env(new_env); \ return enif_make_tuple2(env, enif_make_atom(env, "error"), \ enif_make_atom(env, "enomem")); \ @@ -166,11 +171,18 @@ struct async_nif_state { enif_self(env, &req->pid); \ req->args = (void*)copy_of_args; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ - unsigned int h = 0; \ + int h = -1; \ if (affinity) \ h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ req->fn_post = (void (*)(void *))fn_post_ ## decl; \ + if (!reply) { \ + enif_free(req); \ + enif_free_env(new_env); \ + enif_free(copy_of_args); \ + return enif_make_tuple2(env, enif_make_atom(env, "error"), \ + enif_make_atom(env, "shutdown")); \ + } \ return reply; \ } @@ -205,6 +217,8 @@ struct async_nif_state { #define ASYNC_NIF_WORK_ENV new_env #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) +// TODO: fix, currently NOREPLY() will block cause the recieve in async_nif.hrl wait forever +#define ASYNC_NIF_NOREPLY() enif_free_env(env) /** * TODO: @@ -220,38 +234,38 @@ static inline unsigned int async_nif_str_hash_func(const char *s) * TODO: */ static ERL_NIF_TERM -async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, unsigned int hint) +async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint) { - /* If we're shutting down return an error term and ignore the request. */ - if (async_nif->shutdown) { - ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "error"), - enif_make_atom(req->env, "shutdown")); - enif_free(req->args); - enif_free_env(req->env); - enif_free(req); - return reply; - } - - unsigned int qid = hint ? hint : async_nif->next_q; // Keep a local to avoid the race. - struct async_nif_work_queue *q = &async_nif->queues[qid]; - while (fifo_q_size(reqs, q->reqs) == fifo_q_capacity(reqs, q->reqs)) { - qid = (qid + 1) % async_nif->num_queues; + /* Identify the most appropriate worker for this request. */ + unsigned int qid = (hint != -1) ? hint : async_nif->next_q; + struct async_nif_work_queue *q = NULL; + do { q = &async_nif->queues[qid]; - } - /* TODO: - if (q->avg_latency > 5) { - async_nif->next_q = (qid + 1) % async_nif->num_queues; - } - */ + enif_mutex_lock(q->reqs_mutex); - /* Otherwise, add the request to the work queue. */ - enif_mutex_lock(q->reqs_mutex); + /* Now that we hold the lock, check for shutdown. As long as we + hold this lock either a) we're shutting down so exit now or + b) this queue will be valid until we release the lock. */ + if (async_nif->shutdown) + return 0; + + if (fifo_q_full(reqs, q->reqs)) { // TODO: || (q->avg_latency > median_latency) + enif_mutex_unlock(q->reqs_mutex); + qid = (qid + 1) % async_nif->num_queues; + q = &async_nif->queues[qid]; + } else { + break; + } + } while(1); + + /* And add the request to their work queue. */ fifo_q_put(reqs, q->reqs, req); /* Build the term before releasing the lock so as not to race on the use of - the req pointer (which will soon become invalid). */ + the req pointer (which will soon become invalid in another thread + performing the request). */ ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"), - enif_make_atom(req->env, "enqueued")); + enif_make_atom(req->env, "enqueued")); enif_mutex_unlock(q->reqs_mutex); enif_cond_signal(q->reqs_cnd); return reply; @@ -294,6 +308,10 @@ async_nif_worker_fn(void *arg) /* Finally, do the work. */ req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); req->fn_post(req->args); + /* Note: we don't call enif_free_env(req->env) because it has called + enif_send() which invalidates it (free'ing it for us). If a work + block doesn't call ASYNC_NIF_REPLY() at some point then it must + call ASYNC_NIF_NOREPLY() to free this env. */ enif_free(req->args); enif_free(req); @@ -317,8 +335,17 @@ async_nif_unload(ErlNifEnv *env) unsigned int i; struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); - /* Signal the worker threads, stop what you're doing and exit. */ + /* Signal the worker threads, stop what you're doing and exit. To + ensure that we don't race with the enqueue() process we first + lock all the worker queues, then set shutdown to true, then + unlock. The enqueue function will take the queue mutex, then + test for shutdown condition, then enqueue only if not shutting + down. */ + for (i = 0; i < async_nif->num_queues; i++) + enif_mutex_lock(async_nif->queues[i].reqs_mutex); async_nif->shutdown = 1; + for (i = 0; i < async_nif->num_queues; i++) + enif_mutex_unlock(async_nif->queues[i].reqs_mutex); /* Wake up any waiting worker threads. */ for (i = 0; i < async_nif->num_queues; i++) { @@ -328,12 +355,13 @@ async_nif_unload(ErlNifEnv *env) /* Join for the now exiting worker threads. */ for (i = 0; i < async_nif->num_workers; ++i) { - void *exit_value = 0; /* Ignore this. */ + void *exit_value = 0; /* We ignore the thread_join's exit value. */ enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); } /* Cleanup requests, mutexes and conditions in each work queue. */ - for (i = 0; i < async_nif->num_queues; i++) { + unsigned int num_queues = async_nif->num_queues; + for (i = 0; i < num_queues; i++) { struct async_nif_work_queue *q = &async_nif->queues[i]; enif_mutex_destroy(q->reqs_mutex); enif_cond_destroy(q->reqs_cnd); @@ -351,7 +379,8 @@ async_nif_unload(ErlNifEnv *env) }); fifo_q_free(reqs, q->reqs); } - memset(async_nif, 0, sizeof(struct async_nif_state)); + memset(async_nif, 0, sizeof(struct async_nif_state) + + sizeof(struct async_nif_work_queue) * num_queues); enif_free(async_nif); } @@ -371,10 +400,12 @@ async_nif_load(void) enif_system_info(&info, sizeof(ErlNifSysInfo)); /* Init our portion of priv_data's module-specific state. */ - async_nif = enif_alloc(sizeof(struct async_nif_state)); + async_nif = enif_alloc(sizeof(struct async_nif_state) + + sizeof(struct async_nif_work_queue) * info.scheduler_threads); if (!async_nif) return NULL; - memset(async_nif, 0, sizeof(struct async_nif_state)); + memset(async_nif, 0, sizeof(struct async_nif_state) + + sizeof(struct async_nif_work_queue) * info.scheduler_threads); async_nif->num_queues = info.scheduler_threads; async_nif->next_q = 0; diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 6f69cc9..601e917 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -15,13 +15,9 @@ WT_BRANCH=basho WT_VSN="" WT_DIR=wiredtiger-$WT_BRANCH -#SNAPPY_REPO= -#SNAPPY_BRANCH= SNAPPY_VSN="1.0.4" SNAPPY_DIR=snappy-$SNAPPY_VSN -#BZIP2_REPO= -#BZIP2_BRANCH= BZIP2_VSN="1.0.6" BZIP2_DIR=bzip2-$BZIP2_VSN @@ -71,12 +67,10 @@ get_snappy () [ -e snappy-$SNAPPY_VSN.tar.gz ] || (echo "Missing Snappy ($SNAPPY_VSN) source package" && exit 1) [ -d $BASEDIR/$SNAPPY_DIR ] || tar -xzf snappy-$SNAPPY_VSN.tar.gz [ -e $BASEDIR/snappy-build.patch ] && \ - (cd $BASEDIR/$SNAPPY_DIR || exit 1 + (cd $BASEDIR/$SNAPPY_DIR patch -p1 --forward < $BASEDIR/snappy-build.patch || exit 1) - (cd $BASEDIR/$SNAPPY_DIR || exit 1 - ./configure --with-pic \ - --prefix=$BASEDIR/system || exit 1 - ) + (cd $BASEDIR/$SNAPPY_DIR + ./configure --with-pic --prefix=$BASEDIR/system || exit 1) } get_bzip2 () @@ -84,7 +78,7 @@ get_bzip2 () [ -e bzip2-$BZIP2_VSN.tar.gz ] || (echo "Missing bzip2 ($BZIP2_VSN) source package" && exit 1) [ -d $BASEDIR/$BZIP2_DIR ] || tar -xzf bzip2-$BZIP2_VSN.tar.gz [ -e $BASEDIR/bzip2-build.patch ] && \ - (cd $BASEDIR/$BZIP2_DIR || exit 1 + (cd $BASEDIR/$BZIP2_DIR patch -p1 --forward < $BASEDIR/bzip2-build.patch || exit 1) } @@ -163,18 +157,18 @@ case "$1" in [ -d $BZIP2_DIR ] || get_bzip2; # Build Snappy - [ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory (did you first get-deps?)" && exit 1) + [ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1) test -f system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy; # Build BZIP2 - [ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory (did you first get-deps?)" && exit 1) + [ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1) test -f system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2; # Build WiredTiger - [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory (did you first get-deps?)" && exit 1) - test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so -a \ - -f system/lib/libwiredtiger_snappy.so -a \ - -f system/lib/libwiredtiger_bzip2.so || build_wt; + [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) + test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \ + -a -f system/lib/libwiredtiger_snappy.so \ + -a -f system/lib/libwiredtiger_bzip2.so || build_wt; [ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv diff --git a/c_src/bzip2-build.patch b/c_src/bzip2-build.patch new file mode 100644 index 0000000..e69de29 diff --git a/c_src/wterl.c b/c_src/wterl.c index f2971a8..c7837ff 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -23,14 +23,6 @@ #include #include -#include "wiredtiger.h" -#include "async_nif.h" -#include "khash.h" - -#ifdef WTERL_STATS -#include "stats.h" -#endif - #ifdef DEBUG #include #include @@ -43,6 +35,14 @@ # define dprint(s, ...) {} #endif +#include "wiredtiger.h" +#include "async_nif.h" +#include "khash.h" + +#ifdef WTERL_STATS +#include "stats.h" +#endif + static ErlNifResourceType *wterl_conn_RESOURCE; static ErlNifResourceType *wterl_cursor_RESOURCE; diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 5651751..4c345c0 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -560,12 +560,16 @@ size_cache(RequestedSize) -> -ifdef(TEST). simple_test_() -> - ?assertCmd("rm -rf test/wterl-backend"), + {ok, CWD} = file:get_cwd(), + ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), + rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"), application:set_env(wterl, data_root, "test/wterl-backend"), temp_riak_kv_backend:standard_test(?MODULE, []). custom_config_test_() -> - ?assertCmd("rm -rf test/wterl-backend"), + {ok, CWD} = file:get_cwd(), + ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), + rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"), application:set_env(wterl, data_root, ""), temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]). diff --git a/src/rmdir.erl b/src/rmdir.erl new file mode 100644 index 0000000..d26a129 --- /dev/null +++ b/src/rmdir.erl @@ -0,0 +1,26 @@ +-module(rmdir). + +-export([path/1]). + +-include_lib("kernel/include/file.hrl"). + +path(Dir) -> + remove_all_files(".", [Dir]). + +remove_all_files(Dir, Files) -> + lists:foreach(fun(File) -> + FilePath = filename:join([Dir, File]), + case file:read_file_info(FilePath) of + {ok, FileInfo} -> + case FileInfo#file_info.type of + directory -> + {ok, DirFiles} = file:list_dir(FilePath), + remove_all_files(FilePath, DirFiles), + file:del_dir(FilePath); + _ -> + file:delete(FilePath) + end; + {error, _Reason} -> + ok + end + end, Files). diff --git a/src/wterl.app.src b/src/wterl.app.src index 534c8f0..b235371 100644 --- a/src/wterl.app.src +++ b/src/wterl.app.src @@ -1,6 +1,6 @@ {application, wterl, [ - {description, "Erlang Wrapper for WiredTiger"}, + {description, "Erlang NIF Wrapper for WiredTiger"}, {vsn, "0.9.0"}, {registered, []}, {applications, [ diff --git a/src/wterl.erl b/src/wterl.erl index a3c7cc4..0c28d1a 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -512,29 +512,6 @@ config_to_bin([{Key, Value} | Rest], Acc) -> %% =================================================================== -ifdef(TEST). --include_lib("kernel/include/file.hrl"). - -remove_dir_tree(Dir) -> - remove_all_files(".", [Dir]). - -remove_all_files(Dir, Files) -> - lists:foreach(fun(File) -> - FilePath = filename:join([Dir, File]), - case file:read_file_info(FilePath) of - {ok, FileInfo} -> - case FileInfo#file_info.type of - directory -> - {ok, DirFiles} = file:list_dir(FilePath), - remove_all_files(FilePath, DirFiles), - file:del_dir(FilePath); - _ -> - file:delete(FilePath) - end; - {error, _Reason} -> - ok - end - end, Files). - -define(TEST_DATA_DIR, "test/wterl.basic"). open_test_conn(DataDir) -> @@ -542,7 +519,7 @@ open_test_conn(DataDir) -> open_test_conn(DataDir, OpenConfig) -> {ok, CWD} = file:get_cwd(), ?assertMatch(true, lists:suffix("wterl/.eunit", CWD)), - remove_dir_tree(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])), + rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])), ?assertMatch(ok, filelib:ensure_dir(filename:join([DataDir, "x"]))), {ok, ConnRef} = connection_open(filename:join([CWD, DataDir]), OpenConfig), ConnRef. @@ -594,7 +571,8 @@ conn_test_() -> ConnRef = open_test_table(ConnRef, "table", [{block_compressor, "bzip2"}]), ?assertMatch(ok, verify(ConnRef, "table:test")), ?assertMatch(ok, drop(ConnRef, "table:test")) - end}]} + end} + ]} end}. insert_delete_test() -> diff --git a/update-version.sh b/update-version.sh index 69a7a58..459fc0c 100755 --- a/update-version.sh +++ b/update-version.sh @@ -1,5 +1,7 @@ #!/bin/sh - +# Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!! + wterl=`git log -n 1 --pretty=format:"%H"` wiredtiger0=`(cd c_src/wiredtiger && git log -n 1 --pretty=format:"%H")` wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`