diff --git a/include/lsm_btree.hrl b/include/lsm_btree.hrl new file mode 100644 index 0000000..9499167 --- /dev/null +++ b/include/lsm_btree.hrl @@ -0,0 +1,6 @@ + +-record(btree_range, { from_key = <<>> :: binary(), + from_inclusive = true :: boolean(), + to_key :: binary() | undefined, + to_inclusive = false :: boolean(), + limit :: pos_integer() | undefined }). diff --git a/src/lsm_btree.erl b/src/lsm_btree.erl index 4890984..8094eef 100644 --- a/src/lsm_btree.erl +++ b/src/lsm_btree.erl @@ -9,6 +9,7 @@ -include("lsm_btree.hrl"). -include_lib("kernel/include/file.hrl"). +-include_lib("include/lsm_btree.hrl"). -record(state, { top, nursery, dir }). @@ -38,17 +39,35 @@ delete(Ref,Key) when is_binary(Key) -> put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> gen_server:call(Ref, {put, Key, Value}). -sync_range(Ref,FromKey,ToKey) when FromKey == undefined orelse is_binary(FromKey), +sync_range(Ref, #btree_range{}=Range) -> + gen_server:call(Ref, {sync_range, self(), Range}). + +sync_range(Ref,undefined,ToKey) -> + sync_range(Ref, <<>>, ToKey); +sync_range(Ref,FromKey,ToKey) when is_binary(FromKey), ToKey == undefined orelse is_binary(ToKey) -> - gen_server:call(Ref, {sync_range, self(), FromKey, ToKey}). + sync_range(Ref, #btree_range{ from_key = FromKey, + from_inclusive = true, + to_key = ToKey, + to_inclusive = false, + limit = undefined }). sync_fold_range(Ref,Fun,Acc0,FromKey,ToKey) -> {ok, PID} = sync_range(Ref,FromKey,ToKey), receive_fold_range(PID,Fun,Acc0). -async_range(Ref,FromKey,ToKey) when FromKey == undefined orelse is_binary(FromKey), +async_range(Ref, #btree_range{}=Range) -> + gen_server:call(Ref, {async_range, self(), Range}). + +async_range(Ref,undefined,ToKey) -> + async_range(Ref, <<>>, ToKey); +async_range(Ref,FromKey,ToKey) when is_binary(FromKey), ToKey == undefined orelse is_binary(ToKey) -> - gen_server:call(Ref, {async_range, self(), FromKey, ToKey}). + async_range(Ref, #btree_range{ from_key = FromKey, + from_inclusive = true, + to_key = ToKey, + to_inclusive = false, + limit = undefined }). async_fold_range(Ref,Fun,Acc0,FromKey,ToKey) -> {ok, PID} = async_range(Ref,FromKey,ToKey), @@ -141,16 +160,16 @@ code_change(_OldVsn, State, _Extra) -> -handle_call({async_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> +handle_call({async_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> {ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender), - lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, FromKey, ToKey), - Result = lsm_btree_level:async_range(TopLevel, FoldWorkerPID, FromKey, ToKey), + lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, Range), + Result = lsm_btree_level:async_range(TopLevel, FoldWorkerPID, Range), {reply, Result, State}; -handle_call({sync_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> +handle_call({sync_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) -> {ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender), - lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, FromKey, ToKey), - Result = lsm_btree_level:sync_range(TopLevel, FoldWorkerPID, FromKey, ToKey), + lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, Range), + Result = lsm_btree_level:sync_range(TopLevel, FoldWorkerPID, Range), {reply, Result, State}; handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> diff --git a/src/lsm_btree.hrl b/src/lsm_btree.hrl index b889e22..7a294cd 100644 --- a/src/lsm_btree.hrl +++ b/src/lsm_btree.hrl @@ -6,7 +6,31 @@ -define(TOMBSTONE, 'deleted'). --define(KEY_IN_RANGE(Key,FromKey,ToKey), - (((FromKey == undefined) orelse (FromKey =< Key)) - and - ((ToKey == undefined) orelse (Key < ToKey)))). +-define(KEY_IN_FROM_RANGE(Key,Range), + ((Range#btree_range.from_inclusive andalso + (Range#btree_range.from_key =< Key)) + orelse + (Range#btree_range.from_key < Key))). + +-define(KEY_IN_TO_RANGE(Key,Range), + ((Range#btree_range.to_key == undefined) + orelse + ((Range#btree_range.to_inclusive andalso + (Key =< Range#btree_range.to_key)) + orelse + (Key < Range#btree_range.to_key)))). + +-define(KEY_IN_RANGE(Key,Range), + ((Range#btree_range.from_inclusive andalso + (Range#btree_range.from_key =< Key)) + orelse + (Range#btree_range.from_key < Key)) + andalso + ((Range#btree_range.to_key == undefined) + orelse + ((Range#btree_range.to_inclusive andalso + (Key =< Range#btree_range.to_key)) + orelse + (Key < Range#btree_range.to_key)))). + + diff --git a/src/lsm_btree_level.erl b/src/lsm_btree_level.erl index 5d28932..a13d2a7 100644 --- a/src/lsm_btree_level.erl +++ b/src/lsm_btree_level.erl @@ -16,7 +16,7 @@ -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/3, lookup/2, inject/2, close/1, async_range/4, sync_range/4]). +-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3]). -include_lib("kernel/include/file.hrl"). @@ -50,15 +50,15 @@ close(Ref) -> -async_range(Ref, FoldWorkerPID, From, To) -> +async_range(Ref, FoldWorkerPID, Range) -> proc_lib:spawn(fun() -> - {ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}), + {ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders} end), {ok, FoldWorkerPID}. -sync_range(Ref, FoldWorkerPID, From, To) -> - {ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, From, To, []}), +sync_range(Ref, FoldWorkerPID, Range) -> + {ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, Range, []}), FoldWorkerPID ! {initialize, Folders}, {ok, FoldWorkerPID}. @@ -192,7 +192,7 @@ main_loop(State = #state{ next=Next }) -> end, ok; - ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == [] -> + ?REQ(From, {init_range_fold, WorkerPID, Range, List}) when State#state.folding == [] -> case {State#state.a, State#state.b} of {undefined, undefined} -> @@ -201,16 +201,16 @@ main_loop(State = #state{ next=Next }) -> {_, undefined} -> ok = file:make_link(filename("A", State), filename("AF", State)), - {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), + {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range), NextList = [PID0|List], NewFolding = [PID0]; {_, _} -> ok = file:make_link(filename("A", State), filename("AF", State)), - {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey), + {ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range), ok = file:make_link(filename("B", State), filename("BF", State)), - {ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, FromKey, ToKey), + {ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, Range), NextList = [PID1,PID0|List], NewFolding = [PID1,PID0] @@ -220,7 +220,7 @@ main_loop(State = #state{ next=Next }) -> undefined -> reply(From, {ok, lists:reverse(NextList)}); _ -> - Next ! ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, NextList}) + Next ! ?REQ(From, {init_range_fold, WorkerPID, Range, NextList}) end, main_loop(State#state{ folding = NewFolding }); @@ -229,7 +229,7 @@ main_loop(State = #state{ next=Next }) -> ok = file:delete(FoldFileName), main_loop(State#state{ folding = lists:delete(PID,State#state.folding) }); - ?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, List}) -> + ?REQ(From, {sync_range_fold, WorkerPID, Range, List}) -> case {State#state.a, State#state.b} of {undefined, undefined} -> @@ -237,15 +237,15 @@ main_loop(State = #state{ next=Next }) -> {_, undefined} -> ARef = erlang:make_ref(), - ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey), + ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), RefList = [ARef|List]; {_, _} -> BRef = erlang:make_ref(), - ok = do_range_fold(State#state.b, WorkerPID, BRef, FromKey, ToKey), + ok = do_range_fold(State#state.b, WorkerPID, BRef, Range), ARef = erlang:make_ref(), - ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey), + ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), RefList = [ARef,BRef|List] end, @@ -254,7 +254,7 @@ main_loop(State = #state{ next=Next }) -> undefined -> reply(From, {ok, lists:reverse(RefList)}); _ -> - Next ! ?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, RefList}) + Next ! ?REQ(From, {sync_range_fold, WorkerPID, Range, RefList}) end, main_loop(State); @@ -393,13 +393,13 @@ filename(PFX, State) -> filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data"). -start_range_fold(FileName, WorkerPID, FromKey, ToKey) -> +start_range_fold(FileName, WorkerPID, Range) -> Owner = self(), PID = proc_lib:spawn( fun() -> erlang:link(WorkerPID), {ok, File} = lsm_btree_reader:open(FileName, sequential), - do_range_fold(File, WorkerPID, self(), FromKey, ToKey), + do_range_fold(File, WorkerPID, self(), Range), erlang:unlink(WorkerPID), %% this will release the pinning of the fold file @@ -407,14 +407,14 @@ start_range_fold(FileName, WorkerPID, FromKey, ToKey) -> end ), {ok, PID}. -do_range_fold(BT, WorkerPID, Self, FromKey, ToKey) -> +do_range_fold(BT, WorkerPID, Self, Range) -> lsm_btree_reader:range_fold(fun(Key,Value,_) -> WorkerPID ! {level_result, Self, Key, Value}, ok end, ok, BT, - FromKey, ToKey), + Range), %% tell fold merge worker we're done WorkerPID ! {level_done, Self}, diff --git a/src/lsm_btree_nursery.erl b/src/lsm_btree_nursery.erl index 4def7d9..fe66411 100644 --- a/src/lsm_btree_nursery.erl +++ b/src/lsm_btree_nursery.erl @@ -1,8 +1,9 @@ -module(lsm_btree_nursery). -export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]). --export([do_level_fold/4]). +-export([do_level_fold/3]). +-include("include/lsm_btree.hrl"). -include("lsm_btree.hrl"). -include_lib("kernel/include/file.hrl"). @@ -156,13 +157,16 @@ add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir }, Top) -> lsm_btree_nursery:new(Dir) end. -do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, FromKey, ToKey) -> +do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) -> Ref = erlang:make_ref(), FoldWorkerPID ! {prefix, [Ref]}, - lists:foreach(fun({Key,Value}) when ?KEY_IN_RANGE(Key,FromKey,ToKey) -> - FoldWorkerPID ! {level_result, Ref, Key, Value}; - (_) -> - ok + lists:foreach(fun({Key,Value}) -> + case ?KEY_IN_RANGE(Key,KeyRange) of + true -> + FoldWorkerPID ! {level_result, Ref, Key, Value}; + false -> + ok + end end, gb_trees:to_list(Cache)), FoldWorkerPID ! {level_done, Ref}, diff --git a/src/lsm_btree_reader.erl b/src/lsm_btree_reader.erl index 894d686..b33932a 100644 --- a/src/lsm_btree_reader.erl +++ b/src/lsm_btree_reader.erl @@ -1,9 +1,10 @@ -module(lsm_btree_reader). -include_lib("kernel/include/file.hrl"). +-include("include/lsm_btree.hrl"). -include("lsm_btree.hrl"). --export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/5]). +-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4]). -export([first_node/1,next_node/1]). -record(node, { level, members=[] }). @@ -56,37 +57,36 @@ fold1(File,Fun,Acc0) -> fold0(File,Fun,Node,Acc0) end. -range_fold(Fun, Acc0, #index{file=File,root=Root}, FromKey0, ToKey) -> - FromKey = if FromKey0 == undefined -> <<>>; true -> FromKey0 end, - case lookup_node(File,FromKey,Root,0) of +range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) -> + case lookup_node(File,Range#btree_range.from_key,Root,0) of {ok, {Pos,_}} -> file:position(File, Pos), - do_range_fold(Fun, Acc0, File, FromKey, ToKey); + do_range_fold(Fun, Acc0, File, Range); {ok, Pos} -> file:position(File, Pos), - do_range_fold(Fun, Acc0, File, FromKey, ToKey); + do_range_fold(Fun, Acc0, File, Range); none -> Acc0 end. -do_range_fold(Fun, Acc0, File, FromKey, ToKey) -> +do_range_fold(Fun, Acc0, File, Range) -> case next_leaf_node(File) of eof -> Acc0; {ok, #node{members=Members}} -> Acc1 = - lists:foldl(fun({Key,Value}, Acc) when ?KEY_IN_RANGE(Key, FromKey, ToKey) -> + lists:foldl(fun({Key,Value}, Acc) when ?KEY_IN_RANGE(Key, Range) -> Fun(Key, Value, Acc); - (_,Acc) -> + (_, Acc)-> Acc end, Acc0, Members), case lists:last(Members) of - {LastKey,_} when LastKey < ToKey; ToKey == undefined -> - do_range_fold(Fun, Acc1, File, FromKey, ToKey); + {LastKey,_} when (LastKey /= Range#btree_range.to_key) andalso ?KEY_IN_TO_RANGE(LastKey, Range) -> + do_range_fold(Fun, Acc1, File, Range); _ -> Acc1 end