Make ebloom / scalable bloom a compile-time option

ebloom (from Basho) uses a NIF implementation, which
may be faster and use less memory; benchmark before switching.

The “old” bloom, a.k.a. “scalable bloom”,
is pure Erlang, which also has nice properties.

Note! Switching to ebloom changes the file format,
so the default is still to use ‘scalable bloom’.
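
A minimal sketch of the two configurations (assuming only the
erl_opts define in rebar.config changes; see the diff below):

    %% default: pure-Erlang scalable bloom (file format unchanged)
    {erl_opts, [ {d,'USE_SCALABLE_BLOOM',true}, ... ]}.

    %% ebloom: omit the define, so the -else branch of the
    %% -ifdef(USE_SCALABLE_BLOOM) block in src/hanoidb.hrl is compiled
    {erl_opts, [ ... ]}.
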
Kresten Krab Thorup 2014-10-21 15:03:53 +01:00
parent 813863c9c3
commit 697683240d
7 changed files with 49 additions and 143 deletions

rebar.config

@@ -4,6 +4,7 @@
{eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}.
{erl_opts, [%{d,'DEBUG',true},
+{d,'USE_SCALABLE_BLOOM',true},
{parse_transform, lager_transform},
fail_on_warning,
warn_unused_vars,
@@ -26,9 +27,10 @@
{deps, [ {sext, ".*", {git, "git://github.com/esl/sext", {branch, "master"}}}
, {lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}}
, {snappy, "1.1.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}}
, {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}}
, {snappy, "1.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}}
, {plain_fsm, "1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}}
% , {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}}
, {ebloom, ".*", {git, "git://github.com/basho/ebloom", {branch, "master"}}}
, {triq, ".*", {git, "git://github.com/krestenkrab/triq", {branch, "master"}}}
, {lz4, ".*", {git, "git://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}}
% , {edown, "0.3.*", {git, "git://github.com/esl/edown.git", {branch, "master"}}}

src/basho_bench_driver_hanoidb.erl

@@ -1,122 +0,0 @@
-%% ----------------------------------------------------------------------------
-%%
-%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage
-%%
-%% Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved.
-%% http://trifork.com/ info@trifork.com
-%%
-%% Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved.
-%% http://basho.com/ info@basho.com
-%%
-%% This file is provided to you under the Apache License, Version 2.0 (the
-%% "License"); you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-%% License for the specific language governing permissions and limitations
-%% under the License.
-%%
-%% ----------------------------------------------------------------------------
--module(basho_bench_driver_hanoidb).
--record(state, { tree :: hanoidb:hanoidb(),
-filename :: string(),
-flags :: list(),
-sync_interval :: integer() | infinity,
-last_sync :: erlang:timestamp() }).
--export([new/1,
-run/4]).
--include("hanoidb.hrl").
--include_lib("basho_bench/include/basho_bench.hrl").
--record(key_range, { from_key = <<>> :: binary(),
-from_inclusive = true :: boolean(),
-to_key :: binary() | undefined,
-to_inclusive = false :: boolean(),
-limit :: pos_integer() | undefined }).
-%% ====================================================================
-%% API
-%% ====================================================================
-new(_Id) ->
-%% Make sure bitcask is available
-case code:which(hanoidb) of
-non_existing ->
-?FAIL_MSG("~s requires hanoidb to be available on code path.\n",
-[?MODULE]);
-_ ->
-ok
-end,
-%% Get the target directory
-Dir = basho_bench_config:get(hanoidb_dir, "."),
-Filename = filename:join(Dir, "test.hanoidb"),
-Config = basho_bench_config:get(hanoidb_flags, []),
-%% Look for sync interval config
-SyncInterval =
-case basho_bench_config:get(hanoidb_sync_interval, infinity) of
-Value when is_integer(Value) ->
-Value;
-infinity ->
-infinity
-end,
-%% Get any bitcask flags
-case hanoidb:open(Filename, Config) of
-{error, Reason} ->
-?FAIL_MSG("Failed to open hanoidb in ~s: ~p\n", [Filename, Reason]);
-{ok, FBTree} ->
-{ok, #state { tree = FBTree,
-filename = Filename,
-sync_interval = SyncInterval,
-last_sync = os:timestamp() }}
-end.
-run(get, KeyGen, _ValueGen, State) ->
-case hanoidb:lookup(State#state.tree, KeyGen()) of
-{ok, _Value} ->
-{ok, State};
-not_found ->
-{ok, State};
-{error, Reason} ->
-{error, Reason}
-end;
-run(put, KeyGen, ValueGen, State) ->
-case hanoidb:put(State#state.tree, KeyGen(), ValueGen()) of
-ok ->
-{ok, State};
-{error, Reason} ->
-{error, Reason}
-end;
-run(delete, KeyGen, _ValueGen, State) ->
-case hanoidb:delete(State#state.tree, KeyGen()) of
-ok ->
-{ok, State};
-{error, Reason} ->
-{error, Reason}
-end;
-run(fold_100, KeyGen, _ValueGen, State) ->
-[From,To] = lists:usort([KeyGen(), KeyGen()]),
-case hanoidb:fold_range(State#state.tree,
-fun(_Key,_Value,Count) ->
-Count+1
-end,
-0,
-#key_range{ from_key=From,
-to_key=To,
-limit=100 }) of
-Count when Count >= 0; Count =< 100 ->
-{ok,State};
-Count ->
-{error, {bad_fold_count, Count}}
-end.

src/hanoidb.hrl

@@ -50,7 +50,7 @@
-record(nursery, { log_file :: file:fd(),
dir :: string(),
-cache :: gb_tree(),
+cache :: gb_trees:tree(binary(), binary()),
total_size=0 :: integer(),
count=0 :: integer(),
last_sync=now() :: erlang:timestamp(),
@@ -69,3 +69,22 @@
| value()
| filepos().
+-ifdef(USE_SCALABLE_BLOOM).
+-define(BLOOM_NEW(Size), {ok, hanoidb_bloom:bloom(Size, 0.01)}).
+-define(BLOOM_TO_BIN(Bloom), hanoidb_bloom:encode(Bloom)). %% -> Binary
+-define(BIN_TO_BLOOM(Bin), {ok, hanoidb_bloom:decode(Bin)}).
+-define(BLOOM_INSERT(Bloom, Key), {ok, hanoidb_bloom:add(Key,Bloom)}).
+-define(BLOOM_CONTAINS(Bloom, Key), hanoidb_bloom:member(Bloom, Key)). %% -> 'true' | 'false'
+-else.
+-define(BLOOM_NEW(Size), begin ebloom:new(Size, 0.01, Size) end).
+-define(BLOOM_TO_BIN(Bloom), begin ebloom:serialize(Bloom) end). %% -> Binary
+-define(BIN_TO_BLOOM(Bin), begin ebloom:deserialize(Bin) end).
+-define(BLOOM_INSERT(Bloom, Key), begin ok=ebloom:insert(Bloom, Key), {ok, Bloom} end).
+-define(BLOOM_CONTAINS(Bloom, Key), begin ebloom:member(Bloom, Key) end). %% -> 'true' | 'false'
+-endif.
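
Both backends are reached through the same macros; a minimal usage
sketch (hypothetical test module, not part of this commit):

    -module(bloom_sketch).
    -include("hanoidb.hrl").
    -export([roundtrip/0]).

    %% Create, fill and round-trip a bloom filter via the macro API.
    roundtrip() ->
        {ok, B0} = ?BLOOM_NEW(1024),               %% expected element count
        {ok, B1} = ?BLOOM_INSERT(B0, <<"key">>),
        true     = ?BLOOM_CONTAINS(B1, <<"key">>), %% no false negatives
        Bin      = ?BLOOM_TO_BIN(B1),              %% -> binary()
        ?BIN_TO_BLOOM(Bin).                        %% -> {ok, Bloom}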

src/hanoidb_level.erl

@@ -794,8 +794,8 @@ begin_merge(State) ->
Owner ! ?CAST(self(),{merge_done, OutCount, XFileName})
catch
C:E ->
error_logger:error_msg("merge failed ~p:~p ~p~n",
[C,E,erlang:get_stacktrace()]),
error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n",
[self(), C,E,erlang:get_stacktrace(), XFileName]),
erlang:raise(C,E,erlang:get_stacktrace())
end
end),

src/hanoidb_merger.erl

@@ -58,8 +58,8 @@ merge(A,B,C, Size, IsLastLevel, Options) ->
scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none}).
terminate(Out) ->
-{ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out),
-{stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out),
+{ok, Count, Out1} = hanoidb_writer:handle_call(count, self(), Out),
+{stop, normal, ok, _Out2} = hanoidb_writer:handle_call(close, self(), Out1),
{ok, Count}.
step(S) ->

src/hanoidb_reader.erl

@@ -85,7 +85,7 @@ open(Name, Config) ->
{ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size - 8, 8),
{ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size - 12, 4),
{ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize),
-Bloom = hanoidb_bloom:decode(BloomData),
+{ok, Bloom} = ?BIN_TO_BLOOM(BloomData),
%% read in the root node
Root =
@@ -279,7 +279,7 @@ close(#index{file=File}) ->
lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
-case hanoidb_bloom:member(Key, Bloom) of
+case ?BLOOM_CONTAINS(Bloom, Key) of
true ->
case lookup_in_node(File, Node, Key) of
not_found ->

src/hanoidb_writer.erl

@@ -94,7 +94,7 @@ init([Name, Options]) ->
case do_open(Name, Options, [exclusive]) of
{ok, IdxFile} ->
ok = file:write(IdxFile, ?FILE_FORMAT),
-Bloom = hanoidb_bloom:bloom(Size),
+{ok, Bloom} = ?BLOOM_NEW(Size),
BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE),
{ok, #state{ name=Name,
index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile,
@@ -170,11 +170,11 @@ serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State)
exit({bad_position, Position, WrongPosition})
end,
ok = file:close(File),
-erlang:term_to_binary( { State#state{ index_file=undefined }, hanoidb_bloom:encode(Bloom) } ).
+erlang:term_to_binary( { State#state{ index_file=undefined, bloom=undefined }, ?BLOOM_TO_BIN(Bloom) } ).
deserialize(Binary) ->
{State, Bin} = erlang:binary_to_term(Binary),
-Bloom = hanoidb_bloom:decode(Bin),
+{ok, Bloom} = ?BIN_TO_BLOOM(Bin),
{ok, IdxFile} = do_open(State#state.name, State#state.opts, []),
State#state{ bloom=Bloom, index_file=IdxFile }.
@@ -182,13 +182,14 @@ deserialize(Binary) ->
do_open(Name, Options, OpenOpts) ->
WriteBufferSize = hanoidb:get_opt(write_buffer_size, Options, 512 * 1024),
file:open(hanoidb_util:index_file_name(Name),
-[raw, append, {delayed_write, WriteBufferSize, 2000} | OpenOpts]).
+[raw, append, {delayed_write, WriteBufferSize, 2000}, exclusive | OpenOpts]).
%% @doc flush pending nodes and write trailer
archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom, index_file=IdxFile }=State) ->
-BloomBin = hanoidb_bloom:encode(Bloom),
+BloomBin = ?BLOOM_TO_BIN(Bloom),
true = is_binary(BloomBin),
BloomSize = byte_size(BloomBin),
RootPos =
case LastNodePos of
@@ -204,7 +205,7 @@ archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastN
ok = file:write(IdxFile, Trailer),
ok = file:datasync(IdxFile),
ok = file:close(IdxFile),
-{ok, State#state{ index_file=undefined, index_file_pos=undefined }};
+{ok, State#state{ index_file=undefined, index_file_pos=undefined, bloom=undefined }};
archive_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos })
when N > 0 ->
@@ -221,7 +222,8 @@ append_node(Level, Key, Value, State=#state{ nodes=[] }) ->
append_node(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack })
when Level < Level2 ->
append_node(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] });
-append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) ->
+append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State)
+when Bloom /= undefined ->
%% The top-of-stack node is at the level we wish to insert at.
%% Assert that keys are increasing:
@@ -236,10 +238,14 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
exit({badarg, Key})
end
end,
NewSize = NodeSize + hanoidb_util:estimate_node_size_increment(List, Key, Value),
-NewBloom = hanoidb_bloom:add(Key, Bloom),
+{ok,Bloom2} = case Level of
+0 ->
+?BLOOM_INSERT(Bloom, Key);
+_ ->
+{ok,Bloom}
+end,
{TC1, VC1} =
case Level of
@@ -258,7 +264,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
NodeMembers = [{Key, Value} | List],
State2 = State#state{ nodes=[CurrNode#node{members=NodeMembers, size=NewSize} | RestNodes],
-value_count=VC1, tombstone_count=TC1, bloom=NewBloom },
+value_count=VC1, tombstone_count=TC1, bloom=Bloom2 },
case NewSize >= State#state.block_size of
true ->
@@ -267,7 +273,8 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List,
{ok, State2}
end.
-flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos} = State) ->
+flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos } = State) ->
OrderedMembers = lists:reverse(NodeMembers),
{ok, BlockData} = hanoidb_util:encode_index_node(OrderedMembers, Compress),