Use a set of {Session, Cursor} pairs ("passes") for calls into WiredTiger

to a) avoid opening a large number of Sessions or Cursors and b) avoid
reusing a Session/Cursor pair in more than one thread.  Each operation,
except folds, uses the Session/Cursor pair corresponding to the scheduler
id (roughly "core") it is running on.  Each vnode instance will have
(count(schedulers) + count(active folds)) Session/Cursor pairs in use so
be sure to set session_max large enough when opening the connection to
WiredTiger.  This method will cause problems with scheduler threads going
to "sleep" so be sure to run schedmon to wake them up.
This commit is contained in:
Gregory Burd 2013-03-26 14:17:21 -04:00
parent 85b84a5343
commit 77e0dacdb5

View file

@ -43,6 +43,7 @@
%% Test-only build settings: pull in the EUnit macros and export every
%% function in the module so tests can call internal helpers directly.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
%% Previously misspelled as `-compiel', which the compiler accepts as an
%% unknown user-defined attribute and ignores, so export_all never applied.
-compile(export_all).
-endif.
-define(API_VERSION, 1).
@ -50,10 +51,13 @@
%%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]).
%% A "pass" pairs a WiredTiger session with a cursor.
%% NOTE(review): establish_passes/3 returns plain {Session, Cursor} tuples
%% rather than #pass{} records, and that list is what start/2 stores in the
%% passes field of #state{} -- so this type does not match the stored
%% values. Confirm which shape is intended.
-record(pass, {session :: wterl:session(),
cursor :: wterl:cursor()}).
-type pass() :: #pass{}.
-record(state, {table :: string(),
connection :: wterl:connection(),
cursors :: ets:tid(),
session :: wterl:session()}).
passes :: [pass()]}).
-type state() :: #state{}.
-type config() :: [{atom(), term()}].
@ -81,6 +85,7 @@ capabilities(_, _) ->
%% @doc Start the wterl backend
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) ->
%lager:start(),
AppStart =
case application:start(wterl) of
ok ->
@ -94,34 +99,31 @@ start(Partition, Config) ->
case AppStart of
ok ->
Table = "lsm:wt" ++ integer_to_list(Partition),
establish_connection(Table, Config);
{ok, Connection} = establish_connection(Config),
Passes = establish_passes(erlang:system_info(schedulers), Connection, Table),
{ok, #state{table=Table, connection=Connection, passes=Passes}};
{error, Reason2} ->
{error, Reason2}
end.
%% @doc Stop the wterl backend
-spec stop(state()) -> ok.
stop(#state{session=undefined, cursors=undefined}) ->
ok;
stop(#state{session=Session, cursors=undefined}) ->
ok = wterl:session_close(Session);
stop(#state{session=Session, cursors=Cursors}=State) ->
ets:foldl(fun({_Table, Cursor}, _) ->
ok = wterl:cursor_close(Cursor)
end, true, Cursors),
ets:delete(Cursors),
stop(State#state{session=Session, cursors=undefined}).
stop(#state{passes=Passes}) ->
    %% Walk the passes in order, closing each cursor before the session
    %% paired with it. Any close that does not return `ok' aborts the
    %% shutdown with a badmatch, leaving later passes untouched.
    ClosePass =
        fun(Pass, ok) ->
                {Session, Cursor} = Pass,
                ok = wterl:cursor_close(Cursor),
                ok = wterl:session_close(Session)
        end,
    lists:foldl(ClosePass, ok, Passes).
