Compare commits
1 commit
master
...
gsb-affini
Author | SHA1 | Date | |
---|---|---|---|
|
8e1e1b192d |
7 changed files with 101 additions and 27 deletions
2
Makefile
2
Makefile
|
@ -84,7 +84,7 @@ repl:
|
||||||
@$(ERL) -pa ebin -pz deps/lager/ebin
|
@$(ERL) -pa ebin -pz deps/lager/ebin
|
||||||
|
|
||||||
eunit-repl:
|
eunit-repl:
|
||||||
@$(ERL) -pa .eunit deps/lager/ebin
|
@$(ERL) -pa .eunit -pz ebin deps/lager/ebin
|
||||||
|
|
||||||
ERL_TOP= /home/gburd/eng/otp_R15B01
|
ERL_TOP= /home/gburd/eng/otp_R15B01
|
||||||
CERL= ${ERL_TOP}/bin/cerl
|
CERL= ${ERL_TOP}/bin/cerl
|
||||||
|
|
|
@ -34,7 +34,7 @@ extern "C" {
|
||||||
#define __UNUSED(v) ((void)(v))
|
#define __UNUSED(v) ((void)(v))
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define ASYNC_NIF_MAX_WORKERS 128
|
#define ASYNC_NIF_MAX_WORKERS 1024
|
||||||
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
|
#define ASYNC_NIF_WORKER_QUEUE_SIZE 500
|
||||||
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
|
#define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS
|
||||||
|
|
||||||
|
@ -67,6 +67,7 @@ struct async_nif_worker_entry {
|
||||||
struct async_nif_state {
|
struct async_nif_state {
|
||||||
STAT_DEF(qwait);
|
STAT_DEF(qwait);
|
||||||
unsigned int shutdown;
|
unsigned int shutdown;
|
||||||
|
ErlNifMutex *worker_mutex;
|
||||||
unsigned int num_workers;
|
unsigned int num_workers;
|
||||||
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
|
struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS];
|
||||||
unsigned int num_queues;
|
unsigned int num_queues;
|
||||||
|
@ -168,6 +169,9 @@ struct async_nif_state {
|
||||||
|
|
||||||
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
|
#define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg))
|
||||||
|
|
||||||
|
#define ASYNC_NIF_SET_MIN_WORKERS(n) \
|
||||||
|
async_nif_set_min_active_workers(*(struct async_nif_state**)enif_priv_data(env), (n))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a request structure from the recycled req queue if one exists,
|
* Return a request structure from the recycled req queue if one exists,
|
||||||
* otherwise create one.
|
* otherwise create one.
|
||||||
|
@ -425,10 +429,42 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif)
|
||||||
|
|
||||||
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
enif_mutex_unlock(async_nif->recycled_req_mutex);
|
||||||
enif_mutex_destroy(async_nif->recycled_req_mutex);
|
enif_mutex_destroy(async_nif->recycled_req_mutex);
|
||||||
|
enif_mutex_destroy(async_nif->worker_mutex);
|
||||||
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues));
|
||||||
enif_free(async_nif);
|
enif_free(async_nif);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int
|
||||||
|
async_nif_set_min_active_workers(struct async_nif_state *async_nif, unsigned int n)
|
||||||
|
{
|
||||||
|
unsigned int i = 0;
|
||||||
|
|
||||||
|
enif_mutex_lock(async_nif->worker_mutex);
|
||||||
|
|
||||||
|
if (n <= 0 || n <= async_nif->num_workers || n > ASYNC_NIF_MAX_WORKERS)
|
||||||
|
return async_nif->num_workers;
|
||||||
|
|
||||||
|
/* Start the worker threads. */
|
||||||
|
for (i = async_nif->num_workers; i < n; i++) {
|
||||||
|
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
|
||||||
|
we->async_nif = async_nif;
|
||||||
|
we->worker_id = i;
|
||||||
|
we->q = &async_nif->queues[i % async_nif->num_queues];
|
||||||
|
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
|
||||||
|
&async_nif_worker_fn, (void*)we, NULL) != 0) {
|
||||||
|
/* error, return number of threads started so far */
|
||||||
|
we->async_nif = 0;
|
||||||
|
we->worker_id = 0;
|
||||||
|
we->q = 0;
|
||||||
|
return i;
|
||||||
|
} else {
|
||||||
|
async_nif->num_workers = i + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
enif_mutex_unlock(async_nif->worker_mutex);
|
||||||
|
return i;
|
||||||
|
}
|
||||||
|
|
||||||
static void *
|
static void *
|
||||||
async_nif_load()
|
async_nif_load()
|
||||||
{
|
{
|
||||||
|
@ -466,7 +502,6 @@ async_nif_load()
|
||||||
sizeof(struct async_nif_work_queue) * num_queues);
|
sizeof(struct async_nif_work_queue) * num_queues);
|
||||||
|
|
||||||
async_nif->num_queues = num_queues;
|
async_nif->num_queues = num_queues;
|
||||||
async_nif->num_workers = 2 * num_queues;
|
|
||||||
async_nif->next_q = 0;
|
async_nif->next_q = 0;
|
||||||
async_nif->shutdown = 0;
|
async_nif->shutdown = 0;
|
||||||
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
|
async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS);
|
||||||
|
@ -485,15 +520,12 @@ async_nif_load()
|
||||||
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
||||||
|
|
||||||
/* Start the worker threads. */
|
/* Start the worker threads. */
|
||||||
for (i = 0; i < async_nif->num_workers; i++) {
|
async_nif->worker_mutex = enif_mutex_create(NULL);
|
||||||
struct async_nif_worker_entry *we = &async_nif->worker_entries[i];
|
unsigned int started = async_nif_set_min_active_workers(async_nif, 2 * num_queues);
|
||||||
we->async_nif = async_nif;
|
if (started != 2 * num_queues) {
|
||||||
we->worker_id = i;
|
/* Something went wrong, shut down. */
|
||||||
we->q = &async_nif->queues[i % async_nif->num_queues];
|
|
||||||
if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid,
|
|
||||||
&async_nif_worker_fn, (void*)we, NULL) != 0) {
|
|
||||||
async_nif->shutdown = 1;
|
async_nif->shutdown = 1;
|
||||||
|
enif_mutex_lock(async_nif->worker_mutex);
|
||||||
for (j = 0; j < async_nif->num_queues; j++) {
|
for (j = 0; j < async_nif->num_queues; j++) {
|
||||||
struct async_nif_work_queue *q = &async_nif->queues[j];
|
struct async_nif_work_queue *q = &async_nif->queues[j];
|
||||||
enif_cond_broadcast(q->reqs_cnd);
|
enif_cond_broadcast(q->reqs_cnd);
|
||||||
|
@ -511,10 +543,12 @@ async_nif_load()
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS);
|
||||||
|
enif_mutex_unlock(async_nif->worker_mutex);
|
||||||
|
enif_mutex_destroy(async_nif->worker_mutex);
|
||||||
enif_free(async_nif);
|
enif_free(async_nif);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
async_nif->num_workers = started;
|
||||||
return async_nif;
|
return async_nif;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
#!/bin/bash
|
#!/bin/bash -x
|
||||||
|
|
||||||
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
# /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is.
|
||||||
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then
|
||||||
|
@ -39,7 +39,7 @@ get_wt ()
|
||||||
git clone ${WT_REPO} && \
|
git clone ${WT_REPO} && \
|
||||||
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
|
(cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1)
|
||||||
else
|
else
|
||||||
git clone -b ${WT_BRANCH} ${WT_REPO} && \
|
git clone ${WT_REPO} && \
|
||||||
(cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1)
|
(cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1)
|
||||||
fi
|
fi
|
||||||
mv wiredtiger $WT_DIR || exit 1
|
mv wiredtiger $WT_DIR || exit 1
|
||||||
|
@ -49,8 +49,8 @@ get_wt ()
|
||||||
[ -e $BASEDIR/wiredtiger-build.patch ] && \
|
[ -e $BASEDIR/wiredtiger-build.patch ] && \
|
||||||
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
|
(patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 )
|
||||||
./autogen.sh || exit 1
|
./autogen.sh || exit 1
|
||||||
cd ./build_posix || exit 1
|
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
|
||||||
[ -e Makefile ] && $MAKE distclean
|
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
|
||||||
wt_configure;
|
wt_configure;
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -109,7 +109,8 @@ build_snappy ()
|
||||||
|
|
||||||
case "$1" in
|
case "$1" in
|
||||||
clean)
|
clean)
|
||||||
[ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean)
|
[ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \
|
||||||
|
(cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean)
|
||||||
rm -rf system $SNAPPY_DIR
|
rm -rf system $SNAPPY_DIR
|
||||||
rm -f ${BASEDIR}/../priv/wt
|
rm -f ${BASEDIR}/../priv/wt
|
||||||
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
|
rm -f ${BASEDIR}/../priv/libwiredtiger-*.so
|
||||||
|
|
|
@ -2024,6 +2024,25 @@ ASYNC_NIF_DECL(
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to set the minimum number of worker threads.
|
||||||
|
*/
|
||||||
|
static ERL_NIF_TERM
|
||||||
|
wterl_set_concurrency(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
|
||||||
|
{
|
||||||
|
int n = 0;
|
||||||
|
|
||||||
|
if (!(argc == 1 && enif_is_number(env, argv[0]))) {
|
||||||
|
return enif_make_badarg(env);
|
||||||
|
}
|
||||||
|
|
||||||
|
enif_get_int(env, argv[0], &n);
|
||||||
|
ASYNC_NIF_SET_MIN_WORKERS(n);
|
||||||
|
|
||||||
|
return ATOM_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called by wterl_event_handler to set the pid for message delivery.
|
* Called by wterl_event_handler to set the pid for message delivery.
|
||||||
*/
|
*/
|
||||||
|
@ -2276,6 +2295,7 @@ static ErlNifFunc nif_funcs[] =
|
||||||
{"cursor_search_near_nif", 4, wterl_cursor_search_near},
|
{"cursor_search_near_nif", 4, wterl_cursor_search_near},
|
||||||
{"cursor_search_nif", 4, wterl_cursor_search},
|
{"cursor_search_nif", 4, wterl_cursor_search},
|
||||||
{"cursor_update_nif", 4, wterl_cursor_update},
|
{"cursor_update_nif", 4, wterl_cursor_update},
|
||||||
|
{"set_concurrency", 1, wterl_set_concurrency},
|
||||||
{"set_event_handler_pid", 1, wterl_set_event_handler_pid},
|
{"set_event_handler_pid", 1, wterl_set_event_handler_pid},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -107,6 +107,12 @@ start(Partition, Config) ->
|
||||||
"lsm"
|
"lsm"
|
||||||
end,
|
end,
|
||||||
{ok, Connection} = establish_connection(Config, Type),
|
{ok, Connection} = establish_connection(Config, Type),
|
||||||
|
case wterl_conn:count() > erlang:system_info(schedulers) of
|
||||||
|
true ->
|
||||||
|
ok = wterl:set_concurrency(wterl_conn:count());
|
||||||
|
false ->
|
||||||
|
ok = wterl:set_concurrency(erlang:system_info(schedulers))
|
||||||
|
end,
|
||||||
Table = Type ++ ":" ++ integer_to_list(Partition),
|
Table = Type ++ ":" ++ integer_to_list(Partition),
|
||||||
Compressor =
|
Compressor =
|
||||||
case wterl:config_value(block_compressor, Config, "snappy") of
|
case wterl:config_value(block_compressor, Config, "snappy") of
|
||||||
|
@ -122,7 +128,7 @@ start(Partition, Config) ->
|
||||||
[{internal_page_max, "128K"},
|
[{internal_page_max, "128K"},
|
||||||
{leaf_page_max, "128K"},
|
{leaf_page_max, "128K"},
|
||||||
{lsm_chunk_size, "100MB"},
|
{lsm_chunk_size, "100MB"},
|
||||||
{lsm_merge_threads, "2"},
|
{lsm_merge_threads, 2},
|
||||||
{prefix_compression, false},
|
{prefix_compression, false},
|
||||||
{lsm_bloom_newest, true},
|
{lsm_bloom_newest, true},
|
||||||
{lsm_bloom_oldest, true} ,
|
{lsm_bloom_oldest, true} ,
|
||||||
|
|
|
@ -65,7 +65,8 @@
|
||||||
config_value/3,
|
config_value/3,
|
||||||
priv_dir/0,
|
priv_dir/0,
|
||||||
fold_keys/3,
|
fold_keys/3,
|
||||||
fold/3]).
|
fold/3,
|
||||||
|
set_concurrency/1]).
|
||||||
|
|
||||||
-export([set_event_handler_pid/1]).
|
-export([set_event_handler_pid/1]).
|
||||||
|
|
||||||
|
@ -514,6 +515,11 @@ set_event_handler_pid(Pid)
|
||||||
?nif_stub.
|
?nif_stub.
|
||||||
|
|
||||||
|
|
||||||
|
-spec set_concurrency(non_neg_integer()) -> ok | {error, term()}.
|
||||||
|
set_concurrency(_Count) ->
|
||||||
|
?nif_stub.
|
||||||
|
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
%% EUnit tests
|
%% EUnit tests
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
|
@ -30,13 +30,14 @@
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/0, stop/0,
|
-export([start_link/0, stop/0,
|
||||||
open/1, open/2, open/3, is_open/0, get/0, close/1]).
|
open/1, open/2, open/3, is_open/0, get/0, close/1, count/0]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
-record(state, { conn :: wterl:connection() }).
|
-record(state, { conn :: wterl:connection(),
|
||||||
|
count :: non_neg_integer() }).
|
||||||
|
|
||||||
-type config_list() :: [{atom(), any()}].
|
-type config_list() :: [{atom(), any()}].
|
||||||
|
|
||||||
|
@ -70,6 +71,10 @@ is_open() ->
|
||||||
get() ->
|
get() ->
|
||||||
gen_server:call(?MODULE, get, infinity).
|
gen_server:call(?MODULE, get, infinity).
|
||||||
|
|
||||||
|
-spec count() -> non_neg_integer().
|
||||||
|
count() ->
|
||||||
|
gen_server:call(?MODULE, count, infinity).
|
||||||
|
|
||||||
-spec close(wterl:connection()) -> ok.
|
-spec close(wterl:connection()) -> ok.
|
||||||
close(_Conn) ->
|
close(_Conn) ->
|
||||||
gen_server:call(?MODULE, {close, self()}, infinity).
|
gen_server:call(?MODULE, {close, self()}, infinity).
|
||||||
|
@ -88,15 +93,15 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{
|
||||||
{ok, ConnRef}=OK ->
|
{ok, ConnRef}=OK ->
|
||||||
Monitor = erlang:monitor(process, Caller),
|
Monitor = erlang:monitor(process, Caller),
|
||||||
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
||||||
{OK, State#state{conn = ConnRef}};
|
{OK, State#state{conn = ConnRef, count = 1}};
|
||||||
Error ->
|
Error ->
|
||||||
{Error, State}
|
{Error, State}
|
||||||
end,
|
end,
|
||||||
{reply, Reply, NState};
|
{reply, Reply, NState};
|
||||||
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) ->
|
handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
|
||||||
Monitor = erlang:monitor(process, Caller),
|
Monitor = erlang:monitor(process, Caller),
|
||||||
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
true = ets:insert(wterl_ets, {Monitor, Caller}),
|
||||||
{reply, {ok, ConnRef}, State};
|
{reply, {ok, ConnRef}, State#state{count = Count + 1}};
|
||||||
|
|
||||||
handle_call(is_open, _From, #state{conn=ConnRef}=State) ->
|
handle_call(is_open, _From, #state{conn=ConnRef}=State) ->
|
||||||
{reply, ConnRef /= undefined, State};
|
{reply, ConnRef /= undefined, State};
|
||||||
|
@ -106,18 +111,20 @@ handle_call(get, _From, #state{conn=undefined}=State) ->
|
||||||
handle_call(get, _From, #state{conn=ConnRef}=State) ->
|
handle_call(get, _From, #state{conn=ConnRef}=State) ->
|
||||||
{reply, {ok, ConnRef}, State};
|
{reply, {ok, ConnRef}, State};
|
||||||
|
|
||||||
handle_call({close, Caller}, _From, #state{conn=ConnRef}=State) ->
|
handle_call({close, Caller}, _From, #state{conn=ConnRef, count=Count}=State) ->
|
||||||
{[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1),
|
{[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1),
|
||||||
true = erlang:demonitor(Monitor, [flush]),
|
true = erlang:demonitor(Monitor, [flush]),
|
||||||
true = ets:delete(wterl_ets, Monitor),
|
true = ets:delete(wterl_ets, Monitor),
|
||||||
NState = case ets:info(wterl_ets, size) of
|
NState = case ets:info(wterl_ets, size) of
|
||||||
0 ->
|
0 ->
|
||||||
do_close(ConnRef),
|
do_close(ConnRef),
|
||||||
State#state{conn=undefined};
|
State#state{conn=undefined, count=0};
|
||||||
_ ->
|
_ ->
|
||||||
State
|
State#state{count = Count - 1}
|
||||||
end,
|
end,
|
||||||
{reply, ok, NState};
|
{reply, ok, NState};
|
||||||
|
handle_call(count, _From, #state{count=Count}=State) ->
|
||||||
|
{reply, Count, State};
|
||||||
handle_call(_Msg, _From, State) ->
|
handle_call(_Msg, _From, State) ->
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue