From f56f530d7a0cf9aefa2698ce22be68b5663d47f5 Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Thu, 19 Jan 2012 14:25:47 +0100 Subject: [PATCH] Add both sync and async range fold Sync blocks insert/lookup while doing a range query, but still buffers the results in a per-range query process. Async fold runs on a hard link copy of the underlying tree data. This commit and also fixes a number of bugs related to folding; it was not taking nursery data into account. --- src/lsm_btree.erl | 23 ++++++-- src/lsm_btree_fold_worker.erl | 44 +++++++------- src/lsm_btree_level.erl | 104 ++++++++++++++++++++++++---------- src/lsm_btree_nursery.erl | 10 ++++ test/lsm_btree_tests.erl | 29 +++++----- 5 files changed, 144 insertions(+), 66 deletions(-) diff --git a/src/lsm_btree.erl b/src/lsm_btree.erl index a7f98d7..dcb3925 100644 --- a/src/lsm_btree.erl +++ b/src/lsm_btree.erl @@ -5,7 +5,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([open/1, close/1, lookup/2, delete/2, put/3, async_range/3, fold_range/5]). +-export([open/1, close/1, lookup/2, delete/2, put/3, async_range/3, async_fold_range/5, sync_range/3, sync_fold_range/5]). -include("lsm_btree.hrl"). -include_lib("kernel/include/file.hrl"). @@ -35,10 +35,17 @@ delete(Ref,Key) when is_binary(Key) -> put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> gen_server:call(Ref, {put, Key, Value}). +sync_range(Ref,FromKey,ToKey) when is_binary(FromKey), is_binary(ToKey) -> + gen_server:call(Ref, {sync_range, self(), FromKey, ToKey}). + +sync_fold_range(Ref,Fun,Acc0,FromKey,ToKey) -> + {ok, PID} = sync_range(Ref,FromKey,ToKey), + receive_fold_range(PID,Fun,Acc0). + async_range(Ref,FromKey,ToKey) when is_binary(FromKey), is_binary(ToKey) -> gen_server:call(Ref, {async_range, self(), FromKey, ToKey}). -fold_range(Ref,Fun,Acc0,FromKey,ToKey) -> +async_fold_range(Ref,Fun,Acc0,FromKey,ToKey) -> {ok, PID} = async_range(Ref,FromKey,ToKey), receive_fold_range(PID,Fun,Acc0). @@ -129,8 +136,16 @@ code_change(_OldVsn, State, _Extra) -> -handle_call({async_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel }) -> - Result = lsm_btree_level:range_fold(TopLevel, Sender, FromKey, ToKey), +handle_call({async_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> + {ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender), + lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID), + Result = lsm_btree_level:async_range(TopLevel, FoldWorkerPID, FromKey, ToKey), + {reply, Result, State}; + +handle_call({sync_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> + {ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender), + lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID), + Result = lsm_btree_level:sync_range(TopLevel, FoldWorkerPID, FromKey, ToKey), {reply, Result, State}; handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> diff --git a/src/lsm_btree_fold_worker.erl b/src/lsm_btree_fold_worker.erl index 4bd315c..c792d18 100644 --- a/src/lsm_btree_fold_worker.erl +++ b/src/lsm_btree_fold_worker.erl @@ -33,29 +33,35 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). +-include("lsm_btree.hrl"). + -record(state, {sendto}). start(SendTo) -> - PID = plain_fsm:spawn_link(?MODULE, - fun() -> - process_flag(trap_exit,true), - initialize(#state{sendto=SendTo}) - end), + PID = plain_fsm:spawn(?MODULE, + fun() -> + process_flag(trap_exit,true), + initialize(#state{sendto=SendTo}, []) + end), {ok, PID}. -initialize(State) -> +initialize(State, PrefixFolders) -> Parent = plain_fsm:info(parent), receive + {prefix, [_]=Folders} -> + initialize(State, Folders); + {initialize, Folders} -> - Initial = [ {PID,undefined} || PID <- Folders ], - fill(State, Initial, Folders); + + Initial = [ {PID,undefined} || PID <- (PrefixFolders ++ Folders) ], + fill(State, Initial, PrefixFolders ++ Folders); %% gen_fsm handling {system, From, Req} -> plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> initialize(S1) end); + From, Req, State, fun(S1) -> initialize(S1, PrefixFolders) end); {'EXIT', Parent, Reason} -> plain_fsm:parent_EXIT(Reason, State) @@ -86,31 +92,31 @@ fill(State, Values, [PID|Rest]=PIDs) -> fill(State, Values, PIDs) end - end. emit_next(State, []) -> State#state.sendto ! {fold_done, self()}, ok; -emit_next(State, [{PID,{Key,Value}}]) -> - State#state.sendto ! {fold_result, self(), Key, Value}, - fill(State, [{PID,undefined}], [PID]); - emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> {{FoundKey, FoundValue}, FillFrom} = - lists:foldl(fun({P,{K1,V1}}, {{K2,_},_}) when K1 < K2 -> - {{K1,V1},[P]}; - ({P,{K,_}}, {{K,V},List}) -> - {{K,V}, [P|List]}; + lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 -> + {KV,[P]}; + ({P,{K,_}}, {{K,_}=KV,List}) -> + {KV, [P|List]}; (_, Found) -> Found end, {FirstKV,[FirstPID]}, Rest), - State#state.sendto ! {fold_result, self(), FoundKey, FoundValue}, + case FoundValue of + ?TOMBSTONE -> + ok; + _ -> + State#state.sendto ! {fold_result, self(), FoundKey, FoundValue} + end, fill(State, Values, FillFrom). diff --git a/src/lsm_btree_level.erl b/src/lsm_btree_level.erl index 21fffb0..02286f8 100644 --- a/src/lsm_btree_level.erl +++ b/src/lsm_btree_level.erl @@ -16,12 +16,12 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/3, lookup/2, inject/2, close/1, range_fold/4]). +-export([open/3, lookup/2, inject/2, close/1, async_range/4, sync_range/4]). -include_lib("kernel/include/file.hrl"). -record(state, { - a, b, next, dir, level, inject_done_ref, merge_pid, folding = 0 + a, b, next, dir, level, inject_done_ref, merge_pid, folding = [] }). %%%%% PUBLIC OPERATIONS @@ -48,9 +48,16 @@ close(Ref) -> end. -range_fold(Ref, SendTo, From, To) -> - {ok, FoldWorkerPID} = lsm_btree_fold_worker:start(SendTo), - {ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}), + +async_range(Ref, FoldWorkerPID, From, To) -> + proc_lib:spawn(fun() -> + {ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}), + FoldWorkerPID ! {initialize, Folders} + end), + {ok, FoldWorkerPID}. + +sync_range(Ref, FoldWorkerPID, From, To) -> + {ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, From, To, []}), FoldWorkerPID ! {initialize, Folders}, {ok, FoldWorkerPID}. @@ -77,7 +84,6 @@ receive_reply(MRef) -> erlang:demonitor(MRef, [flush]), Reply; {'DOWN', MRef, _, _, Reason} -> - error_logger:info_msg("Level dies, reason=~p~n", [Reason]), exit(Reason) end. @@ -185,29 +191,28 @@ main_loop(State = #state{ next=Next }) -> end, ok; - ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == 0 -> + ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == [] -> case {State#state.a, State#state.b} of {undefined, undefined} -> - NewFolding = 0, + NewFolding = [], NextList = List; {_, undefined} -> - NewFolding = 1, ok = file:make_link(filename("A", State), filename("AF", State)), {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), - NextList = [PID0|List]; + NextList = [PID0|List], + NewFolding = [PID0]; {_, _} -> - NewFolding = 2, - ok = file:make_link(filename("A", State), filename("AF", State)), {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), ok = file:make_link(filename("B", State), filename("BF", State)), {ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, FromKey, ToKey), - NextList = [PID1,PID0|List] + NextList = [PID1,PID0|List], + NewFolding = [PID1,PID0] end, case Next of @@ -219,9 +224,40 @@ main_loop(State = #state{ next=Next }) -> main_loop(State#state{ folding = NewFolding }); - {range_fold_done, _PID, FoldFileName} -> + {range_fold_done, PID, [_,$F|_]=FoldFileName} -> ok = file:delete(FoldFileName), - main_loop(State#state{ folding = State#state.folding-1 }); + main_loop(State#state{ folding = lists:delete(PID,State#state.folding) }); + + ?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, List}) -> + + case {State#state.a, State#state.b} of + {undefined, undefined} -> + RefList = List; + + {_, undefined} -> + ARef = erlang:make_ref(), + ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey), + RefList = [ARef|List]; + + {_, _} -> + BRef = erlang:make_ref(), + ok = do_range_fold(State#state.b, WorkerPID, BRef, FromKey, ToKey), + + ARef = erlang:make_ref(), + ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey), + + RefList = [ARef,BRef|List] + end, + + case Next of + undefined -> + reply(From, {ok, lists:reverse(RefList)}); + _ -> + Next ! ?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, RefList}) + end, + + main_loop(State); + %% %% The outcome of merging resulted in a file with less than @@ -280,10 +316,16 @@ main_loop(State = #state{ next=Next }) -> {'EXIT', Parent, Reason} -> plain_fsm:parent_EXIT(Reason, State); {'EXIT', _, normal} -> - %% Probably from a merger_pid - which we may have forgotten in the meantime. - main_loop(State); + %% Probably from a merger_pid - which we may have forgotten in the meantime. + main_loop(State); {'EXIT', Pid, Reason} when Pid == State#state.merge_pid -> - restart_merge_then_loop(State#state{merge_pid=undefined}, Reason) + restart_merge_then_loop(State#state{merge_pid=undefined}, Reason); + {'EXIT', PID, _} when [PID] == tl(State#state.folding); + hd(State#state.folding) == PID -> + main_loop(State#state{ folding = lists:delete(PID,State#state.folding) }); + {'EXIT', PID, Reason} -> + error_logger:info_msg("got unexpected exit ~p from ~p~n", [Reason, PID]) + end. do_lookup(_Key, []) -> @@ -356,20 +398,24 @@ start_range_fold(FileName, WorkerPID, FromKey, ToKey) -> proc_lib:spawn( fun() -> erlang:link(WorkerPID), {ok, File} = lsm_btree_reader:open(FileName), - lsm_btree_reader:range_fold(fun(Key,Value,_) -> - WorkerPID ! {level_result, self(), Key, Value}, - ok - end, - ok, - File, - FromKey, ToKey), - - %% tell fold merge worker we're done - WorkerPID ! {level_done, self()}, - + do_range_fold(File, WorkerPID, self(), FromKey, ToKey), erlang:unlink(WorkerPID), %% this will release the pinning of the fold file Owner ! {range_fold_done, self(), FileName} end ), {ok, PID}. + +do_range_fold(BT, WorkerPID, Self, FromKey, ToKey) -> + lsm_btree_reader:range_fold(fun(Key,Value,_) -> + WorkerPID ! {level_result, Self, Key, Value}, + ok + end, + ok, + BT, + FromKey, ToKey), + + %% tell fold merge worker we're done + WorkerPID ! {level_done, Self}, + + ok. diff --git a/src/lsm_btree_nursery.erl b/src/lsm_btree_nursery.erl index 03b80f6..25d0d04 100644 --- a/src/lsm_btree_nursery.erl +++ b/src/lsm_btree_nursery.erl @@ -1,6 +1,7 @@ -module(lsm_btree_nursery). -export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]). +-export([do_level_fold/2]). -include("lsm_btree.hrl"). -include_lib("kernel/include/file.hrl"). @@ -155,3 +156,12 @@ add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir }, Top) -> lsm_btree_nursery:new(Dir) end. +do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID) -> + Ref = erlang:make_ref(), + FoldWorkerPID ! {prefix, [Ref]}, + lists:foreach(fun({Key,Value}) -> + FoldWorkerPID ! {level_result, Ref, Key, Value} + end, + gb_trees:to_list(Cache)), + FoldWorkerPID ! {level_done, Ref}, + ok. diff --git a/test/lsm_btree_tests.erl b/test/lsm_btree_tests.erl index 555e71f..b92e73e 100644 --- a/test/lsm_btree_tests.erl +++ b/test/lsm_btree_tests.erl @@ -30,10 +30,10 @@ full_test_() -> fun () -> ok end, fun (_) -> ok end, [ - {timeout, 120, ?_test(test_qc())}, ?_test(test_tree_simple_1()), ?_test(test_tree_simple_2()), - ?_test(test_tree()) + ?_test(test_tree()), + {timeout, 120, ?_test(test_qc())} ]}. -ifdef(TRIQ). @@ -213,9 +213,9 @@ test_tree_simple_2() -> test_tree() -> -%% application:start(sasl), + application:start(sasl), - {ok, Tree} = lsm_btree:open("simple"), + {ok, Tree} = lsm_btree:open("simple2"), lists:foldl(fun(N,_) -> ok = lsm_btree:put(Tree, <>, <<"data",N:128>>) @@ -229,8 +229,9 @@ test_tree() -> ok, lists:seq(4000,6000,1)), + lsm_btree:delete(Tree, <<1500:128>>), - {Time,{ok,Count}} = timer:tc(?MODULE, run_fold, [Tree,1000,9000]), + {Time,{ok,Count}} = timer:tc(?MODULE, run_fold, [Tree,1000,2000]), error_logger:info_msg("time to fold: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]), @@ -238,25 +239,25 @@ test_tree() -> ok = lsm_btree:close(Tree). run_fold(Tree,From,To) -> - {ok, PID} = lsm_btree:async_range(Tree, <>, <<(To+1):128>>), - lists:foreach(fun(N) -> + {ok, PID} = lsm_btree:sync_range(Tree, <>, <<(To+1):128>>), + lists:foreach(fun(1500) -> ok; + (N) -> receive {fold_result, PID, <>,_} -> ok - after 1000 -> + after 100 -> error_logger:info_msg("timed out on #~p~n", [N]) end end, lists:seq(From,To,1)), - receive - {fold_result, PID, <>,_} -> - error_logger:info_msg("got fold key #~p! ~n", [N]) - after 0 -> ok - end, receive {fold_done, PID} -> ok after 1000 -> - error_logger:info_msg("timed out on fond_done! ~n", []) + error_logger:info_msg("timed out on fold_done! ~n", []) end, + + %% we should now have no spurious messages + {messages,[]} = erlang:process_info(self(),messages), + {ok, To-From}.