Compare commits

...

2 commits

Author SHA1 Message Date
Kresten Krab Thorup
2a205db9fe Use backend API in nursery also 2012-10-22 10:51:08 +02:00
Kresten Krab Thorup
3a43d9235b Separate out hanoidb_backend API
First step towards a hanoidb backend API, which
will allow replacing the storage engine with
any ordered storage.

We still need an API for how to choose backend
and perhaps also how to discover the backend
of an existing store.
2012-10-22 02:00:47 +02:00
7 changed files with 374 additions and 84 deletions

View file

@ -27,3 +27,5 @@
-define(REPLY(Ref,Msg), {'$reply', Ref, Msg}).
-define(CAST(From,Msg), {'$cast', From, Msg}).
-type caller() :: { pid(), reference() }.

View file

@ -57,7 +57,9 @@
max_level :: integer(),
config=[] :: [{atom(), term()}],
step=0 :: integer(),
merge_done=0 :: integer()}).
merge_done=0 :: integer(),
backend = hanoidb_han2_backend :: atom()
}).
-type kventry() :: { key(), expvalue() } | [ kventry() ].
-type key() :: binary().

174
src/hanoidb_backend.erl Normal file
View file

@ -0,0 +1,174 @@
%% @doc Behaviour definition for pluggable hanoidb storage backends, together
%% with the backend-independent merge routine (merge/7) that drives a backend
%% through its batch reader and batch writer callbacks.
-module(hanoidb_backend).
-include("include/hanoidb.hrl").
-include("src/hanoidb.hrl").
%% Option list handed to the open_* callbacks.
-type options() :: [ atom() | { atom(), term() } ].
%% One entry as exchanged with batch readers and batch writers.
-type kvexp_entry() :: { Key :: key(), Value :: value(), TimeOut :: expiry() }.
%% Backend-private handles; this module only passes them back to the backend.
-type batch_reader() :: any().
-type batch_writer() :: any().
-type random_reader() :: any().
%% Exported so that other modules can name these types remotely, e.g.
%% hanoidb_backend:random_reader() in record field types and specs.
-export_type([options/0, kvexp_entry/0, batch_reader/0, batch_writer/0, random_reader/0]).
-export([merge/7]).
%%%=========================================================================
%%% API
%%%=========================================================================
%% batch_reader and batch_writer are used by the merging logic (merge/7). A
%% batch_reader must return the values in lexicographical order of the binary
%% keys.
%%
%% Opens File for batch reading. merge/7 calls this once for each of its two
%% input files.
-callback open_batch_reader(File :: string(), Options :: options())
-> {ok, batch_reader()} | { error, term() }.
%% Returns the next non-empty chunk of entries together with the updated
%% reader, or 'done' once the reader is exhausted.
-callback read_next(batch_reader())
-> { [kvexp_entry(), ...], batch_reader()} | 'done'.
%% Closes a batch reader. The merge calls this after read_next/1 returned
%% 'done' and asserts an `ok` result.
-callback close_batch_reader( batch_reader() )
-> ok | {error, term()}.
%% Opens File for batch writing. merge/7 prepends {size, Size} to Options
%% before calling this.
-callback open_batch_writer(File :: string(), Options :: options())
-> {ok, batch_writer()} | {error, term()}.
%% Appends one entry and returns the updated writer.
-callback write_next( kvexp_entry() , batch_writer() )
-> {ok, batch_writer()} | {error, term()}.
%% Returns a count for the writer (presumably the number of entries written
%% so far - confirm against the backend). merge/7 returns this value as its
%% own result.
-callback write_count( batch_writer() ) ->
{ok, non_neg_integer()} | {error, term()}.
%% Closes a batch writer. The merge calls this after write_count/1 and
%% asserts an `ok` result.
-callback close_batch_writer( batch_writer() )
-> ok | {error, term()}.
%% Opens File for key lookups and range folds.
-callback open_random_reader(File :: string(), Options :: options()) ->
{ok, random_reader()} | {error, term()}.
%% Returns the name of the file behind a random reader.
-callback file_name( random_reader() ) ->
{ok, string()} | {error, term()}.
%% Looks up Key in the reader's file.
-callback lookup( Key :: key(), random_reader() ) ->
not_found | {ok, value()}.
%% Folds the given fun over the entries selected by Range, starting from
%% Acc0. The {limit, Acc, LastKey} result presumably reports that the range's
%% limit was hit at LastKey - confirm against the backend implementation.
-callback range_fold( fun( (key(), value(), term()) -> term() ),
Acc0 :: term(),
Reader :: random_reader(),
Range :: #key_range{} ) ->
{limit, term(), LastKey :: binary()} | {ok, term()}.
%% Closes a random reader.
-callback close_random_reader(random_reader()) ->
ok | {error, term()}.
%% @doc Merges the entries of files A and B into the new file C, doing all
%% file access through the backend callback module Mod.
%%
%% Size is handed to the writer as the {size, Size} option. When IsLastLevel
%% is true, tombstone entries are dropped instead of being written. The merge
%% is paced: once both inputs have entries buffered it waits for
%% {step, From, HowMany} messages before doing work. Returns {ok, Count},
%% where Count is what Mod:write_count/1 reports for the output writer.
-spec merge(atom(), string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
merge(Mod, A, B, C, Size, IsLastLevel, Options) ->
    {ok, ReaderA} = Mod:open_batch_reader(A, Options),
    {ok, ReaderB} = Mod:open_batch_reader(B, Options),
    WriterOptions = [{size, Size} | Options],
    {ok, Writer} = Mod:open_batch_writer(C, WriterOptions),
    %% Nothing buffered yet, zero step budget, and nobody to notify.
    InitialStep = {0, none},
    scan(Mod, ReaderA, ReaderB, Writer, IsLastLevel, [], [], InitialStep).
%% Finishes a merge: asks the backend for the writer's count, closes the
%% writer, and returns the {ok, Count} reply. Either call failing to return
%% its success shape crashes with badmatch.
terminate(Mod, Out) ->
    {ok, _Count} = CountReply = Mod:write_count(Out),
    ok = Mod:close_batch_writer(Out),
    CountReply.
%% Charges work against a step state {Budget, Caller}: step/1 charges one
%% unit, step/2 charges Consumed units. The caller part is passed through
%% untouched.
step(StepState) ->
    step(StepState, 1).
step({Budget, Caller}, Consumed) ->
    {Budget - Consumed, Caller}.
%% Merge loop used while both inputs are still open.
%%
%% Mod          - backend callback module
%% IXA, IXB     - batch readers for the two inputs
%% Out          - batch writer for the output
%% IsLastLevel  - when true, tombstone entries are dropped, not written
%% AKVs, BKVs   - entries already read from IXA / IXB but not yet consumed
%% {N, FromPID} - N is the remaining step budget; FromPID is `none` or the
%%                {Pid, Ref} to notify with {Ref, step_done}
%%
%% When one input is exhausted, control passes to scan_only/6 for the other.
%%
%% Budget used up while both sides have buffered entries: notify the previous
%% requester (if any), then block until the next {step, From, HowMany}
%% message grants more budget.
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] ->
    case FromPID of
        none ->
            ok;
        {PID, Ref} ->
            PID ! {Ref, step_done}
    end,
    receive
        {step, From, HowMany} ->
            scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
    end;
%% A-side buffer is empty: refill it, or close A and continue with B alone.
scan(Mod, IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
    case Mod:read_next(IXA) of
        {AKVs, IXA2} ->
            scan(Mod, IXA2, IXB, Out, IsLastLevel, AKVs, BKVs, Step);
        done ->
            ok = Mod:close_batch_reader(IXA),
            scan_only(Mod, IXB, Out, IsLastLevel, BKVs, Step)
    end;
%% B-side buffer is empty: refill it, or close B and continue with A alone.
scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
    case Mod:read_next(IXB) of
        {BKVs, IXB2} ->
            scan(Mod, IXA, IXB2, Out, IsLastLevel, AKVs, BKVs, Step);
        done ->
            ok = Mod:close_batch_reader(IXB),
            scan_only(Mod, IXA, Out, IsLastLevel, AKVs, Step)
    end;
%% Head of A sorts first: emit it (or drop it if it is a tombstone on the
%% last level) and charge one step.
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}=Entry|AT], [{Key2,_,_}|_]=BKVs, Step)
  when Key1 < Key2 ->
    case Entry of
        {_, ?TOMBSTONE, _} when IsLastLevel ->
            scan(Mod, IXA, IXB, Out, true, AT, BKVs, step(Step));
        _ ->
            {ok, Out3} = Mod:write_next( Entry, Out ),
            scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BKVs, step(Step))
    end;
%% Head of B sorts first: same treatment, consuming from B.
scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}|_]=AKVs, [{Key2,_,_}=Entry|BT], Step)
  when Key1 > Key2 ->
    case Entry of
        {_, ?TOMBSTONE, _} when IsLastLevel ->
            scan(Mod, IXA, IXB, Out, true, AKVs, BT, step(Step));
        _ ->
            {ok, Out3} = Mod:write_next( Entry, Out ),
            scan(Mod, IXA, IXB, Out3, IsLastLevel, AKVs, BT, step(Step))
    end;
%% Neither key sorts first, i.e. both heads carry the same key: the entry
%% from B is kept and the one from A is discarded. Both inputs advance by one
%% entry, so two steps are charged on either branch, including when the
%% surviving entry is a tombstone that gets dropped on the last level.
scan(Mod, IXA, IXB, Out, IsLastLevel, [_|AT], [Entry|BT], Step) ->
    case Entry of
        {_, ?TOMBSTONE, _} when IsLastLevel ->
            scan(Mod, IXA, IXB, Out, true, AT, BT, step(Step, 2));
        _ ->
            {ok, Out3} = Mod:write_next( Entry, Out ),
            scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BT, step(Step, 2))
    end.
%% Merge loop used once one input is exhausted: copies the remaining input IX
%% to the writer Out, dropping tombstones when IsLastLevel is true, while
%% honouring the same step budget protocol as scan/8. When IX is exhausted it
%% notifies the current requester, closes IX and finishes via terminate/2.
%%
%% Budget used up with entries still buffered: notify, then wait for the next
%% {step, From, HowMany} message.
scan_only(Mod, IX, Out, IsLastLevel, KVs, {Budget, Caller}) when Budget < 1, KVs =/= [] ->
    notify_step_done(Caller),
    receive
        {step, NextCaller, HowMany} ->
            scan_only(Mod, IX, Out, IsLastLevel, KVs, {Budget + HowMany, NextCaller})
    end;
