Make lsm_btree:close/1 stop more processes
Closing a tree did not stop ongoing merge processes beyond the current top level. Now close synchronously calls down through all levels and closes each one.
This commit is contained in:
parent
6e9ef1ee47
commit
ead8d3a41d
2 changed files with 30 additions and 5 deletions
|
@ -19,7 +19,12 @@ open(Dir) ->
|
||||||
gen_server:start(?MODULE, [Dir], []).
|
gen_server:start(?MODULE, [Dir], []).
|
||||||
|
|
||||||
close(Ref) ->
|
close(Ref) ->
|
||||||
gen_server:call(Ref, close).
|
try
|
||||||
|
gen_server:call(Ref, close)
|
||||||
|
catch
|
||||||
|
exit:{noproc,_} -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
lookup(Ref,Key) when is_binary(Key) ->
|
lookup(Ref,Key) when is_binary(Key) ->
|
||||||
gen_server:call(Ref, {lookup, Key}).
|
gen_server:call(Ref, {lookup, Key}).
|
||||||
|
@ -147,9 +152,16 @@ handle_call({lookup, Key}, _From, State=#state{ top=Top, nursery=Nursery } ) whe
|
||||||
{reply, Reply, State}
|
{reply, Reply, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call(close, _From, State) ->
|
handle_call(close, _From, State=#state{top=Top}) ->
|
||||||
{ok, State2} = flush_nursery(State),
|
try
|
||||||
{stop, normal, ok, State2}.
|
{ok, State2} = flush_nursery(State),
|
||||||
|
ok = lsm_btree_level:close(Top),
|
||||||
|
{stop, normal, ok, State2}
|
||||||
|
catch
|
||||||
|
E:R ->
|
||||||
|
error_logger:info_msg("exception from close ~p:~p~n", [E,R]),
|
||||||
|
{stop, normal, ok, State}
|
||||||
|
end.
|
||||||
|
|
||||||
do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) ->
|
do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) ->
|
||||||
{ok, Nursery2} = lsm_btree_nursery:add_maybe_flush(Key, Value, Nursery, Top),
|
{ok, Nursery2} = lsm_btree_nursery:add_maybe_flush(Key, Value, Nursery, Top),
|
||||||
|
|
|
@ -41,7 +41,12 @@ inject(Ref, FileName) ->
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
close(Ref) ->
|
close(Ref) ->
|
||||||
call(Ref, close).
|
try
|
||||||
|
call(Ref, close)
|
||||||
|
catch
|
||||||
|
exit:{noproc,_} -> ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
range_fold(Ref, SendTo, From, To) ->
|
range_fold(Ref, SendTo, From, To) ->
|
||||||
{ok, FoldWorkerPID} = lsm_btree_fold_worker:start(SendTo),
|
{ok, FoldWorkerPID} = lsm_btree_fold_worker:start(SendTo),
|
||||||
|
@ -170,6 +175,14 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
close_if_defined(State#state.b),
|
close_if_defined(State#state.b),
|
||||||
stop_if_defined(State#state.merge_pid),
|
stop_if_defined(State#state.merge_pid),
|
||||||
reply(From, ok),
|
reply(From, ok),
|
||||||
|
|
||||||
|
%% this is synchronous all the way down, because our
|
||||||
|
%% caller is monitoring *this* proces, and thus the
|
||||||
|
%% rpc would fail when we fall off the cliff
|
||||||
|
if Next == undefined -> ok;
|
||||||
|
true ->
|
||||||
|
lsm_btree_level:close(Next)
|
||||||
|
end,
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == 0 ->
|
?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == 0 ->
|
||||||
|
|
Loading…
Reference in a new issue