Allow merger to be local
This commit is contained in:
parent
51f1c13650
commit
6c0766a433
1 changed files with 46 additions and 12 deletions
|
@ -6,20 +6,33 @@
|
||||||
|
|
||||||
-export([merge/4]).
|
-export([merge/4]).
|
||||||
|
|
||||||
|
-define(LOCAL_WRITER, true).
|
||||||
|
|
||||||
merge(A,B,C, Size) ->
|
merge(A,B,C, Size) ->
|
||||||
{ok, BT1} = fractal_btree_reader:open(A),
|
{ok, BT1} = fractal_btree_reader:open(A),
|
||||||
{ok, BT2} = fractal_btree_reader:open(B),
|
{ok, BT2} = fractal_btree_reader:open(B),
|
||||||
{ok, Out} = fractal_btree_writer:open(C, Size),
|
case ?LOCAL_WRITER of
|
||||||
|
true ->
|
||||||
|
{ok, Out} = fractal_btree_writer:init([C, Size]);
|
||||||
|
false ->
|
||||||
|
{ok, Out} = fractal_btree_writer:open(C, Size)
|
||||||
|
end,
|
||||||
|
|
||||||
{node, AKVs} = fractal_btree_reader:first_node(BT1),
|
{node, AKVs} = fractal_btree_reader:first_node(BT1),
|
||||||
{node, BKVs} = fractal_btree_reader:first_node(BT2),
|
{node, BKVs} = fractal_btree_reader:first_node(BT2),
|
||||||
|
|
||||||
{ok, Count} = scan(BT1, BT2, Out, AKVs, BKVs, 0),
|
{ok, Count, Out2} = scan(BT1, BT2, Out, AKVs, BKVs, 0),
|
||||||
|
|
||||||
%% finish stream tree
|
%% finish stream tree
|
||||||
ok = fractal_btree_reader:close(BT1),
|
ok = fractal_btree_reader:close(BT1),
|
||||||
ok = fractal_btree_reader:close(BT2),
|
ok = fractal_btree_reader:close(BT2),
|
||||||
ok = fractal_btree_writer:close(Out),
|
|
||||||
|
case ?LOCAL_WRITER of
|
||||||
|
true ->
|
||||||
|
{stop, normal, ok, _} = fractal_btree_writer:handle_call(close, self(), Out2);
|
||||||
|
false ->
|
||||||
|
ok = fractal_btree_writer:close(Out2)
|
||||||
|
end,
|
||||||
|
|
||||||
{ok, Count}.
|
{ok, Count}.
|
||||||
|
|
||||||
|
@ -42,17 +55,33 @@ scan(BT1, BT2, Out, AKVs, [], Count) ->
|
||||||
|
|
||||||
scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
||||||
if Key1 < Key2 ->
|
if Key1 < Key2 ->
|
||||||
ok = fractal_btree_writer:add(Out, Key1, Value1),
|
case ?LOCAL_WRITER of
|
||||||
scan(BT1, BT2, Out, AT, BKVs, Count+1);
|
true ->
|
||||||
|
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key1, Value1}, Out);
|
||||||
|
false ->
|
||||||
|
ok = fractal_btree_writer:add(Out2=Out, Key1, Value1)
|
||||||
|
end,
|
||||||
|
|
||||||
|
scan(BT1, BT2, Out2, AT, BKVs, Count+1);
|
||||||
|
|
||||||
Key2 < Key1 ->
|
Key2 < Key1 ->
|
||||||
ok = fractal_btree_writer:add(Out, Key2, Value2),
|
case ?LOCAL_WRITER of
|
||||||
scan(BT1, BT2, Out, AKVs, BT, Count+1);
|
true ->
|
||||||
|
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key2, Value2}, Out);
|
||||||
|
false ->
|
||||||
|
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
|
||||||
|
end,
|
||||||
|
scan(BT1, BT2, Out2, AKVs, BT, Count+1);
|
||||||
|
|
||||||
Key1 == Key2 ->
|
Key1 == Key2 ->
|
||||||
%% TODO: eliminate tombstones, right now they just bubble down
|
%% TODO: eliminate tombstones, right now they just bubble down
|
||||||
ok = fractal_btree_writer:add(Out, Key2, Value2),
|
case ?LOCAL_WRITER of
|
||||||
scan(BT1, BT2, Out, AT, BT, Count+1)
|
true ->
|
||||||
|
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key2, Value2}, Out);
|
||||||
|
false ->
|
||||||
|
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
|
||||||
|
end,
|
||||||
|
scan(BT1, BT2, Out2, AT, BT, Count+1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
scan_only(BT, Out, [], Count) ->
|
scan_only(BT, Out, [], Count) ->
|
||||||
|
@ -60,9 +89,14 @@ scan_only(BT, Out, [], Count) ->
|
||||||
{node, KVs} ->
|
{node, KVs} ->
|
||||||
scan_only(BT, Out, KVs, Count);
|
scan_only(BT, Out, KVs, Count);
|
||||||
end_of_data ->
|
end_of_data ->
|
||||||
{ok, Count}
|
{ok, Count, Out}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
scan_only(BT, Out, [{Key,Value}|Rest], Count) ->
|
scan_only(BT, Out, [{Key,Value}|Rest], Count) ->
|
||||||
ok = fractal_btree_writer:add(Out, Key, Value),
|
case ?LOCAL_WRITER of
|
||||||
scan_only(BT, Out, Rest, Count+1).
|
true ->
|
||||||
|
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key, Value}, Out);
|
||||||
|
false ->
|
||||||
|
ok = fractal_btree_writer:add(Out2=Out, Key, Value)
|
||||||
|
end,
|
||||||
|
scan_only(BT, Out2, Rest, Count+1).
|
||||||
|
|
Loading…
Reference in a new issue