diff --git a/src/fractal_btree_level.erl b/src/fractal_btree_level.erl index ea480ad..4d861ad 100644 --- a/src/fractal_btree_level.erl +++ b/src/fractal_btree_level.erl @@ -81,7 +81,6 @@ reply({PID,Ref}, Reply) -> initialize(State) -> error_logger:info_msg("in ~p level=~p~n", [self(), State]), - AFileName = filename("A",State), BFileName = filename("B",State), CFileName = filename("C",State), @@ -120,82 +119,12 @@ initialize(State) -> end end. - -main_loop(State = #state{ a=undefined, b=undefined }) -> - Parent = plain_fsm:info(parent), - error_logger:info_msg("in main_loop~n", []), - receive - ?REQ(From, {lookup, _})=Msg -> - error_logger:info_msg("in main_loop, msg=~p~n", [Msg]), - case State#state.next of - undefined -> - reply(From, notfound); - Next -> - Next ! Msg - end, - main_loop(State); - - ?REQ(From, {inject, FileName})=_Msg -> - error_logger:info_msg("in main_loop, msg=~p~n", [_Msg]), - AFileName = filename("A",State), - ok = file:rename(FileName, AFileName), - {ok, BT} = fractal_btree_reader:open(AFileName), - reply(From, ok), - main_loop(State#state{ a=BT }); - - ?REQ(From, close)=_Msg -> - error_logger:info_msg("in main_loop, msg=~p~n", [_Msg]), - reply(From, ok), - ok; - - %% gen_fsm handling - {system, From, Req}=_Msg -> - error_logger:info_msg("in main_loop, msg=~p~n", [_Msg]), - plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop(S1) end); - - {'EXIT', Parent, Reason}=_Msg -> - error_logger:info_msg("in main_loop, msg=~p~n", [_Msg]), - plain_fsm:parent_EXIT(Reason, State) - end; - -main_loop(State = #state{ a=BT1, b=undefined, next=Next }) -> - Parent = plain_fsm:info(parent), - error_logger:info_msg("in main_loop~n", []), - receive - ?REQ(From, {lookup, Key})=Req -> - case fractal_btree_reader:lookup(BT1, Key) of - {ok, deleted} -> - reply(From, notfound); - {ok, _}=Reply -> - reply(From, Reply); - notfound when Next =:= undefined -> - reply(From, notfound); - notfound -> - Next ! Req - end, - main_loop(State); - - ?REQ(From, {inject, FileName}) -> - BFileName = filename("B",State), - ok = file:rename(FileName, BFileName), - {ok, BT2} = fractal_btree_reader:open(BFileName), - reply(From, ok), - {ok, MergePID} = begin_merge(State), - main_loop(State#state{ b=BT2, merge_pid=MergePID }); - - ?REQ(From, close) -> - fractal_btree_reader:close(BT1), - reply(From, ok), - ok; - - %% gen_fsm handling - {system, From, Req} -> - plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop(S1) end); - {'EXIT', Parent, Reason} -> - plain_fsm:parent_EXIT(Reason, State) - end; +check_begin_merge_then_loop(State=#state{a=BT1, b=BT2, merge_pid=undefined}) + when BT1/=undefined, BT2 /= undefined -> + {ok, MergePID} = begin_merge(State), + main_loop(State#state{merge_pid=MergePID }); +check_begin_merge_then_loop(State) -> + main_loop(State). main_loop(State = #state{ next=Next }) -> Parent = plain_fsm:info(parent), @@ -212,16 +141,23 @@ main_loop(State = #state{ next=Next }) -> end, main_loop(State); + ?REQ(From, {inject, FileName}) when State#state.b == undefined -> + if State#state.a == undefined -> + ToFileName = filename("A",State), + SetPos = #state.a; + true -> + ToFileName = filename("B",State), + SetPos = #state.b + end, + ok = file:rename(FileName, ToFileName), + {ok, BT} = fractal_btree_reader:open(ToFileName), + reply(From, ok), + check_begin_merge_then_loop(setelement(SetPos, State, BT)); + ?REQ(From, close) -> - fractal_btree_reader:close(State#state.a), - fractal_btree_reader:close(State#state.b), - MergePID = State#state.merge_pid, - if MergePID =:= undefined -> - ok; - true -> - erlang:exit(State#state.merge_pid, shutdown) - end, - %% TODO: stop merger, if any? + close_if_defined(State#state.a), + close_if_defined(State#state.b), + stop_if_defined(State#state.merge_pid), reply(From, ok), ok; @@ -296,6 +232,13 @@ do_lookup(Key, [BT|Rest]) -> notfound -> do_lookup(Key, Rest) end. +close_if_defined(undefined) -> ok; +close_if_defined(BT) -> fractal_btree_reader:close(BT). + +stop_if_defined(undefined) -> ok; +stop_if_defined(MergePid=) when is_pid(MergePid) -> + erlang:exit(MergePid, shutdown). + begin_merge(State) -> AFileName = filename("A",State), BFileName = filename("B",State),