Execute NIF calls on non-scheduler threads asynchronously #6

Merged
gburd merged 72 commits from gsb-async-nifs3 into master 2013-04-23 00:54:56 +00:00
3 changed files with 51 additions and 21 deletions
Showing only changes of commit 1ae8e5698f - Show all commits

View file

@ -26,6 +26,7 @@
extern "C" { extern "C" {
#endif #endif
#include <assert.h>
#ifdef ASYNC_NIF_STATS #ifdef ASYNC_NIF_STATS
#include "stats.h" // TODO: measure, measure... measure again #include "stats.h" // TODO: measure, measure... measure again
#endif #endif
@ -171,12 +172,13 @@ struct async_nif_state {
enif_self(env, &req->pid); \ enif_self(env, &req->pid); \
req->args = (void*)copy_of_args; \ req->args = (void*)copy_of_args; \
req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \ req->fn_work = (void (*)(ErlNifEnv *, ERL_NIF_TERM, ErlNifPid*, unsigned int, void *))fn_work_ ## decl ; \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
int h = -1; \ int h = -1; \
if (affinity) \ if (affinity) \
h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \ h = async_nif_str_hash_func(affinity) % async_nif->num_queues; \
ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \ ERL_NIF_TERM reply = async_nif_enqueue_req(async_nif, req, h); \
req->fn_post = (void (*)(void *))fn_post_ ## decl; \
if (!reply) { \ if (!reply) { \
fn_post_ ## decl (args); \
enif_free(req); \ enif_free(req); \
enif_free_env(new_env); \ enif_free_env(new_env); \
enif_free(copy_of_args); \ enif_free(copy_of_args); \
@ -293,23 +295,30 @@ async_nif_worker_fn(void *arg)
enif_cond_wait(q->reqs_cnd, q->reqs_mutex); enif_cond_wait(q->reqs_cnd, q->reqs_mutex);
goto check_again_for_work; goto check_again_for_work;
} else { } else {
assert(fifo_q_size(reqs, q->reqs) > 0);
assert(fifo_q_size(reqs, q->reqs) < fifo_q_capacity(reqs, q->reqs));
/* At this point the next req is ours to process and we hold the /* At this point the next req is ours to process and we hold the
reqs_mutex lock. Take the request off the queue. */ reqs_mutex lock. Take the request off the queue. */
req = fifo_q_get(reqs, q->reqs); req = fifo_q_get(reqs, q->reqs);
enif_mutex_unlock(q->reqs_mutex); enif_mutex_unlock(q->reqs_mutex);
/* Wake up another thread working on this queue. */ /* Ensure that there is at least one other worker watching this queue. */
enif_cond_signal(q->reqs_cnd); enif_cond_signal(q->reqs_cnd);
/* Finally, do the work. */ /* Finally, do the work, */
req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args); req->fn_work(req->env, req->ref, &req->pid, worker_id, req->args);
/* and then call the post-work cleanup function. */
req->fn_post(req->args); req->fn_post(req->args);
/* Note: we don't call enif_free_env(req->env) because it has called
enif_send() which invalidates it (free'ing it for us). If a work /* Note: we don't call enif_free_env(req->env) because somewhere in the
block doesn't call ASYNC_NIF_REPLY() at some point then it must work block there has been a call to enif_send() which invalidates the
call ASYNC_NIF_NOREPLY() to free this env. */ environment we created which in theory will cause the next GC to free
it for us). If a work block doesn't call ASYNC_NIF_REPLY() at some
point then it must call ASYNC_NIF_NOREPLY() to free this env. */
enif_free(req->args); enif_free(req->args);
enif_free(req); enif_free(req);
req = NULL;
} }
} }
enif_thread_exit(0); enif_thread_exit(0);
@ -321,6 +330,7 @@ async_nif_unload(ErlNifEnv *env)
{ {
unsigned int i; unsigned int i;
struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env); struct async_nif_state *async_nif = (struct async_nif_state*)enif_priv_data(env);
unsigned int num_queues = async_nif->num_queues;
/* Signal the worker threads, stop what you're doing and exit. To /* Signal the worker threads, stop what you're doing and exit. To
ensure that we don't race with the enqueue() process we first ensure that we don't race with the enqueue() process we first
@ -328,14 +338,14 @@ async_nif_unload(ErlNifEnv *env)
unlock. The enqueue function will take the queue mutex, then unlock. The enqueue function will take the queue mutex, then
test for shutdown condition, then enqueue only if not shutting test for shutdown condition, then enqueue only if not shutting
down. */ down. */
for (i = 0; i < async_nif->num_queues; i++) for (i = 0; i < num_queues; i++)
enif_mutex_lock(async_nif->queues[i].reqs_mutex); enif_mutex_lock(async_nif->queues[i].reqs_mutex);
async_nif->shutdown = 1; async_nif->shutdown = 1;
for (i = 0; i < async_nif->num_queues; i++) for (i = 0; i < num_queues; i++)
enif_mutex_unlock(async_nif->queues[i].reqs_mutex); enif_mutex_unlock(async_nif->queues[i].reqs_mutex);
/* Wake up any waiting worker threads. */ /* Wake up any waiting worker threads. */
for (i = 0; i < async_nif->num_queues; i++) { for (i = 0; i < num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i]; struct async_nif_work_queue *q = &async_nif->queues[i];
enif_cond_broadcast(q->reqs_cnd); enif_cond_broadcast(q->reqs_cnd);
} }
@ -347,7 +357,6 @@ async_nif_unload(ErlNifEnv *env)
} }
/* Cleanup requests, mutexes and conditions in each work queue. */ /* Cleanup requests, mutexes and conditions in each work queue. */
unsigned int num_queues = async_nif->num_queues;
for (i = 0; i < num_queues; i++) { for (i = 0; i < num_queues; i++) {
struct async_nif_work_queue *q = &async_nif->queues[i]; struct async_nif_work_queue *q = &async_nif->queues[i];
enif_mutex_destroy(q->reqs_mutex); enif_mutex_destroy(q->reqs_mutex);
@ -394,7 +403,20 @@ async_nif_load(void)
memset(async_nif, 0, sizeof(struct async_nif_state) + memset(async_nif, 0, sizeof(struct async_nif_state) +
sizeof(struct async_nif_work_queue) * info.scheduler_threads); sizeof(struct async_nif_work_queue) * info.scheduler_threads);
if (info.scheduler_threads > ASYNC_NIF_MAX_WORKERS / 2) {
async_nif->num_queues = ASYNC_NIF_MAX_WORKERS / 2;
} else {
int remainder = ASYNC_NIF_MAX_WORKERS % info.scheduler_threads;
if (remainder != 0) {
async_nif->num_queues = info.scheduler_threads - remainder;
} else {
async_nif->num_queues = info.scheduler_threads; async_nif->num_queues = info.scheduler_threads;
}
if (async_nif->num_queues < 2) {
async_nif->num_queues = 2;
}
}
async_nif->num_workers = ASYNC_NIF_MAX_WORKERS; // TODO: start with 2 per queue, then grow if needed
async_nif->next_q = 0; async_nif->next_q = 0;
async_nif->shutdown = 0; async_nif->shutdown = 0;
@ -409,9 +431,7 @@ async_nif_load(void)
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */ /* Start the worker threads. */
unsigned int num_workers = async_nif->num_queues; for (i = 0; i < async_nif->num_workers; i++) {
for (i = 0; i < num_workers; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif; we->async_nif = async_nif;
we->worker_id = i; we->worker_id = i;
@ -423,8 +443,6 @@ async_nif_load(void)
for (j = 0; j < async_nif->num_queues; j++) { for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j]; struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd); enif_cond_broadcast(q->reqs_cnd);
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
} }
while(i-- > 0) { while(i-- > 0) {
@ -432,11 +450,17 @@ async_nif_load(void)
enif_thread_join(async_nif->worker_entries[i].tid, &exit_value); enif_thread_join(async_nif->worker_entries[i].tid, &exit_value);
} }
for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j];
enif_mutex_destroy(q->reqs_mutex);
enif_cond_destroy(q->reqs_cnd);
}
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_free(async_nif);
return NULL; return NULL;
} }
} }
async_nif->num_workers = i;
return async_nif; return async_nif;
} }

