Implement hibernation in merge processes
Analysis seems to indicate that merge processes (from high-numbered levels) tend to be activated quite infrequent. Thus, we term-to-bin/gzip the merge process state, and invoke explicit gc before waiting for a {step, …} message again.
This commit is contained in:
parent
801817cf70
commit
15fc05634a
1 changed files with 22 additions and 7 deletions
|
@ -53,17 +53,15 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
|
|||
{node, AKVs} = hanoi_reader:first_node(BT1),
|
||||
{node, BKVs} = hanoi_reader:first_node(BT2),
|
||||
|
||||
{ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}),
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}).
|
||||
|
||||
%% finish stream tree
|
||||
ok = hanoi_reader:close(BT1),
|
||||
ok = hanoi_reader:close(BT2),
|
||||
terminate(Count, Out) ->
|
||||
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
{stop, normal, ok, _} = hanoi_writer:handle_call(close, self(), Out2);
|
||||
{stop, normal, ok, _} = hanoi_writer:handle_call(close, self(), Out);
|
||||
false ->
|
||||
ok = hanoi_writer:close(Out2)
|
||||
ok = hanoi_writer:close(Out)
|
||||
end,
|
||||
|
||||
{ok, Count}.
|
||||
|
@ -71,6 +69,16 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
|
|||
step({N, From}) ->
|
||||
{N-1, From}.
|
||||
|
||||
hibernate_scan(Keep) ->
|
||||
error_logger:info_msg("hibernating ~p~n", [self()]),
|
||||
erlang:garbage_collect(),
|
||||
receive
|
||||
{step, From, HowMany} ->
|
||||
error_logger:info_msg("waking up ~p~n", [self()]),
|
||||
{BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From})
|
||||
end.
|
||||
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) ->
|
||||
case FromPID of
|
||||
none ->
|
||||
|
@ -82,6 +90,10 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) ->
|
|||
receive
|
||||
{step, From, HowMany} ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From})
|
||||
after 10000 ->
|
||||
Args = {BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count},
|
||||
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
|
||||
hibernate_scan(Keep)
|
||||
end;
|
||||
|
||||
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) ->
|
||||
|
@ -89,6 +101,7 @@ scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) ->
|
|||
{node, AKVs} ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
|
||||
end_of_data ->
|
||||
hanoi_reader:close(BT1),
|
||||
scan_only(BT2, Out, IsLastLevel, BKVs, Count, Step)
|
||||
end;
|
||||
|
||||
|
@ -97,6 +110,7 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count, Step) ->
|
|||
{node, BKVs} ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
|
||||
end_of_data ->
|
||||
hanoi_reader:close(BT2),
|
||||
scan_only(BT1, Out, IsLastLevel, AKVs, Count, Step)
|
||||
end;
|
||||
|
||||
|
@ -144,7 +158,8 @@ scan_only(BT, Out, IsLastLevel, [], Count, {_, FromPID}=Step) ->
|
|||
{PID, Ref} ->
|
||||
PID ! {Ref, step_done}
|
||||
end,
|
||||
{ok, Count, Out}
|
||||
hanoi_reader:close(BT),
|
||||
terminate(Count, Out)
|
||||
end;
|
||||
|
||||
scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count, Step) ->
|
||||
|
|
Loading…
Reference in a new issue