diff --git a/src/fractal_btree_level.erl b/src/fractal_btree_level.erl index 4e9e4b9..660be0d 100644 --- a/src/fractal_btree_level.erl +++ b/src/fractal_btree_level.erl @@ -261,7 +261,8 @@ begin_merge(State) -> MergePID = proc_lib:spawn_link(fun() -> {ok, OutCount} = fractal_btree_merger2:merge(AFileName, BFileName, XFileName, - 1 bsl (State#state.level + 1)), + 1 bsl (State#state.level + 1), + State#state.next =:= undefined), % error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]), Owner ! {merge_done, OutCount, XFileName} diff --git a/src/fractal_btree_merger2.erl b/src/fractal_btree_merger2.erl index bb1f877..ed6b4cf 100644 --- a/src/fractal_btree_merger2.erl +++ b/src/fractal_btree_merger2.erl @@ -1,14 +1,14 @@ -module(fractal_btree_merger2). %% -%% Naive Merge of two b-trees. A better implementation should iterate leafs, not KV's +%% Merging two BTrees %% --export([merge/4]). +-export([merge/5]). -define(LOCAL_WRITER, true). -merge(A,B,C, Size) -> +merge(A,B,C, Size, IsLastLevel) -> {ok, BT1} = fractal_btree_reader:open(A), {ok, BT2} = fractal_btree_reader:open(B), case ?LOCAL_WRITER of @@ -21,7 +21,7 @@ merge(A,B,C, Size) -> {node, AKVs} = fractal_btree_reader:first_node(BT1), {node, BKVs} = fractal_btree_reader:first_node(BT2), - {ok, Count, Out2} = scan(BT1, BT2, Out, AKVs, BKVs, 0), + {ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0), %% finish stream tree ok = fractal_btree_reader:close(BT1), @@ -37,23 +37,23 @@ merge(A,B,C, Size) -> {ok, Count}. -scan(BT1, BT2, Out, [], BKVs, Count) -> +scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count) -> case fractal_btree_reader:next_node(BT1) of {node, AKVs} -> - scan(BT1, BT2, Out, AKVs, BKVs, Count); + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count); end_of_data -> - scan_only(BT2, Out, BKVs, Count) + scan_only(BT2, Out, IsLastLevel, BKVs, Count) end; -scan(BT1, BT2, Out, AKVs, [], Count) -> +scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count) -> case fractal_btree_reader:next_node(BT2) of {node, BKVs} -> - scan(BT1, BT2, Out, AKVs, BKVs, Count); + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count); end_of_data -> - scan_only(BT1, Out, AKVs, Count) + scan_only(BT1, Out, IsLastLevel, AKVs, Count) end; -scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) -> +scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) -> if Key1 < Key2 -> case ?LOCAL_WRITER of true -> @@ -62,7 +62,7 @@ scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) -> ok = fractal_btree_writer:add(Out2=Out, Key1, Value1) end, - scan(BT1, BT2, Out2, AT, BKVs, Count+1); + scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1); Key2 < Key1 -> case ?LOCAL_WRITER of @@ -71,32 +71,37 @@ scan(BT1, BT2, Out, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) -> false -> ok = fractal_btree_writer:add(Out2=Out, Key2, Value2) end, - scan(BT1, BT2, Out2, AKVs, BT, Count+1); + scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1); - Key1 == Key2 -> - %% TODO: eliminate tombstones, right now they just bubble down + (delete =:= Value2) and (true =:= IsLastLevel) -> + scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count); + + true -> case ?LOCAL_WRITER of true -> {noreply, Out2} = fractal_btree_writer:handle_cast({add, Key2, Value2}, Out); false -> ok = fractal_btree_writer:add(Out2=Out, Key2, Value2) end, - scan(BT1, BT2, Out2, AT, BT, Count+1) + scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1) end. -scan_only(BT, Out, [], Count) -> +scan_only(BT, Out, IsLastLevel, [], Count) -> case fractal_btree_reader:next_node(BT) of {node, KVs} -> - scan_only(BT, Out, KVs, Count); + scan_only(BT, Out, IsLastLevel, KVs, Count); end_of_data -> {ok, Count, Out} end; -scan_only(BT, Out, [{Key,Value}|Rest], Count) -> +scan_only(BT, Out, true, [{_,delete}|Rest], Count) -> + scan_only(BT, Out, true, Rest, Count); + +scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count) -> case ?LOCAL_WRITER of true -> {noreply, Out2} = fractal_btree_writer:handle_cast({add, Key, Value}, Out); false -> ok = fractal_btree_writer:add(Out2=Out, Key, Value) end, - scan_only(BT, Out2, Rest, Count+1). + scan_only(BT, Out2, IsLastLevel, Rest, Count+1). diff --git a/test/fractal_btree_merger_tests.erl b/test/fractal_btree_merger_tests.erl index d0f36e5..1d41582 100644 --- a/test/fractal_btree_merger_tests.erl +++ b/test/fractal_btree_merger_tests.erl @@ -9,6 +9,10 @@ merge_test() -> + file:delete("test1"), + file:delete("test2"), + file:delete("test3"), + {ok, BT1} = fractal_btree_writer:open("test1"), lists:foldl(fun(N,_) -> ok = fractal_btree_writer:add(BT1, <>, <<"data",N:128>>) @@ -27,13 +31,9 @@ merge_test() -> ok = fractal_btree_writer:close(BT2), - {Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000]), + {Time,{ok,Count}} = timer:tc(fractal_btree_merger2, merge, ["test1", "test2", "test3", 10000, true]), error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]), - ok = file:delete("test1"), - ok = file:delete("test2"), - ok = file:delete("test3"), - ok.