diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index d589c97..eebe69e 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -24,8 +24,6 @@ -behavior(temp_riak_kv_backend). -author('Steve Vinoski '). --compile([{parse_transform, lager_transform}]). - %% KV Backend API -export([api_version/0, capabilities/1, @@ -53,8 +51,8 @@ -define(CAPABILITIES, [async_fold]). -record(state, {table :: string(), - session :: wterl:session(), - partition :: integer()}). + connection :: wterl:connection(), + session :: wterl:session()}). -type state() :: #state{}. -type config() :: [{atom(), term()}]. @@ -82,54 +80,22 @@ capabilities(_, _) -> %% @doc Start the wterl backend -spec start(integer(), config()) -> {ok, state()} | {error, term()}. start(Partition, Config) -> - %% Get the data root directory Table = "lsm:wt" ++ integer_to_list(Partition), - case app_helper:get_prop_or_env(data_root, Config, wterl) of - undefined -> - lager:error("Failed to create wterl dir: data_root is not set"), - {error, data_root_unset}; - DataRoot -> - Started = - case application:start(wterl) of - ok -> - wterl_conn:is_open(); - {error, {already_started, _}} -> - wterl_conn:is_open(); - {error, Reason1} -> - lager:error("Failed to start wterl: ~p", [Reason1]), - {error, Reason1} - end, - case Started of - false -> - ok = filelib:ensure_dir(filename:join(DataRoot, "x")), - SessionMax = - case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of - undefined -> 1024; - RingSize when RingSize < 512 -> 1024; - RingSize -> RingSize * 2 - end, - case wterl_conn:open(DataRoot, - [Config, - {create, true}, - {sync, false}, - {logging, true}, - {transactional, true}, - {session_max, SessionMax}, - {cache_size, size_cache(Config)}, - {checkpoint, [{wait, 1}]}, % sec - {statistics_log, [{wait, 30}]} % sec - ]) of - {ok, _Connection} -> - {ok, #state{partition=Partition, table=Table}}; - {error, Reason2} -> - lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), - {error, Reason2} - end; - true -> - {ok, #state{partition=Partition, table=Table}}; - {error, Reason3} -> - {error, Reason3} - end + AppStart = + case application:start(wterl) of + ok -> + ok; + {error, {already_started, _}} -> + ok; + {error, Reason1} -> + lager:error("Failed to start wterl: ~p", [Reason1]), + {error, Reason1} + end, + case AppStart of + ok -> + establish_connection(Table, Config); + {error, Reason2} -> + {error, Reason2} end. %% @doc Stop the wterl backend @@ -145,7 +111,7 @@ stop(#state{session=Session}) -> {ok, not_found, state()} | {error, term(), state()}. get(Bucket, Key, #state{session=undefined}=State) -> - get(Bucket, Key, establish_connection(State)); + get(Bucket, Key, establish_session(State)); get(Bucket, Key, #state{session=Session, table=Table}=State) -> WTKey = to_object_key(Bucket, Key), case wterl:session_get(Session, Table, WTKey) of @@ -166,7 +132,7 @@ get(Bucket, Key, #state{session=Session, table=Table}=State) -> {ok, state()} | {error, term(), state()}. put(Bucket, PrimaryKey, IndexSpecs, Val, #state{session=undefined}=State) -> - put(Bucket, PrimaryKey, IndexSpecs, Val, establish_connection(State)); + put(Bucket, PrimaryKey, IndexSpecs, Val, establish_session(State)); put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), case wterl:session_put(Session, Table, WTKey, Val) of @@ -184,7 +150,7 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{session=Session, table=Table}=S {ok, state()} | {error, term(), state()}. delete(Bucket, Key, IndexSpecs, #state{session=undefined}=State) -> - delete(Bucket, Key, IndexSpecs, establish_connection(State)); + delete(Bucket, Key, IndexSpecs, establish_session(State)); delete(Bucket, Key, _IndexSpecs, #state{session=Session, table=Table}=State) -> WTKey = to_object_key(Bucket, Key), case wterl:session_delete(Session, Table, WTKey) of @@ -313,7 +279,7 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{table=Table}) -> %% @doc Delete all objects from this wterl backend -spec drop(state()) -> {ok, state()} | {error, term(), state()}. drop(#state{session=undefined}=State) -> - drop(establish_connection(State)); + drop(establish_session(State)); drop(#state{table=Table, session=Session}=State) -> case wterl:session_truncate(Session, Table) of ok -> @@ -326,7 +292,7 @@ drop(#state{table=Table, session=Session}=State) -> %% non-tombstone values; otherwise returns false. -spec is_empty(state()) -> boolean(). is_empty(#state{session=undefined}=State) -> - is_empty(establish_connection(State)); + is_empty(establish_session(State)); is_empty(#state{table=Table, session=Session}) -> {ok, Cursor} = wterl:cursor_open(Session, Table), try @@ -338,7 +304,7 @@ is_empty(#state{table=Table, session=Session}) -> %% @doc Get the status information for this wterl backend -spec status(state()) -> [{atom(), term()}]. status(#state{session=undefined}=State) -> - status(establish_connection(State)); + status(establish_session(State)); status(#state{table=Table, session=Session}) -> {ok, Cursor} = wterl:cursor_open(Session, "statistics:"++Table), try @@ -358,7 +324,40 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== -establish_connection(#state{table=Table, session=undefined}=State) -> +establish_connection(Table, Config) -> + %% Get the data root directory + case app_helper:get_prop_or_env(data_root, Config, wterl) of + undefined -> + lager:error("Failed to create wterl dir: data_root is not set"), + {error, data_root_unset}; + DataRoot -> + ok = filelib:ensure_dir(filename:join(DataRoot, "x")), + SessionMax = + case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of + undefined -> 1024; + RingSize when RingSize < 512 -> 1024; + RingSize -> RingSize * 2 + end, + case wterl_conn:open(DataRoot, + [Config, + {create, true}, + {sync, false}, + {logging, true}, + {transactional, true}, + {session_max, SessionMax}, + {cache_size, size_cache(Config)}, + {checkpoint, [{wait, 1}]}, % sec + {statistics_log, [{wait, 30}]} % sec + ]) of + {ok, Connection} -> + {ok, #state{table=Table, connection=Connection}}; + {error, Reason2} -> + lager:error("Failed to establish a WiredTiger connection, wterl backend unable to start: ~p\n", [Reason2]), + {error, Reason2} + end + end. + +establish_session(#state{table=Table, session=undefined}=State) -> {ok, Connection} = wterl_conn:get(), case wterl:session_open(Connection, wterl:config_to_bin([{isolation, "snapshot"}])) of {ok, Session} -> @@ -379,7 +378,7 @@ establish_connection(#state{table=Table, session=undefined}=State) -> lager:error("Failed to open a WiredTiger session: ~p\n", [Reason]), State end; -establish_connection(State) -> +establish_session(State) -> State. %% @private