Fold back pressure, step 2

This makes fold-from-snapshot use the back pressure
model of doing plain_rpc:call to the merge worker
delivering chunks of 100 KVs.

The back pressure is entirely internal to hanoi,
designed to ensure that the process that merges
fold results from the individual levels is not
swamped with fold data.

Folds with a limit < 10 still do "blocking fold"
which is more efficient and uses fewer FDs, but 
blocks concurrent put/get operations.
This commit is contained in:
Kresten Krab Thorup 2012-05-05 18:53:02 +02:00
parent 49afbbc411
commit 8f1600b41a
2 changed files with 58 additions and 19 deletions

View file

@ -25,7 +25,11 @@
-module(hanoi_fold_worker). -module(hanoi_fold_worker).
-author('Kresten Krab Thorup <krab@trifork.com>'). -author('Kresten Krab Thorup <krab@trifork.com>').
-ifdef(DEBUG).
-define(log(Fmt,Args),io:format(user,Fmt,Args)).
-else.
-define(log(Fmt,Args),ok). -define(log(Fmt,Args),ok).
-endif.
%% %%
%% This worker is used to merge fold results from individual %% This worker is used to merge fold results from individual
@ -61,6 +65,7 @@
-export([data_vsn/0, code_change/3]). -export([data_vsn/0, code_change/3]).
-include("hanoi.hrl"). -include("hanoi.hrl").
-include("plain_rpc.hrl").
-record(state, {sendto}). -record(state, {sendto}).
@ -118,7 +123,7 @@ fill(State, Values, Queues, [PID|Rest]=PIDs) ->
{PID, Q} -> {PID, Q} ->
case queue:out(Q) of case queue:out(Q) of
{empty, Q} -> {empty, Q} ->
fill_from_inbox(State, Values, Queues, PIDs, PIDs); fill_from_inbox(State, Values, Queues, [PID], PIDs);
{{value, Msg}, Q2} -> {{value, Msg}, Q2} ->
Queues2 = lists:keyreplace(PID, 1, Queues, {PID, Q2}), Queues2 = lists:keyreplace(PID, 1, Queues, {PID, Q2}),
@ -135,35 +140,32 @@ fill(State, Values, Queues, [PID|Rest]=PIDs) ->
fill_from_inbox(State, Values, Queues, [], PIDs) -> fill_from_inbox(State, Values, Queues, [], PIDs) ->
fill(State, Values, Queues, PIDs); fill(State, Values, Queues, PIDs);
fill_from_inbox(State, Values, Queues, PIDs, SavePIDs) -> fill_from_inbox(State, Values, Queues, [PID|_]=PIDs, SavePIDs) ->
?log("waiting for ~p~n", [PIDs]),
receive receive
die -> die ->
ok; ok;
{level_done, PID} -> {level_done, PID} ->
?log("got {done, ~p}~n", [PID]),
Queues2 = enter(PID, done, Queues), Queues2 = enter(PID, done, Queues),
if PID == hd(PIDs) -> fill_from_inbox(State, Values, Queues2, lists:delete(PID,PIDs), SavePIDs);
fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs);
true ->
fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs)
end;
{level_limit, PID, Key} -> {level_limit, PID, Key} ->
?log("got {limit, ~p}~n", [PID]),
Queues2 = enter(PID, {Key, limit}, Queues), Queues2 = enter(PID, {Key, limit}, Queues),
if PID == hd(PIDs) -> fill_from_inbox(State, Values, Queues2, lists:delete(PID,PIDs), SavePIDs);
fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs);
true ->
fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs)
end;
{level_result, PID, Key, Value} -> {level_result, PID, Key, Value} ->
?log("got {result, ~p}~n", [PID]),
Queues2 = enter(PID, {Key, Value}, Queues), Queues2 = enter(PID, {Key, Value}, Queues),
if PID == hd(PIDs) -> fill_from_inbox(State, Values, Queues2, lists:delete(PID,PIDs), SavePIDs);
fill_from_inbox(State, Values, Queues2, tl(PIDs), SavePIDs);
true -> ?CALL(From,{level_results, PID, KVs}) ->
fill_from_inbox(State, Values, Queues2, PIDs, SavePIDs) ?log("got {results, ~p}~n", [PID]),
end; plain_rpc:send_reply(From,ok),
Queues2 = enter_many(PID, KVs, Queues),
fill_from_inbox(State, Values, Queues2, lists:delete(PID,PIDs), SavePIDs);
%% gen_fsm handling %% gen_fsm handling
{system, From, Req} -> {system, From, Req} ->
@ -186,6 +188,11 @@ enter(PID, Msg, Queues) ->
Q2 = queue:in(Msg, Q), Q2 = queue:in(Msg, Q),
lists:keyreplace(PID, 1, Queues, {PID, Q2}). lists:keyreplace(PID, 1, Queues, {PID, Q2}).
enter_many(PID, Msgs, Queues) ->
{PID, Q} = lists:keyfind(PID, 1, Queues),
Q2 = lists:foldl(fun queue:in/2, Q, Msgs),
lists:keyreplace(PID, 1, Queues, {PID, Q2}).
emit_next(State, [], _Queues) -> emit_next(State, [], _Queues) ->
?log( "emit_next ~p~n", [[]]), ?log( "emit_next ~p~n", [[]]),
Msg = {fold_done, self()}, Msg = {fold_done, self()},

View file

@ -751,7 +751,7 @@ try
?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]), ?log("start_range_fold ~p on ~p -> ~p", [self, FileName, WorkerPID]),
erlang:link(WorkerPID), erlang:link(WorkerPID),
{ok, File} = hanoi_reader:open(FileName, [folding|State#state.opts]), {ok, File} = hanoi_reader:open(FileName, [folding|State#state.opts]),
do_range_fold(File, WorkerPID, self(), Range), do_range_fold2(File, WorkerPID, self(), Range),
erlang:unlink(WorkerPID), erlang:unlink(WorkerPID),
hanoi_reader:close(File), hanoi_reader:close(File),
@ -785,3 +785,35 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
end, end,
ok. ok.
-define(FOLD_CHUNK_SIZE, 100).
-spec do_range_fold2(BT :: hanoi_reader:read_file(),
WorkerPID :: pid(),
SelfOrRef :: pid() | reference(),
Range :: #btree_range{} ) -> ok.
do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
case hanoi_reader:range_fold(fun(Key,Value,{0,KVs}) ->
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
{?FOLD_CHUNK_SIZE-1, []};
(Key,Value,{N,KVs}) ->
{N-1,[{Key,Value}|KVs]}
end,
{?FOLD_CHUNK_SIZE-1,[]},
BT,
Range) of
{limit, {_,KVs}, LastKey} ->
send(WorkerPID, SelfOrRef, KVs),
WorkerPID ! {level_limit, SelfOrRef, LastKey};
{done, {_, KVs}} ->
%% tell fold merge worker we're done
send(WorkerPID, SelfOrRef, KVs),
WorkerPID ! {level_done, SelfOrRef}
end,
ok.
send(_,_,[]) ->
[];
send(WorkerPID,Ref,ReverseKVs) ->
plain_rpc:call(WorkerPID, {level_results, Ref, lists:reverse(ReverseKVs)}).