Merge remote-tracking branch 'origin/master' into gsb-rcu

Conflicts:
	c_src/async_nif.h
	c_src/build_deps.sh
	src/async_nif.hrl
This commit is contained in:
Gregory Burd 2013-08-21 15:56:31 -04:00
commit da418b4abf
8 changed files with 109 additions and 90 deletions

View file

@ -37,12 +37,20 @@ extern "C" {
#define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_MIN_WORKERS 2
#define ASYNC_NIF_WORKER_QUEUE_SIZE 100
#define ASYNC_NIF_WORKER_QUEUE_SIZE 8192
#define ASYNC_NIF_MAX_QUEUED_REQS ASYNC_NIF_WORKER_QUEUE_SIZE * ASYNC_NIF_MAX_WORKERS
static DEFINE_URCU_TLS(unsigned long long, nr_enqueues);
static DEFINE_URCU_TLS(unsigned long long, nr_dequeues);
/* Atoms (initialized in on_load) */
static ERL_NIF_TERM ATOM_EAGAIN;
static ERL_NIF_TERM ATOM_ENOMEM;
static ERL_NIF_TERM ATOM_ENQUEUED;
static ERL_NIF_TERM ATOM_ERROR;
static ERL_NIF_TERM ATOM_OK;
static ERL_NIF_TERM ATOM_SHUTDOWN;
struct async_nif_req_entry {
ERL_NIF_TERM ref;
ErlNifEnv *env;
@ -58,6 +66,7 @@ struct async_nif_work_queue {
unsigned int depth;
struct cds_lfq_queue_rcu req_queue;
struct async_nif_work_queue *next;
STAILQ_HEAD(reqs, async_nif_req_entry) reqs;
};
struct async_nif_worker_entry {
@ -106,25 +115,20 @@ struct async_nif_state {
argc -= 1; \
/* Note: !!! this assumes that the first element of priv_data is ours */ \
struct async_nif_state *async_nif = *(struct async_nif_state**)enif_priv_data(env); \
if (uatomic_read(&async_nif->shutdown)) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "shutdown")); \
} \
if (async_nif->shutdown) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_SHUTDOWN); \
req = async_nif_reuse_req(async_nif); \
if (!req) { \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
} \
if (!req) \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
new_env = req->env; \
DPRINTF("async_nif: calling \"%s\"", __func__); \
do pre_block while(0); \
DPRINTF("async_nif: returned from \"%s\"", __func__); \
copy_of_args = (struct decl##_args *)enif_alloc(sizeof(struct decl##_args)); \
copy_of_args = (struct decl ## _args *)malloc(sizeof(struct decl ## _args)); \
if (!copy_of_args) { \
fn_post_##decl (args); \
async_nif_recycle_req(req, async_nif); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "enomem")); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_ENOMEM); \
} \
memcpy(copy_of_args, args, sizeof(struct decl##_args)); \
req->ref = enif_make_copy(new_env, argv_in[0]); \
@ -139,9 +143,8 @@ struct async_nif_state {
if (!reply) { \
fn_post_##decl (args); \
async_nif_recycle_req(req, async_nif); \
enif_free(copy_of_args); \
return enif_make_tuple2(env, enif_make_atom(env, "error"), \
enif_make_atom(env, "eagain")); \
free(copy_of_args); \
return enif_make_tuple2(env, ATOM_ERROR, ATOM_EAGAIN); \
} \
return reply; \
}
@ -149,11 +152,11 @@ struct async_nif_state {
#define ASYNC_NIF_INIT(name) \
static ErlNifMutex *name##_async_nif_coord = NULL;
#define ASYNC_NIF_LOAD(name, priv) do { \
#define ASYNC_NIF_LOAD(name, env, priv) do { \
if (!name##_async_nif_coord) \
name##_async_nif_coord = enif_mutex_create("nif_coord load"); \
enif_mutex_lock(name##_async_nif_coord); \
priv = async_nif_load(); \
priv = async_nif_load(env); \
enif_mutex_unlock(name##_async_nif_coord); \
} while(0);
#define ASYNC_NIF_UNLOAD(name, env, priv) do { \
@ -206,7 +209,7 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
return req;
} else {
if (uatomic_read(&async_nif->num_reqs) < ASYNC_NIF_MAX_QUEUED_REQS) {
req = enif_alloc(sizeof(struct async_nif_req_entry));
req = malloc(sizeof(struct async_nif_req_entry));
if (req) {
memset(req, 0, sizeof(struct async_nif_req_entry));
env = enif_alloc_env();
@ -214,11 +217,10 @@ async_nif_reuse_req(struct async_nif_state *async_nif)
req->env = env;
uatomic_inc(&async_nif->num_reqs);
} else {
enif_free(req);
free(req);
req = NULL;
}
}
return req;
}
}
return req;
@ -240,7 +242,7 @@ async_nif_recycle_req(struct async_nif_req_entry *req, struct async_nif_state *a
3) keep a pointer to the env so we can reset it in the req */
ErlNifEnv *env = req->env;
enif_clear_env(req->env);
if (req->args) enif_free(req->args);
if (req->args) free(req->args);
memset(req, 0, sizeof(struct async_nif_req_entry));
req->env = env;
@ -275,7 +277,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
if (worker) {
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(worker->tid, &exit_value);
enif_free(worker);
free(worker);
uatomic_dec(&async_nif->num_active_workers);
} else
break;
@ -284,7 +286,7 @@ async_nif_start_worker(struct async_nif_state *async_nif, struct async_nif_work_
if (uatomic_read(&async_nif->num_active_workers) >= ASYNC_NIF_MAX_WORKERS)
return EAGAIN;
worker = enif_alloc(sizeof(struct async_nif_worker_entry));
worker = malloc(sizeof(struct async_nif_worker_entry));
if (!worker) return ENOMEM;
memset(worker, 0, sizeof(struct async_nif_worker_entry));
worker->worker_id = uatomic_add_return(&async_nif->num_active_workers, 1);
@ -369,8 +371,10 @@ async_nif_enqueue_req(struct async_nif_state* async_nif, struct async_nif_req_en
/* Build the term before releasing the lock so as not to race on the use of
the req pointer (which will soon become invalid in another thread
performing the request). */
ERL_NIF_TERM reply = enif_make_tuple2(req->env, enif_make_atom(req->env, "ok"),
enif_make_atom(req->env, "enqueued"));
double pct_full = (double)avg_depth / (double)ASYNC_NIF_WORKER_QUEUE_SIZE;
ERL_NIF_TERM reply = enif_make_tuple2(req->env, ATOM_OK,
enif_make_tuple2(req->env, ATOM_ENQUEUED,
enif_make_double(req->env, pct_full)));
return reply;
}
@ -450,7 +454,7 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
if (worker) {
void *exit_value = 0; /* We ignore the thread_join's exit value. */
enif_thread_join(worker->tid, &exit_value);
enif_free(worker);
free(worker);
uatomic_dec(&async_nif->num_active_workers);
}
}
@ -467,13 +471,11 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
struct async_nif_req_entry *req;
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_clear_env(req->env);
enif_send(NULL, &req->pid, req->env,
enif_make_tuple2(req->env, enif_make_atom(req->env, "error"),
enif_make_atom(req->env, "shutdown")));
enif_send(NULL, &req->pid, req->env, enif_make_tuple2(req->env, ATOM_ERROR, ATOM_SHUTDOWN));
req->fn_post(req->args);
enif_free(req->args);
enif_free_env(req->env);
enif_free(req);
free(req->args);
free_env(req->env);
free(req);
}
} while(node);
cds_lfq_destroy_rcu(&q->req_queue); // TODO(gburd): check return val
@ -485,19 +487,21 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
if (node) {
struct async_nif_req_entry *req;
req = caa_container_of(node, struct async_nif_req_entry, queue_entry);
enif_free_env(req->env);
enif_free(req);
free_env(req->env);
free(req->args);
free(req);
req = n;
}
} while(node);
cds_lfq_destroy_rcu(&async_nif->recycled_req_queue); // TODO(gburd): check return val
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
free_all_cpu_call_rcu_data();
enif_free(async_nif);
free(async_nif);
}
static void *
async_nif_load()
async_nif_load(ErlNifEnv *env)
{
static int has_init = 0;
unsigned int i, num_queues;
@ -508,6 +512,14 @@ async_nif_load()
if (has_init) return 0;
else has_init = 1;
/* Init some static references to commonly used atoms. */
ATOM_EAGAIN = enif_make_atom(env, "eagain");
ATOM_ENOMEM = enif_make_atom(env, "enomem");
ATOM_ENQUEUED = enif_make_atom(env, "enqueued");
ATOM_ERROR = enif_make_atom(env, "error");
ATOM_OK = enif_make_atom(env, "ok");
ATOM_SHUTDOWN = enif_make_atom(env, "shutdown");
/* Init the RCU library. */
rcu_init();
(void)create_all_cpu_call_rcu_data(0);
@ -529,8 +541,8 @@ async_nif_load()
}
/* Init our portion of priv_data's module-specific state. */
async_nif = enif_alloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
async_nif = malloc(sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * num_queues);
if (!async_nif)
return NULL;
memset(async_nif, 0, sizeof(struct async_nif_state) +

View file

@ -17,7 +17,7 @@ set -e
WT_REPO=http://github.com/wiredtiger/wiredtiger.git
#WT_BRANCH=develop
#WT_DIR=wiredtiger-`basename $WT_BRANCH`
WT_REF="tags/1.6.3"
WT_REF="tags/1.6.4"
WT_DIR=wiredtiger-`basename $WT_REF`
# Google's Snappy Compression
@ -167,7 +167,8 @@ build_snappy ()
case "$1" in
clean)
#[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && (cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE clean)
[ -e $BASEDIR/$URCU_DIR/Makefile ] && (cd $BASEDIR/$URCU_DIR && $MAKE clean)
rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt
@ -189,19 +190,18 @@ case "$1" in
;;
*)
[ -d $URCU_DIR ] || get_urcu;
[ -d $WT_DIR ] || get_wt;
[ -d $SNAPPY_DIR ] || get_snappy;
# Build URCU
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1)
test -f $BASEDIR/system/lib/liburcu.a || build_urcu;
# Build Snappy
[ -d $SNAPPY_DIR ] || get_snappy;
[ -d $BASEDIR/$SNAPPY_DIR ] || (echo "Missing Snappy source directory" && exit 1)
test -f $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] || build_snappy;
# Build URCU
[ -d $URCU_DIR ] || get_urcu;
[ -d $BASEDIR/$URCU_DIR ] || (echo "Missing URCU source directory" && exit 1)
test -f $BASEDIR/system/lib/liburcu.a || build_urcu;
# Build WiredTiger
[ -d $WT_DIR ] || get_wt;
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f $BASEDIR/system/lib/libwiredtiger_snappy.so || build_wt;

