Break out connection and session startup into functions.
This commit is contained in:
parent
ce79d22659
commit
24527cf453
1 changed files with 58 additions and 59 deletions
|
@ -24,8 +24,6 @@
|
||||||
-behavior(temp_riak_kv_backend).
|
-behavior(temp_riak_kv_backend).
|
||||||
-author('Steve Vinoski <steve@basho.com>').
|
-author('Steve Vinoski <steve@basho.com>').
|
||||||
|
|
||||||
-compile([{parse_transform, lager_transform}]).
|
|
||||||
|
|
||||||
%% KV Backend API
|
%% KV Backend API
|
||||||
-export([api_version/0,
|
-export([api_version/0,
|
||||||
capabilities/1,
|
capabilities/1,
|
||||||
|
@ -53,8 +51,8 @@
|
||||||
-define(CAPABILITIES, [async_fold]).
|
-define(CAPABILITIES, [async_fold]).
|
||||||
|
|
||||||
-record(state, {table :: string(),
|
-record(state, {table :: string(),
|
||||||
session :: wterl:session(),
|
connection :: wterl:connection(),
|
||||||
partition :: integer()}).
|
session :: wterl:session()}).
|
||||||
|
|
||||||
-type state() :: #state{}.
|
-type state() :: #state{}.
|
||||||
-type config() :: [{atom(), term()}].
|
-type config() :: [{atom(), term()}].
|
||||||
|
@ -82,54 +80,22 @@ capabilities(_, _) ->
|
||||||
%% @doc Start the wterl backend
|
%% @doc Start the wterl backend
|
||||||
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
|
-spec start(integer(), config()) -> {ok, state()} | {error, term()}.
|
||||||
start(Partition, Config) ->
|
start(Partition, Config) ->
|
||||||
%% Get the data root directory
|
|
||||||
Table = "lsm:wt" ++ integer_to_list(Partition),
|
Table = "lsm:wt" ++ integer_to_list(Partition),
|
||||||
case app_helper:get_prop_or_env(data_root, Config, wterl) of
|
AppStart =
|
||||||
undefined ->
|
case application:start(wterl) of
|
||||||
lager:error("Failed to create wterl dir: data_root is not set"),
|
ok ->
|
||||||
{error, data_root_unset};
|
ok;
|
||||||
DataRoot ->
|
{error, {already_started, _}} ->
|
||||||
Started =
|
ok;
|
||||||
case application:start(wterl) of
|
{error, Reason1} ->
|
||||||
ok ->
|
lager:error("Failed to start wterl: ~p", [Reason1]),
|
||||||
wterl_conn:is_open();
|
{error, Reason1}
|
||||||
{error, {already_started, _}} ->
|
end,
|
||||||
wterl_conn:is_open();
|
case AppStart of
|
||||||
{error, Reason1} ->
|
ok ->
|
||||||
lager:error("Failed to start wterl: ~p", [Reason1]),
|
establish_connection(Table, Config);
|
||||||
{error, Reason1}
|
{error, Reason2} ->
|
||||||
end,
|
{error, Reason2}
|
||||||
case Started of
|
|
||||||
false ->
|
|
||||||
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
|
|
||||||
SessionMax =
|
|
||||||
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
|
|
||||||
undefined -> 1024;
|
|
||||||
RingSize when RingSize < 512 -> 1024;
|
|
||||||
RingSize -> RingSize * 2
|
|
||||||
end,
|
|
||||||
case wterl_conn:open(DataRoot,
|
|
||||||
[Config,
|
|
||||||
{create, true},
|
|
||||||
{sync, false},
|
|
||||||
{logging, true},
|
|
||||||
{transactional, true},
|
|
||||||
{session_max, SessionMax},
|
|
||||||
{cache_size, size_cache(Config)},
|
|
||||||
{checkpoint, [{wait, 1}]}, % sec
|
|
||||||
{statistics_log, [{wait, 30}]} % sec
|
|
||||||
]) of
|
|
||||||
{ok, _Connection} ->
|
|
||||||
{ok, #state{partition=Partition, table=Table}};
|
|
||||||
{error, Reason2} ->
|
|
||||||
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
|
|
||||||
{error, Reason2}
|
|
||||||
end;
|
|
||||||
true ->
|
|
||||||
{ok, #state{partition=Partition, table=Table}};
|
|
||||||
{error, Reason3} ->
|
|
||||||
{error, Reason3}
|
|
||||||
end
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Stop the wterl backend
|
%% @doc Stop the wterl backend
|
||||||
|
@ -145,7 +111,7 @@ stop(#state{session=Session}) ->
|
||||||
{ok, not_found, state()} |
|
{ok, not_found, state()} |
|
||||||
{error, term(), state()}.
|
{error, term(), state()}.
|
||||||
get(Bucket, Key, #state{session=undefined}=State) ->
|
get(Bucket, Key, #state{session=undefined}=State) ->
|
||||||
get(Bucket, Key, establish_connection(State));
|
get(Bucket, Key, establish_session(State));
|
||||||
get(Bucket, Key, #state{session=Session, table=Table}=State) ->
|
get(Bucket, Key, #state{session=Session, table=Table}=State) ->
|
||||||
WTKey = to_object_key(Bucket, Key),
|
WTKey = to_object_key(Bucket, Key),
|
||||||
case wterl:session_get(Session, Table, WTKey) of
|
case wterl:session_get(Session, Table, WTKey) of
|
||||||
|
@ -166,7 +132,7 @@ get(Bucket, Key, #state{session=Session, table=Table}=State) ->
|
||||||
{ok, state()} |
|
{ok, state()} |
|
||||||
{error, term(), state()}.
|
{error, term(), state()}.
|
||||||
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) ->
|
put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) ->
|
||||||
put(Bucket, PrimaryKey, IndexSpecs, Val, establish_connection(State));
|
put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State));
|
||||||
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State) ->
|
put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State) ->
|
||||||
WTKey = to_object_key(Bucket, PrimaryKey),
|
WTKey = to_object_key(Bucket, PrimaryKey),
|
||||||
case wterl:session_put(Session, Table, WTKey, Val) of
|
case wterl:session_put(Session, Table, WTKey, Val) of
|
||||||
|
@ -184,7 +150,7 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S
|
||||||
{ok, state()} |
|
{ok, state()} |
|
||||||
{error, term(), state()}.
|
{error, term(), state()}.
|
||||||
delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) ->
|
delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) ->
|
||||||
delete(Bucket, Key, IndexSpecs, establish_connection(State));
|
delete(Bucket, Key, IndexSpecs, establish_session(State));
|
||||||
delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State) ->
|
delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State) ->
|
||||||
WTKey = to_object_key(Bucket, Key),
|
WTKey = to_object_key(Bucket, Key),
|
||||||
case wterl:session_delete(Session, Table, WTKey) of
|
case wterl:session_delete(Session, Table, WTKey) of
|
||||||
|
@ -313,7 +279,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) ->
|
||||||
%% @doc Delete all objects from this wterl backend
|
%% @doc Delete all objects from this wterl backend
|
||||||
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
|
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
|
||||||
drop(#state{session=undefined}=State) ->
|
drop(#state{session=undefined}=State) ->
|
||||||
drop(establish_connection(State));
|
drop(establish_session(State));
|
||||||
drop(#state{table=Table, session=Session}=State) ->
|
drop(#state{table=Table, session=Session}=State) ->
|
||||||
case wterl:session_truncate(Session, Table) of
|
case wterl:session_truncate(Session, Table) of
|
||||||
ok ->
|
ok ->
|
||||||
|
@ -326,7 +292,7 @@ drop(#state{table=Table, session=Session}=State) ->
|
||||||
%% non-tombstone values; otherwise returns false.
|
%% non-tombstone values; otherwise returns false.
|
||||||
-spec is_empty(state()) -> boolean().
|
-spec is_empty(state()) -> boolean().
|
||||||
is_empty(#state{session=undefined}=State) ->
|
is_empty(#state{session=undefined}=State) ->
|
||||||
is_empty(establish_connection(State));
|
is_empty(establish_session(State));
|
||||||
is_empty(#state{table=Table, session=Session}) ->
|
is_empty(#state{table=Table, session=Session}) ->
|
||||||
{ok, Cursor} = wterl:cursor_open(Session, Table),
|
{ok, Cursor} = wterl:cursor_open(Session, Table),
|
||||||
try
|
try
|
||||||
|
@ -338,7 +304,7 @@ is_empty(#state{table=Table, session=Session}) ->
|
||||||
%% @doc Get the status information for this wterl backend
|
%% @doc Get the status information for this wterl backend
|
||||||
-spec status(state()) -> [{atom(), term()}].
|
-spec status(state()) -> [{atom(), term()}].
|
||||||
status(#state{session=undefined}=State) ->
|
status(#state{session=undefined}=State) ->
|
||||||
status(establish_connection(State));
|
status(establish_session(State));
|
||||||
status(#state{table=Table, session=Session}) ->
|
status(#state{table=Table, session=Session}) ->
|
||||||
{ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table),
|
{ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table),
|
||||||
try
|
try
|
||||||
|
@ -358,7 +324,40 @@ callback(_Ref, _Msg, State) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
||||||
establish_connection(#state{table=Table, session=undefined}=State) ->
|
establish_connection(Table, Config) ->
|
||||||
|
%% Get the data root directory
|
||||||
|
case app_helper:get_prop_or_env(data_root, Config, wterl) of
|
||||||
|
undefined ->
|
||||||
|
lager:error("Failed to create wterl dir: data_root is not set"),
|
||||||
|
{error, data_root_unset};
|
||||||
|
DataRoot ->
|
||||||
|
ok = filelib:ensure_dir(filename:join(DataRoot, "x")),
|
||||||
|
SessionMax =
|
||||||
|
case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of
|
||||||
|
undefined -> 1024;
|
||||||
|
RingSize when RingSize < 512 -> 1024;
|
||||||
|
RingSize -> RingSize * 2
|
||||||
|
end,
|
||||||
|
case wterl_conn:open(DataRoot,
|
||||||
|
[Config,
|
||||||
|
{create, true},
|
||||||
|
{sync, false},
|
||||||
|
{logging, true},
|
||||||
|
{transactional, true},
|
||||||
|
{session_max, SessionMax},
|
||||||
|
{cache_size, size_cache(Config)},
|
||||||
|
{checkpoint, [{wait, 1}]}, % sec
|
||||||
|
{statistics_log, [{wait, 30}]} % sec
|
||||||
|
]) of
|
||||||
|
{ok, Connection} ->
|
||||||
|
{ok, #state{table=Table, connection=Connection}};
|
||||||
|
{error, Reason2} ->
|
||||||
|
lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]),
|
||||||
|
{error, Reason2}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
establish_session(#state{table=Table, session=undefined}=State) ->
|
||||||
{ok, Connection} = wterl_conn:get(),
|
{ok, Connection} = wterl_conn:get(),
|
||||||
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
|
case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of
|
||||||
{ok, Session} ->
|
{ok, Session} ->
|
||||||
|
@ -379,7 +378,7 @@ establish_connection(#state{table=Table, session=undefined}=State) ->
|
||||||
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
|
lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]),
|
||||||
State
|
State
|
||||||
end;
|
end;
|
||||||
establish_connection(State) ->
|
establish_session(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|
Loading…
Reference in a new issue