Add both sync and async range fold

Sync blocks insert/lookup while doing a range
query, but still buffers the results in a
per-range query process.  

Async fold runs on a hard link copy of the
underlying tree data.

This commit and also fixes a number of bugs 
related to folding; it was not taking nursery 
data into account.
This commit is contained in:
Kresten Krab Thorup 2012-01-19 14:25:47 +01:00
parent ead8d3a41d
commit f56f530d7a
5 changed files with 144 additions and 66 deletions

View file

@ -5,7 +5,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-export([open/1, close/1, lookup/2, delete/2, put/3, async_range/3, fold_range/5]). -export([open/1, close/1, lookup/2, delete/2, put/3, async_range/3, async_fold_range/5, sync_range/3, sync_fold_range/5]).
-include("lsm_btree.hrl"). -include("lsm_btree.hrl").
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
@ -35,10 +35,17 @@ delete(Ref,Key) when is_binary(Key) ->
put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
gen_server:call(Ref, {put, Key, Value}). gen_server:call(Ref, {put, Key, Value}).
sync_range(Ref,FromKey,ToKey) when is_binary(FromKey), is_binary(ToKey) ->
gen_server:call(Ref, {sync_range, self(), FromKey, ToKey}).
sync_fold_range(Ref,Fun,Acc0,FromKey,ToKey) ->
{ok, PID} = sync_range(Ref,FromKey,ToKey),
receive_fold_range(PID,Fun,Acc0).
async_range(Ref,FromKey,ToKey) when is_binary(FromKey), is_binary(ToKey) -> async_range(Ref,FromKey,ToKey) when is_binary(FromKey), is_binary(ToKey) ->
gen_server:call(Ref, {async_range, self(), FromKey, ToKey}). gen_server:call(Ref, {async_range, self(), FromKey, ToKey}).
fold_range(Ref,Fun,Acc0,FromKey,ToKey) -> async_fold_range(Ref,Fun,Acc0,FromKey,ToKey) ->
{ok, PID} = async_range(Ref,FromKey,ToKey), {ok, PID} = async_range(Ref,FromKey,ToKey),
receive_fold_range(PID,Fun,Acc0). receive_fold_range(PID,Fun,Acc0).
@ -129,8 +136,16 @@ code_change(_OldVsn, State, _Extra) ->
handle_call({async_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel }) -> handle_call({async_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
Result = lsm_btree_level:range_fold(TopLevel, Sender, FromKey, ToKey), {ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender),
lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID),
Result = lsm_btree_level:async_range(TopLevel, FoldWorkerPID, FromKey, ToKey),
{reply, Result, State};
handle_call({sync_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
{ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender),
lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID),
Result = lsm_btree_level:sync_range(TopLevel, FoldWorkerPID, FromKey, ToKey),
{reply, Result, State}; {reply, Result, State};
handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) ->

View file

@ -33,29 +33,35 @@
-behavior(plain_fsm). -behavior(plain_fsm).
-export([data_vsn/0, code_change/3]). -export([data_vsn/0, code_change/3]).
-include("lsm_btree.hrl").
-record(state, {sendto}). -record(state, {sendto}).
start(SendTo) -> start(SendTo) ->
PID = plain_fsm:spawn_link(?MODULE, PID = plain_fsm:spawn(?MODULE,
fun() -> fun() ->
process_flag(trap_exit,true), process_flag(trap_exit,true),
initialize(#state{sendto=SendTo}) initialize(#state{sendto=SendTo}, [])
end), end),
{ok, PID}. {ok, PID}.
initialize(State) -> initialize(State, PrefixFolders) ->
Parent = plain_fsm:info(parent), Parent = plain_fsm:info(parent),
receive receive
{prefix, [_]=Folders} ->
initialize(State, Folders);
{initialize, Folders} -> {initialize, Folders} ->
Initial = [ {PID,undefined} || PID <- Folders ],
fill(State, Initial, Folders); Initial = [ {PID,undefined} || PID <- (PrefixFolders ++ Folders) ],
fill(State, Initial, PrefixFolders ++ Folders);
%% gen_fsm handling %% gen_fsm handling
{system, From, Req} -> {system, From, Req} ->
plain_fsm:handle_system_msg( plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> initialize(S1) end); From, Req, State, fun(S1) -> initialize(S1, PrefixFolders) end);
{'EXIT', Parent, Reason} -> {'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State) plain_fsm:parent_EXIT(Reason, State)
@ -86,31 +92,31 @@ fill(State, Values, [PID|Rest]=PIDs) ->
fill(State, Values, PIDs) fill(State, Values, PIDs)
end end
end. end.
emit_next(State, []) -> emit_next(State, []) ->
State#state.sendto ! {fold_done, self()}, State#state.sendto ! {fold_done, self()},
ok; ok;
emit_next(State, [{PID,{Key,Value}}]) ->
State#state.sendto ! {fold_result, self(), Key, Value},
fill(State, [{PID,undefined}], [PID]);
emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
{{FoundKey, FoundValue}, FillFrom} = {{FoundKey, FoundValue}, FillFrom} =
lists:foldl(fun({P,{K1,V1}}, {{K2,_},_}) when K1 < K2 -> lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 ->
{{K1,V1},[P]}; {KV,[P]};
({P,{K,_}}, {{K,V},List}) -> ({P,{K,_}}, {{K,_}=KV,List}) ->
{{K,V}, [P|List]}; {KV, [P|List]};
(_, Found) -> (_, Found) ->
Found Found
end, end,
{FirstKV,[FirstPID]}, {FirstKV,[FirstPID]},
Rest), Rest),
State#state.sendto ! {fold_result, self(), FoundKey, FoundValue}, case FoundValue of
?TOMBSTONE ->
ok;
_ ->
State#state.sendto ! {fold_result, self(), FoundKey, FoundValue}
end,
fill(State, Values, FillFrom). fill(State, Values, FillFrom).

View file

@ -16,12 +16,12 @@
-behavior(plain_fsm). -behavior(plain_fsm).
-export([data_vsn/0, code_change/3]). -export([data_vsn/0, code_change/3]).
-export([open/3, lookup/2, inject/2, close/1, range_fold/4]). -export([open/3, lookup/2, inject/2, close/1, async_range/4, sync_range/4]).
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
-record(state, { -record(state, {
a, b, next, dir, level, inject_done_ref, merge_pid, folding = 0 a, b, next, dir, level, inject_done_ref, merge_pid, folding = []
}). }).
%%%%% PUBLIC OPERATIONS %%%%% PUBLIC OPERATIONS
@ -48,9 +48,16 @@ close(Ref) ->
end. end.
range_fold(Ref, SendTo, From, To) ->
{ok, FoldWorkerPID} = lsm_btree_fold_worker:start(SendTo), async_range(Ref, FoldWorkerPID, From, To) ->
proc_lib:spawn(fun() ->
{ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}), {ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}),
FoldWorkerPID ! {initialize, Folders}
end),
{ok, FoldWorkerPID}.
sync_range(Ref, FoldWorkerPID, From, To) ->
{ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, From, To, []}),
FoldWorkerPID ! {initialize, Folders}, FoldWorkerPID ! {initialize, Folders},
{ok, FoldWorkerPID}. {ok, FoldWorkerPID}.
@ -77,7 +84,6 @@ receive_reply(MRef) ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
Reply; Reply;
{'DOWN', MRef, _, _, Reason} -> {'DOWN', MRef, _, _, Reason} ->
error_logger:info_msg("Level dies, reason=~p~n", [Reason]),
exit(Reason) exit(Reason)
end. end.
@ -185,29 +191,28 @@ main_loop(State = #state{ next=Next }) ->
end, end,
ok; ok;
?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == 0 -> ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == [] ->
case {State#state.a, State#state.b} of case {State#state.a, State#state.b} of
{undefined, undefined} -> {undefined, undefined} ->
NewFolding = 0, NewFolding = [],
NextList = List; NextList = List;
{_, undefined} -> {_, undefined} ->
NewFolding = 1,
ok = file:make_link(filename("A", State), filename("AF", State)), ok = file:make_link(filename("A", State), filename("AF", State)),
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey),
NextList = [PID0|List]; NextList = [PID0|List],
NewFolding = [PID0];
{_, _} -> {_, _} ->
NewFolding = 2,
ok = file:make_link(filename("A", State), filename("AF", State)), ok = file:make_link(filename("A", State), filename("AF", State)),
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey),
ok = file:make_link(filename("B", State), filename("BF", State)), ok = file:make_link(filename("B", State), filename("BF", State)),
{ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, FromKey, ToKey), {ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, FromKey, ToKey),
NextList = [PID1,PID0|List] NextList = [PID1,PID0|List],
NewFolding = [PID1,PID0]
end, end,
case Next of case Next of
@ -219,9 +224,40 @@ main_loop(State = #state{ next=Next }) ->
main_loop(State#state{ folding = NewFolding }); main_loop(State#state{ folding = NewFolding });
{range_fold_done, _PID, FoldFileName} -> {range_fold_done, PID, [_,$F|_]=FoldFileName} ->
ok = file:delete(FoldFileName), ok = file:delete(FoldFileName),
main_loop(State#state{ folding = State#state.folding-1 }); main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, List}) ->
case {State#state.a, State#state.b} of
{undefined, undefined} ->
RefList = List;
{_, undefined} ->
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey),
RefList = [ARef|List];
{_, _} ->
BRef = erlang:make_ref(),
ok = do_range_fold(State#state.b, WorkerPID, BRef, FromKey, ToKey),
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey),
RefList = [ARef,BRef|List]
end,
case Next of
undefined ->
reply(From, {ok, lists:reverse(RefList)});
_ ->
Next ! ?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, RefList})
end,
main_loop(State);
%% %%
%% The outcome of merging resulted in a file with less than %% The outcome of merging resulted in a file with less than
@ -283,7 +319,13 @@ main_loop(State = #state{ next=Next }) ->
%% Probably from a merger_pid - which we may have forgotten in the meantime. %% Probably from a merger_pid - which we may have forgotten in the meantime.
main_loop(State); main_loop(State);
{'EXIT', Pid, Reason} when Pid == State#state.merge_pid -> {'EXIT', Pid, Reason} when Pid == State#state.merge_pid ->
restart_merge_then_loop(State#state{merge_pid=undefined}, Reason) restart_merge_then_loop(State#state{merge_pid=undefined}, Reason);
{'EXIT', PID, _} when [PID] == tl(State#state.folding);
hd(State#state.folding) == PID ->
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
{'EXIT', PID, Reason} ->
error_logger:info_msg("got unexpected exit ~p from ~p~n", [Reason, PID])
end. end.
do_lookup(_Key, []) -> do_lookup(_Key, []) ->
@ -356,20 +398,24 @@ start_range_fold(FileName, WorkerPID, FromKey, ToKey) ->
proc_lib:spawn( fun() -> proc_lib:spawn( fun() ->
erlang:link(WorkerPID), erlang:link(WorkerPID),
{ok, File} = lsm_btree_reader:open(FileName), {ok, File} = lsm_btree_reader:open(FileName),
lsm_btree_reader:range_fold(fun(Key,Value,_) -> do_range_fold(File, WorkerPID, self(), FromKey, ToKey),
WorkerPID ! {level_result, self(), Key, Value},
ok
end,
ok,
File,
FromKey, ToKey),
%% tell fold merge worker we're done
WorkerPID ! {level_done, self()},
erlang:unlink(WorkerPID), erlang:unlink(WorkerPID),
%% this will release the pinning of the fold file %% this will release the pinning of the fold file
Owner ! {range_fold_done, self(), FileName} Owner ! {range_fold_done, self(), FileName}
end ), end ),
{ok, PID}. {ok, PID}.
do_range_fold(BT, WorkerPID, Self, FromKey, ToKey) ->
lsm_btree_reader:range_fold(fun(Key,Value,_) ->
WorkerPID ! {level_result, Self, Key, Value},
ok
end,
ok,
BT,
FromKey, ToKey),
%% tell fold merge worker we're done
WorkerPID ! {level_done, Self},
ok.

View file

@ -1,6 +1,7 @@
-module(lsm_btree_nursery). -module(lsm_btree_nursery).
-export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]). -export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]).
-export([do_level_fold/2]).
-include("lsm_btree.hrl"). -include("lsm_btree.hrl").
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
@ -155,3 +156,12 @@ add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir }, Top) ->
lsm_btree_nursery:new(Dir) lsm_btree_nursery:new(Dir)
end. end.
do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID) ->
Ref = erlang:make_ref(),
FoldWorkerPID ! {prefix, [Ref]},
lists:foreach(fun({Key,Value}) ->
FoldWorkerPID ! {level_result, Ref, Key, Value}
end,
gb_trees:to_list(Cache)),
FoldWorkerPID ! {level_done, Ref},
ok.