%% Buffer is empty: refill it from the reader, or wrap up when it is done.
scan_only(Mod, IX, Out, IsLastLevel, [], {_, Caller} = Step) ->
    case Mod:read_next(IX) of
        done ->
            notify_step_done(Caller),
            ok = Mod:close_batch_reader(IX),
            terminate(Mod, Out);
        {KVs, IX2} ->
            scan_only(Mod, IX2, Out, IsLastLevel, KVs, Step)
    end;
%% Last level: a tombstone is skipped, but still costs one step.
scan_only(Mod, IX, Out, true, [{_, ?TOMBSTONE, _} | Remaining], Step) ->
    scan_only(Mod, IX, Out, true, Remaining, step(Step));
%% Any other entry is written through, costing one step.
scan_only(Mod, IX, Out, IsLastLevel, [Entry | Remaining], Step) ->
    {ok, Out2} = Mod:write_next(Entry, Out),
    scan_only(Mod, IX, Out2, IsLastLevel, Remaining, step(Step)).

%% Sends {Ref, step_done} to the requester of the current step, if there is
%% one (`none` means no step request has been received yet).
notify_step_done(Caller) ->
    case Caller of
        none ->
            ok;
        {PID, Ref} ->
            PID ! {Ref, step_done}
    end.

View file

