Fold back pressure, step 1

This first step of the fold back pressure impl
changes fold worker so that it does not get
flooded by messages.  Now, we take messages
and put them in queues (one per fold source),
so we don't have to do selective receive on
bazillions of messages.
This commit is contained in:
Kresten Krab Thorup 2012-05-05 12:49:03 +02:00
parent bfb8f3f783
commit f821b38ea2

View file

@ -69,7 +69,13 @@ start(SendTo) ->
fun() -> fun() ->
process_flag(trap_exit,true), process_flag(trap_exit,true),
link(SendTo), link(SendTo),
try
initialize(#state{sendto=SendTo}, []) initialize(#state{sendto=SendTo}, [])
catch
Class:Ex ->
error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]),
exit({bad, Class, Ex, erlang:get_stacktrace()})
end
end), end),
{ok, PID}. {ok, PID}.
@ -86,8 +92,9 @@ initialize(State, PrefixFolders) ->
{initialize, Folders} -> {initialize, Folders} ->
Queues = [ {PID,queue:new()} || PID <- (PrefixFolders ++ Folders) ],
Initial = [ {PID,undefined} || PID <- (PrefixFolders ++ Folders) ], Initial = [ {PID,undefined} || PID <- (PrefixFolders ++ Folders) ],
fill(State, Initial, PrefixFolders ++ Folders); fill(State, Initial, Queues, PrefixFolders ++ Folders);
%% gen_fsm handling %% gen_fsm handling
{system, From, Req} -> {system, From, Req} ->
@ -100,28 +107,68 @@ initialize(State, PrefixFolders) ->
end. end.
fill(State, Values, []) -> fill(State, Values, Queues, []) ->
emit_next(State, Values); emit_next(State, Values, Queues);
fill(State, Values, Queues, [PID|Rest]=PIDs) ->
% io:format(user, "v=~P, q=~P, pids=~p~n", [Values, 10, Queues, 10, PIDs]),
case lists:keyfind(PID, 1, Queues) of
{PID, Q} ->
case queue:out(Q) of
{empty, Q} ->
fill_from_inbox(State, Values, Queues, PIDs, PIDs);
{{value, Msg}, Q2} ->
Queues2 = lists:keyreplace(PID, 1, Queues, {PID, Q2}),
case Msg of
done ->
fill(State, lists:keydelete(PID, 1, Values), Queues2, Rest);
{_Key, _Value}=KV ->
fill(State, lists:keyreplace(PID, 1, Values, {PID, KV}), Queues2, Rest)
end
end
end.
fill_from_inbox(State, Values, Queues, [], PIDs) ->
fill(State, Values, Queues, PIDs);
fill_from_inbox(State, Values, Queues, PIDs, SavePIDs) ->
fill(State, Values, [PID|Rest]=PIDs) ->
receive receive
die -> die ->
ok; ok;
{level_done, PID} -> {level_done, PID} ->
?log( "{level_done, ~p}~n", [PID]), Queues2 = enter(PID, done, Queues),
fill(State, lists:keydelete(PID, 1, Values), Rest); if PID == hd(PIDs) ->
fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs);
true ->
fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs)
end;
{level_limit, PID, Key} -> {level_limit, PID, Key} ->
?log( "{level_limit, ~p}~n", [PID]), Queues2 = enter(PID, {Key, limit}, Queues),
fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,limit}}), Rest); if PID == hd(PIDs) ->
fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs);
true ->
fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs)
end;
{level_result, PID, Key, Value} -> {level_result, PID, Key, Value} ->
?log( "{level_result, ~p, ~p, ...}~n", [PID, Key]), Queues2 = enter(PID, {Key, Value}, Queues),
fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,Value}}), Rest); if PID == hd(PIDs) ->
fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs);
true ->
fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs)
end;
%% gen_fsm handling %% gen_fsm handling
{system, From, Req} -> {system, From, Req} ->
plain_fsm:handle_system_msg( plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> fill(S1, Values, PIDs) end); From, Req, State, fun(S1) -> fill_from_inbox(S1, Values, Queues, PIDs, SavePIDs) end);
{'EXIT', Parent, Reason}=Msg -> {'EXIT', Parent, Reason}=Msg ->
case plain_fsm:info(parent) == Parent of case plain_fsm:info(parent) == Parent of
@ -129,12 +176,17 @@ fill(State, Values, [PID|Rest]=PIDs) ->
plain_fsm:parent_EXIT(Reason, State); plain_fsm:parent_EXIT(Reason, State);
false -> false ->
error_logger:info_msg("unhandled EXIT message ~p~n", [Msg]), error_logger:info_msg("unhandled EXIT message ~p~n", [Msg]),
fill(State, Values, PIDs) fill_from_inbox(State, Values, Queues, PIDs, SavePIDs)
end end
end. end.
emit_next(State, []) -> enter(PID, Msg, Queues) ->
{PID, Q} = lists:keyfind(PID, 1, Queues),
Q2 = queue:in(Msg, Q),
lists:keyreplace(PID, 1, Queues, {PID, Q2}).
emit_next(State, [], _Queues) ->
?log( "emit_next ~p~n", [[]]), ?log( "emit_next ~p~n", [[]]),
Msg = {fold_done, self()}, Msg = {fold_done, self()},
Target = State#state.sendto, Target = State#state.sendto,
@ -142,7 +194,7 @@ emit_next(State, []) ->
Target ! Msg, Target ! Msg,
end_of_fold(State); end_of_fold(State);
emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> emit_next(State, [{FirstPID,FirstKV}|Rest]=Values, Queues) ->
?log( "emit_next ~p~n", [Values]), ?log( "emit_next ~p~n", [Values]),
case case
lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 -> lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 ->
@ -156,7 +208,7 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
Rest) Rest)
of of
{{_, ?TOMBSTONE}, FillFrom} -> {{_, ?TOMBSTONE}, FillFrom} ->
fill(State, Values, FillFrom); fill(State, Values, Queues, FillFrom);
{{Key, limit}, _} -> {{Key, limit}, _} ->
?log( "~p ! ~p~n", [State#state.sendto, {fold_limit, self(), Key}]), ?log( "~p ! ~p~n", [State#state.sendto, {fold_limit, self(), Key}]),
State#state.sendto ! {fold_limit, self(), Key}, State#state.sendto ! {fold_limit, self(), Key},
@ -164,7 +216,7 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
{{Key, Value}, FillFrom} -> {{Key, Value}, FillFrom} ->
?log( "~p ! ~p~n", [State#state.sendto, {fold_result, self(), Key, '...'}]), ?log( "~p ! ~p~n", [State#state.sendto, {fold_result, self(), Key, '...'}]),
State#state.sendto ! {fold_result, self(), Key, Value}, State#state.sendto ! {fold_result, self(), Key, Value},
fill(State, Values, FillFrom) fill(State, Values, Queues, FillFrom)
end. end.
end_of_fold(State) -> end_of_fold(State) ->
@ -177,3 +229,4 @@ data_vsn() ->
code_change(_OldVsn, _State, _Extra) -> code_change(_OldVsn, _State, _Extra) ->
{ok, {#state{}, data_vsn()}}. {ok, {#state{}, data_vsn()}}.