Introduce 3rd file in each level to reduce worst-case
Now, each level is comprised of 3 files, A=Oldest, B=Older, C=Old As in [Overmars and Leeuwen, 1983]. As soon as we have A & B, we initiate a merge, (to the M=New) file, i.e. we merge more eagerly than previously. Next step in this refactoring is to add a scheduler that enforces some merge activity as part of a PUT.
This commit is contained in:
parent
4e53b0a083
commit
3d80b164d5
3 changed files with 146 additions and 52 deletions
|
@ -11,6 +11,8 @@ function periodic() {
|
||||||
echo -n " "
|
echo -n " "
|
||||||
elif ! [ -f "B-$i.data" ] ; then
|
elif ! [ -f "B-$i.data" ] ; then
|
||||||
echo -n "-"
|
echo -n "-"
|
||||||
|
elif ! [ -f "C-$i.data" ] ; then
|
||||||
|
echo -n "#"
|
||||||
elif ! [ -f "X-$i.data" ] ; then
|
elif ! [ -f "X-$i.data" ] ; then
|
||||||
echo -n "="
|
echo -n "="
|
||||||
else
|
else
|
||||||
|
@ -32,8 +34,10 @@ function dynamic() {
|
||||||
s="$s "
|
s="$s "
|
||||||
elif ! [ -f "B-$i.data" ] ; then
|
elif ! [ -f "B-$i.data" ] ; then
|
||||||
s="$s-"
|
s="$s-"
|
||||||
elif ! [ -f "X-$i.data" ] ; then
|
elif ! [ -f "C-$i.data" ] ; then
|
||||||
s="$s="
|
s="$s="
|
||||||
|
elif ! [ -f "X-$i.data" ] ; then
|
||||||
|
s="$s%"
|
||||||
else
|
else
|
||||||
s="$s*"
|
s="$s*"
|
||||||
fi
|
fi
|
||||||
|
|
|
@ -25,7 +25,8 @@
|
||||||
-module(lsm_btree_level).
|
-module(lsm_btree_level).
|
||||||
-author('Kresten Krab Thorup <krab@trifork.com>').
|
-author('Kresten Krab Thorup <krab@trifork.com>').
|
||||||
|
|
||||||
-include("lsm_btree.hrl").
|
-include("include/lsm_btree.hrl").
|
||||||
|
-include("src/lsm_btree.hrl").
|
||||||
|
|
||||||
%%
|
%%
|
||||||
%% Manages a "pair" of lsm_index (or rathern, 0, 1 or 2), and governs
|
%% Manages a "pair" of lsm_index (or rathern, 0, 1 or 2), and governs
|
||||||
|
@ -46,7 +47,7 @@
|
||||||
-include_lib("kernel/include/file.hrl").
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
a, b, next, dir, level, inject_done_ref, merge_pid, folding = []
|
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = []
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%%% PUBLIC OPERATIONS
|
%%%%% PUBLIC OPERATIONS
|
||||||
|
@ -77,13 +78,13 @@ close(Ref) ->
|
||||||
|
|
||||||
async_range(Ref, FoldWorkerPID, Range) ->
|
async_range(Ref, FoldWorkerPID, Range) ->
|
||||||
proc_lib:spawn(fun() ->
|
proc_lib:spawn(fun() ->
|
||||||
{ok, Folders} = call(Ref, {init_range_fold, FoldWorkerPID, Range, []}),
|
{ok, Folders} = call(Ref, {init_snapshot_range_fold, FoldWorkerPID, Range, []}),
|
||||||
FoldWorkerPID ! {initialize, Folders}
|
FoldWorkerPID ! {initialize, Folders}
|
||||||
end),
|
end),
|
||||||
{ok, FoldWorkerPID}.
|
{ok, FoldWorkerPID}.
|
||||||
|
|
||||||
sync_range(Ref, FoldWorkerPID, Range) ->
|
sync_range(Ref, FoldWorkerPID, Range) ->
|
||||||
{ok, Folders} = call(Ref, {sync_range_fold, FoldWorkerPID, Range, []}),
|
{ok, Folders} = call(Ref, {init_blocking_range_fold, FoldWorkerPID, Range, []}),
|
||||||
FoldWorkerPID ! {initialize, Folders},
|
FoldWorkerPID ! {initialize, Folders},
|
||||||
{ok, FoldWorkerPID}.
|
{ok, FoldWorkerPID}.
|
||||||
|
|
||||||
|
@ -123,29 +124,49 @@ reply({PID,Ref}, Reply) ->
|
||||||
|
|
||||||
|
|
||||||
initialize(State) ->
|
initialize(State) ->
|
||||||
|
|
||||||
|
try
|
||||||
|
initialize2(State)
|
||||||
|
catch
|
||||||
|
Class:Ex ->
|
||||||
|
error_logger:error_msg("crash: ~p:~p ~p~n", [Class,Ex,erlang:get_stacktrace()])
|
||||||
|
end.
|
||||||
|
|
||||||
|
initialize2(State) ->
|
||||||
% error_logger:info_msg("in ~p level=~p~n", [self(), State]),
|
% error_logger:info_msg("in ~p level=~p~n", [self(), State]),
|
||||||
|
|
||||||
AFileName = filename("A",State),
|
AFileName = filename("A",State),
|
||||||
BFileName = filename("B",State),
|
BFileName = filename("B",State),
|
||||||
CFileName = filename("C",State),
|
CFileName = filename("C",State),
|
||||||
|
MFileName = filename("M",State),
|
||||||
|
|
||||||
%% remove old merge file
|
%% remove old merge file
|
||||||
file:delete( filename("X",State)),
|
file:delete( filename("X",State)),
|
||||||
|
|
||||||
%% remove old fold files (hard links to A/B used during fold)
|
%% remove old fold files (hard links to A/B/C used during fold)
|
||||||
file:delete( filename("AF",State)),
|
file:delete( filename("AF",State)),
|
||||||
file:delete( filename("BF",State)),
|
file:delete( filename("BF",State)),
|
||||||
|
file:delete( filename("CF",State)),
|
||||||
|
|
||||||
case file:read_file_info(CFileName) of
|
case file:read_file_info(MFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
|
|
||||||
%% recover from post-merge crash
|
%% recover from post-merge crash
|
||||||
file:delete(AFileName),
|
file:delete(AFileName),
|
||||||
file:delete(BFileName),
|
file:delete(BFileName),
|
||||||
ok = file:rename(CFileName, AFileName),
|
ok = file:rename(MFileName, AFileName),
|
||||||
|
|
||||||
{ok, BT} = lsm_btree_reader:open(CFileName, random),
|
{ok, BT} = lsm_btree_reader:open(AFileName, random),
|
||||||
main_loop(State#state{ a= BT, b=undefined });
|
|
||||||
|
case file:read_file_info(CFileName) of
|
||||||
|
{ok, _} ->
|
||||||
|
file:rename(CFileName, BFileName),
|
||||||
|
{ok, BT2} = lsm_btree_reader:open(BFileName, random),
|
||||||
|
check_begin_merge_then_loop(State#state{ a= BT, b=BT2 });
|
||||||
|
|
||||||
|
{error, enoent} ->
|
||||||
|
main_loop(State#state{ a= BT, b=undefined })
|
||||||
|
end;
|
||||||
|
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
case file:read_file_info(BFileName) of
|
case file:read_file_info(BFileName) of
|
||||||
|
@ -153,7 +174,14 @@ initialize(State) ->
|
||||||
{ok, BT1} = lsm_btree_reader:open(AFileName, random),
|
{ok, BT1} = lsm_btree_reader:open(AFileName, random),
|
||||||
{ok, BT2} = lsm_btree_reader:open(BFileName, random),
|
{ok, BT2} = lsm_btree_reader:open(BFileName, random),
|
||||||
|
|
||||||
check_begin_merge_then_loop(State#state{ a=BT1, b=BT2 });
|
case file:read_file_info(CFileName) of
|
||||||
|
{ok, _} ->
|
||||||
|
{ok, BT3} = lsm_btree_reader:open(CFileName, random);
|
||||||
|
{error, enoent} ->
|
||||||
|
BT3 = undefined
|
||||||
|
end,
|
||||||
|
|
||||||
|
check_begin_merge_then_loop(State#state{ a=BT1, b=BT2, c=BT3 });
|
||||||
|
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
|
|
||||||
|
@ -179,7 +207,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
Parent = plain_fsm:info(parent),
|
Parent = plain_fsm:info(parent),
|
||||||
receive
|
receive
|
||||||
?REQ(From, {lookup, Key})=Req ->
|
?REQ(From, {lookup, Key})=Req ->
|
||||||
case do_lookup(Key, [State#state.b, State#state.a, Next]) of
|
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
|
||||||
not_found ->
|
not_found ->
|
||||||
reply(From, not_found);
|
reply(From, not_found);
|
||||||
{found, Result} ->
|
{found, Result} ->
|
||||||
|
@ -189,13 +217,17 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
end,
|
end,
|
||||||
main_loop(State);
|
main_loop(State);
|
||||||
|
|
||||||
?REQ(From, {inject, FileName}) when State#state.b == undefined ->
|
?REQ(From, {inject, FileName}) when State#state.c == undefined ->
|
||||||
if State#state.a == undefined ->
|
case {State#state.a, State#state.b} of
|
||||||
|
{undefined, undefined} ->
|
||||||
ToFileName = filename("A",State),
|
ToFileName = filename("A",State),
|
||||||
SetPos = #state.a;
|
SetPos = #state.a;
|
||||||
true ->
|
{_, undefined} ->
|
||||||
ToFileName = filename("B",State),
|
ToFileName = filename("B",State),
|
||||||
SetPos = #state.b
|
SetPos = #state.b;
|
||||||
|
{_, _} ->
|
||||||
|
ToFileName = filename("C",State),
|
||||||
|
SetPos = #state.c
|
||||||
end,
|
end,
|
||||||
ok = file:rename(FileName, ToFileName),
|
ok = file:rename(FileName, ToFileName),
|
||||||
{ok, BT} = lsm_btree_reader:open(ToFileName, random),
|
{ok, BT} = lsm_btree_reader:open(ToFileName, random),
|
||||||
|
@ -205,6 +237,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
?REQ(From, close) ->
|
?REQ(From, close) ->
|
||||||
close_if_defined(State#state.a),
|
close_if_defined(State#state.a),
|
||||||
close_if_defined(State#state.b),
|
close_if_defined(State#state.b),
|
||||||
|
close_if_defined(State#state.c),
|
||||||
stop_if_defined(State#state.merge_pid),
|
stop_if_defined(State#state.merge_pid),
|
||||||
reply(From, ok),
|
reply(From, ok),
|
||||||
|
|
||||||
|
@ -217,69 +250,94 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
end,
|
end,
|
||||||
ok;
|
ok;
|
||||||
|
|
||||||
?REQ(From, {init_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, List}) when State#state.folding == [] ->
|
||||||
|
|
||||||
case {State#state.a, State#state.b} of
|
case {State#state.a, State#state.b, State#state.c} of
|
||||||
{undefined, undefined} ->
|
{undefined, undefined, undefined} ->
|
||||||
NewFolding = [],
|
FoldingPIDs = [],
|
||||||
NextList = List;
|
NextList = List;
|
||||||
|
|
||||||
{_, undefined} ->
|
{_, undefined, undefined} ->
|
||||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||||
NextList = [PID0|List],
|
NextList = [PID0|List],
|
||||||
NewFolding = [PID0];
|
FoldingPIDs = [PID0];
|
||||||
|
|
||||||
{_, _} ->
|
{_, _, undefined} ->
|
||||||
ok = file:make_link(filename("A", State), filename("AF", State)),
|
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||||
{ok, PID0} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||||
|
|
||||||
ok = file:make_link(filename("B", State), filename("BF", State)),
|
ok = file:make_link(filename("B", State), filename("BF", State)),
|
||||||
{ok, PID1} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
||||||
|
|
||||||
NextList = [PID1,PID0|List],
|
NextList = [PIDA,PIDB|List],
|
||||||
NewFolding = [PID1,PID0]
|
FoldingPIDs = [PIDB,PIDA];
|
||||||
|
|
||||||
|
{_, _, _} ->
|
||||||
|
ok = file:make_link(filename("A", State), filename("AF", State)),
|
||||||
|
{ok, PIDA} = start_range_fold(filename("AF",State), WorkerPID, Range),
|
||||||
|
|
||||||
|
ok = file:make_link(filename("B", State), filename("BF", State)),
|
||||||
|
{ok, PIDB} = start_range_fold(filename("BF",State), WorkerPID, Range),
|
||||||
|
|
||||||
|
ok = file:make_link(filename("C", State), filename("CF", State)),
|
||||||
|
{ok, PIDC} = start_range_fold(filename("CF",State), WorkerPID, Range),
|
||||||
|
|
||||||
|
NextList = [PIDA,PIDB,PIDC|List],
|
||||||
|
FoldingPIDs = [PIDC,PIDB,PIDA]
|
||||||
end,
|
end,
|
||||||
|
|
||||||
case Next of
|
case Next of
|
||||||
undefined ->
|
undefined ->
|
||||||
reply(From, {ok, lists:reverse(NextList)});
|
reply(From, {ok, lists:reverse(NextList)});
|
||||||
_ ->
|
_ ->
|
||||||
Next ! ?REQ(From, {init_range_fold, WorkerPID, Range, NextList})
|
Next ! ?REQ(From, {init_snapshot_range_fold, WorkerPID, Range, NextList})
|
||||||
end,
|
end,
|
||||||
|
|
||||||
main_loop(State#state{ folding = NewFolding });
|
main_loop(State#state{ folding = FoldingPIDs });
|
||||||
|
|
||||||
{range_fold_done, PID, [_,$F|_]=FoldFileName} ->
|
{range_fold_done, PID, [_,$F|_]=FoldFileName} ->
|
||||||
ok = file:delete(FoldFileName),
|
ok = file:delete(FoldFileName),
|
||||||
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
|
main_loop(State#state{ folding = lists:delete(PID,State#state.folding) });
|
||||||
|
|
||||||
?REQ(From, {sync_range_fold, WorkerPID, Range, List}) ->
|
?REQ(From, {init_blocking_range_fold, WorkerPID, Range, List}) ->
|
||||||
|
|
||||||
case {State#state.a, State#state.b} of
|
case {State#state.a, State#state.b, State#state.c} of
|
||||||
{undefined, undefined} ->
|
{undefined, undefined, undefined} ->
|
||||||
RefList = List;
|
RefList = List;
|
||||||
|
|
||||||
{_, undefined} ->
|
{_, undefined, undefined} ->
|
||||||
ARef = erlang:make_ref(),
|
ARef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||||
RefList = [ARef|List];
|
RefList = [ARef|List];
|
||||||
|
|
||||||
{_, _} ->
|
{_, _, undefined} ->
|
||||||
BRef = erlang:make_ref(),
|
BRef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
||||||
|
|
||||||
ARef = erlang:make_ref(),
|
ARef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||||
|
|
||||||
RefList = [ARef,BRef|List]
|
RefList = [ARef,BRef|List];
|
||||||
|
|
||||||
|
{_, _, _} ->
|
||||||
|
CRef = erlang:make_ref(),
|
||||||
|
ok = do_range_fold(State#state.c, WorkerPID, CRef, Range),
|
||||||
|
|
||||||
|
BRef = erlang:make_ref(),
|
||||||
|
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
||||||
|
|
||||||
|
ARef = erlang:make_ref(),
|
||||||
|
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||||
|
|
||||||
|
RefList = [ARef,BRef,CRef|List]
|
||||||
end,
|
end,
|
||||||
|
|
||||||
case Next of
|
case Next of
|
||||||
undefined ->
|
undefined ->
|
||||||
reply(From, {ok, lists:reverse(RefList)});
|
reply(From, {ok, lists:reverse(RefList)});
|
||||||
_ ->
|
_ ->
|
||||||
Next ! ?REQ(From, {sync_range_fold, WorkerPID, Range, RefList})
|
Next ! ?REQ(From, {init_blocking_range_fold, WorkerPID, Range, RefList})
|
||||||
end,
|
end,
|
||||||
|
|
||||||
main_loop(State);
|
main_loop(State);
|
||||||
|
@ -291,19 +349,29 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
%%
|
%%
|
||||||
{merge_done, Count, OutFileName} when Count =< ?BTREE_SIZE(State#state.level) ->
|
{merge_done, Count, OutFileName} when Count =< ?BTREE_SIZE(State#state.level) ->
|
||||||
|
|
||||||
% first, rename the tmp file to C, so recovery will pick it up
|
% first, rename the tmp file to M, so recovery will pick it up
|
||||||
CFileName = filename("C",State),
|
MFileName = filename("M",State),
|
||||||
ok = file:rename(OutFileName, CFileName),
|
ok = file:rename(OutFileName, MFileName),
|
||||||
|
|
||||||
% then delete A and B (if we crash now, C will become the A file)
|
% then delete A and B (if we crash now, C will become the A file)
|
||||||
{ok, State2} = close_a_and_b(State),
|
{ok, State2} = close_and_delete_a_and_b(State),
|
||||||
|
|
||||||
% then, rename C to A, and open it
|
% then, rename M to A, and open it
|
||||||
AFileName = filename("A",State2),
|
AFileName = filename("A",State2),
|
||||||
ok = file:rename(CFileName, AFileName),
|
ok = file:rename(MFileName, AFileName),
|
||||||
{ok, BT} = lsm_btree_reader:open(AFileName, random),
|
{ok, BT} = lsm_btree_reader:open(AFileName, random),
|
||||||
|
|
||||||
main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined });
|
% iff there is a C file, then move it to B position
|
||||||
|
% TODO: consider recovery for this
|
||||||
|
case State#state.c of
|
||||||
|
undefined ->
|
||||||
|
main_loop(State2#state{ a=BT, b=undefined, merge_pid=undefined });
|
||||||
|
TreeFile ->
|
||||||
|
file:rename(filename("C",State2), filename("B", State2)),
|
||||||
|
check_begin_merge_then_loop(State2#state{ a=BT, b=TreeFile, c=undefined,
|
||||||
|
merge_pid=undefined })
|
||||||
|
|
||||||
|
end;
|
||||||
|
|
||||||
%%
|
%%
|
||||||
%% We need to push the output of merging to the next level
|
%% We need to push the output of merging to the next level
|
||||||
|
@ -317,6 +385,8 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
State
|
State
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
%% no need to rename it since we don't accept new injects
|
||||||
|
|
||||||
MRef = send_request(State1#state.next, {inject, OutFileName}),
|
MRef = send_request(State1#state.next, {inject, OutFileName}),
|
||||||
main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined });
|
main_loop(State1#state{ inject_done_ref = MRef, merge_pid=undefined });
|
||||||
|
|
||||||
|
@ -325,14 +395,25 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
%%
|
%%
|
||||||
?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref ->
|
?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref ->
|
||||||
erlang:demonitor(MRef, [flush]),
|
erlang:demonitor(MRef, [flush]),
|
||||||
{ok, State2} = close_a_and_b(State),
|
{ok, State2} = close_and_delete_a_and_b(State),
|
||||||
main_loop(State2#state{ inject_done_ref=undefined });
|
|
||||||
|
% if there is a "C" file, then move it to "A" position.
|
||||||
|
case State2#state.c of
|
||||||
|
undefined ->
|
||||||
|
State3=State2;
|
||||||
|
TreeFile ->
|
||||||
|
%% TODO: on what OS's is it ok to rename an open file?
|
||||||
|
ok = file:rename(filename("C", State2), filename("A", State2)),
|
||||||
|
State3 = State2#state{ a = TreeFile, b=undefined, c=undefined }
|
||||||
|
end,
|
||||||
|
|
||||||
|
main_loop(State3#state{ inject_done_ref=undefined });
|
||||||
|
|
||||||
%%
|
%%
|
||||||
%% Our successor died!
|
%% Our successor died!
|
||||||
%%
|
%%
|
||||||
{'DOWN', MRef, _, _, Reason} when MRef =:= State#state.inject_done_ref ->
|
{'DOWN', MRef, _, _, Reason} when MRef =:= State#state.inject_done_ref ->
|
||||||
exit(Reason);
|
exit(Reason);
|
||||||
|
|
||||||
%% gen_fsm handling
|
%% gen_fsm handling
|
||||||
{system, From, Req} ->
|
{system, From, Req} ->
|
||||||
|
@ -404,7 +485,7 @@ begin_merge(State) ->
|
||||||
{ok, MergePID}.
|
{ok, MergePID}.
|
||||||
|
|
||||||
|
|
||||||
close_a_and_b(State) ->
|
close_and_delete_a_and_b(State) ->
|
||||||
AFileName = filename("A",State),
|
AFileName = filename("A",State),
|
||||||
BFileName = filename("B",State),
|
BFileName = filename("B",State),
|
||||||
|
|
||||||
|
@ -435,19 +516,23 @@ start_range_fold(FileName, WorkerPID, Range) ->
|
||||||
end ),
|
end ),
|
||||||
{ok, PID}.
|
{ok, PID}.
|
||||||
|
|
||||||
do_range_fold(BT, WorkerPID, Self, Range) ->
|
-spec do_range_fold(BT :: lsm_btree_reader:read_file(),
|
||||||
|
WorkerPID :: pid(),
|
||||||
|
SelfOrRef :: pid() | reference(),
|
||||||
|
Range :: #btree_range{} ) -> ok.
|
||||||
|
do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
|
||||||
case lsm_btree_reader:range_fold(fun(Key,Value,_) ->
|
case lsm_btree_reader:range_fold(fun(Key,Value,_) ->
|
||||||
WorkerPID ! {level_result, Self, Key, Value},
|
WorkerPID ! {level_result, SelfOrRef, Key, Value},
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
ok,
|
ok,
|
||||||
BT,
|
BT,
|
||||||
Range) of
|
Range) of
|
||||||
{limit, _, LastKey} ->
|
{limit, _, LastKey} ->
|
||||||
WorkerPID ! {level_limit, Self, LastKey};
|
WorkerPID ! {level_limit, SelfOrRef, LastKey};
|
||||||
{done, _} ->
|
{done, _} ->
|
||||||
%% tell fold merge worker we're done
|
%% tell fold merge worker we're done
|
||||||
WorkerPID ! {level_done, Self}
|
WorkerPID ! {level_done, SelfOrRef}
|
||||||
|
|
||||||
end,
|
end,
|
||||||
ok.
|
ok.
|
||||||
|
|
|
@ -35,9 +35,14 @@
|
||||||
-record(node, { level, members=[] }).
|
-record(node, { level, members=[] }).
|
||||||
-record(index, {file, root, bloom}).
|
-record(index, {file, root, bloom}).
|
||||||
|
|
||||||
|
-type read_file() :: #index{}.
|
||||||
|
|
||||||
|
-spec open(Name::string()) -> read_file().
|
||||||
open(Name) ->
|
open(Name) ->
|
||||||
open(Name, random).
|
open(Name, random).
|
||||||
|
|
||||||
|
-spec open(Name::string(), sequential|random) -> read_file().
|
||||||
|
|
||||||
%% this is how to open a btree for sequential scanning (merge, fold)
|
%% this is how to open a btree for sequential scanning (merge, fold)
|
||||||
open(Name, sequential) ->
|
open(Name, sequential) ->
|
||||||
{ok, File} = file:open(Name, [raw,read,{read_ahead, 1024 * 512},binary]),
|
{ok, File} = file:open(Name, [raw,read,{read_ahead, 1024 * 512},binary]),
|
||||||
|
|
Loading…
Reference in a new issue