@ -0,0 +1,70 @@
-module(hanoidb_han2_backend).
-include("hanoidb.hrl").
-behavior(hanoidb_backend).
-export([open_random_reader/2, file_name/1, range_fold/4, lookup/2, close_random_reader/1]).
-export([open_batch_reader/2, read_next/1, close_batch_reader/1]).
-export([open_batch_writer/2, write_next/2, write_count/1, close_batch_writer/1]).
%% hanoidb_backend callback: opens Name through hanoidb_reader:open/2 with
%% the `random` option prepended to Options.
open_random_reader(Name, Options) ->
hanoidb_reader:open(Name, [random|Options]).
%% hanoidb_backend callback: delegates to hanoidb_reader:file_name/1 to
%% obtain the name of the file behind Reader.
file_name(Reader) ->
hanoidb_reader:file_name(Reader).
%% hanoidb_backend callback: looks up Key in Reader. Note the argument order:
%% the callback takes (Key, Reader) whereas hanoidb_reader:lookup/2 takes
%% (Reader, Key).
lookup(Key, Reader) ->
hanoidb_reader:lookup(Reader, Key).
%% hanoidb_backend callback: delegates unchanged to
%% hanoidb_reader:range_fold/4.
range_fold(Fun, Acc, Reader, Range) ->
hanoidb_reader:range_fold(Fun, Acc, Reader, Range).
%% hanoidb_backend callback: closes a reader obtained from
%% open_random_reader/2 via hanoidb_reader:close/1.
close_random_reader(Reader) ->
hanoidb_reader:close(Reader).
%% hanoidb_backend callback: opens Name through hanoidb_reader:open/2 with
%% the `sequential` option prepended to Options.
open_batch_reader(Name, Options) ->
hanoidb_reader:open(Name, [sequential|Options]).
%% hanoidb_backend callback: fetches the next node from the reader and
%% returns its entries converted to {Key, Value, Expiry} triples (see
%% unfold/1), paired with the same Reader term. Returns 'done' at end of
%% data.
%%
%% NOTE(review): an {error, _} from hanoidb_reader:next_node/1 is passed
%% through as-is, which is outside the return type declared by the
%% hanoidb_backend read_next callback ({Entries, Reader} | 'done').
read_next(Reader) ->
    case hanoidb_reader:next_node(Reader) of
        end_of_data ->
            'done';
        {error, _} = Failure ->
            Failure;
        {node, KVs} ->
            Entries = [unfold(KV) || KV <- KVs],
            {Entries, Reader}
    end.
%% Converts a stored {Key, Stored} pair into the {Key, Value, Expiry} triple
%% used by the backend API. A Stored term of the form {Value, Expiry}, where
%% Value is a binary or the tombstone marker, carries its own expiry; any
%% other stored term is taken as the value itself with expiry `infinity`.
unfold({Key, {Value, Expiry}}) when Value =:= ?TOMBSTONE; is_binary(Value) ->
    {Key, Value, Expiry};
unfold({Key, Plain}) ->
    {Key, Plain, infinity}.
%% hanoidb_backend callback: closes a reader obtained from
%% open_batch_reader/2 via hanoidb_reader:close/1.
close_batch_reader(Reader) ->
hanoidb_reader:close(Reader).
%% hanoidb_backend callback: calls hanoidb_writer:init/1 directly with
%% [Name, Options] and returns its result. The returned writer term is then
%% threaded through write_next/2, write_count/1 and close_batch_writer/1,
%% which call hanoidb_writer's handle_cast/handle_call functions directly.
%% NOTE(review): callers match the result against {ok, Writer} - confirm
%% that is what hanoidb_writer:init/1 returns.
open_batch_writer(Name, Options) ->
hanoidb_writer:init([Name, Options]).
%% hanoidb_backend callback: adds one {Key, Value, Expiry} entry to the
%% writer and returns {ok, UpdatedWriter}. An entry whose expiry is
%% `infinity` is stored as the bare Value; any other expiry is stored as
%% {Value, Expiry}, the inverse of what unfold/1 does on the read side.
write_next({Key, Value, Expiry}, Writer) ->
    Stored =
        case Expiry of
            infinity -> Value;
            _ -> {Value, Expiry}
        end,
    {noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, Stored}, Writer),
    {ok, Writer2}.
%% hanoidb_backend callback: asks the writer for its count by calling
%% hanoidb_writer:handle_call(count, ...) directly. An {ok, Count, _} reply
%% becomes {ok, Count}; any other reply is wrapped as {error, Reply}.
write_count(Writer) ->
    Reply = hanoidb_writer:handle_call(count, self(), Writer),
    case Reply of
        {ok, Count, _State} ->
            {ok, Count};
        Unexpected ->
            {error, Unexpected}
    end.
%% hanoidb_backend callback: issues `close` to the writer by calling
%% hanoidb_writer:handle_call/3 directly (with self() as the From argument),
%% asserts the {stop, normal, ok, _} reply, discards the final writer state
%% and returns ok.
close_batch_writer(Writer) ->
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Writer),
ok.

View file

