From f821b38ea2948f4af40600e2ad03d51533b0b1db Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Sat, 5 May 2012 12:49:03 +0200 Subject: [PATCH] Fold back pressure, step 1 This first step of the fold back pressure impl changes fold worker so that it does not get flooded by messages. Now, we take messages and put them in queues (one per fold source), so we don't have to do selective receive on bazillions of messages. --- src/hanoi_fold_worker.erl | 85 +++++++++++++++++++++++++++++++-------- 1 file changed, 69 insertions(+), 16 deletions(-) diff --git a/src/hanoi_fold_worker.erl b/src/hanoi_fold_worker.erl index 9f4cb52..98da59e 100644 --- a/src/hanoi_fold_worker.erl +++ b/src/hanoi_fold_worker.erl @@ -69,7 +69,13 @@ start(SendTo) -> fun() -> process_flag(trap_exit,true), link(SendTo), +try initialize(#state{sendto=SendTo}, []) +catch + Class:Ex -> + error_logger:error_msg("Unexpected: ~p:~p ~p~n", [Class, Ex, erlang:get_stacktrace()]), + exit({bad, Class, Ex, erlang:get_stacktrace()}) +end end), {ok, PID}. @@ -86,8 +92,9 @@ initialize(State, PrefixFolders) -> {initialize, Folders} -> + Queues = [ {PID,queue:new()} || PID <- (PrefixFolders ++ Folders) ], Initial = [ {PID,undefined} || PID <- (PrefixFolders ++ Folders) ], - fill(State, Initial, PrefixFolders ++ Folders); + fill(State, Initial, Queues, PrefixFolders ++ Folders); %% gen_fsm handling {system, From, Req} -> @@ -100,28 +107,68 @@ initialize(State, PrefixFolders) -> end. -fill(State, Values, []) -> - emit_next(State, Values); +fill(State, Values, Queues, []) -> + emit_next(State, Values, Queues); + +fill(State, Values, Queues, [PID|Rest]=PIDs) -> + +% io:format(user, "v=~P, q=~P, pids=~p~n", [Values, 10, Queues, 10, PIDs]), + + case lists:keyfind(PID, 1, Queues) of + {PID, Q} -> + case queue:out(Q) of + {empty, Q} -> + fill_from_inbox(State, Values, Queues, PIDs, PIDs); + + {{value, Msg}, Q2} -> + Queues2 = lists:keyreplace(PID, 1, Queues, {PID, Q2}), + + case Msg of + done -> + fill(State, lists:keydelete(PID, 1, Values), Queues2, Rest); + {_Key, _Value}=KV -> + fill(State, lists:keyreplace(PID, 1, Values, {PID, KV}), Queues2, Rest) + end + end + end. + +fill_from_inbox(State, Values, Queues, [], PIDs) -> + fill(State, Values, Queues, PIDs); + +fill_from_inbox(State, Values, Queues, PIDs, SavePIDs) -> -fill(State, Values, [PID|Rest]=PIDs) -> receive die -> ok; {level_done, PID} -> - ?log( "{level_done, ~p}~n", [PID]), - fill(State, lists:keydelete(PID, 1, Values), Rest); + Queues2 = enter(PID, done, Queues), + if PID == hd(PIDs) -> + fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs); + true -> + fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs) + end; + {level_limit, PID, Key} -> - ?log( "{level_limit, ~p}~n", [PID]), - fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,limit}}), Rest); + Queues2 = enter(PID, {Key, limit}, Queues), + if PID == hd(PIDs) -> + fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs); + true -> + fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs) + end; + {level_result, PID, Key, Value} -> - ?log( "{level_result, ~p, ~p, ...}~n", [PID, Key]), - fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,Value}}), Rest); + Queues2 = enter(PID, {Key, Value}, Queues), + if PID == hd(PIDs) -> + fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs); + true -> + fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs) + end; %% gen_fsm handling {system, From, Req} -> plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> fill(S1, Values, PIDs) end); + From, Req, State, fun(S1) -> fill_from_inbox(S1, Values, Queues, PIDs, SavePIDs) end); {'EXIT', Parent, Reason}=Msg -> case plain_fsm:info(parent) == Parent of @@ -129,12 +176,17 @@ fill(State, Values, [PID|Rest]=PIDs) -> plain_fsm:parent_EXIT(Reason, State); false -> error_logger:info_msg("unhandled EXIT message ~p~n", [Msg]), - fill(State, Values, PIDs) + fill_from_inbox(State, Values, Queues, PIDs, SavePIDs) end end. -emit_next(State, []) -> +enter(PID, Msg, Queues) -> + {PID, Q} = lists:keyfind(PID, 1, Queues), + Q2 = queue:in(Msg, Q), + lists:keyreplace(PID, 1, Queues, {PID, Q2}). + +emit_next(State, [], _Queues) -> ?log( "emit_next ~p~n", [[]]), Msg = {fold_done, self()}, Target = State#state.sendto, @@ -142,7 +194,7 @@ emit_next(State, []) -> Target ! Msg, end_of_fold(State); -emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> +emit_next(State, [{FirstPID,FirstKV}|Rest]=Values, Queues) -> ?log( "emit_next ~p~n", [Values]), case lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 -> @@ -156,7 +208,7 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> Rest) of {{_, ?TOMBSTONE}, FillFrom} -> - fill(State, Values, FillFrom); + fill(State, Values, Queues, FillFrom); {{Key, limit}, _} -> ?log( "~p ! ~p~n", [State#state.sendto, {fold_limit, self(), Key}]), State#state.sendto ! {fold_limit, self(), Key}, @@ -164,7 +216,7 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> {{Key, Value}, FillFrom} -> ?log( "~p ! ~p~n", [State#state.sendto, {fold_result, self(), Key, '...'}]), State#state.sendto ! {fold_result, self(), Key, Value}, - fill(State, Values, FillFrom) + fill(State, Values, Queues, FillFrom) end. end_of_fold(State) -> @@ -177,3 +229,4 @@ data_vsn() -> code_change(_OldVsn, _State, _Extra) -> {ok, {#state{}, data_vsn()}}. +