From 6c8492c7d0fdb2f407107bb0e3b6c31ee3b403ef Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Thu, 26 Apr 2012 00:56:59 +0200 Subject: [PATCH] Use plain_rpc --- src/hanoi_level.erl | 100 ++++++++++++++++++-------------------------- 1 file changed, 41 insertions(+), 59 deletions(-) diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 8acf51a..4cde09d 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -25,6 +25,8 @@ -module(hanoi_level). -author('Kresten Krab Thorup '). +-include("include/plain_rpc.hrl"). + -include("include/hanoi.hrl"). -include("src/hanoi.hrl"). @@ -56,29 +58,30 @@ %%%%% PUBLIC OPERATIONS open(Dir,Level,Next,Opts,Owner) when Level>0 -> - PID = plain_fsm:spawn_link(?MODULE, + PID = plain_fsm:spawn(?MODULE, fun() -> process_flag(trap_exit,true), + link(Owner), initialize(#state{dir=Dir,level=Level,next=Next,opts=Opts,owner=Owner}) end), {ok, PID}. lookup(Ref, Key) -> - call(Ref, {lookup, Key}). + plain_rpc:call(Ref, {lookup, Key}). inject(Ref, FileName) -> - Result = call(Ref, {inject, FileName}), + Result = plain_rpc:call(Ref, {inject, FileName}), Result. incremental_merge(Ref,HowMuch) -> - call(Ref, {incremental_merge, HowMuch}). + plain_rpc:call(Ref, {incremental_merge, HowMuch}). unmerged_count(Ref) -> - call(Ref, unmerged_count). + plain_rpc:call(Ref, unmerged_count). close(Ref) -> try - call(Ref, close) + plain_rpc:call(Ref, close) catch exit:{noproc,_} -> ok; exit:noproc -> ok @@ -88,13 +91,13 @@ close(Ref) -> snapshot_range(Ref, FoldWorkerPID, Range) -> proc_lib:spawn(fun() -> - {ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}), + {ok, Folders} = plain_rpc:call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders} end), {ok, FoldWorkerPID}. blocking_range(Ref, FoldWorkerPID, Range) -> - {ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}), + {ok, Folders} = plain_rpc:call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders}, {ok, FoldWorkerPID}. @@ -107,38 +110,13 @@ code_change(_OldVsn, _State, _Extra) -> {ok, {#state{}, data_vsn()}}. --define(REQ(From,Msg), {'$req', From, Msg}). --define(REPLY(Ref,Msg), {'$rep', Ref, Msg}). - -send_request(PID, Request) -> - Ref = erlang:monitor(process, PID), - PID ! ?REQ({self(), Ref}, Request), - Ref. - -receive_reply(MRef) -> - receive - ?REPLY(MRef, Reply) -> - erlang:demonitor(MRef, [flush]), - Reply; - {'DOWN', MRef, _, _, Reason} -> - exit(Reason) - end. - -call(PID,Request) -> - Ref = send_request(PID, Request), - receive_reply(Ref). - -reply({PID,Ref}, Reply) -> - erlang:send(PID, ?REPLY(Ref, Reply)), - ok. - - initialize(State) -> try initialize2(State) catch Class:Ex when not (Class == exit andalso Ex == normal) -> + ?log("crashing~n", []), error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]) end. @@ -216,18 +194,19 @@ check_begin_merge_then_loop(State) -> main_loop(State = #state{ next=Next }) -> Parent = plain_fsm:info(parent), receive - ?REQ(From, {lookup, Key})=Req -> + ?CALL(From, {lookup, Key})=Req -> case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of not_found -> - reply(From, not_found); + plain_rpc:send_reply(From, not_found); {found, Result} -> - reply(From, {ok, Result}); + plain_rpc:send_reply(From, {ok, Result}); {delegate, DelegatePid} -> DelegatePid ! Req end, main_loop(State); - ?REQ(From, {inject, FileName}) when State#state.c == undefined -> + ?CALL(From, {inject, FileName}) when State#state.c == undefined -> + case {State#state.a, State#state.b} of {undefined, undefined} -> ToFileName = filename("A",State), @@ -246,24 +225,24 @@ main_loop(State = #state{ next=Next }) -> check_begin_merge_then_loop(setelement(SetPos, State, BT)); - ?REQ(From, unmerged_count) -> - reply(From, total_unmerged(State)), + ?CALL(From, unmerged_count) -> + plain_rpc:send_reply(From, total_unmerged(State)), main_loop(State); %% replies OK when there is no current step in progress - ?REQ(From, {incremental_merge, HowMuch}) + ?CALL(From, {incremental_merge, HowMuch}) when State#state.step_merge_ref == undefined, State#state.step_next_ref == undefined -> - reply(From, ok), + plain_rpc:send_reply(From, ok), if HowMuch > 0 -> - self() ! ?REQ(undefined, {step, HowMuch}); + self() ! ?CALL(undefined, {step, HowMuch}); true -> ok end, main_loop(State); %% accept step any time there is not an outstanding step - ?REQ(StepFrom, {step, HowMuch}) + ?CALL(StepFrom, {step, HowMuch}) when State#state.step_merge_ref == undefined, State#state.step_caller == undefined, State#state.step_next_ref == undefined @@ -277,7 +256,7 @@ main_loop(State = #state{ next=Next }) -> if Next =:= undefined; DelegateWork == 0 -> DelegateRef = undefined; true -> - DelegateRef = send_request(Next, {step, DelegateWork}) + DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork}) end, if (State#state.merge_pid == undefined) @@ -339,12 +318,12 @@ main_loop(State = #state{ next=Next }) -> State2 = reply_step_ok(State), main_loop(State2#state{ step_next_ref=undefined }); - ?REQ(From, close) -> + ?CALL(From, close) -> close_if_defined(State#state.a), close_if_defined(State#state.b), close_if_defined(State#state.c), stop_if_defined(State#state.merge_pid), - reply(From, ok), + plain_rpc:send_reply(From, ok), %% this is synchronous all the way down, because our %% caller is monitoring *this* proces, and thus the @@ -355,7 +334,7 @@ main_loop(State = #state{ next=Next }) -> end, ok; - ?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] -> + ?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] -> case {State#state.a, State#state.b, State#state.c} of {undefined, undefined, undefined} -> @@ -394,9 +373,9 @@ main_loop(State = #state{ next=Next }) -> case Next of undefined -> - reply(From, {ok, lists:reverse(NextList)}); + plain_rpc:send_reply(From, {ok, lists:reverse(NextList)}); _ -> - Next ! ?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, NextList}) + Next ! ?CALL(From, {init_snapshot_range_fold, WorkerPID, Range, NextList}) end, main_loop(State#state{ folding = FoldingPIDs }); @@ -406,7 +385,7 @@ main_loop(State = #state{ next=Next }) -> NewFolding = lists:delete(PID,State#state.folding), main_loop(State#state{ folding = NewFolding }); - ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) -> + ?CALL(From, {init_blocking_range_fold, WorkerPID, Range, List}) -> case {State#state.a, State#state.b, State#state.c} of {undefined, undefined, undefined} -> @@ -441,9 +420,9 @@ main_loop(State = #state{ next=Next }) -> case Next of undefined -> - reply(From, {ok, lists:reverse(RefList)}); + plain_rpc:send_reply(From, {ok, lists:reverse(RefList)}); _ -> - Next ! ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, RefList}) + Next ! ?CALL(From, {init_blocking_range_fold, WorkerPID, Range, RefList}) end, main_loop(State); @@ -453,7 +432,9 @@ main_loop(State = #state{ next=Next }) -> %% The outcome of merging resulted in a file with less than %% level #entries, so we keep it at this level %% - {merge_done, Count, OutFileName} when Count =< ?BTREE_SIZE(State#state.level) -> + ?CAST(_From,{merge_done, Count, OutFileName}) when Count =< ?BTREE_SIZE(State#state.level) -> + + ?log("merge_done, out:~w~n -> self", [Count]), % first, rename the tmp file to M, so recovery will pick it up MFileName = filename("M",State), @@ -482,7 +463,7 @@ main_loop(State = #state{ next=Next }) -> %% %% We need to push the output of merging to the next level %% - {merge_done, _, OutFileName} -> + ?CAST(_,{merge_done, _Count, OutFileName}) -> State1 = if Next =:= undefined -> {ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined, @@ -495,14 +476,15 @@ main_loop(State = #state{ next=Next }) -> %% no need to rename it since we don't accept new injects - MRef = send_request(State1#state.next, {inject, OutFileName}), + MRef = plain_rpc:send_call(State1#state.next, {inject, OutFileName}), main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined }); %% %% Our successor accepted the inject %% - ?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref -> + ?REPLY(MRef, ok) when MRef == State#state.inject_done_ref -> erlang:demonitor(MRef, [flush]), + {ok, State2} = close_and_delete_a_and_b(State), % if there is a "C" file, then move it to "A" position. @@ -546,7 +528,7 @@ main_loop(State = #state{ next=Next }) -> reply_step_ok(State) -> case State#state.step_caller of undefined -> ok; - _ -> reply(State#state.step_caller, ok) + _ -> plain_rpc:send_reply(State#state.step_caller, ok) end, State#state{ step_caller=undefined }. @@ -605,7 +587,7 @@ begin_merge(State) -> State#state.opts ), % error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]), - Owner ! {merge_done, OutCount, XFileName} + Owner ! ?CAST(self(),{merge_done, OutCount, XFileName}) catch C:E -> error_logger:error_msg("merge failed ~p:~p ~p~n",