Add option to evict tombstones in merge
This commit is contained in:
parent
5b4a4551a9
commit
3d0c36c3bc
3 changed files with 32 additions and 26 deletions
|
@ -261,7 +261,8 @@ begin_merge(State) ->
|
||||||
|
|
||||||
MergePID = proc_lib:spawn_link(fun() ->
|
MergePID = proc_lib:spawn_link(fun() ->
|
||||||
{ok, OutCount} = fractal_btree_merger2:merge(AFileName, BFileName, XFileName,
|
{ok, OutCount} = fractal_btree_merger2:merge(AFileName, BFileName, XFileName,
|
||||||
1 bsl (State#state.level + 1)),
|
1 bsl (State#state.level + 1),
|
||||||
|
State#state.next =:= undefined),
|
||||||
% error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]),
|
% error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]),
|
||||||
|
|
||||||
Owner ! {merge_done, OutCount, XFileName}
|
Owner ! {merge_done, OutCount, XFileName}
|
||||||
|
|
|
@ -1,14 +1,14 @@
|
||||||
-module(fractal_btree_merger2).
|
-module(fractal_btree_merger2).
|
||||||
|
|
||||||
%%
|
%%
|
||||||
%% Naive Merge of two b-trees. A better implementation should iterate leafs, not KV's
|
%% Merging two BTrees
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-export([merge/4]).
|
-export([merge/5]).
|
||||||
|
|
||||||
-define(LOCAL_WRITER, true).
|
-define(LOCAL_WRITER, true).
|
||||||
|
|
||||||
merge(A,B,C, Size) ->
|
merge(A,B,C, Size, IsLastLevel) ->
|
||||||
{ok, BT1} = fractal_btree_reader:open(A),
|
{ok, BT1} = fractal_btree_reader:open(A),
|
||||||
{ok, BT2} = fractal_btree_reader:open(B),
|
{ok, BT2} = fractal_btree_reader:open(B),
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
|
@ -21,7 +21,7 @@ merge(A,B,C, Size) ->
|
||||||
{node, AKVs} = fractal_btree_reader:first_node(BT1),
|
{node, AKVs} = fractal_btree_reader:first_node(BT1),
|
||||||
{node, BKVs} = fractal_btree_reader:first_node(BT2),
|
{node, BKVs} = fractal_btree_reader:first_node(BT2),
|
||||||
|
|
||||||
{ok, Count, Out2} = scan(BT1, BT2, Out, AKVs, BKVs, 0),
|
{ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0),
|
||||||
|
|
||||||
%% finish stream tree
|
%% finish stream tree
|
||||||
ok = fractal_btree_reader:close(BT1),
|
ok = fractal_btree_reader:close(BT1),
|
||||||
|
@ -37,23 +37,23 @@ merge(A,B,C, Size) ->
|
||||||
{ok, Count}.
|
{ok, Count}.
|
||||||
|
|
||||||
|
|
||||||
scan(BT1, BT2, Out, [], BKVs, Count) ->
|
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count) ->
|
||||||
case fractal_btree_reader:next_node(BT1) of
|
case fractal_btree_reader:next_node(BT1) of
|
||||||
{node, AKVs} ->
|
{node, AKVs} ->
|
||||||
scan(BT1, BT2, Out, AKVs, BKVs, Count);
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
|
||||||
end_of_data ->
|
end_of_data ->
|
||||||
scan_only(BT2, Out, BKVs, Count)
|
scan_only(BT2, Out, IsLastLevel, BKVs, Count)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
scan(BT1, BT2, Out, AKVs, [], Count) ->
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count) ->
|
||||||
case fractal_btree_reader:next_node(BT2) of
|
case fractal_btree_reader:next_node(BT2) of
|
||||||
{node, BKVs} ->
|
{node, BKVs} ->
|
||||||
scan(BT1, BT2, Out, AKVs, BKVs, Count);
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
|
||||||
end_of_data ->
|
end_of_data ->
|
||||||
scan_only(BT1, Out, AKVs, Count)
|
scan_only(BT1, Out, IsLastLevel, AKVs, Count)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
||||||
if Key1 < Key2 ->
|
if Key1 < Key2 ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
true ->
|
true ->
|
||||||
|
@ -62,7 +62,7 @@ scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
||||||
ok = fractal_btree_writer:add(Out2=Out, Key1, Value1)
|
ok = fractal_btree_writer:add(Out2=Out, Key1, Value1)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
scan(BT1, BT2, Out2, AT, BKVs, Count+1);
|
scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1);
|
||||||
|
|
||||||
Key2 < Key1 ->
|
Key2 < Key1 ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
|
@ -71,32 +71,37 @@ scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
||||||
false ->
|
false ->
|
||||||
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
|
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
|
||||||
end,
|
end,
|
||||||
scan(BT1, BT2, Out2, AKVs, BT, Count+1);
|
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1);
|
||||||
|
|
||||||
Key1 == Key2 ->
|
(delete =:= Value2) and (true =:= IsLastLevel) ->
|
||||||
%% TODO: eliminate tombstones, right now they just bubble down
|
scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count);
|
||||||
|
|
||||||
|
true ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
true ->
|
true ->
|
||||||
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key2, Value2}, Out);
|
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key2, Value2}, Out);
|
||||||
false ->
|
false ->
|
||||||
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
|
ok = fractal_btree_writer:add(Out2=Out, Key2, Value2)
|
||||||
end,
|
end,
|
||||||
scan(BT1, BT2, Out2, AT, BT, Count+1)
|
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
scan_only(BT, Out, [], Count) ->
|
scan_only(BT, Out, IsLastLevel, [], Count) ->
|
||||||
case fractal_btree_reader:next_node(BT) of
|
case fractal_btree_reader:next_node(BT) of
|
||||||
{node, KVs} ->
|
{node, KVs} ->
|
||||||
scan_only(BT, Out, KVs, Count);
|
scan_only(BT, Out, IsLastLevel, KVs, Count);
|
||||||
end_of_data ->
|
end_of_data ->
|
||||||
{ok, Count, Out}
|
{ok, Count, Out}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
scan_only(BT, Out, [{Key,Value}|Rest], Count) ->
|
scan_only(BT, Out, true, [{_,delete}|Rest], Count) ->
|
||||||
|
scan_only(BT, Out, true, Rest, Count);
|
||||||
|
|
||||||
|
scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count) ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
true ->
|
true ->
|
||||||
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key, Value}, Out);
|
{noreply, Out2} = fractal_btree_writer:handle_cast({add, Key, Value}, Out);
|
||||||
false ->
|
false ->
|
||||||
ok = fractal_btree_writer:add(Out2=Out, Key, Value)
|
ok = fractal_btree_writer:add(Out2=Out, Key, Value)
|
||||||
end,
|
end,
|
||||||
scan_only(BT, Out2, Rest, Count+1).
|
scan_only(BT, Out2, IsLastLevel, Rest, Count+1).
|
||||||
|
|
|
@ -9,6 +9,10 @@
|
||||||
|
|
||||||
merge_test() ->
|
merge_test() ->
|
||||||
|
|
||||||
|
file:delete("test1"),
|
||||||
|
file:delete("test2"),
|
||||||
|
file:delete("test3"),
|
||||||
|
|
||||||
{ok, BT1} = fractal_btree_writer:open("test1"),
|
{ok, BT1} = fractal_btree_writer:open("test1"),
|
||||||
lists:foldl(fun(N,_) ->
|
lists:foldl(fun(N,_) ->
|
||||||
ok = fractal_btree_writer:add(BT1, <<N:128>>, <<"data",N:128>>)
|
ok = fractal_btree_writer:add(BT1, <<N:128>>, <<"data",N:128>>)
|
||||||
|
@ -27,13 +31,9 @@ merge_test() ->
|
||||||
ok = fractal_btree_writer:close(BT2),
|
ok = fractal_btree_writer:close(BT2),
|
||||||
|
|
||||||
|
|
||||||
{Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000]),
|
{Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000, true]),
|
||||||
|
|
||||||
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
|
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
|
||||||
|
|
||||||
ok = file:delete("test1"),
|
|
||||||
ok = file:delete("test2"),
|
|
||||||
ok = file:delete("test3"),
|
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue