diff --git a/enable-lsm_btree b/enable-lsm_btree index 9669fb2..6726bca 100755 --- a/enable-lsm_btree +++ b/enable-lsm_btree @@ -32,7 +32,7 @@ if [ -d dev ]; then done fi -rebar get-deps +./rebar get-deps file=./deps/riak_kv/src/riak_kv.app.src if ! grep -q lsm_btree $file ; then @@ -48,7 +48,7 @@ if ! grep -q lsm_btree $file ; then perl -i.orig -pe '/\bsext\b/ && print qq( {lsm_btree, ".*", {git, "git\@github.com:basho/lsm_btree.git", "master"}},\n)' $file fi -rebar get-deps +./rebar get-deps rm -rf dev make all devrel diff --git a/include/lsm_btree.hrl b/include/lsm_btree.hrl index dbc3bfe..09c399c 100644 --- a/include/lsm_btree.hrl +++ b/include/lsm_btree.hrl @@ -6,6 +6,15 @@ %% -define(BTREE_ASYNC_CHUNK_SIZE, 100). +%% +%% The btree_range structure is a bit assymetric, here is why: +%% +%% from_key=<<>> is "less than" any other key, hence we don't need to +%% handle from_key=undefined to support an open-ended start of the +%% interval. For to_key, we cannot (statically) construct a key +%% which is > any possible key, hence we need to allow to_key=undefined +%% as a token of an interval that has no upper limit. +%% -record(btree_range, { from_key = <<>> :: binary(), from_inclusive = true :: boolean(), to_key :: binary() | undefined, diff --git a/levelpresence.sh b/levelpresence.sh index af9d573..0e1d674 100644 --- a/levelpresence.sh +++ b/levelpresence.sh @@ -11,6 +11,8 @@ function periodic() { echo -n " " elif ! [ -f "B-$i.data" ] ; then echo -n "-" + elif ! [ -f "C-$i.data" ] ; then + echo -n "#" elif ! [ -f "X-$i.data" ] ; then echo -n "=" else @@ -32,8 +34,10 @@ function dynamic() { s="$s " elif ! [ -f "B-$i.data" ] ; then s="$s-" - elif ! [ -f "X-$i.data" ] ; then + elif ! [ -f "C-$i.data" ] ; then s="$s=" + elif ! [ -f "X-$i.data" ] ; then + s="$s%" else s="$s*" fi diff --git a/src/lsm_btree.erl b/src/lsm_btree.erl index 67f6adb..d193417 100644 --- a/src/lsm_btree.erl +++ b/src/lsm_btree.erl @@ -79,6 +79,8 @@ sync_fold_range(Ref,Fun,Acc0,Range) -> sync_receive_fold_range(MRef, PID,Fun,Acc0) -> receive + + %% receive one K/V from fold_worker {fold_result, PID, K,V} -> case try @@ -96,6 +98,25 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) -> erlang:exit(PID, kill), drain_worker_and_throw(MRef,PID,Exit) end; + + %% receive multiple KVs from fold_worker + {fold_results, PID, KVs} -> + case + try + {ok, kvfoldl(Fun,Acc0,KVs)} + catch + Class:Exception -> + lager:warning("Exception in lsm_btree fold: ~p", [Exception]), + {'EXIT', Class, Exception, erlang:get_stacktrace()} + end + of + {ok, Acc1} -> + sync_receive_fold_range(MRef, PID, Fun, Acc1); + Exit -> + %% kill the fold worker ... + erlang:exit(PID, kill), + drain_worker_and_throw(MRef,PID,Exit) + end; {fold_limit, PID, _} -> erlang:demonitor(MRef, [flush]), Acc0; @@ -106,6 +127,26 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) -> error({fold_worker_died, Reason}) end. +kvfoldl(_Fun,Acc0,[]) -> + Acc0; +kvfoldl(Fun,Acc0,[{K,V}|T]) -> + kvfoldl(Fun, Fun(K,V,Acc0), T). + +drain_worker_and_throw(MRef, PID, ExitTuple) -> + receive + {fold_result, PID, _, _} -> + drain_worker_and_throw(MRef, PID, ExitTuple); + {'DOWN', MRef, _, _, _} -> + raise(ExitTuple); + {fold_limit, PID, _} -> + erlang:demonitor(MRef, [flush]), + raise(ExitTuple); + {fold_done, PID} -> + erlang:demonitor(MRef, [flush]), + raise(ExitTuple) + end. + + raise({'EXIT', Class, Exception, Trace}) -> erlang:raise(Class, Exception, Trace). @@ -113,6 +154,8 @@ drain_worker_and_throw(MRef, PID, ExitTuple) -> receive {fold_result, PID, _, _} -> drain_worker_and_throw(MRef, PID, ExitTuple); + {fold_results, PID, _} -> + drain_worker_and_throw(MRef, PID, ExitTuple); {'DOWN', MRef, _, _, _} -> raise(ExitTuple); {fold_limit, PID, _} -> @@ -215,8 +258,10 @@ handle_cast(Info,State) -> %% premature delete -> cleanup +terminate(normal,_State) -> + ok; terminate(_Reason,_State) -> - % error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]), + error_logger:info_msg("got terminate(~p,~p)~n", [_Reason,_State]), % flush_nursery(State), ok. diff --git a/src/lsm_btree.hrl b/src/lsm_btree.hrl index 17107d3..9311d44 100644 --- a/src/lsm_btree.hrl +++ b/src/lsm_btree.hrl @@ -25,8 +25,8 @@ -author('Kresten Krab Thorup '). -%% smallest levels are 8192 entries --define(TOP_LEVEL, 5). +%% smallest levels are 128 entries +-define(TOP_LEVEL, 7). -define(BTREE_SIZE(Level), (1 bsl (Level))). -define(TOMBSTONE, 'deleted'). diff --git a/src/lsm_btree_fold_worker.erl b/src/lsm_btree_fold_worker.erl index b8cfeb1..b89dcf7 100644 --- a/src/lsm_btree_fold_worker.erl +++ b/src/lsm_btree_fold_worker.erl @@ -67,6 +67,7 @@ start(SendTo) -> PID = plain_fsm:spawn(?MODULE, fun() -> process_flag(trap_exit,true), + link(SendTo), initialize(#state{sendto=SendTo}, []) end), {ok, PID}. @@ -124,10 +125,9 @@ fill(State, Values, [PID|Rest]=PIDs) -> emit_next(State, []) -> State#state.sendto ! {fold_done, self()}, - ok; + end_of_fold(State); emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> - case lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 -> {KV,[P]}; @@ -142,12 +142,16 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) -> {{_, ?TOMBSTONE}, FillFrom} -> fill(State, Values, FillFrom); {{Key, limit}, _} -> - State#state.sendto ! {fold_limit, self(), Key}; + State#state.sendto ! {fold_limit, self(), Key}, + end_of_fold(State); {{FoundKey, FoundValue}, FillFrom} -> State#state.sendto ! {fold_result, self(), FoundKey, FoundValue}, fill(State, Values, FillFrom) end. +end_of_fold(State) -> + unlink(State#state.sendto), + ok. data_vsn() -> 5. diff --git a/src/lsm_btree_level.erl b/src/lsm_btree_level.erl index b6f9945..3bb062c 100644 --- a/src/lsm_btree_level.erl +++ b/src/lsm_btree_level.erl @@ -25,7 +25,8 @@ -module(lsm_btree_level). -author('Kresten Krab Thorup '). --include("lsm_btree.hrl"). +-include("include/lsm_btree.hrl"). +-include("src/lsm_btree.hrl"). %% %% Manages a "pair" of lsm_index (or rathern, 0, 1 or 2), and governs @@ -41,22 +42,25 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3]). +-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, incremental_merge/2]). -include_lib("kernel/include/file.hrl"). -record(state, { - a, b, next, dir, level, inject_done_ref, merge_pid, folding = [] + a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [], + step_next_ref, step_caller, step_merge_ref }). %%%%% PUBLIC OPERATIONS open(Dir,Level,Next) when Level>0 -> - {ok, plain_fsm:spawn_link(?MODULE, + PID = plain_fsm:spawn_link(?MODULE, fun() -> process_flag(trap_exit,true), initialize(#state{dir=Dir,level=Level,next=Next}) - end)}. + end), + incremental_merge(PID, 2*?BTREE_SIZE(?TOP_LEVEL)), + {ok, PID}. lookup(Ref, Key) -> call(Ref, {lookup, Key}). @@ -65,6 +69,9 @@ inject(Ref, FileName) -> Result = call(Ref, {inject, FileName}), Result. +incremental_merge(Ref,HowMuch) -> + call(Ref, {incremental_merge, HowMuch}). + close(Ref) -> try call(Ref, close) @@ -77,13 +84,13 @@ close(Ref) -> async_range(Ref, FoldWorkerPID, Range) -> proc_lib:spawn(fun() -> - {ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, Range, []}), + {ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders} end), {ok, FoldWorkerPID}. sync_range(Ref, FoldWorkerPID, Range) -> - {ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, Range, []}), + {ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders}, {ok, FoldWorkerPID}. @@ -123,29 +130,49 @@ reply({PID,Ref}, Reply) -> initialize(State) -> + + try + initialize2(State) + catch + Class:Ex when not (Class == exit andalso Ex == normal) -> + error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()]) + end. + +initialize2(State) -> % error_logger:info_msg("in ~p level=~p~n", [self(), State]), AFileName = filename("A",State), BFileName = filename("B",State), CFileName = filename("C",State), + MFileName = filename("M",State), %% remove old merge file file:delete( filename("X",State)), - %% remove old fold files (hard links to A/B used during fold) + %% remove old fold files (hard links to A/B/C used during fold) file:delete( filename("AF",State)), file:delete( filename("BF",State)), + file:delete( filename("CF",State)), - case file:read_file_info(CFileName) of + case file:read_file_info(MFileName) of {ok, _} -> %% recover from post-merge crash file:delete(AFileName), file:delete(BFileName), - ok = file:rename(CFileName, AFileName), + ok = file:rename(MFileName, AFileName), - {ok, BT} = lsm_btree_reader:open(CFileName, random), - main_loop(State#state{ a= BT, b=undefined }); + {ok, BT} = lsm_btree_reader:open(AFileName, random), + + case file:read_file_info(CFileName) of + {ok, _} -> + file:rename(CFileName, BFileName), + {ok, BT2} = lsm_btree_reader:open(BFileName, random), + check_begin_merge_then_loop(State#state{ a= BT, b=BT2 }); + + {error, enoent} -> + main_loop(State#state{ a= BT, b=undefined }) + end; {error, enoent} -> case file:read_file_info(BFileName) of @@ -153,7 +180,14 @@ initialize(State) -> {ok, BT1} = lsm_btree_reader:open(AFileName, random), {ok, BT2} = lsm_btree_reader:open(BFileName, random), - check_begin_merge_then_loop(State#state{ a=BT1, b=BT2 }); + case file:read_file_info(CFileName) of + {ok, _} -> + {ok, BT3} = lsm_btree_reader:open(CFileName, random); + {error, enoent} -> + BT3 = undefined + end, + + check_begin_merge_then_loop(State#state{ a=BT1, b=BT2, c=BT3 }); {error, enoent} -> @@ -179,7 +213,7 @@ main_loop(State = #state{ next=Next }) -> Parent = plain_fsm:info(parent), receive ?REQ(From, {lookup, Key})=Req -> - case do_lookup(Key, [State#state.b, State#state.a, Next]) of + case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of not_found -> reply(From, not_found); {found, Result} -> @@ -189,22 +223,107 @@ main_loop(State = #state{ next=Next }) -> end, main_loop(State); - ?REQ(From, {inject, FileName}) when State#state.b == undefined -> - if State#state.a == undefined -> + ?REQ(From, {inject, FileName}) when State#state.c == undefined -> + case {State#state.a, State#state.b} of + {undefined, undefined} -> ToFileName = filename("A",State), SetPos = #state.a; - true -> + {_, undefined} -> ToFileName = filename("B",State), - SetPos = #state.b + SetPos = #state.b; + {_, _} -> + ToFileName = filename("C",State), + SetPos = #state.c end, ok = file:rename(FileName, ToFileName), {ok, BT} = lsm_btree_reader:open(ToFileName, random), + reply(From, ok), check_begin_merge_then_loop(setelement(SetPos, State, BT)); + + %% replies OK when there is no current step in progress + ?REQ(From, {incremental_merge, HowMuch}) + when State#state.step_merge_ref == undefined, + State#state.step_next_ref == undefined -> + reply(From, ok), + self() ! ?REQ(undefined, {step, HowMuch}), + main_loop(State); + + %% accept step any time there is not an outstanding step + ?REQ(StepFrom, {step, HowMuch}) + when State#state.step_merge_ref == undefined, + State#state.step_caller == undefined, + State#state.step_next_ref == undefined + -> + + %% delegate the step request to the next level + if Next =:= undefined -> + DelegateRef = undefined; + true -> + DelegateRef = send_request(Next, {step, HowMuch}) + end, + + %% if there is a merge worker, send him a step message + case State#state.merge_pid of + undefined -> + MergeRef = undefined; + MergePID -> + MergeRef = monitor(process, MergePID), + MergePID ! {step, {self(), MergeRef}, HowMuch} + end, + + if (Next =:= undefined) andalso (MergeRef =:= undefined) -> + %% nothing to do ... just return OK + State2 = reply_step_ok(State#state { step_caller = StepFrom }), + main_loop(State2); + true -> + main_loop(State#state{ step_next_ref=DelegateRef, + step_caller=StepFrom, + step_merge_ref=MergeRef + }) + end; + + {MRef, step_done} when MRef == State#state.step_merge_ref -> + demonitor(MRef, [flush]), + + case State#state.step_next_ref of + undefined -> + State2 = reply_step_ok(State); + _ -> + State2 = State + end, + + main_loop(State2#state{ step_merge_ref=undefined }); + + {'DOWN', MRef, _, _, _} when MRef == State#state.step_merge_ref -> + + %% current merge worker died (or just finished) + + case State#state.step_next_ref of + undefined -> + State2 = reply_step_ok(State); + _ -> + State2 = State + end, + + main_loop(State2#state{ step_merge_ref=undefined }); + + + ?REPLY(MRef, ok) + when MRef =:= State#state.step_next_ref, + State#state.step_merge_ref =:= undefined -> + + %% this applies when we receive an OK from the next level, + %% and we have finished the incremental merge at this level + + State2 = reply_step_ok(State), + main_loop(State2#state{ step_next_ref=undefined }); + ?REQ(From, close) -> close_if_defined(State#state.a), close_if_defined(State#state.b), + close_if_defined(State#state.c), stop_if_defined(State#state.merge_pid), reply(From, ok), @@ -217,69 +336,94 @@ main_loop(State = #state{ next=Next }) -> end, ok; - ?REQ(From, {init_range_fold, WorkerPID, Range, List}) when State#state.folding == [] -> + ?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] -> - case {State#state.a, State#state.b} of - {undefined, undefined} -> - NewFolding = [], + case {State#state.a, State#state.b, State#state.c} of + {undefined, undefined, undefined} -> + FoldingPIDs = [], NextList = List; - {_, undefined} -> + {_, undefined, undefined} -> ok = file:make_link(filename("A", State), filename("AF", State)), {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range), NextList = [PID0|List], - NewFolding = [PID0]; + FoldingPIDs = [PID0]; - {_, _} -> + {_, _, undefined} -> ok = file:make_link(filename("A", State), filename("AF", State)), - {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range), + {ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range), ok = file:make_link(filename("B", State), filename("BF", State)), - {ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, Range), + {ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range), - NextList = [PID1,PID0|List], - NewFolding = [PID1,PID0] + NextList = [PIDA,PIDB|List], + FoldingPIDs = [PIDB,PIDA]; + + {_, _, _} -> + ok = file:make_link(filename("A", State), filename("AF", State)), + {ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range), + + ok = file:make_link(filename("B", State), filename("BF", State)), + {ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range), + + ok = file:make_link(filename("C", State), filename("CF", State)), + {ok, PIDC} = start_range_fold(filename("CF",State), WorkerPID, Range), + + NextList = [PIDA,PIDB,PIDC|List], + FoldingPIDs = [PIDC,PIDB,PIDA] end, case Next of undefined -> reply(From, {ok, lists:reverse(NextList)}); _ -> - Next ! ?REQ(From, {init_range_fold, WorkerPID, Range, NextList}) + Next ! ?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, NextList}) end, - main_loop(State#state{ folding = NewFolding }); + main_loop(State#state{ folding = FoldingPIDs }); {range_fold_done, PID, [_,$F|_]=FoldFileName} -> ok = file:delete(FoldFileName), main_loop(State#state{ folding = lists:delete(PID,State#state.folding) }); - ?REQ(From, {sync_range_fold, WorkerPID, Range, List}) -> + ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) -> - case {State#state.a, State#state.b} of - {undefined, undefined} -> + case {State#state.a, State#state.b, State#state.c} of + {undefined, undefined, undefined} -> RefList = List; - {_, undefined} -> + {_, undefined, undefined} -> ARef = erlang:make_ref(), ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), RefList = [ARef|List]; - {_, _} -> + {_, _, undefined} -> BRef = erlang:make_ref(), ok = do_range_fold(State#state.b, WorkerPID, BRef, Range), ARef = erlang:make_ref(), ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), - RefList = [ARef,BRef|List] + RefList = [ARef,BRef|List]; + + {_, _, _} -> + CRef = erlang:make_ref(), + ok = do_range_fold(State#state.c, WorkerPID, CRef, Range), + + BRef = erlang:make_ref(), + ok = do_range_fold(State#state.b, WorkerPID, BRef, Range), + + ARef = erlang:make_ref(), + ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), + + RefList = [ARef,BRef,CRef|List] end, case Next of undefined -> reply(From, {ok, lists:reverse(RefList)}); _ -> - Next ! ?REQ(From, {sync_range_fold, WorkerPID, Range, RefList}) + Next ! ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, RefList}) end, main_loop(State); @@ -291,19 +435,29 @@ main_loop(State = #state{ next=Next }) -> %% {merge_done, Count, OutFileName} when Count =< ?BTREE_SIZE(State#state.level) -> - % first, rename the tmp file to C, so recovery will pick it up - CFileName = filename("C",State), - ok = file:rename(OutFileName, CFileName), + % first, rename the tmp file to M, so recovery will pick it up + MFileName = filename("M",State), + ok = file:rename(OutFileName, MFileName), % then delete A and B (if we crash now, C will become the A file) - {ok, State2} = close_a_and_b(State), + {ok, State2} = close_and_delete_a_and_b(State), - % then, rename C to A, and open it + % then, rename M to A, and open it AFileName = filename("A",State2), - ok = file:rename(CFileName, AFileName), + ok = file:rename(MFileName, AFileName), {ok, BT} = lsm_btree_reader:open(AFileName, random), - main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined }); + % iff there is a C file, then move it to B position + % TODO: consider recovery for this + case State#state.c of + undefined -> + main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined }); + TreeFile -> + file:rename(filename("C",State2), filename("B", State2)), + check_begin_merge_then_loop(State2#state{ a=BT, b=TreeFile, c=undefined, + merge_pid=undefined }) + + end; %% %% We need to push the output of merging to the next level @@ -317,6 +471,8 @@ main_loop(State = #state{ next=Next }) -> State end, + %% no need to rename it since we don't accept new injects + MRef = send_request(State1#state.next, {inject, OutFileName}), main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined }); @@ -325,14 +481,25 @@ main_loop(State = #state{ next=Next }) -> %% ?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref -> erlang:demonitor(MRef, [flush]), - {ok, State2} = close_a_and_b(State), - main_loop(State2#state{ inject_done_ref=undefined }); + {ok, State2} = close_and_delete_a_and_b(State), + + % if there is a "C" file, then move it to "A" position. + case State2#state.c of + undefined -> + State3=State2; + TreeFile -> + %% TODO: on what OS's is it ok to rename an open file? + ok = file:rename(filename("C", State2), filename("A", State2)), + State3 = State2#state{ a = TreeFile, b=undefined, c=undefined } + end, + + main_loop(State3#state{ inject_done_ref=undefined }); %% %% Our successor died! %% - {'DOWN', MRef, _, _, Reason} when MRef =:= State#state.inject_done_ref -> - exit(Reason); + {'DOWN', MRef, _, _, Reason} when MRef =:= State#state.inject_done_ref -> + exit(Reason); %% gen_fsm handling {system, From, Req} -> @@ -354,6 +521,13 @@ main_loop(State = #state{ next=Next }) -> end. +reply_step_ok(State) -> + case State#state.step_caller of + undefined -> ok; + _ -> reply(State#state.step_caller, ok) + end, + State#state{ step_caller=undefined }. + do_lookup(_Key, []) -> not_found; do_lookup(_Key, [Pid]) when is_pid(Pid) -> @@ -404,7 +578,7 @@ begin_merge(State) -> {ok, MergePID}. -close_a_and_b(State) -> +close_and_delete_a_and_b(State) -> AFileName = filename("A",State), BFileName = filename("B",State), @@ -435,19 +609,23 @@ start_range_fold(FileName, WorkerPID, Range) -> end ), {ok, PID}. -do_range_fold(BT, WorkerPID, Self, Range) -> +-spec do_range_fold(BT :: lsm_btree_reader:read_file(), + WorkerPID :: pid(), + SelfOrRef :: pid() | reference(), + Range :: #btree_range{} ) -> ok. +do_range_fold(BT, WorkerPID, SelfOrRef, Range) -> case lsm_btree_reader:range_fold(fun(Key,Value,_) -> - WorkerPID ! {level_result, Self, Key, Value}, + WorkerPID ! {level_result, SelfOrRef, Key, Value}, ok end, ok, BT, Range) of {limit, _, LastKey} -> - WorkerPID ! {level_limit, Self, LastKey}; + WorkerPID ! {level_limit, SelfOrRef, LastKey}; {done, _} -> %% tell fold merge worker we're done - WorkerPID ! {level_done, Self} + WorkerPID ! {level_done, SelfOrRef} end, ok. diff --git a/src/lsm_btree_merger.erl b/src/lsm_btree_merger.erl index abc0854..af6939c 100644 --- a/src/lsm_btree_merger.erl +++ b/src/lsm_btree_merger.erl @@ -53,7 +53,7 @@ merge(A,B,C, Size, IsLastLevel) -> {node, AKVs} = lsm_btree_reader:first_node(BT1), {node, BKVs} = lsm_btree_reader:first_node(BT2), - {ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0), + {ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}), %% finish stream tree ok = lsm_btree_reader:close(BT1), @@ -68,24 +68,42 @@ merge(A,B,C, Size, IsLastLevel) -> {ok, Count}. +step({N, From}) -> + {N-1, From}. -scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count) -> +scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) -> + case FromPID of + none -> + ok; + {PID, Ref} -> + PID ! {Ref, step_done} + end, + +% error_logger:info_msg("waiting for step in ~p~n", [self()]), + + receive + {step, From, HowMany} -> +% error_logger:info_msg("got step ~p,~p in ~p~n", [From,HowMany, self()]), + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From}) + end; + +scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) -> case lsm_btree_reader:next_node(BT1) of {node, AKVs} -> - scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count); + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step); end_of_data -> - scan_only(BT2, Out, IsLastLevel, BKVs, Count) + scan_only(BT2, Out, IsLastLevel, BKVs, Count, Step) end; -scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count) -> +scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count, Step) -> case lsm_btree_reader:next_node(BT2) of {node, BKVs} -> - scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count); + scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step); end_of_data -> - scan_only(BT1, Out, IsLastLevel, AKVs, Count) + scan_only(BT1, Out, IsLastLevel, AKVs, Count, Step) end; -scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) -> +scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count, Step) -> if Key1 < Key2 -> case ?LOCAL_WRITER of true -> @@ -94,7 +112,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV ok = lsm_btree_writer:add(Out2=Out, Key1, Value1) end, - scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1); + scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1, step(Step)); Key2 < Key1 -> case ?LOCAL_WRITER of @@ -103,10 +121,10 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV false -> ok = lsm_btree_writer:add(Out2=Out, Key2, Value2) end, - scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1); + scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step)); (?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) -> - scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count); + scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step)); true -> case ?LOCAL_WRITER of @@ -115,25 +133,25 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV false -> ok = lsm_btree_writer:add(Out2=Out, Key2, Value2) end, - scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1) + scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step)) end. -scan_only(BT, Out, IsLastLevel, [], Count) -> +scan_only(BT, Out, IsLastLevel, [], Count, Step) -> case lsm_btree_reader:next_node(BT) of {node, KVs} -> - scan_only(BT, Out, IsLastLevel, KVs, Count); + scan_only(BT, Out, IsLastLevel, KVs, Count, step(Step)); end_of_data -> {ok, Count, Out} end; -scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count) -> - scan_only(BT, Out, true, Rest, Count); +scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count, Step) -> + scan_only(BT, Out, true, Rest, Count, step(Step)); -scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count) -> +scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count, Step) -> case ?LOCAL_WRITER of true -> {noreply, Out2} = lsm_btree_writer:handle_cast({add, Key, Value}, Out); false -> ok = lsm_btree_writer:add(Out2=Out, Key, Value) end, - scan_only(BT, Out2, IsLastLevel, Rest, Count+1). + scan_only(BT, Out2, IsLastLevel, Rest, Count+1, step(Step)). diff --git a/src/lsm_btree_nursery.erl b/src/lsm_btree_nursery.erl index 2ce742e..00055d9 100644 --- a/src/lsm_btree_nursery.erl +++ b/src/lsm_btree_nursery.erl @@ -162,7 +162,13 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize, % [ gb_trees:size(Cache), TotalSize, FileInfo#file_info.size ]), %% inject the B-Tree (blocking RPC) - ok = lsm_btree_level:inject(TopLevel, BTreeFileName); + ok = lsm_btree_level:inject(TopLevel, BTreeFileName), + + %% issue some work if this is a top-level inject (blocks until previous such + %% incremental merge is finished). + lsm_btree_level:incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)), + ok; + _ -> ok end, diff --git a/src/lsm_btree_reader.erl b/src/lsm_btree_reader.erl index 6cf6c88..00cd7a1 100644 --- a/src/lsm_btree_reader.erl +++ b/src/lsm_btree_reader.erl @@ -35,9 +35,14 @@ -record(node, { level, members=[] }). -record(index, {file, root, bloom}). +-type read_file() :: #index{}. + +-spec open(Name::string()) -> read_file(). open(Name) -> open(Name, random). +-spec open(Name::string(), sequential|random) -> read_file(). + %% this is how to open a btree for sequential scanning (merge, fold) open(Name, sequential) -> {ok, File} = file:open(Name, [raw,read,{read_ahead, 1024 * 512},binary]), diff --git a/test/lsm_btree_merger_tests.erl b/test/lsm_btree_merger_tests.erl index 520e72f..d633031 100644 --- a/test/lsm_btree_merger_tests.erl +++ b/test/lsm_btree_merger_tests.erl @@ -31,6 +31,7 @@ merge_test() -> ok = lsm_btree_writer:close(BT2), + self() ! {step, {self(), none}, 2000000000}, {Time,{ok,Count}} = timer:tc(lsm_btree_merger, merge, ["test1", "test2", "test3", 10000, true]), error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]), diff --git a/test/lsm_btree_tests.erl b/test/lsm_btree_tests.erl index b1861d3..f2088ad 100644 --- a/test/lsm_btree_tests.erl +++ b/test/lsm_btree_tests.erl @@ -38,7 +38,7 @@ full_test_() -> ?_test(test_tree_simple_2()), ?_test(test_tree_simple_3()), ?_test(test_tree_simple_4()), - {timeout, 30, ?_test(test_tree())}, + {timeout, 300, ?_test(test_tree())}, {timeout, 120, ?_test(test_qc())} ]}. @@ -299,7 +299,7 @@ test_tree() -> <>, <<"data",N:128>>) end, ok, - lists:seq(2,10000,1)), + lists:seq(2,100000,1)), lists:foldl(fun(N,_) -> ok = lsm_btree:put(Tree, <>, <<"data",N:128>>)