Implement hanoi:destroy/1
Also riak_kv_hanoi_backend:drop/1 (The latter does hanoi:destroy and then re-opens the same store).
This commit is contained in:
parent
96c5ec74c3
commit
f9b7fcf224
6 changed files with 81 additions and 15 deletions
|
@ -32,7 +32,7 @@
|
|||
terminate/2, code_change/3]).
|
||||
|
||||
-export([open/1, open/2, transact/2, close/1, get/2, lookup/2, delete/2, put/3,
|
||||
fold/3, fold_range/4]).
|
||||
fold/3, fold_range/4, destroy/1]).
|
||||
|
||||
-export([get_opt/2, get_opt/3]).
|
||||
|
||||
|
@ -75,6 +75,17 @@ close(Ref) ->
|
|||
exit:{normal, _} -> ok
|
||||
end.
|
||||
|
||||
-spec destroy(Ref::pid()) -> ok.
|
||||
destroy(Ref) ->
|
||||
try
|
||||
gen_server:call(Ref, destroy, infinity)
|
||||
catch
|
||||
exit:{noproc,_} -> ok;
|
||||
exit:noproc -> ok;
|
||||
%% Handle the case where the monitor triggers
|
||||
exit:{normal, _} -> ok
|
||||
end.
|
||||
|
||||
get(Ref,Key) when is_binary(Key) ->
|
||||
gen_server:call(Ref, {get, Key}, infinity).
|
||||
|
||||
|
@ -341,7 +352,13 @@ handle_call(close, _From, State=#state{top=Top}) ->
|
|||
E:R ->
|
||||
error_logger:info_msg("exception from close ~p:~p~n", [E,R]),
|
||||
{stop, normal, ok, State}
|
||||
end.
|
||||
end;
|
||||
|
||||
handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
|
||||
ok = hanoi_nursery:destroy(Nursery),
|
||||
ok = hanoi_level:destroy(Top),
|
||||
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}.
|
||||
|
||||
|
||||
do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) ->
|
||||
{ok, Nursery2} = hanoi_nursery:add_maybe_flush(Key, Value, Nursery, Top),
|
||||
|
|
|
@ -46,7 +46,7 @@
|
|||
|
||||
-export([open/5, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3,
|
||||
begin_incremental_merge/1, await_incremental_merge/1, set_max_level/2,
|
||||
unmerged_count/1]).
|
||||
unmerged_count/1, destroy/1]).
|
||||
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
|
@ -112,10 +112,18 @@ close(Ref) ->
|
|||
plain_rpc:call(Ref, close)
|
||||
catch
|
||||
exit:{noproc,_} -> ok;
|
||||
exit:noproc -> ok
|
||||
exit:noproc -> ok;
|
||||
exit:{normal, _} -> ok
|
||||
end.
|
||||
|
||||
|
||||
destroy(Ref) ->
|
||||
try
|
||||
plain_rpc:call(Ref, destroy)
|
||||
catch
|
||||
exit:{noproc,_} -> ok;
|
||||
exit:noproc -> ok;
|
||||
exit:{normal, _} -> ok
|
||||
end.
|
||||
|
||||
snapshot_range(Ref, FoldWorkerPID, Range) ->
|
||||
{ok, Folders} = plain_rpc:call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}),
|
||||
|
@ -381,8 +389,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
close_if_defined(State#state.a),
|
||||
close_if_defined(State#state.b),
|
||||
close_if_defined(State#state.c),
|
||||
stop_if_defined(State#state.merge_pid),
|
||||
plain_rpc:send_reply(From, ok),
|
||||
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
||||
|
||||
%% this is synchronous all the way down, because our
|
||||
%% caller is monitoring *this* proces, and thus the
|
||||
|
@ -391,8 +398,25 @@ main_loop(State = #state{ next=Next }) ->
|
|||
true ->
|
||||
hanoi_level:close(Next)
|
||||
end,
|
||||
plain_rpc:send_reply(From, ok),
|
||||
{ok, closing};
|
||||
|
||||
?CALL(From, destroy) ->
|
||||
destroy_if_defined(State#state.a),
|
||||
destroy_if_defined(State#state.b),
|
||||
destroy_if_defined(State#state.c),
|
||||
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
||||
|
||||
%% this is synchronous all the way down, because our
|
||||
%% caller is monitoring *this* proces, and thus the
|
||||
%% rpc would fail when we fall off the cliff
|
||||
if Next == undefined -> ok;
|
||||
true ->
|
||||
hanoi_level:destroy(Next)
|
||||
end,
|
||||
plain_rpc:send_reply(From, ok),
|
||||
{ok, destroying};
|
||||
|
||||
?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
||||
|
||||
?log("init_range_fold ~p -> ~p", [Range, WorkerPID]),
|
||||
|
@ -684,6 +708,9 @@ do_lookup(Key, [BT|Rest]) ->
|
|||
close_if_defined(undefined) -> ok;
|
||||
close_if_defined(BT) -> hanoi_reader:close(BT).
|
||||
|
||||
destroy_if_defined(undefined) -> ok;
|
||||
destroy_if_defined(BT) -> hanoi_reader:destroy(BT).
|
||||
|
||||
stop_if_defined(undefined) -> ok;
|
||||
stop_if_defined(MergePid) when is_pid(MergePid) ->
|
||||
erlang:exit(MergePid, shutdown).
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||
|
||||
-export([new/2, recover/3, add/3, finish/2, lookup/2, add_maybe_flush/4]).
|
||||
-export([do_level_fold/3, set_max_level/2, transact/3]).
|
||||
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
|
||||
|
||||
-include("include/hanoi.hrl").
|
||||
-include("hanoi.hrl").
|
||||
|
@ -177,6 +177,19 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
|
|||
file:delete(LogFileName),
|
||||
ok.
|
||||
|
||||
destroy(#nursery{ dir=Dir, log_file=LogFile }) ->
|
||||
%% first, close the log file
|
||||
if LogFile /= undefined ->
|
||||
ok = file:close(LogFile);
|
||||
true ->
|
||||
ok
|
||||
end,
|
||||
%% then delete it
|
||||
LogFileName = filename:join(Dir, "nursery.log"),
|
||||
file:delete(LogFileName),
|
||||
ok.
|
||||
|
||||
|
||||
add_maybe_flush(Key, Value, Nursery, Top) ->
|
||||
case add(Nursery, Key, Value) of
|
||||
{ok, _} = OK ->
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
-include("hanoi.hrl").
|
||||
-include("include/plain_rpc.hrl").
|
||||
|
||||
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4]).
|
||||
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]).
|
||||
-export([first_node/1,next_node/1]).
|
||||
-export([serialize/1, deserialize/1]).
|
||||
|
||||
|
@ -78,6 +78,10 @@ open(Name, Config) ->
|
|||
{ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
|
||||
end.
|
||||
|
||||
destroy(#index{file=File, name=Name}) ->
|
||||
ok = file:close(File),
|
||||
file:delete(Name).
|
||||
|
||||
serialize(#index{file=File, bloom=undefined }=Index) ->
|
||||
{ok, Position} = file:position(File, cur),
|
||||
ok = file:close(File),
|
||||
|
|
|
@ -59,7 +59,8 @@
|
|||
%-define(CAPABILITIES, [async_fold]).
|
||||
|
||||
-record(state, {tree,
|
||||
partition :: integer()}).
|
||||
partition :: integer(),
|
||||
config :: config() }).
|
||||
|
||||
-type state() :: #state{}.
|
||||
-type config() :: [{atom(), term()}].
|
||||
|
@ -108,7 +109,7 @@ start(Partition, Config) ->
|
|||
{ok, DataDir} ->
|
||||
case hanoi:open(DataDir, Config) of
|
||||
{ok, Tree} ->
|
||||
{ok, #state{tree=Tree, partition=Partition}};
|
||||
{ok, #state{tree=Tree, partition=Partition, config=Config }};
|
||||
{error, OpenReason}=OpenError ->
|
||||
lager:error("Failed to open hanoi: ~p\n", [OpenReason]),
|
||||
OpenError
|
||||
|
@ -290,9 +291,13 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{tree=Tree}) ->
|
|||
|
||||
%% @doc Delete all objects from this hanoi backend
|
||||
-spec drop(state()) -> {ok, state()} | {error, term(), state()}.
|
||||
drop(#state{}=State) ->
|
||||
%% TODO: not yet implemented
|
||||
{ok, State}.
|
||||
drop(#state{ tree=Tree, partition=Partition, config=Config }=State) ->
|
||||
case hanoi:destroy(Tree) of
|
||||
ok ->
|
||||
start(Partition, Config);
|
||||
{error, Term} ->
|
||||
{error, Term, State}
|
||||
end.
|
||||
|
||||
%% @doc Returns true if this hanoi backend contains any
|
||||
%% non-tombstone values; otherwise returns false.
|
||||
|
|
|
@ -293,7 +293,7 @@ running(#qcst{backend=Backend,
|
|||
{history, {call, Backend, fold_keys, [fold_keys_fun(), get_fold_buffer(), g_opts(), State]}},
|
||||
{history, {call, Backend, fold_objects, [fold_objects_fun(), get_fold_buffer(), g_opts(), State]}},
|
||||
{history, {call, Backend, is_empty, [State]}},
|
||||
% {history, {call, ?MODULE, drop, [Backend, State]}},
|
||||
{history, {call, ?MODULE, drop, [Backend, State]}},
|
||||
{stopped, {call, Backend, stop, [State]}}
|
||||
].
|
||||
|
||||
|
|
Loading…
Reference in a new issue