Improve merge work computation
This commit is contained in:
parent
5d95070669
commit
e147dd9d1e
1 changed files with 27 additions and 20 deletions
|
@ -31,14 +31,14 @@
|
||||||
-include("src/hanoi.hrl").
|
-include("src/hanoi.hrl").
|
||||||
|
|
||||||
%%
|
%%
|
||||||
%% Manages a "pair" of hanoi index (or rathern, 0, 1 or 2), and governs
|
%% Manages 0..2 of hanoi index file, and governs all aspects of
|
||||||
%% the process of injecting/merging parent trees into this pair.
|
%% merging, lookup, folding, etc. for these files
|
||||||
%%
|
%%
|
||||||
|
|
||||||
%%
|
%%
|
||||||
%% For now, we use plain_fsm, because we *want* selective receive to postpone
|
%% DESIGN: uses plain_fsm, because we *want* selective receive to
|
||||||
%% pending injects while we're merging. That's a lot simpler than maintaining
|
%% postpone pending injects while we're merging. That's a lot simpler
|
||||||
%% to corresponding merge queue.
|
%% than maintaining to corresponding merge queue.
|
||||||
%%
|
%%
|
||||||
|
|
||||||
-behavior(plain_fsm).
|
-behavior(plain_fsm).
|
||||||
|
@ -52,7 +52,7 @@
|
||||||
-record(state, {
|
-record(state, {
|
||||||
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
|
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
|
||||||
step_next_ref, step_caller, step_merge_ref,
|
step_next_ref, step_caller, step_merge_ref,
|
||||||
opts = [], owner, work_done=0
|
opts = [], owner, work_in_progress=0, work_done=0
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
||||||
|
@ -279,7 +279,11 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
State#state.step_next_ref == undefined
|
State#state.step_next_ref == undefined
|
||||||
->
|
->
|
||||||
|
|
||||||
WorkLeftHere = max(0,total_unmerged(State) - State#state.work_done),
|
if State#state.b =/= undefined ->
|
||||||
|
WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done);
|
||||||
|
true ->
|
||||||
|
WorkLeftHere = 0
|
||||||
|
end,
|
||||||
WorkToDoHere = min(WorkLeftHere, HowMuch),
|
WorkToDoHere = min(WorkLeftHere, HowMuch),
|
||||||
DelegateWork = max(0,HowMuch - WorkToDoHere),
|
DelegateWork = max(0,HowMuch - WorkToDoHere),
|
||||||
|
|
||||||
|
@ -294,35 +298,37 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
if (State#state.merge_pid == undefined)
|
if (State#state.merge_pid == undefined)
|
||||||
orelse (WorkToDoHere =< 0) ->
|
orelse (WorkToDoHere =< 0) ->
|
||||||
MergeRef = undefined,
|
MergeRef = undefined;
|
||||||
NewWorkDone = State#state.work_done;
|
|
||||||
true ->
|
true ->
|
||||||
MergePID = State#state.merge_pid,
|
MergePID = State#state.merge_pid,
|
||||||
MergeRef = monitor(process, MergePID),
|
MergeRef = monitor(process, MergePID),
|
||||||
MergePID ! {step, {self(), MergeRef}, WorkToDoHere},
|
MergePID ! {step, {self(), MergeRef}, WorkToDoHere}
|
||||||
NewWorkDone = State#state.work_done + WorkToDoHere
|
|
||||||
end,
|
end,
|
||||||
|
|
||||||
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
|
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
|
||||||
%% nothing to do ... just return OK
|
%% nothing to do ... just return OK
|
||||||
|
|
||||||
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
|
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
|
||||||
main_loop(State2#state { work_done = NewWorkDone });
|
main_loop(State2);
|
||||||
true ->
|
true ->
|
||||||
main_loop(State#state{ step_next_ref=DelegateRef,
|
main_loop(State#state{ step_next_ref=DelegateRef,
|
||||||
step_caller=StepFrom,
|
step_caller=StepFrom,
|
||||||
step_merge_ref=MergeRef,
|
step_merge_ref=MergeRef,
|
||||||
work_done = NewWorkDone
|
work_in_progress = WorkToDoHere
|
||||||
})
|
})
|
||||||
end;
|
end;
|
||||||
|
|
||||||
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
||||||
demonitor(MRef, [flush]),
|
demonitor(MRef, [flush]),
|
||||||
|
|
||||||
case State#state.step_next_ref of
|
State1 = State#state{ work_done = State#state.work_done + State#state.work_in_progress,
|
||||||
|
work_in_progress = 0 },
|
||||||
|
|
||||||
|
case State1#state.step_next_ref of
|
||||||
undefined ->
|
undefined ->
|
||||||
State2 = reply_step_ok(State);
|
State2 = reply_step_ok(State1);
|
||||||
_ ->
|
_ ->
|
||||||
State2 = State
|
State2 = State1
|
||||||
end,
|
end,
|
||||||
|
|
||||||
main_loop(State2#state{ step_merge_ref=undefined });
|
main_loop(State2#state{ step_merge_ref=undefined });
|
||||||
|
@ -338,7 +344,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
State2 = State
|
State2 = State
|
||||||
end,
|
end,
|
||||||
|
|
||||||
main_loop(State2#state{ step_merge_ref=undefined });
|
main_loop(State2#state{ step_merge_ref=undefined, work_in_progress=0 });
|
||||||
|
|
||||||
|
|
||||||
?REPLY(MRef, ok)
|
?REPLY(MRef, ok)
|
||||||
|
@ -573,10 +579,11 @@ reply_step_ok(State) ->
|
||||||
|
|
||||||
total_unmerged(State) ->
|
total_unmerged(State) ->
|
||||||
Files =
|
Files =
|
||||||
(if State#state.b == undefined -> 0; true -> 1 end)
|
(if State#state.b == undefined -> 0; true -> 2 end)
|
||||||
+ (if State#state.c == undefined -> 0; true -> 1 end),
|
% + (if State#state.c == undefined -> 0; true -> 1 end)
|
||||||
|
,
|
||||||
|
|
||||||
Files * ?BTREE_SIZE( State#state.level ).
|
Files * 1 * ?BTREE_SIZE( State#state.level ).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue