diff --git a/src/lsm_btree.erl b/src/lsm_btree.erl index aaaee1a..6705acc 100644 --- a/src/lsm_btree.erl +++ b/src/lsm_btree.erl @@ -5,7 +5,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([open/1, close/1, lookup/2, delete/2, put/3]). +-export([open/1, close/1, lookup/2, delete/2, put/3, range/3, fold_range/5]). -include("lsm_btree.hrl"). -include_lib("kernel/include/file.hrl"). @@ -30,6 +30,20 @@ delete(Ref,Key) when is_binary(Key) -> put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> gen_server:call(Ref, {put, Key, Value}). +range(Ref,FromKey,ToKey) when is_binary(FromKey), is_binary(ToKey) -> + gen_server:call(Ref, {range, self(), FromKey, ToKey}). + +fold_range(Ref,Fun,Acc0,FromKey,ToKey) -> + {ok, PID} = range(Ref,FromKey,ToKey), + receive_fold_range(PID,Fun,Acc0). + +receive_fold_range(PID,Fun,Acc0) -> + receive + {fold_result, PID, K,V} -> + receive_fold_range(PID, Fun, Fun(K,V,Acc0)); + {fold_done, PID} -> + Acc0 + end. init([Dir]) -> @@ -110,6 +124,10 @@ code_change(_OldVsn, State, _Extra) -> +handle_call({range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel }) -> + Result = lsm_btree_level:range_fold(TopLevel, Sender, FromKey, ToKey), + {reply, Result, State}; + handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> {ok, State2} = do_put(Key, Value, State), {reply, ok, State2}; @@ -133,7 +151,6 @@ handle_call(close, _From, State) -> {ok, State2} = flush_nursery(State), {stop, normal, ok, State2}. - do_put(Key, Value, State=#state{ nursery=Nursery, top=Top }) -> {ok, Nursery2} = lsm_btree_nursery:add_maybe_flush(Key, Value, Nursery, Top), {ok, State#state{ nursery=Nursery2 }}. diff --git a/src/lsm_btree_fold_worker.erl b/src/lsm_btree_fold_worker.erl new file mode 100644 index 0000000..4bd315c --- /dev/null +++ b/src/lsm_btree_fold_worker.erl @@ -0,0 +1,123 @@ +-module(lsm_btree_fold_worker). + +%% +%% This worker is used to merge fold results from individual +%% levels. First, it receives a message +%% +%% {initialize, [LevelWorker, ...]} +%% +%% And then from each LevelWorker, a sequence of +%% +%% {level_result, LevelWorker, Key1, Value} +%% {level_result, LevelWorker, Key2, Value} +%% {level_result, LevelWorker, Key3, Value} +%% {level_result, LevelWorker, Key4, Value} +%% ... +%% {level_done, LevelWorker} +%% +%% The order of level workers in the initialize messge is top-down, +%% which is used to select between same-key messages from different +%% levels. +%% +%% This fold_worker process will then send to a designated SendTo target +%% a similar sequence of messages +%% +%% {fold_result, self(), Key1, Value} +%% {fold_result, self(), Key2, Value} +%% {fold_result, self(), Key3, Value} +%% ... +%% {fold_done, self()}. +%% + +-export([start/1]). +-behavior(plain_fsm). +-export([data_vsn/0, code_change/3]). + +-record(state, {sendto}). + +start(SendTo) -> + PID = plain_fsm:spawn_link(?MODULE, + fun() -> + process_flag(trap_exit,true), + initialize(#state{sendto=SendTo}) + end), + {ok, PID}. + + +initialize(State) -> + + Parent = plain_fsm:info(parent), + receive + {initialize, Folders} -> + Initial = [ {PID,undefined} || PID <- Folders ], + fill(State, Initial, Folders); + + %% gen_fsm handling + {system, From, Req} -> + plain_fsm:handle_system_msg( + From, Req, State, fun(S1) -> initialize(S1) end); + + {'EXIT', Parent, Reason} -> + plain_fsm:parent_EXIT(Reason, State) + end. + + +fill(State, Values, []) -> + emit_next(State, Values); + +fill(State, Values, [PID|Rest]=PIDs) -> + receive + {level_done, PID} -> + fill(State, lists:keydelete(PID, 1, Values), Rest); + {level_result, PID, Key, Value} -> + fill(State, lists:keyreplace(PID, 1, Values, {PID,{Key,Value}}), Rest); + + %% gen_fsm handling + {system, From, Req} -> + plain_fsm:handle_system_msg( + From, Req, State, fun(S1) -> fill(S1, Values, PIDs) end); + + {'EXIT', Parent, Reason}=Msg -> + case plain_fsm:info(parent) == Parent of + true -> + plain_fsm:parent_EXIT(Reason, State); + false -> + error_logger:info_msg("unhandled EXIT message ~p~n", [Msg]), + fill(State, Values, PIDs) + end + + + end. + +emit_next(State, []) -> + State#state.sendto ! {fold_done, self()}, + ok; + +emit_next(State, [{PID,{Key,Value}}]) -> + State#state.sendto ! {fold_result, self(), Key, Value}, + fill(State, [{PID,undefined}], [PID]); + +emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> + + {{FoundKey, FoundValue}, FillFrom} = + lists:foldl(fun({P,{K1,V1}}, {{K2,_},_}) when K1 < K2 -> + {{K1,V1},[P]}; + ({P,{K,_}}, {{K,V},List}) -> + {{K,V}, [P|List]}; + (_, Found) -> + Found + end, + {FirstKV,[FirstPID]}, + Rest), + + State#state.sendto ! {fold_result, self(), FoundKey, FoundValue}, + + fill(State, Values, FillFrom). + + +data_vsn() -> + 5. + +code_change(_OldVsn, _State, _Extra) -> + {ok, {#state{}, data_vsn()}}. + diff --git a/src/lsm_btree_level.erl b/src/lsm_btree_level.erl index 30496a2..c93ff45 100644 --- a/src/lsm_btree_level.erl +++ b/src/lsm_btree_level.erl @@ -16,12 +16,12 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/3, lookup/2, inject/2, close/1]). +-export([open/3, lookup/2, inject/2, close/1, range_fold/4]). -include_lib("kernel/include/file.hrl"). -record(state, { - a, b, next, dir, level, inject_done_ref, merge_pid + a, b, next, dir, level, inject_done_ref, merge_pid, folding = 0 }). %%%%% PUBLIC OPERATIONS @@ -43,6 +43,11 @@ inject(Ref, FileName) -> close(Ref) -> call(Ref, close). +range_fold(Ref, SendTo, From, To) -> + {ok, FoldWorkerPID} = lsm_btree_fold_worker:start(SendTo), + {ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}), + FoldWorkerPID ! {initialize, Folders}, + {ok, FoldWorkerPID}. %%%%% INTERNAL @@ -90,6 +95,10 @@ initialize(State) -> %% remove old merge file file:delete( filename("X",State)), + %% remove old fold files (hard links to A/B used during fold) + file:delete( filename("AF",State)), + file:delete( filename("BF",State)), + case file:read_file_info(CFileName) of {ok, _} -> @@ -163,6 +172,44 @@ main_loop(State = #state{ next=Next }) -> reply(From, ok), ok; + ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == 0 -> + + case {State#state.a, State#state.b} of + {undefined, undefined} -> + NewFolding = 0, + NextList = List; + + {_, undefined} -> + NewFolding = 1, + ok = file:make_link(filename("A", State), filename("AF", State)), + {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), + NextList = [PID0|List]; + + {_, _} -> + NewFolding = 2, + + ok = file:make_link(filename("A", State), filename("AF", State)), + {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), + + ok = file:make_link(filename("B", State), filename("BF", State)), + {ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, FromKey, ToKey), + + NextList = [PID1,PID0|List] + end, + + case Next of + undefined -> + reply(From, {ok, lists:reverse(NextList)}); + _ -> + Next ! ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, NextList}) + end, + + main_loop(State#state{ folding = NewFolding }); + + {range_fold_done, _PID, FoldFileName} -> + ok = file:delete(FoldFileName), + main_loop(State#state{ folding = State#state.folding-1 }); + %% %% The outcome of merging resulted in a file with less than %% level #entries, so we keep it at this level @@ -289,3 +336,27 @@ close_a_and_b(State) -> filename(PFX, State) -> filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data"). + +start_range_fold(FileName, WorkerPID, FromKey, ToKey) -> + Owner = self(), + PID = + proc_lib:spawn( fun() -> + erlang:link(WorkerPID), + {ok, File} = lsm_btree_reader:open(FileName), + lsm_btree_reader:range_fold(fun(Key,Value,_) -> + WorkerPID ! {level_result, self(), Key, Value}, + ok + end, + ok, + File, + FromKey, ToKey), + + %% tell fold merge worker we're done + WorkerPID ! {level_done, self()}, + + erlang:unlink(WorkerPID), + + %% this will release the pinning of the fold file + Owner ! {range_fold_done, self(), FileName} + end ), + {ok, PID}. diff --git a/test/lsm_btree_tests.erl b/test/lsm_btree_tests.erl index 2558274..ca0614d 100644 --- a/test/lsm_btree_tests.erl +++ b/test/lsm_btree_tests.erl @@ -222,9 +222,46 @@ test_tree() -> end, ok, lists:seq(2,10000,1)), + lists:foldl(fun(N,_) -> + ok = lsm_btree:put(Tree, + <>, <<"data",N:128>>) + end, + ok, + lists:seq(4000,6000,1)), + + + {Time,{ok,Count}} = timer:tc(?MODULE, run_fold, [Tree,1000,9000]), + + error_logger:info_msg("time to fold: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]), + ok = lsm_btree:close(Tree). +run_fold(Tree,From,To) -> + {ok, PID} = lsm_btree:range(Tree, <>, <<(To+1):128>>), + lists:foreach(fun(N) -> + receive + {fold_result, _, <>,_} -> ok + after 1000 -> + error_logger:info_msg("timed out on #~p~n", [N]) + end + end, + lists:seq(From,To,1)), + receive + {fold_result, _, <>,_} -> + error_logger:info_msg("got fold key #~p! ~n", [N]) + after 0 -> ok + end, + receive + {fold_done, _} -> ok + after 1000 -> + error_logger:info_msg("timed out on fond_done! ~n", []) + end, + {ok, To-From}. + + + + %% Command processing %% ---------------------------------------------------------------------- cmd_close_args(#state { open = Open }) ->