From 8e1e1b192d3b696839abe5a502ab97ad2469038b Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Thu, 16 May 2013 11:18:32 -0400 Subject: [PATCH] Allow clients of async_nif to set the number of concurrent workers. --- Makefile | 2 +- c_src/async_nif.h | 56 ++++++++++++++++++++++++++++------- c_src/build_deps.sh | 11 +++---- c_src/wterl.c | 20 +++++++++++++ src/riak_kv_wterl_backend.erl | 8 ++++- src/wterl.erl | 8 ++++- src/wterl_conn.erl | 23 +++++++++----- 7 files changed, 101 insertions(+), 27 deletions(-) diff --git a/Makefile b/Makefile index e485507..c0356db 100644 --- a/Makefile +++ b/Makefile @@ -84,7 +84,7 @@ repl: @$(ERL) -pa ebin -pz deps/lager/ebin eunit-repl: - @$(ERL) -pa .eunit deps/lager/ebin + @$(ERL) -pa .eunit -pz ebin deps/lager/ebin ERL_TOP= /home/gburd/eng/otp_R15B01 CERL= ${ERL_TOP}/bin/cerl diff --git a/c_src/async_nif.h b/c_src/async_nif.h index e7a9670..9f9cd8a 100644 --- a/c_src/async_nif.h +++ b/c_src/async_nif.h @@ -34,7 +34,7 @@ extern "C" { #define __UNUSED(v) ((void)(v)) #endif -#define ASYNC_NIF_MAX_WORKERS 128 +#define ASYNC_NIF_MAX_WORKERS 1024 #define ASYNC_NIF_WORKER_QUEUE_SIZE 500 #define ASYNC_NIF_MAX_QUEUED_REQS 1000 * ASYNC_NIF_MAX_WORKERS @@ -67,6 +67,7 @@ struct async_nif_worker_entry { struct async_nif_state { STAT_DEF(qwait); unsigned int shutdown; + ErlNifMutex *worker_mutex; unsigned int num_workers; struct async_nif_worker_entry worker_entries[ASYNC_NIF_MAX_WORKERS]; unsigned int num_queues; @@ -168,6 +169,9 @@ struct async_nif_state { #define ASYNC_NIF_REPLY(msg) enif_send(NULL, pid, env, enif_make_tuple2(env, ref, msg)) +#define ASYNC_NIF_SET_MIN_WORKERS(n) \ + async_nif_set_min_active_workers(*(struct async_nif_state**)enif_priv_data(env), (n)) + /** * Return a request structure from the recycled req queue if one exists, * otherwise create one. @@ -425,10 +429,42 @@ async_nif_unload(ErlNifEnv *env, struct async_nif_state *async_nif) enif_mutex_unlock(async_nif->recycled_req_mutex); enif_mutex_destroy(async_nif->recycled_req_mutex); + enif_mutex_destroy(async_nif->worker_mutex); memset(async_nif, 0, sizeof(struct async_nif_state) + (sizeof(struct async_nif_work_queue) * async_nif->num_queues)); enif_free(async_nif); } +static int +async_nif_set_min_active_workers(struct async_nif_state *async_nif, unsigned int n) +{ + unsigned int i = 0; + + enif_mutex_lock(async_nif->worker_mutex); + + if (n <= 0 || n <= async_nif->num_workers || n > ASYNC_NIF_MAX_WORKERS) + return async_nif->num_workers; + + /* Start the worker threads. */ + for (i = async_nif->num_workers; i < n; i++) { + struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; + we->async_nif = async_nif; + we->worker_id = i; + we->q = &async_nif->queues[i % async_nif->num_queues]; + if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, + &async_nif_worker_fn, (void*)we, NULL) != 0) { + /* error, return number of threads started so far */ + we->async_nif = 0; + we->worker_id = 0; + we->q = 0; + return i; + } else { + async_nif->num_workers = i + 1; + } + } + enif_mutex_unlock(async_nif->worker_mutex); + return i; +} + static void * async_nif_load() { @@ -466,7 +502,6 @@ async_nif_load() sizeof(struct async_nif_work_queue) * num_queues); async_nif->num_queues = num_queues; - async_nif->num_workers = 2 * num_queues; async_nif->next_q = 0; async_nif->shutdown = 0; async_nif->recycled_reqs = fifo_q_new(reqs, ASYNC_NIF_MAX_QUEUED_REQS); @@ -485,15 +520,12 @@ async_nif_load() memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); /* Start the worker threads. */ - for (i = 0; i < async_nif->num_workers; i++) { - struct async_nif_worker_entry *we = &async_nif->worker_entries[i]; - we->async_nif = async_nif; - we->worker_id = i; - we->q = &async_nif->queues[i % async_nif->num_queues]; - if (enif_thread_create(NULL, &async_nif->worker_entries[i].tid, - &async_nif_worker_fn, (void*)we, NULL) != 0) { + async_nif->worker_mutex = enif_mutex_create(NULL); + unsigned int started = async_nif_set_min_active_workers(async_nif, 2 * num_queues); + if (started != 2 * num_queues) { + /* Something went wrong, shut down. */ async_nif->shutdown = 1; - + enif_mutex_lock(async_nif->worker_mutex); for (j = 0; j < async_nif->num_queues; j++) { struct async_nif_work_queue *q = &async_nif->queues[j]; enif_cond_broadcast(q->reqs_cnd); @@ -511,10 +543,12 @@ async_nif_load() } memset(async_nif->worker_entries, 0, sizeof(struct async_nif_worker_entry) * ASYNC_NIF_MAX_WORKERS); + enif_mutex_unlock(async_nif->worker_mutex); + enif_mutex_destroy(async_nif->worker_mutex); enif_free(async_nif); return NULL; - } } + async_nif->num_workers = started; return async_nif; } diff --git a/c_src/build_deps.sh b/c_src/build_deps.sh index 789bcea..d3105ab 100755 --- a/c_src/build_deps.sh +++ b/c_src/build_deps.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/bin/bash -x # /bin/sh on Solaris is not a POSIX compatible shell, but /usr/bin/ksh is. if [ `uname -s` = 'SunOS' -a "${POSIX_SHELL}" != "true" ]; then @@ -39,7 +39,7 @@ get_wt () git clone ${WT_REPO} && \ (cd $BASEDIR/wiredtiger && git checkout $WT_VSN || exit 1) else - git clone -b ${WT_BRANCH} ${WT_REPO} && \ + git clone ${WT_REPO} && \ (cd $BASEDIR/wiredtiger && git checkout $WT_BRANCH origin/$WT_BRANCH || exit 1) fi mv wiredtiger $WT_DIR || exit 1 @@ -49,8 +49,8 @@ get_wt () [ -e $BASEDIR/wiredtiger-build.patch ] && \ (patch -p1 --forward < $BASEDIR/wiredtiger-build.patch || exit 1 ) ./autogen.sh || exit 1 - cd ./build_posix || exit 1 - [ -e Makefile ] && $MAKE distclean + [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ + (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean) wt_configure; ) } @@ -109,7 +109,8 @@ build_snappy () case "$1" in clean) - [ -d $WT_DIR/build_posix ] && (cd $WT_DIR/build_posix; make distclean) + [ -e $BASEDIR/$WT_DIR/build_posix/Makefile ] && \ + (cd $BASEDIR/$WT_DIR/build_posix && $MAKE distclean) rm -rf system $SNAPPY_DIR rm -f ${BASEDIR}/../priv/wt rm -f ${BASEDIR}/../priv/libwiredtiger-*.so diff --git a/c_src/wterl.c b/c_src/wterl.c index f68723e..441300f 100644 --- a/c_src/wterl.c +++ b/c_src/wterl.c @@ -2024,6 +2024,25 @@ ASYNC_NIF_DECL( }); +/** + * Used to set the minimum number of worker threads. + */ +static ERL_NIF_TERM +wterl_set_concurrency(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) +{ + int n = 0; + + if (!(argc == 1 && enif_is_number(env, argv[0]))) { + return enif_make_badarg(env); + } + + enif_get_int(env, argv[0], &n); + ASYNC_NIF_SET_MIN_WORKERS(n); + + return ATOM_OK; +} + + /** * Called by wterl_event_handler to set the pid for message delivery. */ @@ -2276,6 +2295,7 @@ static ErlNifFunc nif_funcs[] = {"cursor_search_near_nif", 4, wterl_cursor_search_near}, {"cursor_search_nif", 4, wterl_cursor_search}, {"cursor_update_nif", 4, wterl_cursor_update}, + {"set_concurrency", 1, wterl_set_concurrency}, {"set_event_handler_pid", 1, wterl_set_event_handler_pid}, }; diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 4d0448d..460d74a 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -107,6 +107,12 @@ start(Partition, Config) -> "lsm" end, {ok, Connection} = establish_connection(Config, Type), + case wterl_conn:count() > erlang:system_info(schedulers) of + true -> + ok = wterl:set_concurrency(wterl_conn:count()); + false -> + ok = wterl:set_concurrency(erlang:system_info(schedulers)) + end, Table = Type ++ ":" ++ integer_to_list(Partition), Compressor = case wterl:config_value(block_compressor, Config, "snappy") of @@ -122,7 +128,7 @@ start(Partition, Config) -> [{internal_page_max, "128K"}, {leaf_page_max, "128K"}, {lsm_chunk_size, "100MB"}, - {lsm_merge_threads, "2"}, + {lsm_merge_threads, 2}, {prefix_compression, false}, {lsm_bloom_newest, true}, {lsm_bloom_oldest, true} , diff --git a/src/wterl.erl b/src/wterl.erl index 45afae5..743188f 100644 --- a/src/wterl.erl +++ b/src/wterl.erl @@ -65,7 +65,8 @@ config_value/3, priv_dir/0, fold_keys/3, - fold/3]). + fold/3, + set_concurrency/1]). -export([set_event_handler_pid/1]). @@ -514,6 +515,11 @@ set_event_handler_pid(Pid) ?nif_stub. +-spec set_concurrency(non_neg_integer()) -> ok | {error, term()}. +set_concurrency(_Count) -> + ?nif_stub. + + %% =================================================================== %% EUnit tests %% =================================================================== diff --git a/src/wterl_conn.erl b/src/wterl_conn.erl index 9f62d46..a57b9b5 100644 --- a/src/wterl_conn.erl +++ b/src/wterl_conn.erl @@ -30,13 +30,14 @@ %% API -export([start_link/0, stop/0, - open/1, open/2, open/3, is_open/0, get/0, close/1]). + open/1, open/2, open/3, is_open/0, get/0, close/1, count/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, { conn :: wterl:connection() }). +-record(state, { conn :: wterl:connection(), + count :: non_neg_integer() }). -type config_list() :: [{atom(), any()}]. @@ -70,6 +71,10 @@ is_open() -> get() -> gen_server:call(?MODULE, get, infinity). +-spec count() -> non_neg_integer(). +count() -> + gen_server:call(?MODULE, count, infinity). + -spec close(wterl:connection()) -> ok. close(_Conn) -> gen_server:call(?MODULE, {close, self()}, infinity). @@ -88,15 +93,15 @@ handle_call({open, Dir, ConnectionConfig, SessionConfig, Caller}, _From, #state{ {ok, ConnRef}=OK -> Monitor = erlang:monitor(process, Caller), true = ets:insert(wterl_ets, {Monitor, Caller}), - {OK, State#state{conn = ConnRef}}; + {OK, State#state{conn = ConnRef, count = 1}}; Error -> {Error, State} end, {reply, Reply, NState}; -handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef}=State) -> +handle_call({open, _Dir, _ConnectionConfig, _SessionConfig, Caller}, _From, #state{conn=ConnRef, count=Count}=State) -> Monitor = erlang:monitor(process, Caller), true = ets:insert(wterl_ets, {Monitor, Caller}), - {reply, {ok, ConnRef}, State}; + {reply, {ok, ConnRef}, State#state{count = Count + 1}}; handle_call(is_open, _From, #state{conn=ConnRef}=State) -> {reply, ConnRef /= undefined, State}; @@ -106,18 +111,20 @@ handle_call(get, _From, #state{conn=undefined}=State) -> handle_call(get, _From, #state{conn=ConnRef}=State) -> {reply, {ok, ConnRef}, State}; -handle_call({close, Caller}, _From, #state{conn=ConnRef}=State) -> +handle_call({close, Caller}, _From, #state{conn=ConnRef, count=Count}=State) -> {[{Monitor, Caller}], _} = ets:match_object(wterl_ets, {'_', Caller}, 1), true = erlang:demonitor(Monitor, [flush]), true = ets:delete(wterl_ets, Monitor), NState = case ets:info(wterl_ets, size) of 0 -> do_close(ConnRef), - State#state{conn=undefined}; + State#state{conn=undefined, count=0}; _ -> - State + State#state{count = Count - 1} end, {reply, ok, NState}; +handle_call(count, _From, #state{count=Count}=State) -> + {reply, Count, State}; handle_call(_Msg, _From, State) -> {reply, ok, State}.