%% Patch summary (leading text; everything from the first "diff" header on is the unchanged patch).
%%
%% Files touched: src/hanoidb_level.erl and src/hanoidb_merger.erl.
%%
%% hanoidb_level:begin_merge/1
%%   * Drops the inline proc_lib:spawn_link/1 closure (and the Owner = self() binding)
%%     and instead calls hanoidb_merger:start/6 with the A/B/X file names,
%%     ?BTREE_SIZE(State#state.level + 1), State#state.next =:= undefined and
%%     State#state.opts. It still returns {ok, MergePID}.
%%
%% hanoidb_merger
%%   * Exports start/6 next to merge/6 and includes "include/plain_rpc.hrl"
%%     (presumably the source of the ?CAST macro used by start/6 -- confirm).
%%   * start/6 captures the caller as Owner, spawns the worker through
%%     plain_fsm:spawn_link(?MODULE, Fun), runs merge/6 in it and sends
%%     ?CAST(self(), {merge_done, OutCount, X}) to Owner. On any exception it logs
%%     via error_logger:error_msg/2 and re-raises with erlang:raise/3.
%%   * The hibernation snapshot is now produced by term_to_binary(Args, [compressed])
%%     and read back with binary_to_term(Keep); the zlib:gzip/zlib:gunzip wrapping
%%     is removed on both sides.
%%   * The receive ... after ?HIBERNATE_TIMEOUT blocks formerly inlined in scan/7 and
%%     scan_only/5 move into the new receive_scan/7 and receive_scan_only/5.
%%   * hibernate_scan/1, hibernate_scan_only/1, receive_scan/7 and receive_scan_only/5
%%     each gain two receive clauses: {system, From, Req}, forwarded to
%%     plain_fsm:handle_system_msg/4 with a continuation that re-enters the same
%%     loop, and {'EXIT', Parent, Reason}, forwarded to plain_fsm:parent_EXIT/2.
%%   * hibernate_scan_only/1 is relocated to sit directly after hibernate_scan/1.
%%
%% NOTE(review): in every {'EXIT', Parent, Reason} clause Parent is bound by the
%%   receive pattern itself, and the following case has the single branch
%%   "Parent ->". An 'EXIT' message from any process other than
%%   plain_fsm:info(parent) would therefore fail with case_clause -- confirm that
%%   this is intended. Whether the worker traps exits at all is not visible here.
%% NOTE(review): start/6 calls erlang:get_stacktrace() twice, the second time after
%%   the logging call; that function is deprecated in newer OTP releases -- confirm
%%   the targeted OTP version.
%% NOTE(review): snapshots written by the old gzip-based code cannot be decoded by
%%   binary_to_term(Keep) alone; this looks harmless because Keep appears to live
%%   only inside the hibernating process -- verify.
%% NOTE(review): on hibernation only N is serialized, not FromPID; the hibernate_*
%%   loops take From from the next {step, From, HowMany} message instead.
%% The last hunk ends in the middle of scan_only/5 (as captured here).
diff --git a/src/hanoidb_level.erl b/src/hanoidb_level.erl index a47f54c..9d9c067 100644 --- a/src/hanoidb_level.erl +++ b/src/hanoidb_level.erl @@ -784,33 +784,20 @@ begin_merge(State) -> AFileName = filename("A",State), BFileName = filename("B",State), XFileName = filename("X",State), - Owner = self(), ?log("starting merge~n", []), file:delete(XFileName), - MergePID = proc_lib:spawn_link(fun() -> - try - ?log("merge begun~n", []), - - {ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName, - ?BTREE_SIZE(State#state.level + 1), - State#state.next =:= undefined, - State#state.opts ), - - Owner ! ?CAST(self(),{merge_done, OutCount, XFileName}) - catch - C:E -> - error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n", - [self(), C,E,erlang:get_stacktrace(), XFileName]), - erlang:raise(C,E,erlang:get_stacktrace()) - end - end), + MergePID = hanoidb_merger:start(AFileName, BFileName, XFileName, + ?BTREE_SIZE(State#state.level + 1), + State#state.next =:= undefined, + State#state.opts), {ok, MergePID}. + close_and_delete_a_and_b(State) -> AFileName = filename("A",State), BFileName = filename("B",State), diff --git a/src/hanoidb_merger.erl b/src/hanoidb_merger.erl index 1a54a5d..85c5693 100644 --- a/src/hanoidb_merger.erl +++ b/src/hanoidb_merger.erl @@ -28,9 +28,10 @@ %% @doc Merging two Indexes --export([merge/6]). +-export([start/6, merge/6]). -include("hanoidb.hrl"). +-include("include/plain_rpc.hrl"). %% A merger which is inactive for this long will sleep which means that it will %% close open files, and compress the current bloom filter. @@ -40,6 +41,27 @@ %% merges, so we default to running the entire merge in one process. -define(LOCAL_WRITER, true). + +-spec start(string(), string(), string(), integer(), boolean(), list()) -> pid(). +start(A,B,X, Size, IsLastLevel, Options) -> + Owner = self(), + plain_fsm:spawn_link(?MODULE, fun() -> + try + {ok, OutCount} = hanoidb_merger:merge(A, B, X, + Size, + IsLastLevel, + Options), + + Owner ! 
?CAST(self(),{merge_done, OutCount, X}) + catch + C:E -> + %% this semi-bogus code makes sure we always get a stack trace if merging fails + error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n", + [self(), C,E,erlang:get_stacktrace(), X]), + erlang:raise(C,E,erlang:get_stacktrace()) + end + end). + -spec merge(string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}. merge(A,B,C, Size, IsLastLevel, Options) -> {ok, IXA} = hanoidb_reader:open(A, [sequential|Options]), @@ -72,13 +94,77 @@ hibernate_scan(Keep) -> erlang:garbage_collect(), receive {step, From, HowMany} -> - {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)), + {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, N} = erlang:binary_to_term(Keep), scan(hanoidb_reader:deserialize(IXA), hanoidb_reader:deserialize(IXB), hanoidb_writer:deserialize(Out), - IsLastLevel, AKVs, BKVs, {N+HowMany, From}) + IsLastLevel, AKVs, BKVs, {N+HowMany, From}); + + %% gen_fsm handling + {system, From, Req} -> + plain_fsm:handle_system_msg( + Req, From, Keep, fun hibernate_scan/1); + + {'EXIT', Parent, Reason} -> + case plain_fsm:info(parent) of + Parent -> + plain_fsm:parent_EXIT(Reason, Keep) + end + end. + +hibernate_scan_only(Keep) -> + erlang:garbage_collect(), + receive + {step, From, HowMany} -> + {IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(Keep), + scan_only(hanoidb_reader:deserialize(IX), + hanoidb_writer:deserialize(OutBin), + IsLastLevel, KVs, {N+HowMany, From}); + + %% gen_fsm handling + {system, From, Req} -> + plain_fsm:handle_system_msg( + Req, From, Keep, fun hibernate_scan_only/1); + + {'EXIT', Parent, Reason} -> + case plain_fsm:info(parent) of + Parent -> + plain_fsm:parent_EXIT(Reason, Keep) + end + end. 
+ + +receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) -> + + receive + {step, From, HowMany} -> + scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From}); + + %% gen_fsm handling + {system, From, Req} -> + plain_fsm:handle_system_msg( + Req, From, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}}, + fun({IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2}}) -> + receive_scan(IXA2, IXB2, Out2, IsLastLevel2, AKVs2, BKVs2, {N2, FromPID2}) + end); + + {'EXIT', Parent, Reason} -> + case plain_fsm:info(parent) of + Parent -> + plain_fsm:parent_EXIT(Reason, {IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}}) + end + + after ?HIBERNATE_TIMEOUT -> + Args = {hanoidb_reader:serialize(IXA), + hanoidb_reader:serialize(IXB), + hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N}, + Keep = erlang:term_to_binary(Args, [compressed]), + hibernate_scan(Keep) + end. + + scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] -> case FromPID of none -> @@ -87,16 +173,7 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= PID ! {Ref, step_done} end, - receive - {step, From, HowMany} -> - scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From}) - after ?HIBERNATE_TIMEOUT -> - Args = {hanoidb_reader:serialize(IXA), - hanoidb_reader:serialize(IXB), - hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N}, - Keep = zlib:gzip(erlang:term_to_binary(Args)), - hibernate_scan(Keep) - end; + receive_scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}); scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) -> case hanoidb_reader:next_node(IXA) of @@ -128,17 +205,37 @@ scan(IXA, IXB, Out, IsLastLevel, [{_Key1,_Value1}|AT]=_AKVs, [{Key2,Value2}|IX]= {noreply, Out3} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out), scan(IXA, IXB, Out3, IsLastLevel, AT, IX, step(Step, 2)). 
-hibernate_scan_only(Keep) -> - erlang:garbage_collect(), + +receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) -> + + receive {step, From, HowMany} -> - {IX, OutBin, IsLastLevel, KVs, N} = erlang:binary_to_term(zlib:gunzip(Keep)), - scan_only(hanoidb_reader:deserialize(IX), - hanoidb_writer:deserialize(OutBin), - IsLastLevel, KVs, {N+HowMany, From}) + scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From}); + + %% gen_fsm handling + {system, From, Req} -> + plain_fsm:handle_system_msg( + Req, From, {IX, Out, IsLastLevel, KVs, {N, FromPID}}, + fun({IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2}}) -> + receive_scan_only(IX2, Out2, IsLastLevel2, KVs2, {N2, FromPID2}) + end); + + {'EXIT', Parent, Reason} -> + case plain_fsm:info(parent) of + Parent -> + plain_fsm:parent_EXIT(Reason, {IX, Out, IsLastLevel, KVs, {N, FromPID}}) + end + + after ?HIBERNATE_TIMEOUT -> + Args = {hanoidb_reader:serialize(IX), + hanoidb_writer:serialize(Out), IsLastLevel, KVs, N}, + Keep = erlang:term_to_binary(Args, [compressed]), + hibernate_scan_only(Keep) end. + scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] -> case FromPID of none -> @@ -147,15 +244,7 @@ scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] -> PID ! {Ref, step_done} end, - receive - {step, From, HowMany} -> - scan_only(IX, Out, IsLastLevel, KVs, {N+HowMany, From}) - after ?HIBERNATE_TIMEOUT -> - Args = {hanoidb_reader:serialize(IX), - hanoidb_writer:serialize(Out), IsLastLevel, KVs, N}, - Keep = zlib:gzip(erlang:term_to_binary(Args)), - hibernate_scan_only(Keep) - end; + receive_scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}); scan_only(IX, Out, IsLastLevel, [], {_, FromPID}=Step) -> case hanoidb_reader:next_node(IX) of