Implement smaller incremental merge steps

Right now, this is controlled by the macro
INC_MERGE_STEP in hanoidb_nursery; eventually
we should turn this into a configuration option.

Making this small, (minimum is 1), hurts average perf
but reduces the 99.9 percentile latency.
This commit is contained in:
Kresten Krab Thorup 2012-05-11 22:31:23 +02:00
parent 1b42172cbe
commit 669f589d0c
3 changed files with 40 additions and 21 deletions

View file

@ -316,7 +316,7 @@ do_merge(TopLevel, _Inc, N) when N =< 0 ->
ok = hanoidb_level:await_incremental_merge(TopLevel); ok = hanoidb_level:await_incremental_merge(TopLevel);
do_merge(TopLevel, Inc, N) -> do_merge(TopLevel, Inc, N) ->
ok = hanoidb_level:begin_incremental_merge(TopLevel), ok = hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
do_merge(TopLevel, Inc, N-Inc). do_merge(TopLevel, Inc, N-Inc).

View file

@ -45,7 +45,7 @@
-export([data_vsn/0, code_change/3]). -export([data_vsn/0, code_change/3]).
-export([open/5, lookup/2, lookup/3, inject/2, close/1, snapshot_range/3, blocking_range/3, -export([open/5, lookup/2, lookup/3, inject/2, close/1, snapshot_range/3, blocking_range/3,
begin_incremental_merge/1, await_incremental_merge/1, set_max_level/2, begin_incremental_merge/2, await_incremental_merge/1, set_max_level/2,
unmerged_count/1, destroy/1]). unmerged_count/1, destroy/1]).
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
@ -99,8 +99,8 @@ inject(Ref, FileName) ->
Result = plain_rpc:call(Ref, {inject, FileName}), Result = plain_rpc:call(Ref, {inject, FileName}),
Result. Result.
begin_incremental_merge(Ref) -> begin_incremental_merge(Ref, StepSize) ->
plain_rpc:call(Ref, begin_incremental_merge). plain_rpc:call(Ref, {begin_incremental_merge, StepSize}).
await_incremental_merge(Ref) -> await_incremental_merge(Ref) ->
plain_rpc:call(Ref, await_incremental_merge). plain_rpc:call(Ref, await_incremental_merge).
@ -334,11 +334,11 @@ main_loop(State = #state{ next=Next }) ->
main_loop(State#state{ max_level=Max }); main_loop(State#state{ max_level=Max });
%% replies OK when there is no current step in progress %% replies OK when there is no current step in progress
?CALL(From, begin_incremental_merge) ?CALL(From, {begin_incremental_merge, StepSize})
when State#state.step_merge_ref == undefined, when State#state.step_merge_ref == undefined,
State#state.step_next_ref == undefined -> State#state.step_next_ref == undefined ->
plain_rpc:send_reply(From, ok), plain_rpc:send_reply(From, ok),
do_step(undefined, 0, State); do_step(undefined, 0, StepSize, State);
?CALL(From, await_incremental_merge) ?CALL(From, await_incremental_merge)
when State#state.step_merge_ref == undefined, when State#state.step_merge_ref == undefined,
@ -347,12 +347,12 @@ main_loop(State = #state{ next=Next }) ->
main_loop(State); main_loop(State);
%% accept step any time there is not an outstanding step %% accept step any time there is not an outstanding step
?CALL(StepFrom, {step_level, DoneWork}) ?CALL(StepFrom, {step_level, DoneWork, StepSize})
when State#state.step_merge_ref == undefined, when State#state.step_merge_ref == undefined,
State#state.step_caller == undefined, State#state.step_caller == undefined,
State#state.step_next_ref == undefined State#state.step_next_ref == undefined
-> ->
do_step(StepFrom, DoneWork, State); do_step(StepFrom, DoneWork, StepSize, State);
{MRef, step_done} when MRef == State#state.step_merge_ref -> {MRef, step_done} when MRef == State#state.step_merge_ref ->
demonitor(MRef, [flush]), demonitor(MRef, [flush]),
@ -649,13 +649,13 @@ main_loop(State = #state{ next=Next }) ->
end. end.
do_step(StepFrom, PreviousWork, State) -> do_step(StepFrom, PreviousWork, StepSize, State) ->
if (State#state.b =/= undefined) andalso (State#state.merge_pid =/= undefined) -> if (State#state.b =/= undefined) andalso (State#state.merge_pid =/= undefined) ->
WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done); WorkLeftHere = max(0, (2 * ?BTREE_SIZE(State#state.level)) - State#state.work_done);
true -> true ->
WorkLeftHere = 0 WorkLeftHere = 0
end, end,
WorkUnit = ?BTREE_SIZE(?TOP_LEVEL), WorkUnit = StepSize,
MaxLevel = max(State#state.max_level, State#state.level), MaxLevel = max(State#state.max_level, State#state.level),
TotalWork = (MaxLevel-?TOP_LEVEL+1) * WorkUnit, TotalWork = (MaxLevel-?TOP_LEVEL+1) * WorkUnit,
WorkUnitsLeft = max(0, TotalWork-PreviousWork), WorkUnitsLeft = max(0, TotalWork-PreviousWork),
@ -676,7 +676,7 @@ do_step(StepFrom, PreviousWork, State) ->
if Next =:= undefined -> if Next =:= undefined ->
DelegateRef = undefined; DelegateRef = undefined;
true -> true ->
DelegateRef = plain_rpc:send_call(Next, {step_level, WorkIncludingHere}) DelegateRef = plain_rpc:send_call(Next, {step_level, WorkIncludingHere, StepSize})
end, end,
if WorkToDoHere > 0 -> if WorkToDoHere > 0 ->

View file

@ -25,7 +25,7 @@
-module(hanoidb_nursery). -module(hanoidb_nursery).
-author('Kresten Krab Thorup <krab@trifork.com>'). -author('Kresten Krab Thorup <krab@trifork.com>').
-export([new/3, recover/4, add/3, finish/2, lookup/2, add_maybe_flush/4]). -export([new/3, recover/4, finish/2, lookup/2, add_maybe_flush/4]).
-export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]). -export([do_level_fold/3, set_max_level/2, transact/3, destroy/1]).
-include("include/hanoidb.hrl"). -include("include/hanoidb.hrl").
@ -33,12 +33,17 @@
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
-record(nursery, { log_file, dir, cache, total_size=0, count=0, -record(nursery, { log_file, dir, cache, total_size=0, count=0,
last_sync=now(), max_level, config=[] }). last_sync=now(), max_level, config=[], step=0, merge_done=0 }).
-spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}. -spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")). -define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
%% do incremental merge every this many inserts
%% this value *must* be less than or equal to
%% 2^TOP_LEVEL == ?BTREE_SIZE(?TOP_LEVEL)
-define(INC_MERGE_STEP, ?BTREE_SIZE(?TOP_LEVEL)/2).
new(Directory, MaxLevel, Config) -> new(Directory, MaxLevel, Config) ->
hanoidb_util:ensure_expiry(Config), hanoidb_util:ensure_expiry(Config),
@ -92,8 +97,8 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
% @doc % @doc
% Add a Key/Value to the nursery % Add a Key/Value to the nursery
% @end % @end
-spec add(#nursery{}, binary(), binary()|?TOMBSTONE) -> {ok, #nursery{}}. -spec add(#nursery{}, binary(), binary()|?TOMBSTONE, pid()) -> {ok, #nursery{}}.
add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config }, Key, Value) -> add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config }, Key, Value, Top) ->
ExpiryTime = hanoidb_util:expiry_time(Config), ExpiryTime = hanoidb_util:expiry_time(Config),
TStamp = hanoidb_util:tstamp(), TStamp = hanoidb_util:tstamp(),
@ -112,7 +117,9 @@ add(Nursery=#nursery{ log_file=File, cache=Cache, total_size=TotalSize, count=Co
Cache2 = gb_trees:enter(Key, {Value, TStamp}, Cache) Cache2 = gb_trees:enter(Key, {Value, TStamp}, Cache)
end, end,
Nursery2 = Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data), count=Count+1 }, {ok, Nursery2} =
do_inc_merge(Nursery1#nursery{ cache=Cache2, total_size=TotalSize+erlang:iolist_size(Data),
count=Count+1 }, 1, Top),
if if
Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) -> Count+1 >= ?BTREE_SIZE(?TOP_LEVEL) ->
{full, Nursery2}; {full, Nursery2};
@ -159,7 +166,7 @@ lookup(Key, #nursery{ cache=Cache, config=Config }) ->
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok. -spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
total_size=_TotalSize, count=Count, total_size=_TotalSize, count=Count,
config=Config config=Config, merge_done=DoneMerge
}, TopLevel) -> }, TopLevel) ->
hanoidb_util:ensure_expiry(Config), hanoidb_util:ensure_expiry(Config),
@ -196,7 +203,11 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile,
%% issue some work if this is a top-level inject (blocks until previous such %% issue some work if this is a top-level inject (blocks until previous such
%% incremental merge is finished). %% incremental merge is finished).
hanoidb_level:begin_incremental_merge(TopLevel); if DoneMerge >= ?BTREE_SIZE(?TOP_LEVEL) ->
ok;
true ->
hanoidb_level:begin_incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL) - DoneMerge)
end;
_ -> _ ->
ok ok
@ -221,8 +232,8 @@ destroy(#nursery{ dir=Dir, log_file=LogFile }) ->
add_maybe_flush(Key, Value, Nursery, Top) -> add_maybe_flush(Key, Value, Nursery, Top) ->
case add(Nursery, Key, Value) of case add(Nursery, Key, Value, Top) of
{ok, _} = OK -> {ok, _Nursery} = OK ->
OK; OK;
{full, Nursery2} -> {full, Nursery2} ->
flush(Nursery2, Top) flush(Nursery2, Top)
@ -263,8 +274,16 @@ transact(Spec, Nursery=#nursery{ log_file=File, cache=Cache0, total_size=TotalSi
Count = gb_trees:size(Cache2), Count = gb_trees:size(Cache2),
{ok, Nursery2#nursery{ cache=Cache2, total_size=TotalSize+byte_size(Data), count=Count }}. do_inc_merge(Nursery2#nursery{ cache=Cache2, total_size=TotalSize+byte_size(Data), count=Count },
length(Spec), Top).
do_inc_merge(Nursery=#nursery{ step=Step, merge_done=Done }, N, TopLevel) ->
if Step+N >= ?INC_MERGE_STEP ->
hanoidb_level:begin_incremental_merge(TopLevel, Step+N),
{ok, Nursery#nursery{ step=0, merge_done=Done+Step+N }};
true ->
{ok, Nursery#nursery{ step=Step+N }}
end.
do_level_fold(#nursery{ cache=Cache, config=Config }, FoldWorkerPID, KeyRange) -> do_level_fold(#nursery{ cache=Cache, config=Config }, FoldWorkerPID, KeyRange) ->
Ref = erlang:make_ref(), Ref = erlang:make_ref(),