Improve incremental merge
This change makes incremental merge be concurrent with filling up the nursery. So in stead of waiting for an incremental merge to complete before returning from insert, it - blocks waiting for a possible previous incremental merge to complete - issues a new incremental merge. This improves put latencies, but not throughput.
This commit is contained in:
parent
ee90944c62
commit
6289602045
3 changed files with 42 additions and 24 deletions
|
@ -25,8 +25,8 @@
|
|||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||
|
||||
|
||||
%% smallest levels are 8192 entries
|
||||
-define(TOP_LEVEL, 5).
|
||||
%% smallest levels are 128 entries
|
||||
-define(TOP_LEVEL, 7).
|
||||
-define(BTREE_SIZE(Level), (1 bsl (Level))).
|
||||
|
||||
-define(TOMBSTONE, 'deleted').
|
||||
|
|
|
@ -42,7 +42,7 @@
|
|||
-behavior(plain_fsm).
|
||||
-export([data_vsn/0, code_change/3]).
|
||||
|
||||
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, step/2]).
|
||||
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, incremental_merge/2]).
|
||||
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
|
@ -59,7 +59,7 @@ open(Dir,Level,Next) when Level>0 ->
|
|||
process_flag(trap_exit,true),
|
||||
initialize(#state{dir=Dir,level=Level,next=Next})
|
||||
end),
|
||||
step(PID, 2*?BTREE_SIZE(?TOP_LEVEL)),
|
||||
incremental_merge(PID, 2*?BTREE_SIZE(?TOP_LEVEL)),
|
||||
{ok, PID}.
|
||||
|
||||
lookup(Ref, Key) ->
|
||||
|
@ -69,8 +69,8 @@ inject(Ref, FileName) ->
|
|||
Result = call(Ref, {inject, FileName}),
|
||||
Result.
|
||||
|
||||
step(Ref,Howmuch) ->
|
||||
call(Ref,{step,Howmuch}).
|
||||
incremental_merge(Ref,HowMuch) ->
|
||||
call(Ref, {incremental_merge, HowMuch}).
|
||||
|
||||
close(Ref) ->
|
||||
try
|
||||
|
@ -241,11 +241,20 @@ main_loop(State = #state{ next=Next }) ->
|
|||
reply(From, ok),
|
||||
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
||||
|
||||
|
||||
%% replies OK when there is no current step in progress
|
||||
?REQ(From, {incremental_merge, HowMuch})
|
||||
when State#state.step_merge_ref == undefined,
|
||||
State#state.step_next_ref == undefined ->
|
||||
reply(From, ok),
|
||||
self() ! ?REQ(undefined, {step, HowMuch}),
|
||||
main_loop(State);
|
||||
|
||||
%% accept step any time there is not an outstanding step
|
||||
?REQ(StepFrom, {step, HowMuch})
|
||||
when State#state.step_merge_ref == undefined,
|
||||
State#state.step_caller == undefined,
|
||||
State#state.step_next_ref == undefined
|
||||
State#state.step_caller == undefined,
|
||||
State#state.step_next_ref == undefined
|
||||
->
|
||||
|
||||
%% delegate the step request to the next level
|
||||
|
@ -266,8 +275,8 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
|
||||
%% nothing to do ... just return OK
|
||||
reply(StepFrom, ok),
|
||||
main_loop(State);
|
||||
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
|
||||
main_loop(State2);
|
||||
true ->
|
||||
main_loop(State#state{ step_next_ref=DelegateRef,
|
||||
step_caller=StepFrom,
|
||||
|
@ -280,8 +289,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
case State#state.step_next_ref of
|
||||
undefined ->
|
||||
reply(State#state.step_caller, ok),
|
||||
State2 = State#state{ step_caller=undefined };
|
||||
State2 = reply_step_ok(State);
|
||||
_ ->
|
||||
State2 = State
|
||||
end,
|
||||
|
@ -290,10 +298,11 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
{'DOWN', MRef, _, _, _} when MRef == State#state.step_merge_ref ->
|
||||
|
||||
%% current merge worker died (or just finished)
|
||||
|
||||
case State#state.step_next_ref of
|
||||
undefined ->
|
||||
reply(State#state.step_caller, ok),
|
||||
State2 = State#state{ step_caller=undefined };
|
||||
State2 = reply_step_ok(State);
|
||||
_ ->
|
||||
State2 = State
|
||||
end,
|
||||
|
@ -301,11 +310,15 @@ main_loop(State = #state{ next=Next }) ->
|
|||
main_loop(State2#state{ step_merge_ref=undefined });
|
||||
|
||||
|
||||
?REPLY(MRef, ok) when (MRef =:= State#state.step_next_ref) and
|
||||
(State#state.step_merge_ref =:= undefined) ->
|
||||
reply(State#state.step_caller, ok),
|
||||
main_loop(State#state{ step_next_ref=undefined,
|
||||
step_caller=undefined });
|
||||
?REPLY(MRef, ok)
|
||||
when MRef =:= State#state.step_next_ref,
|
||||
State#state.step_merge_ref =:= undefined ->
|
||||
|
||||
%% this applies when we receive an OK from the next level,
|
||||
%% and we have finished the incremental merge at this level
|
||||
|
||||
State2 = reply_step_ok(State),
|
||||
main_loop(State2#state{ step_next_ref=undefined });
|
||||
|
||||
?REQ(From, close) ->
|
||||
close_if_defined(State#state.a),
|
||||
|
@ -508,6 +521,13 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
end.
|
||||
|
||||
reply_step_ok(State) ->
|
||||
case State#state.step_caller of
|
||||
undefined -> ok;
|
||||
_ -> reply(State#state.step_caller, ok)
|
||||
end,
|
||||
State#state{ step_caller=undefined }.
|
||||
|
||||
do_lookup(_Key, []) ->
|
||||
not_found;
|
||||
do_lookup(_Key, [Pid]) when is_pid(Pid) ->
|
||||
|
|
|
@ -164,11 +164,9 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize,
|
|||
%% inject the B-Tree (blocking RPC)
|
||||
ok = lsm_btree_level:inject(TopLevel, BTreeFileName),
|
||||
|
||||
%% synchroneously do some work if this is a top-level inject
|
||||
|
||||
% error_logger:info_msg("doing step ~p~n", [?BTREE_SIZE(?TOP_LEVEL)]),
|
||||
lsm_btree_level:step(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
|
||||
|
||||
%% issue some work if this is a top-level inject (blocks until previous such
|
||||
%% incremental merge is finished).
|
||||
lsm_btree_level:incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
|
||||
ok;
|
||||
|
||||
_ ->
|
||||
|
|
Loading…
Reference in a new issue