Only open a session/cursor when first needed. There is still a race

between vnodes during startup, ideally wterl_conn would only be opened
once.
This commit is contained in:
Gregory Burd 2013-03-16 23:28:13 -04:00
parent 22d42f5076
commit 1129241fe0
2 changed files with 146 additions and 119 deletions

View file

@ -52,8 +52,7 @@
%%-define(CAPABILITIES, [async_fold, indexes]). %%-define(CAPABILITIES, [async_fold, indexes]).
-define(CAPABILITIES, [async_fold]). -define(CAPABILITIES, [async_fold]).
-record(state, {conn :: wterl:connection(), -record(state, {table :: string(),
table :: string(),
session :: wterl:session(), session :: wterl:session(),
cursor :: wterl:cursor(), cursor :: wterl:cursor(),
partition :: integer()}). partition :: integer()}).
@ -85,22 +84,24 @@ capabilities(_, _) ->
-spec start(integer(), config()) -> {ok, state()} | {error, term()}. -spec start(integer(), config()) -> {ok, state()} | {error, term()}.
start(Partition, Config) -> start(Partition, Config) ->
%% Get the data root directory %% Get the data root directory
Table = "lsm:wt" ++ integer_to_list(Partition),
case app_helper:get_prop_or_env(data_root, Config, wterl) of case app_helper:get_prop_or_env(data_root, Config, wterl) of
undefined -> undefined ->
lager:error("Failed to create wterl dir: data_root is not set"), lager:error("Failed to create wterl dir: data_root is not set"),
{error, data_root_unset}; {error, data_root_unset};
DataRoot -> DataRoot ->
AppStart = case application:start(wterl) of Started =
case application:start(wterl) of
ok -> ok ->
ok; wterl_conn:is_open();
{error, {already_started, _Pid}}=_AppStartError -> {error, {already_started, _}} ->
ok; wterl_conn:is_open();
{error, Reason}=AppStartError -> {error, Reason1} ->
lager:error("Failed to start wterl: ~p", [Reason]), lager:error("Failed to start wterl: ~p", [Reason1]),
AppStartError {error, Reason1}
end, end,
case AppStart of case Started of
ok -> false ->
ok = filelib:ensure_dir(filename:join(DataRoot, "x")), ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
SessionMax = SessionMax =
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
@ -108,7 +109,8 @@ start(Partition, Config) ->
RingSize when RingSize < 512 -> 1024; RingSize when RingSize < 512 -> 1024;
RingSize -> RingSize * 2 RingSize -> RingSize * 2
end, end,
ConnectionOpts = [Config, case wterl_conn:open(DataRoot,
[Config,
{create, true}, {create, true},
{sync, false}, {sync, false},
{logging, true}, {logging, true},
@ -117,41 +119,26 @@ start(Partition, Config) ->
{cache_size, size_cache(Config)}, {cache_size, size_cache(Config)},
{checkpoint, [{wait, 1}]}, % sec {checkpoint, [{wait, 1}]}, % sec
{statistics_log, [{wait, 30}]} % sec {statistics_log, [{wait, 30}]} % sec
%% {verbose, ]) of
%% ["block", "shared_cache", "ckpt", "evict", {ok, _Connection} ->
%% "evictserver", "fileops", "hazard", "lsm", {ok, #state{partition=Partition, table=Table}};
%% "mutex", "read", "readserver", "reconcile", {error, Reason2} ->
%% "salvage", "verify", "write"]} lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
], {error, Reason2}
case wterl_conn:open(DataRoot, ConnectionOpts) of
{ok, ConnRef} ->
Table = "lsm:wt" ++ integer_to_list(Partition),
SessionOpenOpts = [{isolation, "snapshot"}],
{ok, SRef} = wterl:session_open(ConnRef, wterl:config_to_bin(SessionOpenOpts)),
SessionOpts = [%TODO {block_compressor, "snappy"},
{internal_page_max, "128K"},
{leaf_page_max, "256K"},
{lsm_chunk_size, "256MB"},
{lsm_bloom_config, [{leaf_page_max, "16MB"}]} ],
ok = wterl:session_create(SRef, Table, wterl:config_to_bin(SessionOpts)),
{ok, #state{conn=ConnRef,
table=Table,
session=SRef,
partition=Partition}};
{error, ConnReason}=ConnError ->
lager:error("Failed to start wterl backend: ~p\n", [ConnReason]),
ConnError
end; end;
Error -> true ->
Error {ok, #state{partition=Partition, table=Table}};
{error, Reason3} ->
{error, Reason3}
end end
end. end.
%% @doc Stop the wterl backend %% @doc Stop the wterl backend
-spec stop(state()) -> ok. -spec stop(state()) -> ok.
stop(#state{conn=ConnRef, session=SRef, cursor=undefined}) -> stop(#state{session=undefined, cursor=undefined}) ->
ok = wterl:session_close(SRef), ok;
wterl_conn:close(ConnRef); stop(#state{session=Session, cursor=undefined}) ->
ok = wterl:session_close(Session);
stop(#state{cursor=Cursor}=State) -> stop(#state{cursor=Cursor}=State) ->
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor),
stop(State#state{cursor=undefined}). stop(State#state{cursor=undefined}).
@ -161,16 +148,17 @@ stop(#state{cursor=Cursor}=State) ->
{ok, any(), state()} | {ok, any(), state()} |
{ok, not_found, state()} | {ok, not_found, state()} |
{error, term(), state()}. {error, term(), state()}.
get(Bucket, Key, #state{session=SRef, table=Table}=State) -> get(Bucket, Key, #state{session=undefined, cursor=undefined}=State) ->
get(Bucket, Key, establish_connection(State));
get(Bucket, Key, #state{cursor=Cursor}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
Cursor = shared_cursor(SRef, Table, State),
case wterl:cursor_search(Cursor, WTKey) of case wterl:cursor_search(Cursor, WTKey) of
{ok, Value} -> {ok, Value} ->
{ok, Value, State#state{cursor=Cursor}}; {ok, Value, State};
not_found -> not_found ->
{error, not_found, State#state{cursor=Cursor}}; {error, not_found, State};
{error, Reason} -> {error, Reason} ->
{error, Reason, State#state{cursor=Cursor}} {error, Reason, State}
end. end.
%% @doc Insert an object into the wterl backend. %% @doc Insert an object into the wterl backend.
@ -181,14 +169,15 @@ get(Bucket, Key, #state{session=SRef, table=Table}=State) ->
-spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) -> -spec put(riak_object:bucket(), riak_object:key(), [index_spec()], binary(), state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) -> put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined, cursor=undefined}=State) ->
put(Bucket, PrimaryKey, IndexSpecs, Val, establish_connection(State));
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{cursor=Cursor}=State) ->
WTKey = to_object_key(Bucket, PrimaryKey), WTKey = to_object_key(Bucket, PrimaryKey),
Cursor = shared_cursor(SRef, Table, State),
case wterl:cursor_insert(Cursor, WTKey, Val) of case wterl:cursor_insert(Cursor, WTKey, Val) of
ok -> ok ->
{ok, State#state{cursor=Cursor}}; {ok, State};
{error, Reason} -> {error, Reason} ->
{error, Reason, State#state{cursor=Cursor}} {error, Reason, State}
end. end.
%% @doc Delete an object from the wterl backend %% @doc Delete an object from the wterl backend
@ -198,14 +187,15 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat
-spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) -> -spec delete(riak_object:bucket(), riak_object:key(), [index_spec()], state()) ->
{ok, state()} | {ok, state()} |
{error, term(), state()}. {error, term(), state()}.
delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> delete(Bucket, Key, IndexSpecs, #state{session=undefined, cursor=undefined}=State) ->
delete(Bucket, Key, IndexSpecs, establish_connection(State));
delete(Bucket, Key, _IndexSpecs, #state{cursor=Cursor}=State) ->
WTKey = to_object_key(Bucket, Key), WTKey = to_object_key(Bucket, Key),
Cursor = shared_cursor(SRef, Table, State),
case wterl:cursor_remove(Cursor, WTKey) of case wterl:cursor_remove(Cursor, WTKey) of
ok -> ok ->
{ok, State#state{cursor=Cursor}}; {ok, State};
{error, Reason} -> {error, Reason} ->
{error, Reason, State#state{cursor=Cursor}} {error, Reason, State}
end. end.
%% @doc Fold over all the buckets %% @doc Fold over all the buckets
@ -213,12 +203,16 @@ delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) ->
any(), any(),
[], [],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> fold_buckets(FoldBucketsFun, Acc, Opts, #state{table=Table}) ->
{ok, Connection} = wterl_conn:get(),
FoldFun = fold_buckets_fun(FoldBucketsFun), FoldFun = fold_buckets_fun(FoldBucketsFun),
BucketFolder = BucketFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(ConnRef), {ok, Session} = wterl:session_open(Connection),
{ok, Cursor} = wterl:cursor_open(SRef, Table), case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
Acc;
{ok, Cursor} ->
try try
{FoldResult, _} = {FoldResult, _} =
wterl:fold_keys(Cursor, FoldFun, {Acc, []}), wterl:fold_keys(Cursor, FoldFun, {Acc, []}),
@ -228,7 +222,8 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(SRef) ok = wterl:session_close(Session)
end
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -243,7 +238,9 @@ fold_buckets(FoldBucketsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, term()} | {async, fun()}. state()) -> {ok, term()} | {async, fun()}.
fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> fold_keys(FoldKeysFun, Acc, Opts, #state{table=Table}) ->
{ok, Connection} = wterl_conn:get(),
%% Figure out how we should limit the fold: by bucket, by %% Figure out how we should limit the fold: by bucket, by
%% secondary index, or neither (fold across everything.) %% secondary index, or neither (fold across everything.)
Bucket = lists:keyfind(bucket, 1, Opts), Bucket = lists:keyfind(bucket, 1, Opts),
@ -260,8 +257,11 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
FoldFun = fold_keys_fun(FoldKeysFun, Limiter), FoldFun = fold_keys_fun(FoldKeysFun, Limiter),
KeyFolder = KeyFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(ConnRef), {ok, Session} = wterl:session_open(Connection),
{ok, Cursor} = wterl:cursor_open(SRef, Table), case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
Acc;
{ok, Cursor} ->
try try
wterl:fold_keys(Cursor, FoldFun, Acc) wterl:fold_keys(Cursor, FoldFun, Acc)
catch catch
@ -269,7 +269,8 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(SRef) ok = wterl:session_close(Session)
end
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -284,13 +285,17 @@ fold_keys(FoldKeysFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
any(), any(),
[{atom(), term()}], [{atom(), term()}],
state()) -> {ok, any()} | {async, fun()}. state()) -> {ok, any()} | {async, fun()}.
fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) -> fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
{ok, Connection} = wterl_conn:get(),
Bucket = proplists:get_value(bucket, Opts), Bucket = proplists:get_value(bucket, Opts),
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket), FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder = ObjectFolder =
fun() -> fun() ->
{ok, SRef} = wterl:session_open(ConnRef), {ok, Session} = wterl:session_open(Connection),
{ok, Cursor} = wterl:cursor_open(SRef, Table), case wterl:cursor_open(Session, Table) of
{error, "No such file or directory"} ->
Acc;
{ok, Cursor} ->
try try
wterl:fold(Cursor, FoldFun, Acc) wterl:fold(Cursor, FoldFun, Acc)
catch catch
@ -298,7 +303,8 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
AccFinal AccFinal
after after
ok = wterl:cursor_close(Cursor), ok = wterl:cursor_close(Cursor),
ok = wterl:session_close(SRef) ok = wterl:session_close(Session)
end
end end
end, end,
case lists:member(async_fold, Opts) of case lists:member(async_fold, Opts) of
@ -310,8 +316,10 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{conn=ConnRef, table=Table}) ->
%% @doc Delete all objects from this wterl backend %% @doc Delete all objects from this wterl backend
-spec drop(state()) -> {ok, state()} | {error, term(), state()}. -spec drop(state()) -> {ok, state()} | {error, term(), state()}.
drop(#state{table=Table, session=SRef}=State) -> drop(#state{session=undefined, cursor=undefined}=State) ->
case wterl:session_truncate(SRef, Table) of drop(establish_connection(State));
drop(#state{table=Table, session=Session}=State) ->
case wterl:session_truncate(Session, Table) of
ok -> ok ->
{ok, State}; {ok, State};
Error -> Error ->
@ -321,8 +329,10 @@ drop(#state{table=Table, session=SRef}=State) ->
%% @doc Returns true if this wterl backend contains any %% @doc Returns true if this wterl backend contains any
%% non-tombstone values; otherwise returns false. %% non-tombstone values; otherwise returns false.
-spec is_empty(state()) -> boolean(). -spec is_empty(state()) -> boolean().
is_empty(#state{table=Table, session=SRef}) -> is_empty(#state{session=undefined, cursor=undefined}=State) ->
{ok, Cursor} = wterl:cursor_open(SRef, Table), is_empty(establish_connection(State));
is_empty(#state{table=Table, session=Session}) ->
{ok, Cursor} = wterl:cursor_open(Session, Table),
try try
not_found =:= wterl:cursor_next(Cursor) not_found =:= wterl:cursor_next(Cursor)
after after
@ -331,8 +341,10 @@ is_empty(#state{table=Table, session=SRef}) ->
%% @doc Get the status information for this wterl backend %% @doc Get the status information for this wterl backend
-spec status(state()) -> [{atom(), term()}]. -spec status(state()) -> [{atom(), term()}].
status(#state{table=Table, session=SRef}) -> status(#state{session=undefined, cursor=undefined}=State) ->
{ok, Cursor} = wterl:cursor_open(SRef, "statistics:"++Table), status(establish_connection(State));
status(#state{table=Table, session=Session}) ->
{ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table),
try try
Stats = fetch_status(Cursor), Stats = fetch_status(Cursor),
[{stats, Stats}] [{stats, Stats}]
@ -350,18 +362,35 @@ callback(_Ref, _Msg, State) ->
%% Internal functions %% Internal functions
%% =================================================================== %% ===================================================================
shared_cursor(SRef, Table, #state{cursor=undefined}=State) -> establish_connection(#state{table=Table, session=undefined, cursor=undefined}=State) ->
case wterl:cursor_open(SRef, Table) of {ok, Connection} = wterl_conn:get(),
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
{ok, Session} ->
SessionOpts =
[%TODO {block_compressor, "snappy"},
{internal_page_max, "128K"},
{leaf_page_max, "256K"},
{lsm_chunk_size, "256MB"},
{lsm_bloom_config, [{leaf_page_max, "16MB"}]} ],
case wterl:session_create(Session, Table, wterl:config_to_bin(SessionOpts)) of
ok ->
case wterl:cursor_open(Session, Table) of
{ok, Cursor} -> {ok, Cursor} ->
Cursor; State#state{session=Session, cursor=Cursor};
{error, Reason} -> {error, Reason} ->
lager:error("Failed to open WT cursor on ~p because: ~p", [Table, Reason]), lager:error("Failed to open WiredTiger cursor on ~p because: ~p", [Table, Reason]),
undefined State
end; end;
{error, Reason} ->
shared_cursor(_SRef, _Table, #state{cursor=Cursor}) -> lager:error("Failed to start wterl backend: ~p\n", [Reason]),
Cursor. State
end;
{error, Reason} ->
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
State
end;
establish_connection(State) ->
State.
%% @private %% @private
%% Return a function to fold over the buckets on this backend %% Return a function to fold over the buckets on this backend
@ -508,7 +537,7 @@ size_cache(Config) ->
"1GB" "1GB"
end, end,
application:set_env(wterl, cache_size, FinalGuess), application:set_env(wterl, cache_size, FinalGuess),
lager:warning("Using best-guess cache size of ~p for WiredTiger storage backend.", [FinalGuess]), lager:warning("Using cache size of ~p for WiredTiger storage backend.", [FinalGuess]),
FinalGuess; FinalGuess;
Value -> Value ->
Value Value

View file

@ -36,9 +36,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, { -record(state, { conn :: wterl:connection() }).
conn :: wterl:connection()
}).
-type config_list() :: [{atom(), any()}]. -type config_list() :: [{atom(), any()}].