Inline the fifo_q functions to speed them up and silence compiler warnings

for unused API calls.  Add a fifo_q_full call to hide the details of that.
Alloc work queues along with the async_nif at the end of that memory block.
Fix a few places where things should be free'd and were not.  Change enqueue
to return 0 when shutting down.  Fix a race related to shutdown.  When I use
gdb eunit calls ?cmd() seem to fail, so I've created rmdir:path() to replace
?cmd("rm -rf path") calls.
This commit is contained in:
Gregory Burd 2013-04-17 11:17:13 -04:00
parent 1913e7fdf5
commit 87f70d75a1
10 changed files with 130 additions and 92 deletions

3
.gitignore vendored
View file

@ -4,6 +4,9 @@ ebin
c_src/system
c_src/wiredtiger*/
c_src/*.o
c_src/bzip2-1.0.6
c_src/snappy-1.0.4
deps/
priv/
log/
*~

View file

@ -49,29 +49,32 @@ extern "C" {
q->s = n + 1; \
return q; \
} \
static type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
static inline type *fifo_q_ ## name ## _put(struct fifo_q__ ## name *q, type *n) { \
q->items[q->h] = n; \
q->h = (q->h + 1) % q->s; \
return n; \
} \
static type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
static inline type *fifo_q_ ## name ## _get(struct fifo_q__ ## name *q) { \
type *n = q->items[q->t]; \
q->items[q->t] = 0; \
q->t = (q->t + 1) % q->s; \
return n; \
} \
static void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q, 0, sizeof(*q)); \
static inline void fifo_q_ ## name ## _free(struct fifo_q__ ## name *q) { \
memset(q, 0, sizeof(struct fifo_q__ ## name) + (q->s * sizeof(type *))); \
enif_free(q); \
} \
static unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
static inline unsigned int fifo_q_ ## name ## _size(struct fifo_q__ ## name *q) { \
return (q->h - q->t + q->s) % q->s; \
} \
static unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
static inline unsigned int fifo_q_ ## name ## _capacity(struct fifo_q__ ## name *q) { \
return q->s - 1; \
} \
static int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
static inline int fifo_q_ ## name ## _empty(struct fifo_q__ ## name *q) { \
return (q->t == q->h); \
} \
static inline int fifo_q_ ## name ## _full(struct fifo_q__ ## name *q) { \
return ((q->h + 1) % q->s) == q->t; \
}
#define fifo_q_new(name, size) fifo_q_ ## name ## _new(size)
@ -81,6 +84,7 @@ extern "C" {
#define fifo_q_size(name, queue) fifo_q_ ## name ## _size(queue)
#define fifo_q_capacity(name, queue) fifo_q_ ## name ## _capacity(queue)
#define fifo_q_empty(name, queue) fifo_q_ ## name ## _empty(queue)
#define fifo_q_full(name, queue) fifo_q_ ## name ## _full(queue)
#define fifo_q_foreach(name, queue, item, task) do { \
while((item = fifo_q_ ## name ## _get(queue)) != NULL) { \
do task while(0); \
@ -117,7 +121,7 @@ struct async_nif_state {
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
unsigned int num_queues;
unsigned int next_q;
struct async_nif_work_queue queues[ASYNC_NIF_MAX_WORKERS]; // TODO: this should be alloc'ed
struct async_nif_work_queue queues[];
};
#define ASYNC_NIF_DECL(decl, frame, pre_block, work_block, post_block) \
@ -156,6 +160,7 @@ struct async_nif_state {
copy_of_args = (struct decl ## _args *)enif_alloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_ ## decl (args); \
enif_free(req); \
enif_free_env(new_env); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
@ -166,11 +171,18 @@ struct async_nif_state {
enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
unsigned int h = 0; \
int h = -1; \
if (affinity) \
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
if (!reply) { \
enif_free(req); \
enif_free_env(new_env); \
enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
return reply; \
}
@ -205,6 +217,8 @@ struct async_nif_state {
#define ASYNC_NIF_WORK_ENV new_env
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
// TODO: fix, currently NOREPLY() will block cause the recieve in async_nif.hrl wait forever
#define ASYNC_NIF_NOREPLY() enif_free_env(env)
/**
* TODO:
@ -220,38 +234,38 @@ static inline unsigned int async_nif_str_hash_func(const char *s)
* TODO:
*/
static ERL_NIF_TERM
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, unsigned int hint)
async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_entry *req, int hint)
{
/* If we're shutting down return an error term and ignore the request. */
if (async_nif->shutdown) {
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown"));
enif_free(req->args);
enif_free_env(req->env);
enif_free(req);
return reply;
}
unsigned int qid = hint ? hint : async_nif->next_q; // Keep a local to avoid the race.
struct async_nif_work_queue *q = &async_nif->queues[qid];
while (fifo_q_size(reqs, q->reqs) == fifo_q_capacity(reqs, q->reqs)) {
qid = (qid + 1) % async_nif->num_queues;
/* Identify the most appropriate worker for this request. */
unsigned int qid = (hint != -1) ? hint : async_nif->next_q;
struct async_nif_work_queue *q = NULL;
do {
q = &async_nif->queues[qid];
}
/* TODO:
if (q->avg_latency > 5) {
async_nif->next_q = (qid + 1) % async_nif->num_queues;
}
*/
enif_mutex_lock(q->reqs_mutex);
/* Otherwise, add the request to the work queue. */
enif_mutex_lock(q->reqs_mutex);
/* Now that we hold the lock, check for shutdown. As long as we
hold this lock either a) we're shutting down so exit now or
b) this queue will be valid until we release the lock. */
if (async_nif->shutdown)
return 0;
if (fifo_q_full(reqs, q->reqs)) { // TODO: || (q->avg_latency > median_latency)
enif_mutex_unlock(q->reqs_mutex);
qid = (qid + 1) % async_nif->num_queues;
q = &async_nif->queues[qid];
} else {
break;
}
} while(1);
/* And add the request to their work queue. */
fifo_q_put(reqs, q->reqs, req);
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid). */
the req pointer (which will soon become invalid in another thread
performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued"));
enif_make_atom(req->env, "enqueued"));
enif_mutex_unlock(q->reqs_mutex);
enif_cond_signal(q->reqs_cnd);
return reply;
@ -294,6 +308,10 @@ async_nif_worker_fn(void *arg)
/* Finally, do the work. */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
req->fn_post(req->args);
/* Note: we don't call enif_free_env(req->env) because it has called
enif_send() which invalidates it (free'ing it for us). If a work
block doesn't call ASYNC_NIF_REPLY() at some point then it must
call ASYNC_NIF_NOREPLY() to free this env. */
enif_free(req->args);
enif_free(req);
@ -317,8 +335,17 @@ async_nif_unload(ErlNifEnv *env)
unsigned int i;
struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
/* Signal the worker threads, stop what you're doing and exit. */
/* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first
lock all the worker queues, then set shutdown to true, then
unlock. The enqueue function will take the queue mutex, then
test for shutdown condition, then enqueue only if not shutting
down. */
for (i = 0; i < async_nif->num_queues; i++)
enif_mutex_lock(async_nif->queues[i].reqs_mutex);
async_nif->shutdown = 1;
for (i = 0; i < async_nif->num_queues; i++)
enif_mutex_unlock(async_nif->queues[i].reqs_mutex);
/* Wake up any waiting worker threads. */
for (i = 0; i < async_nif->num_queues; i++) {
@ -328,12 +355,13 @@ async_nif_unload(ErlNifEnv *env)
/* Join for the now exiting worker threads. */
for (i = 0; i < async_nif->num_workers; ++i) {
void *exit_value = 0; /* Ignore this. */
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
}
/* Cleanup requests, mutexes and conditions in each work queue. */
for (i = 0; i < async_nif->num_queues; i++) {
unsigned int num_queues = async_nif->num_queues;
for (i = 0; i < num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
@ -351,7 +379,8 @@ async_nif_unload(ErlNifEnv *env)
});
fifo_q_free(reqs, q->reqs);
}
memset(async_nif, 0, sizeof(struct async_nif_state));
memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
enif_free(async_nif);
}
@ -371,10 +400,12 @@ async_nif_load(void)
enif_system_info(&info, sizeof(ErlNifSysInfo));
/* Init our portion of priv_data's module-specific state. */
async_nif = enif_alloc(sizeof(struct async_nif_state));
async_nif = enif_alloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * info.scheduler_threads);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state));
memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * info.scheduler_threads);
async_nif->num_queues = info.scheduler_threads;
async_nif->next_q = 0;

