Compare commits

...

1 commit

Author SHA1 Message Date
Gregory Burd
8e1e1b192d Allow clients of async_nif to set the number of concurrent workers. 2013-05-16 11:18:32 -04:00
7 changed files with 101 additions and 27 deletions

View file

@ -84,7 +84,7 @@ repl:
@$(ERL) -pa ebin -pz deps/lager/ebin @$(ERL) -pa ebin -pz deps/lager/ebin
eunit-repl: eunit-repl:
@$(ERL) -pa .eunit deps/lager/ebin @$(ERL) -pa .eunit -pz ebin deps/lager/ebin
ERL_TOP= /home/gburd/eng/otp_R15B01 ERL_TOP= /home/gburd/eng/otp_R15B01
CERL= ${ERL_TOP}/bin/cerl CERL= ${ERL_TOP}/bin/cerl

View file

@ -34,7 +34,7 @@ extern "C" {
#define __UNUSED(v) ((void)(v)) #define __UNUSED(v) ((void)(v))
#endif #endif
#define ASYNC_NIF_MAX_WORKERS 128 #define ASYNC_NIF_MAX_WORKERS 1024
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS #define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
@ -67,6 +67,7 @@ struct async_nif_worker_entry {
struct async_nif_state { struct async_nif_state {
STAT_DEF(qwait); STAT_DEF(qwait);
unsigned int shutdown; unsigned int shutdown;
ErlNifMutex *worker_mutex;
unsigned int num_workers; unsigned int num_workers;
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
unsigned int num_queues; unsigned int num_queues;
@ -168,6 +169,9 @@ struct async_nif_state {
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
#define ASYNC_NIF_SET_MIN_WORKERS(n) \
async_nif_set_min_active_workers(*(struct async_nif_state**)enif_priv_data(env), (n))
/** /**
* Return a request structure from the recycled req queue if one exists, * Return a request structure from the recycled req queue if one exists,
* otherwise create one. * otherwise create one.
@ -425,10 +429,42 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_unlock(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex);
enif_mutex_destroy(async_nif->worker_mutex);
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
enif_free(async_nif); enif_free(async_nif);
} }
static int
async_nif_set_min_active_workers(struct async_nif_state *async_nif, unsigned int n)
{
unsigned int i = 0;
enif_mutex_lock(async_nif->worker_mutex);
if (n <= 0 || n <= async_nif->num_workers || n > ASYNC_NIF_MAX_WORKERS)
return async_nif->num_workers;
/* Start the worker threads. */
for (i = async_nif->num_workers; i < n; i++) {
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
we->async_nif = async_nif;
we->worker_id = i;
we->q = &async_nif->queues[i % async_nif->num_queues];
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)we, NULL) != 0) {
/* error, return number of threads started so far */
we->async_nif = 0;
we->worker_id = 0;
we->q = 0;
return i;
} else {
async_nif->num_workers = i + 1;
}
}
enif_mutex_unlock(async_nif->worker_mutex);
return i;
}
static void * static void *
async_nif_load() async_nif_load()
{ {
@ -466,7 +502,6 @@ async_nif_load()
sizeof(struct async_nif_work_queue) * num_queues); sizeof(struct async_nif_work_queue) * num_queues);
async_nif->num_queues = num_queues; async_nif->num_queues = num_queues;
async_nif->num_workers = 2 * num_queues;
async_nif->next_q = 0; async_nif->next_q = 0;
async_nif->shutdown = 0; async_nif->shutdown = 0;
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
@ -485,15 +520,12 @@ async_nif_load()
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
/* Start the worker threads. */ /* Start the worker threads. */
for (i = 0; i < async_nif->num_workers; i++) { async_nif->worker_mutex = enif_mutex_create(NULL);
struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; unsigned int started = async_nif_set_min_active_workers(async_nif, 2 * num_queues);
we->async_nif = async_nif; if (started != 2 * num_queues) {
we->worker_id = i; /* Something went wrong, shut down. */
we->q = &async_nif->queues[i % async_nif->num_queues];
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
&async_nif_worker_fn, (void*)we, NULL) != 0) {
async_nif->shutdown = 1; async_nif->shutdown = 1;
enif_mutex_lock(async_nif->worker_mutex);
for (j = 0; j < async_nif->num_queues; j++) { for (j = 0; j < async_nif->num_queues; j++) {
struct async_nif_work_queue *q = &async_nif->queues[j]; struct async_nif_work_queue *q = &async_nif->queues[j];
enif_cond_broadcast(q->reqs_cnd); enif_cond_broadcast(q->reqs_cnd);
@ -511,10 +543,12 @@ async_nif_load()
} }
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
enif_mutex_unlock(async_nif->worker_mutex);
enif_mutex_destroy(async_nif->worker_mutex);
enif_free(async_nif); enif_free(async_nif);
return NULL; return NULL;
} }
} async_nif->num_workers = started;
return async_nif; return async_nif;
} }

