From ff36e401b758d94b575a14b12ed81228f2af5ea2 Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Tue, 24 Apr 2012 00:31:28 +0200 Subject: [PATCH] Incremental merge refactor, step #2 Now incremental merge has a new strategy. In stead of doing the same amount of merge work at all levels, we now compute the total merge work load, and do as much as possible on the first level, subtract work done, and delegate to the next level, etc. The effect of this is that we do more IO on fewer files, improving sequential-ness of the workload involved in the incremental merge. --- src/hanoi.erl | 20 ++++++++-------- src/hanoi_level.erl | 54 +++++++++++++++++++++++++++---------------- src/hanoi_nursery.erl | 45 +++++++++++++++++++++--------------- 3 files changed, 72 insertions(+), 47 deletions(-) diff --git a/src/hanoi.erl b/src/hanoi.erl index 8513ed9..d80d966 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -197,13 +197,13 @@ init([Dir, Opts]) -> case file:read_file_info(Dir) of {ok, #file_info{ type=directory }} -> {ok, TopLevel, MaxLevel} = open_levels(Dir,Opts), - {ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel); + {ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel, MaxLevel); {error, E} when E =:= enoent -> ok = file:make_dir(Dir), {ok, TopLevel} = hanoi_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()), MaxLevel = ?TOP_LEVEL, - {ok, Nursery} = hanoi_nursery:new(Dir) + {ok, Nursery} = hanoi_nursery:new(Dir, MaxLevel) end, {ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}. @@ -233,12 +233,12 @@ open_levels(Dir,Options) -> file:delete(filename:join(Dir,"nursery.data")), {TopLevel, MaxMerge} = - lists:foldl( fun(LevelNo, {Prev, Max}) -> + lists:foldl( fun(LevelNo, {Prev, MergeWork0}) -> {ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options,self()), - NextMax = max(Max, hanoi_level:unmerged_count(Level)), + MergeWork = MergeWork0 + hanoi_level:unmerged_count(Level), - {Level, NextMax} + {Level, MergeWork} end, {undefined, 0}, lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)), @@ -262,8 +262,10 @@ parse_level(FileName) -> end. -handle_info({bottom_level, N}, State) when N > State#state.max_level -> - {noreply,State#state{ max_level = N }}; +handle_info({bottom_level, N}, #state{ nursery=Nursery }=State) + when N > State#state.max_level -> + {noreply,State#state{ max_level = N, + nursery= hanoi_nursery:set_max_level(Nursery, N) }}; handle_info(Info,State) -> error_logger:error_msg("Unknown info ~p~n", [Info]), @@ -333,9 +335,9 @@ do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) -> {ok, Nursery2} = hanoi_nursery:add_maybe_flush(Key, Value, Nursery, Top), {ok, State#state{ nursery=Nursery2 }}. -flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir}) -> +flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel}) -> ok = hanoi_nursery:finish(Nursery, Top), - {ok, Nursery2} = hanoi_nursery:new(Dir), + {ok, Nursery2} = hanoi_nursery:new(Dir, MaxLevel), {ok, State#state{ nursery=Nursery2 }}. start_app() -> diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 2a9d732..8acf51a 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -50,7 +50,7 @@ -record(state, { a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [], step_next_ref, step_caller, step_merge_ref, - opts = [], owner + opts = [], owner, work_done=0 }). %%%%% PUBLIC OPERATIONS @@ -209,7 +209,7 @@ initialize2(State) -> check_begin_merge_then_loop(State=#state{a=BT1, b=BT2, merge_pid=undefined}) when BT1/=undefined, BT2 /= undefined -> {ok, MergePID} = begin_merge(State), - main_loop(State#state{merge_pid=MergePID }); + main_loop(State#state{merge_pid=MergePID,work_done=0 }); check_begin_merge_then_loop(State) -> main_loop(State). @@ -247,13 +247,7 @@ main_loop(State = #state{ next=Next }) -> ?REQ(From, unmerged_count) -> - Files = - (if State#state.b == undefined -> 0; true -> 1 end) - + (if State#state.c == undefined -> 0; true -> 1 end), - - Amount = Files * ?BTREE_SIZE( State#state.level ), - - reply(From, Amount), + reply(From, total_unmerged(State)), main_loop(State); %% replies OK when there is no current step in progress @@ -261,7 +255,11 @@ main_loop(State = #state{ next=Next }) -> when State#state.step_merge_ref == undefined, State#state.step_next_ref == undefined -> reply(From, ok), - self() ! ?REQ(undefined, {step, HowMuch}), + if HowMuch > 0 -> + self() ! ?REQ(undefined, {step, HowMuch}); + true -> + ok + end, main_loop(State); %% accept step any time there is not an outstanding step @@ -271,30 +269,37 @@ main_loop(State = #state{ next=Next }) -> State#state.step_next_ref == undefined -> + WorkLeftHere = max(0,total_unmerged(State) - State#state.work_done), + WorkToDoHere = min(WorkLeftHere, HowMuch), + DelegateWork = max(0,HowMuch - WorkToDoHere), + %% delegate the step request to the next level - if Next =:= undefined -> + if Next =:= undefined; DelegateWork == 0 -> DelegateRef = undefined; true -> - DelegateRef = send_request(Next, {step, HowMuch}) + DelegateRef = send_request(Next, {step, DelegateWork}) end, - %% if there is a merge worker, send him a step message - case State#state.merge_pid of - undefined -> - MergeRef = undefined; - MergePID -> + if (State#state.merge_pid == undefined) + orelse (WorkToDoHere =< 0) -> + MergeRef = undefined, + NewWorkDone = State#state.work_done; + true -> + MergePID = State#state.merge_pid, MergeRef = monitor(process, MergePID), - MergePID ! {step, {self(), MergeRef}, HowMuch} + MergePID ! {step, {self(), MergeRef}, WorkToDoHere}, + NewWorkDone = State#state.work_done + WorkToDoHere end, if (Next =:= undefined) andalso (MergeRef =:= undefined) -> %% nothing to do ... just return OK State2 = reply_step_ok(State#state { step_caller = StepFrom }), - main_loop(State2); + main_loop(State2#state { work_done = NewWorkDone }); true -> main_loop(State#state{ step_next_ref=DelegateRef, step_caller=StepFrom, - step_merge_ref=MergeRef + step_merge_ref=MergeRef, + work_done = NewWorkDone }) end; @@ -545,6 +550,15 @@ reply_step_ok(State) -> end, State#state{ step_caller=undefined }. +total_unmerged(State) -> + Files = + (if State#state.b == undefined -> 0; true -> 1 end) + + (if State#state.c == undefined -> 0; true -> 1 end), + + Files * ?BTREE_SIZE( State#state.level ). + + + do_lookup(_Key, []) -> not_found; do_lookup(_Key, [Pid]) when is_pid(Pid) -> diff --git a/src/hanoi_nursery.erl b/src/hanoi_nursery.erl index 87479a1..84282f5 100644 --- a/src/hanoi_nursery.erl +++ b/src/hanoi_nursery.erl @@ -25,38 +25,40 @@ -module(hanoi_nursery). -author('Kresten Krab Thorup '). --export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]). --export([do_level_fold/3]). +-export([new/2, recover/3, add/3, finish/2, lookup/2, add_maybe_flush/4]). +-export([do_level_fold/3, set_max_level/2]). -include("include/hanoi.hrl"). -include("hanoi.hrl"). -include_lib("kernel/include/file.hrl"). --record(nursery, { log_file, dir, cache, total_size=0, count=0, last_sync=now() }). +-record(nursery, { log_file, dir, cache, total_size=0, count=0, + last_sync=now(), max_level }). --spec new(string()) -> {ok, #nursery{}} | {error, term()}. +-spec new(string(), integer()) -> {ok, #nursery{}} | {error, term()}. -define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")). -new(Directory) -> +new(Directory, MaxLevel) -> {ok, File} = file:open( ?LOGFILENAME(Directory), [raw, exclusive, write, delayed_write, append]), - {ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty() }}. + {ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(), + max_level=MaxLevel}}. -recover(Directory, TopLevel) -> +recover(Directory, TopLevel, MaxLevel) -> case file:read_file_info( ?LOGFILENAME(Directory) ) of {ok, _} -> - ok = do_recover(Directory, TopLevel), - new(Directory); + ok = do_recover(Directory, TopLevel, MaxLevel), + new(Directory, MaxLevel); {error, enoent} -> - new(Directory) + new(Directory, MaxLevel) end. -do_recover(Directory, TopLevel) -> +do_recover(Directory, TopLevel, MaxLevel) -> %% repair the log file; storing it in nursery2 LogFileName = ?LOGFILENAME(Directory), - {ok, Nursery} = read_nursery_from_log(Directory), + {ok, Nursery} = read_nursery_from_log(Directory, MaxLevel), ok = finish(Nursery, TopLevel), @@ -65,11 +67,11 @@ do_recover(Directory, TopLevel) -> ok. -read_nursery_from_log(Directory) -> +read_nursery_from_log(Directory, MaxLevel) -> {ok, LogFile} = file:open( ?LOGFILENAME(Directory), [raw, read, read_ahead, binary] ), {ok, Cache} = load_good_chunks(LogFile, gb_trees:empty()), ok = file:close(LogFile), - {ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache) }}. + {ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel }}. %% Just read the log file into a cache (gb_tree). %% If any errors happen here, then we simply ignore them and return @@ -148,7 +150,10 @@ lookup(Key, #nursery{ cache=Cache }) -> % Finish this nursery (encode it to a btree, and delete the nursery file) % @end -spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok. -finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, count=Count }, TopLevel) -> +finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, + total_size=_TotalSize, count=Count, + max_level=MaxLevel + }, TopLevel) -> %% first, close the log file (if it is open) if LogFile /= undefined -> @@ -181,7 +186,8 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, %% issue some work if this is a top-level inject (blocks until previous such %% incremental merge is finished). - hanoi_level:incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)), + hanoi_level:incremental_merge(TopLevel, + (MaxLevel-?TOP_LEVEL+1)*?BTREE_SIZE(?TOP_LEVEL)), ok; _ -> @@ -193,14 +199,14 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, file:delete(LogFileName), ok. -add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir }, Top) -> +add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir, max_level=MaxLevel }, Top) -> case add(Nursery, Key, Value) of {ok, _} = OK -> OK; {full, Nursery2} -> ok = hanoi_nursery:finish(Nursery2, Top), {error, enoent} = file:read_file_info( filename:join(Dir, "nursery.log")), - hanoi_nursery:new(Dir) + hanoi_nursery:new(Dir, MaxLevel) end. do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) -> @@ -217,3 +223,6 @@ do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) -> gb_trees:to_list(Cache)), FoldWorkerPID ! {level_done, Ref}, ok. + +set_max_level(Nursery = #nursery{}, MaxLevel) -> + Nursery#nursery{ max_level = MaxLevel }.