Use plain_rpc

This commit is contained in:
Kresten Krab Thorup 2012-04-26 00:56:59 +02:00
parent 085f400bb8
commit 6c8492c7d0

View file

@ -25,6 +25,8 @@
-module(hanoi_level). -module(hanoi_level).
-author('Kresten Krab Thorup <krab@trifork.com>'). -author('Kresten Krab Thorup <krab@trifork.com>').
-include("include/plain_rpc.hrl").
-include("include/hanoi.hrl"). -include("include/hanoi.hrl").
-include("src/hanoi.hrl"). -include("src/hanoi.hrl").
@ -56,29 +58,30 @@
%%%%% PUBLIC OPERATIONS %%%%% PUBLIC OPERATIONS
open(Dir,Level,Next,Opts,Owner) when Level>0 -> open(Dir,Level,Next,Opts,Owner) when Level>0 ->
PID = plain_fsm:spawn_link(?MODULE, PID = plain_fsm:spawn(?MODULE,
fun() -> fun() ->
process_flag(trap_exit,true), process_flag(trap_exit,true),
link(Owner),
initialize(#state{dir=Dir,level=Level,next=Next,opts=Opts,owner=Owner}) initialize(#state{dir=Dir,level=Level,next=Next,opts=Opts,owner=Owner})
end), end),
{ok, PID}. {ok, PID}.
lookup(Ref, Key) -> lookup(Ref, Key) ->
call(Ref, {lookup, Key}). plain_rpc:call(Ref, {lookup, Key}).
inject(Ref, FileName) -> inject(Ref, FileName) ->
Result = call(Ref, {inject, FileName}), Result = plain_rpc:call(Ref, {inject, FileName}),
Result. Result.
incremental_merge(Ref,HowMuch) -> incremental_merge(Ref,HowMuch) ->
call(Ref, {incremental_merge, HowMuch}). plain_rpc:call(Ref, {incremental_merge, HowMuch}).
unmerged_count(Ref) -> unmerged_count(Ref) ->
call(Ref, unmerged_count). plain_rpc:call(Ref, unmerged_count).
close(Ref) -> close(Ref) ->
try try
call(Ref, close) plain_rpc:call(Ref, close)
catch catch
exit:{noproc,_} -> ok; exit:{noproc,_} -> ok;
exit:noproc -> ok exit:noproc -> ok
@ -88,13 +91,13 @@ close(Ref) ->
snapshot_range(Ref, FoldWorkerPID, Range) -> snapshot_range(Ref, FoldWorkerPID, Range) ->
proc_lib:spawn(fun() -> proc_lib:spawn(fun() ->
{ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}), {ok, Folders} = plain_rpc:call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}),
FoldWorkerPID ! {initialize, Folders} FoldWorkerPID ! {initialize, Folders}
end), end),
{ok, FoldWorkerPID}. {ok, FoldWorkerPID}.
blocking_range(Ref, FoldWorkerPID, Range) -> blocking_range(Ref, FoldWorkerPID, Range) ->
{ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}), {ok, Folders} = plain_rpc:call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}),
FoldWorkerPID ! {initialize, Folders}, FoldWorkerPID ! {initialize, Folders},
{ok, FoldWorkerPID}. {ok, FoldWorkerPID}.
@ -107,38 +110,13 @@ code_change(_OldVsn, _State, _Extra) ->
{ok, {#state{}, data_vsn()}}. {ok, {#state{}, data_vsn()}}.
-define(REQ(From,Msg), {'$req', From, Msg}).
-define(REPLY(Ref,Msg), {'$rep', Ref, Msg}).
send_request(PID, Request) ->
Ref = erlang:monitor(process, PID),
PID ! ?REQ({self(), Ref}, Request),
Ref.
receive_reply(MRef) ->
receive
?REPLY(MRef, Reply) ->
erlang:demonitor(MRef, [flush]),
Reply;
{'DOWN', MRef, _, _, Reason} ->
exit(Reason)
end.
call(PID,Request) ->
Ref = send_request(PID, Request),
receive_reply(Ref).
reply({PID,Ref}, Reply) ->
erlang:send(PID, ?REPLY(Ref, Reply)),
ok.
initialize(State) -> initialize(State) ->
try try
initialize2(State) initialize2(State)
catch catch
Class:Ex when not (Class == exit andalso Ex == normal) -> Class:Ex when not (Class == exit andalso Ex == normal) ->
?log("crashing~n", []),
error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]) error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
end. end.
@ -216,18 +194,19 @@ check_begin_merge_then_loop(State) ->
main_loop(State = #state{ next=Next }) -> main_loop(State = #state{ next=Next }) ->
Parent = plain_fsm:info(parent), Parent = plain_fsm:info(parent),
receive receive
?REQ(From, {lookup, Key})=Req -> ?CALL(From, {lookup, Key})=Req ->
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
not_found -> not_found ->
reply(From, not_found); plain_rpc:send_reply(From, not_found);
{found, Result} -> {found, Result} ->
reply(From, {ok, Result}); plain_rpc:send_reply(From, {ok, Result});
{delegate, DelegatePid} -> {delegate, DelegatePid} ->
DelegatePid ! Req DelegatePid ! Req
end, end,
main_loop(State); main_loop(State);
?REQ(From, {inject, FileName}) when State#state.c == undefined -> ?CALL(From, {inject, FileName}) when State#state.c == undefined ->
case {State#state.a, State#state.b} of case {State#state.a, State#state.b} of
{undefined, undefined} -> {undefined, undefined} ->
ToFileName = filename("A",State), ToFileName = filename("A",State),
@ -246,24 +225,24 @@ main_loop(State = #state{ next=Next }) ->
check_begin_merge_then_loop(setelement(SetPos, State, BT)); check_begin_merge_then_loop(setelement(SetPos, State, BT));
?REQ(From, unmerged_count) -> ?CALL(From, unmerged_count) ->
reply(From, total_unmerged(State)), plain_rpc:send_reply(From, total_unmerged(State)),
main_loop(State); main_loop(State);
%% replies OK when there is no current step in progress %% replies OK when there is no current step in progress
?REQ(From, {incremental_merge, HowMuch}) ?CALL(From, {incremental_merge, HowMuch})
when State#state.step_merge_ref == undefined, when State#state.step_merge_ref == undefined,
State#state.step_next_ref == undefined -> State#state.step_next_ref == undefined ->
reply(From, ok), plain_rpc:send_reply(From, ok),
if HowMuch > 0 -> if HowMuch > 0 ->
self() ! ?REQ(undefined, {step, HowMuch}); self() ! ?CALL(undefined, {step, HowMuch});
true -> true ->
ok ok
end, end,
main_loop(State); main_loop(State);
%% accept step any time there is not an outstanding step %% accept step any time there is not an outstanding step
?REQ(StepFrom, {step, HowMuch}) ?CALL(StepFrom, {step, HowMuch})
when State#state.step_merge_ref == undefined, when State#state.step_merge_ref == undefined,
State#state.step_caller == undefined, State#state.step_caller == undefined,
State#state.step_next_ref == undefined State#state.step_next_ref == undefined
@ -277,7 +256,7 @@ main_loop(State = #state{ next=Next }) ->
if Next =:= undefined; DelegateWork == 0 -> if Next =:= undefined; DelegateWork == 0 ->
DelegateRef = undefined; DelegateRef = undefined;
true -> true ->
DelegateRef = send_request(Next, {step, DelegateWork}) DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork})
end, end,
if (State#state.merge_pid == undefined) if (State#state.merge_pid == undefined)
@ -339,12 +318,12 @@ main_loop(State = #state{ next=Next }) ->
State2 = reply_step_ok(State), State2 = reply_step_ok(State),
main_loop(State2#state{ step_next_ref=undefined }); main_loop(State2#state{ step_next_ref=undefined });
?REQ(From, close) -> ?CALL(From, close) ->
close_if_defined(State#state.a), close_if_defined(State#state.a),
close_if_defined(State#state.b), close_if_defined(State#state.b),
close_if_defined(State#state.c), close_if_defined(State#state.c),
stop_if_defined(State#state.merge_pid), stop_if_defined(State#state.merge_pid),
reply(From, ok), plain_rpc:send_reply(From, ok),
%% this is synchronous all the way down, because our %% this is synchronous all the way down, because our
%% caller is monitoring *this* proces, and thus the %% caller is monitoring *this* proces, and thus the
@ -355,7 +334,7 @@ main_loop(State = #state{ next=Next }) ->
end, end,
ok; ok;
?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] -> ?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
case {State#state.a, State#state.b, State#state.c} of case {State#state.a, State#state.b, State#state.c} of
{undefined, undefined, undefined} -> {undefined, undefined, undefined} ->
@ -394,9 +373,9 @@ main_loop(State = #state{ next=Next }) ->
case Next of case Next of
undefined -> undefined ->
reply(From, {ok, lists:reverse(NextList)}); plain_rpc:send_reply(From, {ok, lists:reverse(NextList)});
_ -> _ ->
Next ! ?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, NextList}) Next ! ?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, NextList})
end, end,
main_loop(State#state{ folding = FoldingPIDs }); main_loop(State#state{ folding = FoldingPIDs });
@ -406,7 +385,7 @@ main_loop(State = #state{ next=Next }) ->
NewFolding = lists:delete(PID,State#state.folding), NewFolding = lists:delete(PID,State#state.folding),
main_loop(State#state{ folding = NewFolding }); main_loop(State#state{ folding = NewFolding });
?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) -> ?CALL(From, {init_blocking_range_fold, WorkerPID, Range, List}) ->
case {State#state.a, State#state.b, State#state.c} of case {State#state.a, State#state.b, State#state.c} of
{undefined, undefined, undefined} -> {undefined, undefined, undefined} ->
@ -441,9 +420,9 @@ main_loop(State = #state{ next=Next }) ->
case Next of case Next of
undefined -> undefined ->
reply(From, {ok, lists:reverse(RefList)}); plain_rpc:send_reply(From, {ok, lists:reverse(RefList)});
_ -> _ ->
Next ! ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, RefList}) Next ! ?CALL(From, {init_blocking_range_fold, WorkerPID, Range, RefList})
end, end,
main_loop(State); main_loop(State);
@ -453,7 +432,9 @@ main_loop(State = #state{ next=Next }) ->
%% The outcome of merging resulted in a file with less than %% The outcome of merging resulted in a file with less than
%% level #entries, so we keep it at this level %% level #entries, so we keep it at this level
%% %%
{merge_done, Count, OutFileName} when Count =< ?BTREE_SIZE(State#state.level) -> ?CAST(_From,{merge_done, Count, OutFileName}) when Count =< ?BTREE_SIZE(State#state.level) ->
?log("merge_done, out:~w~n -> self", [Count]),
% first, rename the tmp file to M, so recovery will pick it up % first, rename the tmp file to M, so recovery will pick it up
MFileName = filename("M",State), MFileName = filename("M",State),
@ -482,7 +463,7 @@ main_loop(State = #state{ next=Next }) ->
%% %%
%% We need to push the output of merging to the next level %% We need to push the output of merging to the next level
%% %%
{merge_done, _, OutFileName} -> ?CAST(_,{merge_done, _Count, OutFileName}) ->
State1 = State1 =
if Next =:= undefined -> if Next =:= undefined ->
{ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined, {ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined,
@ -495,14 +476,15 @@ main_loop(State = #state{ next=Next }) ->
%% no need to rename it since we don't accept new injects %% no need to rename it since we don't accept new injects
MRef = send_request(State1#state.next, {inject, OutFileName}), MRef = plain_rpc:send_call(State1#state.next, {inject, OutFileName}),
main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined }); main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined });
%% %%
%% Our successor accepted the inject %% Our successor accepted the inject
%% %%
?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref -> ?REPLY(MRef, ok) when MRef == State#state.inject_done_ref ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
{ok, State2} = close_and_delete_a_and_b(State), {ok, State2} = close_and_delete_a_and_b(State),
% if there is a "C" file, then move it to "A" position. % if there is a "C" file, then move it to "A" position.
@ -546,7 +528,7 @@ main_loop(State = #state{ next=Next }) ->
reply_step_ok(State) -> reply_step_ok(State) ->
case State#state.step_caller of case State#state.step_caller of
undefined -> ok; undefined -> ok;
_ -> reply(State#state.step_caller, ok) _ -> plain_rpc:send_reply(State#state.step_caller, ok)
end, end,
State#state{ step_caller=undefined }. State#state{ step_caller=undefined }.
@ -605,7 +587,7 @@ begin_merge(State) ->
State#state.opts ), State#state.opts ),
% error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]), % error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]),
Owner ! {merge_done, OutCount, XFileName} Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
catch catch
C:E -> C:E ->
error_logger:error_msg("merge failed ~p:~p ~p~n", error_logger:error_msg("merge failed ~p:~p ~p~n",