Improve recovery

This improves recovery two-fold:

1. make sure that we actually wait for initial
   merge to complete (issue incremental_merge(0))

2. compute minimum required merge work for merge
   to establish invariant that there's room
   for a new nursery inject any time.
This commit is contained in:
Kresten Krab Thorup 2012-04-23 13:51:25 +02:00
parent 8796575053
commit 8b725cceaa
2 changed files with 31 additions and 8 deletions

View file

@ -231,15 +231,24 @@ open_levels(Dir,Options) ->
%% remove old nursery file %% remove old nursery file
file:delete(filename:join(Dir,"nursery.data")), file:delete(filename:join(Dir,"nursery.data")),
TopLevel = {TopLevel, MaxMerge} =
lists:foldl( fun(LevelNo, Prev) -> lists:foldl( fun(LevelNo, {Prev, Max}) ->
{ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options), {ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options),
Level
end,
undefined,
lists:seq(MaxLevel, MinLevel, -1)),
ok = hanoi_level:incremental_merge(TopLevel, 2*?BTREE_SIZE(MaxLevel)), NextMax = max(Max, hanoi_level:unmerged_count(Level)),
{Level, NextMax}
end,
{undefined, 0},
lists:seq(MaxLevel, ?TOP_LEVEL, -1)),
%% we need to do this much merge work before we can guarantee
%% response times ... this is the amount of "in flight" merging
%% we lost when the hanoi store was closed.
ok = hanoi_level:incremental_merge(TopLevel, MaxMerge),
%% second incremental merge blocks until the previous is done
ok = hanoi_level:incremental_merge(TopLevel, 0),
{ok, TopLevel}. {ok, TopLevel}.

View file

@ -42,7 +42,8 @@
-behavior(plain_fsm). -behavior(plain_fsm).
-export([data_vsn/0, code_change/3]). -export([data_vsn/0, code_change/3]).
-export([open/4, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]). -export([open/4, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3,
incremental_merge/2, unmerged_count/1]).
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
@ -72,6 +73,9 @@ inject(Ref, FileName) ->
incremental_merge(Ref,HowMuch) -> incremental_merge(Ref,HowMuch) ->
call(Ref, {incremental_merge, HowMuch}). call(Ref, {incremental_merge, HowMuch}).
unmerged_count(Ref) ->
call(Ref, unmerged_count).
close(Ref) -> close(Ref) ->
try try
call(Ref, close) call(Ref, close)
@ -242,6 +246,16 @@ main_loop(State = #state{ next=Next }) ->
check_begin_merge_then_loop(setelement(SetPos, State, BT)); check_begin_merge_then_loop(setelement(SetPos, State, BT));
?REQ(From, unmerged_count) ->
Files =
(if State#state.b == undefined -> 0; true -> 1 end)
+ (if State#state.c == undefined -> 0; true -> 1 end),
Amount = Files * ?BTREE_SIZE( State#state.level ),
reply(From, Amount),
main_loop(State);
%% replies OK when there is no current step in progress %% replies OK when there is no current step in progress
?REQ(From, {incremental_merge, HowMuch}) ?REQ(From, {incremental_merge, HowMuch})
when State#state.step_merge_ref == undefined, when State#state.step_merge_ref == undefined,