diff --git a/src/hanoi.erl b/src/hanoi.erl index ee89855..afb0775 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -244,7 +244,7 @@ open_levels(Dir,Options) -> %% remove old nursery file file:delete(filename:join(Dir,"nursery.data")), - {TopLevel, MaxMerge} = + {TopLevel, _MaxMerge} = lists:foldl( fun(LevelNo, {NextLevel, MergeWork0}) -> {ok, Level} = hanoi_level:open(Dir,LevelNo,NextLevel,Options,self()), @@ -258,10 +258,10 @@ open_levels(Dir,Options) -> %% we need to do this much merge work before we can guarantee %% response times ... this is the amount of "in flight" merging %% we lost when the hanoi store was closed. - ok = hanoi_level:incremental_merge(TopLevel, MaxMerge), + ok = hanoi_level:begin_incremental_merge(TopLevel), %% second incremental merge blocks until the previous is done - ok = hanoi_level:incremental_merge(TopLevel, 0), + ok = hanoi_level:await_incremental_merge(TopLevel), {ok, TopLevel, MaxLevel}. @@ -274,16 +274,12 @@ parse_level(FileName) -> end. -handle_info({bottom_level, N}, #state{ nursery=Nursery }=State) +handle_info({bottom_level, N}, #state{ nursery=Nursery, top=TopLevel }=State) when N > State#state.max_level -> State2 = State#state{ max_level = N, nursery= hanoi_nursery:set_max_level(Nursery, N) }, - %% when this happens, there is a race condition because inserts can already - %% be in process of being executed in the levels. The remedy is to initiate - %% some extra incremental merge. - - ok = hanoi_level:incremental_merge( State#state.top, ?BTREE_SIZE(?TOP_LEVEL)), + hanoi_level:set_max_level(TopLevel, N), {noreply, State2}; diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 8f2a8c6..d64afaa 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -45,14 +45,15 @@ -export([data_vsn/0, code_change/3]). -export([open/5, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, - incremental_merge/2, unmerged_count/1]). + begin_incremental_merge/1, await_incremental_merge/1, set_max_level/2, + unmerged_count/1]). -include_lib("kernel/include/file.hrl"). -record(state, { a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [], step_next_ref, step_caller, step_merge_ref, - opts = [], owner, work_in_progress=0, work_done=0 + opts = [], owner, work_in_progress=0, work_done=0, max_level=?TOP_LEVEL }). @@ -94,12 +95,18 @@ inject(Ref, FileName) -> Result = plain_rpc:call(Ref, {inject, FileName}), Result. -incremental_merge(Ref,HowMuch) -> - plain_rpc:call(Ref, {incremental_merge, HowMuch}). +begin_incremental_merge(Ref) -> + plain_rpc:call(Ref, begin_incremental_merge). + +await_incremental_merge(Ref) -> + plain_rpc:call(Ref, await_incremental_merge). unmerged_count(Ref) -> plain_rpc:call(Ref, unmerged_count). +set_max_level(Ref, LevelNo) -> + plain_rpc:send_cast(Ref, {set_max_level, LevelNo}). + close(Ref) -> try plain_rpc:call(Ref, close) @@ -275,24 +282,35 @@ main_loop(State = #state{ next=Next }) -> plain_rpc:send_reply(From, total_unmerged(State)), main_loop(State); + %% propagate knowledge of new max level + ?CAST(_From, {set_max_level, Max}) -> + if Next =/= undefined -> + set_max_level(Next, Max); + true -> + ok + end, + main_loop(State#state{ max_level=Max }); + %% replies OK when there is no current step in progress - ?CALL(From, {incremental_merge, HowMuch}) + ?CALL(From, begin_incremental_merge) when State#state.step_merge_ref == undefined, State#state.step_next_ref == undefined -> plain_rpc:send_reply(From, ok), - if HowMuch > 0 -> - do_step(undefined, HowMuch, State); - true -> - main_loop(State) - end; + do_step(undefined, 0, State); + + ?CALL(From, await_incremental_merge) + when State#state.step_merge_ref == undefined, + State#state.step_next_ref == undefined -> + plain_rpc:send_reply(From, ok), + main_loop(State); %% accept step any time there is not an outstanding step - ?CALL(StepFrom, {step, HowMuch}) + ?CALL(StepFrom, {step_level, DoneWork}) when State#state.step_merge_ref == undefined, State#state.step_caller == undefined, State#state.step_next_ref == undefined -> - do_step(StepFrom, HowMuch, State); + do_step(StepFrom, DoneWork, State); {MRef, step_done} when MRef == State#state.step_merge_ref -> demonitor(MRef, [flush]), @@ -558,23 +576,26 @@ main_loop(State = #state{ next=Next }) -> end. -do_step(StepFrom, HowMuch, State) -> +do_step(StepFrom, PreviousWork, State) -> if (State#state.b =/= undefined) andalso (State#state.merge_pid =/= undefined) -> WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done); true -> WorkLeftHere = 0 end, - WorkToDoHere = min(WorkLeftHere, HowMuch), - DelegateWork = max(0,HowMuch - WorkToDoHere), + MaxLevel = max(State#state.max_level, State#state.level), + TotalWork = (MaxLevel-?TOP_LEVEL+1) * ?BTREE_SIZE(?TOP_LEVEL), + WorkUnitsLeft = max(0, TotalWork-PreviousWork), + WorkToDoHere = min(WorkLeftHere, WorkUnitsLeft), + WorkIncludingHere = PreviousWork + WorkToDoHere, - ?log("step:~p, do:~p, left:~p ~n", [HowMuch, WorkToDoHere, WorkLeftHere]), + ?log("do_step prev:~p, do:~p of ~p ~n", [PreviousWork, WorkToDoHere, WorkLeftHere]), - %% delegate the step request to the next level + %% delegate the step_level request to the next level Next = State#state.next, - if Next =:= undefined; DelegateWork == 0 -> + if Next =:= undefined -> DelegateRef = undefined; true -> - DelegateRef = plain_rpc:send_call(Next, {step, DelegateWork}) + DelegateRef = plain_rpc:send_call(Next, {step_level, WorkIncludingHere}) end, if WorkToDoHere > 0 -> @@ -588,12 +609,6 @@ do_step(StepFrom, HowMuch, State) -> if (DelegateRef =:= undefined) andalso (MergeRef =:= undefined) -> %% nothing to do ... just return OK - if (DelegateWork > 0) -> - ?log("undone work: ~p", [DelegateWork]); - true -> - ok - end, - State2 = reply_step_ok(State#state { step_caller = StepFrom }), main_loop(State2); true -> diff --git a/src/hanoi_merger.erl b/src/hanoi_merger.erl index 6f4481d..0a2487d 100644 --- a/src/hanoi_merger.erl +++ b/src/hanoi_merger.erl @@ -91,7 +91,7 @@ scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N, FromPID}) when N < 1, AK receive {step, From, HowMany} -> scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {N+HowMany, From}) - after 10000 -> + after 5000 -> case ?LOCAL_WRITER of true -> Args = {BT1, BT2, hanoi_writer:serialize(Out), IsLastLevel, AKVs, BKVs, Count, N}, @@ -176,7 +176,7 @@ scan_only(BT, Out, IsLastLevel, KVs, Count, {N, FromPID}) when N < 1, KVs =/= [] receive {step, From, HowMany} -> scan_only(BT, Out, IsLastLevel, KVs, Count, {N+HowMany, From}) - after 10000 -> + after 5000 -> Args = {BT, hanoi_writer:serialize(Out), IsLastLevel, KVs, Count, N}, Keep = zlib:gzip ( erlang:term_to_binary( Args ) ), hibernate_scan_only(Keep) diff --git a/src/hanoi_nursery.erl b/src/hanoi_nursery.erl index 60edf7c..0de32a1 100644 --- a/src/hanoi_nursery.erl +++ b/src/hanoi_nursery.erl @@ -164,8 +164,7 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, %% issue some work if this is a top-level inject (blocks until previous such %% incremental merge is finished). - hanoi_level:incremental_merge(TopLevel, - (MaxLevel-?TOP_LEVEL+1)*?BTREE_SIZE(?TOP_LEVEL)), + hanoi_level:begin_incremental_merge(TopLevel), ok; diff --git a/visualize-hanoi.sh b/visualize-hanoi.sh old mode 100644 new mode 100755 index b8e8f4c..d716b90 --- a/visualize-hanoi.sh +++ b/visualize-hanoi.sh @@ -96,13 +96,13 @@ function dynamic() { let "t=t+1" now=`date +%s` let "now=now-start" - free=`df -m . | tail -1 | awk '{print $4}'` - used=`du -m | awk '{print $1}' ` + free=`df -m . 2> /dev/null | tail -1 | awk '{print $4}'` + used=`du -m 2> /dev/null | awk '{print $1}' ` printf "%5d %6d [%s\n" "$t" "$now" "$s ${used}Mb (${free}Mb free)" old="$s" else # Sleep a little bit: - perl -e 'use Time::HiRes; Time::HiRes::usleep(100000)' + sleep 1 fi done }