Compare commits
2 commits
master
...
krab-backe
Author | SHA1 | Date | |
---|---|---|---|
|
2a205db9fe | ||
|
3a43d9235b |
7 changed files with 374 additions and 84 deletions
|
@ -27,3 +27,5 @@
|
|||
-define(REPLY(Ref,Msg), {'$reply', Ref, Msg}).
|
||||
-define(CAST(From,Msg), {'$cast', From, Msg}).
|
||||
|
||||
-type caller() :: { pid(), reference() }.
|
||||
|
||||
|
|
|
@ -57,7 +57,9 @@
|
|||
max_level :: integer(),
|
||||
config=[] :: [{atom(), term()}],
|
||||
step=0 :: integer(),
|
||||
merge_done=0 :: integer()}).
|
||||
merge_done=0 :: integer(),
|
||||
backend = hanoidb_han2_backend :: atom()
|
||||
}).
|
||||
|
||||
-type kventry() :: { key(), expvalue() } | [ kventry() ].
|
||||
-type key() :: binary().
|
||||
|
|
174
src/hanoidb_backend.erl
Normal file
174
src/hanoidb_backend.erl
Normal file
|
@ -0,0 +1,174 @@
|
|||
|
||||
-module(hanoidb_backend).
|
||||
|
||||
-include("include/hanoidb.hrl").
|
||||
-include("src/hanoidb.hrl").
|
||||
|
||||
-type options() :: [ atom() | { atom(), term() } ].
|
||||
-type kvexp_entry() :: { Key :: key(), Value :: value(), TimeOut :: expiry() }.
|
||||
-type batch_reader() :: any().
|
||||
-type batch_writer() :: any().
|
||||
-type random_reader() :: any().
|
||||
|
||||
-export([merge/7]).
|
||||
|
||||
%%%=========================================================================
|
||||
%%% API
|
||||
%%%=========================================================================
|
||||
|
||||
%% batch_reader and batch_writer are used by the merging logic. A batch_reader
|
||||
%% must return the values in lexicographical order of the binary keys.
|
||||
|
||||
-callback open_batch_reader(File :: string(), Options :: options())
|
||||
-> {ok, batch_reader()} | { error, term() }.
|
||||
-callback read_next(batch_reader())
|
||||
-> { [kvexp_entry(), ...], batch_reader()} | 'done'.
|
||||
-callback close_batch_reader( batch_reader() )
|
||||
-> ok | {error, term()}.
|
||||
|
||||
|
||||
-callback open_batch_writer(File :: string(), Options :: options())
|
||||
-> {ok, batch_writer()} | {error, term()}.
|
||||
-callback write_next( kvexp_entry() , batch_writer() )
|
||||
-> {ok, batch_writer()} | {error, term()}.
|
||||
-callback write_count( batch_writer() ) ->
|
||||
{ok, non_neg_integer()} | {error, term()}.
|
||||
-callback close_batch_writer( batch_writer() )
|
||||
-> ok | {error, term()}.
|
||||
|
||||
|
||||
-callback open_random_reader(File :: string(), Options :: options()) ->
|
||||
{ok, random_reader()} | {error, term()}.
|
||||
-callback file_name( random_reader() ) ->
|
||||
{ok, string()} | {error, term()}.
|
||||
-callback lookup( Key :: key(), random_reader() ) ->
|
||||
not_found | {ok, value()}.
|
||||
-callback range_fold( fun( (key(), value(), term()) -> term() ),
|
||||
Acc0 :: term(),
|
||||
Reader :: random_reader(),
|
||||
Range :: #key_range{} ) ->
|
||||
{limit, term(), LastKey :: binary()} | {ok, term()}.
|
||||
-callback close_random_reader(random_reader()) ->
|
||||
ok | {error, term()}.
|
||||
|
||||
|
||||
|
||||
|
||||
-spec merge(atom(), string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
|
||||
merge(Mod,A,B,C, Size, IsLastLevel, Options) ->
|
||||
{ok, IXA} = Mod:open_batch_reader(A, Options),
|
||||
{ok, IXB} = Mod:open_batch_reader(B, Options),
|
||||
{ok, Out} = Mod:open_batch_writer(C, [{size, Size} | Options]),
|
||||
scan(Mod,IXA, IXB, Out, IsLastLevel, [], [], {0, none}).
|
||||
|
||||
terminate(Mod, Out) ->
|
||||
{ok, Count} = Mod:write_count( Out ),
|
||||
ok = Mod:close_batch_writer( Out ),
|
||||
{ok, Count}.
|
||||
|
||||
step(S) ->
|
||||
step(S, 1).
|
||||
|
||||
step({N, From}, Steps) ->
|
||||
{N-Steps, From}.
|
||||
|
||||
|
||||
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
|
||||
case FromPID of
|
||||
none ->
|
||||
ok;
|
||||
{PID, Ref} ->
|
||||
PID ! {Ref, step_done}
|
||||
end,
|
||||
|
||||
receive
|
||||
{step, From, HowMany} ->
|
||||
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
|
||||
end;
|
||||
|
||||
scan(Mod, IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
|
||||
case Mod:read_next(IXA) of
|
||||
{AKVs, IXA2} ->
|
||||
scan(Mod, IXA2, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
|
||||
done ->
|
||||
ok = Mod:close_batch_reader(IXA),
|
||||
scan_only(Mod, IXB, Out, IsLastLevel, BKVs, Step)
|
||||
end;
|
||||
|
||||
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
|
||||
case Mod:read_next(IXB) of
|
||||
{BKVs, IXB2} ->
|
||||
scan(Mod, IXA, IXB2, Out, IsLastLevel, AKVs, BKVs, Step);
|
||||
done ->
|
||||
ok = Mod:close_batch_reader(IXB),
|
||||
scan_only(Mod, IXA, Out, IsLastLevel, AKVs, Step)
|
||||
end;
|
||||
|
||||
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}=Entry|AT], [{Key2,_,_}|_]=BKVs, Step)
|
||||
when Key1 < Key2 ->
|
||||
case Entry of
|
||||
{_, ?TOMBSTONE, _} when IsLastLevel ->
|
||||
scan(Mod, IXA, IXB, Out, true, AT, BKVs, step(Step));
|
||||
_ ->
|
||||
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||
scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BKVs, step(Step))
|
||||
end;
|
||||
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}|_]=AKVs, [{Key2,_,_}=Entry|BT], Step)
|
||||
when Key1 > Key2 ->
|
||||
case Entry of
|
||||
{_, ?TOMBSTONE, _} when IsLastLevel ->
|
||||
scan(Mod, IXA, IXB, Out, true, AKVs, BT, step(Step));
|
||||
_ ->
|
||||
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||
scan(Mod, IXA, IXB, Out3, IsLastLevel, AKVs, BT, step(Step))
|
||||
end;
|
||||
scan(Mod, IXA, IXB, Out, IsLastLevel, [_|AT], [Entry|BT], Step) ->
|
||||
case Entry of
|
||||
{_, ?TOMBSTONE, _} when IsLastLevel ->
|
||||
scan(Mod, IXA, IXB, Out, true, AT, BT, step(Step));
|
||||
_ ->
|
||||
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||
scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BT, step(Step, 2))
|
||||
end.
|
||||
|
||||
|
||||
scan_only(Mod, IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
|
||||
case FromPID of
|
||||
none ->
|
||||
ok;
|
||||
{PID, Ref} ->
|
||||
PID ! {Ref, step_done}
|
||||
end,
|
||||
|
||||
receive
|
||||
{step, From, HowMany} ->
|
||||
scan_only(Mod, IX, Out, IsLastLevel, KVs, {N+HowMany, From})
|
||||
end;
|
||||
|
||||
scan_only(Mod, IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
|
||||
case Mod:read_next(IX) of
|
||||
{KVs, IX2} ->
|
||||
scan_only(Mod, IX2, Out, IsLastLevel, KVs, Step);
|
||||
done ->
|
||||
case FromPID of
|
||||
none ->
|
||||
ok;
|
||||
{PID, Ref} ->
|
||||
PID ! {Ref, step_done}
|
||||
end,
|
||||
ok = Mod:close_batch_reader(IX),
|
||||
terminate(Mod, Out)
|
||||
end;
|
||||
|
||||
scan_only(Mod, IX, Out, true, [{_,?TOMBSTONE,_}|Rest], Step) ->
|
||||
scan_only(Mod, IX, Out, true, Rest, step(Step));
|
||||
|
||||
scan_only(Mod, IX, Out, IsLastLevel, [Entry|Rest], Step) ->
|
||||
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||
scan_only(Mod, IX, Out3, IsLastLevel, Rest, step(Step)).
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
70
src/hanoidb_han2_backend.erl
Normal file
70
src/hanoidb_han2_backend.erl
Normal file
|
@ -0,0 +1,70 @@
|
|||
-module(hanoidb_han2_backend).
|
||||
|
||||
-include("hanoidb.hrl").
|
||||
|
||||
-behavior(hanoidb_backend).
|
||||
|
||||
-export([open_random_reader/2, file_name/1, range_fold/4, lookup/2, close_random_reader/1]).
|
||||
-export([open_batch_reader/2, read_next/1, close_batch_reader/1]).
|
||||
-export([open_batch_writer/2, write_next/2, write_count/1, close_batch_writer/1]).
|
||||
|
||||
|
||||
open_random_reader(Name, Options) ->
|
||||
hanoidb_reader:open(Name, [random|Options]).
|
||||
|
||||
file_name(Reader) ->
|
||||
hanoidb_reader:file_name(Reader).
|
||||
|
||||
lookup(Key, Reader) ->
|
||||
hanoidb_reader:lookup(Reader, Key).
|
||||
|
||||
range_fold(Fun, Acc, Reader, Range) ->
|
||||
hanoidb_reader:range_fold(Fun, Acc, Reader, Range).
|
||||
|
||||
close_random_reader(Reader) ->
|
||||
hanoidb_reader:close(Reader).
|
||||
|
||||
|
||||
|
||||
open_batch_reader(Name, Options) ->
|
||||
hanoidb_reader:open(Name, [sequential|Options]).
|
||||
|
||||
read_next(Reader) ->
|
||||
case hanoidb_reader:next_node(Reader) of
|
||||
{node, KVs} ->
|
||||
{[ unfold(KV) || KV <- KVs], Reader};
|
||||
end_of_data ->
|
||||
'done';
|
||||
{error, _}=Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
unfold({Key,{Value, Expiry}}) when is_binary(Value); ?TOMBSTONE =:= Value ->
|
||||
{Key,Value,Expiry};
|
||||
unfold({Key,Value}) ->
|
||||
{Key, Value, infinity}.
|
||||
|
||||
close_batch_reader(Reader) ->
|
||||
hanoidb_reader:close(Reader).
|
||||
|
||||
open_batch_writer(Name, Options) ->
|
||||
hanoidb_writer:init([Name, Options]).
|
||||
|
||||
write_next( {Key, Value, infinity}, Writer) ->
|
||||
{noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, Value}, Writer),
|
||||
{ok, Writer2};
|
||||
write_next( {Key, Value, Expiry}, Writer) ->
|
||||
{noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, {Value, Expiry}}, Writer),
|
||||
{ok, Writer2}.
|
||||
|
||||
write_count( Writer ) ->
|
||||
case hanoidb_writer:handle_call(count, self(), Writer) of
|
||||
{ok, Count, _} ->
|
||||
{ok, Count};
|
||||
Err ->
|
||||
{error, Err}
|
||||
end.
|
||||
|
||||
close_batch_writer(Writer) ->
|
||||
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Writer),
|
||||
ok.
|
|
@ -51,9 +51,24 @@
|
|||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
-record(state, {
|
||||
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
|
||||
step_next_ref, step_caller, step_merge_ref,
|
||||
opts = [], owner, work_in_progress=0, work_done=0, max_level=?TOP_LEVEL
|
||||
a :: undefined | hanoidb_backend:random_reader(),
|
||||
b :: undefined | hanoidb_backend:random_reader(),
|
||||
c :: undefined | hanoidb_backend:random_reader(),
|
||||
next :: pid() | undefined,
|
||||
dir :: string(),
|
||||
level :: non_neg_integer(),
|
||||
inject_done_ref :: reference(),
|
||||
merge_pid :: pid(),
|
||||
folding = [] :: list( pid() ),
|
||||
step_next_ref :: reference() | undefined,
|
||||
step_caller :: plain_rpc:caller() | undefined,
|
||||
step_merge_ref :: reference() | undefined,
|
||||
opts = [] :: list(),
|
||||
owner :: pid(),
|
||||
work_in_progress=0 :: non_neg_integer(),
|
||||
work_done=0 :: non_neg_integer(),
|
||||
max_level=?TOP_LEVEL :: pos_integer(),
|
||||
backend = hanoidb_han2_backend :: atom()
|
||||
}).
|
||||
|
||||
|
||||
|
@ -177,6 +192,8 @@ initialize2(State) ->
|
|||
file:delete(filename("BF",State)),
|
||||
file:delete(filename("CF",State)),
|
||||
|
||||
Mod = State#state.backend,
|
||||
|
||||
case file:read_file_info(MFileName) of
|
||||
{ok, _} ->
|
||||
|
||||
|
@ -188,12 +205,12 @@ initialize2(State) ->
|
|||
file:delete(BFileName),
|
||||
ok = file:rename(MFileName, AFileName),
|
||||
|
||||
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
||||
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||
|
||||
case file:read_file_info(CFileName) of
|
||||
{ok, _} ->
|
||||
file:rename(CFileName, BFileName),
|
||||
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
|
||||
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
|
||||
check_begin_merge_then_loop0(init_state(State#state{ a= IXA, b=IXB }));
|
||||
|
||||
{error, enoent} ->
|
||||
|
@ -203,13 +220,13 @@ initialize2(State) ->
|
|||
{error, enoent} ->
|
||||
case file:read_file_info(BFileName) of
|
||||
{ok, _} ->
|
||||
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
||||
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
|
||||
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
|
||||
|
||||
IXC =
|
||||
case file:read_file_info(CFileName) of
|
||||
{ok, _} ->
|
||||
{ok, C} = hanoidb_reader:open(CFileName, [random|State#state.opts]),
|
||||
{ok, C} = Mod:open_random_reader(CFileName, [random|State#state.opts]),
|
||||
C;
|
||||
{error, enoent} ->
|
||||
undefined
|
||||
|
@ -224,7 +241,7 @@ initialize2(State) ->
|
|||
|
||||
case file:read_file_info(AFileName) of
|
||||
{ok, _} ->
|
||||
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
||||
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||
main_loop(init_state(State#state{ a=IXA }));
|
||||
|
||||
{error, enoent} ->
|
||||
|
@ -263,28 +280,28 @@ check_begin_merge_then_loop(State=#state{a=IXA, b=IXB, merge_pid=undefined})
|
|||
check_begin_merge_then_loop(State) ->
|
||||
main_loop(State).
|
||||
|
||||
main_loop(State = #state{ next=Next }) ->
|
||||
main_loop(State = #state{ next=Next, backend=Mod }) ->
|
||||
Parent = plain_fsm:info(parent),
|
||||
receive
|
||||
?CALL(From, {lookup, Key})=Req ->
|
||||
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
|
||||
not_found ->
|
||||
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
|
||||
not_found when Next =:= undefined ->
|
||||
plain_rpc:send_reply(From, not_found);
|
||||
{found, Result} ->
|
||||
plain_rpc:send_reply(From, {ok, Result});
|
||||
{delegate, DelegatePid} ->
|
||||
DelegatePid ! Req
|
||||
not_found ->
|
||||
Next ! Req
|
||||
end,
|
||||
main_loop(State);
|
||||
|
||||
?CAST(_From, {lookup, Key, ReplyFun})=Req ->
|
||||
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
|
||||
not_found ->
|
||||
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
|
||||
not_found when Next =:= undefined ->
|
||||
ReplyFun(not_found);
|
||||
{found, Result} ->
|
||||
ReplyFun({ok, Result});
|
||||
{delegate, DelegatePid} ->
|
||||
DelegatePid ! Req
|
||||
not_found ->
|
||||
Next ! Req
|
||||
end,
|
||||
main_loop(State);
|
||||
|
||||
|
@ -310,7 +327,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
plain_rpc:send_reply(From, ok),
|
||||
|
||||
case hanoidb_reader:open(ToFileName, [random|State#state.opts]) of
|
||||
case Mod:open_random_reader(ToFileName, [random|State#state.opts]) of
|
||||
{ok, BT} ->
|
||||
if SetPos == #state.b ->
|
||||
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
||||
|
@ -406,9 +423,9 @@ main_loop(State = #state{ next=Next }) ->
|
|||
main_loop(State2#state{ step_next_ref=undefined });
|
||||
|
||||
?CALL(From, close) ->
|
||||
close_if_defined(State#state.a),
|
||||
close_if_defined(State#state.b),
|
||||
close_if_defined(State#state.c),
|
||||
close_if_defined(Mod, State#state.a),
|
||||
close_if_defined(Mod, State#state.b),
|
||||
close_if_defined(Mod, State#state.c),
|
||||
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
||||
|
||||
%% this is synchronous all the way down, because our
|
||||
|
@ -422,9 +439,9 @@ main_loop(State = #state{ next=Next }) ->
|
|||
{ok, closing};
|
||||
|
||||
?CALL(From, destroy) ->
|
||||
destroy_if_defined(State#state.a),
|
||||
destroy_if_defined(State#state.b),
|
||||
destroy_if_defined(State#state.c),
|
||||
destroy_if_defined(Mod,State#state.a),
|
||||
destroy_if_defined(Mod,State#state.b),
|
||||
destroy_if_defined(Mod,State#state.c),
|
||||
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
||||
|
||||
%% this is synchronous all the way down, because our
|
||||
|
@ -496,27 +513,27 @@ main_loop(State = #state{ next=Next }) ->
|
|||
|
||||
{_, undefined, undefined} ->
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
|
||||
[ARef|List];
|
||||
|
||||
{_, _, undefined} ->
|
||||
BRef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
||||
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
|
||||
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
|
||||
|
||||
[ARef,BRef|List];
|
||||
|
||||
{_, _, _} ->
|
||||
CRef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.c, WorkerPID, CRef, Range),
|
||||
ok = do_range_fold(Mod,State#state.c, WorkerPID, CRef, Range),
|
||||
|
||||
BRef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
||||
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
|
||||
|
||||
ARef = erlang:make_ref(),
|
||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
||||
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
|
||||
|
||||
[ARef,BRef,CRef|List]
|
||||
end,
|
||||
|
@ -542,9 +559,9 @@ main_loop(State = #state{ next=Next }) ->
|
|||
undefined ->
|
||||
main_loop(State2#state{ merge_pid=undefined });
|
||||
CFile ->
|
||||
ok = hanoidb_reader:close(CFile),
|
||||
ok = Mod:close_random_reader(CFile),
|
||||
ok = file:rename(filename("C", State2), filename("A", State2)),
|
||||
{ok, AFile} = hanoidb_reader:open(filename("A", State2), [random|State#state.opts]),
|
||||
{ok, AFile} = Mod:open_random_reader(filename("A", State2), [random|State#state.opts]),
|
||||
main_loop(State2#state{ a = AFile, c = undefined, merge_pid=undefined })
|
||||
end;
|
||||
|
||||
|
@ -565,7 +582,7 @@ main_loop(State = #state{ next=Next }) ->
|
|||
% then, rename M to A, and open it
|
||||
AFileName = filename("A",State2),
|
||||
ok = file:rename(MFileName, AFileName),
|
||||
{ok, AFile} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
||||
{ok, AFile} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||
|
||||
% iff there is a C file, then move it to B position
|
||||
% TODO: consider recovery for this
|
||||
|
@ -573,9 +590,9 @@ main_loop(State = #state{ next=Next }) ->
|
|||
undefined ->
|
||||
main_loop(State2#state{ a=AFile, b=undefined, merge_pid=undefined });
|
||||
CFile ->
|
||||
ok = hanoidb_reader:close(CFile),
|
||||
ok = Mod:close_random_reader(CFile),
|
||||
ok = file:rename(filename("C", State2), filename("B", State2)),
|
||||
{ok, BFile} = hanoidb_reader:open(filename("B", State2), [random|State#state.opts]),
|
||||
{ok, BFile} = Mod:open_random_reader(filename("B", State2), [random|State#state.opts]),
|
||||
check_begin_merge_then_loop(State2#state{ a=AFile, b=BFile, c=undefined,
|
||||
merge_pid=undefined })
|
||||
end;
|
||||
|
@ -722,9 +739,9 @@ reply_step_ok(State) ->
|
|||
case State#state.step_caller of
|
||||
undefined ->
|
||||
ok;
|
||||
_ ->
|
||||
?log("step_ok -> ~p", [State#state.step_caller]),
|
||||
plain_rpc:send_reply(State#state.step_caller, step_ok)
|
||||
Caller ->
|
||||
?log("step_ok -> ~p", [Caller]),
|
||||
plain_rpc:send_reply(Caller, step_ok)
|
||||
end,
|
||||
State#state{ step_caller=undefined }.
|
||||
|
||||
|
@ -738,27 +755,28 @@ total_unmerged(State) ->
|
|||
|
||||
|
||||
|
||||
do_lookup(_Key, []) ->
|
||||
do_lookup(_, _Key, []) ->
|
||||
not_found;
|
||||
do_lookup(_Key, [Pid]) when is_pid(Pid) ->
|
||||
{delegate, Pid};
|
||||
do_lookup(Key, [undefined|Rest]) ->
|
||||
do_lookup(Key, Rest);
|
||||
do_lookup(Key, [BT|Rest]) ->
|
||||
case hanoidb_reader:lookup(BT, Key) of
|
||||
do_lookup(Mod, Key, [undefined|Rest]) ->
|
||||
do_lookup(Mod, Key, Rest);
|
||||
do_lookup(Mod, Key, [BT|Rest]) ->
|
||||
case Mod:lookup(Key, BT) of
|
||||
{ok, ?TOMBSTONE} ->
|
||||
not_found;
|
||||
{ok, Result} ->
|
||||
{found, Result};
|
||||
not_found ->
|
||||
do_lookup(Key, Rest)
|
||||
do_lookup(Mod,Key, Rest)
|
||||
end.
|
||||
|
||||
close_if_defined(undefined) -> ok;
|
||||
close_if_defined(BT) -> hanoidb_reader:close(BT).
|
||||
close_if_defined(_, undefined) -> ok;
|
||||
close_if_defined(Mod, BT) -> Mod:close_random_reader(BT).
|
||||
|
||||
destroy_if_defined(undefined) -> ok;
|
||||
destroy_if_defined(BT) -> hanoidb_reader:destroy(BT).
|
||||
destroy_if_defined(_, undefined) -> ok;
|
||||
destroy_if_defined(Mod, BT) ->
|
||||
{ok, Name} = Mod:file_name(BT),
|
||||
Mod:close_random_reader(BT),
|
||||
file:delete(Name).
|
||||
|
||||
stop_if_defined(undefined) -> ok;
|
||||
stop_if_defined(MergePid) when is_pid(MergePid) ->
|
||||
|
@ -785,7 +803,9 @@ begin_merge(State) ->
|
|||
try
|
||||
?log("merge begun~n", []),
|
||||
|
||||
{ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName,
|
||||
{ok, OutCount} = hanoidb_backend:merge(
|
||||
State#state.backend,
|
||||
AFileName, BFileName, XFileName,
|
||||
?BTREE_SIZE(State#state.level + 1),
|
||||
State#state.next =:= undefined,
|
||||
State#state.opts ),
|
||||
|
@ -805,9 +825,10 @@ begin_merge(State) ->
|
|||
close_and_delete_a_and_b(State) ->
|
||||
AFileName = filename("A",State),
|
||||
BFileName = filename("B",State),
|
||||
Mod = State#state.backend,
|
||||
|
||||
ok = hanoidb_reader:close(State#state.a),
|
||||
ok = hanoidb_reader:close(State#state.b),
|
||||
ok = Mod:close_random_reader(State#state.a),
|
||||
ok = Mod:close_random_reader(State#state.b),
|
||||
|
||||
ok = file:delete(AFileName),
|
||||
ok = file:delete(BFileName),
|
||||
|
@ -821,14 +842,15 @@ filename(PFX, State) ->
|
|||
|
||||
start_range_fold(FileName, WorkerPID, Range, State) ->
|
||||
Owner = self(),
|
||||
Mod = State#state.backend,
|
||||
PID = proc_lib:spawn( fun() ->
|
||||
try
|
||||
?log("start_range_fold ~p on ~p -> ~p", [self(), FileName, WorkerPID]),
|
||||
erlang:link(WorkerPID),
|
||||
{ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]),
|
||||
do_range_fold2(File, WorkerPID, self(), Range),
|
||||
{ok, File} = Mod:open_random_reader(FileName, [folding|State#state.opts]),
|
||||
do_range_fold2(Mod,File, WorkerPID, self(), Range),
|
||||
erlang:unlink(WorkerPID),
|
||||
hanoidb_reader:close(File),
|
||||
Mod:close_random_reader(File),
|
||||
|
||||
%% this will release the pinning of the fold file
|
||||
Owner ! {range_fold_done, self(), FileName},
|
||||
|
@ -840,12 +862,13 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
|
|||
end ),
|
||||
{ok, PID}.
|
||||
|
||||
-spec do_range_fold(BT :: hanoidb_reader:read_file(),
|
||||
-spec do_range_fold(Mod :: atom(),
|
||||
BT :: hanoidb_backend:random_reader(),
|
||||
WorkerPID :: pid(),
|
||||
SelfOrRef :: pid() | reference(),
|
||||
Range :: #key_range{} ) -> ok.
|
||||
do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
|
||||
case hanoidb_reader:range_fold(fun(Key,Value,_) ->
|
||||
do_range_fold(Mod, BT, WorkerPID, SelfOrRef, Range) ->
|
||||
case Mod:range_fold(fun(Key,Value,_) ->
|
||||
WorkerPID ! {level_result, SelfOrRef, Key, Value},
|
||||
ok
|
||||
end,
|
||||
|
@ -863,12 +886,13 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
|
|||
|
||||
-define(FOLD_CHUNK_SIZE, 100).
|
||||
|
||||
-spec do_range_fold2(BT :: hanoidb_reader:read_file(),
|
||||
-spec do_range_fold2(Mod :: atom(),
|
||||
BT :: hanoidb_reader:read_file(),
|
||||
WorkerPID :: pid(),
|
||||
SelfOrRef :: pid() | reference(),
|
||||
Range :: #key_range{} ) -> ok.
|
||||
do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
|
||||
try hanoidb_reader:range_fold(fun(Key,Value,{0,KVs}) ->
|
||||
do_range_fold2(Mod, BT, WorkerPID, SelfOrRef, Range) ->
|
||||
try Mod:range_fold(fun(Key,Value,{0,KVs}) ->
|
||||
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
|
||||
{?FOLD_CHUNK_SIZE-1, []};
|
||||
(Key,Value,{N,KVs}) ->
|
||||
|
|
|
@ -99,7 +99,7 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
|
|||
-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, non_neg_integer() | infinity, pid()) -> {ok, #nursery{}} | {full, #nursery{}}.
|
||||
do_add(Nursery, Key, Value, infinity, Top) ->
|
||||
do_add(Nursery, Key, Value, 0, Top);
|
||||
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) ->
|
||||
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) when is_integer(KeyExpiryTime) ->
|
||||
DatabaseExpiryTime = hanoidb:get_opt(expiry_secs, Config),
|
||||
|
||||
{Data, Cache2} =
|
||||
|
@ -175,7 +175,7 @@ lookup(Key, #nursery{cache=Cache}) ->
|
|||
%% @end
|
||||
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
|
||||
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
|
||||
count=Count, config=Config }, TopLevel) ->
|
||||
count=Count, config=Config, backend=Backend }, TopLevel) ->
|
||||
|
||||
hanoidb_util:ensure_expiry(Config),
|
||||
|
||||
|
@ -189,16 +189,17 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
|
|||
N when N > 0 ->
|
||||
%% next, flush cache to a new BTree
|
||||
BTreeFileName = filename:join(Dir, "nursery.data"),
|
||||
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
|
||||
{ok, BT} = Backend:open_batch_writer(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
|
||||
{compress, none} | Config]),
|
||||
try
|
||||
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
|
||||
ok = hanoidb_writer:add(BT, Key, Value),
|
||||
Acc
|
||||
end, ok, Cache)
|
||||
after
|
||||
ok = hanoidb_writer:close(BT)
|
||||
end,
|
||||
|
||||
BT3 = gb_trees_ext:fold(fun(Key, {Value,Expiry}, BT1) ->
|
||||
{ok, BT2} = Backend:write_next({Key,Value,Expiry}, BT1),
|
||||
BT2;
|
||||
(Key, Value, BT1) ->
|
||||
{ok, BT2} = Backend:write_next({Key,Value,infinity}, BT1),
|
||||
BT2
|
||||
end, BT, Cache),
|
||||
ok = Backend:close_batch_writer(BT3),
|
||||
|
||||
%% Inject the B-Tree (blocking RPC)
|
||||
ok = hanoidb_level:inject(TopLevel, BTreeFileName),
|
||||
|
|
|
@ -33,7 +33,7 @@
|
|||
-define(ASSERT_WHEN(X), when X).
|
||||
|
||||
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]).
|
||||
-export([first_node/1,next_node/1]).
|
||||
-export([first_node/1,next_node/1,file_name/1]).
|
||||
-export([serialize/1, deserialize/1]).
|
||||
|
||||
-record(node, {level :: non_neg_integer(),
|
||||
|
@ -61,6 +61,7 @@ open(Name, Config) ->
|
|||
ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024),
|
||||
case file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]) of
|
||||
{ok, File} ->
|
||||
{ok, Pos} = file:position(File, ?FIRST_BLOCK_POS),
|
||||
{ok, #index{file=File, name=Name, config=Config}};
|
||||
{error, _}=Err ->
|
||||
Err
|
||||
|
@ -99,6 +100,9 @@ open(Name, Config) ->
|
|||
{ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
|
||||
end.
|
||||
|
||||
file_name(#index{name=Name}) ->
|
||||
{ok, Name}.
|
||||
|
||||
destroy(#index{file=File, name=Name}) ->
|
||||
ok = file:close(File),
|
||||
file:delete(Name).
|
||||
|
@ -130,7 +134,9 @@ fold1(File,Fun,Acc0) ->
|
|||
eof ->
|
||||
Acc0;
|
||||
{ok, Node} ->
|
||||
fold0(File,Fun,Node,Acc0)
|
||||
fold0(File,Fun,Node,Acc0);
|
||||
{error,_}=Err ->
|
||||
exit(Err)
|
||||
end.
|
||||
|
||||
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
|
||||
|
@ -193,7 +199,10 @@ do_range_fold(Fun, Acc0, File, Range, undefined) ->
|
|||
{stopped, Result} -> Result;
|
||||
{ok, Acc1} ->
|
||||
do_range_fold(Fun, Acc1, File, Range, undefined)
|
||||
end
|
||||
end;
|
||||
|
||||
{error,_}=Err ->
|
||||
Err
|
||||
end;
|
||||
|
||||
do_range_fold(Fun, Acc0, File, Range, N0) ->
|
||||
|
@ -230,7 +239,10 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
|
|||
{stopped, Result} -> Result;
|
||||
{ok, {N2, Acc1}} ->
|
||||
do_range_fold(Fun, Acc1, File, Range, N2)
|
||||
end
|
||||
end;
|
||||
|
||||
{error,_}=Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
|
||||
|
@ -269,7 +281,10 @@ next_node(#index{file=File}=_Index) ->
|
|||
% {ok, #node{level=N}} when N>0 ->
|
||||
% next_node(Index);
|
||||
eof ->
|
||||
end_of_data
|
||||
end_of_data;
|
||||
|
||||
{error,_}=Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
close(#index{file=undefined}) ->
|
||||
|
@ -413,6 +428,8 @@ next_leaf_node(File) ->
|
|||
hanoidb_util:decode_index_node(0, Data);
|
||||
{ok, <<Len:32/unsigned, _:16/unsigned>>} ->
|
||||
{ok, _} = file:position(File, {cur,Len-2}),
|
||||
next_leaf_node(File)
|
||||
next_leaf_node(File);
|
||||
{error,_}=Err ->
|
||||
Err
|
||||
end.
|
||||
|
||||
|
|
Loading…
Reference in a new issue