diff --git a/src/lsm_btree.erl b/src/lsm_btree.erl index 3831a04..fd4dc56 100644 --- a/src/lsm_btree.erl +++ b/src/lsm_btree.erl @@ -245,8 +245,10 @@ handle_cast(Info,State) -> %% premature delete -> cleanup +terminate(normal,_State) -> + ok; terminate(_Reason,_State) -> - % error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]), + error_logger:info_msg("got terminate(~p,~p)~n", [_Reason,_State]), % flush_nursery(State), ok. diff --git a/src/lsm_btree.hrl b/src/lsm_btree.hrl index 0de4845..17107d3 100644 --- a/src/lsm_btree.hrl +++ b/src/lsm_btree.hrl @@ -26,7 +26,7 @@ %% smallest levels are 8192 entries --define(TOP_LEVEL, 13). +-define(TOP_LEVEL, 5). -define(BTREE_SIZE(Level), (1 bsl (Level))). -define(TOMBSTONE, 'deleted'). diff --git a/src/lsm_btree_level.erl b/src/lsm_btree_level.erl index 6fe8f13..ae2b619 100644 --- a/src/lsm_btree_level.erl +++ b/src/lsm_btree_level.erl @@ -42,22 +42,25 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3]). +-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, step/2]). -include_lib("kernel/include/file.hrl"). -record(state, { - a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [] + a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [], + step_next_ref, step_caller, step_merge_ref }). %%%%% PUBLIC OPERATIONS open(Dir,Level,Next) when Level>0 -> - {ok, plain_fsm:spawn_link(?MODULE, + PID = plain_fsm:spawn_link(?MODULE, fun() -> process_flag(trap_exit,true), initialize(#state{dir=Dir,level=Level,next=Next}) - end)}. + end), + step(PID, 2*?BTREE_SIZE(?TOP_LEVEL)), + {ok, PID}. lookup(Ref, Key) -> call(Ref, {lookup, Key}). @@ -66,6 +69,9 @@ inject(Ref, FileName) -> Result = call(Ref, {inject, FileName}), Result. +step(Ref,Howmuch) -> + call(Ref,{step,Howmuch}). + close(Ref) -> try call(Ref, close) @@ -128,8 +134,8 @@ initialize(State) -> try initialize2(State) catch - Class:Ex -> - error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]) + Class:Ex when not (Class == exit andalso Ex == normal) -> + error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]) end. initialize2(State) -> @@ -231,9 +237,76 @@ main_loop(State = #state{ next=Next }) -> end, ok = file:rename(FileName, ToFileName), {ok, BT} = lsm_btree_reader:open(ToFileName, random), + reply(From, ok), check_begin_merge_then_loop(setelement(SetPos, State, BT)); + %% accept step any time there is not an outstanding step + ?REQ(StepFrom, {step, HowMuch}) + when State#state.step_merge_ref == undefined, + State#state.step_caller == undefined, + State#state.step_next_ref == undefined + -> + + %% delegate the step request to the next level + if Next =:= undefined -> + DelegateRef = undefined; + true -> + DelegateRef = send_request(Next, {step, HowMuch}) + end, + + %% if there is a merge worker, send him a step message + case State#state.merge_pid of + undefined -> + MergeRef = undefined; + MergePID -> + MergeRef = monitor(process, MergePID), + MergePID ! {step, {self(), MergeRef}, HowMuch} + end, + + if (Next =:= undefined) andalso (MergeRef =:= undefined) -> + %% nothing to do ... just return OK + reply(StepFrom, ok), + main_loop(State); + true -> + main_loop(State#state{ step_next_ref=DelegateRef, + step_caller=StepFrom, + step_merge_ref=MergeRef + }) + end; + + {MRef, step_done} when MRef == State#state.step_merge_ref -> + demonitor(MRef, [flush]), + + case State#state.step_next_ref of + undefined -> + reply(State#state.step_caller, ok), + State2 = State#state{ step_caller=undefined }; + _ -> + State2 = State + end, + + main_loop(State2#state{ step_merge_ref=undefined }); + + {'DOWN', MRef, _, _, _} when MRef == State#state.step_merge_ref -> + + case State#state.step_next_ref of + undefined -> + reply(State#state.step_caller, ok), + State2 = State#state{ step_caller=undefined }; + _ -> + State2 = State + end, + + main_loop(State2#state{ step_merge_ref=undefined }); + + + ?REPLY(MRef, ok) when (MRef =:= State#state.step_next_ref) and + (State#state.step_merge_ref =:= undefined) -> + reply(State#state.step_caller, ok), + main_loop(State#state{ step_next_ref=undefined, + step_caller=undefined }); + ?REQ(From, close) -> close_if_defined(State#state.a), close_if_defined(State#state.b), diff --git a/src/lsm_btree_merger.erl b/src/lsm_btree_merger.erl index abc0854..af6939c 100644 --- a/src/lsm_btree_merger.erl +++ b/src/lsm_btree_merger.erl @@ -53,7 +53,7 @@ merge(A,B,C, Size, IsLastLevel) -> {node, AKVs} = lsm_btree_reader:first_node(BT1), {node, BKVs} = lsm_btree_reader:first_node(BT2), - {ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0), + {ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}), %% finish stream tree ok = lsm_btree_reader:close(BT1), @@ -68,24 +68,42 @@ merge(A,B,C, Size, IsLastLevel) -> {ok, Count}. +step({N, From}) -> + {N-1, From}. -scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count) -> +scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> + case FromPID of + none -> + ok; + {PID, Ref} -> + PID ! {Ref, step_done} + end, + +% error_logger:info_msg("waiting for step in ~p~n", [self()]), + + receive + {step, From, HowMany} -> +% error_logger:info_msg("got step ~p,~p in ~p~n", [From,HowMany, self()]), + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) + end; + +scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) -> case lsm_btree_reader:next_node(BT1) of {node, AKVs} -> - scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count); + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step); end_of_data -> - scan_only(BT2, Out, IsLastLevel, BKVs, Count) + scan_only(BT2, Out, IsLastLevel, BKVs, Count, Step) end; -scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count) -> +scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count, Step) -> case lsm_btree_reader:next_node(BT2) of {node, BKVs} -> - scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count); + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step); end_of_data -> - scan_only(BT1, Out, IsLastLevel, AKVs, Count) + scan_only(BT1, Out, IsLastLevel, AKVs, Count, Step) end; -scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) -> +scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count, Step) -> if Key1 < Key2 -> case ?LOCAL_WRITER of true -> @@ -94,7 +112,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV ok = lsm_btree_writer:add(Out2=Out, Key1, Value1) end, - scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1); + scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1, step(Step)); Key2 < Key1 -> case ?LOCAL_WRITER of @@ -103,10 +121,10 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV false -> ok = lsm_btree_writer:add(Out2=Out, Key2, Value2) end, - scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1); + scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step)); (?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) -> - scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count); + scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step)); true -> case ?LOCAL_WRITER of @@ -115,25 +133,25 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV false -> ok = lsm_btree_writer:add(Out2=Out, Key2, Value2) end, - scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1) + scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step)) end. -scan_only(BT, Out, IsLastLevel, [], Count) -> +scan_only(BT, Out, IsLastLevel, [], Count, Step) -> case lsm_btree_reader:next_node(BT) of {node, KVs} -> - scan_only(BT, Out, IsLastLevel, KVs, Count); + scan_only(BT, Out, IsLastLevel, KVs, Count, step(Step)); end_of_data -> {ok, Count, Out} end; -scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count) -> - scan_only(BT, Out, true, Rest, Count); +scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count, Step) -> + scan_only(BT, Out, true, Rest, Count, step(Step)); -scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count) -> +scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count, Step) -> case ?LOCAL_WRITER of true -> {noreply, Out2} = lsm_btree_writer:handle_cast({add, Key, Value}, Out); false -> ok = lsm_btree_writer:add(Out2=Out, Key, Value) end, - scan_only(BT, Out2, IsLastLevel, Rest, Count+1). + scan_only(BT, Out2, IsLastLevel, Rest, Count+1, step(Step)). diff --git a/src/lsm_btree_nursery.erl b/src/lsm_btree_nursery.erl index 2ce742e..92525c2 100644 --- a/src/lsm_btree_nursery.erl +++ b/src/lsm_btree_nursery.erl @@ -162,7 +162,15 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, % [ gb_trees:size(Cache), TotalSize, FileInfo#file_info.size ]), %% inject the B-Tree (blocking RPC) - ok = lsm_btree_level:inject(TopLevel, BTreeFileName); + ok = lsm_btree_level:inject(TopLevel, BTreeFileName), + + %% synchroneously do some work if this is a top-level inject + + % error_logger:info_msg("doing step ~p~n", [?BTREE_SIZE(?TOP_LEVEL)]), + lsm_btree_level:step(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)), + + ok; + _ -> ok end, diff --git a/test/lsm_btree_merger_tests.erl b/test/lsm_btree_merger_tests.erl index 520e72f..d633031 100644 --- a/test/lsm_btree_merger_tests.erl +++ b/test/lsm_btree_merger_tests.erl @@ -31,6 +31,7 @@ merge_test() -> ok = lsm_btree_writer:close(BT2), + self() ! {step, {self(), none}, 2000000000}, {Time,{ok,Count}} = timer:tc(lsm_btree_merger, merge, ["test1", "test2", "test3", 10000, true]), error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]), diff --git a/test/lsm_btree_tests.erl b/test/lsm_btree_tests.erl index b1861d3..f2088ad 100644 --- a/test/lsm_btree_tests.erl +++ b/test/lsm_btree_tests.erl @@ -38,7 +38,7 @@ full_test_() -> ?_test(test_tree_simple_2()), ?_test(test_tree_simple_3()), ?_test(test_tree_simple_4()), - {timeout, 30, ?_test(test_tree())}, + {timeout, 300, ?_test(test_tree())}, {timeout, 120, ?_test(test_qc())} ]}. @@ -299,7 +299,7 @@ test_tree() -> <>, <<"data",N:128>>) end, ok, - lists:seq(2,10000,1)), + lists:seq(2,100000,1)), lists:foldl(fun(N,_) -> ok = lsm_btree:put(Tree, <>, <<"data",N:128>>)