Implement lsm_btree:fold_range/5

First implementation of range fold

Range folding doesn't prohibit insert/lookup or
merge operations, but each level can only have
one range fold operation active.

Thus, worst case active range folds can double
space requirements, because it holds hard-linked
copies of used btree files.
This commit is contained in:
Kresten Krab Thorup 2012-01-16 00:37:52 +01:00
parent 2923ef8eff
commit 771d18f9f7
4 changed files with 252 additions and 4 deletions

View file

@ -5,7 +5,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([open/1, close/1, lookup/2, delete/2, put/3]).
-export([open/1, close/1, lookup/2, delete/2, put/3, range/3, fold_range/5]).
-include("lsm_btree.hrl").
-include_lib("kernel/include/file.hrl").
@ -30,6 +30,20 @@ delete(Ref,Key) when is_binary(Key) ->
put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
gen_server:call(Ref, {put, Key, Value}).
range(Ref,FromKey,ToKey) when is_binary(FromKey), is_binary(ToKey) ->
gen_server:call(Ref, {range, self(), FromKey, ToKey}).
fold_range(Ref,Fun,Acc0,FromKey,ToKey) ->
{ok, PID} = range(Ref,FromKey,ToKey),
receive_fold_range(PID,Fun,Acc0).
receive_fold_range(PID,Fun,Acc0) ->
receive
{fold_result, PID, K,V} ->
receive_fold_range(PID, Fun, Fun(K,V,Acc0));
{fold_done, PID} ->
Acc0
end.
init([Dir]) ->
@ -110,6 +124,10 @@ code_change(_OldVsn, State, _Extra) ->
handle_call({range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel }) ->
Result = lsm_btree_level:range_fold(TopLevel, Sender, FromKey, ToKey),
{reply, Result, State};
handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) ->
{ok, State2} = do_put(Key, Value, State),
{reply, ok, State2};
@ -133,7 +151,6 @@ handle_call(close, _From, State) ->
{ok, State2} = flush_nursery(State),
{stop, normal, ok, State2}.
do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) ->
{ok, Nursery2} = lsm_btree_nursery:add_maybe_flush(Key, Value, Nursery, Top),
{ok, State#state{ nursery=Nursery2 }}.

View file

@ -0,0 +1,123 @@
-module(lsm_btree_fold_worker).
%%
%% This worker is used to merge fold results from individual
%% levels. First, it receives a message
%%
%% {initialize, [LevelWorker, ...]}
%%
%% And then from each LevelWorker, a sequence of
%%
%% {level_result, LevelWorker, Key1, Value}
%% {level_result, LevelWorker, Key2, Value}
%% {level_result, LevelWorker, Key3, Value}
%% {level_result, LevelWorker, Key4, Value}
%% ...
%% {level_done, LevelWorker}
%%
%% The order of level workers in the initialize messge is top-down,
%% which is used to select between same-key messages from different
%% levels.
%%
%% This fold_worker process will then send to a designated SendTo target
%% a similar sequence of messages
%%
%% {fold_result, self(), Key1, Value}
%% {fold_result, self(), Key2, Value}
%% {fold_result, self(), Key3, Value}
%% ...
%% {fold_done, self()}.
%%
-export([start/1]).
-behavior(plain_fsm).
-export([data_vsn/0, code_change/3]).
-record(state, {sendto}).
start(SendTo) ->
PID = plain_fsm:spawn_link(?MODULE,
fun() ->
process_flag(trap_exit,true),
initialize(#state{sendto=SendTo})
end),
{ok, PID}.
initialize(State) ->
Parent = plain_fsm:info(parent),
receive
{initialize, Folders} ->
Initial = [ {PID,undefined} || PID <- Folders ],
fill(State, Initial, Folders);
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> initialize(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
end.
fill(State, Values, []) ->
emit_next(State, Values);
fill(State, Values, [PID|Rest]=PIDs) ->
receive
{level_done, PID} ->
fill(State, lists:keydelete(PID, 1, Values), Rest);
{level_result, PID, Key, Value} ->
fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,Value}}), Rest);
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> fill(S1, Values, PIDs) end);
{'EXIT', Parent, Reason}=Msg ->
case plain_fsm:info(parent) == Parent of
true ->
plain_fsm:parent_EXIT(Reason, State);
false ->
error_logger:info_msg("unhandled EXIT message ~p~n", [Msg]),
fill(State, Values, PIDs)
end
end.
emit_next(State, []) ->
State#state.sendto ! {fold_done, self()},
ok;
emit_next(State, [{PID,{Key,Value}}]) ->
State#state.sendto ! {fold_result, self(), Key, Value},
fill(State, [{PID,undefined}], [PID]);
emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
{{FoundKey, FoundValue}, FillFrom} =
lists:foldl(fun({P,{K1,V1}}, {{K2,_},_}) when K1 < K2 ->
{{K1,V1},[P]};
({P,{K,_}}, {{K,V},List}) ->
{{K,V}, [P|List]};
(_, Found) ->
Found
end,
{FirstKV,[FirstPID]},
Rest),
State#state.sendto ! {fold_result, self(), FoundKey, FoundValue},
fill(State, Values, FillFrom).
data_vsn() ->
5.
code_change(_OldVsn, _State, _Extra) ->
{ok, {#state{}, data_vsn()}}.

View file

@ -16,12 +16,12 @@
-behavior(plain_fsm).
-export([data_vsn/0, code_change/3]).
-export([open/3, lookup/2, inject/2, close/1]).
-export([open/3, lookup/2, inject/2, close/1, range_fold/4]).
-include_lib("kernel/include/file.hrl").
-record(state, {
a, b, next, dir, level, inject_done_ref, merge_pid
a, b, next, dir, level, inject_done_ref, merge_pid, folding = 0
}).
%%%%% PUBLIC OPERATIONS
@ -43,6 +43,11 @@ inject(Ref, FileName) ->
close(Ref) ->
call(Ref, close).
range_fold(Ref, SendTo, From, To) ->
{ok, FoldWorkerPID} = lsm_btree_fold_worker:start(SendTo),
{ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}),
FoldWorkerPID ! {initialize, Folders},
{ok, FoldWorkerPID}.
%%%%% INTERNAL
@ -90,6 +95,10 @@ initialize(State) ->
%% remove old merge file
file:delete( filename("X",State)),
%% remove old fold files (hard links to A/B used during fold)
file:delete( filename("AF",State)),
file:delete( filename("BF",State)),
case file:read_file_info(CFileName) of
{ok, _} ->
@ -163,6 +172,44 @@ main_loop(State = #state{ next=Next }) ->
reply(From, ok),
ok;
?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == 0 ->
case {State#state.a, State#state.b} of
{undefined, undefined} ->
NewFolding = 0,
NextList = List;
{_, undefined} ->
NewFolding = 1,
ok = file:make_link(filename("A", State), filename("AF", State)),
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey),
NextList = [PID0|List];
{_, _} ->
NewFolding = 2,
ok = file:make_link(filename("A", State), filename("AF", State)),
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey),
ok = file:make_link(filename("B", State), filename("BF", State)),
{ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, FromKey, ToKey),
NextList = [PID1,PID0|List]
end,
case Next of
undefined ->
reply(From, {ok, lists:reverse(NextList)});
_ ->
Next ! ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, NextList})
end,
main_loop(State#state{ folding = NewFolding });
{range_fold_done, _PID, FoldFileName} ->
ok = file:delete(FoldFileName),
main_loop(State#state{ folding = State#state.folding-1 });
%%
%% The outcome of merging resulted in a file with less than
%% level #entries, so we keep it at this level
@ -289,3 +336,27 @@ close_a_and_b(State) ->
filename(PFX, State) ->
filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data").
start_range_fold(FileName, WorkerPID, FromKey, ToKey) ->
Owner = self(),
PID =
proc_lib:spawn( fun() ->
erlang:link(WorkerPID),
{ok, File} = lsm_btree_reader:open(FileName),
lsm_btree_reader:range_fold(fun(Key,Value,_) ->
WorkerPID ! {level_result, self(), Key, Value},
ok
end,
ok,
File,
FromKey, ToKey),
%% tell fold merge worker we're done
WorkerPID ! {level_done, self()},
erlang:unlink(WorkerPID),
%% this will release the pinning of the fold file
Owner ! {range_fold_done, self(), FileName}
end ),
{ok, PID}.

View file

@ -222,9 +222,46 @@ test_tree() ->
end,
ok,
lists:seq(2,10000,1)),
lists:foldl(fun(N,_) ->
ok = lsm_btree:put(Tree,
<<N:128>>, <<"data",N:128>>)
end,
ok,
lists:seq(4000,6000,1)),
{Time,{ok,Count}} = timer:tc(?MODULE, run_fold, [Tree,1000,9000]),
error_logger:info_msg("time to fold: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
ok = lsm_btree:close(Tree).
run_fold(Tree,From,To) ->
{ok, PID} = lsm_btree:range(Tree, <<From:128>>, <<(To+1):128>>),
lists:foreach(fun(N) ->
receive
{fold_result, _, <<N:128>>,_} -> ok
after 1000 ->
error_logger:info_msg("timed out on #~p~n", [N])
end
end,
lists:seq(From,To,1)),
receive
{fold_result, _, <<N:128>>,_} ->
error_logger:info_msg("got fold key #~p! ~n", [N])
after 0 -> ok
end,
receive
{fold_done, _} -> ok
after 1000 ->
error_logger:info_msg("timed out on fond_done! ~n", [])
end,
{ok, To-From}.
%% Command processing
%% ----------------------------------------------------------------------
cmd_close_args(#state { open = Open }) ->