Allow clients of async_nif to set the number of concurrent workers.
This commit is contained in:
parent
013251e6d9
commit
8e1e1b192d
7 changed files with 101 additions and 27 deletions
2
Makefile
2
Makefile
|
@ -84,7 +84,7 @@ repl:
|
|||
@$(ERL) -pa ebin -pz deps/lager/ebin
|
||||
|
||||
eunit-repl:
|
||||
@$(ERL) -pa .eunit deps/lager/ebin
|
||||
@$(ERL) -pa .eunit -pz ebin deps/lager/ebin
|
||||
|
||||
ERL_TOP= /home/gburd/eng/otp_R15B01
|
||||
CERL= ${ERL_TOP}/bin/cerl
|
||||
|
|
|
@ -34,7 +34,7 @@ extern "C" {
|
|||
#define __UNUSED(v) ((void)(v))
|
||||
#endif
|
||||
|
||||
#define ASYNC_NIF_MAX_WORKERS 128
|
||||
#define ASYNC_NIF_MAX_WORKERS 1024
|
||||
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
|
||||
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
|
||||
|
||||
|
@ -67,6 +67,7 @@ struct async_nif_worker_entry {
|
|||
struct async_nif_state {
|
||||
STAT_DEF(qwait);
|
||||
unsigned int shutdown;
|
||||
ErlNifMutex *worker_mutex;
|
||||
unsigned int num_workers;
|
||||
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
|
||||
unsigned int num_queues;
|
||||
|
@ -168,6 +169,9 @@ struct async_nif_state {
|
|||
|
||||
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
|
||||
|
||||
#define ASYNC_NIF_SET_MIN_WORKERS(n) \
|
||||
async_nif_set_min_active_workers(*(struct async_nif_state**)enif_priv_data(env), (n))
|
||||
|
||||
/**
|
||||
* Return a request structure from the recycled req queue if one exists,
|
||||
* otherwise create one.
|
||||
|
@ -425,10 +429,42 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
|||
|
||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
||||
enif_mutex_destroy(async_nif->recycled_req_mutex);
|
||||
enif_mutex_destroy(async_nif->worker_mutex);
|
||||
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
||||
enif_free(async_nif);
|
||||
}
|
||||
|
||||
static int
|
||||
async_nif_set_min_active_workers(struct async_nif_state *async_nif, unsigned int n)
|
||||
{
|
||||
unsigned int i = 0;
|
||||
|
||||
enif_mutex_lock(async_nif->worker_mutex);
|
||||
|
||||
if (n <= 0 || n <= async_nif->num_workers || n > ASYNC_NIF_MAX_WORKERS)
|
||||
return async_nif->num_workers;
|
||||
|
||||
/* Start the worker threads. */
|
||||
for (i = async_nif->num_workers; i < n; i++) {
|
||||
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
|
||||
we->async_nif = async_nif;
|
||||
we->worker_id = i;
|
||||
we->q = &async_nif->queues[i % async_nif->num_queues];
|
||||
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
|
||||
&async_nif_worker_fn, (void*)we, NULL) != 0) {
|
||||
/* error, return number of threads started so far */
|
||||
we->async_nif = 0;
|
||||
we->worker_id = 0;
|
||||
we->q = 0;
|
||||
return i;
|
||||
} else {
|
||||
async_nif->num_workers = i + 1;
|
||||
}
|
||||
}
|
||||
enif_mutex_unlock(async_nif->worker_mutex);
|
||||
return i;
|
||||
}
|
||||
|
||||
static void *
|
||||
async_nif_load()
|
||||
{
|
||||
|
@ -466,7 +502,6 @@ async_nif_load()
|
|||
sizeof(struct async_nif_work_queue) * num_queues);
|
||||
|
||||
async_nif->num_queues = num_queues;
|
||||
async_nif->num_workers = 2 * num_queues;
|
||||
async_nif->next_q = 0;
|
||||
async_nif->shutdown = 0;
|
||||
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
|
||||
|
@ -485,15 +520,12 @@ async_nif_load()
|
|||
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
||||
|
||||
/* Start the worker threads. */
|
||||
for (i = 0; i < async_nif->num_workers; i++) {
|
||||
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
|
||||
we->async_nif = async_nif;
|
||||
we->worker_id = i;
|
||||
we->q = &async_nif->queues[i % async_nif->num_queues];
|
||||
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
|
||||
&async_nif_worker_fn, (void*)we, NULL) != 0) {
|
||||
async_nif->worker_mutex = enif_mutex_create(NULL);
|
||||
unsigned int started = async_nif_set_min_active_workers(async_nif, 2 * num_queues);
|
||||
if (started != 2 * num_queues) {
|
||||
/* Something went wrong, shut down. */
|
||||
async_nif->shutdown = 1;
|
||||
|
||||
enif_mutex_lock(async_nif->worker_mutex);
|
||||
for (j = 0; j < async_nif->num_queues; j++) {
|
||||
struct async_nif_work_queue *q = &async_nif->queues[j];
|
||||
enif_cond_broadcast(q->reqs_cnd);
|
||||
|
@ -511,10 +543,12 @@ async_nif_load()
|
|||
}
|
||||
|
||||
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
||||
enif_mutex_unlock(async_nif->worker_mutex);
|
||||
enif_mutex_destroy(async_nif->worker_mutex);
|
||||
enif_free(async_nif);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
async_nif->num_workers = started;
|
||||
return async_nif;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
#!/bin/bash
|
||||
#!/bin/bash -x
|
||||
|
||||
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
||||
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
||||
|
@ -39,7 +39,7 @@ get_wt ()
|
|||
git clone ${WT_REPO} && \
|
||||
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
|
||||
else
|
||||
git clone -b ${WT_BRANCH} ${WT_REPO} && \
|
||||
git clone ${WT_REPO} && \
|
||||
(cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1)
|
||||
fi
|
||||
mv wiredtiger $WT_DIR || exit 1
|
||||
|
@ -49,8 +49,8 @@ get_wt ()
|
|||
[ -e $BASEDIR/wiredtiger-build.patch ] && \
|
||||
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
|
||||
./autogen.sh || exit 1
|
||||
cd ./build_posix || exit 1
|
||||
[ -e Makefile ] && $MAKE distclean
|
||||
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
|
||||
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
|
||||
wt_configure;
|
||||
)
|
||||
}
|
||||
|
@ -109,7 +109,8 @@ build_snappy ()
|
|||
|
||||
case "$1" in
|
||||
clean)
|
||||
[ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean)
|
||||
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
|
||||
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
|
||||
rm -rf system $SNAPPY_DIR
|
||||
rm -f ${BASEDIR}/../priv/wt
|
||||
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
|
||||
|
|
|
@ -2024,6 +2024,25 @@ ASYNC_NIF_DECL(
|
|||
});
|
||||
|
||||
|
||||
/**
|
||||
* Used to set the minimum number of worker threads.
|
||||
*/
|
||||
static ERL_NIF_TERM
|
||||
wterl_set_concurrency(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
|
||||
{
|
||||
int n = 0;
|
||||
|
||||
if (!(argc == 1 && enif_is_number(env, argv[0]))) {
|
||||
return enif_make_badarg(env);
|
||||
}
|
||||
|
||||
enif_get_int(env, argv[0], &n);
|
||||
ASYNC_NIF_SET_MIN_WORKERS(n);
|
||||
|
||||
return ATOM_OK;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Called by wterl_event_handler to set the pid for message delivery.
|
||||
*/
|
||||
|
@ -2276,6 +2295,7 @@ static ErlNifFunc nif_funcs[] =
|
|||
{"cursor_search_near_nif", 4, wterl_cursor_search_near},
|
||||
{"cursor_search_nif", 4, wterl_cursor_search},
|
||||
{"cursor_update_nif", 4, wterl_cursor_update},
|
||||
{"set_concurrency", 1, wterl_set_concurrency},
|
||||
{"set_event_handler_pid", 1, wterl_set_event_handler_pid},
|
||||
};
|
||||
|
||||
|
|
|
@ -107,6 +107,12 @@ start(Partition, Config) ->
|
|||
"lsm"
|
||||
end,
|
||||
{ok, Connection} = establish_connection(Config, Type),
|
||||
case wterl_conn:count() > erlang:system_info(schedulers) of
|
||||
true ->
|
||||
ok = wterl:set_concurrency(wterl_conn:count());
|
||||
false ->
|
||||
ok = wterl:set_concurrency(erlang:system_info(schedulers))
|
||||
end,
|
||||
Table = Type ++ ":" ++ integer_to_list(Partition),
|
||||
Compressor =
|
||||
case wterl:config_value(block_compressor, Config, "snappy") of
|
||||
|
@ -122,7 +128,7 @@ start(Partition, Config) ->
|
|||
[{internal_page_max, "128K"},
|
||||
{leaf_page_max, "128K"},
|
||||
{lsm_chunk_size, "100MB"},
|
||||
{lsm_merge_threads, "2"},
|
||||
{lsm_merge_threads, 2},
|
||||
{prefix_compression, false},
|
||||
{lsm_bloom_newest, true},
|
||||
{lsm_bloom_oldest, true} ,
|
||||
|
|
|
@ -65,7 +65,8 @@
|
|||
config_value/3,
|
||||
priv_dir/0,
|
||||
fold_keys/3,
|
||||
fold/3]).
|
||||
fold/3,
|
||||
set_concurrency/1]).
|
||||
|
||||
-export([set_event_handler_pid/1]).
|
||||
|
||||
|
@ -514,6 +515,11 @@ set_event_handler_pid(Pid)
|
|||
?nif_stub.
|
||||
|
||||
|
||||
-spec set_concurrency(non_neg_integer()) -> ok | {error, term()}.
|
||||
set_concurrency(_Count) ->
|
||||
?nif_stub.
|
||||
|
||||
|
||||
%% ===================================================================
|
||||
%% EUnit tests
|
||||
%% ===================================================================
|
||||
|
|
|
@ -30,13 +30,14 @@
|
|||
|
||||
%% API
|
||||
-export([start_link/0, stop/0,
|
||||
open/1, open/2, open/3, is_open/0, get/0, close/1]).
|
||||
open/1, open/2, open/3, is_open/0, get/0, close/1, count/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||
terminate/2, code_change/3]).
|
||||
|
||||
-record(state, { conn :: wterl:connection() }).
|
||||
-record(state, { conn :: wterl:connection(),
|
||||
count :: non_neg_integer() }).
|
||||
|
||||
-type config_list() :: [{atom(), any()}].
|
||||
|
||||
|
@ -70,6 +71,10 @@ is_open() ->
|
|||
get() ->
|
||||
gen_server:call(?MODULE, get, infinity).
|
||||
|
||||
-spec count() -> non_neg_integer().
|
||||
count() ->
|
||||
gen_server:call(?MODULE, count, infinity).
|
||||
|
||||
-spec close(wterl:connection()) -> ok.
|
||||
close(_Conn) ->
|
||||
gen_server:call(?MODULE, {close, self()}, infinity).
|
||||
|
@ -88,15 +93,15 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{
|
|||
{ok, ConnRef}=OK ->
|
||||
Monitor = erlang:monitor(process, Caller),
|
||||
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
||||
{OK, State#state{conn = ConnRef}};
|
||||
{OK, State#state{conn = ConnRef, count = 1}};
|
||||
Error ->
|
||||
{Error, State}
|
||||
end,
|
||||
{reply, Reply, NState};
|
||||
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
|
||||
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
|
||||
Monitor = erlang:monitor(process, Caller),
|
||||
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
||||
{reply, {ok, ConnRef}, State};
|
||||
{reply, {ok, ConnRef}, State#state{count = Count + 1}};
|
||||
|
||||
handle_call(is_open, _From, #state{conn=ConnRef}=State) ->
|
||||
{reply, ConnRef /= undefined, State};
|
||||
|
@ -106,18 +111,20 @@ handle_call(get, _From, #state{conn=undefined}=State) ->
|
|||
handle_call(get, _From, #state{conn=ConnRef}=State) ->
|
||||
{reply, {ok, ConnRef}, State};
|
||||
|
||||
handle_call({close, Caller}, _From, #state{conn=ConnRef}=State) ->
|
||||
handle_call({close, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
|
||||
{[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1),
|
||||
true = erlang:demonitor(Monitor, [flush]),
|
||||
true = ets:delete(wterl_ets, Monitor),
|
||||
NState = case ets:info(wterl_ets, size) of
|
||||
0 ->
|
||||
do_close(ConnRef),
|
||||
State#state{conn=undefined};
|
||||
State#state{conn=undefined, count=0};
|
||||
_ ->
|
||||
State
|
||||
State#state{count = Count - 1}
|
||||
end,
|
||||
{reply, ok, NState};
|
||||
handle_call(count, _From, #state{count=Count}=State) ->
|
||||
{reply, Count, State};
|
||||
handle_call(_Msg, _From, State) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
|
|
Loading…
Reference in a new issue