View file

@ -7,8 +7,8 @@ index 6d78823..2122cf8 100644
lib_LTLIBRARIES = libwiredtiger_snappy.la
libwiredtiger_snappy_la_SOURCES = snappy_compress.c
-libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module
+libwiredtiger_snappy_la_CFLAGS = -I$(src_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(src_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
+libwiredtiger_snappy_la_CFLAGS = -I$(abs_top_builddir)/../../system/include
+libwiredtiger_snappy_la_LDFLAGS = -avoid-version -module -L$(abs_top_builddir)/../../system/lib -Wl,-rpath,lib/wterl-0.9.0/priv:lib/wterl/priv:priv
libwiredtiger_snappy_la_LIBADD = -lsnappy
diff --git a/src/support/cksum.c b/src/support/cksum.c
index 7e9befe..b924db7 100644

View file

@ -204,7 +204,7 @@ __ctx_cache_evict(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
if (c->session)
c->session->close(c->session, NULL);
enif_free(c);
free(c);
num_evicted++;
}
}
@ -333,7 +333,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
int rc = conn->open_session(conn, NULL, session_config, &session);
if (rc != 0) return rc;
size_t s = sizeof(struct wterl_ctx) + (count * sizeof(struct cursor_info)) + sig_len;
c = enif_alloc(s); // TODO: enif_alloc_resource()
c = malloc(s); // TODO: enif_alloc_resource()
if (c == NULL) {
session->close(session, NULL);
return ENOMEM;
@ -355,7 +355,7 @@ __retain_ctx(WterlConnHandle *conn_handle, uint32_t worker_id,
c->ci[i].config = __copy_str_into(&p, config);
rc = session->open_cursor(session, uri, NULL, config, &c->ci[i].cursor);
if (rc != 0) {
enif_free(c);
free(c);
session->close(session, NULL); // this will free the cursors too
va_end(ap);
return rc;
@ -405,7 +405,7 @@ __close_all_sessions(WterlConnHandle *conn_handle)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
enif_free(c);
free(c);
c = n;
}
}
@ -431,7 +431,7 @@ __close_cursors_on(WterlConnHandle *conn_handle, const char *uri)
STAILQ_REMOVE(&conn_handle->cache, c, wterl_ctx, entries);
conn_handle->cache_size -= 1;
c->session->close(c->session, NULL);
enif_free(c);
free(c);
break;
}
}
@ -637,7 +637,7 @@ ASYNC_NIF_DECL(
return;
}
if (session_config.size > 1) {
char *sc = enif_alloc(session_config.size);
char *sc = malloc(session_config.size);
if (!sc) {
enif_release_resource(conn_handle);
ASYNC_NIF_REPLY(__strerror_term(env, ENOMEM));
@ -698,7 +698,7 @@ ASYNC_NIF_DECL(
enif_mutex_lock(args->conn_handle->cache_mutex);
__close_all_sessions(args->conn_handle);
if (args->conn_handle->session_config) {
enif_free((char *)args->conn_handle->session_config);
free((char *)args->conn_handle->session_config);
args->conn_handle->session_config = NULL;
}
WT_CONNECTION* conn = args->conn_handle->conn;
@ -2265,7 +2265,7 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
ATOM_WIREDTIGER_VSN = enif_make_atom(env, "wiredtiger_vsn");
ATOM_MSG_PID = enif_make_atom(env, "message_pid");
struct wterl_priv_data *priv = enif_alloc(sizeof(struct wterl_priv_data));
struct wterl_priv_data *priv = malloc(sizeof(struct wterl_priv_data));
if (!priv)
return ENOMEM;
memset(priv, 0, sizeof(struct wterl_priv_data));
@ -2293,10 +2293,10 @@ on_load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info)
/* Note: !!! the first element of our priv_data struct *must* be the
pointer to the async_nif's private data which we set here. */
ASYNC_NIF_LOAD(wterl, priv->async_nif_priv);
ASYNC_NIF_LOAD(wterl, env, priv->async_nif_priv);
if (!priv->async_nif_priv) {
memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv);
free(priv);
return ENOMEM;
}
*priv_data = priv;
@ -2342,7 +2342,7 @@ on_unload(ErlNifEnv *env, void *priv_data)
enif_free_env(eh->msg_env_progress);
memset(priv, 0, sizeof(struct wterl_priv_data));
enif_free(priv);
free(priv);
priv_data = NULL;
}

