Support sys messages in hanoidb_merger processes

This commit is contained in:
Kresten Krab Thorup 2014-11-26 11:52:53 +01:00
parent f0d24894c5
commit 3b6b0ce197
2 changed files with 122 additions and 46 deletions

View file

@ -784,33 +784,20 @@ begin_merge(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
XFileName = filename("X",State),
Owner = self(),
?log("starting merge~n", []),
file:delete(XFileName),
MergePID = proc_lib:spawn_link(fun() ->
try
?log("merge begun~n", []),
{ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts ),
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
catch
C:E ->
error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n",
[self(), C,E,erlang:get_stacktrace(), XFileName]),
erlang:raise(C,E,erlang:get_stacktrace())
end
end),
MergePID = hanoidb_merger:start(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts),
{ok, MergePID}.
close_and_delete_a_and_b(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),

View file

@ -28,9 +28,10 @@
%% @doc Merging two Indexes
-export([merge/6]).
-export([start/6, merge/6]).
-include("hanoidb.hrl").
-include("include/plain_rpc.hrl").
%% A merger which is inactive for this long will sleep which means that it will
%% close open files, and compress the current bloom filter.
@ -40,6 +41,27 @@
%% merges, so we default to running the entire merge in one process.
-define(LOCAL_WRITER, true).
-spec start(string(), string(), string(), integer(), boolean(), list()) -> pid().
start(A,B,X, Size, IsLastLevel, Options) ->
Owner = self(),
plain_fsm:spawn_link(?MODULE, fun() ->
try
{ok, OutCount} = hanoidb_merger:merge(A, B, X,
Size,
IsLastLevel,
Options),
Owner ! ?CAST(self(),{merge_done, OutCount, X})
catch
C:E ->
%% this semi-bogus code makes sure we always get a stack trace if merging fails
error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n",
[self(), C,E,erlang:get_stacktrace(), X]),
erlang:raise(C,E,erlang:get_stacktrace())
end
end).
-spec merge(string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
merge(A,B,C, Size, IsLastLevel, Options) ->
{ok, IXA} = hanoidb_reader:open(A, [sequential|Options]),
@ -72,13 +94,77 @@ hibernate_scan(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
{IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(Keep),
scan(hanoidb_reader:deserialize(IXA),
hanoidb_reader:deserialize(IXB),
hanoidb_writer:deserialize(Out),
IsLastLevel, AKVs, BKVs, {N+HowMany, From})
IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
end.
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive
{step, From, HowMany} ->
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(Keep),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, Keep, fun hibernate_scan_only/1);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, Keep)
end
end.
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) ->
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}},
fun({IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2}}) ->
receive_scan(IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan(Keep)
end.
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
case FromPID of
none ->
@ -87,16 +173,7 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/=
PID ! {Ref, step_done}
end,
receive
{step, From, HowMany} ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IXA),
hanoidb_reader:serialize(IXB),
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan(Keep)
end;
receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID});
scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
case hanoidb_reader:next_node(IXA) of
@ -128,17 +205,37 @@ scan(IXA, IXB, Out, IsLastLevel, [{_Key1,_Value1}|AT]=_AKVs, [{Key2,Value2}|IX]=
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
scan(IXA, IXB, Out3, IsLastLevel, AT, IX, step(Step, 2)).
hibernate_scan_only(Keep) ->
erlang:garbage_collect(),
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) ->
receive
{step, From, HowMany} ->
{IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)),
scan_only(hanoidb_reader:deserialize(IX),
hanoidb_writer:deserialize(OutBin),
IsLastLevel, KVs, {N+HowMany, From})
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From});
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
Req, From, {IX, Out, IsLastLevel, KVs, {N, FromPID}},
fun({IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2}}) ->
receive_scan_only(IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2})
end);
{'EXIT', Parent, Reason} ->
case plain_fsm:info(parent) of
Parent ->
plain_fsm:parent_EXIT(Reason, {IX, Out, IsLastLevel, KVs, {N, FromPID}})
end
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = erlang:term_to_binary(Args, [compressed]),
hibernate_scan_only(Keep)
end.
scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
case FromPID of
none ->
@ -147,15 +244,7 @@ scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
PID ! {Ref, step_done}
end,
receive
{step, From, HowMany} ->
scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From})
after ?HIBERNATE_TIMEOUT ->
Args = {hanoidb_reader:serialize(IX),
hanoidb_writer:serialize(Out), IsLastLevel, KVs, N},
Keep = zlib:gzip(erlang:term_to_binary(Args)),
hibernate_scan_only(Keep)
end;
receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID});
scan_only(IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
case hanoidb_reader:next_node(IX) of