diff --git a/src/riak_kv_wterl_backend.erl b/src/riak_kv_wterl_backend.erl index f4401c8..51a11cd 100644 --- a/src/riak_kv_wterl_backend.erl +++ b/src/riak_kv_wterl_backend.erl @@ -55,7 +55,7 @@ -record(state, {conn :: wterl:connection(), table :: string(), session :: wterl:session(), - cursors :: ets:tid(), + cursor :: wterl:cursor(), partition :: integer()}). -type state() :: #state{}. @@ -93,17 +93,17 @@ start(Partition, Config) -> AppStart = case application:start(wterl) of ok -> ok; - {error, {already_started, _}} -> + {error, {already_started, _Pid}}=_AppStartError -> ok; - {error, Reason} -> + {error, Reason}=AppStartError -> lager:error("Failed to start wterl: ~p", [Reason]), - {error, Reason} + AppStartError end, case AppStart of ok -> ok = filelib:ensure_dir(filename:join(DataRoot, "x")), SessionMax = - case app_helper:get_env(riak_core, ring_creation_size) of + case app_helper:get_prop_or_env(ring_creation_size, Config, riak_core) of undefined -> 1024; RingSize when RingSize < 512 -> 1024; RingSize -> RingSize * 2 @@ -149,15 +149,12 @@ start(Partition, Config) -> %% @doc Stop the wterl backend -spec stop(state()) -> ok. -stop(#state{conn=ConnRef, session=SRef, cursors=undefined}) -> +stop(#state{conn=ConnRef, session=SRef, cursor=undefined}) -> ok = wterl:session_close(SRef), wterl_conn:close(ConnRef); -stop(#state{cursors=Cursors}=State) -> - ets:foldl(fun({_Table, Cursor}, _) -> - ok = wterl:cursor_close(Cursor) - end, true, Cursors), - ets:delete(Cursors), - stop(State#state{cursors=undefined}). +stop(#state{cursor=Cursor}=State) -> + ok = wterl:cursor_close(Cursor), + stop(State#state{cursor=undefined}). %% @doc Retrieve an object from the wterl backend -spec get(riak_object:bucket(), riak_object:key(), state()) -> @@ -166,13 +163,14 @@ stop(#state{cursors=Cursors}=State) -> {error, term(), state()}. get(Bucket, Key, #state{session=SRef, table=Table}=State) -> WTKey = to_object_key(Bucket, Key), - case wterl:cursor_search(shared_cursor(SRef, Table, State), WTKey) of + Cursor = shared_cursor(SRef, Table, State), + case wterl:cursor_search(Cursor, WTKey) of {ok, Value} -> - {ok, Value, State}; + {ok, Value, State#state{cursor=Cursor}}; not_found -> - {error, not_found, State}; + {error, not_found, State#state{cursor=Cursor}}; {error, Reason} -> - {error, Reason, State} + {error, Reason, State#state{cursor=Cursor}} end. %% @doc Insert an object into the wterl backend. @@ -185,11 +183,12 @@ get(Bucket, Key, #state{session=SRef, table=Table}=State) -> {error, term(), state()}. put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=State) -> WTKey = to_object_key(Bucket, PrimaryKey), - case wterl:cursor_insert(shared_cursor(SRef, Table, State), WTKey, Val) of + Cursor = shared_cursor(SRef, Table, State), + case wterl:cursor_insert(Cursor, WTKey, Val) of ok -> - {ok, State}; + {ok, State#state{cursor=Cursor}}; {error, Reason} -> - {error, Reason, State} + {error, Reason, State#state{cursor=Cursor}} end. %% @doc Delete an object from the wterl backend @@ -201,11 +200,12 @@ put(Bucket, PrimaryKey, _IndexSpecs, Val, #state{table=Table, session=SRef}=Stat {error, term(), state()}. delete(Bucket, Key, _IndexSpecs, #state{table=Table, session=SRef}=State) -> WTKey = to_object_key(Bucket, Key), - case wterl:cursor_remove(shared_cursor(SRef, Table, State), WTKey) of + Cursor = shared_cursor(SRef, Table, State), + case wterl:cursor_remove(Cursor, WTKey) of ok -> - {ok, State}; + {ok, State#state{cursor=Cursor}}; {error, Reason} -> - {error, Reason, State} + {error, Reason, State#state{cursor=Cursor}} end. %% @doc Fold over all the buckets @@ -350,18 +350,17 @@ callback(_Ref, _Msg, State) -> %% Internal functions %% =================================================================== -shared_cursor(SRef, Table, #state{cursors=undefined}=State) -> - Cursors = ets:new(?MODULE, []), - shared_cursor(SRef, Table, State#state{cursors=Cursors}); -shared_cursor(SRef, Table, #state{cursors=Cursors}=State) -> - case ets:lookup(Cursors, Table) of - [{Table, Cursor}] -> - {Cursor, State}; - _ -> - Cursor = wterl:cursor_open(SRef, Table), - ets:insert(Cursors, {Table, Cursor}), - {Cursor, State} - end. +shared_cursor(SRef, Table, #state{cursor=undefined}=State) -> + case wterl:cursor_open(SRef, Table) of + {ok, Cursor} -> + Cursor; + {error, Reason} -> + lager:error("Failed to open WT cursor on ~p because: ~p", [Table, Reason]), + undefined + end; + +shared_cursor(_SRef, _Table, #state{cursor=Cursor}) -> + Cursor. %% @private @@ -482,8 +481,6 @@ fetch_status(Cursor, {ok, Stat}, Acc) -> size_cache(Config) -> Size = case app_helper:get_prop_or_env(cache_size, Config, wterl) of - {ok, Value} -> - Value; undefined -> RunningApps = application:which_applications(), FinalGuess = @@ -510,9 +507,11 @@ size_cache(Config) -> false -> "1GB" end, - application:set_env(wt, cache_size, FinalGuess), + application:set_env(wterl, cache_size, FinalGuess), lager:warning("Using best-guess cache size of ~p for WiredTiger storage backend.", [FinalGuess]), - FinalGuess + FinalGuess; + Value -> + Value end, Size.