Incremental merge refactor, step #2

Now incremental merge has a new strategy.
Instead of doing the same amount of merge
work at all levels, we now compute the total
merge work load, and do as much as possible
on the first level, subtract work done, and
delegate to the next level, etc.

The effect of this is that we do more IO on
fewer files, improving the sequential nature of
the workload involved in the incremental merge.
This commit is contained in:
Kresten Krab Thorup 2012-04-24 00:31:28 +02:00
parent 9fb8a5e73f
commit ff36e401b7
3 changed files with 72 additions and 47 deletions

View file

@@ -197,13 +197,13 @@ init([Dir, Opts]) ->
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TopLevel, MaxLevel} = open_levels(Dir,Opts),
{ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel);
{ok, Nursery} = hanoi_nursery:recover(Dir, TopLevel, MaxLevel);
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
{ok, TopLevel} = hanoi_level:open(Dir, ?TOP_LEVEL, undefined, Opts, self()),
MaxLevel = ?TOP_LEVEL,
{ok, Nursery} = hanoi_nursery:new(Dir)
{ok, Nursery} = hanoi_nursery:new(Dir, MaxLevel)
end,
{ok, #state{ top=TopLevel, dir=Dir, nursery=Nursery, opt=Opts, max_level=MaxLevel }}.
@@ -233,12 +233,12 @@ open_levels(Dir,Options) ->
file:delete(filename:join(Dir,"nursery.data")),
{TopLevel, MaxMerge} =
lists:foldl( fun(LevelNo, {Prev, Max}) ->
lists:foldl( fun(LevelNo, {Prev, MergeWork0}) ->
{ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options,self()),
NextMax = max(Max, hanoi_level:unmerged_count(Level)),
MergeWork = MergeWork0 + hanoi_level:unmerged_count(Level),
{Level, NextMax}
{Level, MergeWork}
end,
{undefined, 0},
lists:seq(MaxLevel, min(?TOP_LEVEL, MinLevel), -1)),
@@ -262,8 +262,10 @@ parse_level(FileName) ->
end.
handle_info({bottom_level, N}, State) when N > State#state.max_level ->
{noreply,State#state{ max_level = N }};
handle_info({bottom_level, N}, #state{ nursery=Nursery }=State)
when N > State#state.max_level ->
{noreply,State#state{ max_level = N,
nursery= hanoi_nursery:set_max_level(Nursery, N) }};
handle_info(Info,State) ->
error_logger:error_msg("Unknown info ~p~n", [Info]),
@@ -333,9 +335,9 @@ do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = hanoi_nursery:add_maybe_flush(Key, Value, Nursery, Top),
{ok, State#state{ nursery=Nursery2 }}.
flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir}) ->
flush_nursery(State=#state{nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel}) ->
ok = hanoi_nursery:finish(Nursery, Top),
{ok, Nursery2} = hanoi_nursery:new(Dir),
{ok, Nursery2} = hanoi_nursery:new(Dir, MaxLevel),
{ok, State#state{ nursery=Nursery2 }}.
start_app() ->

View file

@@ -50,7 +50,7 @@
-record(state, {
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
step_next_ref, step_caller, step_merge_ref,
opts = [], owner
opts = [], owner, work_done=0
}).
%%%%% PUBLIC OPERATIONS
@@ -209,7 +209,7 @@ initialize2(State) ->
check_begin_merge_then_loop(State=#state{a=BT1, b=BT2, merge_pid=undefined})
when BT1/=undefined, BT2 /= undefined ->
{ok, MergePID} = begin_merge(State),
main_loop(State#state{merge_pid=MergePID });
main_loop(State#state{merge_pid=MergePID,work_done=0 });
check_begin_merge_then_loop(State) ->
main_loop(State).
@@ -247,13 +247,7 @@ main_loop(State = #state{ next=Next }) ->
?REQ(From, unmerged_count) ->
Files =
(if State#state.b == undefined -> 0; true -> 1 end)
+ (if State#state.c == undefined -> 0; true -> 1 end),
Amount = Files * ?BTREE_SIZE( State#state.level ),
reply(From, Amount),
reply(From, total_unmerged(State)),
main_loop(State);
%% replies OK when there is no current step in progress
@@ -261,7 +255,11 @@ main_loop(State = #state{ next=Next }) ->
when State#state.step_merge_ref == undefined,
State#state.step_next_ref == undefined ->
reply(From, ok),
self() ! ?REQ(undefined, {step, HowMuch}),
if HowMuch > 0 ->
self() ! ?REQ(undefined, {step, HowMuch});
true ->
ok
end,
main_loop(State);
%% accept step any time there is not an outstanding step
@@ -271,30 +269,37 @@ main_loop(State = #state{ next=Next }) ->
State#state.step_next_ref == undefined
->
WorkLeftHere = max(0,total_unmerged(State) - State#state.work_done),
WorkToDoHere = min(WorkLeftHere, HowMuch),
DelegateWork = max(0,HowMuch - WorkToDoHere),
%% delegate the step request to the next level
if Next =:= undefined ->
if Next =:= undefined; DelegateWork == 0 ->
DelegateRef = undefined;
true ->
DelegateRef = send_request(Next, {step, HowMuch})
DelegateRef = send_request(Next, {step, DelegateWork})
end,
%% if there is a merge worker, send him a step message
case State#state.merge_pid of
undefined ->
MergeRef = undefined;
MergePID ->
if (State#state.merge_pid == undefined)
orelse (WorkToDoHere =< 0) ->
MergeRef = undefined,
NewWorkDone = State#state.work_done;
true ->
MergePID = State#state.merge_pid,
MergeRef = monitor(process, MergePID),
MergePID ! {step, {self(), MergeRef}, HowMuch}
MergePID ! {step, {self(), MergeRef}, WorkToDoHere},
NewWorkDone = State#state.work_done + WorkToDoHere
end,
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
%% nothing to do ... just return OK
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
main_loop(State2);
main_loop(State2#state { work_done = NewWorkDone });
true ->
main_loop(State#state{ step_next_ref=DelegateRef,
step_caller=StepFrom,
step_merge_ref=MergeRef
step_merge_ref=MergeRef,
work_done = NewWorkDone
})
end;
@@ -545,6 +550,15 @@ reply_step_ok(State) ->
end,
State#state{ step_caller=undefined }.
total_unmerged(State) ->
Files =
(if State#state.b == undefined -> 0; true -> 1 end)
+ (if State#state.c == undefined -> 0; true -> 1 end),
Files * ?BTREE_SIZE( State#state.level ).
do_lookup(_Key, []) ->
not_found;
do_lookup(_Key, [Pid]) when is_pid(Pid) ->

View file

@@ -25,38 +25,40 @@
-module(hanoi_nursery).
-author('Kresten Krab Thorup <krab@trifork.com>').
-export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]).
-export([do_level_fold/3]).
-export([new/2, recover/3, add/3, finish/2, lookup/2, add_maybe_flush/4]).
-export([do_level_fold/3, set_max_level/2]).
-include("include/hanoi.hrl").
-include("hanoi.hrl").
-include_lib("kernel/include/file.hrl").
-record(nursery, { log_file, dir, cache, total_size=0, count=0, last_sync=now() }).
-record(nursery, { log_file, dir, cache, total_size=0, count=0,
last_sync=now(), max_level }).
-spec new(string()) -> {ok, #nursery{}} | {error, term()}.
-spec new(string(), integer()) -> {ok, #nursery{}} | {error, term()}.
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
new(Directory) ->
new(Directory, MaxLevel) ->
{ok, File} = file:open( ?LOGFILENAME(Directory),
[raw, exclusive, write, delayed_write, append]),
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty() }}.
{ok, #nursery{ log_file=File, dir=Directory, cache= gb_trees:empty(),
max_level=MaxLevel}}.
recover(Directory, TopLevel) ->
recover(Directory, TopLevel, MaxLevel) ->
case file:read_file_info( ?LOGFILENAME(Directory) ) of
{ok, _} ->
ok = do_recover(Directory, TopLevel),
new(Directory);
ok = do_recover(Directory, TopLevel, MaxLevel),
new(Directory, MaxLevel);
{error, enoent} ->
new(Directory)
new(Directory, MaxLevel)
end.
do_recover(Directory, TopLevel) ->
do_recover(Directory, TopLevel, MaxLevel) ->
%% repair the log file; storing it in nursery2
LogFileName = ?LOGFILENAME(Directory),
{ok, Nursery} = read_nursery_from_log(Directory),
{ok, Nursery} = read_nursery_from_log(Directory, MaxLevel),
ok = finish(Nursery, TopLevel),
@@ -65,11 +67,11 @@ do_recover(Directory, TopLevel) ->
ok.
read_nursery_from_log(Directory) ->
read_nursery_from_log(Directory, MaxLevel) ->
{ok, LogFile} = file:open( ?LOGFILENAME(Directory), [raw, read, read_ahead, binary] ),
{ok, Cache} = load_good_chunks(LogFile, gb_trees:empty()),
ok = file:close(LogFile),
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache) }}.
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel }}.
%% Just read the log file into a cache (gb_tree).
%% If any errors happen here, then we simply ignore them and return
@@ -148,7 +150,10 @@ lookup(Key, #nursery{ cache=Cache }) ->
% Finish this nursery (encode it to a btree, and delete the nursery file)
% @end
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, count=Count }, TopLevel) ->
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
total_size=_TotalSize, count=Count,
max_level=MaxLevel
}, TopLevel) ->
%% first, close the log file (if it is open)
if LogFile /= undefined ->
@@ -181,7 +186,8 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize,
%% issue some work if this is a top-level inject (blocks until previous such
%% incremental merge is finished).
hanoi_level:incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
hanoi_level:incremental_merge(TopLevel,
(MaxLevel-?TOP_LEVEL+1)*?BTREE_SIZE(?TOP_LEVEL)),
ok;
_ ->
@@ -193,14 +199,14 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize,
file:delete(LogFileName),
ok.
add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir }, Top) ->
add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir, max_level=MaxLevel }, Top) ->
case add(Nursery, Key, Value) of
{ok, _} = OK ->
OK;
{full, Nursery2} ->
ok = hanoi_nursery:finish(Nursery2, Top),
{error, enoent} = file:read_file_info( filename:join(Dir, "nursery.log")),
hanoi_nursery:new(Dir)
hanoi_nursery:new(Dir, MaxLevel)
end.
do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) ->
@@ -217,3 +223,6 @@ do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) ->
gb_trees:to_list(Cache)),
FoldWorkerPID ! {level_done, Ref},
ok.
set_max_level(Nursery = #nursery{}, MaxLevel) ->
Nursery#nursery{ max_level = MaxLevel }.