Remove most dialyzer warnings
This is a large change set, which adds type specs needed for dialyzer to be happy. We likely need even more type specs as we move forward.
This commit is contained in:
parent
adab9556db
commit
0449e442f7
13 changed files with 118 additions and 125 deletions
8
Makefile
8
Makefile
|
@ -30,14 +30,22 @@ clean-test-btrees:
|
|||
|
||||
plt: compile
|
||||
$(DIALYZER) --build_plt --output_plt .hanoi.plt \
|
||||
-pa deps/snappy/ebin \
|
||||
-pa deps/plain_fsm/ebin \
|
||||
deps/plain_fsm/ebin \
|
||||
--apps kernel stdlib
|
||||
|
||||
analyze: compile
|
||||
$(DIALYZER) --plt .hanoi.plt \
|
||||
-pa deps/snappy/ebin \
|
||||
-pa deps/plain_fsm/ebin \
|
||||
ebin
|
||||
|
||||
analyze-nospec: compile
|
||||
$(DIALYZER) --plt .hanoi.plt \
|
||||
-pa deps/plain_fsm/ebin \
|
||||
--no_spec \
|
||||
ebin
|
||||
|
||||
repl:
|
||||
erl -pz deps/*/ebin -pa ebin
|
||||
|
|
|
@ -24,11 +24,11 @@
|
|||
|
||||
-module(basho_bench_driver_hanoidb).
|
||||
|
||||
-record(state, { tree,
|
||||
filename,
|
||||
flags,
|
||||
sync_interval,
|
||||
last_sync }).
|
||||
-record(state, { tree :: hanoidb:hanoidb(),
|
||||
filename :: string(),
|
||||
flags :: list(),
|
||||
sync_interval :: integer() | infinity,
|
||||
last_sync :: erlang:timestamp() }).
|
||||
|
||||
-export([new/1,
|
||||
run/4]).
|
||||
|
@ -62,11 +62,12 @@ new(_Id) ->
|
|||
Config = basho_bench_config:get(hanoidb_flags, []),
|
||||
|
||||
%% Look for sync interval config
|
||||
SyncInterval =
|
||||
case basho_bench_config:get(hanoidb_sync_interval, infinity) of
|
||||
Value when is_integer(Value) ->
|
||||
SyncInterval = Value;
|
||||
Value;
|
||||
infinity ->
|
||||
SyncInterval = infinity
|
||||
infinity
|
||||
end,
|
||||
|
||||
%% Get any bitcask flags
|
||||
|
@ -106,7 +107,7 @@ run(delete, KeyGen, _ValueGen, State) ->
|
|||
|
||||
run(fold_100, KeyGen, _ValueGen, State) ->
|
||||
[From,To] = lists:usort([KeyGen(), KeyGen()]),
|
||||
case hanoidb:sync_fold_range(State#state.tree,
|
||||
case hanoidb:fold_range(State#state.tree,
|
||||
fun(_Key,_Value,Count) ->
|
||||
Count+1
|
||||
end,
|
||||
|
|
|
@ -42,8 +42,8 @@
|
|||
-include_lib("include/hanoidb.hrl").
|
||||
-include_lib("include/plain_rpc.hrl").
|
||||
|
||||
-record(state, { top :: pos_integer(),
|
||||
nursery :: term(),
|
||||
-record(state, { top :: pid(),
|
||||
nursery :: #nursery{},
|
||||
dir :: string(),
|
||||
opt :: term(),
|
||||
max_level :: pos_integer()}).
|
||||
|
@ -407,6 +407,9 @@ handle_call({get, Key}, From, State=#state{ top=Top, nursery=Nursery } ) when is
|
|||
{noreply, State}
|
||||
end;
|
||||
|
||||
handle_call(close, _From, State=#state{ nursery=undefined }) ->
|
||||
{stop, normal, ok, State};
|
||||
|
||||
handle_call(close, _From, State=#state{ nursery=Nursery, top=Top, dir=Dir, max_level=MaxLevel, opt=Config }) ->
|
||||
try
|
||||
ok = hanoidb_nursery:finish(Nursery, Top),
|
||||
|
@ -424,8 +427,8 @@ handle_call(destroy, _From, State=#state{top=Top, nursery=Nursery }) ->
|
|||
ok = hanoidb_level:destroy(Top),
|
||||
{stop, normal, ok, State#state{ top=undefined, nursery=undefined, max_level=?TOP_LEVEL }}.
|
||||
|
||||
|
||||
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) ->
|
||||
-spec do_put(key(), value(), expiry(), #state{}) -> {ok, #state{}}.
|
||||
do_put(Key, Value, Expiry, State=#state{ nursery=Nursery, top=Top }) when Nursery =/= undefined ->
|
||||
{ok, Nursery2} = hanoidb_nursery:add(Key, Value, Expiry, Nursery, Top),
|
||||
{ok, State#state{nursery=Nursery2}}.
|
||||
|
||||
|
|
|
@ -47,3 +47,25 @@
|
|||
|
||||
-define(KEY_IN_RANGE(Key,Range),
|
||||
(?KEY_IN_FROM_RANGE(Key,Range) andalso ?KEY_IN_TO_RANGE(Key,Range))).
|
||||
|
||||
-record(nursery, { log_file :: file:fd(),
|
||||
dir :: string(),
|
||||
cache :: gb_tree(),
|
||||
total_size=0 :: integer(),
|
||||
count=0 :: integer(),
|
||||
last_sync=now() :: erlang:timestamp(),
|
||||
max_level :: integer(),
|
||||
config=[] :: [{atom(), term()}],
|
||||
step=0 :: integer(),
|
||||
merge_done=0 :: integer()}).
|
||||
|
||||
-type kventry() :: { key(), expvalue() } | [ kventry() ].
|
||||
-type key() :: binary().
|
||||
-type txspec() :: { delete, key() } | { put, key(), value() }.
|
||||
-type value() :: ?TOMBSTONE | binary().
|
||||
-type expiry() :: infinity | integer().
|
||||
-type filepos() :: { non_neg_integer(), non_neg_integer() }.
|
||||
-type expvalue() :: { value(), expiry() }
|
||||
| value()
|
||||
| filepos().
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@
|
|||
n :: non_neg_integer(), % maximum number of elements
|
||||
mb :: non_neg_integer(), % 2^mb = m, the size of each slice (bitvector)
|
||||
size :: non_neg_integer(), % number of elements
|
||||
a :: [binary()] % list of bitvectors
|
||||
a :: [array()] % list of bitvectors
|
||||
}).
|
||||
|
||||
-record(sbf, {
|
||||
|
@ -183,16 +183,18 @@ set_bits(Mask, I1, I, [H|T], Acc) ->
|
|||
|
||||
bitarray_new(N) -> array:new((N-1) div ?W + 1, {default, 0}).
|
||||
|
||||
-spec bitarray_set( non_neg_integer(), array() ) -> array().
|
||||
bitarray_set(I, A) ->
|
||||
AI = I div ?W,
|
||||
V = array:get(AI, A),
|
||||
V1 = V bor (1 bsl (I rem ?W)),
|
||||
array:set(AI, V1, A).
|
||||
|
||||
-spec bitarray_get( non_neg_integer(), array() ) -> boolean().
|
||||
bitarray_get(I, A) ->
|
||||
AI = I div ?W,
|
||||
V = array:get(AI, A),
|
||||
V band (1 bsl (I rem ?W)) =/= 0.
|
||||
(V band (1 bsl (I rem ?W))) =/= 0.
|
||||
|
||||
encode(Bloom) ->
|
||||
zlib:gzip(term_to_binary(Bloom)).
|
||||
|
|
|
@ -68,7 +68,7 @@
|
|||
-include("hanoidb.hrl").
|
||||
-include("plain_rpc.hrl").
|
||||
|
||||
-record(state, {sendto, sendto_ref}).
|
||||
-record(state, {sendto :: pid(), sendto_ref :: reference()}).
|
||||
|
||||
start(SendTo) ->
|
||||
F = fun() ->
|
||||
|
@ -195,7 +195,7 @@ emit_next(State, [], _Queues) ->
|
|||
Msg = {fold_done, self()},
|
||||
Target = State#state.sendto,
|
||||
?log( "~p ! ~p~n", [Target, Msg]),
|
||||
plain_rpc:cast(Target, Msg),
|
||||
_ = plain_rpc:cast(Target, Msg),
|
||||
end_of_fold(State);
|
||||
|
||||
emit_next(State, [{FirstPID,FirstKV}|Rest]=Values, Queues) ->
|
||||
|
@ -215,7 +215,7 @@ emit_next(State, [{FirstPID,FirstKV}|Rest]=Values, Queues) ->
|
|||
fill(State, Values, Queues, FillFrom);
|
||||
{{Key, limit}, _} ->
|
||||
?log( "~p ! ~p~n", [State#state.sendto, {fold_limit, self(), Key}]),
|
||||
plain_rpc:cast(State#state.sendto, {fold_limit, self(), Key}),
|
||||
_ = plain_rpc:cast(State#state.sendto, {fold_limit, self(), Key}),
|
||||
end_of_fold(State);
|
||||
{{Key, Value}, FillFrom} ->
|
||||
?log( "~p ! ~p~n", [State#state.sendto, {fold_result, self(), Key, '...'}]),
|
||||
|
|
|
@ -853,13 +853,13 @@ do_range_fold(BT, WorkerPID, SelfOrRef, Range) ->
|
|||
BT,
|
||||
Range) of
|
||||
{limit, _, LastKey} ->
|
||||
WorkerPID ! {level_limit, SelfOrRef, LastKey};
|
||||
WorkerPID ! {level_limit, SelfOrRef, LastKey},
|
||||
ok;
|
||||
{done, _} ->
|
||||
%% tell fold merge worker we're done
|
||||
WorkerPID ! {level_done, SelfOrRef}
|
||||
|
||||
end,
|
||||
ok.
|
||||
WorkerPID ! {level_done, SelfOrRef},
|
||||
ok
|
||||
end.
|
||||
|
||||
-define(FOLD_CHUNK_SIZE, 100).
|
||||
|
||||
|
|
|
@ -40,16 +40,11 @@
|
|||
%% merges, so we default to running the entire merge in one process.
|
||||
-define(LOCAL_WRITER, true).
|
||||
|
||||
-spec merge(string(), string(), string(), integer(), boolean(), list()) -> {ok, integer()}.
|
||||
merge(A,B,C, Size, IsLastLevel, Options) ->
|
||||
{ok, IXA} = hanoidb_reader:open(A, [sequential|Options]),
|
||||
{ok, IXB} = hanoidb_reader:open(B, [sequential|Options]),
|
||||
{ok, Out} =
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
hanoidb_writer:init([C, [{size, Size} | Options]]);
|
||||
false ->
|
||||
hanoidb_writer:open(C, [{size, Size} | Options])
|
||||
end,
|
||||
{ok, Out} = hanoidb_writer:init([C, [{size, Size} | Options]]),
|
||||
AKVs =
|
||||
case hanoidb_reader:first_node(IXA) of
|
||||
{node, AKV} -> AKV;
|
||||
|
@ -63,16 +58,9 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
|
|||
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none}).
|
||||
|
||||
terminate(Out) ->
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
{ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out),
|
||||
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out),
|
||||
{ok, Count};
|
||||
false ->
|
||||
Count = hanoidb_writer:count(Out),
|
||||
ok = hanoidb_writer:close(Out),
|
||||
{ok, Count}
|
||||
end.
|
||||
{ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out),
|
||||
{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out),
|
||||
{ok, Count}.
|
||||
|
||||
step(S) ->
|
||||
step(S, 1).
|
||||
|
@ -103,16 +91,11 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/=
|
|||
{step, From, HowMany} ->
|
||||
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N+HowMany, From})
|
||||
after ?HIBERNATE_TIMEOUT ->
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
Args = {hanoidb_reader:serialize(IXA),
|
||||
hanoidb_reader:serialize(IXB),
|
||||
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
|
||||
Keep = zlib:gzip(erlang:term_to_binary(Args)),
|
||||
hibernate_scan(Keep);
|
||||
false ->
|
||||
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none})
|
||||
end
|
||||
Args = {hanoidb_reader:serialize(IXA),
|
||||
hanoidb_reader:serialize(IXB),
|
||||
hanoidb_writer:serialize(Out), IsLastLevel, AKVs, BKVs, N},
|
||||
Keep = zlib:gzip(erlang:term_to_binary(Args)),
|
||||
hibernate_scan(Keep)
|
||||
end;
|
||||
|
||||
scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) ->
|
||||
|
@ -135,38 +118,14 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, [], Step) ->
|
|||
|
||||
scan(IXA, IXB, Out, IsLastLevel, [{Key1,Value1}|AT]=_AKVs, [{Key2,_Value2}|_IX]=BKVs, Step)
|
||||
when Key1 < Key2 ->
|
||||
Out3 =
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
{noreply, Out2} = hanoidb_writer:handle_cast({add, Key1, Value1}, Out),
|
||||
Out2;
|
||||
false ->
|
||||
ok = hanoidb_writer:add(Out, Key1, Value1),
|
||||
Out
|
||||
end,
|
||||
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key1, Value1}, Out),
|
||||
scan(IXA, IXB, Out3, IsLastLevel, AT, BKVs, step(Step));
|
||||
scan(IXA, IXB, Out, IsLastLevel, [{Key1,_Value1}|_AT]=AKVs, [{Key2,Value2}|IX]=_BKVs, Step)
|
||||
when Key1 > Key2 ->
|
||||
Out3 =
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
{noreply, Out2} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
|
||||
Out2;
|
||||
false ->
|
||||
ok = hanoidb_writer:add(Out, Key2, Value2),
|
||||
Out
|
||||
end,
|
||||
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
|
||||
scan(IXA, IXB, Out3, IsLastLevel, AKVs, IX, step(Step));
|
||||
scan(IXA, IXB, Out, IsLastLevel, [{_Key1,_Value1}|AT]=_AKVs, [{Key2,Value2}|IX]=_BKVs, Step) ->
|
||||
Out3 =
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
{noreply, Out2} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
|
||||
Out2;
|
||||
false ->
|
||||
ok = hanoidb_writer:add(Out, Key2, Value2),
|
||||
Out
|
||||
end,
|
||||
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key2, Value2}, Out),
|
||||
scan(IXA, IXB, Out3, IsLastLevel, AT, IX, step(Step, 2)).
|
||||
|
||||
hibernate_scan_only(Keep) ->
|
||||
|
@ -217,13 +176,5 @@ scan_only(IX, Out, true, [{_,?TOMBSTONE}|Rest], Step) ->
|
|||
scan_only(IX, Out, true, Rest, step(Step));
|
||||
|
||||
scan_only(IX, Out, IsLastLevel, [{Key,Value}|Rest], Step) ->
|
||||
Out3 =
|
||||
case ?LOCAL_WRITER of
|
||||
true ->
|
||||
{noreply, Out2} = hanoidb_writer:handle_cast({add, Key, Value}, Out),
|
||||
Out2;
|
||||
false ->
|
||||
ok = hanoidb_writer:add(Out, Key, Value),
|
||||
Out
|
||||
end,
|
||||
{noreply, Out3} = hanoidb_writer:handle_cast({add, Key, Value}, Out),
|
||||
scan_only(IX, Out3, IsLastLevel, Rest, step(Step)).
|
||||
|
|
|
@ -32,17 +32,6 @@
|
|||
-include("hanoidb.hrl").
|
||||
-include_lib("kernel/include/file.hrl").
|
||||
|
||||
-record(nursery, { log_file :: file:fd(),
|
||||
dir :: string(),
|
||||
cache :: gb_tree(),
|
||||
total_size=0 :: integer(),
|
||||
count=0 :: integer(),
|
||||
last_sync=now() :: erlang:timestamp(),
|
||||
max_level :: integer(),
|
||||
config=[] :: [{atom(), term()}],
|
||||
step=0 :: integer(),
|
||||
merge_done=0 :: integer()}).
|
||||
|
||||
-spec new(string(), integer(), [_]) -> {ok, #nursery{}} | {error, term()}.
|
||||
|
||||
-define(LOGFILENAME(Dir), filename:join(Dir, "nursery.log")).
|
||||
|
@ -99,15 +88,15 @@ read_nursery_from_log(Directory, MaxLevel, Config) ->
|
|||
case hanoidb_util:decode_crc_data(LogBinary, [], []) of
|
||||
{ok, KVs} ->
|
||||
fill_cache(KVs, gb_trees:empty());
|
||||
{error, _Reason} ->
|
||||
{partial, KVs, _ErrorData} ->
|
||||
error_logger:info_msg("ignoring undecypherable bytes in ~p~n", [?LOGFILENAME(Directory)]),
|
||||
gb_trees:empty()
|
||||
fill_cache(KVs, gb_trees:empty())
|
||||
end,
|
||||
{ok, #nursery{ dir=Directory, cache=Cache, count=gb_trees:size(Cache), max_level=MaxLevel, config=Config }}.
|
||||
|
||||
%% @doc Add a Key/Value to the nursery
|
||||
%% @end
|
||||
-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, non_neg_integer() | infinity, pid()) -> {ok, #nursery{}}.
|
||||
-spec do_add(#nursery{}, binary(), binary()|?TOMBSTONE, non_neg_integer() | infinity, pid()) -> {ok, #nursery{}} | {full, #nursery{}}.
|
||||
do_add(Nursery, Key, Value, infinity, Top) ->
|
||||
do_add(Nursery, Key, Value, 0, Top);
|
||||
do_add(Nursery=#nursery{log_file=File, cache=Cache, total_size=TotalSize, count=Count, config=Config}, Key, Value, KeyExpiryTime, Top) ->
|
||||
|
@ -244,10 +233,11 @@ destroy(#nursery{ dir=Dir, log_file=LogFile }) ->
|
|||
file:delete(LogFileName),
|
||||
ok.
|
||||
|
||||
|
||||
-spec add(key(), value(), #nursery{}, pid()) -> {ok, #nursery{}}.
|
||||
add(Key, Value, Nursery, Top) ->
|
||||
add(Key, Value, infinity, Nursery, Top).
|
||||
|
||||
-spec add(key(), value(), expiry(), #nursery{}, pid()) -> {ok, #nursery{}}.
|
||||
add(Key, Value, Expiry, Nursery, Top) ->
|
||||
case do_add(Nursery, Key, Value, Expiry, Top) of
|
||||
{ok, Nursery0} ->
|
||||
|
@ -256,6 +246,7 @@ add(Key, Value, Expiry, Nursery, Top) ->
|
|||
flush(Nursery0, Top)
|
||||
end.
|
||||
|
||||
-spec flush(#nursery{}, pid()) -> {ok, #nursery{}}.
|
||||
flush(Nursery=#nursery{ dir=Dir, max_level=MaxLevel, config=Config }, Top) ->
|
||||
ok = finish(Nursery, Top),
|
||||
{error, enoent} = file:read_file_info(filename:join(Dir, "nursery.log")),
|
||||
|
|
|
@ -39,28 +39,32 @@
|
|||
-record(node, {level :: non_neg_integer(),
|
||||
members=[] :: list(any()) }).
|
||||
|
||||
-record(index, {file :: port(),
|
||||
root :: #node{},
|
||||
-record(index, {file :: file:io_device(),
|
||||
root :: #node{} | none,
|
||||
bloom :: term(),
|
||||
name :: string(),
|
||||
config=[] :: term() }).
|
||||
|
||||
-type read_file() :: #index{}.
|
||||
|
||||
-spec open(Name::string()) -> read_file().
|
||||
-spec open(Name::string()) -> {ok, read_file()} | {error, any()}.
|
||||
open(Name) ->
|
||||
open(Name, [random]).
|
||||
|
||||
-type config() :: [sequential | folding | random | {atom(), term()}].
|
||||
|
||||
-spec open(Name::string(), config()) -> read_file().
|
||||
-spec open(Name::string(), config()) -> {ok, read_file()} | {error, any()}.
|
||||
|
||||
open(Name, Config) ->
|
||||
case proplists:get_bool(sequential, Config) of
|
||||
true ->
|
||||
ReadBufferSize = hanoidb:get_opt(read_buffer_size, Config, 512 * 1024),
|
||||
{ok, File} = file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]),
|
||||
{ok, #index{file=File, name=Name, config=Config}};
|
||||
case file:open(Name, [raw,read,{read_ahead, ReadBufferSize},binary]) of
|
||||
{ok, File} ->
|
||||
{ok, #index{file=File, name=Name, config=Config}};
|
||||
{error, _}=Err ->
|
||||
Err
|
||||
end;
|
||||
|
||||
false ->
|
||||
{ok, File} =
|
||||
|
@ -129,13 +133,15 @@ fold1(File,Fun,Acc0) ->
|
|||
fold0(File,Fun,Node,Acc0)
|
||||
end.
|
||||
|
||||
-spec range_fold(function(), any(), #index{}, #key_range{}) ->
|
||||
{limit, any(), binary()} | {done, any()}.
|
||||
range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) ->
|
||||
case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of
|
||||
{ok, {Pos,_}} ->
|
||||
file:position(File, Pos),
|
||||
{ok, _} = file:position(File, Pos),
|
||||
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
|
||||
{ok, Pos} ->
|
||||
file:position(File, Pos),
|
||||
{ok, _} = file:position(File, Pos),
|
||||
do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit);
|
||||
none ->
|
||||
{done, Acc0}
|
||||
|
@ -266,6 +272,8 @@ next_node(#index{file=File}=_Index) ->
|
|||
end_of_data
|
||||
end.
|
||||
|
||||
close(#index{file=undefined}) ->
|
||||
ok;
|
||||
close(#index{file=File}) ->
|
||||
file:close(File).
|
||||
|
||||
|
@ -308,8 +316,8 @@ lookup_in_node(File,#node{members=Members},Key) ->
|
|||
{ok, Node} ->
|
||||
Result = lookup_in_node2(File, Node, Key),
|
||||
plain_rpc:send_reply(From, Result);
|
||||
{error, _}=Error ->
|
||||
plain_rpc:send_reply(From, Error)
|
||||
eof ->
|
||||
plain_rpc:send_reply(From, {error, eof})
|
||||
end
|
||||
end
|
||||
end),
|
||||
|
@ -339,8 +347,8 @@ lookup_in_node2(File,#node{members=Members},Key) ->
|
|||
case read_node(File, {Pos,Size}) of
|
||||
{ok, Node} ->
|
||||
lookup_in_node2(File, Node, Key);
|
||||
{error, _}=Error ->
|
||||
Error
|
||||
eof ->
|
||||
{error, eof}
|
||||
end;
|
||||
not_found ->
|
||||
not_found
|
||||
|
@ -365,7 +373,8 @@ find_start(K, KVs) ->
|
|||
find_1(K, KVs).
|
||||
|
||||
|
||||
|
||||
-spec read_node(file:io_device(), non_neg_integer() | { non_neg_integer(), non_neg_integer() }) ->
|
||||
{ok, #node{}} | eof.
|
||||
|
||||
read_node(File, {Pos, Size}) ->
|
||||
% error_logger:info_msg("read_node ~p ~p ~p~n", [File, Pos, Size]),
|
||||
|
|
|
@ -56,9 +56,11 @@
|
|||
-compile({inline, [crc_encapsulate/1, crc_encapsulate_kv_entry/2 ]}).
|
||||
|
||||
|
||||
-spec index_file_name(string()) -> string().
|
||||
index_file_name(Name) ->
|
||||
Name.
|
||||
|
||||
-spec file_exists(string()) -> boolean().
|
||||
file_exists(FileName) ->
|
||||
case file:read_file_info(FileName) of
|
||||
{ok, _} ->
|
||||
|
@ -146,6 +148,7 @@ decode_index_node(Level, Data) ->
|
|||
{ok, {node, Level, KVList}}.
|
||||
|
||||
|
||||
-spec crc_encapsulate_kv_entry(binary(), expvalue()) -> iolist().
|
||||
crc_encapsulate_kv_entry(Key, {Value, infinity}) ->
|
||||
crc_encapsulate_kv_entry(Key, Value);
|
||||
crc_encapsulate_kv_entry(Key, {?TOMBSTONE, TStamp}) -> %
|
||||
|
@ -153,13 +156,13 @@ crc_encapsulate_kv_entry(Key, {?TOMBSTONE, TStamp}) -> %
|
|||
crc_encapsulate_kv_entry(Key, ?TOMBSTONE) ->
|
||||
crc_encapsulate( [?TAG_DELETED | Key] );
|
||||
crc_encapsulate_kv_entry(Key, {Value, TStamp}) when is_binary(Value) ->
|
||||
crc_encapsulate( [?TAG_KV_DATA2, <<TStamp:32, (byte_size(Key)):32/unsigned>>, Key | Value] );
|
||||
crc_encapsulate( [?TAG_KV_DATA2, <<TStamp:32, (byte_size(Key)):32/unsigned>>, Key, Value] );
|
||||
crc_encapsulate_kv_entry(Key, Value) when is_binary(Value) ->
|
||||
crc_encapsulate( [?TAG_KV_DATA, <<(byte_size(Key)):32/unsigned>>, Key | Value] );
|
||||
crc_encapsulate( [?TAG_KV_DATA, <<(byte_size(Key)):32/unsigned>>, Key, Value] );
|
||||
crc_encapsulate_kv_entry(Key, {Pos,Len}) when Len < 16#ffffffff ->
|
||||
crc_encapsulate( [?TAG_POSLEN32, <<Pos:64/unsigned, Len:32/unsigned>>, Key] ).
|
||||
|
||||
|
||||
-spec crc_encapsulate_transaction( [ txspec() ], expiry() ) -> iolist().
|
||||
crc_encapsulate_transaction(TransactionSpec, Expiry) ->
|
||||
crc_encapsulate([?TAG_TRANSACT |
|
||||
lists:map(fun({delete, Key}) ->
|
||||
|
@ -169,11 +172,13 @@ crc_encapsulate_transaction(TransactionSpec, Expiry) ->
|
|||
end,
|
||||
TransactionSpec)]).
|
||||
|
||||
-spec crc_encapsulate( iolist() ) -> iolist().
|
||||
crc_encapsulate(Blob) ->
|
||||
CRC = erlang:crc32(Blob),
|
||||
Size = erlang:iolist_size(Blob),
|
||||
[<< (Size):32/unsigned, CRC:32/unsigned >>, Blob, ?TAG_END].
|
||||
|
||||
-spec decode_kv_list( binary() ) -> {ok, [ kventry() ]} | {partial, [kventry()], iolist()}.
|
||||
decode_kv_list(<<?TAG_END, Custom/binary>>) ->
|
||||
decode_crc_data(Custom, [], []);
|
||||
decode_kv_list(<<?ERLANG_ENCODED, _/binary>>=TermData) ->
|
||||
|
@ -181,14 +186,13 @@ decode_kv_list(<<?ERLANG_ENCODED, _/binary>>=TermData) ->
|
|||
decode_kv_list(<<?CRC_ENCODED, Custom/binary>>) ->
|
||||
decode_crc_data(Custom, [], []).
|
||||
|
||||
-spec decode_crc_data(binary(), list(), list()) -> {ok, list()} | {error, data_corruption}.
|
||||
-spec decode_crc_data(binary(), list(), list()) -> {ok, [kventry()]} | {partial, [kventry()], iolist()}.
|
||||
decode_crc_data(<<>>, [], Acc) ->
|
||||
{ok, lists:reverse(Acc)};
|
||||
decode_crc_data(<<>>, _BrokenData, Acc) ->
|
||||
% {error, data_corruption};
|
||||
decode_crc_data(<<>>, BrokenData, Acc) ->
|
||||
{partial, lists:reverse(Acc), BrokenData};
|
||||
% TODO: we *could* simply return the good parts of the data...
|
||||
% would that be so wrong?
|
||||
{ok, lists:reverse(Acc)};
|
||||
decode_crc_data(<< BinSize:32/unsigned, CRC:32/unsigned, Bin:BinSize/binary, ?TAG_END, Rest/binary >>, Broken, Acc) ->
|
||||
CRCTest = erlang:crc32( Bin ),
|
||||
if CRC == CRCTest ->
|
||||
|
@ -203,6 +207,7 @@ decode_crc_data(Bad, Broken, Acc) ->
|
|||
{Skipped, MaybeGood} = find_next_value(Bad),
|
||||
decode_crc_data(MaybeGood, [Skipped|Broken], Acc).
|
||||
|
||||
-spec find_next_value(binary()) -> { binary(), binary() }.
|
||||
find_next_value(<<>>) ->
|
||||
{<<>>, <<>>};
|
||||
find_next_value(Bin) ->
|
||||
|
@ -214,6 +219,7 @@ find_next_value(Bin) ->
|
|||
{Bin, <<>>}
|
||||
end.
|
||||
|
||||
-spec decode_kv_data( binary() ) -> kventry().
|
||||
decode_kv_data(<<?TAG_KV_DATA, KLen:32/unsigned, Key:KLen/binary, Value/binary >>) ->
|
||||
{Key, Value};
|
||||
decode_kv_data(<<?TAG_DELETED, Key/binary>>) ->
|
||||
|
|
|
@ -42,10 +42,10 @@
|
|||
-export([open/1, open/2, add/3, count/1, close/1]).
|
||||
|
||||
-record(node, {level :: integer(),
|
||||
members=[] :: [ {binary(), binary()} ],
|
||||
members=[] :: [ {key(), expvalue()} ],
|
||||
size=0 :: integer()}).
|
||||
|
||||
-record(state, {index_file :: port(),
|
||||
-record(state, {index_file :: file:io_device() | undefined,
|
||||
index_file_pos :: integer(),
|
||||
|
||||
last_node_pos :: pos_integer(),
|
||||
|
@ -154,7 +154,7 @@ terminate(normal,_State) ->
|
|||
ok;
|
||||
terminate(_Reason, State) ->
|
||||
%% premature delete -> cleanup
|
||||
file:close(State#state.index_file),
|
||||
_ignore = file:close(State#state.index_file),
|
||||
file:delete(hanoidb_util:index_file_name(State#state.name)).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
@ -170,7 +170,7 @@ serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State)
|
|||
exit({bad_position, Position, WrongPosition})
|
||||
end,
|
||||
ok = file:close(File),
|
||||
erlang:term_to_binary( { State#state{ index_file=closed }, hanoidb_bloom:encode(Bloom) } ).
|
||||
erlang:term_to_binary( { State#state{ index_file=undefined }, hanoidb_bloom:encode(Bloom) } ).
|
||||
|
||||
deserialize(Binary) ->
|
||||
{State, Bin} = erlang:binary_to_term(Binary),
|
||||
|
|
|
@ -45,7 +45,7 @@ receive_reply(MRef) ->
|
|||
end.
|
||||
|
||||
send_reply({PID,Ref}, Reply) ->
|
||||
erlang:send(PID, ?REPLY(Ref, Reply)),
|
||||
_ = erlang:send(PID, ?REPLY(Ref, Reply)),
|
||||
ok.
|
||||
|
||||
call(PID,Request) ->
|
||||
|
|
Loading…
Reference in a new issue