View file

@ -30,10 +30,10 @@ full_test_() ->
fun () -> ok end, fun () -> ok end,
fun (_) -> ok end, fun (_) -> ok end,
[ [
{timeout, 120, ?_test(test_qc())},
?_test(test_tree_simple_1()), ?_test(test_tree_simple_1()),
?_test(test_tree_simple_2()), ?_test(test_tree_simple_2()),
?_test(test_tree()) ?_test(test_tree()),
{timeout, 120, ?_test(test_qc())}
]}. ]}.
-ifdef(TRIQ). -ifdef(TRIQ).
@ -213,9 +213,9 @@ test_tree_simple_2() ->
test_tree() -> test_tree() ->
%% application:start(sasl), application:start(sasl),
{ok, Tree} = lsm_btree:open("simple"), {ok, Tree} = lsm_btree:open("simple2"),
lists:foldl(fun(N,_) -> lists:foldl(fun(N,_) ->
ok = lsm_btree:put(Tree, ok = lsm_btree:put(Tree,
<<N:128>>, <<"data",N:128>>) <<N:128>>, <<"data",N:128>>)
@ -229,8 +229,9 @@ test_tree() ->
ok, ok,
lists:seq(4000,6000,1)), lists:seq(4000,6000,1)),
lsm_btree:delete(Tree, <<1500:128>>),
{Time,{ok,Count}} = timer:tc(?MODULE, run_fold, [Tree,1000,9000]), {Time,{ok,Count}} = timer:tc(?MODULE, run_fold, [Tree,1000,2000]),
error_logger:info_msg("time to fold: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]), error_logger:info_msg("time to fold: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
@ -238,25 +239,25 @@ test_tree() ->
ok = lsm_btree:close(Tree). ok = lsm_btree:close(Tree).
run_fold(Tree,From,To) -> run_fold(Tree,From,To) ->
{ok, PID} = lsm_btree:async_range(Tree, <<From:128>>, <<(To+1):128>>), {ok, PID} = lsm_btree:sync_range(Tree, <<From:128>>, <<(To+1):128>>),
lists:foreach(fun(N) -> lists:foreach(fun(1500) -> ok;
(N) ->
receive receive
{fold_result, PID, <<N:128>>,_} -> ok {fold_result, PID, <<N:128>>,_} -> ok
after 1000 -> after 100 ->
error_logger:info_msg("timed out on #~p~n", [N]) error_logger:info_msg("timed out on #~p~n", [N])
end end
end, end,
lists:seq(From,To,1)), lists:seq(From,To,1)),
receive
{fold_result, PID, <<N:128>>,_} ->
error_logger:info_msg("got fold key #~p! ~n", [N])
after 0 -> ok
end,
receive receive
{fold_done, PID} -> ok {fold_done, PID} -> ok
after 1000 -> after 1000 ->
error_logger:info_msg("timed out on fond_done! ~n", []) error_logger:info_msg("timed out on fold_done! ~n", [])
end, end,
%% we should now have no spurious messages
{messages,[]} = erlang:process_info(self(),messages),
{ok, To-From}. {ok, To-From}.