diff --git a/src/hanoidb.hrl b/src/hanoidb.hrl index 72cf08c..f6df372 100644 --- a/src/hanoidb.hrl +++ b/src/hanoidb.hrl @@ -82,3 +82,13 @@ -define(BLOOM_INSERT(Bloom, Key), hanoidb_util:bloom_insert(Bloom, Key)). -define(BLOOM_CONTAINS(Bloom, Key), hanoidb_util:bloom_contains(Bloom, Key)). +%% tags used in the on-disk representation +-define(TAG_KV_DATA, 16#80). +-define(TAG_DELETED, 16#81). +-define(TAG_POSLEN32, 16#82). +-define(TAG_TRANSACT, 16#83). +-define(TAG_KV_DATA2, 16#84). +-define(TAG_DELETED2, 16#85). +-define(TAG_END, 16#FF). + + diff --git a/src/hanoidb_merger.erl b/src/hanoidb_merger.erl index 85c5693..3b18917 100644 --- a/src/hanoidb_merger.erl +++ b/src/hanoidb_merger.erl @@ -69,12 +69,12 @@ merge(A,B,C, Size, IsLastLevel, Options) -> {ok, Out} = hanoidb_writer:init([C, [{size, Size} | Options]]), AKVs = case hanoidb_reader:first_node(IXA) of - {node, AKV} -> AKV; + {kvlist, AKV} -> AKV; none -> [] end, BKVs = case hanoidb_reader:first_node(IXB) of - {node, BKV} ->BKV; + {kvlist, BKV} ->BKV; none -> [] end, scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {0, none}). @@ -177,7 +177,7 @@ scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, {N, FromPID}) when N < 1, AKVs =/= scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) -> case hanoidb_reader:next_node(IXA) of - {node, AKVs} -> + {kvlist, AKVs} -> scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step); end_of_data -> hanoidb_reader:close(IXA), @@ -186,7 +186,7 @@ scan(IXA, IXB, Out, IsLastLevel, [], BKVs, Step) -> scan(IXA, IXB, Out, IsLastLevel, AKVs, [], Step) -> case hanoidb_reader:next_node(IXB) of - {node, BKVs} -> + {kvlist, BKVs} -> scan(IXA, IXB, Out, IsLastLevel, AKVs, BKVs, Step); end_of_data -> hanoidb_reader:close(IXB), @@ -248,7 +248,7 @@ scan_only(IX, Out, IsLastLevel, KVs, {N, FromPID}) when N < 1, KVs =/= [] -> scan_only(IX, Out, IsLastLevel, [], {_, FromPID}=Step) -> case hanoidb_reader:next_node(IX) of - {node, KVs} -> + {kvlist, KVs} -> scan_only(IX, Out, IsLastLevel, KVs, Step); end_of_data -> case FromPID of diff --git a/src/hanoidb_reader.erl b/src/hanoidb_reader.erl index 940df57..1944e1d 100644 --- a/src/hanoidb_reader.erl +++ b/src/hanoidb_reader.erl @@ -37,24 +37,23 @@ -export([serialize/1, deserialize/1]). -record(node, {level :: non_neg_integer(), - members=[] :: list(any()) }). + members=[] :: list(any()) | binary() }). -record(index, {file :: file:io_device(), - root :: #node{} | none, + root= none :: #node{} | none, bloom :: term(), name :: string(), config=[] :: term() }). -type read_file() :: #index{}. +-export_type([read_file/0]). -spec open(Name::string()) -> {ok, read_file()} | {error, any()}. open(Name) -> open(Name, [random]). -type config() :: [sequential | folding | random | {atom(), term()}]. - -spec open(Name::string(), config()) -> {ok, read_file()} | {error, any()}. - open(Name, Config) -> case proplists:get_bool(sequential, Config) of true -> @@ -115,11 +114,15 @@ deserialize({seq_read_file, Index, Position}) -> + fold(Fun, Acc0, #index{file=File}) -> {ok, Node} = read_node(File,?FIRST_BLOCK_POS), fold0(File,fun({K,V},Acc) -> Fun(K,V,Acc) end,Node,Acc0). -fold0(File,Fun,#node{level=0, members=List},Acc0) -> +fold0(File,Fun,#node{level=0, members=BinPage},Acc0) when is_binary(BinPage) -> + Acc1 = vbisect:foldl(fun(K, V, Acc2) -> Fun({K, decode_binary_value(V)}, Acc2) end,Acc0,BinPage), + fold1(File,Fun,Acc1); +fold0(File,Fun,#node{level=0, members=List},Acc0) when is_list(List) -> Acc1 = lists:foldl(Fun,Acc0,List), fold1(File,Fun,Acc1); fold0(File,Fun,_InnerNode,Acc0) -> @@ -133,22 +136,39 @@ fold1(File,Fun,Acc0) -> fold0(File,Fun,Node,Acc0) end. --spec range_fold(function(), any(), #index{}, #key_range{}) -> +-spec range_fold(fun((binary(),binary(),any()) -> any()), any(), #index{}, #key_range{}) -> {limit, any(), binary()} | {done, any()}. range_fold(Fun, Acc0, #index{file=File,root=Root}, Range) -> - case lookup_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of - {ok, {Pos,_}} -> - {ok, _} = file:position(File, Pos), - do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit); - {ok, Pos} -> - {ok, _} = file:position(File, Pos), - do_range_fold(Fun, Acc0, File, Range, Range#key_range.limit); - none -> - {done, Acc0} + case Range#key_range.from_key =< first_key(Root) of + true -> + {ok, _} = file:position(File, ?FIRST_BLOCK_POS), + range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit); + false -> + case find_leaf_node(File,Range#key_range.from_key,Root,?FIRST_BLOCK_POS) of + {ok, {Pos,_}} -> + {ok, _} = file:position(File, Pos), + range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit); + {ok, Pos} -> + {ok, _} = file:position(File, Pos), + range_fold_from_here(Fun, Acc0, File, Range, Range#key_range.limit); + none -> + {done, Acc0} + end end. -fold_until_stop(Fun,Acc,List) -> - fold_until_stop2(Fun, {continue, Acc}, List). +first_key(#node{members=Dict}) -> + {_,FirstKey} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, Dict), + FirstKey. + +fold_until_stop(Fun,Acc,List) when is_list(List) -> + fold_until_stop2(Fun, {continue, Acc}, List); +fold_until_stop(Fun,Acc0,Bin) when is_binary(Bin) -> + vbisect:fold_until_stop(fun({Key,VBin},Acc1) -> +% io:format("-> DOING ~p,~p~n", [Key,Acc1]), + Fun({Key, decode_binary_value(VBin)}, Acc1) + end, + Acc0, + Bin). fold_until_stop2(_Fun,{stop,Result},_) -> {stopped, Result}; @@ -170,7 +190,8 @@ get_value({Value, _TStamp}) -> get_value(Value) -> Value. -do_range_fold(Fun, Acc0, File, Range, undefined) -> +range_fold_from_here(Fun, Acc0, File, Range, undefined) -> +% io:format("RANGE_FOLD_FROM_HERE(~p,~p)~n", [Acc0,File]), case next_leaf_node(File) of eof -> {done, Acc0}; @@ -185,18 +206,19 @@ do_range_fold(Fun, Acc0, File, Range, undefined) -> false -> {continue, Fun(Key, get_value(Value), Acc)} end; - (_, Acc) -> + (_Huh, Acc) -> +% io:format("SKIPPING ~p~n", [_Huh]), {continue, Acc} end, Acc0, Members) of {stopped, Result} -> Result; {ok, Acc1} -> - do_range_fold(Fun, Acc1, File, Range, undefined) + range_fold_from_here(Fun, Acc1, File, Range, undefined) end end; -do_range_fold(Fun, Acc0, File, Range, N0) -> +range_fold_from_here(Fun, Acc0, File, Range, N0) -> case next_leaf_node(File) of eof -> {done, Acc0}; @@ -226,52 +248,74 @@ do_range_fold(Fun, Acc0, File, Range, N0) -> {continue, Acc} end, {N0, Acc0}, - Members) of - {stopped, Result} -> Result; + Members) + of + {stopped, Result} -> + Result; {ok, {N2, Acc1}} -> - do_range_fold(Fun, Acc1, File, Range, N2) + range_fold_from_here(Fun, Acc1, File, Range, N2) end end. -lookup_node(_File,_FromKey,#node{level=0},Pos) -> +find_leaf_node(_File,_FromKey,#node{level=0},Pos) -> {ok, Pos}; -lookup_node(File,FromKey,#node{members=Members,level=N},_) -> +find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_list(Members) -> case find_start(FromKey, Members) of - {ok, ChildPos} when N==1 -> - {ok, ChildPos}; {ok, ChildPos} -> - case read_node(File,ChildPos) of - {ok, ChildNode} -> - lookup_node(File,FromKey,ChildNode,ChildPos); - eof -> - none - end; + recursive_find(File, FromKey, N, ChildPos); not_found -> none end; -lookup_node(_,_,none,_) -> +find_leaf_node(File,FromKey,#node{members=Members,level=N},_) when is_binary(Members) -> + case vbisect:find_geq(FromKey,Members) of + {ok, _, <>} -> +% io:format("** FIND_LEAF_NODE(~p,~p) -> {~p,~p}~n", [FromKey, N, Pos,Len]), + recursive_find(File, FromKey, N, {Pos,Len}); + none -> +% io:format("** FIND_LEAF_NODE(~p,~p) -> none~n", [FromKey, N]), + none + end; +find_leaf_node(_,_,none,_) -> none. +recursive_find(_File,_FromKey,1,ChildPos) -> + {ok, ChildPos}; +recursive_find(File,FromKey,N,ChildPos) when N>1 -> + case read_node(File,ChildPos) of + {ok, ChildNode} -> + find_leaf_node(File, FromKey,ChildNode,ChildPos); + eof -> + none + end. +%% used by the merger, needs list value first_node(#index{file=File}) -> case read_node(File, ?FIRST_BLOCK_POS) of {ok, #node{level=0, members=Members}} -> - {node, Members}; + {kvlist, decode_member_list(Members)}; eof-> none end. +%% used by the merger, needs list value next_node(#index{file=File}=_Index) -> case next_leaf_node(File) of {ok, #node{level=0, members=Members}} -> - {node, Members}; -% {ok, #node{level=N}} when N>0 -> -% next_node(Index); + {kvlist, decode_member_list(Members)}; eof -> end_of_data end. +decode_member_list(List) when is_list(List) -> + List; +decode_member_list(BinDict) when is_binary(BinDict) -> + vbisect:foldr( fun(Key,Value,Acc) -> + [{Key, decode_binary_value(Value) }|Acc] + end, + [], + BinDict). + close(#index{file=undefined}) -> ok; close(#index{file=File}) -> @@ -297,11 +341,20 @@ lookup(#index{file=File, root=Node, bloom=Bloom}, Key) -> end. lookup_in_node(_File,#node{level=0,members=Members}, Key) -> - case lists:keyfind(Key,1,Members) of - false -> - not_found; - {_,Value} -> - {ok, Value} + find_in_leaf(Key,Members); + +lookup_in_node(File,#node{members=Members},Key) when is_binary(Members) -> + case vbisect:find_geq(Key,Members) of + {ok, _Key, <>} -> +% io:format("FOUND ~p @ ~p~n", [_Key, {Pos,Size}]), + case read_node(File,{Pos,Size}) of + {ok, Node} -> + lookup_in_node(File, Node, Key); + eof -> + not_found + end; + none -> + not_found end; lookup_in_node(File,#node{members=Members},Key) -> @@ -416,3 +469,29 @@ next_leaf_node(File) -> next_leaf_node(File) end. + +find_in_leaf(Key,Bin) when is_binary(Bin) -> + case vbisect:find(Key,Bin) of + {ok, BinValue} -> + {ok, decode_binary_value(BinValue)}; + error -> + not_found + end; +find_in_leaf(Key,List) when is_list(List) -> + case lists:keyfind(Key, 1, List) of + {_, Value} -> + {ok, Value}; + false -> + not_found + end. + +decode_binary_value(<>) -> + Value; +decode_binary_value(<>) -> + {Value, TStamp}; +decode_binary_value(<>) -> + ?TOMBSTONE; +decode_binary_value(<>) -> + {?TOMBSTONE, TStamp}; +decode_binary_value(<>) -> + {Pos, Len}. diff --git a/src/hanoidb_util.erl b/src/hanoidb_util.erl index f327784..6939f10 100644 --- a/src/hanoidb_util.erl +++ b/src/hanoidb_util.erl @@ -53,14 +53,10 @@ -define(ERLANG_ENCODED, 131). -define(CRC_ENCODED, 127). +-define(BISECT_ENCODED, 126). --define(TAG_KV_DATA, 16#80). --define(TAG_DELETED, 16#81). --define(TAG_POSLEN32, 16#82). --define(TAG_TRANSACT, 16#83). --define(TAG_KV_DATA2, 16#84). --define(TAG_DELETED2, 16#85). --define(TAG_END, 16#FF). + +-define(FILE_ENCODING, bisect). -compile({inline, [crc_encapsulate/1, crc_encapsulate_kv_entry/2 ]}). @@ -143,18 +139,47 @@ uncompress(<>) -> zlib:gunzip(Data). encode_index_node(KVList, Method) -> - TermData = [ ?TAG_END | - lists:map(fun ({Key,Value}) -> - crc_encapsulate_kv_entry(Key, Value) - end, - KVList) ], + TermData = + case ?FILE_ENCODING of + bisect -> + Binary = vbisect:from_orddict(lists:map(fun binary_encode_kv/1, KVList)), + CRC = erlang:crc32(Binary), + [?BISECT_ENCODED, <>, Binary]; + hanoi2 -> + [ ?TAG_END | + lists:map(fun ({Key,Value}) -> + crc_encapsulate_kv_entry(Key, Value) + end, + KVList) ] + end, {MethodName, OutData} = compress(Method, TermData), {ok, [MethodName | OutData]}. decode_index_node(Level, Data) -> TermData = uncompress(Data), - {ok, KVList} = decode_kv_list(TermData), - {ok, {node, Level, KVList}}. + case decode_kv_list(TermData) of + {ok, KVList} -> + {ok, {node, Level, KVList}}; + {bisect, Binary} -> +% io:format("[page level=~p~n", [Level]), +% vbisect:foldl(fun(K,V,_) -> io:format(" ~p -> ~p,~n", [K,V]) end, 0, Binary), +% io:format("]~n",[]), + {ok, {node, Level, Binary}} + end. + + +binary_encode_kv({Key, {Value,infinity}}) -> + binary_encode_kv({Key,Value}); +binary_encode_kv({Key, {?TOMBSTONE, TStamp}}) -> + {Key, <>}; +binary_encode_kv({Key, ?TOMBSTONE}) -> + {Key, <>}; +binary_encode_kv({Key, {Value, TStamp}}) when is_binary(Value) -> + {Key, <>}; +binary_encode_kv({Key, Value}) when is_binary(Value)-> + {Key, <>}; +binary_encode_kv({Key, {Pos, Len}}) when Len < 16#ffffffff -> + {Key, <>}. -spec crc_encapsulate_kv_entry(binary(), expvalue()) -> iolist(). @@ -193,7 +218,14 @@ decode_kv_list(<>) -> decode_kv_list(<>=TermData) -> {ok, erlang:term_to_binary(TermData)}; decode_kv_list(<>) -> - decode_crc_data(Custom, [], []). + decode_crc_data(Custom, [], []); +decode_kv_list(<>) -> + CRCTest = erlang:crc32( Binary ), + if CRC == CRCTest -> + {bisect, Binary}; + true -> + {bisect, vbisect:from_orddict([])} + end. -spec decode_crc_data(binary(), list(), list()) -> {ok, [kventry()]} | {partial, [kventry()], iolist()}. decode_crc_data(<<>>, [], Acc) -> diff --git a/src/vbisect.erl b/src/vbisect.erl new file mode 100644 index 0000000..8e1735f --- /dev/null +++ b/src/vbisect.erl @@ -0,0 +1,260 @@ + + +-module(vbisect). + +-export([from_orddict/1, + from_gb_tree/1, + to_gb_tree/1, + first_key/1, + find/2, find_geq/2, + foldl/3, foldr/3, fold_until_stop/3, + to_orddict/1, + merge/3]). + +-define(MAGIC, "vbis"). +-type key() :: binary(). +-type value() :: binary(). +-type bindict() :: binary(). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + +-spec from_gb_tree(gb_trees:tree()) -> bindict(). +from_gb_tree({Count,Node}) when Count =< 16#ffffffff -> + {_BinSize,IOList} = encode_gb_node(Node), + erlang:iolist_to_binary([ <> | IOList ]). + +encode_gb_node({Key, Value, Smaller, Bigger}) when is_binary(Key), is_binary(Value) -> + {BinSizeSmaller, IOSmaller} = encode_gb_node(Smaller), + {BinSizeBigger, IOBigger} = encode_gb_node(Bigger), + + KeySize = byte_size(Key), + ValueSize = byte_size(Value), + { 2 + KeySize + + 4 + ValueSize + + 4 + BinSizeSmaller + + BinSizeBigger, + + [ << KeySize:16, Key/binary, + BinSizeSmaller:32 >>, IOSmaller, + << ValueSize:32, Value/binary >> | IOBigger ] }; + +encode_gb_node(nil) -> + { 0, [] }. + +to_gb_tree(<>) -> + { Count, to_gb_node(Nodes) }. + +to_gb_node( <<>> ) -> + nil; + +to_gb_node( << KeySize:16, Key:KeySize/binary, + BinSizeSmaller:32, Smaller:BinSizeSmaller/binary, + ValueSize:32, Value:ValueSize/binary, + Bigger/binary >> ) -> + {Key, Value, + to_gb_node(Smaller), + to_gb_node(Bigger)}. + +-spec find(Key::key(), Dict::bindict()) -> + { ok, value() } | error. +find(Key, <>) -> + find_node(byte_size(Key), Key, Binary). + +find_node(KeySize, Key, <> = Bin) -> + if + Key < HereKey -> + Skip = 6 + HereKeySize, + << _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin, + find_node(KeySize, Key, Smaller); + HereKey < Key -> + Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize, + << _:Skip/binary, Bigger/binary>> = Bin, + find_node(KeySize, Key, Bigger); + true -> + {ok, Value} + end; + +find_node(_, _, <<>>) -> + error. + +to_orddict(BinDict) -> + foldr(fun(Key,Value,Acc) -> + [{Key,Value}|Acc] + end, + [], + BinDict). + +merge(Fun, BinDict1, BinDict2) -> + OD1 = to_orddict(BinDict1), + OD2 = to_orddict(BinDict2), + OD3 = orddict:merge(Fun, OD1, OD2), + from_orddict(OD3). + +-spec first_key( bindict() ) -> binary() | none. +first_key(BinDict) -> + {_, Key} = fold_until_stop(fun({K,_},_) -> {stop, K} end, none, BinDict), + Key. + +%% @doc Find largest {K,V} where K is smaller than or equal to key. +%% This is good for an inner node where key is the smallest key +%% in the child node. + +-spec find_geq(Key::binary(), Binary::binary()) -> + none | {ok, Key::key(), Value::value()}. + +find_geq(Key, <>) -> + find_geq_node(byte_size(Key), Key, Binary, none). + +find_geq_node(_, _, <<>>, Else) -> + Else; + +find_geq_node(KeySize, Key, <> = Bin, Else) -> + if + Key < HereKey -> + Skip = 6 + HereKeySize, + << _:Skip/binary, Smaller:BinSizeSmaller/binary, _/binary>> = Bin, + find_geq_node(KeySize, Key, Smaller, Else); + HereKey < Key -> + Skip = 10 + HereKeySize + BinSizeSmaller + ValueSize, + << _:Skip/binary, Bigger/binary>> = Bin, + find_geq_node(KeySize, Key, Bigger, {ok, HereKey, Value}); + true -> + {ok, HereKey, Value} + end. + +-spec foldl(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) -> + term(). +foldl(Fun, Acc, <>) -> + foldl_node(Fun, Acc, Binary). + +foldl_node(_Fun, Acc, <<>>) -> + Acc; + +foldl_node(Fun, Acc, <>) -> + Acc1 = foldl_node(Fun, Acc, Smaller), + Acc2 = Fun(Key, Value, Acc1), + foldl_node(Fun, Acc2, Bigger). + + +-spec fold_until_stop(function(), term(), bindict()) -> {stopped, term()} | {ok, term()}. + +fold_until_stop(Fun, Acc, <>) -> + fold_until_stop2(Fun, {continue, Acc}, Bin). + +fold_until_stop2(_Fun,{stop,Result},_) -> + {stopped, Result}; +fold_until_stop2(_Fun,{continue, Acc},<<>>) -> + {ok, Acc}; +fold_until_stop2(Fun,{continue, Acc}, <>) -> + + case fold_until_stop2(Fun, {continue, Acc}, Smaller) of + {stopped, Result} -> + {stopped, Result}; + {ok, Acc1} -> + ContinueOrStopAcc = Fun({Key,Value}, Acc1), + fold_until_stop2(Fun, ContinueOrStopAcc, Bigger) + end. + + +-spec foldr(fun((Key::key(), Value::value(), Acc::term()) -> term()), term(), bindict()) -> + term(). +foldr(Fun, Acc, <>) -> + foldr_node(Fun, Acc, Binary). + +foldr_node(_Fun, Acc, <<>>) -> + Acc; + +foldr_node(Fun, Acc, <>) -> + Acc1 = foldr_node(Fun, Acc, Bigger), + Acc2 = Fun(Key, Value, Acc1), + foldr_node(Fun, Acc2, Smaller). + + +from_orddict(OrdDict) -> + from_gb_tree(gb_trees:from_orddict(OrdDict)). + +-ifdef(TEST). + +speed_test_() -> + {timeout, 600, + fun() -> + Start = 100000000000000, + N = 100000, + Keys = lists:seq(Start, Start+N), + KeyValuePairs = lists:map(fun (I) -> {<>, <<255:8/integer>>} end, + Keys), + + %% Will mostly be unique, if N is bigger than 10000 + ReadKeys = [<<(lists:nth(random:uniform(N), Keys)):64/integer>> || _ <- lists:seq(1, 1000)], + B = from_orddict(KeyValuePairs), + time_reads(B, N, ReadKeys) + end}. + + +geq_test() -> + B = from_orddict([{<<2>>,<<2>>},{<<4>>,<<4>>},{<<6>>,<<6>>},{<<122>>,<<122>>}]), + none = find_geq(<<1>>, B), + {ok, <<2>>, <<2>>} = find_geq(<<2>>, B), + {ok, <<2>>, <<2>>} = find_geq(<<3>>, B), + {ok, <<4>>, <<4>>} = find_geq(<<5>>, B), + {ok, <<6>>, <<6>>} = find_geq(<<100>>, B), + {ok, <<122>>, <<122>>} = find_geq(<<150>>, B), + true. + + +time_reads(B, Size, ReadKeys) -> + Parent = self(), + spawn( + fun() -> + Runs = 20, + Timings = + lists:map( + fun (_) -> + StartTime = now(), + find_many(B, ReadKeys), + timer:now_diff(now(), StartTime) + end, lists:seq(1, Runs)), + + Rps = 1000000 / ((lists:sum(Timings) / length(Timings)) / 1000), + error_logger:info_msg("Average over ~p runs, ~p keys in dict~n" + "Average fetch ~p keys: ~p us, max: ~p us~n" + "Average fetch 1 key: ~p us~n" + "Theoretical sequential RPS: ~w~n", + [Runs, Size, length(ReadKeys), + lists:sum(Timings) / length(Timings), + lists:max(Timings), + (lists:sum(Timings) / length(Timings)) / length(ReadKeys), + trunc(Rps)]), + + Parent ! done + end), + receive done -> ok after 1000 -> ok end. + +-spec find_many(bindict(), [key()]) -> non_neg_integer(). +find_many(B, Keys) -> + lists:foldl(fun (K, N) -> + case find(K, B) of + {ok, _} -> N+1; + error -> N + end + end, + 0, Keys). + +-endif.