diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 5e5ab9c..f773e28 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -56,6 +56,11 @@ }). +%% if no activity for this long, we'll do some incremental +%% merge, just to clean up and optimize future operations. +-define(MERGE_TIMEOUT,1000). + + -ifdef(DEBUG). -define(log(Fmt,Args), debug_log(State,Fmt,Args)). debug_log(State,Fmt,Args) -> @@ -266,11 +271,10 @@ main_loop(State = #state{ next=Next }) -> State#state.step_next_ref == undefined -> plain_rpc:send_reply(From, ok), if HowMuch > 0 -> - self() ! ?CALL(undefined, {step, HowMuch}); + do_step(undefined, HowMuch, State); true -> - ok - end, - main_loop(State); + main_loop(State) + end; %% accept step any time there is not an outstanding step ?CALL(StepFrom, {step, HowMuch}) @@ -278,45 +282,7 @@ main_loop(State = #state{ next=Next }) -> State#state.step_caller == undefined, State#state.step_next_ref == undefined -> - - if State#state.b =/= undefined -> - WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done); - true -> - WorkLeftHere = 0 - end, - WorkToDoHere = min(WorkLeftHere, HowMuch), - DelegateWork = max(0,HowMuch - WorkToDoHere), - - ?log("step:~p, do:~p, left:~p ~n", [HowMuch, WorkToDoHere, WorkLeftHere]), - - %% delegate the step request to the next level - if Next =:= undefined; DelegateWork == 0 -> - DelegateRef = undefined; - true -> - DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork}) - end, - - if (State#state.merge_pid == undefined) - orelse (WorkToDoHere =< 0) -> - MergeRef = undefined; - true -> - MergePID = State#state.merge_pid, - MergeRef = monitor(process, MergePID), - MergePID ! {step, {self(), MergeRef}, WorkToDoHere} - end, - - if (Next =:= undefined) andalso (MergeRef =:= undefined) -> - %% nothing to do ... just return OK - - State2 = reply_step_ok(State#state { step_caller = StepFrom }), - main_loop(State2); - true -> - main_loop(State#state{ step_next_ref=DelegateRef, - step_caller=StepFrom, - step_merge_ref=MergeRef, - work_in_progress = WorkToDoHere - }) - end; + do_step(StepFrom, HowMuch, State); {MRef, step_done} when MRef == State#state.step_merge_ref -> demonitor(MRef, [flush]), @@ -570,6 +536,47 @@ main_loop(State = #state{ next=Next }) -> end. +do_step(StepFrom, HowMuch, State) -> + if State#state.b =/= undefined -> + WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done); + true -> + WorkLeftHere = 0 + end, + WorkToDoHere = min(WorkLeftHere, HowMuch), + DelegateWork = max(0,HowMuch - WorkToDoHere), + + ?log("step:~p, do:~p, left:~p ~n", [HowMuch, WorkToDoHere, WorkLeftHere]), + + %% delegate the step request to the next level + Next = State#state.next, + if Next =:= undefined; DelegateWork == 0 -> + DelegateRef = undefined; + true -> + DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork}) + end, + + if (State#state.merge_pid == undefined) + orelse (WorkToDoHere =< 0) -> + MergeRef = undefined; + true -> + MergePID = State#state.merge_pid, + MergeRef = monitor(process, MergePID), + MergePID ! {step, {self(), MergeRef}, WorkToDoHere} + end, + + if (Next =:= undefined) andalso (MergeRef =:= undefined) -> + %% nothing to do ... just return OK + + State2 = reply_step_ok(State#state { step_caller = StepFrom }), + main_loop(State2); + true -> + main_loop(State#state{ step_next_ref = DelegateRef, + step_caller = StepFrom, + step_merge_ref = MergeRef, + work_in_progress = WorkToDoHere + }) + end. + reply_step_ok(State) -> case State#state.step_caller of undefined -> ok;