@ -51,9 +51,24 @@
-include_lib("kernel/include/file.hrl").
-record(state, {
a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [],
step_next_ref, step_caller, step_merge_ref,
opts = [], owner, work_in_progress=0, work_done=0, max_level=?TOP_LEVEL
a :: undefined | hanoidb_backend:random_reader(),
b :: undefined | hanoidb_backend:random_reader(),
c :: undefined | hanoidb_backend:random_reader(),
next :: pid() | undefined,
dir :: string(),
level :: non_neg_integer(),
inject_done_ref :: reference(),
merge_pid :: pid(),
folding = [] :: list( pid() ),
step_next_ref :: reference() | undefined,
step_caller :: plain_rpc:caller() | undefined,
step_merge_ref :: reference() | undefined,
opts = [] :: list(),
owner :: pid(),
work_in_progress=0 :: non_neg_integer(),
work_done=0 :: non_neg_integer(),
max_level=?TOP_LEVEL :: pos_integer(),
backend = hanoidb_han2_backend :: atom()
}).
@ -177,6 +192,8 @@ initialize2(State) ->
file:delete(filename("BF",State)),
file:delete(filename("CF",State)),
Mod = State#state.backend,
case file:read_file_info(MFileName) of
{ok, _} ->
@ -188,12 +205,12 @@ initialize2(State) ->
file:delete(BFileName),
ok = file:rename(MFileName, AFileName),
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
case file:read_file_info(CFileName) of
{ok, _} ->
file:rename(CFileName, BFileName),
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
check_begin_merge_then_loop0(init_state(State#state{ a= IXA, b=IXB }));
{error, enoent} ->
@ -203,13 +220,13 @@ initialize2(State) ->
{error, enoent} ->
case file:read_file_info(BFileName) of
{ok, _} ->
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]),
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
{ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]),
IXC =
case file:read_file_info(CFileName) of
{ok, _} ->
{ok, C} = hanoidb_reader:open(CFileName, [random|State#state.opts]),
{ok, C} = Mod:open_random_reader(CFileName, [random|State#state.opts]),
C;
{error, enoent} ->
undefined
@ -224,7 +241,7 @@ initialize2(State) ->
case file:read_file_info(AFileName) of
{ok, _} ->
{ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
main_loop(init_state(State#state{ a=IXA }));
{error, enoent} ->
@ -263,28 +280,28 @@ check_begin_merge_then_loop(State=#state{a=IXA, b=IXB, merge_pid=undefined})
check_begin_merge_then_loop(State) ->
main_loop(State).
main_loop(State = #state{ next=Next }) ->
main_loop(State = #state{ next=Next, backend=Mod }) ->
Parent = plain_fsm:info(parent),
receive
?CALL(From, {lookup, Key})=Req ->
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
not_found ->
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
not_found when Next =:= undefined ->
plain_rpc:send_reply(From, not_found);
{found, Result} ->
plain_rpc:send_reply(From, {ok, Result});
{delegate, DelegatePid} ->
DelegatePid ! Req
not_found ->
Next ! Req
end,
main_loop(State);
?CAST(_From, {lookup, Key, ReplyFun})=Req ->
case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of
not_found ->
case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of
not_found when Next =:= undefined ->
ReplyFun(not_found);
{found, Result} ->
ReplyFun({ok, Result});
{delegate, DelegatePid} ->
DelegatePid ! Req
not_found ->
Next ! Req
end,
main_loop(State);
@ -310,7 +327,7 @@ main_loop(State = #state{ next=Next }) ->
plain_rpc:send_reply(From, ok),
case hanoidb_reader:open(ToFileName, [random|State#state.opts]) of
case Mod:open_random_reader(ToFileName, [random|State#state.opts]) of
{ok, BT} ->
if SetPos == #state.b ->
check_begin_merge_then_loop(setelement(SetPos, State, BT));
@ -406,9 +423,9 @@ main_loop(State = #state{ next=Next }) ->
main_loop(State2#state{ step_next_ref=undefined });
?CALL(From, close) ->
close_if_defined(State#state.a),
close_if_defined(State#state.b),
close_if_defined(State#state.c),
close_if_defined(Mod, State#state.a),
close_if_defined(Mod, State#state.b),
close_if_defined(Mod, State#state.c),
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
%% this is synchronous all the way down, because our
@ -422,9 +439,9 @@ main_loop(State = #state{ next=Next }) ->
{ok, closing};
?CALL(From, destroy) ->
destroy_if_defined(State#state.a),
destroy_if_defined(State#state.b),
destroy_if_defined(State#state.c),
destroy_if_defined(Mod,State#state.a),
destroy_if_defined(Mod,State#state.b),
destroy_if_defined(Mod,State#state.c),
[stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]],
%% this is synchronous all the way down, because our
@ -496,27 +513,27 @@ main_loop(State = #state{ next=Next }) ->
{_, undefined, undefined} ->
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
[ARef|List];
{_, _, undefined} ->
BRef = erlang:make_ref(),
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
[ARef,BRef|List];
{_, _, _} ->
CRef = erlang:make_ref(),
ok = do_range_fold(State#state.c, WorkerPID, CRef, Range),
ok = do_range_fold(Mod,State#state.c, WorkerPID, CRef, Range),
BRef = erlang:make_ref(),
ok = do_range_fold(State#state.b, WorkerPID, BRef, Range),
ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range),
ARef = erlang:make_ref(),
ok = do_range_fold(State#state.a, WorkerPID, ARef, Range),
ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range),
[ARef,BRef,CRef|List]
end,
@ -542,9 +559,9 @@ main_loop(State = #state{ next=Next }) ->
undefined ->
main_loop(State2#state{ merge_pid=undefined });
CFile ->
ok = hanoidb_reader:close(CFile),
ok = Mod:close_random_reader(CFile),
ok = file:rename(filename("C", State2), filename("A", State2)),
{ok, AFile} = hanoidb_reader:open(filename("A", State2), [random|State#state.opts]),
{ok, AFile} = Mod:open_random_reader(filename("A", State2), [random|State#state.opts]),
main_loop(State2#state{ a = AFile, c = undefined, merge_pid=undefined })
end;
@ -565,7 +582,7 @@ main_loop(State = #state{ next=Next }) ->
% then, rename M to A, and open it
AFileName = filename("A",State2),
ok = file:rename(MFileName, AFileName),
{ok, AFile} = hanoidb_reader:open(AFileName, [random|State#state.opts]),
{ok, AFile} = Mod:open_random_reader(AFileName, [random|State#state.opts]),
% iff there is a C file, then move it to B position
% TODO: consider recovery for this
@ -573,9 +590,9 @@ main_loop(State = #state{ next=Next }) ->
undefined ->
main_loop(State2#state{ a=AFile, b=undefined, merge_pid=undefined });
CFile ->
ok = hanoidb_reader:close(CFile),
ok = Mod:close_random_reader(CFile),
ok = file:rename(filename("C", State2), filename("B", State2)),
{ok, BFile} = hanoidb_reader:open(filename("B", State2), [random|State#state.opts]),
{ok, BFile} = Mod:open_random_reader(filename("B", State2), [random|State#state.opts]),
check_begin_merge_then_loop(State2#state{ a=AFile, b=BFile, c=undefined,
merge_pid=undefined })
end;
@ -722,9 +739,9 @@ reply_step_ok(State) ->
case State#state.step_caller of
undefined ->
ok;
_ ->
?log("step_ok -> ~p", [State#state.step_caller]),
plain_rpc:send_reply(State#state.step_caller, step_ok)
Caller ->
?log("step_ok -> ~p", [Caller]),
plain_rpc:send_reply(Caller, step_ok)
end,
State#state{ step_caller=undefined }.
@ -738,27 +755,28 @@ total_unmerged(State) ->
do_lookup(_Key, []) ->
do_lookup(_, _Key, []) ->
not_found;
do_lookup(_Key, [Pid]) when is_pid(Pid) ->
{delegate, Pid};
do_lookup(Key, [undefined|Rest]) ->
do_lookup(Key, Rest);
do_lookup(Key, [BT|Rest]) ->
case hanoidb_reader:lookup(BT, Key) of
do_lookup(Mod, Key, [undefined|Rest]) ->
do_lookup(Mod, Key, Rest);
do_lookup(Mod, Key, [BT|Rest]) ->
case Mod:lookup(Key, BT) of
{ok, ?TOMBSTONE} ->
not_found;
{ok, Result} ->
{found, Result};
not_found ->
do_lookup(Key, Rest)
do_lookup(Mod,Key, Rest)
end.
close_if_defined(undefined) -> ok;
close_if_defined(BT) -> hanoidb_reader:close(BT).
%% Closes the random reader BT through the backend module Mod; an `undefined`
%% reader is a no-op that returns ok.
close_if_defined(_, undefined) -> ok;
close_if_defined(Mod, BT) -> Mod:close_random_reader(BT).
destroy_if_defined(undefined) -> ok;
destroy_if_defined(BT) -> hanoidb_reader:destroy(BT).
%% Destroys the file behind a random reader: asks the backend module Mod for
%% the file name, closes the reader (its result is ignored), then deletes the
%% file and returns the result of file:delete/1. An `undefined` reader is a
%% no-op that returns ok.
destroy_if_defined(_, undefined) ->
    ok;
destroy_if_defined(Mod, Reader) ->
    {ok, Path} = Mod:file_name(Reader),
    Mod:close_random_reader(Reader),
    file:delete(Path).
stop_if_defined(undefined) -> ok;
stop_if_defined(MergePid) when is_pid(MergePid) ->
@ -785,10 +803,12 @@ begin_merge(State) ->
try
?log("merge begun~n", []),
{ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts ),
{ok, OutCount} = hanoidb_backend:merge(
State#state.backend,
AFileName, BFileName, XFileName,
?BTREE_SIZE(State#state.level + 1),
State#state.next =:= undefined,
State#state.opts ),
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
catch
@ -805,9 +825,10 @@ begin_merge(State) ->
close_and_delete_a_and_b(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
Mod = State#state.backend,
ok = hanoidb_reader:close(State#state.a),
ok = hanoidb_reader:close(State#state.b),
ok = Mod:close_random_reader(State#state.a),
ok = Mod:close_random_reader(State#state.b),
ok = file:delete(AFileName),
ok = file:delete(BFileName),
@ -821,14 +842,15 @@ filename(PFX, State) ->
start_range_fold(FileName, WorkerPID, Range, State) ->
Owner = self(),
Mod = State#state.backend,
PID = proc_lib:spawn( fun() ->
try
?log("start_range_fold ~p on ~p -> ~p", [self(), FileName, WorkerPID]),
erlang:link(WorkerPID),
{ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]),
do_range_fold2(File, WorkerPID, self(), Range),
{ok, File} = Mod:open_random_reader(FileName, [folding|State#state.opts]),
do_range_fold2(Mod,File, WorkerPID, self(), Range),
erlang:unlink(WorkerPID),
hanoidb_reader:close(File),
Mod:close_random_reader(File),
%% this will release the pinning of the fold file
Owner ! {range_fold_done, self(), FileName},
@ -840,12 +862,13 @@ start_range_fold(FileName, WorkerPID, Range, State) ->
end ),
{ok, PID}.
-spec do_range_fold(BT :: hanoidb_reader:read_file(),
-spec do_range_fold(Mod :: atom(),
BT :: hanoidb_backend:random_reader(),
WorkerPID :: pid(),
SelfOrRef :: pid() | reference(),
Range :: #key_range{} ) -> ok.
do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
case hanoidb_reader:range_fold(fun(Key,Value,_) ->
do_range_fold(Mod, BT, WorkerPID, SelfOrRef, Range) ->
case Mod:range_fold(fun(Key,Value,_) ->
WorkerPID ! {level_result, SelfOrRef, Key, Value},
ok
end,
@ -863,12 +886,13 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
-define(FOLD_CHUNK_SIZE, 100).
-spec do_range_fold2(BT :: hanoidb_reader:read_file(),
-spec do_range_fold2(Mod :: atom(),
BT :: hanoidb_reader:read_file(),
WorkerPID :: pid(),
SelfOrRef :: pid() | reference(),
Range :: #key_range{} ) -> ok.
do_range_fold2(BT, WorkerPID, SelfOrRef, Range) ->
try hanoidb_reader:range_fold(fun(Key,Value,{0,KVs}) ->
do_range_fold2(Mod, BT, WorkerPID, SelfOrRef, Range) ->
try Mod:range_fold(fun(Key,Value,{0,KVs}) ->
send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]),
{?FOLD_CHUNK_SIZE-1, []};
(Key,Value,{N,KVs}) ->

View file

@ -99,7 +99,7 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, non_neg_integer() | infinity, pid()) -> {ok, #nursery{}} | {full, #nursery{}}.
do_add(Nursery, Key, Value, infinity, Top) ->
do_add(Nursery, Key, Value, 0, Top);
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) ->
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) when is_integer(KeyExpiryTime) ->
DatabaseExpiryTime = hanoidb:get_opt(expiry_secs, Config),
{Data, Cache2} =
@ -175,7 +175,7 @@ lookup(Key, #nursery{cache=Cache}) ->
%% @end
-spec finish(Nursery::#nursery{}, TopLevel::pid()) -> ok.
finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
count=Count, config=Config }, TopLevel) ->
count=Count, config=Config, backend=Backend }, TopLevel) ->
hanoidb_util:ensure_expiry(Config),
@ -189,16 +189,17 @@ finish(#nursery{ dir=Dir, cache=Cache, log_file=LogFile, merge_done=DoneMerge,
N when N > 0 ->
%% next, flush cache to a new BTree
BTreeFileName = filename:join(Dir, "nursery.data"),
{ok, BT} = hanoidb_writer:open(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
{ok, BT} = Backend:open_batch_writer(BTreeFileName, [{size, ?BTREE_SIZE(?TOP_LEVEL)},
{compress, none} | Config]),
try
ok = gb_trees_ext:fold(fun(Key, Value, Acc) ->
ok = hanoidb_writer:add(BT, Key, Value),
Acc
end, ok, Cache)
after
ok = hanoidb_writer:close(BT)
end,
BT3 = gb_trees_ext:fold(fun(Key, {Value,Expiry}, BT1) ->
{ok, BT2} = Backend:write_next({Key,Value,Expiry}, BT1),
BT2;
(Key, Value, BT1) ->
{ok, BT2} = Backend:write_next({Key,Value,infinity}, BT1),
BT2
end, BT, Cache),
ok = Backend:close_batch_writer(BT3),
%% Inject the B-Tree (blocking RPC)
ok = hanoidb_level:inject(TopLevel, BTreeFileName),

View file

@ -33,7 +33,7 @@
-define(ASSERT_WHEN(X), when X).
-export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]).
-export([first_node/1,next_node/1]).
-export([first_node/1,next_node/1,file_name/1]).
-export([serialize/1, deserialize/1]).
-record(node, {level :: non_neg_integer(),
@ -61,6 +61,7 @@ open(Name, Config) ->
ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024),
case file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]) of
{ok, File} ->
{ok, Pos} = file:position(File, ?FIRST_BLOCK_POS),
{ok, #index{file=File, name=Name, config=Config}};
{error, _}=Err ->
Err
@ -99,6 +100,9 @@ open(Name, Config) ->
{ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}}
end.
%% Returns {ok, Name}, where Name is the file name stored in the #index{}
%% reader handle.
file_name(#index{name=Name}) ->
{ok, Name}.
destroy(#index{file=File, name=Name}) ->
ok = file:close(File),
file:delete(Name).
@ -130,7 +134,9 @@ fold1(File,Fun,Acc0) ->
eof ->
Acc0;
{ok, Node} ->
fold0(File,Fun,Node,Acc0)
fold0(File,Fun,Node,Acc0);
{error,_}=Err ->
exit(Err)
end.
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
@ -193,7 +199,10 @@ do_range_fold(Fun, Acc0, File, Range, undefined) ->
{stopped, Result} -> Result;
{ok, Acc1} ->
do_range_fold(Fun, Acc1, File, Range, undefined)
end
end;
{error,_}=Err ->
Err
end;
do_range_fold(Fun, Acc0, File, Range, N0) ->
@ -230,7 +239,10 @@ do_range_fold(Fun, Acc0, File, Range, N0) ->
{stopped, Result} -> Result;
{ok, {N2, Acc1}} ->
do_range_fold(Fun, Acc1, File, Range, N2)
end
end;
{error,_}=Err ->
Err
end.
lookup_node(_File,_FromKey,#node{level=0},Pos) ->
@ -269,7 +281,10 @@ next_node(#index{file=File}=_Index) ->
% {ok, #node{level=N}} when N>0 ->
% next_node(Index);
eof ->
end_of_data
end_of_data;
{error,_}=Err ->
Err
end.
close(#index{file=undefined}) ->
@ -413,6 +428,8 @@ next_leaf_node(File) ->
hanoidb_util:decode_index_node(0, Data);
{ok, <<Len:32/unsigned, _:16/unsigned>>} ->
{ok, _} = file:position(File, {cur,Len-2}),
next_leaf_node(File)
next_leaf_node(File);
{error,_}=Err ->
Err
end.