Refactor merge work computation

The merge work computation is now close to ideal.
It does not yet take into account the actual size
of the files at each level, because we have not
figured out how to make use of that knowledge.
This commit is contained in:
Kresten Krab Thorup 2012-04-30 23:34:27 +02:00
parent 380a4f9cfc
commit c8e403af8c
5 changed files with 51 additions and 41 deletions

View file

@ -244,7 +244,7 @@ open_levels(Dir,Options) ->
%% remove old nursery file
file:delete(filename:join(Dir,"nursery.data")),
{TopLevel, MaxMerge} =
{TopLevel, _MaxMerge} =
lists:foldl( fun(LevelNo, {NextLevel, MergeWork0}) ->
{ok, Level} = hanoi_level:open(Dir,LevelNo,NextLevel,Options,self()),
@ -258,10 +258,10 @@ open_levels(Dir,Options) ->
%% we need to do this much merge work before we can guarantee
%% response times ... this is the amount of "in flight" merging
%% we lost when the hanoi store was closed.
ok = hanoi_level:incremental_merge(TopLevel, MaxMerge),
ok = hanoi_level:begin_incremental_merge(TopLevel),
%% second incremental merge blocks until the previous is done
ok = hanoi_level:incremental_merge(TopLevel, 0),
ok = hanoi_level:await_incremental_merge(TopLevel),
{ok, TopLevel, MaxLevel}.
@ -274,16 +274,12 @@ parse_level(FileName) ->
end.
handle_info({bottom_level, N}, #state{ nursery=Nursery }=State)
handle_info({bottom_level, N}, #state{ nursery=Nursery, top=TopLevel }=State)
when N > State#state.max_level ->
State2 = State#state{ max_level = N,
nursery= hanoi_nursery:set_max_level(Nursery, N) },
%% when this happens, there is a race condition because inserts can already
%% be in process of being executed in the levels. The remedy is to initiate
%% some extra incremental merge.
ok = hanoi_level:incremental_merge( State#state.top, ?BTREE_SIZE(?TOP_LEVEL)),
hanoi_level:set_max_level(TopLevel, N),
{noreply, State2};

View file

@ -45,14 +45,15 @@
-export([data_vsn/0, code_change/3]).
-export([open/5, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3,
incremental_merge/2, unmerged_count/1]).
begin_incremental_merge/1, await_incremental_merge/1, set_max_level/2,
unmerged_count/1]).
-include_lib("kernel/include/file.hrl").
-record(state, {
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
step_next_ref, step_caller, step_merge_ref,
opts = [], owner, work_in_progress=0, work_done=0
opts = [], owner, work_in_progress=0, work_done=0, max_level=?TOP_LEVEL
}).
@ -94,12 +95,18 @@ inject(Ref, FileName) ->
Result = plain_rpc:call(Ref, {inject, FileName}),
Result.
incremental_merge(Ref,HowMuch) ->
plain_rpc:call(Ref, {incremental_merge, HowMuch}).
begin_incremental_merge(Ref) ->
plain_rpc:call(Ref, begin_incremental_merge).
await_incremental_merge(Ref) ->
plain_rpc:call(Ref, await_incremental_merge).
unmerged_count(Ref) ->
plain_rpc:call(Ref, unmerged_count).
set_max_level(Ref, LevelNo) ->
plain_rpc:send_cast(Ref, {set_max_level, LevelNo}).
close(Ref) ->
try
plain_rpc:call(Ref, close)
@ -275,24 +282,35 @@ main_loop(State = #state{ next=Next }) ->
plain_rpc:send_reply(From, total_unmerged(State)),
main_loop(State);
%% propagate knowledge of new max level
?CAST(_From, {set_max_level, Max}) ->
if Next =/= undefined ->
set_max_level(Next, Max);
true ->
ok
end,
main_loop(State#state{ max_level=Max });
%% replies OK when there is no current step in progress
?CALL(From, {incremental_merge, HowMuch})
?CALL(From, begin_incremental_merge)
when State#state.step_merge_ref == undefined,
State#state.step_next_ref == undefined ->
plain_rpc:send_reply(From, ok),
if HowMuch > 0 ->
do_step(undefined, HowMuch, State);
true ->
main_loop(State)
end;
do_step(undefined, 0, State);
?CALL(From, await_incremental_merge)
when State#state.step_merge_ref == undefined,
State#state.step_next_ref == undefined ->
plain_rpc:send_reply(From, ok),
main_loop(State);
%% accept step any time there is not an outstanding step
?CALL(StepFrom, {step, HowMuch})
?CALL(StepFrom, {step_level, DoneWork})
when State#state.step_merge_ref == undefined,
State#state.step_caller == undefined,
State#state.step_next_ref == undefined
->
do_step(StepFrom, HowMuch, State);
do_step(StepFrom, DoneWork, State);
{MRef, step_done} when MRef == State#state.step_merge_ref ->
demonitor(MRef, [flush]),
@ -558,23 +576,26 @@ main_loop(State = #state{ next=Next }) ->
end.
do_step(StepFrom, HowMuch, State) ->
do_step(StepFrom, PreviousWork, State) ->
if (State#state.b =/= undefined) andalso (State#state.merge_pid =/= undefined) ->
WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done);
true ->
WorkLeftHere = 0
end,
WorkToDoHere = min(WorkLeftHere, HowMuch),
DelegateWork = max(0,HowMuch - WorkToDoHere),
MaxLevel = max(State#state.max_level, State#state.level),
TotalWork = (MaxLevel-?TOP_LEVEL+1) * ?BTREE_SIZE(?TOP_LEVEL),
WorkUnitsLeft = max(0, TotalWork-PreviousWork),
WorkToDoHere = min(WorkLeftHere, WorkUnitsLeft),
WorkIncludingHere = PreviousWork + WorkToDoHere,
?log("step:~p, do:~p, left:~p ~n", [HowMuch, WorkToDoHere, WorkLeftHere]),
?log("do_step prev:~p, do:~p of ~p ~n", [PreviousWork, WorkToDoHere, WorkLeftHere]),
%% delegate the step request to the next level
%% delegate the step_level request to the next level
Next = State#state.next,
if Next =:= undefined; DelegateWork == 0 ->
if Next =:= undefined ->
DelegateRef = undefined;
true ->
DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork})
DelegateRef = plain_rpc:send_call(Next, {step_level, WorkIncludingHere})
end,
if WorkToDoHere > 0 ->
@ -588,12 +609,6 @@ do_step(StepFrom, HowMuch, State) ->
if (DelegateRef =:= undefined) andalso (MergeRef =:= undefined) ->
%% nothing to do ... just return OK
if (DelegateWork > 0) ->
?log("undone work: ~p", [DelegateWork]);
true ->
ok
end,
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
main_loop(State2);
true ->

View file

@ -91,7 +91,7 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AK
receive
{step, From, HowMany} ->
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From})
after 10000 ->
after 5000 ->
case ?LOCAL_WRITER of
true ->
Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N},
@ -176,7 +176,7 @@ scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= []
receive
{step, From, HowMany} ->
scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From})
after 10000 ->
after 5000 ->
Args = {BT, hanoi_writer:serialize(Out), IsLastLevel, KVs, Count, N},
Keep = zlib:gzip ( erlang:term_to_binary( Args ) ),
hibernate_scan_only(Keep)

View file

@ -164,8 +164,7 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
%% issue some work if this is a top-level inject (blocks until previous such
%% incremental merge is finished).
hanoi_level:incremental_merge(TopLevel,
(MaxLevel-?TOP_LEVEL+1)*?BTREE_SIZE(?TOP_LEVEL)),
hanoi_level:begin_incremental_merge(TopLevel),
ok;

6
visualize-hanoi.sh Normal file → Executable file
View file

@ -96,13 +96,13 @@ function dynamic() {
let "t=t+1"
now=`date +%s`
let "now=now-start"
free=`df -m . | tail -1 | awk '{print $4}'`
used=`du -m | awk '{print $1}' `
free=`df -m . 2> /dev/null | tail -1 | awk '{print $4}'`
used=`du -m 2> /dev/null | awk '{print $1}' `
printf "%5d %6d [%s\n" "$t" "$now" "$s ${used}Mb (${free}Mb free)"
old="$s"
else
# Sleep a little bit:
perl -e 'use Time::HiRes; Time::HiRes::usleep(100000)'
sleep 1
fi
done
}