Merge branch 'master' of github.com:krestenkrab/fractal_btree
This commit is contained in:
commit
fa4f1e1f49
4 changed files with 87 additions and 2 deletions
|
@ -306,7 +306,7 @@ begin_merge(State) ->
|
|||
file:delete(XFileName),
|
||||
|
||||
MergePID = spawn_link(fun() ->
|
||||
{ok, OutCount} = fractal_btree_merger:merge(AFileName, BFileName, XFileName,
|
||||
{ok, OutCount} = fractal_btree_merger2:merge(AFileName, BFileName, XFileName,
|
||||
1 bsl (State#state.level + 1)),
|
||||
|
||||
% error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]),
|
||||
|
|
68
src/fractal_btree_merger2.erl
Normal file
68
src/fractal_btree_merger2.erl
Normal file
|
@ -0,0 +1,68 @@
|
|||
-module(fractal_btree_merger2).
|
||||
|
||||
%%
|
||||
%% Naive Merge of two b-trees. A better implementation should iterate leafs, not KV's
|
||||
%%
|
||||
|
||||
-export([merge/4]).
|
||||
|
||||
merge(A,B,C, Size) ->
|
||||
{ok, BT1} = fractal_btree_reader:open(A),
|
||||
{ok, BT2} = fractal_btree_reader:open(B),
|
||||
{ok, Out} = fractal_btree_writer:open(C, Size),
|
||||
|
||||
{node, AKVs} = fractal_btree_reader:first_node(BT1),
|
||||
{node, BKVs} = fractal_btree_reader:first_node(BT2),
|
||||
|
||||
{ok, Count} = scan(BT1, BT2, Out, AKVs, BKVs, 0),
|
||||
|
||||
%% finish stream tree
|
||||
ok = fractal_btree_reader:close(BT1),
|
||||
ok = fractal_btree_reader:close(BT2),
|
||||
ok = fractal_btree_writer:close(Out),
|
||||
|
||||
{ok, Count}.
|
||||
|
||||
|
||||
scan(BT1, BT2, Out, [], BKVs, Count) ->
|
||||
case fractal_btree_reader:next_node(BT1) of
|
||||
{node, AKVs} ->
|
||||
scan(BT1, BT2, Out, AKVs, BKVs, Count);
|
||||
end_of_data ->
|
||||
scan_only(BT2, Out, BKVs, Count)
|
||||
end;
|
||||
|
||||
scan(BT1, BT2, Out, AKVs, [], Count) ->
|
||||
case fractal_btree_reader:next_node(BT2) of
|
||||
{node, BKVs} ->
|
||||
scan(BT1, BT2, Out, AKVs, BKVs, Count);
|
||||
end_of_data ->
|
||||
scan_only(BT1, Out, AKVs, Count)
|
||||
end;
|
||||
|
||||
scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
||||
if Key1 < Key2 ->
|
||||
ok = fractal_btree_writer:add(Out, Key1, Value1),
|
||||
scan(BT1, BT2, Out, AT, BKVs, Count+1);
|
||||
|
||||
Key2 < Key1 ->
|
||||
ok = fractal_btree_writer:add(Out, Key2, Value2),
|
||||
scan(BT1, BT2, Out, AKVs, BT, Count+1);
|
||||
|
||||
Key1 == Key2 ->
|
||||
%% TODO: eliminate tombstones, right now they just bubble down
|
||||
ok = fractal_btree_writer:add(Out, Key2, Value2),
|
||||
scan(BT1, BT2, Out, AT, BT, Count+1)
|
||||
end.
|
||||
|
||||
scan_only(BT, Out, [], Count) ->
|
||||
case fractal_btree_reader:next_node(BT) of
|
||||
{node, KVs} ->
|
||||
scan_only(BT, Out, KVs, Count);
|
||||
end_of_data ->
|
||||
{ok, Count}
|
||||
end;
|
||||
|
||||
scan_only(BT, Out, [{Key,Value}|Rest], Count) ->
|
||||
ok = fractal_btree_writer:add(Out, Key, Value),
|
||||
scan_only(BT, Out, Rest, Count+1).
|
|
@ -3,6 +3,7 @@
|
|||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
-export([open/1,close/1,lookup/2,fold/3]).
|
||||
-export([first_node/1,next_node/1]).
|
||||
|
||||
-record(node, { level, members=[] }).
|
||||
-record(index, {file, root, bloom}).
|
||||
|
@ -43,6 +44,22 @@ fold1(File,Fun,Acc0) ->
|
|||
fold0(File,Fun,Node,Acc0)
|
||||
end.
|
||||
|
||||
first_node(#index{file=File}) ->
|
||||
case read_node(File, 0) of
|
||||
{ok, #node{level=0, members=Members}} ->
|
||||
{node, Members}
|
||||
end.
|
||||
|
||||
next_node(#index{file=File}=Index) ->
|
||||
case read_node(File) of
|
||||
{ok, #node{level=0, members=Members}} ->
|
||||
{node, Members};
|
||||
{ok, #node{level=N}} when N>0 ->
|
||||
next_node(Index);
|
||||
eof ->
|
||||
end_of_data
|
||||
end.
|
||||
|
||||
close(#index{file=File}) ->
|
||||
file:close(File).
|
||||
|
||||
|
|
|
@ -83,7 +83,7 @@ merge_test() ->
|
|||
ok = fractal_btree_writer:close(BT2),
|
||||
|
||||
|
||||
{Time,{ok,Count}} = timer:tc(fractal_btree_merger, merge, ["test1", "test2", "test3", 10000]),
|
||||
{Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000]),
|
||||
|
||||
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
|
||||
|
||||
|
|
Loading…
Reference in a new issue