Simplify life cycle for fold workers

With this change, the fold worker does not
link to the receiver; now it simply monitors
the receiving process.  If the receiver dies,
the fold worker dies normally.  

The individual fold processes running on level
files are linked to the fold worker; so between
fold merge worker and those, normal link/kill
applies.
This commit is contained in:
Kresten Krab Thorup 2012-05-07 00:14:33 +02:00
parent fb67fed456
commit 9a7b9eb29f
4 changed files with 71 additions and 56 deletions

View file

@ -39,9 +39,18 @@
-include("hanoi.hrl").
-include_lib("kernel/include/file.hrl").
-include_lib("include/hanoi.hrl").
-include_lib("include/plain_rpc.hrl").
-record(state, { top, nursery, dir, opt, max_level }).
-ifdef(DEBUG).
-define(log(Fmt,Args),io:format(user,Fmt,Args)).
-else.
-define(log(Fmt,Args),ok).
-endif.
%% PUBLIC API
-type hanoi() :: pid().
@ -124,19 +133,24 @@ fold_range(Ref,Fun,Acc0,Range) ->
ok = gen_server:call(Ref, {snapshot_range, FoldWorkerPID, Range}, infinity)
end,
MRef = erlang:monitor(process, FoldWorkerPID),
receive_fold_range(MRef, FoldWorkerPID, Fun, Acc0).
?log("fold_range begin: self=~p, worker=~p~n", [self(), FoldWorkerPID]),
Result = receive_fold_range(MRef, FoldWorkerPID, Fun, Acc0),
?log("fold_range done: self:~p, result=~P~n", [self(), Result, 20]),
Result.
receive_fold_range(MRef, PID,Fun,Acc0) ->
receive_fold_range(MRef,PID,Fun,Acc0) ->
?log("receive_fold_range:~p,~P~n", [PID,Acc0,10]),
receive
%% receive one K/V from fold_worker
{fold_result, PID, K,V} ->
?CALL(From, {fold_result, PID, K,V}) ->
plain_rpc:send_reply(From, ok),
case
try
{ok, Fun(K,V,Acc0)}
catch
Class:Exception ->
% io:format(user, "Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
% ?log("Exception in hanoi fold: ~p ~p", [Exception, erlang:get_stacktrace()]),
% lager:warn("Exception in hanoi fold: ~p", [Exception]),
{'EXIT', Class, Exception, erlang:get_stacktrace()}
end
@ -145,58 +159,47 @@ receive_fold_range(MRef, PID,Fun,Acc0) ->
receive_fold_range(MRef, PID, Fun, Acc1);
Exit ->
%% kill the fold worker ...
PID ! shutdown,
erlang:exit(PID, shutdown),
drain_worker_and_throw(MRef,PID,Exit)
end;
%% receive multiple KVs from fold_worker
{fold_results, PID, KVs} ->
case
try
{ok, kvfoldl(Fun,Acc0,KVs)}
catch
Class:Exception ->
lager:warning("Exception in hanoi fold: ~p", [Exception]),
{'EXIT', Class, Exception, erlang:get_stacktrace()}
end
of
{ok, Acc1} ->
receive_fold_range(MRef, PID, Fun, Acc1);
Exit ->
%% kill the fold worker ...
erlang:exit(PID, kill),
drain_worker_and_throw(MRef,PID,Exit)
end;
{fold_limit, PID, _} ->
?CAST(_,{fold_limit, PID, _}) ->
?log("> fold_limit pid=~p, self=~p~n", [PID, self()]),
erlang:demonitor(MRef, [flush]),
Acc0;
{fold_done, PID} ->
?CAST(_,{fold_done, PID}) ->
?log("> fold_done pid=~p, self=~p~n", [PID, self()]),
erlang:demonitor(MRef, [flush]),
Acc0;
{'DOWN', MRef, _, _, Reason} ->
{'DOWN', MRef, _, _PID, normal} ->
?log("> fold worker ~p ENDED~n", [_PID]),
Acc0;
{'DOWN', MRef, _, _PID, Reason} ->
?log("> fold worker ~p DOWN reason:~p~n", [_PID, Reason]),
error({fold_worker_died, Reason})
end.
kvfoldl(_Fun,Acc0,[]) ->
Acc0;
kvfoldl(Fun,Acc0,[{K,V}|T]) ->
kvfoldl(Fun, Fun(K,V,Acc0), T).
%%
%% Just calls erlang:raise with appropriate arguments
%%
raise({'EXIT', Class, Exception, Trace}) ->
erlang:raise(Class, Exception, Trace).
%%
%% When an exception has happened in the fold function, we use
%% this to drain messages coming from the fold_worker before
%% re-throwing the exception.
%%
drain_worker_and_throw(MRef, PID, ExitTuple) ->
receive
{fold_result, PID, _, _} ->
drain_worker_and_throw(MRef, PID, ExitTuple);
{fold_results, PID, _} ->
?CALL(_From,{fold_result, PID, _, _}) ->
drain_worker_and_throw(MRef, PID, ExitTuple);
{'DOWN', MRef, _, _, _} ->
raise(ExitTuple);
{fold_limit, PID, _} ->
?CAST(_,{fold_limit, PID, _}) ->
erlang:demonitor(MRef, [flush]),
raise(ExitTuple);
{fold_done, PID} ->
?CAST(_,{fold_done, PID}) ->
erlang:demonitor(MRef, [flush]),
raise(ExitTuple)
end.

View file

@ -43,6 +43,7 @@
%% {level_result, LevelWorker, Key2, Value}
%% {level_result, LevelWorker, Key3, Value}
%% {level_result, LevelWorker, Key4, Value}
%% {level_results, LevelWorker, [{Key,Value}...]} %% alternatively
%% ...
%% {level_done, LevelWorker}
%%
@ -67,18 +68,20 @@
-include("hanoi.hrl").
-include("plain_rpc.hrl").
-record(state, {sendto}).
-record(state, {sendto, sendto_ref}).
start(SendTo) ->
PID = plain_fsm:spawn(?MODULE,
fun() ->
?log("fold_worker started ~p~n", [self()]),
process_flag(trap_exit,true),
link(SendTo),
MRef = erlang:monitor(process, SendTo),
try
initialize(#state{sendto=SendTo}, []),
unlink(SendTo)
initialize(#state{sendto=SendTo, sendto_ref=MRef}, []),
?log("fold_worker done ~p~n", [self()])
catch
Class:Ex ->
?log("fold_worker exception ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
exit({bad, Class, Ex, erlang:get_stacktrace()})
end
@ -90,9 +93,6 @@ initialize(State, PrefixFolders) ->
Parent = plain_fsm:info(parent),
receive
shutdown ->
ok;
{prefix, [_]=Folders} ->
initialize(State, Folders);
@ -107,6 +107,9 @@ initialize(State, PrefixFolders) ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> initialize(S1, PrefixFolders) end);
{'DOWN', MRef, _, _, _} when MRef =:= State#state.sendto_ref ->
ok;
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
@ -144,11 +147,6 @@ fill_from_inbox(State, Values, Queues, [], PIDs) ->
fill_from_inbox(State, Values, Queues, [PID|_]=PIDs, SavePIDs) ->
?log("waiting for ~p~n", [PIDs]),
receive
shutdown ->
[ erlang:kill(QPID, shutdown) || {QPID,_} <- Queues,
is_pid(QPID) ],
ok;
{level_done, PID} ->
?log("got {done, ~p}~n", [PID]),
Queues2 = enter(PID, done, Queues),
@ -175,6 +173,9 @@ fill_from_inbox(State, Values, Queues, [PID|_]=PIDs, SavePIDs) ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> fill_from_inbox(S1, Values, Queues, PIDs, SavePIDs) end);
{'DOWN', MRef, _, _, _} when MRef =:= State#state.sendto_ref ->
ok;
{'EXIT', Parent, Reason}=Msg ->
case plain_fsm:info(parent) == Parent of
true ->
@ -201,7 +202,7 @@ emit_next(State, [], _Queues) ->
Msg = {fold_done, self()},
Target = State#state.sendto,
?log( "~p ! ~p~n", [Target, Msg]),
Target ! Msg,
plain_rpc:cast(Target, Msg),
end_of_fold(State);
emit_next(State, [{FirstPID,FirstKV}|Rest]=Values, Queues) ->
@ -221,15 +222,15 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values, Queues) ->
fill(State, Values, Queues, FillFrom);
{{Key, limit}, _} ->
?log( "~p ! ~p~n", [State#state.sendto, {fold_limit, self(), Key}]),
State#state.sendto ! {fold_limit, self(), Key},
plain_rpc:cast(State#state.sendto, {fold_limit, self(), Key}),
end_of_fold(State);
{{Key, Value}, FillFrom} ->
?log( "~p ! ~p~n", [State#state.sendto, {fold_result, self(), Key, '...'}]),
State#state.sendto ! {fold_result, self(), Key, Value},
plain_rpc:call(State#state.sendto, {fold_result, self(), Key, Value}),
fill(State, Values, Queues, FillFrom)
end.
end_of_fold(State) ->
end_of_fold(_State) ->
ok.
data_vsn() ->

View file

@ -820,7 +820,7 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
SelfOrRef :: pid() | reference(),
Range :: #btree_range{} ) -> ok.
do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
case hanoi_reader:range_fold(fun(Key,Value,{0,KVs}) ->
try hanoi_reader:range_fold(fun(Key,Value,{0,KVs}) ->
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
{?FOLD_CHUNK_SIZE-1, []};
(Key,Value,{N,KVs}) ->
@ -836,11 +836,19 @@ do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
%% tell fold merge worker we're done
send(WorkerPID, SelfOrRef, KVs),
WorkerPID ! {level_done, SelfOrRef}
catch
exit:worker_died -> ok
end,
ok.
send(_,_,[]) ->
[];
send(WorkerPID,Ref,ReverseKVs) ->
plain_rpc:call(WorkerPID, {level_results, Ref, lists:reverse(ReverseKVs)}).
try
plain_rpc:call(WorkerPID, {level_results, Ref, lists:reverse(ReverseKVs)})
catch
%% the fold worker died; just ignore it
exit:normal -> exit(worker_died);
exit:shutdown -> exit(worker_died);
exit:noproc -> exit(worker_died)
end.

View file

@ -280,7 +280,10 @@ fold_objects(FoldObjectsFun, Acc, Opts, #state{tree=Tree}) ->
FoldFun = fold_objects_fun(FoldObjectsFun, Bucket),
ObjectFolder =
fun() ->
hanoi:fold_range(Tree, FoldFun, Acc, to_key_range(Bucket))
% io:format(user, "starting fold_objects in ~p~n", [self()]),
Result = hanoi:fold_range(Tree, FoldFun, Acc, to_key_range(Bucket)),
% io:format(user, "ended fold_objects in ~p => ~P~n", [self(),Result,20]),
Result
end,
case proplists:get_bool(async_fold, Opts) of
true ->