Implement incremental insert
This slows down insert to be log2(N), where N is the total number of objects in the store. The upside is that it also removes the terrible worst case scenarios for insert.
This commit is contained in:
parent
3d80b164d5
commit
ee90944c62
7 changed files with 131 additions and 29 deletions
|
@ -245,8 +245,10 @@ handle_cast(Info,State) ->
|
||||||
|
|
||||||
|
|
||||||
%% premature delete -> cleanup
|
%% premature delete -> cleanup
|
||||||
|
terminate(normal,_State) ->
|
||||||
|
ok;
|
||||||
terminate(_Reason,_State) ->
|
terminate(_Reason,_State) ->
|
||||||
% error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]),
|
error_logger:info_msg("got terminate(~p,~p)~n", [_Reason,_State]),
|
||||||
% flush_nursery(State),
|
% flush_nursery(State),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
|
|
||||||
|
|
||||||
%% smallest levels are 8192 entries
|
%% smallest levels are 8192 entries
|
||||||
-define(TOP_LEVEL, 13).
|
-define(TOP_LEVEL, 5).
|
||||||
-define(BTREE_SIZE(Level), (1 bsl (Level))).
|
-define(BTREE_SIZE(Level), (1 bsl (Level))).
|
||||||
|
|
||||||
-define(TOMBSTONE, 'deleted').
|
-define(TOMBSTONE, 'deleted').
|
||||||
|
|
|
@ -42,22 +42,25 @@
|
||||||
-behavior(plain_fsm).
|
-behavior(plain_fsm).
|
||||||
-export([data_vsn/0, code_change/3]).
|
-export([data_vsn/0, code_change/3]).
|
||||||
|
|
||||||
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3]).
|
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, step/2]).
|
||||||
|
|
||||||
-include_lib("kernel/include/file.hrl").
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = []
|
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
|
||||||
|
step_next_ref, step_caller, step_merge_ref
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%%% PUBLIC OPERATIONS
|
%%%%% PUBLIC OPERATIONS
|
||||||
|
|
||||||
open(Dir,Level,Next) when Level>0 ->
|
open(Dir,Level,Next) when Level>0 ->
|
||||||
{ok, plain_fsm:spawn_link(?MODULE,
|
PID = plain_fsm:spawn_link(?MODULE,
|
||||||
fun() ->
|
fun() ->
|
||||||
process_flag(trap_exit,true),
|
process_flag(trap_exit,true),
|
||||||
initialize(#state{dir=Dir,level=Level,next=Next})
|
initialize(#state{dir=Dir,level=Level,next=Next})
|
||||||
end)}.
|
end),
|
||||||
|
step(PID, 2*?BTREE_SIZE(?TOP_LEVEL)),
|
||||||
|
{ok, PID}.
|
||||||
|
|
||||||
lookup(Ref, Key) ->
|
lookup(Ref, Key) ->
|
||||||
call(Ref, {lookup, Key}).
|
call(Ref, {lookup, Key}).
|
||||||
|
@ -66,6 +69,9 @@ inject(Ref, FileName) ->
|
||||||
Result = call(Ref, {inject, FileName}),
|
Result = call(Ref, {inject, FileName}),
|
||||||
Result.
|
Result.
|
||||||
|
|
||||||
|
step(Ref,Howmuch) ->
|
||||||
|
call(Ref,{step,Howmuch}).
|
||||||
|
|
||||||
close(Ref) ->
|
close(Ref) ->
|
||||||
try
|
try
|
||||||
call(Ref, close)
|
call(Ref, close)
|
||||||
|
@ -128,8 +134,8 @@ initialize(State) ->
|
||||||
try
|
try
|
||||||
initialize2(State)
|
initialize2(State)
|
||||||
catch
|
catch
|
||||||
Class:Ex ->
|
Class:Ex when not (Class == exit andalso Ex == normal) ->
|
||||||
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
|
error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
initialize2(State) ->
|
initialize2(State) ->
|
||||||
|
@ -231,9 +237,76 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
end,
|
end,
|
||||||
ok = file:rename(FileName, ToFileName),
|
ok = file:rename(FileName, ToFileName),
|
||||||
{ok, BT} = lsm_btree_reader:open(ToFileName, random),
|
{ok, BT} = lsm_btree_reader:open(ToFileName, random),
|
||||||
|
|
||||||
reply(From, ok),
|
reply(From, ok),
|
||||||
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
||||||
|
|
||||||
|
%% accept step any time there is not an outstanding step
|
||||||
|
?REQ(StepFrom, {step, HowMuch})
|
||||||
|
when State#state.step_merge_ref == undefined,
|
||||||
|
State#state.step_caller == undefined,
|
||||||
|
State#state.step_next_ref == undefined
|
||||||
|
->
|
||||||
|
|
||||||
|
%% delegate the step request to the next level
|
||||||
|
if Next =:= undefined ->
|
||||||
|
DelegateRef = undefined;
|
||||||
|
true ->
|
||||||
|
DelegateRef = send_request(Next, {step, HowMuch})
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% if there is a merge worker, send him a step message
|
||||||
|
case State#state.merge_pid of
|
||||||
|
undefined ->
|
||||||
|
MergeRef = undefined;
|
||||||
|
MergePID ->
|
||||||
|
MergeRef = monitor(process, MergePID),
|
||||||
|
MergePID ! {step, {self(), MergeRef}, HowMuch}
|
||||||
|
end,
|
||||||
|
|
||||||
|
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
|
||||||
|
%% nothing to do ... just return OK
|
||||||
|
reply(StepFrom, ok),
|
||||||
|
main_loop(State);
|
||||||
|
true ->
|
||||||
|
main_loop(State#state{ step_next_ref=DelegateRef,
|
||||||
|
step_caller=StepFrom,
|
||||||
|
step_merge_ref=MergeRef
|
||||||
|
})
|
||||||
|
end;
|
||||||
|
|
||||||
|
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
||||||
|
demonitor(MRef, [flush]),
|
||||||
|
|
||||||
|
case State#state.step_next_ref of
|
||||||
|
undefined ->
|
||||||
|
reply(State#state.step_caller, ok),
|
||||||
|
State2 = State#state{ step_caller=undefined };
|
||||||
|
_ ->
|
||||||
|
State2 = State
|
||||||
|
end,
|
||||||
|
|
||||||
|
main_loop(State2#state{ step_merge_ref=undefined });
|
||||||
|
|
||||||
|
{'DOWN', MRef, _, _, _} when MRef == State#state.step_merge_ref ->
|
||||||
|
|
||||||
|
case State#state.step_next_ref of
|
||||||
|
undefined ->
|
||||||
|
reply(State#state.step_caller, ok),
|
||||||
|
State2 = State#state{ step_caller=undefined };
|
||||||
|
_ ->
|
||||||
|
State2 = State
|
||||||
|
end,
|
||||||
|
|
||||||
|
main_loop(State2#state{ step_merge_ref=undefined });
|
||||||
|
|
||||||
|
|
||||||
|
?REPLY(MRef, ok) when (MRef =:= State#state.step_next_ref) and
|
||||||
|
(State#state.step_merge_ref =:= undefined) ->
|
||||||
|
reply(State#state.step_caller, ok),
|
||||||
|
main_loop(State#state{ step_next_ref=undefined,
|
||||||
|
step_caller=undefined });
|
||||||
|
|
||||||
?REQ(From, close) ->
|
?REQ(From, close) ->
|
||||||
close_if_defined(State#state.a),
|
close_if_defined(State#state.a),
|
||||||
close_if_defined(State#state.b),
|
close_if_defined(State#state.b),
|
||||||
|
|
|
@ -53,7 +53,7 @@ merge(A,B,C, Size, IsLastLevel) ->
|
||||||
{node, AKVs} = lsm_btree_reader:first_node(BT1),
|
{node, AKVs} = lsm_btree_reader:first_node(BT1),
|
||||||
{node, BKVs} = lsm_btree_reader:first_node(BT2),
|
{node, BKVs} = lsm_btree_reader:first_node(BT2),
|
||||||
|
|
||||||
{ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0),
|
{ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}),
|
||||||
|
|
||||||
%% finish stream tree
|
%% finish stream tree
|
||||||
ok = lsm_btree_reader:close(BT1),
|
ok = lsm_btree_reader:close(BT1),
|
||||||
|
@ -68,24 +68,42 @@ merge(A,B,C, Size, IsLastLevel) ->
|
||||||
|
|
||||||
{ok, Count}.
|
{ok, Count}.
|
||||||
|
|
||||||
|
step({N, From}) ->
|
||||||
|
{N-1, From}.
|
||||||
|
|
||||||
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count) ->
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) ->
|
||||||
|
case FromPID of
|
||||||
|
none ->
|
||||||
|
ok;
|
||||||
|
{PID, Ref} ->
|
||||||
|
PID ! {Ref, step_done}
|
||||||
|
end,
|
||||||
|
|
||||||
|
% error_logger:info_msg("waiting for step in ~p~n", [self()]),
|
||||||
|
|
||||||
|
receive
|
||||||
|
{step, From, HowMany} ->
|
||||||
|
% error_logger:info_msg("got step ~p,~p in ~p~n", [From,HowMany, self()]),
|
||||||
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From})
|
||||||
|
end;
|
||||||
|
|
||||||
|
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) ->
|
||||||
case lsm_btree_reader:next_node(BT1) of
|
case lsm_btree_reader:next_node(BT1) of
|
||||||
{node, AKVs} ->
|
{node, AKVs} ->
|
||||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
|
||||||
end_of_data ->
|
end_of_data ->
|
||||||
scan_only(BT2, Out, IsLastLevel, BKVs, Count)
|
scan_only(BT2, Out, IsLastLevel, BKVs, Count, Step)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count) ->
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count, Step) ->
|
||||||
case lsm_btree_reader:next_node(BT2) of
|
case lsm_btree_reader:next_node(BT2) of
|
||||||
{node, BKVs} ->
|
{node, BKVs} ->
|
||||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
|
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
|
||||||
end_of_data ->
|
end_of_data ->
|
||||||
scan_only(BT1, Out, IsLastLevel, AKVs, Count)
|
scan_only(BT1, Out, IsLastLevel, AKVs, Count, Step)
|
||||||
end;
|
end;
|
||||||
|
|
||||||
scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count, Step) ->
|
||||||
if Key1 < Key2 ->
|
if Key1 < Key2 ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
true ->
|
true ->
|
||||||
|
@ -94,7 +112,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
|
||||||
ok = lsm_btree_writer:add(Out2=Out, Key1, Value1)
|
ok = lsm_btree_writer:add(Out2=Out, Key1, Value1)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1);
|
scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1, step(Step));
|
||||||
|
|
||||||
Key2 < Key1 ->
|
Key2 < Key1 ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
|
@ -103,10 +121,10 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
|
||||||
false ->
|
false ->
|
||||||
ok = lsm_btree_writer:add(Out2=Out, Key2, Value2)
|
ok = lsm_btree_writer:add(Out2=Out, Key2, Value2)
|
||||||
end,
|
end,
|
||||||
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1);
|
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step));
|
||||||
|
|
||||||
(?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) ->
|
(?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) ->
|
||||||
scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count);
|
scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step));
|
||||||
|
|
||||||
true ->
|
true ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
|
@ -115,25 +133,25 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
|
||||||
false ->
|
false ->
|
||||||
ok = lsm_btree_writer:add(Out2=Out, Key2, Value2)
|
ok = lsm_btree_writer:add(Out2=Out, Key2, Value2)
|
||||||
end,
|
end,
|
||||||
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1)
|
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
scan_only(BT, Out, IsLastLevel, [], Count) ->
|
scan_only(BT, Out, IsLastLevel, [], Count, Step) ->
|
||||||
case lsm_btree_reader:next_node(BT) of
|
case lsm_btree_reader:next_node(BT) of
|
||||||
{node, KVs} ->
|
{node, KVs} ->
|
||||||
scan_only(BT, Out, IsLastLevel, KVs, Count);
|
scan_only(BT, Out, IsLastLevel, KVs, Count, step(Step));
|
||||||
end_of_data ->
|
end_of_data ->
|
||||||
{ok, Count, Out}
|
{ok, Count, Out}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count) ->
|
scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count, Step) ->
|
||||||
scan_only(BT, Out, true, Rest, Count);
|
scan_only(BT, Out, true, Rest, Count, step(Step));
|
||||||
|
|
||||||
scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count) ->
|
scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count, Step) ->
|
||||||
case ?LOCAL_WRITER of
|
case ?LOCAL_WRITER of
|
||||||
true ->
|
true ->
|
||||||
{noreply, Out2} = lsm_btree_writer:handle_cast({add, Key, Value}, Out);
|
{noreply, Out2} = lsm_btree_writer:handle_cast({add, Key, Value}, Out);
|
||||||
false ->
|
false ->
|
||||||
ok = lsm_btree_writer:add(Out2=Out, Key, Value)
|
ok = lsm_btree_writer:add(Out2=Out, Key, Value)
|
||||||
end,
|
end,
|
||||||
scan_only(BT, Out2, IsLastLevel, Rest, Count+1).
|
scan_only(BT, Out2, IsLastLevel, Rest, Count+1, step(Step)).
|
||||||
|
|
|
@ -162,7 +162,15 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize,
|
||||||
% [ gb_trees:size(Cache), TotalSize, FileInfo#file_info.size ]),
|
% [ gb_trees:size(Cache), TotalSize, FileInfo#file_info.size ]),
|
||||||
|
|
||||||
%% inject the B-Tree (blocking RPC)
|
%% inject the B-Tree (blocking RPC)
|
||||||
ok = lsm_btree_level:inject(TopLevel, BTreeFileName);
|
ok = lsm_btree_level:inject(TopLevel, BTreeFileName),
|
||||||
|
|
||||||
|
%% synchroneously do some work if this is a top-level inject
|
||||||
|
|
||||||
|
% error_logger:info_msg("doing step ~p~n", [?BTREE_SIZE(?TOP_LEVEL)]),
|
||||||
|
lsm_btree_level:step(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
|
||||||
|
|
||||||
|
ok;
|
||||||
|
|
||||||
_ ->
|
_ ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -31,6 +31,7 @@ merge_test() ->
|
||||||
ok = lsm_btree_writer:close(BT2),
|
ok = lsm_btree_writer:close(BT2),
|
||||||
|
|
||||||
|
|
||||||
|
self() ! {step, {self(), none}, 2000000000},
|
||||||
{Time,{ok,Count}} = timer:tc(lsm_btree_merger, merge, ["test1", "test2", "test3", 10000, true]),
|
{Time,{ok,Count}} = timer:tc(lsm_btree_merger, merge, ["test1", "test2", "test3", 10000, true]),
|
||||||
|
|
||||||
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
|
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
|
||||||
|
|
|
@ -38,7 +38,7 @@ full_test_() ->
|
||||||
?_test(test_tree_simple_2()),
|
?_test(test_tree_simple_2()),
|
||||||
?_test(test_tree_simple_3()),
|
?_test(test_tree_simple_3()),
|
||||||
?_test(test_tree_simple_4()),
|
?_test(test_tree_simple_4()),
|
||||||
{timeout, 30, ?_test(test_tree())},
|
{timeout, 300, ?_test(test_tree())},
|
||||||
{timeout, 120, ?_test(test_qc())}
|
{timeout, 120, ?_test(test_qc())}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
@ -299,7 +299,7 @@ test_tree() ->
|
||||||
<<N:128>>, <<"data",N:128>>)
|
<<N:128>>, <<"data",N:128>>)
|
||||||
end,
|
end,
|
||||||
ok,
|
ok,
|
||||||
lists:seq(2,10000,1)),
|
lists:seq(2,100000,1)),
|
||||||
lists:foldl(fun(N,_) ->
|
lists:foldl(fun(N,_) ->
|
||||||
ok = lsm_btree:put(Tree,
|
ok = lsm_btree:put(Tree,
|
||||||
<<N:128>>, <<"data",N:128>>)
|
<<N:128>>, <<"data",N:128>>)
|
||||||
|
|
Loading…
Reference in a new issue