diff --git a/src/fractal_btree_level.erl b/src/fractal_btree_level.erl index 1443437..7169384 100644 --- a/src/fractal_btree_level.erl +++ b/src/fractal_btree_level.erl @@ -306,7 +306,7 @@ begin_merge(State) -> file:delete(XFileName), MergePID = spawn_link(fun() -> - {ok, OutCount} = fractal_btree_merger:merge(AFileName, BFileName, XFileName, + {ok, OutCount} = fractal_btree_merger2:merge(AFileName, BFileName, XFileName, 1 bsl (State#state.level + 1)), % error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]), diff --git a/src/fractal_btree_merger2.erl b/src/fractal_btree_merger2.erl new file mode 100644 index 0000000..0a99834 --- /dev/null +++ b/src/fractal_btree_merger2.erl @@ -0,0 +1,68 @@ +-module(fractal_btree_merger2). + +%% +%% Naive Merge of two b-trees. A better implementation should iterate leafs, not KV's +%% + +-export([merge/4]). + +merge(A,B,C, Size) -> + {ok, BT1} = fractal_btree_reader:open(A), + {ok, BT2} = fractal_btree_reader:open(B), + {ok, Out} = fractal_btree_writer:open(C, Size), + + {node, AKVs} = fractal_btree_reader:first_node(BT1), + {node, BKVs} = fractal_btree_reader:first_node(BT2), + + {ok, Count} = scan(BT1, BT2, Out, AKVs, BKVs, 0), + + %% finish stream tree + ok = fractal_btree_reader:close(BT1), + ok = fractal_btree_reader:close(BT2), + ok = fractal_btree_writer:close(Out), + + {ok, Count}. + + +scan(BT1, BT2, Out, [], BKVs, Count) -> + case fractal_btree_reader:next_node(BT1) of + {node, AKVs} -> + scan(BT1, BT2, Out, AKVs, BKVs, Count); + end_of_data -> + scan_only(BT2, Out, BKVs, Count) + end; + +scan(BT1, BT2, Out, AKVs, [], Count) -> + case fractal_btree_reader:next_node(BT2) of + {node, BKVs} -> + scan(BT1, BT2, Out, AKVs, BKVs, Count); + end_of_data -> + scan_only(BT1, Out, AKVs, Count) + end; + +scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) -> + if Key1 < Key2 -> + ok = fractal_btree_writer:add(Out, Key1, Value1), + scan(BT1, BT2, Out, AT, BKVs, Count+1); + + Key2 < Key1 -> + ok = fractal_btree_writer:add(Out, Key2, Value2), + scan(BT1, BT2, Out, AKVs, BT, Count+1); + + Key1 == Key2 -> + %% TODO: eliminate tombstones, right now they just bubble down + ok = fractal_btree_writer:add(Out, Key2, Value2), + scan(BT1, BT2, Out, AT, BT, Count+1) + end. + +scan_only(BT, Out, [], Count) -> + case fractal_btree_reader:next_node(BT) of + {node, KVs} -> + scan_only(BT, Out, KVs, Count); + end_of_data -> + {ok, Count} + end; + +scan_only(BT, Out, [{Key,Value}|Rest], Count) -> + ok = fractal_btree_writer:add(Out, Key, Value), + scan_only(BT, Out, Rest, Count+1). diff --git a/src/fractal_btree_reader.erl b/src/fractal_btree_reader.erl index 71ebe02..29315f7 100644 --- a/src/fractal_btree_reader.erl +++ b/src/fractal_btree_reader.erl @@ -3,6 +3,7 @@ -include_lib("kernel/include/file.hrl"). -export([open/1,close/1,lookup/2,fold/3]). +-export([first_node/1,next_node/1]). -record(node, { level, members=[] }). -record(index, {file, root, bloom}). @@ -43,6 +44,22 @@ fold1(File,Fun,Acc0) -> fold0(File,Fun,Node,Acc0) end. +first_node(#index{file=File}) -> + case read_node(File, 0) of + {ok, #node{level=0, members=Members}} -> + {node, Members} + end. + +next_node(#index{file=File}=Index) -> + case read_node(File) of + {ok, #node{level=0, members=Members}} -> + {node, Members}; + {ok, #node{level=N}} when N>0 -> + next_node(Index); + eof -> + end_of_data + end. + close(#index{file=File}) -> file:close(File). diff --git a/test/fractal_btree_tests.erl b/test/fractal_btree_tests.erl index ca2c5b8..9b4d2d6 100644 --- a/test/fractal_btree_tests.erl +++ b/test/fractal_btree_tests.erl @@ -83,7 +83,7 @@ merge_test() -> ok = fractal_btree_writer:close(BT2), - {Time,{ok,Count}} = timer:tc(fractal_btree_merger, merge, ["test1", "test2", "test3", 10000]), + {Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000]), error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),