Introduce btree_range record for range queries
This allows specifying ranges with from/to being inclusive or not, and providing a result limit (latter not implemented yet). This change just makes all current tests pass.
This commit is contained in:
parent
afec223f03
commit
f0def8231b
6 changed files with 103 additions and 50 deletions
6
include/lsm_btree.hrl
Normal file
6
include/lsm_btree.hrl
Normal file
|
@ -0,0 +1,6 @@
|
|||
|
||||
-record(btree_range, { from_key = <<>> :: binary(),
|
||||
from_inclusive = true :: boolean(),
|
||||
to_key :: binary() | undefined,
|
||||
to_inclusive = false :: boolean(),
|
||||
limit :: pos_integer() | undefined }).
|
|
@ -9,6 +9,7 @@
|
|||
|
||||
-include("lsm_btree.hrl").
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
-include_lib("include/lsm_btree.hrl").
|
||||
|
||||
-record(state, { top, nursery, dir }).
|
||||
|
||||
|
@ -38,17 +39,35 @@ delete(Ref,Key) when is_binary(Key) ->
|
|||
put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
|
||||
gen_server:call(Ref, {put, Key, Value}).
|
||||
|
||||
sync_range(Ref,FromKey,ToKey) when FromKey == undefined orelse is_binary(FromKey),
|
||||
sync_range(Ref, #btree_range{}=Range) ->
|
||||
gen_server:call(Ref, {sync_range, self(), Range}).
|
||||
|
||||
sync_range(Ref,undefined,ToKey) ->
|
||||
sync_range(Ref, <<>>, ToKey);
|
||||
sync_range(Ref,FromKey,ToKey) when is_binary(FromKey),
|
||||
ToKey == undefined orelse is_binary(ToKey) ->
|
||||
gen_server:call(Ref, {sync_range, self(), FromKey, ToKey}).
|
||||
sync_range(Ref, #btree_range{ from_key = FromKey,
|
||||
from_inclusive = true,
|
||||
to_key = ToKey,
|
||||
to_inclusive = false,
|
||||
limit = undefined }).
|
||||
|
||||
sync_fold_range(Ref,Fun,Acc0,FromKey,ToKey) ->
|
||||
{ok, PID} = sync_range(Ref,FromKey,ToKey),
|
||||
receive_fold_range(PID,Fun,Acc0).
|
||||
|
||||
async_range(Ref,FromKey,ToKey) when FromKey == undefined orelse is_binary(FromKey),
|
||||
async_range(Ref, #btree_range{}=Range) ->
|
||||
gen_server:call(Ref, {async_range, self(), Range}).
|
||||
|
||||
async_range(Ref,undefined,ToKey) ->
|
||||
async_range(Ref, <<>>, ToKey);
|
||||
async_range(Ref,FromKey,ToKey) when is_binary(FromKey),
|
||||
ToKey == undefined orelse is_binary(ToKey) ->
|
||||
gen_server:call(Ref, {async_range, self(), FromKey, ToKey}).
|
||||
async_range(Ref, #btree_range{ from_key = FromKey,
|
||||
from_inclusive = true,
|
||||
to_key = ToKey,
|
||||
to_inclusive = false,
|
||||
limit = undefined }).
|
||||
|
||||
async_fold_range(Ref,Fun,Acc0,FromKey,ToKey) ->
|
||||
{ok, PID} = async_range(Ref,FromKey,ToKey),
|
||||
|
@ -141,16 +160,16 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
|
||||
|
||||
handle_call({async_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
|
||||
handle_call({async_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
|
||||
{ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender),
|
||||
lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, FromKey, ToKey),
|
||||
Result = lsm_btree_level:async_range(TopLevel, FoldWorkerPID, FromKey, ToKey),
|
||||
lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, Range),
|
||||
Result = lsm_btree_level:async_range(TopLevel, FoldWorkerPID, Range),
|
||||
{reply, Result, State};
|
||||
|
||||
handle_call({sync_range, Sender, FromKey, ToKey}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
|
||||
handle_call({sync_range, Sender, Range}, _From, State=#state{ top=TopLevel, nursery=Nursery }) ->
|
||||
{ok, FoldWorkerPID} = lsm_btree_fold_worker:start(Sender),
|
||||
lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, FromKey, ToKey),
|
||||
Result = lsm_btree_level:sync_range(TopLevel, FoldWorkerPID, FromKey, ToKey),
|
||||
lsm_btree_nursery:do_level_fold(Nursery, FoldWorkerPID, Range),
|
||||
Result = lsm_btree_level:sync_range(TopLevel, FoldWorkerPID, Range),
|
||||
{reply, Result, State};
|
||||
|
||||
handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) ->
|
||||
|
|
|
@ -6,7 +6,31 @@
|
|||
|
||||
-define(TOMBSTONE, 'deleted').
|
||||
|
||||
-define(KEY_IN_RANGE(Key,FromKey,ToKey),
|
||||
(((FromKey == undefined) orelse (FromKey =< Key))
|
||||
and
|
||||
((ToKey == undefined) orelse (Key < ToKey)))).
|
||||
-define(KEY_IN_FROM_RANGE(Key,Range),
|
||||
((Range#btree_range.from_inclusive andalso
|
||||
(Range#btree_range.from_key =< Key))
|
||||
orelse
|
||||
(Range#btree_range.from_key < Key))).
|
||||
|
||||
-define(KEY_IN_TO_RANGE(Key,Range),
|
||||
((Range#btree_range.to_key == undefined)
|
||||
orelse
|
||||
((Range#btree_range.to_inclusive andalso
|
||||
(Key =< Range#btree_range.to_key))
|
||||
orelse
|
||||
(Key < Range#btree_range.to_key)))).
|
||||
|
||||
-define(KEY_IN_RANGE(Key,Range),
|
||||
((Range#btree_range.from_inclusive andalso
|
||||
(Range#btree_range.from_key =< Key))
|
||||
orelse
|
||||
(Range#btree_range.from_key < Key))
|
||||
andalso
|
||||
((Range#btree_range.to_key == undefined)
|
||||
orelse
|
||||
((Range#btree_range.to_inclusive andalso
|
||||
(Key =< Range#btree_range.to_key))
|
||||
orelse
|
||||
(Key < Range#btree_range.to_key)))).
|
||||
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
-behavior(plain_fsm).
|
||||
-export([data_vsn/0, code_change/3]).
|
||||
|
||||
-export([open/3, lookup/2, inject/2, close/1, async_range/4, sync_range/4]).
|
||||
-export([open/3, lookup/2, inject/2, close/1, async_range/3, sync_range/3]).
|
||||
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
|
@ -50,15 +50,15 @@ close(Ref) ->
|
|||
|
||||
|
||||
|
||||
async_range(Ref, FoldWorkerPID, From, To) ->
|
||||
async_range(Ref, FoldWorkerPID, Range) ->
|
||||
proc_lib:spawn(fun() ->
|
||||
{ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, From, To, []}),
|
||||
{ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, Range, []}),
|
||||
FoldWorkerPID ! {initialize, Folders}
|
||||
end),
|
||||
{ok, FoldWorkerPID}.
|
||||
|
||||
sync_range(Ref, FoldWorkerPID, From, To) ->
|
||||
{ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, From, To, []}),
|
||||
sync_range(Ref, FoldWorkerPID, Range) ->
|
||||
{ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, Range, []}),
|
||||
FoldWorkerPID ! {initialize, Folders},
|
||||
{ok, FoldWorkerPID}.
|
||||
|
||||
|
@ -192,7 +192,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
end,
|
||||
ok;
|
||||
|
||||
?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, List}) when State#state.folding == [] ->
|
||||
?REQ(From, {init_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
||||
|
||||
case {State#state.a, State#state.b} of
|
||||
{undefined, undefined} ->
|
||||
|
@ -201,16 +201,16 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
{_, undefined} ->
|
||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey),
|
||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||
NextList = [PID0|List],
|
||||
NewFolding = [PID0];
|
||||
|
||||
{_, _} ->
|
||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, FromKey, ToKey),
|
||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||
|
||||
ok = file:make_link(filename("B", State), filename("BF", State)),
|
||||
{ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, FromKey, ToKey),
|
||||
{ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
||||
|
||||
NextList = [PID1,PID0|List],
|
||||
NewFolding = [PID1,PID0]
|
||||
|
@ -220,7 +220,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
undefined ->
|
||||
reply(From, {ok, lists:reverse(NextList)});
|
||||
_ ->
|
||||
Next ! ?REQ(From, {init_range_fold, WorkerPID, FromKey, ToKey, NextList})
|
||||
Next ! ?REQ(From, {init_range_fold, WorkerPID, Range, NextList})
|
||||
end,
|
||||
|
||||
main_loop(State#state{ folding = NewFolding });
|
||||
|
@ -229,7 +229,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
ok = file:delete(FoldFileName),
|
||||
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
|
||||
|
||||
?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, List}) ->
|
||||
?REQ(From, {sync_range_fold, WorkerPID, Range, List}) ->
|
||||
|
||||
case {State#state.a, State#state.b} of
|
||||
{undefined, undefined} ->
|
||||
|
@ -237,15 +237,15 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
{_, undefined} ->
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
RefList = [ARef|List];
|
||||
|
||||
{_, _} ->
|
||||
BRef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, FromKey, ToKey),
|
||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
||||
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, FromKey, ToKey),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
|
||||
RefList = [ARef,BRef|List]
|
||||
end,
|
||||
|
@ -254,7 +254,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
undefined ->
|
||||
reply(From, {ok, lists:reverse(RefList)});
|
||||
_ ->
|
||||
Next ! ?REQ(From, {sync_range_fold, WorkerPID, FromKey, ToKey, RefList})
|
||||
Next ! ?REQ(From, {sync_range_fold, WorkerPID, Range, RefList})
|
||||
end,
|
||||
|
||||
main_loop(State);
|
||||
|
@ -393,13 +393,13 @@ filename(PFX, State) ->
|
|||
filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data").
|
||||
|
||||
|
||||
start_range_fold(FileName, WorkerPID, FromKey, ToKey) ->
|
||||
start_range_fold(FileName, WorkerPID, Range) ->
|
||||
Owner = self(),
|
||||
PID =
|
||||
proc_lib:spawn( fun() ->
|
||||
erlang:link(WorkerPID),
|
||||
{ok, File} = lsm_btree_reader:open(FileName, sequential),
|
||||
do_range_fold(File, WorkerPID, self(), FromKey, ToKey),
|
||||
do_range_fold(File, WorkerPID, self(), Range),
|
||||
erlang:unlink(WorkerPID),
|
||||
|
||||
%% this will release the pinning of the fold file
|
||||
|
@ -407,14 +407,14 @@ start_range_fold(FileName, WorkerPID, FromKey, ToKey) ->
|
|||
end ),
|
||||
{ok, PID}.
|
||||
|
||||
do_range_fold(BT, WorkerPID, Self, FromKey, ToKey) ->
|
||||
do_range_fold(BT, WorkerPID, Self, Range) ->
|
||||
lsm_btree_reader:range_fold(fun(Key,Value,_) ->
|
||||
WorkerPID ! {level_result, Self, Key, Value},
|
||||
ok
|
||||
end,
|
||||
ok,
|
||||
BT,
|
||||
FromKey, ToKey),
|
||||
Range),
|
||||
|
||||
%% tell fold merge worker we're done
|
||||
WorkerPID ! {level_done, Self},
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
-module(lsm_btree_nursery).
|
||||
|
||||
-export([new/1, recover/2, add/3, finish/2, lookup/2, add_maybe_flush/4]).
|
||||
-export([do_level_fold/4]).
|
||||
-export([do_level_fold/3]).
|
||||
|
||||
-include("include/lsm_btree.hrl").
|
||||
-include("lsm_btree.hrl").
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
|
@ -156,13 +157,16 @@ add_maybe_flush(Key, Value, Nursery=#nursery{ dir=Dir }, Top) ->
|
|||
lsm_btree_nursery:new(Dir)
|
||||
end.
|
||||
|
||||
do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, FromKey, ToKey) ->
|
||||
do_level_fold(#nursery{ cache=Cache }, FoldWorkerPID, KeyRange) ->
|
||||
Ref = erlang:make_ref(),
|
||||
FoldWorkerPID ! {prefix, [Ref]},
|
||||
lists:foreach(fun({Key,Value}) when ?KEY_IN_RANGE(Key,FromKey,ToKey) ->
|
||||
FoldWorkerPID ! {level_result, Ref, Key, Value};
|
||||
(_) ->
|
||||
ok
|
||||
lists:foreach(fun({Key,Value}) ->
|
||||
case ?KEY_IN_RANGE(Key,KeyRange) of
|
||||
true ->
|
||||
FoldWorkerPID ! {level_result, Ref, Key, Value};
|
||||
false ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
gb_trees:to_list(Cache)),
|
||||
FoldWorkerPID ! {level_done, Ref},
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
-module(lsm_btree_reader).
|
||||
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
-include("include/lsm_btree.hrl").
|
||||
-include("lsm_btree.hrl").
|
||||
|
||||
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/5]).
|
||||
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4]).
|
||||
-export([first_node/1,next_node/1]).
|
||||
|
||||
-record(node, { level, members=[] }).
|
||||
|
@ -56,37 +57,36 @@ fold1(File,Fun,Acc0) ->
|
|||
fold0(File,Fun,Node,Acc0)
|
||||
end.
|
||||
|
||||
range_fold(Fun, Acc0, #index{file=File,root=Root}, FromKey0, ToKey) ->
|
||||
FromKey = if FromKey0 == undefined -> <<>>; true -> FromKey0 end,
|
||||
case lookup_node(File,FromKey,Root,0) of
|
||||
range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) ->
|
||||
case lookup_node(File,Range#btree_range.from_key,Root,0) of
|
||||
{ok, {Pos,_}} ->
|
||||
file:position(File, Pos),
|
||||
do_range_fold(Fun, Acc0, File, FromKey, ToKey);
|
||||
do_range_fold(Fun, Acc0, File, Range);
|
||||
{ok, Pos} ->
|
||||
file:position(File, Pos),
|
||||
do_range_fold(Fun, Acc0, File, FromKey, ToKey);
|
||||
do_range_fold(Fun, Acc0, File, Range);
|
||||
none ->
|
||||
Acc0
|
||||
end.
|
||||
|
||||
do_range_fold(Fun, Acc0, File, FromKey, ToKey) ->
|
||||
do_range_fold(Fun, Acc0, File, Range) ->
|
||||
case next_leaf_node(File) of
|
||||
eof ->
|
||||
Acc0;
|
||||
|
||||
{ok, #node{members=Members}} ->
|
||||
Acc1 =
|
||||
lists:foldl(fun({Key,Value}, Acc) when ?KEY_IN_RANGE(Key, FromKey, ToKey) ->
|
||||
lists:foldl(fun({Key,Value}, Acc) when ?KEY_IN_RANGE(Key, Range) ->
|
||||
Fun(Key, Value, Acc);
|
||||
(_,Acc) ->
|
||||
(_, Acc)->
|
||||
Acc
|
||||
end,
|
||||
Acc0,
|
||||
Members),
|
||||
|
||||
case lists:last(Members) of
|
||||
{LastKey,_} when LastKey < ToKey; ToKey == undefined ->
|
||||
do_range_fold(Fun, Acc1, File, FromKey, ToKey);
|
||||
{LastKey,_} when (LastKey /= Range#btree_range.to_key) andalso ?KEY_IN_TO_RANGE(LastKey, Range) ->
|
||||
do_range_fold(Fun, Acc1, File, Range);
|
||||
_ ->
|
||||
Acc1
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue