Merge pull request #1 from eriksoe/level_mainloop_simplification
Level mainloop simplification
This commit is contained in:
commit
8c6d832f27
1 changed files with 57 additions and 121 deletions
|
@ -81,7 +81,6 @@ reply({PID,Ref}, Reply) ->
|
|||
initialize(State) ->
|
||||
% error_logger:info_msg("in ~p level=~p~n", [self(), State]),
|
||||
|
||||
|
||||
AFileName = filename("A",State),
|
||||
BFileName = filename("B",State),
|
||||
CFileName = filename("C",State),
|
||||
|
@ -95,7 +94,7 @@ initialize(State) ->
|
|||
ok = file:rename(CFileName, AFileName),
|
||||
|
||||
{ok, BT} = fractal_btree_reader:open(CFileName),
|
||||
main_loop1(State#state{ a= BT, b=undefined });
|
||||
main_loop(State#state{ a= BT, b=undefined });
|
||||
|
||||
{error, enoent} ->
|
||||
case file:read_file_info(BFileName) of
|
||||
|
@ -103,137 +102,59 @@ initialize(State) ->
|
|||
{ok, BT1} = fractal_btree_reader:open(AFileName),
|
||||
{ok, BT2} = fractal_btree_reader:open(BFileName),
|
||||
|
||||
{ok, MergePID} = begin_merge(State),
|
||||
|
||||
main_loop2(State#state{ a=BT1, b=BT2, merge_pid=MergePID });
|
||||
check_begin_merge_then_loop(State#state{ a=BT1, b=BT2 });
|
||||
|
||||
{error, enoent} ->
|
||||
|
||||
case file:read_file_info(AFileName) of
|
||||
{ok, _} ->
|
||||
{ok, BT1} = fractal_btree_reader:open(AFileName),
|
||||
main_loop1(State#state{ a=BT1 });
|
||||
main_loop(State#state{ a=BT1 });
|
||||
|
||||
{error, enoent} ->
|
||||
main_loop0(State)
|
||||
main_loop(State)
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
|
||||
main_loop0(State = #state{ a=undefined, b=undefined }) ->
|
||||
Parent = plain_fsm:info(parent),
|
||||
% error_logger:info_msg("in main_loop0~n", []),
|
||||
receive
|
||||
?REQ(From, {lookup, _})=Msg ->
|
||||
% error_logger:info_msg("in main_loop0, msg=~p~n", [Msg]),
|
||||
case State#state.next of
|
||||
undefined ->
|
||||
reply(From, notfound);
|
||||
Next ->
|
||||
Next ! Msg
|
||||
end,
|
||||
main_loop0(State);
|
||||
|
||||
?REQ(From, {inject, FileName})=_Msg ->
|
||||
% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
|
||||
AFileName = filename("A",State),
|
||||
ok = file:rename(FileName, AFileName),
|
||||
{ok, BT} = fractal_btree_reader:open(AFileName),
|
||||
reply(From, ok),
|
||||
main_loop1(State#state{ a=BT });
|
||||
|
||||
?REQ(From, close)=_Msg ->
|
||||
% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
|
||||
reply(From, ok),
|
||||
ok;
|
||||
|
||||
%% gen_fsm handling
|
||||
{system, From, Req}=_Msg ->
|
||||
% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
|
||||
plain_fsm:handle_system_msg(
|
||||
From, Req, State, fun(S1) -> main_loop0(S1) end);
|
||||
|
||||
{'EXIT', Parent, Reason}=_Msg ->
|
||||
% error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
|
||||
plain_fsm:parent_EXIT(Reason, State)
|
||||
end.
|
||||
|
||||
main_loop1(State = #state{ a=BT1, b=undefined, next=Next }) ->
|
||||
Parent = plain_fsm:info(parent),
|
||||
% error_logger:info_msg("in main_loop1~n", []),
|
||||
receive
|
||||
?REQ(From, {lookup, Key})=Req ->
|
||||
case fractal_btree_reader:lookup(BT1, Key) of
|
||||
{ok, deleted} ->
|
||||
reply(From, notfound);
|
||||
{ok, _}=Reply ->
|
||||
reply(From, Reply);
|
||||
notfound when Next =:= undefined ->
|
||||
reply(From, notfound);
|
||||
notfound ->
|
||||
Next ! Req
|
||||
end,
|
||||
main_loop1(State);
|
||||
|
||||
?REQ(From, {inject, FileName}) ->
|
||||
BFileName = filename("B",State),
|
||||
ok = file:rename(FileName, BFileName),
|
||||
{ok, BT2} = fractal_btree_reader:open(BFileName),
|
||||
reply(From, ok),
|
||||
check_begin_merge_then_loop(State=#state{a=BT1, b=BT2, merge_pid=undefined})
|
||||
when BT1/=undefined, BT2 /= undefined ->
|
||||
{ok, MergePID} = begin_merge(State),
|
||||
main_loop2(State#state{ b=BT2, merge_pid=MergePID });
|
||||
main_loop(State#state{merge_pid=MergePID });
|
||||
check_begin_merge_then_loop(State) ->
|
||||
main_loop(State).
|
||||
|
||||
?REQ(From, close) ->
|
||||
fractal_btree_reader:close(BT1),
|
||||
reply(From, ok),
|
||||
ok;
|
||||
|
||||
%% gen_fsm handling
|
||||
{system, From, Req} ->
|
||||
plain_fsm:handle_system_msg(
|
||||
From, Req, State, fun(S1) -> main_loop1(S1) end);
|
||||
{'EXIT', Parent, Reason} ->
|
||||
plain_fsm:parent_EXIT(Reason, State)
|
||||
end.
|
||||
|
||||
main_loop2(State = #state{ next=Next }) ->
|
||||
main_loop(State = #state{ next=Next }) ->
|
||||
Parent = plain_fsm:info(parent),
|
||||
% error_logger:info_msg("in main_loop2~n", []),
|
||||
receive
|
||||
?REQ(From, {lookup, Key})=Req ->
|
||||
case fractal_btree_reader:lookup(State#state.b, Key) of
|
||||
{ok, deleted} ->
|
||||
reply(From, notfound),
|
||||
main_loop2(State);
|
||||
{ok, _}=Reply ->
|
||||
reply(From, Reply),
|
||||
main_loop2(State);
|
||||
_ ->
|
||||
case fractal_btree_reader:lookup(State#state.a, Key) of
|
||||
{ok, deleted} ->
|
||||
reply(From, notfound);
|
||||
{ok, _}=Reply ->
|
||||
reply(From, Reply);
|
||||
notfound when Next =:= undefined ->
|
||||
reply(From, notfound);
|
||||
case do_lookup(Key, [State#state.b, State#state.a, Next]) of
|
||||
notfound ->
|
||||
Next ! Req
|
||||
reply(From, notfound);
|
||||
{found, Result} ->
|
||||
reply(From, {ok, Result});
|
||||
{delegate, DelegatePid} ->
|
||||
DelegatePid ! Req
|
||||
end,
|
||||
main_loop2(State)
|
||||
end;
|
||||
main_loop(State);
|
||||
|
||||
?REQ(From, {inject, FileName}) when State#state.b == undefined ->
|
||||
if State#state.a == undefined ->
|
||||
ToFileName = filename("A",State),
|
||||
SetPos = #state.a;
|
||||
true ->
|
||||
ToFileName = filename("B",State),
|
||||
SetPos = #state.b
|
||||
end,
|
||||
ok = file:rename(FileName, ToFileName),
|
||||
{ok, BT} = fractal_btree_reader:open(ToFileName),
|
||||
reply(From, ok),
|
||||
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
||||
|
||||
?REQ(From, close) ->
|
||||
fractal_btree_reader:close(State#state.a),
|
||||
fractal_btree_reader:close(State#state.b),
|
||||
MergePID = State#state.merge_pid,
|
||||
if MergePID =:= undefined ->
|
||||
ok;
|
||||
true ->
|
||||
erlang:exit(State#state.merge_pid, shutdown)
|
||||
end,
|
||||
%% TODO: stop merger, if any?
|
||||
close_if_defined(State#state.a),
|
||||
close_if_defined(State#state.b),
|
||||
stop_if_defined(State#state.merge_pid),
|
||||
reply(From, ok),
|
||||
ok;
|
||||
|
||||
|
@ -255,7 +176,7 @@ main_loop2(State = #state{ next=Next }) ->
|
|||
ok = file:rename(CFileName, AFileName),
|
||||
{ok, BT} = fractal_btree_reader:open(AFileName),
|
||||
|
||||
main_loop1(State2#state{ a=BT, b=undefined, merge_pid=undefined });
|
||||
main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined });
|
||||
|
||||
%%
|
||||
%% We need to push the output of merging to the next level
|
||||
|
@ -270,7 +191,7 @@ main_loop2(State = #state{ next=Next }) ->
|
|||
end,
|
||||
|
||||
MRef = send_request(State1#state.next, {inject, OutFileName}),
|
||||
main_loop2(State1#state{ inject_done_ref = MRef, merge_pid=undefined });
|
||||
main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined });
|
||||
|
||||
%%
|
||||
%% Our successor accepted the inject
|
||||
|
@ -278,7 +199,7 @@ main_loop2(State = #state{ next=Next }) ->
|
|||
?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref ->
|
||||
erlang:demonitor(MRef, [flush]),
|
||||
{ok, State2} = close_a_and_b(State),
|
||||
main_loop0(State2#state{ inject_done_ref=undefined });
|
||||
main_loop(State2#state{ inject_done_ref=undefined });
|
||||
|
||||
%%
|
||||
%% Our successor died!
|
||||
|
@ -289,13 +210,31 @@ main_loop2(State = #state{ next=Next }) ->
|
|||
%% gen_fsm handling
|
||||
{system, From, Req} ->
|
||||
plain_fsm:handle_system_msg(
|
||||
From, Req, State, fun(S1) -> main_loop2(S1) end);
|
||||
From, Req, State, fun(S1) -> main_loop(S1) end);
|
||||
{'EXIT', Parent, Reason} ->
|
||||
plain_fsm:parent_EXIT(Reason, State)
|
||||
|
||||
end.
|
||||
|
||||
do_lookup(_Key, []) ->
|
||||
notfound;
|
||||
do_lookup(_Key, [Pid]) when is_pid(Pid) ->
|
||||
{delegate, Pid};
|
||||
do_lookup(Key, [undefined|Rest]) ->
|
||||
do_lookup(Key, Rest);
|
||||
do_lookup(Key, [BT|Rest]) ->
|
||||
case fractal_btree_reader:lookup(BT, Key) of
|
||||
{ok, deleted} -> notfound;
|
||||
{ok, Result} -> {found, Result};
|
||||
notfound -> do_lookup(Key, Rest)
|
||||
end.
|
||||
|
||||
close_if_defined(undefined) -> ok;
|
||||
close_if_defined(BT) -> fractal_btree_reader:close(BT).
|
||||
|
||||
stop_if_defined(undefined) -> ok;
|
||||
stop_if_defined(MergePid) when is_pid(MergePid) ->
|
||||
erlang:exit(MergePid, shutdown).
|
||||
|
||||
begin_merge(State) ->
|
||||
AFileName = filename("A",State),
|
||||
|
@ -333,6 +272,3 @@ close_a_and_b(State) ->
|
|||
filename(PFX, State) ->
|
||||
filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data").
|
||||
|
||||
size(State) ->
|
||||
(1 bsl State#state.level).
|
||||
|
||||
|
|
Loading…
Reference in a new issue