diff --git a/rebar.config b/rebar.config index 98584ca..8479f49 100644 --- a/rebar.config +++ b/rebar.config @@ -4,6 +4,7 @@ {eunit_opts, [verbose, {report, {eunit_surefire, [{dir, "."}]}}]}. {erl_opts, [%{d,'DEBUG',true}, + {d,'USE_SCALABLE_BLOOM',true}, {parse_transform, lager_transform}, fail_on_warning, warn_unused_vars, @@ -26,9 +27,10 @@ {deps, [ {sext, ".*", {git, "git://github.com/esl/sext", {branch, "master"}}} , {lager, ".*", {git, "git://github.com/basho/lager", {branch, "master"}}} - , {snappy, "1.1.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}} - , {plain_fsm, "1.1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}} - , {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}} + , {snappy, "1.*", {git, "git://github.com/fdmanana/snappy-erlang-nif.git", {branch, "master"}}} + , {plain_fsm, "1.*", {git, "git://github.com/gburd/plain_fsm", {branch, "master"}}} +% , {basho_bench, ".*", {git, "git://github.com/basho/basho_bench", {branch, "master"}}} + , {ebloom, ".*", {git, "git://github.com/basho/ebloom", {branch, "master"}}} , {triq, ".*", {git, "git://github.com/krestenkrab/triq", {branch, "master"}}} , {lz4, ".*", {git, "git://github.com/krestenkrab/erlang-lz4.git", {branch, "master"}}} % , {edown, "0.3.*", {git, "git://github.com/esl/edown.git", {branch, "master"}}} diff --git a/src/basho_bench_driver_hanoidb.erl b/src/basho_bench_driver_hanoidb.erl deleted file mode 100644 index f4fdc0a..0000000 --- a/src/basho_bench_driver_hanoidb.erl +++ /dev/null @@ -1,122 +0,0 @@ -%% ---------------------------------------------------------------------------- -%% -%% hanoidb: LSM-trees (Log-Structured Merge Trees) Indexed Storage -%% -%% Copyright 2011-2012 (c) Trifork A/S. All Rights Reserved. -%% http://trifork.com/ info@trifork.com -%% -%% Copyright 2012 (c) Basho Technologies, Inc. All Rights Reserved. 
-%% http://basho.com/ info@basho.com -%% -%% This file is provided to you under the Apache License, Version 2.0 (the -%% "License"); you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -%% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -%% License for the specific language governing permissions and limitations -%% under the License. -%% -%% ---------------------------------------------------------------------------- - --module(basho_bench_driver_hanoidb). - --record(state, { tree :: hanoidb:hanoidb(), - filename :: string(), - flags :: list(), - sync_interval :: integer() | infinity, - last_sync :: erlang:timestamp() }). - --export([new/1, - run/4]). - --include("hanoidb.hrl"). --include_lib("basho_bench/include/basho_bench.hrl"). - --record(key_range, { from_key = <<>> :: binary(), - from_inclusive = true :: boolean(), - to_key :: binary() | undefined, - to_inclusive = false :: boolean(), - limit :: pos_integer() | undefined }). 
- -%% ==================================================================== -%% API -%% ==================================================================== - -new(_Id) -> - %% Make sure bitcask is available - case code:which(hanoidb) of - non_existing -> - ?FAIL_MSG("~s requires hanoidb to be available on code path.\n", - [?MODULE]); - _ -> - ok - end, - - %% Get the target directory - Dir = basho_bench_config:get(hanoidb_dir, "."), - Filename = filename:join(Dir, "test.hanoidb"), - Config = basho_bench_config:get(hanoidb_flags, []), - - %% Look for sync interval config - SyncInterval = - case basho_bench_config:get(hanoidb_sync_interval, infinity) of - Value when is_integer(Value) -> - Value; - infinity -> - infinity - end, - - %% Get any bitcask flags - case hanoidb:open(Filename, Config) of - {error, Reason} -> - ?FAIL_MSG("Failed to open hanoidb in ~s: ~p\n", [Filename, Reason]); - {ok, FBTree} -> - {ok, #state { tree = FBTree, - filename = Filename, - sync_interval = SyncInterval, - last_sync = os:timestamp() }} - end. - -run(get, KeyGen, _ValueGen, State) -> - case hanoidb:lookup(State#state.tree, KeyGen()) of - {ok, _Value} -> - {ok, State}; - not_found -> - {ok, State}; - {error, Reason} -> - {error, Reason} - end; -run(put, KeyGen, ValueGen, State) -> - case hanoidb:put(State#state.tree, KeyGen(), ValueGen()) of - ok -> - {ok, State}; - {error, Reason} -> - {error, Reason} - end; -run(delete, KeyGen, _ValueGen, State) -> - case hanoidb:delete(State#state.tree, KeyGen()) of - ok -> - {ok, State}; - {error, Reason} -> - {error, Reason} - end; - -run(fold_100, KeyGen, _ValueGen, State) -> - [From,To] = lists:usort([KeyGen(), KeyGen()]), - case hanoidb:fold_range(State#state.tree, - fun(_Key,_Value,Count) -> - Count+1 - end, - 0, - #key_range{ from_key=From, - to_key=To, - limit=100 }) of - Count when Count >= 0; Count =< 100 -> - {ok,State}; - Count -> - {error, {bad_fold_count, Count}} - end. 
diff --git a/src/hanoidb.hrl b/src/hanoidb.hrl index 0264518..219051b 100644 --- a/src/hanoidb.hrl +++ b/src/hanoidb.hrl @@ -50,7 +50,7 @@ -record(nursery, { log_file :: file:fd(), dir :: string(), - cache :: gb_tree(), + cache :: gb_trees:tree(binary(), binary()), total_size=0 :: integer(), count=0 :: integer(), last_sync=now() :: erlang:timestamp(), @@ -69,3 +69,22 @@ | value() | filepos(). + +-ifdef(USE_SCALABLE_BLOOM). + +-define(BLOOM_NEW(Size), {ok, hanoidb_bloom:bloom(Size, 0.01)}). +-define(BLOOM_TO_BIN(Bloom), hanoidb_bloom:encode(Bloom)). %% -> Binary +-define(BIN_TO_BLOOM(Bin), {ok, hanoidb_bloom:decode(Bin)}). +-define(BLOOM_INSERT(Bloom, Key), {ok, hanoidb_bloom:add(Key,Bloom)}). +-define(BLOOM_CONTAINS(Bloom, Key), hanoidb_bloom:member(Bloom, Key)). %% -> 'true' | 'false' + +-else. + +-define(BLOOM_NEW(Size), begin ebloom:new(Size, 0.01, Size) end). +-define(BLOOM_TO_BIN(Bloom), begin ebloom:serialize(Bloom) end). %% -> Binary +-define(BIN_TO_BLOOM(Bin), begin ebloom:deserialize(Bin) end). +-define(BLOOM_INSERT(Bloom, Key), begin ok=ebloom:insert(Bloom, Key), {ok, Bloom} end). +-define(BLOOM_CONTAINS(Bloom, Key), begin ebloom:member(Bloom, Key) end). %% -> 'true' | 'false' + +-endif. + diff --git a/src/hanoidb_level.erl b/src/hanoidb_level.erl index 0a6faeb..75fd40e 100644 --- a/src/hanoidb_level.erl +++ b/src/hanoidb_level.erl @@ -794,8 +794,8 @@ begin_merge(State) -> Owner ! ?CAST(self(),{merge_done, OutCount, XFileName}) catch C:E -> - error_logger:error_msg("merge failed ~p:~p ~p~n", - [C,E,erlang:get_stacktrace()]), + error_logger:error_msg("~p: merge failed ~p:~p ~p -> ~s~n", + [self(), C,E,erlang:get_stacktrace(), XFileName]), erlang:raise(C,E,erlang:get_stacktrace()) end end), diff --git a/src/hanoidb_merger.erl b/src/hanoidb_merger.erl index 8f30712..1a54a5d 100644 --- a/src/hanoidb_merger.erl +++ b/src/hanoidb_merger.erl @@ -58,8 +58,8 @@ merge(A,B,C, Size, IsLastLevel, Options) -> scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none}). 
terminate(Out) -> - {ok, Count, _} = hanoidb_writer:handle_call(count, self(), Out), - {stop, normal, ok, _} = hanoidb_writer:handle_call(close, self(), Out), + {ok, Count, Out1} = hanoidb_writer:handle_call(count, self(), Out), + {stop, normal, ok, _Out2} = hanoidb_writer:handle_call(close, self(), Out1), {ok, Count}. step(S) -> diff --git a/src/hanoidb_reader.erl b/src/hanoidb_reader.erl index 932c6d8..ced9e7b 100644 --- a/src/hanoidb_reader.erl +++ b/src/hanoidb_reader.erl @@ -85,7 +85,7 @@ open(Name, Config) -> {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size - 8, 8), {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size - 12, 4), {ok, BloomData} = file:pread(File, (FileInfo#file_info.size - 12 - BloomSize), BloomSize), - Bloom = hanoidb_bloom:decode(BloomData), + {ok, Bloom} = ?BIN_TO_BLOOM(BloomData), %% read in the root node Root = @@ -279,7 +279,7 @@ close(#index{file=File}) -> lookup(#index{file=File, root=Node, bloom=Bloom}, Key) -> - case hanoidb_bloom:member(Key, Bloom) of + case ?BLOOM_CONTAINS(Bloom, Key) of true -> case lookup_in_node(File, Node, Key) of not_found -> diff --git a/src/hanoidb_writer.erl b/src/hanoidb_writer.erl index 25b5783..9232807 100644 --- a/src/hanoidb_writer.erl +++ b/src/hanoidb_writer.erl @@ -94,7 +94,7 @@ init([Name, Options]) -> case do_open(Name, Options, [exclusive]) of {ok, IdxFile} -> ok = file:write(IdxFile, ?FILE_FORMAT), - Bloom = hanoidb_bloom:bloom(Size), + {ok, Bloom} = ?BLOOM_NEW(Size), BlockSize = hanoidb:get_opt(block_size, Options, ?NODE_SIZE), {ok, #state{ name=Name, index_file_pos=?FIRST_BLOCK_POS, index_file=IdxFile, @@ -170,11 +170,11 @@ serialize(#state{ bloom=Bloom, index_file=File, index_file_pos=Position }=State) exit({bad_position, Position, WrongPosition}) end, ok = file:close(File), - erlang:term_to_binary( { State#state{ index_file=undefined }, hanoidb_bloom:encode(Bloom) } ). + erlang:term_to_binary( { State#state{ index_file=undefined, bloom=undefined }, ?BLOOM_TO_BIN(Bloom) } ). 
deserialize(Binary) -> {State, Bin} = erlang:binary_to_term(Binary), - Bloom = hanoidb_bloom:decode(Bin), + {ok, Bloom} = ?BIN_TO_BLOOM(Bin), {ok, IdxFile} = do_open(State#state.name, State#state.opts, []), State#state{ bloom=Bloom, index_file=IdxFile }. @@ -182,13 +182,14 @@ deserialize(Binary) -> do_open(Name, Options, OpenOpts) -> WriteBufferSize = hanoidb:get_opt(write_buffer_size, Options, 512 * 1024), file:open(hanoidb_util:index_file_name(Name), - [raw, append, {delayed_write, WriteBufferSize, 2000} | OpenOpts]). + [raw, append, {delayed_write, WriteBufferSize, 2000}, exclusive | OpenOpts]). %% @doc flush pending nodes and write trailer archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastNodeSize, bloom=Bloom, index_file=IdxFile }=State) -> - BloomBin = hanoidb_bloom:encode(Bloom), + BloomBin = ?BLOOM_TO_BIN(Bloom), + true = is_binary(BloomBin), BloomSize = byte_size(BloomBin), RootPos = case LastNodePos of @@ -204,7 +205,7 @@ archive_nodes(#state{ nodes=[], last_node_pos=LastNodePos, last_node_size=_LastN ok = file:write(IdxFile, Trailer), ok = file:datasync(IdxFile), ok = file:close(IdxFile), - {ok, State#state{ index_file=undefined, index_file_pos=undefined }}; + {ok, State#state{ index_file=undefined, index_file_pos=undefined, bloom=undefined }}; archive_nodes(State=#state{ nodes=[#node{level=N, members=[{_,{Pos,_Len}}]}], last_node_pos=Pos }) when N > 0 -> @@ -221,7 +222,8 @@ append_node(Level, Key, Value, State=#state{ nodes=[] }) -> append_node(Level, Key, Value, State=#state{ nodes=[ #node{level=Level2 } |_]=Stack }) when Level < Level2 -> append_node(Level, Key, Value, State#state{ nodes=[ #node{ level=(Level2 - 1) } | Stack] }); -append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes ], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) -> +append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode | RestNodes 
], value_count=VC, tombstone_count=TC, bloom=Bloom }=State) + when Bloom /= undefined -> %% The top-of-stack node is at the level we wish to insert at. %% Assert that keys are increasing: @@ -236,10 +238,14 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, exit({badarg, Key}) end end, - NewSize = NodeSize + hanoidb_util:estimate_node_size_increment(List, Key, Value), - NewBloom = hanoidb_bloom:add(Key, Bloom), + {ok,Bloom2} = case Level of + 0 -> + ?BLOOM_INSERT(Bloom, Key); + _ -> + {ok,Bloom} + end, {TC1, VC1} = case Level of @@ -258,7 +264,7 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, NodeMembers = [{Key, Value} | List], State2 = State#state{ nodes=[CurrNode#node{members=NodeMembers, size=NewSize} | RestNodes], - value_count=VC1, tombstone_count=TC1, bloom=NewBloom }, + value_count=VC1, tombstone_count=TC1, bloom=Bloom2 }, case NewSize >= State#state.block_size of true -> @@ -267,7 +273,8 @@ append_node(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, {ok, State2} end. -flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos} = State) -> +flush_node_buffer(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes], compress=Compress, index_file_pos=NodePos } = State) -> + OrderedMembers = lists:reverse(NodeMembers), {ok, BlockData} = hanoidb_util:encode_index_node(OrderedMembers, Compress),