Correct step counts in merger

Merge was progressing too fast. This corrects
the progress house keeping in processing
merge work.
This commit is contained in:
Kresten Krab Thorup 2012-04-30 19:28:20 +02:00
parent d6b8491a3d
commit 6ce7101506

View file

@ -66,18 +66,21 @@ terminate(Count, Out) ->
{ok, Count}. {ok, Count}.
step({N, From}) -> step(S) ->
{N-1, From}. step(S, 1).
step({N, From}, Steps) ->
{N-Steps, From}.
hibernate_scan(Keep) -> hibernate_scan(Keep) ->
erlang:garbage_collect(), erlang:garbage_collect(),
receive receive
{step, From, HowMany} -> {step, From, HowMany} ->
{BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count} = erlang:binary_to_term( zlib:gunzip( Keep ) ), {BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ),
scan(BT1, BT2, hanoi_writer:deserialize(OutBin), IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) scan(BT1, BT2, hanoi_writer:deserialize(OutBin), IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
end. end.
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
case FromPID of case FromPID of
none -> none ->
ok; ok;
@ -87,11 +90,11 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) ->
receive receive
{step, From, HowMany} -> {step, From, HowMany} ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
after 10000 -> after 10000 ->
case ?LOCAL_WRITER of case ?LOCAL_WRITER of
true -> true ->
Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count}, Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N},
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ), Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
hibernate_scan(Keep); hibernate_scan(Keep);
false -> false ->
@ -137,8 +140,10 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
end, end,
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step)); scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step));
%% cases below have Key1 == Key2, hence it consumes 2 elements
(?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) -> (?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) ->
scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step)); scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step, 2));
true -> true ->
case ?LOCAL_WRITER of case ?LOCAL_WRITER of
@ -147,13 +152,26 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
false -> false ->
ok = hanoi_writer:add(Out2=Out, Key2, Value2) ok = hanoi_writer:add(Out2=Out, Key2, Value2)
end, end,
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step)) scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step, 2))
end. end.
scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= [] ->
case FromPID of
none ->
ok;
{PID, Ref} ->
PID ! {Ref, step_done}
end,
receive
{step, From, HowMany} ->
scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From})
end;
scan_only(BT, Out, IsLastLevel, [], Count, {_, FromPID}=Step) -> scan_only(BT, Out, IsLastLevel, [], Count, {_, FromPID}=Step) ->
case hanoi_reader:next_node(BT) of case hanoi_reader:next_node(BT) of
{node, KVs} -> {node, KVs} ->
scan_only(BT, Out, IsLastLevel, KVs, Count, step(Step)); scan_only(BT, Out, IsLastLevel, KVs, Count, Step);
end_of_data -> end_of_data ->
case FromPID of case FromPID of
none -> none ->