View file

@ -21,28 +21,31 @@
%%
%% -------------------------------------------------------------------
-spec async_nif_enqueue(reference(), function(), [term()]) -> term() | {error, term()}.
async_nif_enqueue(F, A) ->
R = erlang:make_ref(),
case erlang:apply(F, [R|A]) of
{ok, enqueued} ->
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
%% Work unit was not queued, try again.
async_nif_enqueue(R, F, A);
%{error, enomem} ->
%{error, shutdown} ->
Other ->
Other
end.
-define(ASYNC_NIF_CALL(Fun, Args), async_nif_enqueue(Fun, Args)).
-define(ASYNC_NIF_CALL(Fun, Args),
F = fun(F) ->
R = erlang:make_ref(),
case erlang:apply(Fun, [R|Args]) of
{ok, {enqueued, PctBusy}} ->
if
PctBusy > 0.25 andalso PctBusy =< 1.0 ->
erlang:bump_reductions(erlang:trunc(2000 * PctBusy));
true ->
ok
end,
receive
{R, {error, shutdown}=Error} ->
%% Work unit was queued, but not executed.
Error;
{R, {error, _Reason}=Error} ->
%% Work unit returned an error.
Error;
{R, Reply} ->
Reply
end;
{error, eagain} ->
F(F);
Other ->
Other
end
end,
F(F)).