View file

@ -48,6 +48,7 @@ get_wt ()
mv wiredtiger $WT_DIR || exit 1 mv wiredtiger $WT_DIR || exit 1
fi fi
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
(cd $BASEDIR/$WT_DIR && git cherry-pick a3c8c2a13758ae9c44edabcc1a780984a7882904 || exit 1)
(cd $BASEDIR/$WT_DIR (cd $BASEDIR/$WT_DIR
[ -e $BASEDIR/wiredtiger-build.patch ] && \ [ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 ) (patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
@ -167,12 +168,13 @@ case "$1" in
[ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1) [ -d $BASEDIR/$WT_DIR ] || (echo "Missing WiredTiger source directory" && exit 1)
test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \ test -f system/lib/libwiredtiger-[0-9].[0-9].[0-9].so \
-a -f system/lib/libwiredtiger_snappy.so \ -a -f system/lib/libwiredtiger_snappy.so \
-a -f system/lib/libwiredtiger_bzip2.so || build_wt; -a -f system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] || build_wt;
[ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv [ -d $BASEDIR/../priv ] || mkdir ${BASEDIR}/../priv
cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv cp $BASEDIR/system/bin/wt ${BASEDIR}/../priv
cp $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv cp $BASEDIR/system/lib/libwiredtiger-[0-9].[0-9].[0-9].so ${BASEDIR}/../priv
cp $BASEDIR/system/lib/libwiredtiger_*.so ${BASEDIR}/../priv cp $BASEDIR/system/lib/libwiredtiger_snappy.so ${BASEDIR}/../priv
cp $BASEDIR/system/lib/libwiredtiger_bzip2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
cp $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv cp $BASEDIR/system/lib/libbz2.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
cp $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv cp $BASEDIR/system/lib/libsnappy.so.[0-9].[0-9].[0-9] ${BASEDIR}/../priv
;; ;;

View file

@ -355,7 +355,11 @@ max_sessions(Config) ->
undefined -> 1024; undefined -> 1024;
Size -> Size Size -> Size
end, end,
1000 * (RingSize * erlang:system_info(schedulers)). % TODO: review/fix this logic Est = 1000 * (RingSize * erlang:system_info(schedulers)), % TODO: review/fix this logic
case Est > 1000000000 of % Note: WiredTiger uses a signed int for this
true -> 1000000000;
false -> Est
end.
%% @private %% @private
establish_utility_cursors(Connection, Table) -> establish_utility_cursors(Connection, Table) ->