View file

@ -1,4 +1,4 @@
#!/bin/bash #!/bin/bash -x
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
@ -39,7 +39,7 @@ get_wt ()
git clone ${WT_REPO} && \ git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1) (cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
else else
git clone -b ${WT_BRANCH} ${WT_REPO} && \ git clone ${WT_REPO} && \
(cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1) (cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1)
fi fi
mv wiredtiger $WT_DIR || exit 1 mv wiredtiger $WT_DIR || exit 1
@ -49,8 +49,8 @@ get_wt ()
[ -e $BASEDIR/wiredtiger-build.patch ] && \ [ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 ) (patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
./autogen.sh || exit 1 ./autogen.sh || exit 1
cd ./build_posix || exit 1 [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
[ -e Makefile ] && $MAKE distclean (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
wt_configure; wt_configure;
) )
} }
@ -109,7 +109,8 @@ build_snappy ()
case "$1" in case "$1" in
clean) clean)
[ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean) [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
rm -rf system $SNAPPY_DIR rm -rf system $SNAPPY_DIR
rm -f ${BASEDIR}/../priv/wt rm -f ${BASEDIR}/../priv/wt
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so rm -f ${BASEDIR}/../priv/libwiredtiger-*.so

View file

@ -2024,6 +2024,25 @@ ASYNC_NIF_DECL(
}); });
/**
* Used to set the minimum number of worker threads.
*/
static ERL_NIF_TERM
wterl_set_concurrency(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
int n = 0;
if (!(argc == 1 && enif_is_number(env, argv[0]))) {
return enif_make_badarg(env);
}
enif_get_int(env, argv[0], &n);
ASYNC_NIF_SET_MIN_WORKERS(n);
return ATOM_OK;
}
/** /**
* Called by wterl_event_handler to set the pid for message delivery. * Called by wterl_event_handler to set the pid for message delivery.
*/ */
@ -2276,6 +2295,7 @@ static ErlNifFunc nif_funcs[] =
{"cursor_search_near_nif", 4, wterl_cursor_search_near}, {"cursor_search_near_nif", 4, wterl_cursor_search_near},
{"cursor_search_nif", 4, wterl_cursor_search}, {"cursor_search_nif", 4, wterl_cursor_search},
{"cursor_update_nif", 4, wterl_cursor_update}, {"cursor_update_nif", 4, wterl_cursor_update},
{"set_concurrency", 1, wterl_set_concurrency},
{"set_event_handler_pid", 1, wterl_set_event_handler_pid}, {"set_event_handler_pid", 1, wterl_set_event_handler_pid},
}; };

View file

@ -107,6 +107,12 @@ start(Partition, Config) ->
"lsm" "lsm"
end, end,
{ok, Connection} = establish_connection(Config, Type), {ok, Connection} = establish_connection(Config, Type),
case wterl_conn:count() > erlang:system_info(schedulers) of
true ->
ok = wterl:set_concurrency(wterl_conn:count());
false ->
ok = wterl:set_concurrency(erlang:system_info(schedulers))
end,
Table = Type ++ ":" ++ integer_to_list(Partition), Table = Type ++ ":" ++ integer_to_list(Partition),
Compressor = Compressor =
case wterl:config_value(block_compressor, Config, "snappy") of case wterl:config_value(block_compressor, Config, "snappy") of
@ -122,7 +128,7 @@ start(Partition, Config) ->
[{internal_page_max, "128K"}, [{internal_page_max, "128K"},
{leaf_page_max, "128K"}, {leaf_page_max, "128K"},
{lsm_chunk_size, "100MB"}, {lsm_chunk_size, "100MB"},
{lsm_merge_threads, "2"}, {lsm_merge_threads, 2},
{prefix_compression, false}, {prefix_compression, false},
{lsm_bloom_newest, true}, {lsm_bloom_newest, true},
{lsm_bloom_oldest, true} , {lsm_bloom_oldest, true} ,

View file

@ -65,7 +65,8 @@
config_value/3, config_value/3,
priv_dir/0, priv_dir/0,
fold_keys/3, fold_keys/3,
fold/3]). fold/3,
set_concurrency/1]).
-export([set_event_handler_pid/1]). -export([set_event_handler_pid/1]).
@ -514,6 +515,11 @@ set_event_handler_pid(Pid)
?nif_stub. ?nif_stub.
-spec set_concurrency(non_neg_integer()) -> ok | {error, term()}.
set_concurrency(_Count) ->
?nif_stub.
%% =================================================================== %% ===================================================================
%% EUnit tests %% EUnit tests
%% =================================================================== %% ===================================================================

View file

@ -30,13 +30,14 @@
%% API %% API
-export([start_link/0, stop/0, -export([start_link/0, stop/0,
open/1, open/2, open/3, is_open/0, get/0, close/1]). open/1, open/2, open/3, is_open/0, get/0, close/1, count/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, { conn :: wterl:connection() }). -record(state, { conn :: wterl:connection(),
count :: non_neg_integer() }).
-type config_list() :: [{atom(), any()}]. -type config_list() :: [{atom(), any()}].
@ -70,6 +71,10 @@ is_open() ->
get() -> get() ->
gen_server:call(?MODULE, get, infinity). gen_server:call(?MODULE, get, infinity).
-spec count() -> non_neg_integer().
count() ->
gen_server:call(?MODULE, count, infinity).
-spec close(wterl:connection()) -> ok. -spec close(wterl:connection()) -> ok.
close(_Conn) -> close(_Conn) ->
gen_server:call(?MODULE, {close, self()}, infinity). gen_server:call(?MODULE, {close, self()}, infinity).
@ -88,15 +93,15 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{
{ok, ConnRef}=OK -> {ok, ConnRef}=OK ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wterl_ets, {Monitor, Caller}),
{OK, State#state{conn = ConnRef}}; {OK, State#state{conn = ConnRef, count = 1}};
Error -> Error ->
{Error, State} {Error, State}
end, end,
{reply, Reply, NState}; {reply, Reply, NState};
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) -> handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
Monitor = erlang:monitor(process, Caller), Monitor = erlang:monitor(process, Caller),
true = ets:insert(wterl_ets, {Monitor, Caller}), true = ets:insert(wterl_ets, {Monitor, Caller}),
{reply, {ok, ConnRef}, State}; {reply, {ok, ConnRef}, State#state{count = Count + 1}};
handle_call(is_open, _From, #state{conn=ConnRef}=State) -> handle_call(is_open, _From, #state{conn=ConnRef}=State) ->
{reply, ConnRef /= undefined, State}; {reply, ConnRef /= undefined, State};
@ -106,18 +111,20 @@ handle_call(get, _From, #state{conn=undefined}=State) ->
handle_call(get, _From, #state{conn=ConnRef}=State) -> handle_call(get, _From, #state{conn=ConnRef}=State) ->
{reply, {ok, ConnRef}, State}; {reply, {ok, ConnRef}, State};
handle_call({close, Caller}, _From, #state{conn=ConnRef}=State) -> handle_call({close, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
{[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1), {[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1),
true = erlang:demonitor(Monitor, [flush]), true = erlang:demonitor(Monitor, [flush]),
true = ets:delete(wterl_ets, Monitor), true = ets:delete(wterl_ets, Monitor),
NState = case ets:info(wterl_ets, size) of NState = case ets:info(wterl_ets, size) of
0 -> 0 ->
do_close(ConnRef), do_close(ConnRef),
State#state{conn=undefined}; State#state{conn=undefined, count=0};
_ -> _ ->
State State#state{count = Count - 1}
end, end,
{reply, ok, NState}; {reply, ok, NState};
handle_call(count, _From, #state{count=Count}=State) ->
{reply, Count, State};
handle_call(_Msg, _From, State) -> handle_call(_Msg, _From, State) ->
{reply, ok, State}. {reply, ok, State}.