From 3a43d9235bf18d06d9672d18b6e0c793e939fe41 Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Mon, 22 Oct 2012 02:00:47 +0200 Subject: [PATCH] Separate out hanoid_backend API First step towards a hanoidb backend API, which will allow replacing the storage engine with any ordered storage. We still need an API for how to choose backend and perhaps also how to discover the backend of an existing store. --- include/plain_rpc.hrl | 2 + src/hanoidb_backend.erl | 174 +++++++++++++++++++++++++++++++++++ src/hanoidb_han2_backend.erl | 67 ++++++++++++++ src/hanoidb_level.erl | 156 ++++++++++++++++++------------- src/hanoidb_reader.erl | 29 ++++-- 5 files changed, 356 insertions(+), 72 deletions(-) create mode 100644 src/hanoidb_backend.erl create mode 100644 src/hanoidb_han2_backend.erl diff --git a/include/plain_rpc.hrl b/include/plain_rpc.hrl index 09e68dd..3fc5dc0 100644 --- a/include/plain_rpc.hrl +++ b/include/plain_rpc.hrl @@ -27,3 +27,5 @@ -define(REPLY(Ref,Msg), {'$reply', Ref, Msg}). -define(CAST(From,Msg), {'$cast', From, Msg}). +-type caller() :: { pid(), reference() }. + diff --git a/src/hanoidb_backend.erl b/src/hanoidb_backend.erl new file mode 100644 index 0000000..37af86b --- /dev/null +++ b/src/hanoidb_backend.erl @@ -0,0 +1,174 @@ + +-module(hanoidb_backend). + +-include("include/hanoidb.hrl"). +-include("src/hanoidb.hrl"). + +-type options() :: [ atom() | { atom(), term() } ]. +-type kvexp_entry() :: { Key :: key(), Value :: value(), TimeOut :: expiry() }. +-type batch_reader() :: any(). +-type batch_writer() :: any(). +-type random_reader() :: any(). + +-export([merge/7]). + +%%%========================================================================= +%%% API +%%%========================================================================= + +%% batch_reader and batch_writer are used by the merging logic. A batch_reader +%% must return the values in lexicographical order of the binary keys. + +-callback open_batch_reader(File :: string(), Options :: options()) + -> {ok, batch_reader()} | { error, term() }. +-callback read_next(batch_reader()) + -> { [kvexp_entry(), ...], batch_reader()} | 'done'. +-callback close_batch_reader( batch_reader() ) + -> ok | {error, term()}. + + +-callback open_batch_writer(File :: string(), Options :: options()) + -> {ok, batch_writer()} | {error, term()}. +-callback write_next( kvexp_entry() , batch_writer() ) + -> {ok, batch_writer()} | {error, term()}. +-callback write_count( batch_writer() ) -> + {ok, non_neg_integer()} | {error, term()}. +-callback close_batch_writer( batch_writer() ) + -> ok | {error, term()}. + + +-callback open_random_reader(File :: string(), Options :: options()) -> + {ok, random_reader()} | {error, term()}. +-callback file_name( random_reader() ) -> + {ok, string()} | {error, term()}. +-callback lookup( Key :: key(), random_reader() ) -> + not_found | {ok, value()}. +-callback range_fold( fun( (key(), value(), term()) -> term() ), + Acc0 :: term(), + Reader :: random_reader(), + Range :: #key_range{} ) -> + {limit, term(), LastKey :: binary()} | {ok, term()}. +-callback close_random_reader(random_reader()) -> + ok | {error, term()}. + + + + +-spec merge(atom(), string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}. +merge(Mod,A,B,C, Size, IsLastLevel, Options) -> + {ok, IXA} = Mod:open_batch_reader(A, Options), + {ok, IXB} = Mod:open_batch_reader(B, Options), + {ok, Out} = Mod:open_batch_writer(C, [{size, Size} | Options]), + scan(Mod,IXA, IXB, Out, IsLastLevel, [], [], {0, none}). + +terminate(Mod, Out) -> + {ok, Count} = Mod:write_count( Out ), + ok = Mod:close_batch_writer( Out ), + {ok, Count}. + +step(S) -> + step(S, 1). + +step({N, From}, Steps) -> + {N-Steps, From}. + + +scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= [], BKVs =/= [] -> + case FromPID of + none -> + ok; + {PID, Ref} -> + PID ! {Ref, step_done} + end, + + receive + {step, From, HowMany} -> + scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From}) + end; + +scan(Mod, IXA, IXB, Out, IsLastLevel, [], BKVs, Step) -> + case Mod:read_next(IXA) of + {AKVs, IXA2} -> + scan(Mod, IXA2, IXB, Out, IsLastLevel, AKVs, BKVs, Step); + done -> + ok = Mod:close_batch_reader(IXA), + scan_only(Mod, IXB, Out, IsLastLevel, BKVs, Step) + end; + +scan(Mod, IXA, IXB, Out, IsLastLevel, AKVs, [], Step) -> + case Mod:read_next(IXB) of + {BKVs, IXB2} -> + scan(Mod, IXA, IXB2, Out, IsLastLevel, AKVs, BKVs, Step); + done -> + ok = Mod:close_batch_reader(IXB), + scan_only(Mod, IXA, Out, IsLastLevel, AKVs, Step) + end; + +scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}=Entry|AT], [{Key2,_,_}|_]=BKVs, Step) + when Key1 < Key2 -> + case Entry of + {_, ?TOMBSTONE, _} when IsLastLevel -> + scan(Mod, IXA, IXB, Out, true, AT, BKVs, step(Step)); + _ -> + {ok, Out3} = Mod:write_next( Entry, Out ), + scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BKVs, step(Step)) + end; +scan(Mod, IXA, IXB, Out, IsLastLevel, [{Key1,_,_}|_]=AKVs, [{Key2,_,_}=Entry|BT], Step) + when Key1 > Key2 -> + case Entry of + {_, ?TOMBSTONE, _} when IsLastLevel -> + scan(Mod, IXA, IXB, Out, true, AKVs, BT, step(Step)); + _ -> + {ok, Out3} = Mod:write_next( Entry, Out ), + scan(Mod, IXA, IXB, Out3, IsLastLevel, AKVs, BT, step(Step)) + end; +scan(Mod, IXA, IXB, Out, IsLastLevel, [_|AT], [Entry|BT], Step) -> + case Entry of + {_, ?TOMBSTONE, _} when IsLastLevel -> + scan(Mod, IXA, IXB, Out, true, AT, BT, step(Step)); + _ -> + {ok, Out3} = Mod:write_next( Entry, Out ), + scan(Mod, IXA, IXB, Out3, IsLastLevel, AT, BT, step(Step, 2)) + end. + + +scan_only(Mod, IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] -> + case FromPID of + none -> + ok; + {PID, Ref} -> + PID ! {Ref, step_done} + end, + + receive + {step, From, HowMany} -> + scan_only(Mod, IX, Out, IsLastLevel, KVs, {N+HowMany, From}) + end; + +scan_only(Mod, IX, Out, IsLastLevel, [], {_, FromPID}=Step) -> + case Mod:read_next(IX) of + {KVs, IX2} -> + scan_only(Mod, IX2, Out, IsLastLevel, KVs, Step); + done -> + case FromPID of + none -> + ok; + {PID, Ref} -> + PID ! {Ref, step_done} + end, + ok = Mod:close_batch_reader(IX), + terminate(Mod, Out) + end; + +scan_only(Mod, IX, Out, true, [{_,?TOMBSTONE,_}|Rest], Step) -> + scan_only(Mod, IX, Out, true, Rest, step(Step)); + +scan_only(Mod, IX, Out, IsLastLevel, [Entry|Rest], Step) -> + {ok, Out3} = Mod:write_next( Entry, Out ), + scan_only(Mod, IX, Out3, IsLastLevel, Rest, step(Step)). + + + + + + diff --git a/src/hanoidb_han2_backend.erl b/src/hanoidb_han2_backend.erl new file mode 100644 index 0000000..1cd820b --- /dev/null +++ b/src/hanoidb_han2_backend.erl @@ -0,0 +1,67 @@ +-module(hanoidb_han2_backend). + +-include("hanoidb.hrl"). + +-behavior(hanoidb_backend). + +-export([open_random_reader/2, file_name/1, range_fold/4, lookup/2, close_random_reader/1]). +-export([open_batch_reader/2, read_next/1, close_batch_reader/1]). +-export([open_batch_writer/2, write_next/2, write_count/1, close_batch_writer/1]). + + +open_random_reader(Name, Options) -> + hanoidb_reader:open(Name, [random|Options]). + +file_name(Reader) -> + hanoidb_reader:file_name(Reader). + +lookup(Key, Reader) -> + hanoidb_reader:lookup(Reader, Key). + +range_fold(Fun, Acc, Reader, Range) -> + hanoidb_reader:range_fold(Fun, Acc, Reader, Range). + +close_random_reader(Reader) -> + hanoidb_reader:close(Reader). + + + +open_batch_reader(Name, Options) -> + hanoidb_reader:open(Name, [sequential|Options]). + +read_next(Reader) -> + case hanoidb_reader:next_node(Reader) of + {node, KVs} -> + {[ unfold(KV) || KV <- KVs], Reader}; + end_of_data -> + 'done'; + {error, _}=Err -> + Err + end. + +unfold({Key,{Value, Expiry}}) when is_binary(Value); ?TOMBSTONE =:= Value -> + {Key,Value,Expiry}; +unfold({Key,Value}) -> + {Key, Value, infinity}. + +close_batch_reader(Reader) -> + hanoidb_reader:close(Reader). + +open_batch_writer(Name, Options) -> + hanoidb_writer:init([Name, Options]). + +write_next( {Key, Value, Expiry}, Writer) -> + {noreply, Writer2} = hanoidb_writer:handle_cast({add, Key, {Value, Expiry}}, Writer), + {ok, Writer2}. + +write_count( Writer ) -> + case hanoidb_writer:handle_call(count, self(), Writer) of + {ok, Count, _} -> + {ok, Count}; + Err -> + {error, Err} + end. + +close_batch_writer(Writer) -> + {stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Writer), + ok. diff --git a/src/hanoidb_level.erl b/src/hanoidb_level.erl index fa633fc..8ea3ef0 100644 --- a/src/hanoidb_level.erl +++ b/src/hanoidb_level.erl @@ -51,9 +51,24 @@ -include_lib("kernel/include/file.hrl"). -record(state, { - a, b, c, next, dir, level, inject_done_ref, merge_pid, folding = [], - step_next_ref, step_caller, step_merge_ref, - opts = [], owner, work_in_progress=0, work_done=0, max_level=?TOP_LEVEL + a :: undefined | hanoidb_backend:random_reader(), + b :: undefined | hanoidb_backend:random_reader(), + c :: undefined | hanoidb_backend:random_reader(), + next :: pid() | undefined, + dir :: string(), + level :: non_neg_integer(), + inject_done_ref :: reference(), + merge_pid :: pid(), + folding = [] :: list( pid() ), + step_next_ref :: reference() | undefined, + step_caller :: plain_rpc:caller() | undefined, + step_merge_ref :: reference() | undefined, + opts = [] :: list(), + owner :: pid(), + work_in_progress=0 :: non_neg_integer(), + work_done=0 :: non_neg_integer(), + max_level=?TOP_LEVEL :: pos_integer(), + backend = hanoidb_han2_backend :: atom() }). @@ -177,6 +192,8 @@ initialize2(State) -> file:delete(filename("BF",State)), file:delete(filename("CF",State)), + Mod = State#state.backend, + case file:read_file_info(MFileName) of {ok, _} -> @@ -188,12 +205,12 @@ initialize2(State) -> file:delete(BFileName), ok = file:rename(MFileName, AFileName), - {ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]), + {ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]), case file:read_file_info(CFileName) of {ok, _} -> file:rename(CFileName, BFileName), - {ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]), + {ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]), check_begin_merge_then_loop0(init_state(State#state{ a= IXA, b=IXB })); {error, enoent} -> @@ -203,13 +220,13 @@ initialize2(State) -> {error, enoent} -> case file:read_file_info(BFileName) of {ok, _} -> - {ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]), - {ok, IXB} = hanoidb_reader:open(BFileName, [random|State#state.opts]), + {ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]), + {ok, IXB} = Mod:open_random_reader(BFileName, [random|State#state.opts]), IXC = case file:read_file_info(CFileName) of {ok, _} -> - {ok, C} = hanoidb_reader:open(CFileName, [random|State#state.opts]), + {ok, C} = Mod:open_random_reader(CFileName, [random|State#state.opts]), C; {error, enoent} -> undefined @@ -224,7 +241,7 @@ initialize2(State) -> case file:read_file_info(AFileName) of {ok, _} -> - {ok, IXA} = hanoidb_reader:open(AFileName, [random|State#state.opts]), + {ok, IXA} = Mod:open_random_reader(AFileName, [random|State#state.opts]), main_loop(init_state(State#state{ a=IXA })); {error, enoent} -> @@ -263,28 +280,28 @@ check_begin_merge_then_loop(State=#state{a=IXA, b=IXB, merge_pid=undefined}) check_begin_merge_then_loop(State) -> main_loop(State). -main_loop(State = #state{ next=Next }) -> +main_loop(State = #state{ next=Next, backend=Mod }) -> Parent = plain_fsm:info(parent), receive ?CALL(From, {lookup, Key})=Req -> - case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of - not_found -> + case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of + not_found when Next =:= undefined -> plain_rpc:send_reply(From, not_found); {found, Result} -> plain_rpc:send_reply(From, {ok, Result}); - {delegate, DelegatePid} -> - DelegatePid ! Req + not_found -> + Next ! Req end, main_loop(State); ?CAST(_From, {lookup, Key, ReplyFun})=Req -> - case do_lookup(Key, [State#state.c, State#state.b, State#state.a, Next]) of - not_found -> + case do_lookup(Mod, Key, [State#state.c, State#state.b, State#state.a]) of + not_found when Next =:= undefined -> ReplyFun(not_found); {found, Result} -> ReplyFun({ok, Result}); - {delegate, DelegatePid} -> - DelegatePid ! Req + not_found -> + Next ! Req end, main_loop(State); @@ -310,7 +327,7 @@ main_loop(State = #state{ next=Next }) -> plain_rpc:send_reply(From, ok), - case hanoidb_reader:open(ToFileName, [random|State#state.opts]) of + case Mod:open_random_reader(ToFileName, [random|State#state.opts]) of {ok, BT} -> if SetPos == #state.b -> check_begin_merge_then_loop(setelement(SetPos, State, BT)); @@ -406,9 +423,9 @@ main_loop(State = #state{ next=Next }) -> main_loop(State2#state{ step_next_ref=undefined }); ?CALL(From, close) -> - close_if_defined(State#state.a), - close_if_defined(State#state.b), - close_if_defined(State#state.c), + close_if_defined(Mod, State#state.a), + close_if_defined(Mod, State#state.b), + close_if_defined(Mod, State#state.c), [stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]], %% this is synchronous all the way down, because our @@ -422,9 +439,9 @@ main_loop(State = #state{ next=Next }) -> {ok, closing}; ?CALL(From, destroy) -> - destroy_if_defined(State#state.a), - destroy_if_defined(State#state.b), - destroy_if_defined(State#state.c), + destroy_if_defined(Mod,State#state.a), + destroy_if_defined(Mod,State#state.b), + destroy_if_defined(Mod,State#state.c), [stop_if_defined(PID) || PID <- [State#state.merge_pid | State#state.folding]], %% this is synchronous all the way down, because our @@ -496,27 +513,27 @@ main_loop(State = #state{ next=Next }) -> {_, undefined, undefined} -> ARef = erlang:make_ref(), - ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), + ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range), [ARef|List]; {_, _, undefined} -> BRef = erlang:make_ref(), - ok = do_range_fold(State#state.b, WorkerPID, BRef, Range), + ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range), ARef = erlang:make_ref(), - ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), + ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range), [ARef,BRef|List]; {_, _, _} -> CRef = erlang:make_ref(), - ok = do_range_fold(State#state.c, WorkerPID, CRef, Range), + ok = do_range_fold(Mod,State#state.c, WorkerPID, CRef, Range), BRef = erlang:make_ref(), - ok = do_range_fold(State#state.b, WorkerPID, BRef, Range), + ok = do_range_fold(Mod,State#state.b, WorkerPID, BRef, Range), ARef = erlang:make_ref(), - ok = do_range_fold(State#state.a, WorkerPID, ARef, Range), + ok = do_range_fold(Mod,State#state.a, WorkerPID, ARef, Range), [ARef,BRef,CRef|List] end, @@ -542,9 +559,9 @@ main_loop(State = #state{ next=Next }) -> undefined -> main_loop(State2#state{ merge_pid=undefined }); CFile -> - ok = hanoidb_reader:close(CFile), + ok = Mod:close_random_reader(CFile), ok = file:rename(filename("C", State2), filename("A", State2)), - {ok, AFile} = hanoidb_reader:open(filename("A", State2), [random|State#state.opts]), + {ok, AFile} = Mod:open_random_reader(filename("A", State2), [random|State#state.opts]), main_loop(State2#state{ a = AFile, c = undefined, merge_pid=undefined }) end; @@ -565,7 +582,7 @@ main_loop(State = #state{ next=Next }) -> % then, rename M to A, and open it AFileName = filename("A",State2), ok = file:rename(MFileName, AFileName), - {ok, AFile} = hanoidb_reader:open(AFileName, [random|State#state.opts]), + {ok, AFile} = Mod:open_random_reader(AFileName, [random|State#state.opts]), % iff there is a C file, then move it to B position % TODO: consider recovery for this @@ -573,9 +590,9 @@ main_loop(State = #state{ next=Next }) -> undefined -> main_loop(State2#state{ a=AFile, b=undefined, merge_pid=undefined }); CFile -> - ok = hanoidb_reader:close(CFile), + ok = Mod:close_random_reader(CFile), ok = file:rename(filename("C", State2), filename("B", State2)), - {ok, BFile} = hanoidb_reader:open(filename("B", State2), [random|State#state.opts]), + {ok, BFile} = Mod:open_random_reader(filename("B", State2), [random|State#state.opts]), check_begin_merge_then_loop(State2#state{ a=AFile, b=BFile, c=undefined, merge_pid=undefined }) end; @@ -722,9 +739,9 @@ reply_step_ok(State) -> case State#state.step_caller of undefined -> ok; - _ -> - ?log("step_ok -> ~p", [State#state.step_caller]), - plain_rpc:send_reply(State#state.step_caller, step_ok) + Caller -> + ?log("step_ok -> ~p", [Caller]), + plain_rpc:send_reply(Caller, step_ok) end, State#state{ step_caller=undefined }. @@ -738,27 +755,28 @@ total_unmerged(State) -> -do_lookup(_Key, []) -> +do_lookup(_, _Key, []) -> not_found; -do_lookup(_Key, [Pid]) when is_pid(Pid) -> - {delegate, Pid}; -do_lookup(Key, [undefined|Rest]) -> - do_lookup(Key, Rest); -do_lookup(Key, [BT|Rest]) -> - case hanoidb_reader:lookup(BT, Key) of +do_lookup(Mod, Key, [undefined|Rest]) -> + do_lookup(Mod, Key, Rest); +do_lookup(Mod, Key, [BT|Rest]) -> + case Mod:lookup(Key, BT) of {ok, ?TOMBSTONE} -> not_found; {ok, Result} -> {found, Result}; not_found -> - do_lookup(Key, Rest) + do_lookup(Mod,Key, Rest) end. -close_if_defined(undefined) -> ok; -close_if_defined(BT) -> hanoidb_reader:close(BT). +close_if_defined(_, undefined) -> ok; +close_if_defined(Mod, BT) -> Mod:close_random_reader(BT). -destroy_if_defined(undefined) -> ok; -destroy_if_defined(BT) -> hanoidb_reader:destroy(BT). +destroy_if_defined(_, undefined) -> ok; +destroy_if_defined(Mod, BT) -> + {ok, Name} = Mod:file_name(BT), + Mod:close_random_reader(BT), + file:delete(Name). stop_if_defined(undefined) -> ok; stop_if_defined(MergePid) when is_pid(MergePid) -> @@ -785,10 +803,12 @@ begin_merge(State) -> try ?log("merge begun~n", []), - {ok, OutCount} = hanoidb_merger:merge(AFileName, BFileName, XFileName, - ?BTREE_SIZE(State#state.level + 1), - State#state.next =:= undefined, - State#state.opts ), + {ok, OutCount} = hanoidb_backend:merge( + State#state.backend, + AFileName, BFileName, XFileName, + ?BTREE_SIZE(State#state.level + 1), + State#state.next =:= undefined, + State#state.opts ), Owner ! ?CAST(self(),{merge_done, OutCount, XFileName}) catch @@ -805,9 +825,10 @@ begin_merge(State) -> close_and_delete_a_and_b(State) -> AFileName = filename("A",State), BFileName = filename("B",State), + Mod = State#state.backend, - ok = hanoidb_reader:close(State#state.a), - ok = hanoidb_reader:close(State#state.b), + ok = Mod:close_random_reader(State#state.a), + ok = Mod:close_random_reader(State#state.b), ok = file:delete(AFileName), ok = file:delete(BFileName), @@ -821,14 +842,15 @@ filename(PFX, State) -> start_range_fold(FileName, WorkerPID, Range, State) -> Owner = self(), + Mod = State#state.backend, PID = proc_lib:spawn( fun() -> try ?log("start_range_fold ~p on ~p -> ~p", [self(), FileName, WorkerPID]), erlang:link(WorkerPID), - {ok, File} = hanoidb_reader:open(FileName, [folding|State#state.opts]), - do_range_fold2(File, WorkerPID, self(), Range), + {ok, File} = Mod:open_random_reader(FileName, [folding|State#state.opts]), + do_range_fold2(Mod,File, WorkerPID, self(), Range), erlang:unlink(WorkerPID), - hanoidb_reader:close(File), + Mod:close_random_reader(File), %% this will release the pinning of the fold file Owner ! {range_fold_done, self(), FileName}, @@ -840,12 +862,13 @@ start_range_fold(FileName, WorkerPID, Range, State) -> end ), {ok, PID}. --spec do_range_fold(BT :: hanoidb_reader:read_file(), +-spec do_range_fold(Mod :: atom(), + BT :: hanoidb_backend:random_reader(), WorkerPID :: pid(), SelfOrRef :: pid() | reference(), Range :: #key_range{} ) -> ok. -do_range_fold(BT, WorkerPID, SelfOrRef, Range) -> - case hanoidb_reader:range_fold(fun(Key,Value,_) -> +do_range_fold(Mod, BT, WorkerPID, SelfOrRef, Range) -> + case Mod:range_fold(fun(Key,Value,_) -> WorkerPID ! {level_result, SelfOrRef, Key, Value}, ok end, @@ -863,12 +886,13 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) -> -define(FOLD_CHUNK_SIZE, 100). --spec do_range_fold2(BT :: hanoidb_reader:read_file(), +-spec do_range_fold2(Mod :: atom(), + BT :: hanoidb_reader:read_file(), WorkerPID :: pid(), SelfOrRef :: pid() | reference(), Range :: #key_range{} ) -> ok. -do_range_fold2(BT, WorkerPID, SelfOrRef, Range) -> - try hanoidb_reader:range_fold(fun(Key,Value,{0,KVs}) -> +do_range_fold2(Mod, BT, WorkerPID, SelfOrRef, Range) -> + try Mod:range_fold(fun(Key,Value,{0,KVs}) -> send(WorkerPID, SelfOrRef, [{Key,Value}|KVs]), {?FOLD_CHUNK_SIZE-1, []}; (Key,Value,{N,KVs}) -> diff --git a/src/hanoidb_reader.erl b/src/hanoidb_reader.erl index 7c2cf06..2fd7d63 100644 --- a/src/hanoidb_reader.erl +++ b/src/hanoidb_reader.erl @@ -33,7 +33,7 @@ -define(ASSERT_WHEN(X), when X). -export([open/1, open/2,close/1,lookup/2,fold/3,range_fold/4, destroy/1]). --export([first_node/1,next_node/1]). +-export([first_node/1,next_node/1,file_name/1]). -export([serialize/1, deserialize/1]). -record(node, {level :: non_neg_integer(), @@ -61,6 +61,7 @@ open(Name, Config) -> ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024), case file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]) of {ok, File} -> + {ok, Pos} = file:position(File, ?FIRST_BLOCK_POS), {ok, #index{file=File, name=Name, config=Config}}; {error, _}=Err -> Err @@ -99,6 +100,9 @@ open(Name, Config) -> {ok, #index{file=File, root=Root, bloom=Bloom, name=Name, config=Config}} end. +file_name(#index{name=Name}) -> + {ok, Name}. + destroy(#index{file=File, name=Name}) -> ok = file:close(File), file:delete(Name). @@ -130,7 +134,9 @@ fold1(File,Fun,Acc0) -> eof -> Acc0; {ok, Node} -> - fold0(File,Fun,Node,Acc0) + fold0(File,Fun,Node,Acc0); + {error,_}=Err -> + exit(Err) end. -spec range_fold(function(), any(), #index{}, #key_range{}) -> @@ -193,7 +199,10 @@ do_range_fold(Fun, Acc0, File, Range, undefined) -> {stopped, Result} -> Result; {ok, Acc1} -> do_range_fold(Fun, Acc1, File, Range, undefined) - end + end; + + {error,_}=Err -> + Err end; do_range_fold(Fun, Acc0, File, Range, N0) -> @@ -230,7 +239,10 @@ do_range_fold(Fun, Acc0, File, Range, N0) -> {stopped, Result} -> Result; {ok, {N2, Acc1}} -> do_range_fold(Fun, Acc1, File, Range, N2) - end + end; + + {error,_}=Err -> + Err end. lookup_node(_File,_FromKey,#node{level=0},Pos) -> @@ -269,7 +281,10 @@ next_node(#index{file=File}=_Index) -> % {ok, #node{level=N}} when N>0 -> % next_node(Index); eof -> - end_of_data + end_of_data; + + {error,_}=Err -> + Err end. close(#index{file=undefined}) -> @@ -413,6 +428,8 @@ next_leaf_node(File) -> hanoidb_util:decode_index_node(0, Data); {ok, <>} -> {ok, _} = file:position(File, {cur,Len-2}), - next_leaf_node(File) + next_leaf_node(File); + {error,_}=Err -> + Err end.