%% @doc Retrieve an object from the wterl backend
-spec get(riak_object:bucket(), riak_object:key(), state()) ->
{ok, any(), state()} |
{ok, not_found, state()} |
{error, term(), state()}.
get(Bucket, Key, #state{session=undefined}=State) ->
get(Bucket, Key, establish_session(State));
get(Bucket, Key, #state{session=Session, table=Table}=State0) ->
get(Bucket, Key, #state{passes=Passes}=State) ->
WTKey = to_object_key(Bucket, Key),
{Cursor, State} = cursor(Session, Table, get, State0),
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_search(Cursor, WTKey) of
{ok, Value} ->
{ok, Value, State};
@ -139,11 +141,9 @@ get(Bucket, Key, #state{session=Session, table=Table}=State0) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} |
{error, term(), state()}.
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) ->
put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State));
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State0) ->
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{passes=Passes}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey),
{Cursor, State} = cursor(Session, Table, put, State0),
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_insert(Cursor, WTKey, Val) of
ok ->
{ok, State};
@ -158,11 +158,9 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} |
{error, term(), state()}.
delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) ->
delete(Bucket, Key, IndexSpecs, establish_session(State));
delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) ->
delete(Bucket, Key, _IndexSpecs, #state{passes=Passes}=State) ->
WTKey = to_object_key(Bucket, Key),
{Cursor, State} = cursor(Session, Table, delete, State0),
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:cursor_remove(Cursor, WTKey) of
ok ->
{ok, State};
@ -175,12 +173,12 @@ delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State0) ->
any(),
[],
state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=undefined}=State) ->
fold_buckets(FoldBucketsFun, Acc, Opts, establish_session(State));
fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) ->
fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
{ok, Connection} = wterl_conn:get(),
FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder =
fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
Acc;
@ -193,7 +191,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) ->
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor)
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
end
end
end,
@ -209,9 +208,7 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{session=Session, table=Table}) ->
any(),
[{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{session=undefined}=State) ->
fold_keys(FoldKeysFun, Acc, Opts, establish_session(State));
fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
%% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts),
@ -224,10 +221,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
true -> undefined
end,
{ok, Connection} = wterl_conn:get(),
%% Set up the fold...
FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder =
fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
Acc;
@ -238,7 +238,8 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor)
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
end
end
end,
@ -254,13 +255,13 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{session=Session, table=Table}) ->
any(),
[{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{session=undefined}=State) ->
fold_objects(FoldObjectsFun, Acc, Opts, establish_session(State));
fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) ->
fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
{ok, Connection} = wterl_conn:get(),
Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder =
fun() ->
{ok, Session} = wterl:session_open(Connection),
case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
Acc;
@ -271,7 +272,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) ->
{break, AccFinal} ->
AccFinal
after
ok = wterl:cursor_close(Cursor)
ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(Session)
end
end
end,
@ -284,9 +286,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{session=Session, table=Table}) ->
%% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{session=undefined}=State) ->
drop(establish_session(State));
drop(#state{table=Table, session=Session}=State) ->
drop(#state{passes=Passes, table=Table}=State) ->
{Session, _Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
case wterl:session_truncate(Session, Table) of
ok ->
{ok, State};
@ -297,10 +298,9 @@ drop(#state{table=Table, session=Session}=State) ->
%% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean().
is_empty(#state{session=undefined}=State) ->
is_empty(establish_session(State));
is_empty(#state{table=Table, session=Session}) ->
{ok, Cursor} = wterl:cursor_open(Session, Table),
is_empty(#state{passes=Passes}) ->
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
wterl:cursor_reset(Cursor),
try
not_found =:= wterl:cursor_next(Cursor)
after
@ -309,10 +309,8 @@ is_empty(#state{table=Table, session=Session}) ->
%% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}].
status(#state{session=undefined}=State) ->
status(establish_session(State));
status(#state{table=Table, session=Session}) ->
{ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table),
status(#state{passes=Passes}) ->
{_Session, Cursor} = lists:nth(erlang:system_info(scheduler_id), Passes),
try
Stats = fetch_status(Cursor),
[{stats, Stats}]
@ -330,21 +328,16 @@ callback(_Ref, _Msg, State) ->
%% Internal functions
%% ===================================================================
cursor(Session, Table, Type, #state{cursors=undefined}=State) ->
Cursors = ets:new(?MODULE, [{read_concurrency, true}]),
cursor(Session, Table, Type, State#state{cursors=Cursors});
cursor(Session, Table, Type, #state{cursors=Cursors}=State) ->
case ets:lookup(Cursors, {Type, Table}) of
[{{_Type, Table}, Cursor}] ->
{Cursor, State};
_ ->
{ok, Cursor} = wterl:cursor_open(Session, Table),
ets:insert(Cursors, {Table, Cursor}),
{Cursor, State}
end.
%% Compute the session_max value handed to WiredTiger when the connection
%% is opened: twice the product of the ring size and the number of
%% scheduler threads. The ring size is read from `ring_creation_size'
%% (Config first, then the riak_core environment, via app_helper) and
%% falls back to 1024 when it is not set.
max_sessions(Config) ->
    Schedulers = erlang:system_info(schedulers),
    case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
        undefined ->
            2 * (1024 * Schedulers);
        RingSize ->
            2 * (RingSize * Schedulers)
    end.
%% @private
establish_connection(Table, Config) ->
establish_connection(Config) ->
%% Get the data root directory
case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined ->
@ -352,40 +345,42 @@ establish_connection(Table, Config) ->
{error, data_root_unset};
DataRoot ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
SessionMax =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
undefined -> 1024;
RingSize when RingSize < 512 -> 1024;
RingSize -> RingSize * 2
end,
RequestedCacheSize = app_helper:get_prop_or_env(cache_size, Config, wterl),
Opts = orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false),
wterl:config_value(logging, Config, true),
wterl:config_value(transactional, Config, true),
wterl:config_value(session_max, Config, SessionMax),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
wterl:config_value(verbose, Config, [
Opts =
orddict:from_list(
[ wterl:config_value(create, Config, true),
wterl:config_value(sync, Config, false),
wterl:config_value(logging, Config, true),
wterl:config_value(transactional, Config, true),
wterl:config_value(session_max, Config, max_sessions(Config)),
wterl:config_value(cache_size, Config, size_cache(RequestedCacheSize)),
wterl:config_value(statistics_log, Config, [{wait, 30}]), % sec
wterl:config_value(checkpoint, Config, [{wait, 10}]), % sec
wterl:config_value(verbose, Config, [
%"ckpt" "block", "shared_cache", "evictserver", "fileops",
%"hazard", "mutex", "read", "readserver", "reconcile",
%"salvage", "verify", "write", "evict", "lsm"
])
] ++ proplists:get_value(wterl, Config, [])), % sec
%%lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]),
]) ] ++ proplists:get_value(wterl, Config, [])), % sec
%lager:info("WiredTiger connection:open(~s, ~s)", [DataRoot, wterl:config_to_bin(Opts)]),
case wterl_conn:open(DataRoot, Opts) of
{ok, Connection} ->
{ok, #state{table=Table, connection=Connection}};
{ok, Connection};
{error, Reason2} ->
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
{error, Reason2}
end
end.
%% Open `Count' passes against `Table': for each one, establish a session
%% on `Connection' and open a cursor on that session. Returns the passes as
%% a list of {Session, Cursor} tuples, in creation order. A failure to
%% establish a session or open a cursor crashes with a badmatch.
establish_passes(Count, Connection, Table)
  when is_number(Count), Count > 0 ->
    [begin
         {ok, Session} = establish_session(Connection, Table),
         {ok, Cursor} = wterl:cursor_open(Session, Table),
         {Session, Cursor}
     end || _Slot <- lists:seq(1, Count)].
%% @private
establish_session(#state{table=Table, session=undefined}=State) ->
{ok, Connection} = wterl_conn:get(),
establish_session(Connection, Table) ->
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
{ok, Session} ->
SessionOpts =
@ -398,17 +393,15 @@ establish_session(#state{table=Table, session=undefined}=State) ->
{lsm_bloom_config, [{leaf_page_max, "10MB"}]} ],
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
ok ->
State#state{session=Session};
{ok, Session};
{error, Reason} ->
lager:error("Failed to start wterl backend: ~p\n", [Reason]),
State
{error, Reason}
end;
{error, Reason} ->
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
State
end;
establish_session(State) ->
State.
{error, Reason}
end.
%% @private
%% Return a function to fold over the buckets on this backend
@ -555,7 +548,7 @@ size_cache(RequestedSize) ->
"1GB"
end,
application:set_env(wterl, cache_size, FinalGuess),
lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]),
%lager:info("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]),
FinalGuess;
Value when is_list(Value) ->
Value;