Compare commits

...

1 commit

Author         SHA1         Message                                                                Date
Gregory Burd   8e1e1b192d   Allow clients of async_nif to set the number of concurrent workers.   2013-05-16 11:18:32 -04:00
7 changed files with 101 additions and 27 deletions

Makefile

@@ -84,7 +84,7 @@ repl:
 	@$(ERL) -pa ebin -pz deps/lager/ebin
 eunit-repl:
-	@$(ERL) -pa .eunit deps/lager/ebin
+	@$(ERL) -pa .eunit -pz ebin deps/lager/ebin
 ERL_TOP= /home/gburd/eng/otp_R15B01
 CERL= ${ERL_TOP}/bin/cerl

c_src/async_nif.h

@@ -34,7 +34,7 @@ extern "C" {
 #define __UNUSED(v) ((void)(v))
 #endif
-#define ASYNC_NIF_MAX_WORKERS 128
+#define ASYNC_NIF_MAX_WORKERS 1024
 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500
 #define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
@@ -67,6 +67,7 @@ struct async_nif_worker_entry {
 struct async_nif_state {
   STAT_DEF(qwait);
   unsigned int shutdown;
+  ErlNifMutex *worker_mutex;
   unsigned int num_workers;
   struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
   unsigned int num_queues;
@@ -168,6 +169,9 @@ struct async_nif_state {
 #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
+#define ASYNC_NIF_SET_MIN_WORKERS(n) \
+  async_nif_set_min_active_workers(*(struct async_nif_state**)enif_priv_data(env), (n))
 /**
  * Return a request structure from the recycled req queue if one exists,
  * otherwise create one.
@@ -425,10 +429,42 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
   enif_mutex_unlock(async_nif->recycled_req_mutex);
   enif_mutex_destroy(async_nif->recycled_req_mutex);
+  enif_mutex_destroy(async_nif->worker_mutex);
   memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
   enif_free(async_nif);
 }
+
+/* Ensure at least `n` worker threads are running; returns the number of
+   workers active on return.  worker_mutex is released on every return
+   path. */
+static int
+async_nif_set_min_active_workers(struct async_nif_state *async_nif, unsigned int n)
+{
+  unsigned int i = 0;
+
+  enif_mutex_lock(async_nif->worker_mutex);
+  if (n == 0 || n <= async_nif->num_workers || n > ASYNC_NIF_MAX_WORKERS) {
+    unsigned int num_workers = async_nif->num_workers;
+    enif_mutex_unlock(async_nif->worker_mutex);
+    return num_workers;
+  }
+
+  /* Start the additional worker threads. */
+  for (i = async_nif->num_workers; i < n; i++) {
+    struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
+    we->async_nif = async_nif;
+    we->worker_id = i;
+    we->q = &async_nif->queues[i % async_nif->num_queues];
+    if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
+                           &async_nif_worker_fn, (void*)we, NULL) != 0) {
+      /* On error, undo this entry and return the number started so far. */
+      we->async_nif = 0;
+      we->worker_id = 0;
+      we->q = 0;
+      enif_mutex_unlock(async_nif->worker_mutex);
+      return i;
+    }
+    async_nif->num_workers = i + 1;
+  }
+  enif_mutex_unlock(async_nif->worker_mutex);
+  return i;
+}
+
 static void *
 async_nif_load()
 {
@@ -466,7 +502,6 @@ async_nif_load()
                       sizeof(struct async_nif_work_queue) * num_queues);
   async_nif->num_queues = num_queues;
-  async_nif->num_workers = 2 * num_queues;
   async_nif->next_q = 0;
   async_nif->shutdown = 0;
   async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
@@ -485,15 +520,12 @@ async_nif_load()
   memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
   /* Start the worker threads. */
-  for (i = 0; i < async_nif->num_workers; i++) {
-    struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
-    we->async_nif = async_nif;
-    we->worker_id = i;
-    we->q = &async_nif->queues[i % async_nif->num_queues];
-    if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
-                           &async_nif_worker_fn, (void*)we, NULL) != 0) {
+  async_nif->worker_mutex = enif_mutex_create(NULL);
+  unsigned int started = async_nif_set_min_active_workers(async_nif, 2 * num_queues);
+  if (started != 2 * num_queues) {
     /* Something went wrong, shut down. */
     async_nif->shutdown = 1;
+    enif_mutex_lock(async_nif->worker_mutex);
     for (j = 0; j < async_nif->num_queues; j++) {
       struct async_nif_work_queue *q = &async_nif->queues[j];
       enif_cond_broadcast(q->reqs_cnd);
@@ -511,10 +543,12 @@ async_nif_load()
     }
     memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
+    enif_mutex_unlock(async_nif->worker_mutex);
+    enif_mutex_destroy(async_nif->worker_mutex);
     enif_free(async_nif);
     return NULL;
   }
-  }
+  async_nif->num_workers = started;
   return async_nif;
 }
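
Note that with these changes the worker pool is grow-only: a request at or below the current pool size, or above ASYNC_NIF_MAX_WORKERS, leaves the pool untouched, and the caller still gets ok back. A minimal sketch of that behavior through the wterl API added later in this commit (worker counts are illustrative, not from the commit):

    ok = wterl:set_concurrency(64), %% grows the pool to 64 workers
    ok = wterl:set_concurrency(8),  %% no-op: the pool never shrinks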

c_src/build_deps.sh

@@ -1,4 +1,4 @@
-#!/bin/bash
+#!/bin/bash -x
 # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
 if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
@@ -39,7 +39,7 @@ get_wt ()
         git clone ${WT_REPO} && \
             (cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
     else
-        git clone -b ${WT_BRANCH} ${WT_REPO} && \
+        git clone ${WT_REPO} && \
             (cd $BASEDIR/wiredtiger && git checkout -b $WT_BRANCH origin/$WT_BRANCH || exit 1)
     fi
     mv wiredtiger $WT_DIR || exit 1
@ -49,8 +49,8 @@ get_wt ()
[ -e $BASEDIR/wiredtiger-build.patch ] && \
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
./autogen.sh || exit 1
cd ./build_posix || exit 1
[ -e Makefile ] && $MAKE distclean
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
wt_configure;
)
}
@@ -109,7 +109,8 @@ build_snappy ()
 case "$1" in
     clean)
-        [ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean)
+        [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
+            (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
         rm -rf system $SNAPPY_DIR
         rm -f ${BASEDIR}/../priv/wt
        rm -f ${BASEDIR}/../priv/libwiredtiger-*.so

c_src/wterl.c

@@ -2024,6 +2024,25 @@ ASYNC_NIF_DECL(
 });
+
+/**
+ * Used to set the minimum number of worker threads.
+ */
+static ERL_NIF_TERM
+wterl_set_concurrency(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
+{
+    int n = 0;
+
+    /* Require exactly one integer argument. */
+    if (argc != 1 || !enif_get_int(env, argv[0], &n))
+        return enif_make_badarg(env);
+    ASYNC_NIF_SET_MIN_WORKERS(n);
+    return ATOM_OK;
+}
+
 /**
  * Called by wterl_event_handler to set the pid for message delivery.
  */
@@ -2276,6 +2295,7 @@ static ErlNifFunc nif_funcs[] =
     {"cursor_search_near_nif", 4, wterl_cursor_search_near},
     {"cursor_search_nif", 4, wterl_cursor_search},
     {"cursor_update_nif", 4, wterl_cursor_update},
+    {"set_concurrency", 1, wterl_set_concurrency},
     {"set_event_handler_pid", 1, wterl_set_event_handler_pid},
 };
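
Since the NIF rejects anything but a single integer argument with enif_make_badarg, an Erlang caller sees ok on success and a badarg exception otherwise. A hedged sketch (argument values are illustrative):

    ok = wterl:set_concurrency(8),
    {'EXIT', {badarg, _}} = (catch wterl:set_concurrency(not_a_number)).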

src/riak_kv_wterl_backend.erl

@@ -107,6 +107,12 @@ start(Partition, Config) ->
                    "lsm"
            end,
     {ok, Connection} = establish_connection(Config, Type),
+    case wterl_conn:count() > erlang:system_info(schedulers) of
+        true ->
+            ok = wterl:set_concurrency(wterl_conn:count());
+        false ->
+            ok = wterl:set_concurrency(erlang:system_info(schedulers))
+    end,
     Table = Type ++ ":" ++ integer_to_list(Partition),
     Compressor =
         case wterl:config_value(block_compressor, Config, "snappy") of
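
The added case expression simply takes the larger of the open-connection count and the scheduler count. An equivalent formulation (a sketch, not part of the commit) using the max/2 BIF:

    Workers = erlang:max(wterl_conn:count(), erlang:system_info(schedulers)),
    ok = wterl:set_concurrency(Workers).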
@@ -122,7 +128,7 @@ start(Partition, Config) ->
                  [{internal_page_max, "128K"},
                   {leaf_page_max, "128K"},
                   {lsm_chunk_size, "100MB"},
-                  {lsm_merge_threads, "2"},
+                  {lsm_merge_threads, 2},
                   {prefix_compression, false},
                   {lsm_bloom_newest, true},
                   {lsm_bloom_oldest, true},

src/wterl.erl

@@ -65,7 +65,8 @@
          config_value/3,
          priv_dir/0,
          fold_keys/3,
-         fold/3]).
+         fold/3,
+         set_concurrency/1]).
 -export([set_event_handler_pid/1]).
@@ -514,6 +515,11 @@ set_event_handler_pid(Pid)
     ?nif_stub.
+
+-spec set_concurrency(non_neg_integer()) -> ok | {error, term()}.
+set_concurrency(_Count) ->
+    ?nif_stub.
+
 %% ===================================================================
 %% EUnit tests
 %% ===================================================================
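
Once the NIF library is loaded, the new export can be exercised from the shell (the worker count here is illustrative):

    1> wterl:set_concurrency(16).
    ok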

src/wterl_conn.erl

@@ -30,13 +30,14 @@
 %% API
 -export([start_link/0, stop/0,
-         open/1, open/2, open/3, is_open/0, get/0, close/1]).
+         open/1, open/2, open/3, is_open/0, get/0, close/1, count/0]).
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
--record(state, { conn :: wterl:connection() }).
+-record(state, { conn :: wterl:connection(),
+                 count :: non_neg_integer() }).
 -type config_list() :: [{atom(), any()}].
@@ -70,6 +71,10 @@ is_open() ->
 get() ->
     gen_server:call(?MODULE, get, infinity).
+
+-spec count() -> non_neg_integer().
+count() ->
+    gen_server:call(?MODULE, count, infinity).
+
 -spec close(wterl:connection()) -> ok.
 close(_Conn) ->
     gen_server:call(?MODULE, {close, self()}, infinity).
@@ -88,15 +93,15 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{
             {ok, ConnRef}=OK ->
                 Monitor = erlang:monitor(process, Caller),
                 true = ets:insert(wterl_ets, {Monitor, Caller}),
-                {OK, State#state{conn = ConnRef}};
+                {OK, State#state{conn = ConnRef, count = 1}};
             Error ->
                 {Error, State}
         end,
     {reply, Reply, NState};
-handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
+handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
     Monitor = erlang:monitor(process, Caller),
     true = ets:insert(wterl_ets, {Monitor, Caller}),
-    {reply, {ok, ConnRef}, State};
+    {reply, {ok, ConnRef}, State#state{count = Count + 1}};
 handle_call(is_open, _From, #state{conn=ConnRef}=State) ->
     {reply, ConnRef /= undefined, State};
@@ -106,18 +111,20 @@ handle_call(get, _From, #state{conn=undefined}=State) ->
 handle_call(get, _From, #state{conn=ConnRef}=State) ->
     {reply, {ok, ConnRef}, State};
-handle_call({close, Caller}, _From, #state{conn=ConnRef}=State) ->
+handle_call({close, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
     {[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1),
     true = erlang:demonitor(Monitor, [flush]),
     true = ets:delete(wterl_ets, Monitor),
     NState = case ets:info(wterl_ets, size) of
                  0 ->
                      do_close(ConnRef),
-                     State#state{conn=undefined};
+                     State#state{conn=undefined, count=0};
                  _ ->
-                     State
+                     State#state{count = Count - 1}
              end,
     {reply, ok, NState};
+handle_call(count, _From, #state{count=Count}=State) ->
+    {reply, Count, State};
 handle_call(_Msg, _From, State) ->
     {reply, ok, State}.
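
A short sketch of the resulting open/close accounting, assuming a fresh wterl_conn server, an illustrative data directory, and open and close issued by the same process:

    {ok, Conn} = wterl_conn:open("/tmp/wterl-data"), %% first open sets count to 1
    1 = wterl_conn:count(),
    ok = wterl_conn:close(Conn),                     %% last close resets count to 0
    0 = wterl_conn:count().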