View file

@ -544,7 +544,7 @@ from_index_key(LKey) ->
%% @private
%% Return all status from wterl statistics cursor
fetch_status(Cursor) ->
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
{ok, fetch_status(Cursor, wterl:cursor_next_value(Cursor), [])}.
fetch_status(_Cursor, {error, _}, Acc) ->
lists:reverse(Acc);
fetch_status(_Cursor, not_found, Acc) ->

View file

@ -613,7 +613,7 @@ many_open_tables_test_() ->
DataDir = ?TEST_DATA_DIR,
KeyGen =
fun(X) ->
crypto:sha(<<X>>)
crypto:hash(sha, <<X>>)
end,
ValGen =
fun() ->

View file

@ -26,6 +26,12 @@
{mode, max}.
{duration, 10}.
{concurrent, 4}.
{report_interval, 1}.
{pb_timeout_general, 1000}. % ms
%{pb_timeout_read, ?}.
%{pb_timeout_write, ?}.
%{pb_timeout_listkeys, ?}.
%{pb_timeout_mapreduce, ?}.
{driver, basho_bench_driver_wterl}.
{key_generator, {int_to_bin_littleendian,{uniform_int, 5000000}}}.
{value_generator, {fixed_bin, 10000}}.
@ -71,8 +77,6 @@
{connection, [
{create, true},
{sync, false},
{logging, true},
{transactional, true},
{session_max, 1024},
{cache_size, 4294967296},
{verbose, []},