Separate out hanoid_backend API
First step towards a hanoidb backend API, which will allow replacing the storage engine with any ordered storage. We still need an API for how to choose backend and perhaps also how to discover the backend of an existing store.
This commit is contained in:
parent
e3689c3d87
commit
3a43d9235b
5 changed files with 356 additions and 72 deletions
|
@ -27,3 +27,5 @@
|
||||||
-define(REPLY(Ref,Msg), {'$reply', Ref, Msg}).
|
-define(REPLY(Ref,Msg), {'$reply', Ref, Msg}).
|
||||||
-define(CAST(From,Msg), {'$cast', From, Msg}).
|
-define(CAST(From,Msg), {'$cast', From, Msg}).
|
||||||
|
|
||||||
|
-type caller() :: { pid(), reference() }.
|
||||||
|
|
||||||
|
|
174
src/hanoidb_backend.erl
Normal file
174
src/hanoidb_backend.erl
Normal file
|
@ -0,0 +1,174 @@
|
||||||
|
|
||||||
|
-module(hanoidb_backend).
|
||||||
|
|
||||||
|
-include("include/hanoidb.hrl").
|
||||||
|
-include("src/hanoidb.hrl").
|
||||||
|
|
||||||
|
-type options() :: [ atom() | { atom(), term() } ].
|
||||||
|
-type kvexp_entry() :: { Key :: key(), Value :: value(), TimeOut :: expiry() }.
|
||||||
|
-type batch_reader() :: any().
|
||||||
|
-type batch_writer() :: any().
|
||||||
|
-type random_reader() :: any().
|
||||||
|
|
||||||
|
-export([merge/7]).
|
||||||
|
|
||||||
|
%%%=========================================================================
|
||||||
|
%%% API
|
||||||
|
%%%=========================================================================
|
||||||
|
|
||||||
|
%% batch_reader and batch_writer are used by the merging logic. A batch_reader
|
||||||
|
%% must return the values in lexicographical order of the binary keys.
|
||||||
|
|
||||||
|
-callback open_batch_reader(File :: string(), Options :: options())
|
||||||
|
-> {ok, batch_reader()} | { error, term() }.
|
||||||
|
-callback read_next(batch_reader())
|
||||||
|
-> { [kvexp_entry(), ...], batch_reader()} | 'done'.
|
||||||
|
-callback close_batch_reader( batch_reader() )
|
||||||
|
-> ok | {error, term()}.
|
||||||
|
|
||||||
|
|
||||||
|
-callback open_batch_writer(File :: string(), Options :: options())
|
||||||
|
-> {ok, batch_writer()} | {error, term()}.
|
||||||
|
-callback write_next( kvexp_entry() , batch_writer() )
|
||||||
|
-> {ok, batch_writer()} | {error, term()}.
|
||||||
|
-callback write_count( batch_writer() ) ->
|
||||||
|
{ok, non_neg_integer()} | {error, term()}.
|
||||||
|
-callback close_batch_writer( batch_writer() )
|
||||||
|
-> ok | {error, term()}.
|
||||||
|
|
||||||
|
|
||||||
|
-callback open_random_reader(File :: string(), Options :: options()) ->
|
||||||
|
{ok, random_reader()} | {error, term()}.
|
||||||
|
-callback file_name( random_reader() ) ->
|
||||||
|
{ok, string()} | {error, term()}.
|
||||||
|
-callback lookup( Key :: key(), random_reader() ) ->
|
||||||
|
not_found | {ok, value()}.
|
||||||
|
-callback range_fold( fun( (key(), value(), term()) -> term() ),
|
||||||
|
Acc0 :: term(),
|
||||||
|
Reader :: random_reader(),
|
||||||
|
Range :: #key_range{} ) ->
|
||||||
|
{limit, term(), LastKey :: binary()} | {ok, term()}.
|
||||||
|
-callback close_random_reader(random_reader()) ->
|
||||||
|
ok | {error, term()}.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
-spec merge(atom(), string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
|
||||||
|
merge(Mod,A,B,C, Size, IsLastLevel, Options) ->
|
||||||
|
{ok, IXA} = Mod:open_batch_reader(A, Options),
|
||||||
|
{ok, IXB} = Mod:open_batch_reader(B, Options),
|
||||||
|
{ok, Out} = Mod:open_batch_writer(C, [{size, Size} | Options]),
|
||||||
|
scan(Mod,IXA, IXB, Out, IsLastLevel, [], [], {0, none}).
|
||||||
|
|
||||||
|
terminate(Mod, Out) ->
|
||||||
|
{ok, Count} = Mod:write_count( Out ),
|
||||||
|
ok = Mod:close_batch_writer( Out ),
|
||||||
|
{ok, Count}.
|
||||||
|
|
||||||
|
step(S) ->
|
||||||
|
step(S, 1).
|
||||||
|
|
||||||
|
step({N, From}, Steps) ->
|
||||||
|
{N-Steps, From}.
|
||||||
|
|
||||||
|
|
||||||
|
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
|
||||||
|
case FromPID of
|
||||||
|
none ->
|
||||||
|
ok;
|
||||||
|
{PID, Ref} ->
|
||||||
|
PID ! {Ref, step_done}
|
||||||
|
end,
|
||||||
|
|
||||||
|
receive
|
||||||
|
{step, From, HowMany} ->
|
||||||
|
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
|
||||||
|
end;
|
||||||
|
|
||||||
|
scan(Mod, IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
|
||||||
|
case Mod:read_next(IXA) of
|
||||||
|
{AKVs, IXA2} ->
|
||||||
|
scan(Mod, IXA2, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
|
||||||
|
done ->
|
||||||
|
ok = Mod:close_batch_reader(IXA),
|
||||||
|
scan_only(Mod, IXB, Out, IsLastLevel, BKVs, Step)
|
||||||
|
end;
|
||||||
|
|
||||||
|
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
|
||||||
|
case Mod:read_next(IXB) of
|
||||||
|
{BKVs, IXB2} ->
|
||||||
|
scan(Mod, IXA, IXB2, Out, IsLastLevel, AKVs, BKVs, Step);
|
||||||
|
done ->
|
||||||
|
ok = Mod:close_batch_reader(IXB),
|
||||||
|
scan_only(Mod, IXA, Out, IsLastLevel, AKVs, Step)
|
||||||
|
end;
|
||||||
|
|
||||||
|
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}=Entry|AT], [{Key2,_,_}|_]=BKVs, Step)
|
||||||
|
when Key1 < Key2 ->
|
||||||
|
case Entry of
|
||||||
|
{_, ?TOMBSTONE, _} when IsLastLevel ->
|
||||||
|
scan(Mod, IXA, IXB, Out, true, AT, BKVs, step(Step));
|
||||||
|
_ ->
|
||||||
|
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||||
|
scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BKVs, step(Step))
|
||||||
|
end;
|
||||||
|
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}|_]=AKVs, [{Key2,_,_}=Entry|BT], Step)
|
||||||
|
when Key1 > Key2 ->
|
||||||
|
case Entry of
|
||||||
|
{_, ?TOMBSTONE, _} when IsLastLevel ->
|
||||||
|
scan(Mod, IXA, IXB, Out, true, AKVs, BT, step(Step));
|
||||||
|
_ ->
|
||||||
|
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||||
|
scan(Mod, IXA, IXB, Out3, IsLastLevel, AKVs, BT, step(Step))
|
||||||
|
end;
|
||||||
|
scan(Mod, IXA, IXB, Out, IsLastLevel, [_|AT], [Entry|BT], Step) ->
|
||||||
|
case Entry of
|
||||||
|
{_, ?TOMBSTONE, _} when IsLastLevel ->
|
||||||
|
scan(Mod, IXA, IXB, Out, true, AT, BT, step(Step));
|
||||||
|
_ ->
|
||||||
|
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||||
|
scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BT, step(Step, 2))
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
scan_only(Mod, IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] ->
|
||||||
|
case FromPID of
|
||||||
|
none ->
|
||||||
|
ok;
|
||||||
|
{PID, Ref} ->
|
||||||
|
PID ! {Ref, step_done}
|
||||||
|
end,
|
||||||
|
|
||||||
|
receive
|
||||||
|
{step, From, HowMany} ->
|
||||||
|
scan_only(Mod, IX, Out, IsLastLevel, KVs, {N+HowMany, From})
|
||||||
|
end;
|
||||||
|
|
||||||
|
scan_only(Mod, IX, Out, IsLastLevel, [], {_, FromPID}=Step) ->
|
||||||
|
case Mod:read_next(IX) of
|
||||||
|
{KVs, IX2} ->
|
||||||
|
scan_only(Mod, IX2, Out, IsLastLevel, KVs, Step);
|
||||||
|
done ->
|
||||||
|
case FromPID of
|
||||||
|
none ->
|
||||||
|
ok;
|
||||||
|
{PID, Ref} ->
|
||||||
|
PID ! {Ref, step_done}
|
||||||
|
end,
|
||||||
|
ok = Mod:close_batch_reader(IX),
|
||||||
|
terminate(Mod, Out)
|
||||||
|
end;
|
||||||
|
|
||||||
|
scan_only(Mod, IX, Out, true, [{_,?TOMBSTONE,_}|Rest], Step) ->
|
||||||
|
scan_only(Mod, IX, Out, true, Rest, step(Step));
|
||||||
|
|
||||||
|
scan_only(Mod, IX, Out, IsLastLevel, [Entry|Rest], Step) ->
|
||||||
|
{ok, Out3} = Mod:write_next( Entry, Out ),
|
||||||
|
scan_only(Mod, IX, Out3, IsLastLevel, Rest, step(Step)).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
67
src/hanoidb_han2_backend.erl
Normal file
67
src/hanoidb_han2_backend.erl
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
-module(hanoidb_han2_backend).
|
||||||
|
|
||||||
|
-include("hanoidb.hrl").
|
||||||
|
|
||||||
|
-behavior(hanoidb_backend).
|
||||||
|
|
||||||
|
-export([open_random_reader/2, file_name/1, range_fold/4, lookup/2, close_random_reader/1]).
|
||||||
|
-export([open_batch_reader/2, read_next/1, close_batch_reader/1]).
|
||||||
|
-export([open_batch_writer/2, write_next/2, write_count/1, close_batch_writer/1]).
|
||||||
|
|
||||||
|
|
||||||
|
open_random_reader(Name, Options) ->
|
||||||
|
hanoidb_reader:open(Name, [random|Options]).
|
||||||
|
|
||||||
|
file_name(Reader) ->
|
||||||
|
hanoidb_reader:file_name(Reader).
|
||||||
|
|
||||||
|
lookup(Key, Reader) ->
|
||||||
|
hanoidb_reader:lookup(Reader, Key).
|
||||||
|
|
||||||
|
range_fold(Fun, Acc, Reader, Range) ->
|
||||||
|
hanoidb_reader:range_fold(Fun, Acc, Reader, Range).
|
||||||
|
|
||||||
|
close_random_reader(Reader) ->
|
||||||
|
hanoidb_reader:close(Reader).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
open_batch_reader(Name, Options) ->
|
||||||
|
hanoidb_reader:open(Name, [sequential|Options]).
|
||||||
|
|
||||||
|
read_next(Reader) ->
|
||||||
|
case hanoidb_reader:next_node(Reader) of
|
||||||
|
{node, KVs} ->
|
||||||
|
{[ unfold(KV) || KV <- KVs], Reader};
|
||||||
|
end_of_data ->
|
||||||
|
'done';
|
||||||
|
{error, _}=Err ->
|
||||||
|
Err
|
||||||
|
end.
|
||||||
|
|
||||||
|
unfold({Key,{Value, Expiry}}) when is_binary(Value); ?TOMBSTONE =:= Value ->
|
||||||
|
{Key,Value,Expiry};
|
||||||
|
unfold({Key,Value}) ->
|
||||||
|
{Key, Value, infinity}.
|
||||||
|
|
||||||
|
close_batch_reader(Reader) ->
|
||||||
|
hanoidb_reader:close(Reader).
|
||||||
|
|
||||||
|
open_batch_writer(Name, Options) ->
|
||||||
|
hanoidb_writer:init([Name, Options]).
|
||||||
|
|
||||||
|
write_next( {Key, Value, Expiry}, Writer) ->
|
||||||
|
{noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, {Value, Expiry}}, Writer),
|
||||||
|
{ok, Writer2}.
|
||||||
|
|
||||||
|
write_count( Writer ) ->
|
||||||
|
case hanoidb_writer:handle_call(count, self(), Writer) of
|
||||||
|
{ok, Count, _} ->
|
||||||
|
{ok, Count};
|
||||||
|
Err ->
|
||||||
|
{error, Err}
|
||||||
|
end.
|
||||||
|
|
||||||
|
close_batch_writer(Writer) ->
|
||||||
|
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Writer),
|
||||||
|
ok.
|
|
@ -51,9 +51,24 @@
|
||||||
-include_lib("kernel/include/file.hrl").
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
|
a :: undefined | hanoidb_backend:random_reader(),
|
||||||
step_next_ref, step_caller, step_merge_ref,
|
b :: undefined | hanoidb_backend:random_reader(),
|
||||||
opts = [], owner, work_in_progress=0, work_done=0, max_level=?TOP_LEVEL
|
c :: undefined | hanoidb_backend:random_reader(),
|
||||||
|
next :: pid() | undefined,
|
||||||
|
dir :: string(),
|
||||||
|
level :: non_neg_integer(),
|
||||||
|
inject_done_ref :: reference(),
|
||||||
|
merge_pid :: pid(),
|
||||||
|
folding = [] :: list( pid() ),
|
||||||
|
step_next_ref :: reference() | undefined,
|
||||||
|
step_caller :: plain_rpc:caller() | undefined,
|
||||||
|
step_merge_ref :: reference() | undefined,
|
||||||
|
opts = [] :: list(),
|
||||||
|
owner :: pid(),
|
||||||
|
work_in_progress=0 :: non_neg_integer(),
|
||||||
|
work_done=0 :: non_neg_integer(),
|
||||||
|
max_level=?TOP_LEVEL :: pos_integer(),
|
||||||
|
backend = hanoidb_han2_backend :: atom()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
|
||||||
|
@ -177,6 +192,8 @@ initialize2(State) ->
|
||||||
file:delete(filename("BF",State)),
|
file:delete(filename("BF",State)),
|
||||||
file:delete(filename("CF",State)),
|
file:delete(filename("CF",State)),
|
||||||
|
|
||||||
|
Mod = State#state.backend,
|
||||||
|
|
||||||
case file:read_file_info(MFileName) of
|
case file:read_file_info(MFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
|
|
||||||
|
@ -188,12 +205,12 @@ initialize2(State) ->
|
||||||
file:delete(BFileName),
|
file:delete(BFileName),
|
||||||
ok = file:rename(MFileName, AFileName),
|
ok = file:rename(MFileName, AFileName),
|
||||||
|
|
||||||
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||||
|
|
||||||
case file:read_file_info(CFileName) of
|
case file:read_file_info(CFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
file:rename(CFileName, BFileName),
|
file:rename(CFileName, BFileName),
|
||||||
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
|
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
|
||||||
check_begin_merge_then_loop0(init_state(State#state{ a= IXA, b=IXB }));
|
check_begin_merge_then_loop0(init_state(State#state{ a= IXA, b=IXB }));
|
||||||
|
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
|
@ -203,13 +220,13 @@ initialize2(State) ->
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
case file:read_file_info(BFileName) of
|
case file:read_file_info(BFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||||
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
|
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
|
||||||
|
|
||||||
IXC =
|
IXC =
|
||||||
case file:read_file_info(CFileName) of
|
case file:read_file_info(CFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, C} = hanoidb_reader:open(CFileName, [random|State#state.opts]),
|
{ok, C} = Mod:open_random_reader(CFileName, [random|State#state.opts]),
|
||||||
C;
|
C;
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
undefined
|
undefined
|
||||||
|
@ -224,7 +241,7 @@ initialize2(State) ->
|
||||||
|
|
||||||
case file:read_file_info(AFileName) of
|
case file:read_file_info(AFileName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||||
main_loop(init_state(State#state{ a=IXA }));
|
main_loop(init_state(State#state{ a=IXA }));
|
||||||
|
|
||||||
{error, enoent} ->
|
{error, enoent} ->
|
||||||
|
@ -263,28 +280,28 @@ check_begin_merge_then_loop(State=#state{a=IXA, b=IXB, merge_pid=undefined})
|
||||||
check_begin_merge_then_loop(State) ->
|
check_begin_merge_then_loop(State) ->
|
||||||
main_loop(State).
|
main_loop(State).
|
||||||
|
|
||||||
main_loop(State = #state{ next=Next }) ->
|
main_loop(State = #state{ next=Next, backend=Mod }) ->
|
||||||
Parent = plain_fsm:info(parent),
|
Parent = plain_fsm:info(parent),
|
||||||
receive
|
receive
|
||||||
?CALL(From, {lookup, Key})=Req ->
|
?CALL(From, {lookup, Key})=Req ->
|
||||||
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
|
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
|
||||||
not_found ->
|
not_found when Next =:= undefined ->
|
||||||
plain_rpc:send_reply(From, not_found);
|
plain_rpc:send_reply(From, not_found);
|
||||||
{found, Result} ->
|
{found, Result} ->
|
||||||
plain_rpc:send_reply(From, {ok, Result});
|
plain_rpc:send_reply(From, {ok, Result});
|
||||||
{delegate, DelegatePid} ->
|
not_found ->
|
||||||
DelegatePid ! Req
|
Next ! Req
|
||||||
end,
|
end,
|
||||||
main_loop(State);
|
main_loop(State);
|
||||||
|
|
||||||
?CAST(_From, {lookup, Key, ReplyFun})=Req ->
|
?CAST(_From, {lookup, Key, ReplyFun})=Req ->
|
||||||
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
|
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
|
||||||
not_found ->
|
not_found when Next =:= undefined ->
|
||||||
ReplyFun(not_found);
|
ReplyFun(not_found);
|
||||||
{found, Result} ->
|
{found, Result} ->
|
||||||
ReplyFun({ok, Result});
|
ReplyFun({ok, Result});
|
||||||
{delegate, DelegatePid} ->
|
not_found ->
|
||||||
DelegatePid ! Req
|
Next ! Req
|
||||||
end,
|
end,
|
||||||
main_loop(State);
|
main_loop(State);
|
||||||
|
|
||||||
|
@ -310,7 +327,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
plain_rpc:send_reply(From, ok),
|
plain_rpc:send_reply(From, ok),
|
||||||
|
|
||||||
case hanoidb_reader:open(ToFileName, [random|State#state.opts]) of
|
case Mod:open_random_reader(ToFileName, [random|State#state.opts]) of
|
||||||
{ok, BT} ->
|
{ok, BT} ->
|
||||||
if SetPos == #state.b ->
|
if SetPos == #state.b ->
|
||||||
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
check_begin_merge_then_loop(setelement(SetPos, State, BT));
|
||||||
|
@ -406,9 +423,9 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
main_loop(State2#state{ step_next_ref=undefined });
|
main_loop(State2#state{ step_next_ref=undefined });
|
||||||
|
|
||||||
?CALL(From, close) ->
|
?CALL(From, close) ->
|
||||||
close_if_defined(State#state.a),
|
close_if_defined(Mod, State#state.a),
|
||||||
close_if_defined(State#state.b),
|
close_if_defined(Mod, State#state.b),
|
||||||
close_if_defined(State#state.c),
|
close_if_defined(Mod, State#state.c),
|
||||||
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
||||||
|
|
||||||
%% this is synchronous all the way down, because our
|
%% this is synchronous all the way down, because our
|
||||||
|
@ -422,9 +439,9 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
{ok, closing};
|
{ok, closing};
|
||||||
|
|
||||||
?CALL(From, destroy) ->
|
?CALL(From, destroy) ->
|
||||||
destroy_if_defined(State#state.a),
|
destroy_if_defined(Mod,State#state.a),
|
||||||
destroy_if_defined(State#state.b),
|
destroy_if_defined(Mod,State#state.b),
|
||||||
destroy_if_defined(State#state.c),
|
destroy_if_defined(Mod,State#state.c),
|
||||||
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
|
||||||
|
|
||||||
%% this is synchronous all the way down, because our
|
%% this is synchronous all the way down, because our
|
||||||
|
@ -496,27 +513,27 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
|
|
||||||
{_, undefined, undefined} ->
|
{_, undefined, undefined} ->
|
||||||
ARef = erlang:make_ref(),
|
ARef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
|
||||||
[ARef|List];
|
[ARef|List];
|
||||||
|
|
||||||
{_, _, undefined} ->
|
{_, _, undefined} ->
|
||||||
BRef = erlang:make_ref(),
|
BRef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
|
||||||
|
|
||||||
ARef = erlang:make_ref(),
|
ARef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
|
||||||
|
|
||||||
[ARef,BRef|List];
|
[ARef,BRef|List];
|
||||||
|
|
||||||
{_, _, _} ->
|
{_, _, _} ->
|
||||||
CRef = erlang:make_ref(),
|
CRef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.c, WorkerPID, CRef, Range),
|
ok = do_range_fold(Mod,State#state.c, WorkerPID, CRef, Range),
|
||||||
|
|
||||||
BRef = erlang:make_ref(),
|
BRef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
|
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
|
||||||
|
|
||||||
ARef = erlang:make_ref(),
|
ARef = erlang:make_ref(),
|
||||||
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
|
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
|
||||||
|
|
||||||
[ARef,BRef,CRef|List]
|
[ARef,BRef,CRef|List]
|
||||||
end,
|
end,
|
||||||
|
@ -542,9 +559,9 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
main_loop(State2#state{ merge_pid=undefined });
|
main_loop(State2#state{ merge_pid=undefined });
|
||||||
CFile ->
|
CFile ->
|
||||||
ok = hanoidb_reader:close(CFile),
|
ok = Mod:close_random_reader(CFile),
|
||||||
ok = file:rename(filename("C", State2), filename("A", State2)),
|
ok = file:rename(filename("C", State2), filename("A", State2)),
|
||||||
{ok, AFile} = hanoidb_reader:open(filename("A", State2), [random|State#state.opts]),
|
{ok, AFile} = Mod:open_random_reader(filename("A", State2), [random|State#state.opts]),
|
||||||
main_loop(State2#state{ a = AFile, c = undefined, merge_pid=undefined })
|
main_loop(State2#state{ a = AFile, c = undefined, merge_pid=undefined })
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -565,7 +582,7 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
% then, rename M to A, and open it
|
% then, rename M to A, and open it
|
||||||
AFileName = filename("A",State2),
|
AFileName = filename("A",State2),
|
||||||
ok = file:rename(MFileName, AFileName),
|
ok = file:rename(MFileName, AFileName),
|
||||||
{ok, AFile} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
|
{ok, AFile} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
|
||||||
|
|
||||||
% iff there is a C file, then move it to B position
|
% iff there is a C file, then move it to B position
|
||||||
% TODO: consider recovery for this
|
% TODO: consider recovery for this
|
||||||
|
@ -573,9 +590,9 @@ main_loop(State = #state{ next=Next }) ->
|
||||||
undefined ->
|
undefined ->
|
||||||
main_loop(State2#state{ a=AFile, b=undefined, merge_pid=undefined });
|
main_loop(State2#state{ a=AFile, b=undefined, merge_pid=undefined });
|
||||||
CFile ->
|
CFile ->
|
||||||
ok = hanoidb_reader:close(CFile),
|
ok = Mod:close_random_reader(CFile),
|
||||||
ok = file:rename(filename("C", State2), filename("B", State2)),
|
ok = file:rename(filename("C", State2), filename("B", State2)),
|
||||||
{ok, BFile} = hanoidb_reader:open(filename("B", State2), [random|State#state.opts]),
|
{ok, BFile} = Mod:open_random_reader(filename("B", State2), [random|State#state.opts]),
|
||||||
check_begin_merge_then_loop(State2#state{ a=AFile, b=BFile, c=undefined,
|
check_begin_merge_then_loop(State2#state{ a=AFile, b=BFile, c=undefined,
|
||||||
merge_pid=undefined })
|
merge_pid=undefined })
|
||||||
end;
|
end;
|
||||||
|
@ -722,9 +739,9 @@ reply_step_ok(State) ->
|
||||||
case State#state.step_caller of
|
case State#state.step_caller of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
Caller ->
|
||||||
?log("step_ok -> ~p", [State#state.step_caller]),
|
?log("step_ok -> ~p", [Caller]),
|
||||||
plain_rpc:send_reply(State#state.step_caller, step_ok)
|
plain_rpc:send_reply(Caller, step_ok)
|
||||||
end,
|
end,
|
||||||
State#state{ step_caller=undefined }.
|
State#state{ step_caller=undefined }.
|
||||||
|
|
||||||
|
@ -738,27 +755,28 @@ total_unmerged(State) ->
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
do_lookup(_Key, []) ->
|
do_lookup(_, _Key, []) ->
|
||||||
not_found;
|
not_found;
|
||||||
do_lookup(_Key, [Pid]) when is_pid(Pid) ->
|
do_lookup(Mod, Key, [undefined|Rest]) ->
|
||||||
{delegate, Pid};
|
do_lookup(Mod, Key, Rest);
|
||||||
do_lookup(Key, [undefined|Rest]) ->
|
do_lookup(Mod, Key, [BT|Rest]) ->
|
||||||
do_lookup(Key, Rest);
|
case Mod:lookup(Key, BT) of
|
||||||
do_lookup(Key, [BT|Rest]) ->
|
|
||||||
case hanoidb_reader:lookup(BT, Key) of
|
|
||||||
{ok, ?TOMBSTONE} ->
|
{ok, ?TOMBSTONE} ->
|
||||||
not_found;
|
not_found;
|
||||||
{ok, Result} ->
|
{ok, Result} ->
|
||||||
{found, Result};
|
{found, Result};
|
||||||
not_found ->
|
not_found ->
|
||||||
do_lookup(Key, Rest)
|
do_lookup(Mod,Key, Rest)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
close_if_defined(undefined) -> ok;
|
close_if_defined(_, undefined) -> ok;
|
||||||
close_if_defined(BT) -> hanoidb_reader:close(BT).
|
close_if_defined(Mod, BT) -> Mod:close_random_reader(BT).
|
||||||
|
|
||||||
destroy_if_defined(undefined) -> ok;
|
destroy_if_defined(_, undefined) -> ok;
|
||||||
destroy_if_defined(BT) -> hanoidb_reader:destroy(BT).
|
destroy_if_defined(Mod, BT) ->
|
||||||
|
{ok, Name} = Mod:file_name(BT),
|
||||||
|
Mod:close_random_reader(BT),
|
||||||
|
file:delete(Name).
|
||||||
|
|
||||||
stop_if_defined(undefined) -> ok;
|
stop_if_defined(undefined) -> ok;
|
||||||
stop_if_defined(MergePid) when is_pid(MergePid) ->
|
stop_if_defined(MergePid) when is_pid(MergePid) ->
|
||||||
|
@ -785,10 +803,12 @@ begin_merge(State) ->
|
||||||
try
|
try
|
||||||
?log("merge begun~n", []),
|
?log("merge begun~n", []),
|
||||||
|
|
||||||
{ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName,
|
{ok, OutCount} = hanoidb_backend:merge(
|
||||||
?BTREE_SIZE(State#state.level + 1),
|
State#state.backend,
|
||||||
State#state.next =:= undefined,
|
AFileName, BFileName, XFileName,
|
||||||
State#state.opts ),
|
?BTREE_SIZE(State#state.level + 1),
|
||||||
|
State#state.next =:= undefined,
|
||||||
|
State#state.opts ),
|
||||||
|
|
||||||
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
|
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
|
||||||
catch
|
catch
|
||||||
|
@ -805,9 +825,10 @@ begin_merge(State) ->
|
||||||
close_and_delete_a_and_b(State) ->
|
close_and_delete_a_and_b(State) ->
|
||||||
AFileName = filename("A",State),
|
AFileName = filename("A",State),
|
||||||
BFileName = filename("B",State),
|
BFileName = filename("B",State),
|
||||||
|
Mod = State#state.backend,
|
||||||
|
|
||||||
ok = hanoidb_reader:close(State#state.a),
|
ok = Mod:close_random_reader(State#state.a),
|
||||||
ok = hanoidb_reader:close(State#state.b),
|
ok = Mod:close_random_reader(State#state.b),
|
||||||
|
|
||||||
ok = file:delete(AFileName),
|
ok = file:delete(AFileName),
|
||||||
ok = file:delete(BFileName),
|
ok = file:delete(BFileName),
|
||||||
|
@ -821,14 +842,15 @@ filename(PFX, State) ->
|
||||||
|
|
||||||
start_range_fold(FileName, WorkerPID, Range, State) ->
|
start_range_fold(FileName, WorkerPID, Range, State) ->
|
||||||
Owner = self(),
|
Owner = self(),
|
||||||
|
Mod = State#state.backend,
|
||||||
PID = proc_lib:spawn( fun() ->
|
PID = proc_lib:spawn( fun() ->
|
||||||
try
|
try
|
||||||
?log("start_range_fold ~p on ~p -> ~p", [self(), FileName, WorkerPID]),
|
?log("start_range_fold ~p on ~p -> ~p", [self(), FileName, WorkerPID]),
|
||||||
erlang:link(WorkerPID),
|
erlang:link(WorkerPID),
|
||||||
{ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]),
|
{ok, File} = Mod:open_random_reader(FileName, [folding|State#state.opts]),
|
||||||
do_range_fold2(File, WorkerPID, self(), Range),
|
do_range_fold2(Mod,File, WorkerPID, self(), Range),
|
||||||
erlang:unlink(WorkerPID),
|
erlang:unlink(WorkerPID),
|
||||||
hanoidb_reader:close(File),
|
Mod:close_random_reader(File),
|
||||||
|
|
||||||
%% this will release the pinning of the fold file
|
%% this will release the pinning of the fold file
|
||||||
Owner ! {range_fold_done, self(), FileName},
|
Owner ! {range_fold_done, self(), FileName},
|
||||||
|
@ -840,12 +862,13 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
|
||||||
end ),
|
end ),
|
||||||
{ok, PID}.
|
{ok, PID}.
|
||||||
|
|
||||||
-spec do_range_fold(BT :: hanoidb_reader:read_file(),
|
-spec do_range_fold(Mod :: atom(),
|
||||||
|
BT :: hanoidb_backend:random_reader(),
|
||||||
WorkerPID :: pid(),
|
WorkerPID :: pid(),
|
||||||
SelfOrRef :: pid() | reference(),
|
SelfOrRef :: pid() | reference(),
|
||||||
Range :: #key_range{} ) -> ok.
|
Range :: #key_range{} ) -> ok.
|
||||||
do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
|
do_range_fold(Mod, BT, WorkerPID, SelfOrRef, Range) ->
|
||||||
case hanoidb_reader:range_fold(fun(Key,Value,_) ->
|
case Mod:range_fold(fun(Key,Value,_) ->
|
||||||
WorkerPID ! {level_result, SelfOrRef, Key, Value},
|
WorkerPID ! {level_result, SelfOrRef, Key, Value},
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
@ -863,12 +886,13 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
|
||||||
|
|
||||||
-define(FOLD_CHUNK_SIZE, 100).
|
-define(FOLD_CHUNK_SIZE, 100).
|
||||||
|
|
||||||
-spec do_range_fold2(BT :: hanoidb_reader:read_file(),
|
-spec do_range_fold2(Mod :: atom(),
|
||||||
|
BT :: hanoidb_reader:read_file(),
|
||||||
WorkerPID :: pid(),
|
WorkerPID :: pid(),
|
||||||
SelfOrRef :: pid() | reference(),
|
SelfOrRef :: pid() | reference(),
|
||||||
Range :: #key_range{} ) -> ok.
|
Range :: #key_range{} ) -> ok.
|
||||||
do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
|
do_range_fold2(Mod, BT, WorkerPID, SelfOrRef, Range) ->
|
||||||
try hanoidb_reader:range_fold(fun(Key,Value,{0,KVs}) ->
|
try Mod:range_fold(fun(Key,Value,{0,KVs}) ->
|
||||||
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
|
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
|
||||||
{?FOLD_CHUNK_SIZE-1, []};
|
{?FOLD_CHUNK_SIZE-1, []};
|
||||||
(Key,Value,{N,KVs}) ->
|
(Key,Value,{N,KVs}) ->
|
||||||
|
|
|
@ -33,7 +33,7 @@
|
||||||
-define(ASSERT_WHEN(X), when X).
|
-define(ASSERT_WHEN(X), when X).
|
||||||
|
|
||||||
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]).
|
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]).
|
||||||
-export([first_node/1,next_node/1]).
|
-export([first_node/1,next_node/1,file_name/1]).
|
||||||
-export([serialize/1, deserialize/1]).
|
-export([serialize/1, deserialize/1]).
|
||||||
|
|
||||||
-record(node, {level :: non_neg_integer(),
|
-record(node, {level :: non_neg_integer(),
|
||||||
|
@ -61,6 +61,7 @@ open(Name, Config) ->
|
||||||
ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024),
|
ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024),
|
||||||
case file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]) of
|
case file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]) of
|
||||||
{ok, File} ->
|
{ok, File} ->
|
||||||
|
{ok, Pos} = file:position(File, ?FIRST_BLOCK_POS),
|
||||||
{ok, #index{file=File, name=Name, config=Config}};
|
{ok, #index{file=File, name=Name, config=Config}};
|
||||||
{error, _}=Err ->
|
{error, _}=Err ->
|
||||||
Err
|
Err
|
||||||
|
@ -99,6 +100,9 @@ open(Name, Config) ->
|
||||||
{ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
|
{ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
file_name(#index{name=Name}) ->
|
||||||
|
{ok, Name}.
|
||||||
|
|
||||||
destroy(#index{file=File, name=Name}) ->
|
destroy(#index{file=File, name=Name}) ->
|
||||||
ok = file:close(File),
|
ok = file:close(File),
|
||||||
file:delete(Name).
|
file:delete(Name).
|
||||||
|
@ -130,7 +134,9 @@ fold1(File,Fun,Acc0) ->
|
||||||
eof ->
|
eof ->
|
||||||
Acc0;
|
Acc0;
|
||||||
{ok, Node} ->
|
{ok, Node} ->
|
||||||
fold0(File,Fun,Node,Acc0)
|
fold0(File,Fun,Node,Acc0);
|
||||||
|
{error,_}=Err ->
|
||||||
|
exit(Err)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
|
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
|
||||||
|
@ -193,7 +199,10 @@ do_range_fold(Fun, Acc0, File, Range, undefined) ->
|
||||||
{stopped, Result} -> Result;
|
{stopped, Result} -> Result;
|
||||||
{ok, Acc1} ->
|
{ok, Acc1} ->
|
||||||
do_range_fold(Fun, Acc1, File, Range, undefined)
|
do_range_fold(Fun, Acc1, File, Range, undefined)
|
||||||
end
|
end;
|
||||||
|
|
||||||
|
{error,_}=Err ->
|
||||||
|
Err
|
||||||
end;
|
end;
|
||||||
|
|
||||||
do_range_fold(Fun, Acc0, File, Range, N0) ->
|
do_range_fold(Fun, Acc0, File, Range, N0) ->
|
||||||
|
@ -230,7 +239,10 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
|
||||||
{stopped, Result} -> Result;
|
{stopped, Result} -> Result;
|
||||||
{ok, {N2, Acc1}} ->
|
{ok, {N2, Acc1}} ->
|
||||||
do_range_fold(Fun, Acc1, File, Range, N2)
|
do_range_fold(Fun, Acc1, File, Range, N2)
|
||||||
end
|
end;
|
||||||
|
|
||||||
|
{error,_}=Err ->
|
||||||
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
|
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
|
||||||
|
@ -269,7 +281,10 @@ next_node(#index{file=File}=_Index) ->
|
||||||
% {ok, #node{level=N}} when N>0 ->
|
% {ok, #node{level=N}} when N>0 ->
|
||||||
% next_node(Index);
|
% next_node(Index);
|
||||||
eof ->
|
eof ->
|
||||||
end_of_data
|
end_of_data;
|
||||||
|
|
||||||
|
{error,_}=Err ->
|
||||||
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
close(#index{file=undefined}) ->
|
close(#index{file=undefined}) ->
|
||||||
|
@ -413,6 +428,8 @@ next_leaf_node(File) ->
|
||||||
hanoidb_util:decode_index_node(0, Data);
|
hanoidb_util:decode_index_node(0, Data);
|
||||||
{ok, <<Len:32/unsigned, _:16/unsigned>>} ->
|
{ok, <<Len:32/unsigned, _:16/unsigned>>} ->
|
||||||
{ok, _} = file:position(File, {cur,Len-2}),
|
{ok, _} = file:position(File, {cur,Len-2}),
|
||||||
next_leaf_node(File)
|
next_leaf_node(File);
|
||||||
|
{error,_}=Err ->
|
||||||
|
Err
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue