Merge branch 'krab-incremental-merge' into gsb-merge-krab-20120419
Conflicts: src/lsm_btree.erl src/lsm_btree.hrl
This commit is contained in:
commit
23f6d76a72
12 changed files with 354 additions and 84 deletions
|
@ -32,7 +32,7 @@ if [ -d dev ]; then
|
|||
done
|
||||
fi
|
||||
|
||||
rebar get-deps
|
||||
./rebar get-deps
|
||||
|
||||
file=./deps/riak_kv/src/riak_kv.app.src
|
||||
if ! grep -q lsm_btree $file ; then
|
||||
|
@ -48,7 +48,7 @@ if ! grep -q lsm_btree $file ; then
|
|||
perl -i.orig -pe '/\bsext\b/ && print qq( {lsm_btree, ".*", {git, "git\@github.com:basho/lsm_btree.git", "master"}},\n)' $file
|
||||
fi
|
||||
|
||||
rebar get-deps
|
||||
./rebar get-deps
|
||||
|
||||
rm -rf dev
|
||||
make all devrel
|
||||
|
|
|
@ -6,6 +6,15 @@
|
|||
%%
|
||||
-define(BTREE_ASYNC_CHUNK_SIZE, 100).
|
||||
|
||||
%%
|
||||
%% The btree_range structure is a bit assymetric, here is why:
|
||||
%%
|
||||
%% from_key=<<>> is "less than" any other key, hence we don't need to
|
||||
%% handle from_key=undefined to support an open-ended start of the
|
||||
%% interval. For to_key, we cannot (statically) construct a key
|
||||
%% which is > any possible key, hence we need to allow to_key=undefined
|
||||
%% as a token of an interval that has no upper limit.
|
||||
%%
|
||||
-record(btree_range, { from_key = <<>> :: binary(),
|
||||
from_inclusive = true :: boolean(),
|
||||
to_key :: binary() | undefined,
|
||||
|
|
|
@ -11,6 +11,8 @@ function periodic() {
|
|||
echo -n " "
|
||||
elif ! [ -f "B-$i.data" ] ; then
|
||||
echo -n "-"
|
||||
elif ! [ -f "C-$i.data" ] ; then
|
||||
echo -n "#"
|
||||
elif ! [ -f "X-$i.data" ] ; then
|
||||
echo -n "="
|
||||
else
|
||||
|
@ -32,8 +34,10 @@ function dynamic() {
|
|||
s="$s "
|
||||
elif ! [ -f "B-$i.data" ] ; then
|
||||
s="$s-"
|
||||
elif ! [ -f "X-$i.data" ] ; then
|
||||
elif ! [ -f "C-$i.data" ] ; then
|
||||
s="$s="
|
||||
elif ! [ -f "X-$i.data" ] ; then
|
||||
s="$s%"
|
||||
else
|
||||
s="$s*"
|
||||
fi
|
||||
|
|
|
@ -79,6 +79,8 @@ sync_fold_range(Ref,Fun,Acc0,Range) ->
|
|||
|
||||
sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
|
||||
receive
|
||||
|
||||
%% receive one K/V from fold_worker
|
||||
{fold_result, PID, K,V} ->
|
||||
case
|
||||
try
|
||||
|
@ -96,6 +98,25 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
|
|||
erlang:exit(PID, kill),
|
||||
drain_worker_and_throw(MRef,PID,Exit)
|
||||
end;
|
||||
|
||||
%% receive multiple KVs from fold_worker
|
||||
{fold_results, PID, KVs} ->
|
||||
case
|
||||
try
|
||||
{ok, kvfoldl(Fun,Acc0,KVs)}
|
||||
catch
|
||||
Class:Exception ->
|
||||
lager:warning("Exception in lsm_btree fold: ~p", [Exception]),
|
||||
{'EXIT', Class, Exception, erlang:get_stacktrace()}
|
||||
end
|
||||
of
|
||||
{ok, Acc1} ->
|
||||
sync_receive_fold_range(MRef, PID, Fun, Acc1);
|
||||
Exit ->
|
||||
%% kill the fold worker ...
|
||||
erlang:exit(PID, kill),
|
||||
drain_worker_and_throw(MRef,PID,Exit)
|
||||
end;
|
||||
{fold_limit, PID, _} ->
|
||||
erlang:demonitor(MRef, [flush]),
|
||||
Acc0;
|
||||
|
@ -106,6 +127,26 @@ sync_receive_fold_range(MRef, PID,Fun,Acc0) ->
|
|||
error({fold_worker_died, Reason})
|
||||
end.
|
||||
|
||||
kvfoldl(_Fun,Acc0,[]) ->
|
||||
Acc0;
|
||||
kvfoldl(Fun,Acc0,[{K,V}|T]) ->
|
||||
kvfoldl(Fun, Fun(K,V,Acc0), T).
|
||||
|
||||
drain_worker_and_throw(MRef, PID, ExitTuple) ->
|
||||
receive
|
||||
{fold_result, PID, _, _} ->
|
||||
drain_worker_and_throw(MRef, PID, ExitTuple);
|
||||
{'DOWN', MRef, _, _, _} ->
|
||||
raise(ExitTuple);
|
||||
{fold_limit, PID, _} ->
|
||||
erlang:demonitor(MRef, [flush]),
|
||||
raise(ExitTuple);
|
||||
{fold_done, PID} ->
|
||||
erlang:demonitor(MRef, [flush]),
|
||||
raise(ExitTuple)
|
||||
end.
|
||||
|
||||
|
||||
raise({'EXIT', Class, Exception, Trace}) ->
|
||||
erlang:raise(Class, Exception, Trace).
|
||||
|
||||
|
@ -113,6 +154,8 @@ drain_worker_and_throw(MRef, PID, ExitTuple) ->
|
|||
receive
|
||||
{fold_result, PID, _, _} ->
|
||||
drain_worker_and_throw(MRef, PID, ExitTuple);
|
||||
{fold_results, PID, _} ->
|
||||
drain_worker_and_throw(MRef, PID, ExitTuple);
|
||||
{'DOWN', MRef, _, _, _} ->
|
||||
raise(ExitTuple);
|
||||
{fold_limit, PID, _} ->
|
||||
|
@ -215,8 +258,10 @@ handle_cast(Info,State) ->
|
|||
|
||||
|
||||
%% premature delete -> cleanup
|
||||
terminate(normal,_State) ->
|
||||
ok;
|
||||
terminate(_Reason,_State) ->
|
||||
% error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]),
|
||||
error_logger:info_msg("got terminate(~p,~p)~n", [_Reason,_State]),
|
||||
% flush_nursery(State),
|
||||
ok.
|
||||
|
||||
|
|
|
@ -25,8 +25,8 @@
|
|||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||
|
||||
|
||||
%% smallest levels are 8192 entries
|
||||
-define(TOP_LEVEL, 5).
|
||||
%% smallest levels are 128 entries
|
||||
-define(TOP_LEVEL, 7).
|
||||
-define(BTREE_SIZE(Level), (1 bsl (Level))).
|
||||
|
||||
-define(TOMBSTONE, 'deleted').
|
||||
|
|
|
@ -67,6 +67,7 @@ start(SendTo) ->
|
|||
PID = plain_fsm:spawn(?MODULE,
|
||||
fun() ->
|
||||
process_flag(trap_exit,true),
|
||||
link(SendTo),
|
||||
initialize(#state{sendto=SendTo}, [])
|
||||
end),
|
||||
{ok, PID}.
|
||||
|
@ -124,10 +125,9 @@ fill(State, Values, [PID|Rest]=PIDs) ->
|
|||
|
||||
emit_next(State, []) ->
|
||||
State#state.sendto ! {fold_done, self()},
|
||||
ok;
|
||||
end_of_fold(State);
|
||||
|
||||
emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
|
||||
|
||||
case
|
||||
lists:foldl(fun({P,{K1,_}=KV}, {{K2,_},_}) when K1 < K2 ->
|
||||
{KV,[P]};
|
||||
|
@ -142,12 +142,16 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values) ->
|
|||
{{_, ?TOMBSTONE}, FillFrom} ->
|
||||
fill(State, Values, FillFrom);
|
||||
{{Key, limit}, _} ->
|
||||
State#state.sendto ! {fold_limit, self(), Key};
|
||||
State#state.sendto ! {fold_limit, self(), Key},
|
||||
end_of_fold(State);
|
||||
{{FoundKey, FoundValue}, FillFrom} ->
|
||||
State#state.sendto ! {fold_result, self(), FoundKey, FoundValue},
|
||||
fill(State, Values, FillFrom)
|
||||
end.
|
||||
|
||||
end_of_fold(State) ->
|
||||
unlink(State#state.sendto),
|
||||
ok.
|
||||
|
||||
data_vsn() ->
|
||||
5.
|
||||
|
|
|
@ -25,7 +25,8 @@
|
|||
-module(lsm_btree_level).
|
||||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||
|
||||
-include("lsm_btree.hrl").
|
||||
-include("include/lsm_btree.hrl").
|
||||
-include("src/lsm_btree.hrl").
|
||||
|
||||
%%
|
||||
%% Manages a "pair" of lsm_index (or rathern, 0, 1 or 2), and governs
|
||||
|
@ -41,22 +42,25 @@
|
|||
-behavior(plain_fsm).
|
||||
-export([data_vsn/0, code_change/3]).
|
||||
|
||||
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3]).
|
||||
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3, incremental_merge/2]).
|
||||
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
-record(state, {
|
||||
a, b, next, dir, level, inject_done_ref, merge_pid, folding = []
|
||||
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
|
||||
step_next_ref, step_caller, step_merge_ref
|
||||
}).
|
||||
|
||||
%%%%% PUBLIC OPERATIONS
|
||||
|
||||
open(Dir,Level,Next) when Level>0 ->
|
||||
{ok, plain_fsm:spawn_link(?MODULE,
|
||||
PID = plain_fsm:spawn_link(?MODULE,
|
||||
fun() ->
|
||||
process_flag(trap_exit,true),
|
||||
initialize(#state{dir=Dir,level=Level,next=Next})
|
||||
end)}.
|
||||
end),
|
||||
incremental_merge(PID, 2*?BTREE_SIZE(?TOP_LEVEL)),
|
||||
{ok, PID}.
|
||||
|
||||
lookup(Ref, Key) ->
|
||||
call(Ref, {lookup, Key}).
|
||||
|
@ -65,6 +69,9 @@ inject(Ref, FileName) ->
|
|||
Result = call(Ref, {inject, FileName}),
|
||||
Result.
|
||||
|
||||
incremental_merge(Ref,HowMuch) ->
|
||||
call(Ref, {incremental_merge, HowMuch}).
|
||||
|
||||
close(Ref) ->
|
||||
try
|
||||
call(Ref, close)
|
||||
|
@ -77,13 +84,13 @@ close(Ref) ->
|
|||
|
||||
async_range(Ref, FoldWorkerPID, Range) ->
|
||||
proc_lib:spawn(fun() ->
|
||||
{ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, Range, []}),
|
||||
{ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}),
|
||||
FoldWorkerPID ! {initialize, Folders}
|
||||
end),
|
||||
{ok, FoldWorkerPID}.
|
||||
|
||||
sync_range(Ref, FoldWorkerPID, Range) ->
|
||||
{ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, Range, []}),
|
||||
{ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}),
|
||||
FoldWorkerPID ! {initialize, Folders},
|
||||
{ok, FoldWorkerPID}.
|
||||
|
||||
|
@ -123,29 +130,49 @@ reply({PID,Ref}, Reply) ->
|
|||
|
||||
|
||||
initialize(State) ->
|
||||
|
||||
try
|
||||
initialize2(State)
|
||||
catch
|
||||
Class:Ex when not (Class == exit andalso Ex == normal) ->
|
||||
error_logger:error_msg("crash2: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
|
||||
end.
|
||||
|
||||
initialize2(State) ->
|
||||
% error_logger:info_msg("in ~p level=~p~n", [self(), State]),
|
||||
|
||||
AFileName = filename("A",State),
|
||||
BFileName = filename("B",State),
|
||||
CFileName = filename("C",State),
|
||||
MFileName = filename("M",State),
|
||||
|
||||
%% remove old merge file
|
||||
file:delete( filename("X",State)),
|
||||
|
||||
%% remove old fold files (hard links to A/B used during fold)
|
||||
%% remove old fold files (hard links to A/B/C used during fold)
|
||||
file:delete( filename("AF",State)),
|
||||
file:delete( filename("BF",State)),
|
||||
file:delete( filename("CF",State)),
|
||||
|
||||
case file:read_file_info(CFileName) of
|
||||
case file:read_file_info(MFileName) of
|
||||
{ok, _} ->
|
||||
|
||||
%% recover from post-merge crash
|
||||
file:delete(AFileName),
|
||||
file:delete(BFileName),
|
||||
ok = file:rename(CFileName, AFileName),
|
||||
ok = file:rename(MFileName, AFileName),
|
||||
|
||||
{ok, BT} = lsm_btree_reader:open(CFileName, random),
|
||||
main_loop(State#state{ a= BT, b=undefined });
|
||||
{ok, BT} = lsm_btree_reader:open(AFileName, random),
|
||||
|
||||
case file:read_file_info(CFileName) of
|
||||
{ok, _} ->
|
||||
file:rename(CFileName, BFileName),
|
||||
{ok, BT2} = lsm_btree_reader:open(BFileName, random),
|
||||
check_begin_merge_then_loop(State#state{ a= BT, b=BT2 });
|
||||
|
||||
{error, enoent} ->
|
||||
main_loop(State#state{ a= BT, b=undefined })
|
||||
end;
|
||||
|
||||
{error, enoent} ->
|
||||
case file:read_file_info(BFileName) of
|
||||
|
@ -153,7 +180,14 @@ initialize(State) ->
|
|||
{ok, BT1} = lsm_btree_reader:open(AFileName, random),
|
||||
{ok, BT2} = lsm_btree_reader:open(BFileName, random),
|
||||
|
||||
check_begin_merge_then_loop(State#state{ a=BT1, b=BT2 });
|
||||
case file:read_file_info(CFileName) of
|
||||
{ok, _} ->
|
||||
{ok, BT3} = lsm_btree_reader:open(CFileName, random);
|
||||
{error, enoent} ->
|
||||
BT3 = undefined
|
||||
end,
|
||||
|
||||
check_begin_merge_then_loop(State#state{ a=BT1, b=BT2, c=BT3 });
|
||||
|
||||
{error, enoent} ->
|
||||
|
||||
|
@ -179,7 +213,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
Parent = plain_fsm:info(parent),
|
||||
receive
|
||||
?REQ(From, {lookup, Key})=Req ->
|
||||
case do_lookup(Key, [State#state.b, State#state.a, Next]) of
|
||||
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
|
||||
not_found ->
|
||||
reply(From, not_found);
|
||||
{found, Result} ->
|
||||
|
@ -189,22 +223,107 @@ main_loop(State = #state{ next=Next }) ->
|
|||
end,
|
||||
main_loop(State);
|
||||
|
||||
?REQ(From, {inject, FileName}) when State#state.b == undefined ->
|
||||
if State#state.a == undefined ->
|
||||
?REQ(From, {inject, FileName}) when State#state.c == undefined ->
|
||||
case {State#state.a, State#state.b} of
|
||||
{undefined, undefined} ->
|
||||
ToFileName = filename("A",State),
|
||||
SetPos = #state.a;
|
||||
true ->
|
||||
{_, undefined} ->
|
||||
ToFileName = filename("B",State),
|
||||
SetPos = #state.b
|
||||
SetPos = #state.b;
|
||||
{_, _} ->
|
||||
ToFileName = filename("C",State),
|
||||
SetPos = #state.c
|
||||
end,
|
||||
ok = file:rename(FileName, ToFileName),
|
||||
{ok, BT} = lsm_btree_reader:open(ToFileName, random),
|
||||
|
||||
reply(From, ok),
|
||||
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
||||
|
||||
|
||||
%% replies OK when there is no current step in progress
|
||||
?REQ(From, {incremental_merge, HowMuch})
|
||||
when State#state.step_merge_ref == undefined,
|
||||
State#state.step_next_ref == undefined ->
|
||||
reply(From, ok),
|
||||
self() ! ?REQ(undefined, {step, HowMuch}),
|
||||
main_loop(State);
|
||||
|
||||
%% accept step any time there is not an outstanding step
|
||||
?REQ(StepFrom, {step, HowMuch})
|
||||
when State#state.step_merge_ref == undefined,
|
||||
State#state.step_caller == undefined,
|
||||
State#state.step_next_ref == undefined
|
||||
->
|
||||
|
||||
%% delegate the step request to the next level
|
||||
if Next =:= undefined ->
|
||||
DelegateRef = undefined;
|
||||
true ->
|
||||
DelegateRef = send_request(Next, {step, HowMuch})
|
||||
end,
|
||||
|
||||
%% if there is a merge worker, send him a step message
|
||||
case State#state.merge_pid of
|
||||
undefined ->
|
||||
MergeRef = undefined;
|
||||
MergePID ->
|
||||
MergeRef = monitor(process, MergePID),
|
||||
MergePID ! {step, {self(), MergeRef}, HowMuch}
|
||||
end,
|
||||
|
||||
if (Next =:= undefined) andalso (MergeRef =:= undefined) ->
|
||||
%% nothing to do ... just return OK
|
||||
State2 = reply_step_ok(State#state { step_caller = StepFrom }),
|
||||
main_loop(State2);
|
||||
true ->
|
||||
main_loop(State#state{ step_next_ref=DelegateRef,
|
||||
step_caller=StepFrom,
|
||||
step_merge_ref=MergeRef
|
||||
})
|
||||
end;
|
||||
|
||||
{MRef, step_done} when MRef == State#state.step_merge_ref ->
|
||||
demonitor(MRef, [flush]),
|
||||
|
||||
case State#state.step_next_ref of
|
||||
undefined ->
|
||||
State2 = reply_step_ok(State);
|
||||
_ ->
|
||||
State2 = State
|
||||
end,
|
||||
|
||||
main_loop(State2#state{ step_merge_ref=undefined });
|
||||
|
||||
{'DOWN', MRef, _, _, _} when MRef == State#state.step_merge_ref ->
|
||||
|
||||
%% current merge worker died (or just finished)
|
||||
|
||||
case State#state.step_next_ref of
|
||||
undefined ->
|
||||
State2 = reply_step_ok(State);
|
||||
_ ->
|
||||
State2 = State
|
||||
end,
|
||||
|
||||
main_loop(State2#state{ step_merge_ref=undefined });
|
||||
|
||||
|
||||
?REPLY(MRef, ok)
|
||||
when MRef =:= State#state.step_next_ref,
|
||||
State#state.step_merge_ref =:= undefined ->
|
||||
|
||||
%% this applies when we receive an OK from the next level,
|
||||
%% and we have finished the incremental merge at this level
|
||||
|
||||
State2 = reply_step_ok(State),
|
||||
main_loop(State2#state{ step_next_ref=undefined });
|
||||
|
||||
?REQ(From, close) ->
|
||||
close_if_defined(State#state.a),
|
||||
close_if_defined(State#state.b),
|
||||
close_if_defined(State#state.c),
|
||||
stop_if_defined(State#state.merge_pid),
|
||||
reply(From, ok),
|
||||
|
||||
|
@ -217,69 +336,94 @@ main_loop(State = #state{ next=Next }) ->
|
|||
end,
|
||||
ok;
|
||||
|
||||
?REQ(From, {init_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
||||
?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
||||
|
||||
case {State#state.a, State#state.b} of
|
||||
{undefined, undefined} ->
|
||||
NewFolding = [],
|
||||
case {State#state.a, State#state.b, State#state.c} of
|
||||
{undefined, undefined, undefined} ->
|
||||
FoldingPIDs = [],
|
||||
NextList = List;
|
||||
|
||||
{_, undefined} ->
|
||||
{_, undefined, undefined} ->
|
||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||
NextList = [PID0|List],
|
||||
NewFolding = [PID0];
|
||||
FoldingPIDs = [PID0];
|
||||
|
||||
{_, _} ->
|
||||
{_, _, undefined} ->
|
||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||
|
||||
ok = file:make_link(filename("B", State), filename("BF", State)),
|
||||
{ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
||||
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
||||
|
||||
NextList = [PID1,PID0|List],
|
||||
NewFolding = [PID1,PID0]
|
||||
NextList = [PIDA,PIDB|List],
|
||||
FoldingPIDs = [PIDB,PIDA];
|
||||
|
||||
{_, _, _} ->
|
||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||
|
||||
ok = file:make_link(filename("B", State), filename("BF", State)),
|
||||
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
||||
|
||||
ok = file:make_link(filename("C", State), filename("CF", State)),
|
||||
{ok, PIDC} = start_range_fold(filename("CF",State), WorkerPID, Range),
|
||||
|
||||
NextList = [PIDA,PIDB,PIDC|List],
|
||||
FoldingPIDs = [PIDC,PIDB,PIDA]
|
||||
end,
|
||||
|
||||
case Next of
|
||||
undefined ->
|
||||
reply(From, {ok, lists:reverse(NextList)});
|
||||
_ ->
|
||||
Next ! ?REQ(From, {init_range_fold, WorkerPID, Range, NextList})
|
||||
Next ! ?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, NextList})
|
||||
end,
|
||||
|
||||
main_loop(State#state{ folding = NewFolding });
|
||||
main_loop(State#state{ folding = FoldingPIDs });
|
||||
|
||||
{range_fold_done, PID, [_,$F|_]=FoldFileName} ->
|
||||
ok = file:delete(FoldFileName),
|
||||
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
|
||||
|
||||
?REQ(From, {sync_range_fold, WorkerPID, Range, List}) ->
|
||||
?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) ->
|
||||
|
||||
case {State#state.a, State#state.b} of
|
||||
{undefined, undefined} ->
|
||||
case {State#state.a, State#state.b, State#state.c} of
|
||||
{undefined, undefined, undefined} ->
|
||||
RefList = List;
|
||||
|
||||
{_, undefined} ->
|
||||
{_, undefined, undefined} ->
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
RefList = [ARef|List];
|
||||
|
||||
{_, _} ->
|
||||
{_, _, undefined} ->
|
||||
BRef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
||||
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
|
||||
RefList = [ARef,BRef|List]
|
||||
RefList = [ARef,BRef|List];
|
||||
|
||||
{_, _, _} ->
|
||||
CRef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.c, WorkerPID, CRef, Range),
|
||||
|
||||
BRef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
||||
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
|
||||
RefList = [ARef,BRef,CRef|List]
|
||||
end,
|
||||
|
||||
case Next of
|
||||
undefined ->
|
||||
reply(From, {ok, lists:reverse(RefList)});
|
||||
_ ->
|
||||
Next ! ?REQ(From, {sync_range_fold, WorkerPID, Range, RefList})
|
||||
Next ! ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, RefList})
|
||||
end,
|
||||
|
||||
main_loop(State);
|
||||
|
@ -291,19 +435,29 @@ main_loop(State = #state{ next=Next }) ->
|
|||
%%
|
||||
{merge_done, Count, OutFileName} when Count =< ?BTREE_SIZE(State#state.level) ->
|
||||
|
||||
% first, rename the tmp file to C, so recovery will pick it up
|
||||
CFileName = filename("C",State),
|
||||
ok = file:rename(OutFileName, CFileName),
|
||||
% first, rename the tmp file to M, so recovery will pick it up
|
||||
MFileName = filename("M",State),
|
||||
ok = file:rename(OutFileName, MFileName),
|
||||
|
||||
% then delete A and B (if we crash now, C will become the A file)
|
||||
{ok, State2} = close_a_and_b(State),
|
||||
{ok, State2} = close_and_delete_a_and_b(State),
|
||||
|
||||
% then, rename C to A, and open it
|
||||
% then, rename M to A, and open it
|
||||
AFileName = filename("A",State2),
|
||||
ok = file:rename(CFileName, AFileName),
|
||||
ok = file:rename(MFileName, AFileName),
|
||||
{ok, BT} = lsm_btree_reader:open(AFileName, random),
|
||||
|
||||
main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined });
|
||||
% iff there is a C file, then move it to B position
|
||||
% TODO: consider recovery for this
|
||||
case State#state.c of
|
||||
undefined ->
|
||||
main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined });
|
||||
TreeFile ->
|
||||
file:rename(filename("C",State2), filename("B", State2)),
|
||||
check_begin_merge_then_loop(State2#state{ a=BT, b=TreeFile, c=undefined,
|
||||
merge_pid=undefined })
|
||||
|
||||
end;
|
||||
|
||||
%%
|
||||
%% We need to push the output of merging to the next level
|
||||
|
@ -317,6 +471,8 @@ main_loop(State = #state{ next=Next }) ->
|
|||
State
|
||||
end,
|
||||
|
||||
%% no need to rename it since we don't accept new injects
|
||||
|
||||
MRef = send_request(State1#state.next, {inject, OutFileName}),
|
||||
main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined });
|
||||
|
||||
|
@ -325,14 +481,25 @@ main_loop(State = #state{ next=Next }) ->
|
|||
%%
|
||||
?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref ->
|
||||
erlang:demonitor(MRef, [flush]),
|
||||
{ok, State2} = close_a_and_b(State),
|
||||
main_loop(State2#state{ inject_done_ref=undefined });
|
||||
{ok, State2} = close_and_delete_a_and_b(State),
|
||||
|
||||
% if there is a "C" file, then move it to "A" position.
|
||||
case State2#state.c of
|
||||
undefined ->
|
||||
State3=State2;
|
||||
TreeFile ->
|
||||
%% TODO: on what OS's is it ok to rename an open file?
|
||||
ok = file:rename(filename("C", State2), filename("A", State2)),
|
||||
State3 = State2#state{ a = TreeFile, b=undefined, c=undefined }
|
||||
end,
|
||||
|
||||
main_loop(State3#state{ inject_done_ref=undefined });
|
||||
|
||||
%%
|
||||
%% Our successor died!
|
||||
%%
|
||||
{'DOWN', MRef, _, _, Reason} when MRef =:= State#state.inject_done_ref ->
|
||||
exit(Reason);
|
||||
{'DOWN', MRef, _, _, Reason} when MRef =:= State#state.inject_done_ref ->
|
||||
exit(Reason);
|
||||
|
||||
%% gen_fsm handling
|
||||
{system, From, Req} ->
|
||||
|
@ -354,6 +521,13 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
end.
|
||||
|
||||
reply_step_ok(State) ->
|
||||
case State#state.step_caller of
|
||||
undefined -> ok;
|
||||
_ -> reply(State#state.step_caller, ok)
|
||||
end,
|
||||
State#state{ step_caller=undefined }.
|
||||
|
||||
do_lookup(_Key, []) ->
|
||||
not_found;
|
||||
do_lookup(_Key, [Pid]) when is_pid(Pid) ->
|
||||
|
@ -404,7 +578,7 @@ begin_merge(State) ->
|
|||
{ok, MergePID}.
|
||||
|
||||
|
||||
close_a_and_b(State) ->
|
||||
close_and_delete_a_and_b(State) ->
|
||||
AFileName = filename("A",State),
|
||||
BFileName = filename("B",State),
|
||||
|
||||
|
@ -435,19 +609,23 @@ start_range_fold(FileName, WorkerPID, Range) ->
|
|||
end ),
|
||||
{ok, PID}.
|
||||
|
||||
do_range_fold(BT, WorkerPID, Self, Range) ->
|
||||
-spec do_range_fold(BT :: lsm_btree_reader:read_file(),
|
||||
WorkerPID :: pid(),
|
||||
SelfOrRef :: pid() | reference(),
|
||||
Range :: #btree_range{} ) -> ok.
|
||||
do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
|
||||
case lsm_btree_reader:range_fold(fun(Key,Value,_) ->
|
||||
WorkerPID ! {level_result, Self, Key, Value},
|
||||
WorkerPID ! {level_result, SelfOrRef, Key, Value},
|
||||
ok
|
||||
end,
|
||||
ok,
|
||||
BT,
|
||||
Range) of
|
||||
{limit, _, LastKey} ->
|
||||
WorkerPID ! {level_limit, Self, LastKey};
|
||||
WorkerPID ! {level_limit, SelfOrRef, LastKey};
|
||||
{done, _} ->
|
||||
%% tell fold merge worker we're done
|
||||
WorkerPID ! {level_done, Self}
|
||||
WorkerPID ! {level_done, SelfOrRef}
|
||||
|
||||
end,
|
||||
ok.
|
||||
|
|
|
@ -53,7 +53,7 @@ merge(A,B,C, Size, IsLastLevel) ->
|
|||
{node, AKVs} = lsm_btree_reader:first_node(BT1),
|
||||
{node, BKVs} = lsm_btree_reader:first_node(BT2),
|
||||
|
||||
{ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0),
|
||||
{ok, Count, Out2} = scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, 0, {0, none}),
|
||||
|
||||
%% finish stream tree
|
||||
ok = lsm_btree_reader:close(BT1),
|
||||
|
@ -68,24 +68,42 @@ merge(A,B,C, Size, IsLastLevel) ->
|
|||
|
||||
{ok, Count}.
|
||||
|
||||
step({N, From}) ->
|
||||
{N-1, From}.
|
||||
|
||||
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count) ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {0, FromPID}) ->
|
||||
case FromPID of
|
||||
none ->
|
||||
ok;
|
||||
{PID, Ref} ->
|
||||
PID ! {Ref, step_done}
|
||||
end,
|
||||
|
||||
% error_logger:info_msg("waiting for step in ~p~n", [self()]),
|
||||
|
||||
receive
|
||||
{step, From, HowMany} ->
|
||||
% error_logger:info_msg("got step ~p,~p in ~p~n", [From,HowMany, self()]),
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, {HowMany, From})
|
||||
end;
|
||||
|
||||
scan(BT1, BT2, Out, IsLastLevel, [], BKVs, Count, Step) ->
|
||||
case lsm_btree_reader:next_node(BT1) of
|
||||
{node, AKVs} ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
|
||||
end_of_data ->
|
||||
scan_only(BT2, Out, IsLastLevel, BKVs, Count)
|
||||
scan_only(BT2, Out, IsLastLevel, BKVs, Count, Step)
|
||||
end;
|
||||
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count) ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, [], Count, Step) ->
|
||||
case lsm_btree_reader:next_node(BT2) of
|
||||
{node, BKVs} ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count);
|
||||
scan(BT1, BT2, Out, IsLastLevel, AKVs, BKVs, Count, Step);
|
||||
end_of_data ->
|
||||
scan_only(BT1, Out, IsLastLevel, AKVs, Count)
|
||||
scan_only(BT1, Out, IsLastLevel, AKVs, Count, Step)
|
||||
end;
|
||||
|
||||
scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count) ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKVs, Count, Step) ->
|
||||
if Key1 < Key2 ->
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
|
@ -94,7 +112,7 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
|
|||
ok = lsm_btree_writer:add(Out2=Out, Key1, Value1)
|
||||
end,
|
||||
|
||||
scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1);
|
||||
scan(BT1, BT2, Out2, IsLastLevel, AT, BKVs, Count+1, step(Step));
|
||||
|
||||
Key2 < Key1 ->
|
||||
case ?LOCAL_WRITER of
|
||||
|
@ -103,10 +121,10 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
|
|||
false ->
|
||||
ok = lsm_btree_writer:add(Out2=Out, Key2, Value2)
|
||||
end,
|
||||
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1);
|
||||
scan(BT1, BT2, Out2, IsLastLevel, AKVs, BT, Count+1, step(Step));
|
||||
|
||||
(?TOMBSTONE =:= Value2) and (true =:= IsLastLevel) ->
|
||||
scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count);
|
||||
scan(BT1, BT2, Out, IsLastLevel, AT, BT, Count, step(Step));
|
||||
|
||||
true ->
|
||||
case ?LOCAL_WRITER of
|
||||
|
@ -115,25 +133,25 @@ scan(BT1, BT2, Out, IsLastLevel, [{Key1,Value1}|AT]=AKVs, [{Key2,Value2}|BT]=BKV
|
|||
false ->
|
||||
ok = lsm_btree_writer:add(Out2=Out, Key2, Value2)
|
||||
end,
|
||||
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1)
|
||||
scan(BT1, BT2, Out2, IsLastLevel, AT, BT, Count+1, step(Step))
|
||||
end.
|
||||
|
||||
scan_only(BT, Out, IsLastLevel, [], Count) ->
|
||||
scan_only(BT, Out, IsLastLevel, [], Count, Step) ->
|
||||
case lsm_btree_reader:next_node(BT) of
|
||||
{node, KVs} ->
|
||||
scan_only(BT, Out, IsLastLevel, KVs, Count);
|
||||
scan_only(BT, Out, IsLastLevel, KVs, Count, step(Step));
|
||||
end_of_data ->
|
||||
{ok, Count, Out}
|
||||
end;
|
||||
|
||||
scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count) ->
|
||||
scan_only(BT, Out, true, Rest, Count);
|
||||
scan_only(BT, Out, true, [{_,?TOMBSTONE}|Rest], Count, Step) ->
|
||||
scan_only(BT, Out, true, Rest, Count, step(Step));
|
||||
|
||||
scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count) ->
|
||||
scan_only(BT, Out, IsLastLevel, [{Key,Value}|Rest], Count, Step) ->
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
{noreply, Out2} = lsm_btree_writer:handle_cast({add, Key, Value}, Out);
|
||||
false ->
|
||||
ok = lsm_btree_writer:add(Out2=Out, Key, Value)
|
||||
end,
|
||||
scan_only(BT, Out2, IsLastLevel, Rest, Count+1).
|
||||
scan_only(BT, Out2, IsLastLevel, Rest, Count+1, step(Step)).
|
||||
|
|
|
@ -162,7 +162,13 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, total_size=_TotalSize,
|
|||
% [ gb_trees:size(Cache), TotalSize, FileInfo#file_info.size ]),
|
||||
|
||||
%% inject the B-Tree (blocking RPC)
|
||||
ok = lsm_btree_level:inject(TopLevel, BTreeFileName);
|
||||
ok = lsm_btree_level:inject(TopLevel, BTreeFileName),
|
||||
|
||||
%% issue some work if this is a top-level inject (blocks until previous such
|
||||
%% incremental merge is finished).
|
||||
lsm_btree_level:incremental_merge(TopLevel, ?BTREE_SIZE(?TOP_LEVEL)),
|
||||
ok;
|
||||
|
||||
_ ->
|
||||
ok
|
||||
end,
|
||||
|
|
|
@ -35,9 +35,14 @@
|
|||
-record(node, { level, members=[] }).
|
||||
-record(index, {file, root, bloom}).
|
||||
|
||||
-type read_file() :: #index{}.
|
||||
|
||||
-spec open(Name::string()) -> read_file().
|
||||
open(Name) ->
|
||||
open(Name, random).
|
||||
|
||||
-spec open(Name::string(), sequential|random) -> read_file().
|
||||
|
||||
%% this is how to open a btree for sequential scanning (merge, fold)
|
||||
open(Name, sequential) ->
|
||||
{ok, File} = file:open(Name, [raw,read,{read_ahead, 1024 * 512},binary]),
|
||||
|
|
|
@ -31,6 +31,7 @@ merge_test() ->
|
|||
ok = lsm_btree_writer:close(BT2),
|
||||
|
||||
|
||||
self() ! {step, {self(), none}, 2000000000},
|
||||
{Time,{ok,Count}} = timer:tc(lsm_btree_merger, merge, ["test1", "test2", "test3", 10000, true]),
|
||||
|
||||
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
|
||||
|
|
|
@ -38,7 +38,7 @@ full_test_() ->
|
|||
?_test(test_tree_simple_2()),
|
||||
?_test(test_tree_simple_3()),
|
||||
?_test(test_tree_simple_4()),
|
||||
{timeout, 30, ?_test(test_tree())},
|
||||
{timeout, 300, ?_test(test_tree())},
|
||||
{timeout, 120, ?_test(test_qc())}
|
||||
]}.
|
||||
|
||||
|
@ -299,7 +299,7 @@ test_tree() ->
|
|||
<<N:128>>, <<"data",N:128>>)
|
||||
end,
|
||||
ok,
|
||||
lists:seq(2,10000,1)),
|
||||
lists:seq(2,100000,1)),
|
||||
lists:foldl(fun(N,_) ->
|
||||
ok = lsm_btree:put(Tree,
|
||||
<<N:128>>, <<"data",N:128>>)
|
||||
|
|
Loading…
Reference in a new issue