View file

@ -15,13 +15,9 @@ WT_BRANCH=basho
WT_VSN=""
WT_DIR=wiredtiger-$WT_BRANCH
#SNAPPY_REPO=
#SNAPPY_BRANCH=
SNAPPY_VSN="1.0.4"
SNAPPY_DIR=snappy-$SNAPPY_VSN
#BZIP2_REPO=
#BZIP2_BRANCH=
BZIP2_VSN="1.0.6"
BZIP2_DIR=bzip2-$BZIP2_VSN
@ -71,12 +67,10 @@ get_snappy ()
[ -e snappy-$SNAPPY_VSN.tar.gz ] || (echo "Missing Snappy ($SNAPPY_VSN) source package" && exit 1)
[ -d $BASEDIR/$SNAPPY_DIR ] || tar -xzf snappy-$SNAPPY_VSN.tar.gz
[ -e $BASEDIR/snappy-build.patch ] && \
(cd $BASEDIR/$SNAPPY_DIR || exit 1
(cd $BASEDIR/$SNAPPY_DIR
patch -p1 --forward < $BASEDIR/snappy-build.patch || exit 1)
(cd $BASEDIR/$SNAPPY_DIR || exit 1
./configure --with-pic \
--prefix=$BASEDIR/system || exit 1
)
(cd $BASEDIR/$SNAPPY_DIR
./configure --with-pic --prefix=$BASEDIR/system || exit 1)
}
get_bzip2 ()
@ -84,7 +78,7 @@ get_bzip2 ()
[ -e bzip2-$BZIP2_VSN.tar.gz ] || (echo "Missing bzip2 ($BZIP2_VSN) source package" && exit 1)
[ -d $BASEDIR/$BZIP2_DIR ] || tar -xzf bzip2-$BZIP2_VSN.tar.gz
[ -e $BASEDIR/bzip2-build.patch ] && \
(cd $BASEDIR/$BZIP2_DIR || exit 1
(cd $BASEDIR/$BZIP2_DIR
patch -p1 --forward < $BASEDIR/bzip2-build.patch || exit 1)
}
@ -163,18 +157,18 @@ case "$1" in
[ -d $BZIP2_DIR ] || get_bzip2;
# Build Snappy
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory (did you first get-deps?)" && exit 1)
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build BZIP2
[ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory (did you first get-deps?)" && exit 1)
[ -d $BASEDIR/$BZIP2_DIR ] || (echo "Missing BZip2 source directory" && exit 1)
test -f system/lib/libbz2.so.[0-9].[0-9].[0-9] || build_bzip2;
# Build WiredTiger
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory (did you first get-deps?)" && exit 1)
test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so -a \
-f system/lib/libwiredtiger_snappy.so -a \
-f system/lib/libwiredtiger_bzip2.so || build_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f system/lib/libwiredtiger_snappy.so \
-a -f system/lib/libwiredtiger_bzip2.so || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv

0
c_src/bzip2-build.patch Normal file
View file

View file

@ -23,14 +23,6 @@
#include <string.h>
#include <errno.h>
#include "wiredtiger.h"
#include "async_nif.h"
#include "khash.h"
#ifdef WTERL_STATS
#include "stats.h"
#endif
#ifdef DEBUG
#include <stdio.h>
#include <stdarg.h>
@ -43,6 +35,14 @@
# define dprint(s, ...) {}
#endif
#include "wiredtiger.h"
#include "async_nif.h"
#include "khash.h"
#ifdef WTERL_STATS
#include "stats.h"
#endif
static ErlNifResourceType *wterl_conn_RESOURCE;
static ErlNifResourceType *wterl_cursor_RESOURCE;

View file

@ -560,12 +560,16 @@ size_cache(RequestedSize) ->
-ifdef(TEST).
simple_test_() ->
?assertCmd("rm -rf test/wterl-backend"),
{ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, "test/wterl-backend"),
temp_riak_kv_backend:standard_test(?MODULE, []).
custom_config_test_() ->
?assertCmd("rm -rf test/wterl-backend"),
{ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
rmdir:path(filename:join([CWD, "test/wterl-backend"])), %?assertCmd("rm -rf test/wterl-backend"),
application:set_env(wterl, data_root, ""),
temp_riak_kv_backend:standard_test(?MODULE, [{data_root, "test/wterl-backend"}]).

26
src/rmdir.erl Normal file
View file

@ -0,0 +1,26 @@
-module(rmdir).
-export([path/1]).
-include_lib("kernel/include/file.hrl").
path(Dir) ->
remove_all_files(".", [Dir]).
remove_all_files(Dir, Files) ->
lists:foreach(fun(File) ->
FilePath = filename:join([Dir, File]),
case file:read_file_info(FilePath) of
{ok, FileInfo} ->
case FileInfo#file_info.type of
directory ->
{ok, DirFiles} = file:list_dir(FilePath),
remove_all_files(FilePath, DirFiles),
file:del_dir(FilePath);
_ ->
file:delete(FilePath)
end;
{error, _Reason} ->
ok
end
end, Files).

View file

@ -1,6 +1,6 @@
{application, wterl,
[
{description, "Erlang Wrapper for WiredTiger"},
{description, "Erlang NIF Wrapper for WiredTiger"},
{vsn, "0.9.0"},
{registered, []},
{applications, [

View file

@ -512,29 +512,6 @@ config_to_bin([{Key, Value} | Rest], Acc) ->
%% ===================================================================
-ifdef(TEST).
-include_lib("kernel/include/file.hrl").
remove_dir_tree(Dir) ->
remove_all_files(".", [Dir]).
remove_all_files(Dir, Files) ->
lists:foreach(fun(File) ->
FilePath = filename:join([Dir, File]),
case file:read_file_info(FilePath) of
{ok, FileInfo} ->
case FileInfo#file_info.type of
directory ->
{ok, DirFiles} = file:list_dir(FilePath),
remove_all_files(FilePath, DirFiles),
file:del_dir(FilePath);
_ ->
file:delete(FilePath)
end;
{error, _Reason} ->
ok
end
end, Files).
-define(TEST_DATA_DIR, "test/wterl.basic").
open_test_conn(DataDir) ->
@ -542,7 +519,7 @@ open_test_conn(DataDir) ->
open_test_conn(DataDir, OpenConfig) ->
{ok, CWD} = file:get_cwd(),
?assertMatch(true, lists:suffix("wterl/.eunit", CWD)),
remove_dir_tree(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
rmdir:path(filename:join([CWD, DataDir])), %?cmd("rm -rf " ++ filename:join([CWD, DataDir])),
?assertMatch(ok, filelib:ensure_dir(filename:join([DataDir, "x"]))),
{ok, ConnRef} = connection_open(filename:join([CWD, DataDir]), OpenConfig),
ConnRef.
@ -594,7 +571,8 @@ conn_test_() ->
ConnRef = open_test_table(ConnRef, "table", [{block_compressor, "bzip2"}]),
?assertMatch(ok, verify(ConnRef, "table:test")),
?assertMatch(ok, drop(ConnRef, "table:test"))
end}]}
end}
]}
end}.
insert_delete_test() ->

View file

@ -1,5 +1,7 @@
#!/bin/sh -
# Note: also, remember to update version numbers in rpath specs so that shared libs can be found at runtime!!!
wterl=`git log -n 1 --pretty=format:"%H"`
wiredtiger0=`(cd c_src/wiredtiger && git log -n 1 --pretty=format:"%H")`
wiredtiger=`echo $wiredtiger0 | awk '{print $2}'`