From 77e0dacdb54687777f3c11ee2850e673659202a5 Mon Sep 17 00:00:00 2001 From: Gregory Burd Date: Tue, 26 Mar 2013 14:17:21 -0400 Subject: [PATCH] Use a set of {Session, Cursor} pairs ("passes") for calls into WiredTiger to avoid a) opening a large number of Sessions or Cursors and b) reusing a Session/Cursor pair in more than one thread. Each operation, except folds, uses the Session/Cursor pair corresponding to the scheduler id (roughly "core") it is running on. Each vnode instance will have (count(schedulers) + count(active folds)) Session/Cursor pairs in use so be sure to set session_max large enough when opening the connection to WiredTiger. This method will cause problems with scheduler threads going to "sleep" so be sure to run schedmon to wake them up. --- src/riak_kv_wterl_backend.erl | 171 ++++++++++++++++------------------ 1 file changed, 82 insertions(+), 89 deletions(-) diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index 744e3ba..dc37c02 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -43,6 +43,7 @@ -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-compile(export_all). -endif. -define(API_VERSION, 1). @@ -50,10 +51,13 @@ %%-define(CAPABILITIES, [async_fold, indexes]). -define(CAPABILITIES, [async_fold]). +-record(pass, {session :: wterl:session(), + cursor :: wterl:cursor()}). +-type pass() :: #pass{}. + -record(state, {table :: string(), connection :: wterl:connection(), - cursors :: ets:tid(), - session :: wterl:session()}). + passes :: [pass()]}). -type state() :: #state{}. -type config() :: [{atom(), term()}]. @@ -81,6 +85,7 @@ capabilities(_, _) -> %% @doc Start the wterl backend -spec start(integer(), config()) -> {ok, state()} | {error, term()}. 
start(Partition, Config) -> + %lager:start(), AppStart = case application:start(wterl) of ok -> @@ -94,34 +99,31 @@ start(Partition, Config) -> case AppStart of ok -> Table = "lsm:wt" ++ integer_to_list(Partition), - establish_connection(Table, Config); + {ok, Connection} = establish_connection(Config), + Passes = establish_passes(erlang:system_info(schedulers), Connection, Table), + {ok, #state{table=Table, connection=Connection, passes=Passes}}; {error, Reason2} -> {error, Reason2} end. %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{session=undefined, cursors=undefined}) -> - ok; -stop(#state{session=Session, cursors=undefined}) -> - ok = wterl:session_close(Session); -stop(#state{session=Session, cursors=Cursors}=State) -> - ets:foldl(fun({_Table, Cursor}, _) -> - ok = wterl:cursor_close(Cursor) - end, true, Cursors), - ets:delete(Cursors), - stop(State#state{session=Session, cursors=undefined}). +stop(#state{passes=Passes}) -> + lists:foreach(fun(Elem) -> + {Session, Cursor} = Elem, + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) + end, Passes), + ok. %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> {ok, any(), state()} | {ok, not_found, state()} | {error, term(), state()}. -get(Bucket, Key, #state{session=undefined}=State) -> - get(Bucket, Key, establish_session(State)); -get(Bucket, Key, #state{session=Session, table=Table}=State0) -> +get(Bucket, Key, #state{passes=Passes}=State) -> WTKey = to_object_key(Bucket, Key), - {Cursor, State} = cursor(Session, Table, get, State0), + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:cursor_search(Cursor, WTKey) of {ok, Value} -> {ok, Value, State}; @@ -139,11 +141,9 @@ get(Bucket, Key, #state{session=Session, table=Table}=State0) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> {ok, state()} | {error, term(), state()}. 
-put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) -> - put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State)); -put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State0) -> +put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), - {Cursor, State} = cursor(Session, Table, put, State0), + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:cursor_insert(Cursor, WTKey, Val) of ok -> {ok, State}; @@ -158,11 +158,9 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> {ok, state()} | {error, term(), state()}. -delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) -> - delete(Bucket, Key, IndexSpecs, establish_session(State)); -delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) -> +delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) -> WTKey = to_object_key(Bucket, Key), - {Cursor, State} = cursor(Session, Table, delete, State0), + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:cursor_remove(Cursor, WTKey) of ok -> {ok, State}; @@ -175,12 +173,12 @@ delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) -> any(), [], state()) -> {ok, any()} | {async, fun()}. 
-fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=undefined}=State) -> - fold_buckets(FoldBucketsFun, Acc, Opts, establish_session(State)); -fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) -> +fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) -> + {ok, Connection} = wterl_conn:get(), FoldFun = fold_buckets_fun(FoldBucketsFun), BucketFolder = fun() -> + {ok, Session} = wterl:session_open(Connection), case wterl:cursor_open(Session, Table) of {error, "No such file or directory"} -> Acc; @@ -193,7 +191,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor) + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) end end end, @@ -209,9 +208,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, term()} | {async, fun()}. -fold_keys(FoldKeysFun, Acc, Opts, #state{session=undefined}=State) -> - fold_keys(FoldKeysFun, Acc, Opts, establish_session(State)); -fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> +fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) -> %% Figure out how we should limit the fold: by bucket, by %% secondary index, or neither (fold across everything.) Bucket = lists:keyfind(bucket, 1, Opts), @@ -224,10 +221,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> true -> undefined end, + {ok, Connection} = wterl_conn:get(), + %% Set up the fold... 
FoldFun = fold_keys_fun(FoldKeysFun, Limiter), KeyFolder = fun() -> + {ok, Session} = wterl:session_open(Connection), case wterl:cursor_open(Session, Table) of {error, "No such file or directory"} -> Acc; @@ -238,7 +238,8 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor) + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) end end end, @@ -254,13 +255,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) -> any(), [{atom(), term()}], state()) -> {ok, any()} | {async, fun()}. -fold_objects(FoldObjectsFun, Acc, Opts, #state{session=undefined}=State) -> - fold_objects(FoldObjectsFun, Acc, Opts, establish_session(State)); -fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) -> +fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> + {ok, Connection} = wterl_conn:get(), Bucket = proplists:get_value(bucket, Opts), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), ObjectFolder = fun() -> + {ok, Session} = wterl:session_open(Connection), case wterl:cursor_open(Session, Table) of {error, "No such file or directory"} -> Acc; @@ -271,7 +272,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) -> {break, AccFinal} -> AccFinal after - ok = wterl:cursor_close(Cursor) + ok = wterl:cursor_close(Cursor), + ok = wterl:session_close(Session) end end end, @@ -284,9 +286,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) -> %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. 
-drop(#state{session=undefined}=State) -> - drop(establish_session(State)); -drop(#state{table=Table, session=Session}=State) -> +drop(#state{passes=Passes, table=Table}=State) -> + {Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), case wterl:session_truncate(Session, Table) of ok -> {ok, State}; @@ -297,10 +298,9 @@ drop(#state{table=Table, session=Session}=State) -> %% @doc Returns true if this wterl backend contains any %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). -is_empty(#state{session=undefined}=State) -> - is_empty(establish_session(State)); -is_empty(#state{table=Table, session=Session}) -> - {ok, Cursor} = wterl:cursor_open(Session, Table), +is_empty(#state{passes=Passes}) -> + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), + wterl:cursor_reset(Cursor), try not_found =:= wterl:cursor_next(Cursor) after @@ -309,10 +309,8 @@ is_empty(#state{table=Table, session=Session}) -> %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. 
-status(#state{session=undefined}=State) -> - status(establish_session(State)); -status(#state{table=Table, session=Session}) -> - {ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table), +status(#state{passes=Passes}) -> + {_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes), try Stats = fetch_status(Cursor), [{stats, Stats}] @@ -330,21 +328,16 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== -cursor(Session, Table, Type, #state{cursors=undefined}=State) -> - Cursors = ets:new(?MODULE, [{read_concurrency, true}]), - cursor(Session, Table, Type, State#state{cursors=Cursors}); -cursor(Session, Table, Type, #state{cursors=Cursors}=State) -> - case ets:lookup(Cursors, {Type, Table}) of - [{{_Type, Table}, Cursor}] -> - {Cursor, State}; - _ -> - {ok, Cursor} = wterl:cursor_open(Session, Table), - ets:insert(Cursors, {Table, Cursor}), - {Cursor, State} - end. +max_sessions(Config) -> + RingSize = + case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of + undefined -> 1024; + Size -> Size + end, + 2 * (RingSize * erlang:system_info(schedulers)). 
%% @private -establish_connection(Table, Config) -> +establish_connection(Config) -> %% Get the data root directory case app_helper:get_prop_or_env(data_root, Config, wterl) of undefined -> @@ -352,40 +345,42 @@ establish_connection(Table, Config) -> {error, data_root_unset}; DataRoot -> ok = filelib:ensure_dir(filename:join(DataRoot, "x")), - SessionMax = - case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of - undefined -> 1024; - RingSize when RingSize < 512 -> 1024; - RingSize -> RingSize * 2 - end, RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl), - Opts = orddict:from_list( - [ wterl:config_value(create, Config, true), - wterl:config_value(sync, Config, false), - wterl:config_value(logging, Config, true), - wterl:config_value(transactional, Config, true), - wterl:config_value(session_max, Config, SessionMax), - wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), - wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec - wterl:config_value(verbose, Config, [ + Opts = + orddict:from_list( + [ wterl:config_value(create, Config, true), + wterl:config_value(sync, Config, false), + wterl:config_value(logging, Config, true), + wterl:config_value(transactional, Config, true), + wterl:config_value(session_max, Config, max_sessions(Config)), + wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)), + wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec + wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec + wterl:config_value(verbose, Config, [ %"ckpt" "block", "shared_cache", "evictserver", "fileops", %"hazard", "mutex", "read", "readserver", "reconcile", %"salvage", "verify", "write", "evict", "lsm" - ]) - ] ++ proplists:get_value(wterl, Config, [])), % sec - %%lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]), + ]) ] ++ proplists:get_value(wterl, Config, [])), % sec + %lager:info("WiredTiger connection:open(~s, ~s)", 
[DataRoot, wterl:config_to_bin(Opts)]), case wterl_conn:open(DataRoot, Opts) of {ok, Connection} -> - {ok, #state{table=Table, connection=Connection}}; + {ok, Connection}; {error, Reason2} -> lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), {error, Reason2} end end. +establish_passes(Count, Connection, Table) + when is_number(Count), Count > 0 -> + lists:map(fun(_Elem) -> + {ok, Session} = establish_session(Connection, Table), + {ok, Cursor} = wterl:cursor_open(Session, Table), + {Session, Cursor} + end, lists:seq(1, Count)). + %% @private -establish_session(#state{table=Table, session=undefined}=State) -> - {ok, Connection} = wterl_conn:get(), +establish_session(Connection, Table) -> case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of {ok, Session} -> SessionOpts = @@ -398,17 +393,15 @@ establish_session(#state{table=Table, session=undefined}=State) -> {lsm_bloom_config, [{leaf_page_max, "10MB"}]} ], case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of ok -> - State#state{session=Session}; + {ok, Session}; {error, Reason} -> lager:error("Failed to start wterl backend: ~p\n", [Reason]), - State + {error, Reason} end; {error, Reason} -> lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), - State - end; -establish_session(State) -> - State. + {error, Reason} + end. %% @private %% Return a function to fold over the buckets on this backend @@ -555,7 +548,7 @@ size_cache(RequestedSize) -> "1GB" end, application:set_env(wterl, cache_size, FinalGuess), - lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]), + %lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]), FinalGuess; Value when is_list(Value) -> Value;