diff --git a/src/hanoi_merger.erl b/src/hanoi_merger.erl index 93a5191..adeb593 100644 --- a/src/hanoi_merger.erl +++ b/src/hanoi_merger.erl @@ -53,17 +53,15 @@ merge(A,B,C, Size, IsLastLevel, Options) -> {node, AKVs} = hanoi_reader:first_node(BT1), {node, BKVs} = hanoi_reader:first_node(BT2), - {ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}), + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}). - %% finish stream tree - ok = hanoi_reader:close(BT1), - ok = hanoi_reader:close(BT2), +terminate(Count, Out) -> case ?LOCAL_WRITER of true -> - {stop, normal, ok, _} = hanoi_writer:handle_call(close, self(), Out2); + {stop, normal, ok, _} = hanoi_writer:handle_call(close, self(), Out); false -> - ok = hanoi_writer:close(Out2) + ok = hanoi_writer:close(Out) end, {ok, Count}. @@ -71,6 +69,16 @@ merge(A,B,C, Size, IsLastLevel, Options) -> step({N, From}) -> {N-1, From}. +hibernate_scan(Keep) -> + error_logger:info_msg("hibernating ~p~n", [self()]), + erlang:garbage_collect(), + receive + {step, From, HowMany} -> + error_logger:info_msg("waking up ~p~n", [self()]), + {BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count} = erlang:binary_to_term( zlib:gunzip( Keep ) ), + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) + end. + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> case FromPID of none -> @@ -82,6 +90,10 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> receive {step, From, HowMany} -> scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) + after 10000 -> + Args = {BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count}, + Keep = zlib:gzip ( erlang:term_to_binary( Args ) ), + hibernate_scan(Keep) end; scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) -> @@ -89,6 +101,7 @@ scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) -> {node, AKVs} -> scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step); end_of_data -> + hanoi_reader:close(BT1), scan_only(BT2, Out, IsLastLevel, BKVs, Count, Step) end; @@ -97,6 +110,7 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count, Step) -> {node, BKVs} -> scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step); end_of_data -> + hanoi_reader:close(BT2), scan_only(BT1, Out, IsLastLevel, AKVs, Count, Step) end; @@ -144,7 +158,8 @@ scan_only(BT, Out, IsLastLevel, [], Count, {_, FromPID}=Step) -> {PID, Ref} -> PID ! {Ref, step_done} end, - {ok, Count, Out} + hanoi_reader:close(BT), + terminate(Count, Out) end; scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count, Step) ->