diff --git a/src/hanoi_merger.erl b/src/hanoi_merger.erl index 6a67f24..0069b16 100644 --- a/src/hanoi_merger.erl +++ b/src/hanoi_merger.erl @@ -66,18 +66,21 @@ terminate(Count, Out) -> {ok, Count}. -step({N, From}) -> - {N-1, From}. +step(S) -> + step(S, 1). + +step({N, From}, Steps) -> + {N-Steps, From}. hibernate_scan(Keep) -> erlang:garbage_collect(), receive {step, From, HowMany} -> - {BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count} = erlang:binary_to_term( zlib:gunzip( Keep ) ), - scan(BT1, BT2, hanoi_writer:deserialize(OutBin), IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) + {BT1, BT2, OutBin, IsLastLevel, AKVs, BKVs, Count, N} = erlang:binary_to_term( zlib:gunzip( Keep ) ), + scan(BT1, BT2, hanoi_writer:deserialize(OutBin), IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From}) end. -scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> +scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] -> case FromPID of none -> ok; @@ -87,11 +90,11 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> receive {step, From, HowMany} -> - scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From}) after 10000 -> case ?LOCAL_WRITER of true -> - Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count}, + Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N}, Keep = zlib:gzip ( erlang:term_to_binary( Args ) ), hibernate_scan(Keep); false -> @@ -137,8 +140,10 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV end, scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step)); + %% cases below have Key1 == Key2, hence it consumes 2 elements + (?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) -> - scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step)); + scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step, 2)); true -> case ?LOCAL_WRITER of @@ -147,13 +152,26 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV false -> ok = hanoi_writer:add(Out2=Out, Key2, Value2) end, - scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step)) + scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step, 2)) end. +scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= [] -> + case FromPID of + none -> + ok; + {PID, Ref} -> + PID ! {Ref, step_done} + end, + + receive + {step, From, HowMany} -> + scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From}) + end; + scan_only(BT, Out, IsLastLevel, [], Count, {_, FromPID}=Step) -> case hanoi_reader:next_node(BT) of {node, KVs} -> - scan_only(BT, Out, IsLastLevel, KVs, Count, step(Step)); + scan_only(BT, Out, IsLastLevel, KVs, Count, Step); end_of_data -> case FromPID of none ->