Code cleanup
Clean up a little in hanoi_level, avoiding an extra message send when initiating incremental merge
This commit is contained in:
parent
eba7f820ef
commit
67f1c46b7e
1 changed files with 50 additions and 43 deletions
|
@ -56,6 +56,11 @@
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
||||||
|
%% if no activity for this long, we'll do some incremental
|
||||||
|
%% merge, just to clean up and optimize future operations.
|
||||||
|
-define(MERGE_TIMEOUT,1000).
|
||||||
|
|
||||||
|
|
||||||
-ifdef(DEBUG).
|
-ifdef(DEBUG).
|
||||||
-define(log(Fmt,Args), debug_log(State,Fmt,Args)).
|
-define(log(Fmt,Args), debug_log(State,Fmt,Args)).
|
||||||
debug_log(State,Fmt,Args) ->
|
debug_log(State,Fmt,Args) ->
|
||||||
|
@ -266,11 +271,10 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
State#state.step_next_ref == undefined ->
|
State#state.step_next_ref == undefined ->
|
||||||
plain_rpc:send_reply(From, ok),
|
plain_rpc:send_reply(From, ok),
|
||||||
if HowMuch > 0 ->
|
if HowMuch > 0 ->
|
||||||
self() ! ?CALL(undefined, {step, HowMuch});
|
do_step(undefined, HowMuch, State);
|
||||||
true ->
|
true ->
|
||||||
ok
|
main_loop(State)
|
||||||
end,
|
end;
|
||||||
main_loop(State);
|
|
||||||
|
|
||||||
%% accept step any time there is not an outstanding step
|
%% accept step any time there is not an outstanding step
|
||||||
?CALL(StepFrom, {step, HowMuch})
|
?CALL(StepFrom, {step, HowMuch})
|
||||||
|
@ -278,45 +282,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
State#state.step_caller == undefined,
|
State#state.step_caller == undefined,
|
||||||
State#state.step_next_ref == undefined
|
State#state.step_next_ref == undefined
|
||||||
->
|
->
|
||||||
|
do_step(StepFrom, HowMuch, State);
|
||||||
if State#state.b =/= undefined ->
|
|
||||||
WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done);
|
|
||||||
true ->
|
|
||||||
WorkLeftHere = 0
|
|
||||||
end,
|
|
||||||
WorkToDoHere = min(WorkLeftHere, HowMuch),
|
|
||||||
DelegateWork = max(0,HowMuch - WorkToDoHere),
|
|
||||||
|
|
||||||
?log("step:~p, do:~p, left:~p ~n", [HowMuch, WorkToDoHere, WorkLeftHere]),
|
|
||||||
|
|
||||||
%% delegate the step request to the next level
|
|
||||||
if Next =:= undefined; DelegateWork == 0 ->
|
|
||||||
DelegateRef = undefined;
|
|
||||||
true ->
|
|
||||||
DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork})
|
|
||||||
end,
|
|
||||||
|
|
||||||
if (State#state.merge_pid == undefined)
|
|
||||||
orelse (WorkToDoHere =< 0) ->
|
|
||||||
MergeRef = undefined;
|
|
||||||
true ->
|
|
||||||
MergePID = State#state.merge_pid,
|
|
||||||
MergeRef = monitor(process, MergePID),
|
|
||||||
MergePID ! {step, {self(), MergeRef}, WorkToDoHere}
|
|
||||||
end,
|
|
||||||
|
|
||||||
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
|
|
||||||
%% nothing to do ... just return OK
|
|
||||||
|
|
||||||
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
|
|
||||||
main_loop(State2);
|
|
||||||
true ->
|
|
||||||
main_loop(State#state{ step_next_ref=DelegateRef,
|
|
||||||
step_caller=StepFrom,
|
|
||||||
step_merge_ref=MergeRef,
|
|
||||||
work_in_progress = WorkToDoHere
|
|
||||||
})
|
|
||||||
end;
|
|
||||||
|
|
||||||
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
||||||
demonitor(MRef, [flush]),
|
demonitor(MRef, [flush]),
|
||||||
|
@ -570,6 +536,47 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_step(StepFrom, HowMuch, State) ->
|
||||||
|
if State#state.b =/= undefined ->
|
||||||
|
WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done);
|
||||||
|
true ->
|
||||||
|
WorkLeftHere = 0
|
||||||
|
end,
|
||||||
|
WorkToDoHere = min(WorkLeftHere, HowMuch),
|
||||||
|
DelegateWork = max(0,HowMuch - WorkToDoHere),
|
||||||
|
|
||||||
|
?log("step:~p, do:~p, left:~p ~n", [HowMuch, WorkToDoHere, WorkLeftHere]),
|
||||||
|
|
||||||
|
%% delegate the step request to the next level
|
||||||
|
Next = State#state.next,
|
||||||
|
if Next =:= undefined; DelegateWork == 0 ->
|
||||||
|
DelegateRef = undefined;
|
||||||
|
true ->
|
||||||
|
DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork})
|
||||||
|
end,
|
||||||
|
|
||||||
|
if (State#state.merge_pid == undefined)
|
||||||
|
orelse (WorkToDoHere =< 0) ->
|
||||||
|
MergeRef = undefined;
|
||||||
|
true ->
|
||||||
|
MergePID = State#state.merge_pid,
|
||||||
|
MergeRef = monitor(process, MergePID),
|
||||||
|
MergePID ! {step, {self(), MergeRef}, WorkToDoHere}
|
||||||
|
end,
|
||||||
|
|
||||||
|
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
|
||||||
|
%% nothing to do ... just return OK
|
||||||
|
|
||||||
|
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
|
||||||
|
main_loop(State2);
|
||||||
|
true ->
|
||||||
|
main_loop(State#state{ step_next_ref = DelegateRef,
|
||||||
|
step_caller = StepFrom,
|
||||||
|
step_merge_ref = MergeRef,
|
||||||
|
work_in_progress = WorkToDoHere
|
||||||
|
})
|
||||||
|
end.
|
||||||
|
|
||||||
reply_step_ok(State) ->
|
reply_step_ok(State) ->
|
||||||
case State#state.step_caller of
|
case State#state.step_caller of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
|
|
Loading…
Reference in a new issue