diff --git a/src/hanoi.erl b/src/hanoi.erl index 8513ed9..d80d966 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -197,13 +197,13 @@ init([Dir, Opts]) -> case file:read_file_info(Dir) of {ok, #file_info{ type=directory }} -> {ok, TopLevel, MaxLevel} = open_levels(Dir,Opts), - {ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel); + {ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel, MaxLevel); {error, E} when E =:= enoent -> ok = file:make_dir(Dir), {ok, TopLevel} = hanoi_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()), MaxLevel = ?TOP_LEVEL, - {ok, Nursery} = hanoi_nursery:new(Dir) + {ok, Nursery} = hanoi_nursery:new(Dir, MaxLevel) end, {ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}. @@ -233,12 +233,12 @@ open_levels(Dir,Options) -> file:delete(filename:join(Dir,"nursery.data")), {TopLevel, MaxMerge} = - lists:foldl( fun(LevelNo, {Prev, Max}) -> + lists:foldl( fun(LevelNo, {Prev, MergeWork0}) -> {ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options,self()), - NextMax = max(Max, hanoi_level:unmerged_count(Level)), + MergeWork = MergeWork0 + hanoi_level:unmerged_count(Level), - {Level, NextMax} + {Level, MergeWork} end, {undefined, 0}, lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)), @@ -262,8 +262,10 @@ parse_level(FileName) -> end. -handle_info({bottom_level, N}, State) when N > State#state.max_level -> - {noreply,State#state{ max_level = N }}; +handle_info({bottom_level, N}, #state{ nursery=Nursery }=State) + when N > State#state.max_level -> + {noreply,State#state{ max_level = N, + nursery= hanoi_nursery:set_max_level(Nursery, N) }}; handle_info(Info,State) -> error_logger:error_msg("Unknown info ~p~n", [Info]), @@ -333,9 +335,9 @@ do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) -> {ok, Nursery2} = hanoi_nursery:add_maybe_flush(Key, Value, Nursery, Top), {ok, State#state{ nursery=Nursery2 }}. -flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir}) -> +flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel}) -> ok = hanoi_nursery:finish(Nursery, Top), - {ok, Nursery2} = hanoi_nursery:new(Dir), + {ok, Nursery2} = hanoi_nursery:new(Dir, MaxLevel), {ok, State#state{ nursery=Nursery2 }}. start_app() -> diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 2a9d732..8acf51a 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -50,7 +50,7 @@ -record(state, { a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [], step_next_ref, step_caller, step_merge_ref, - opts = [], owner + opts = [], owner, work_done=0 }). %%%%% PUBLIC OPERATIONS @@ -209,7 +209,7 @@ initialize2(State) -> check_begin_merge_then_loop(State=#state{a=BT1, b=BT2, merge_pid=undefined}) when BT1/=undefined, BT2 /= undefined -> {ok, MergePID} = begin_merge(State), - main_loop(State#state{merge_pid=MergePID }); + main_loop(State#state{merge_pid=MergePID,work_done=0 }); check_begin_merge_then_loop(State) -> main_loop(State). @@ -247,13 +247,7 @@ main_loop(State = #state{ next=Next }) -> ?REQ(From, unmerged_count) -> - Files = - (if State#state.b == undefined -> 0; true -> 1 end) - + (if State#state.c == undefined -> 0; true -> 1 end), - - Amount = Files * ?BTREE_SIZE( State#state.level ), - - reply(From, Amount), + reply(From, total_unmerged(State)), main_loop(State); %% replies OK when there is no current step in progress @@ -261,7 +255,11 @@ main_loop(State = #state{ next=Next }) -> when State#state.step_merge_ref == undefined, State#state.step_next_ref == undefined -> reply(From, ok), - self() ! ?REQ(undefined, {step, HowMuch}), + if HowMuch > 0 -> + self() ! ?REQ(undefined, {step, HowMuch}); + true -> + ok + end, main_loop(State); %% accept step any time there is not an outstanding step @@ -271,30 +269,37 @@ main_loop(State = #state{ next=Next }) -> State#state.step_next_ref == undefined -> + WorkLeftHere = max(0,total_unmerged(State) - State#state.work_done), + WorkToDoHere = min(WorkLeftHere, HowMuch), + DelegateWork = max(0,HowMuch - WorkToDoHere), + %% delegate the step request to the next level - if Next =:= undefined -> + if Next =:= undefined; DelegateWork == 0 -> DelegateRef = undefined; true -> - DelegateRef = send_request(Next, {step, HowMuch}) + DelegateRef = send_request(Next, {step, DelegateWork}) end, - %% if there is a merge worker, send him a step message - case State#state.merge_pid of - undefined -> - MergeRef = undefined; - MergePID -> + if (State#state.merge_pid == undefined) + orelse (WorkToDoHere =< 0) -> + MergeRef = undefined, + NewWorkDone = State#state.work_done; + true -> + MergePID = State#state.merge_pid, MergeRef = monitor(process, MergePID), - MergePID ! {step, {self(), MergeRef}, HowMuch} + MergePID ! {step, {self(), MergeRef}, WorkToDoHere}, + NewWorkDone = State#state.work_done + WorkToDoHere end, if (Next =:= undefined) andalso (MergeRef =:= undefined) -> %% nothing to do ... just return OK State2 = reply_step_ok(State#state { step_caller = StepFrom }), - main_loop(State2); + main_loop(State2#state { work_done = NewWorkDone }); true -> main_loop(State#state{ step_next_ref=DelegateRef, step_caller=StepFrom, - step_merge_ref=MergeRef + step_merge_ref=MergeRef, + work_done = NewWorkDone }) end; @@ -545,6 +550,15 @@ reply_step_ok(State) -> end, State#state{ step_caller=undefined }. +total_unmerged(State) -> + Files = + (if State#state.b == undefined -> 0; true -> 1 end) + + (if State#state.c == undefined -> 0; true -> 1 end), + + Files * ?BTREE_SIZE( State#state.level ). + + + do_lookup(_Key, []) -> not_found; do_lookup(_Key, [Pid]) when is_pid(Pid) -> diff --git a/src/hanoi_nursery.erl b/src/hanoi_nursery.erl index 87479a1..84282f5 100644 --- a/src/hanoi_nursery.erl +++ b/src/hanoi_nursery.erl @@ -25,38 +25,40 @@ -module(hanoi_nursery). -author('Kresten Krab Thorup '). --export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]). --export([do_level_fold/3]). +-export([new/2, recover/3, add/3, finish/2, lookup/2, add_maybe_flush/4]). +-export([do_level_fold/3, set_max_level/2]). -include("include/hanoi.hrl"). -include("hanoi.hrl"). -include_lib("kernel/include/file.hrl"). --record(nursery, { log_file, dir, cache, total_size=0, count=0, last_sync=now() }). +-record(nursery, { log_file, dir, cache, total_size=0, count=0, + last_sync=now(), max_level }). --spec new(string()) -> {ok, #nursery{}} | {error, term()}. +-spec new(string(), integer()) -> {ok, #nursery{}} | {error, term()}. -define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")). -new(Directory) -> +new(Directory, MaxLevel) -> {ok, File} = file:open( ?LOGFILENAME(Directory), [raw, exclusive, write, delayed_write, append]), - {ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty() }}. + {ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(), + max_level=MaxLevel}}. -recover(Directory, TopLevel) -> +recover(Directory, TopLevel, MaxLevel) -> case file:read_file_info( ?LOGFILENAME(Directory) ) of {ok, _} -> - ok = do_recover(Directory, TopLevel), - new(Directory); + ok = do_recover(Directory, TopLevel, MaxLevel), + new(Directory, MaxLevel); {error, enoent} -> - new(Directory) + new(Directory, MaxLevel) end. -do_recover(Directory, TopLevel) -> +do_recover(Directory, TopLevel, MaxLevel) -> %% repair the log file; storing it in nursery2 LogFileName = ?LOGFILENAME(Directory), - {ok, Nursery} = read_nursery_from_log(Directory), + {ok, Nursery} = read_nursery_from_log(Directory, MaxLevel), ok = finish(Nursery, TopLevel), @@ -65,11 +67,11 @@ do_recover(Directory, TopLevel) -> ok. -read_nursery_from_log(Directory) -> +read_nursery_from_log(Directory, MaxLevel) -> {ok, LogFile} = file:open( ?LOGFILENAME(Directory), [raw, read, read_ahead, binary] ), {ok, Cache} = load_good_chunks(LogFile, gb_trees:empty()), ok = file:close(LogFile), - {ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache) }}. + {ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel }}. %% Just read the log file into a cache (gb_tree). %% If any errors happen here, then we simply ignore them and return @@ -148,7 +150,10 @@ lookup(Key, #nursery{ cache=Cache }) -> % Finish this nursery (encode it to a btree, and delete the nursery file) % @end -spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok. -finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, count=Count }, TopLevel) -> +finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, + total_size=_TotalSize, count=Count, + max_level=MaxLevel + }, TopLevel) -> %% first, close the log file (if it is open) if LogFile /= undefined -> @@ -181,7 +186,8 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, %% issue some work if this is a top-level inject (blocks until previous such %% incremental merge is finished). - hanoi_level:incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)), + hanoi_level:incremental_merge(TopLevel, + (MaxLevel-?TOP_LEVEL+1)*?BTREE_SIZE(?TOP_LEVEL)), ok; _ -> @@ -193,14 +199,14 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, file:delete(LogFileName), ok. -add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir }, Top) -> +add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir, max_level=MaxLevel }, Top) -> case add(Nursery, Key, Value) of {ok, _} = OK -> OK; {full, Nursery2} -> ok = hanoi_nursery:finish(Nursery2, Top), {error, enoent} = file:read_file_info( filename:join(Dir, "nursery.log")), - hanoi_nursery:new(Dir) + hanoi_nursery:new(Dir, MaxLevel) end. do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) -> @@ -217,3 +223,6 @@ do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) -> gb_trees:to_list(Cache)), FoldWorkerPID ! {level_done, Ref}, ok. + +set_max_level(Nursery = #nursery{}, MaxLevel) -> + Nursery#nursery{ max_level = MaxLevel }.