diff --git a/src/fractal_btree_level.erl b/src/fractal_btree_level.erl index 7169384..6e87923 100644 --- a/src/fractal_btree_level.erl +++ b/src/fractal_btree_level.erl @@ -81,7 +81,6 @@ reply({PID,Ref}, Reply) -> initialize(State) -> % error_logger:info_msg("in ~p level=~p~n", [self(), State]), - AFileName = filename("A",State), BFileName = filename("B",State), CFileName = filename("C",State), @@ -95,7 +94,7 @@ initialize(State) -> ok = file:rename(CFileName, AFileName), {ok, BT} = fractal_btree_reader:open(CFileName), - main_loop1(State#state{ a= BT, b=undefined }); + main_loop(State#state{ a= BT, b=undefined }); {error, enoent} -> case file:read_file_info(BFileName) of @@ -103,137 +102,59 @@ initialize(State) -> {ok, BT1} = fractal_btree_reader:open(AFileName), {ok, BT2} = fractal_btree_reader:open(BFileName), - {ok, MergePID} = begin_merge(State), - - main_loop2(State#state{ a=BT1, b=BT2, merge_pid=MergePID }); + check_begin_merge_then_loop(State#state{ a=BT1, b=BT2 }); {error, enoent} -> case file:read_file_info(AFileName) of {ok, _} -> {ok, BT1} = fractal_btree_reader:open(AFileName), - main_loop1(State#state{ a=BT1 }); + main_loop(State#state{ a=BT1 }); {error, enoent} -> - main_loop0(State) + main_loop(State) end end end. +check_begin_merge_then_loop(State=#state{a=BT1, b=BT2, merge_pid=undefined}) + when BT1/=undefined, BT2 /= undefined -> + {ok, MergePID} = begin_merge(State), + main_loop(State#state{merge_pid=MergePID }); +check_begin_merge_then_loop(State) -> + main_loop(State). -main_loop0(State = #state{ a=undefined, b=undefined }) -> +main_loop(State = #state{ next=Next }) -> Parent = plain_fsm:info(parent), -% error_logger:info_msg("in main_loop0~n", []), - receive - ?REQ(From, {lookup, _})=Msg -> -% error_logger:info_msg("in main_loop0, msg=~p~n", [Msg]), - case State#state.next of - undefined -> - reply(From, notfound); - Next -> - Next ! Msg - end, - main_loop0(State); - - ?REQ(From, {inject, FileName})=_Msg -> -% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), - AFileName = filename("A",State), - ok = file:rename(FileName, AFileName), - {ok, BT} = fractal_btree_reader:open(AFileName), - reply(From, ok), - main_loop1(State#state{ a=BT }); - - ?REQ(From, close)=_Msg -> -% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), - reply(From, ok), - ok; - - %% gen_fsm handling - {system, From, Req}=_Msg -> -% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), - plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop0(S1) end); - - {'EXIT', Parent, Reason}=_Msg -> -% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), - plain_fsm:parent_EXIT(Reason, State) - end. - -main_loop1(State = #state{ a=BT1, b=undefined, next=Next }) -> - Parent = plain_fsm:info(parent), -% error_logger:info_msg("in main_loop1~n", []), receive ?REQ(From, {lookup, Key})=Req -> - case fractal_btree_reader:lookup(BT1, Key) of - {ok, deleted} -> - reply(From, notfound); - {ok, _}=Reply -> - reply(From, Reply); - notfound when Next =:= undefined -> - reply(From, notfound); - notfound -> - Next ! Req - end, - main_loop1(State); + case do_lookup(Key, [State#state.b, State#state.a, Next]) of + notfound -> + reply(From, notfound); + {found, Result} -> + reply(From, {ok, Result}); + {delegate, DelegatePid} -> + DelegatePid ! Req + end, + main_loop(State); - ?REQ(From, {inject, FileName}) -> - BFileName = filename("B",State), - ok = file:rename(FileName, BFileName), - {ok, BT2} = fractal_btree_reader:open(BFileName), - reply(From, ok), - {ok, MergePID} = begin_merge(State), - main_loop2(State#state{ b=BT2, merge_pid=MergePID }); + ?REQ(From, {inject, FileName}) when State#state.b == undefined -> + if State#state.a == undefined -> + ToFileName = filename("A",State), + SetPos = #state.a; + true -> + ToFileName = filename("B",State), + SetPos = #state.b + end, + ok = file:rename(FileName, ToFileName), + {ok, BT} = fractal_btree_reader:open(ToFileName), + reply(From, ok), + check_begin_merge_then_loop(setelement(SetPos, State, BT)); ?REQ(From, close) -> - fractal_btree_reader:close(BT1), - reply(From, ok), - ok; - - %% gen_fsm handling - {system, From, Req} -> - plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop1(S1) end); - {'EXIT', Parent, Reason} -> - plain_fsm:parent_EXIT(Reason, State) - end. - -main_loop2(State = #state{ next=Next }) -> - Parent = plain_fsm:info(parent), -% error_logger:info_msg("in main_loop2~n", []), - receive - ?REQ(From, {lookup, Key})=Req -> - case fractal_btree_reader:lookup(State#state.b, Key) of - {ok, deleted} -> - reply(From, notfound), - main_loop2(State); - {ok, _}=Reply -> - reply(From, Reply), - main_loop2(State); - _ -> - case fractal_btree_reader:lookup(State#state.a, Key) of - {ok, deleted} -> - reply(From, notfound); - {ok, _}=Reply -> - reply(From, Reply); - notfound when Next =:= undefined -> - reply(From, notfound); - notfound -> - Next ! Req - end, - main_loop2(State) - end; - - - ?REQ(From, close) -> - fractal_btree_reader:close(State#state.a), - fractal_btree_reader:close(State#state.b), - MergePID = State#state.merge_pid, - if MergePID =:= undefined -> - ok; - true -> - erlang:exit(State#state.merge_pid, shutdown) - end, - %% TODO: stop merger, if any? + close_if_defined(State#state.a), + close_if_defined(State#state.b), + stop_if_defined(State#state.merge_pid), reply(From, ok), ok; @@ -255,7 +176,7 @@ main_loop2(State = #state{ next=Next }) -> ok = file:rename(CFileName, AFileName), {ok, BT} = fractal_btree_reader:open(AFileName), - main_loop1(State2#state{ a=BT, b=undefined, merge_pid=undefined }); + main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined }); %% %% We need to push the output of merging to the next level @@ -270,7 +191,7 @@ main_loop2(State = #state{ next=Next }) -> end, MRef = send_request(State1#state.next, {inject, OutFileName}), - main_loop2(State1#state{ inject_done_ref = MRef, merge_pid=undefined }); + main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined }); %% %% Our successor accepted the inject @@ -278,7 +199,7 @@ main_loop2(State = #state{ next=Next }) -> ?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref -> erlang:demonitor(MRef, [flush]), {ok, State2} = close_a_and_b(State), - main_loop0(State2#state{ inject_done_ref=undefined }); + main_loop(State2#state{ inject_done_ref=undefined }); %% %% Our successor died! @@ -289,13 +210,31 @@ main_loop2(State = #state{ next=Next }) -> %% gen_fsm handling {system, From, Req} -> plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop2(S1) end); + From, Req, State, fun(S1) -> main_loop(S1) end); {'EXIT', Parent, Reason} -> plain_fsm:parent_EXIT(Reason, State) end. +do_lookup(_Key, []) -> + notfound; +do_lookup(_Key, [Pid]) when is_pid(Pid) -> + {delegate, Pid}; +do_lookup(Key, [undefined|Rest]) -> + do_lookup(Key, Rest); +do_lookup(Key, [BT|Rest]) -> + case fractal_btree_reader:lookup(BT, Key) of + {ok, deleted} -> notfound; + {ok, Result} -> {found, Result}; + notfound -> do_lookup(Key, Rest) + end. +close_if_defined(undefined) -> ok; +close_if_defined(BT) -> fractal_btree_reader:close(BT). + +stop_if_defined(undefined) -> ok; +stop_if_defined(MergePid) when is_pid(MergePid) -> + erlang:exit(MergePid, shutdown). begin_merge(State) -> AFileName = filename("A",State), @@ -333,6 +272,3 @@ close_a_and_b(State) -> filename(PFX, State) -> filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data"). -size(State) -> - (1 bsl State#state.level). -