diff --git a/src/hanoi.erl b/src/hanoi.erl index 8758049..27c0aaf 100644 --- a/src/hanoi.erl +++ b/src/hanoi.erl @@ -231,15 +231,24 @@ open_levels(Dir,Options) -> %% remove old nursery file file:delete(filename:join(Dir,"nursery.data")), - TopLevel = - lists:foldl( fun(LevelNo, Prev) -> + {TopLevel, MaxMerge} = + lists:foldl( fun(LevelNo, {Prev, Max}) -> {ok, Level} = hanoi_level:open(Dir,LevelNo,Prev,Options), - Level - end, - undefined, - lists:seq(MaxLevel, MinLevel, -1)), - ok = hanoi_level:incremental_merge(TopLevel, 2*?BTREE_SIZE(MaxLevel)), + NextMax = max(Max, hanoi_level:unmerged_count(Level)), + + {Level, NextMax} + end, + {undefined, 0}, + lists:seq(MaxLevel, ?TOP_LEVEL, -1)), + + %% we need to do this much merge work before we can guarantee + %% response times ... this is the amount of "in flight" merging + %% we lost when the hanoi store was closed. + ok = hanoi_level:incremental_merge(TopLevel, MaxMerge), + + %% second incremental merge blocks until the previous is done + ok = hanoi_level:incremental_merge(TopLevel, 0), {ok, TopLevel}. diff --git a/src/hanoi_level.erl b/src/hanoi_level.erl index 1774b67..b417b13 100644 --- a/src/hanoi_level.erl +++ b/src/hanoi_level.erl @@ -42,7 +42,8 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/4, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, incremental_merge/2]). +-export([open/4, lookup/2, inject/2, close/1, snapshot_range/3, blocking_range/3, + incremental_merge/2, unmerged_count/1]). -include_lib("kernel/include/file.hrl"). @@ -72,6 +73,9 @@ inject(Ref, FileName) -> incremental_merge(Ref,HowMuch) -> call(Ref, {incremental_merge, HowMuch}). +unmerged_count(Ref) -> + call(Ref, unmerged_count). + close(Ref) -> try call(Ref, close) @@ -242,6 +246,16 @@ main_loop(State = #state{ next=Next }) -> check_begin_merge_then_loop(setelement(SetPos, State, BT)); + ?REQ(From, unmerged_count) -> + Files = + (if State#state.b == undefined -> 0; true -> 1 end) + + (if State#state.c == undefined -> 0; true -> 1 end), + + Amount = Files * ?BTREE_SIZE( State#state.level ), + + reply(From, Amount), + main_loop(State); + %% replies OK when there is no current step in progress ?REQ(From, {incremental_merge, HowMuch}) when State#state.step_merge_ref == undefined,