Add bloom filter to btree index format
parent 6e13f55044
commit 5af86b9e23

6 changed files with 47 additions and 19 deletions
@@ -2,5 +2,6 @@
 {cover_enabled, true}.

 {deps, [
-  {plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}}
+  {plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}},
+  {ebloom, "1.0.*", {git, "git://github.com/basho/ebloom.git", {branch, "master"}}}
 ]}.
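The ebloom dependency added above provides the bloom filter used by the reader and writer changes below. For reference, here is a minimal, illustrative sketch of the ebloom calls this commit relies on; it is not part of the commit, and the capacity, error rate, and seed values simply mirror the ones that appear later in the diff.

    %% Illustrative only -- not part of this commit.
    bloom_roundtrip_example() ->
        {ok, B} = ebloom:new(2048, 0.01, 123),      % expected elements, error rate, hash seed
        ebloom:insert(B, <<"some-key">>),
        true = ebloom:contains(B, <<"some-key">>),
        Bin = ebloom:serialize(B),                  % returns a plain binary
        {ok, B2} = ebloom:deserialize(Bin),
        true = ebloom:contains(B2, <<"some-key">>).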
@@ -269,7 +269,8 @@ begin_merge(State) ->
     Owner = self(),

     spawn_link(fun() ->
-                       {ok, OutCount} = fractal_btree_merger:merge(AFileName, BFileName, XFileName),
+                       {ok, OutCount} = fractal_btree_merger:merge(AFileName, BFileName, XFileName,
+                                                                   State#state.level * 2),
                        Owner ! {merge_done, OutCount, XFileName}
                end),

@@ -4,12 +4,12 @@
 %% Naive Merge of two b-trees. A better implementation should iterate leafs, not KV's
 %%

--export([merge/3]).
+-export([merge/4]).

 -record(state, { out, a_pid, b_pid }).

-merge(A,B,C) ->
-    {ok, Out} = fractal_btree_writer:open(C),
+merge(A,B,C, Size) ->
+    {ok, Out} = fractal_btree_writer:open(C, Size),
     Owner = self(),
     PID1 = spawn_link(fun() -> scan(Owner, A) end),
     PID2 = spawn_link(fun() -> scan(Owner, B) end),
@@ -5,7 +5,7 @@
 -export([open/1,close/1,lookup/2,fold/3]).

 -record(node, { level, members=[] }).
--record(index, {file, root}).
+-record(index, {file, root, bloom}).

 open(Name) ->

@@ -14,11 +14,15 @@ open(Name) ->

     %% read root position
     {ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8),
+    {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileInfo#file_info.size-12, 4),
+    {ok, BloomData} = file:pread(File, FileInfo#file_info.size-12-BloomSize, BloomSize),
+
+    {ok, Bloom} = ebloom:deserialize(BloomData),

     %% suck in the root
     {ok, Root} = read_node(File, RootPos),

-    {ok, #index{file=File, root=Root}}.
+    {ok, #index{file=File, root=Root, bloom=Bloom}}.


 fold(Fun, Acc0, #index{file=File}) ->
@@ -43,8 +47,13 @@ close(#index{file=File}) ->
     file:close(File).


-lookup(#index{file=File, root=Node},Key) ->
-    lookup_in_node(File,Node,Key).
+lookup(#index{file=File, root=Node, bloom=Bloom}, Key) ->
+    case ebloom:contains(Bloom, Key) of
+        true ->
+            lookup_in_node(File,Node,Key);
+        false ->
+            notfound
+    end.

 lookup_in_node(_File,#node{level=0,members=Members},Key) ->
     case lists:keyfind(Key,1,Members) of
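With the bloom check in place, a lookup for a key the filter has never seen returns notfound without reading any nodes from disk; only keys the filter claims to contain (including the occasional false positive) fall through to lookup_in_node. A usage sketch follows, with the caveat that the reader module name and the file name are assumptions for illustration, not taken from the diff.

    %% Illustrative only; the reader module name and index name are assumed.
    lookup_example() ->
        {ok, Index} = fractal_btree_reader:open("test1"),
        Hit  = fractal_btree_reader:lookup(Index, <<"known-key">>),
        Miss = fractal_btree_reader:lookup(Index, <<"absent-key">>),  % normally notfound straight from the filter
        fractal_btree_reader:close(Index),
        {Hit, Miss}.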
@@ -1,6 +1,11 @@

 -module(fractal_btree_writer).

+%%
+%% Streaming btree writer. Accepts only monotonically increasing keys for put.
+%%
+
+%% TODO: add a bloom filter to the file

 -define(NODE_SIZE, 2*1024).

 -behavior(gen_server).
@@ -9,7 +14,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).

--export([open/1, add/3,close/1]).
+-export([open/1, open/2, add/3,close/1]).

 -record(node, { level, members=[], size=0 }).

@@ -20,14 +25,20 @@

            nodes = [] :: [ #node{} ],

-           name :: string()
+           name :: string(),
+
+           bloom
          }).


 %%% PUBLIC API

+open(Name,Size) ->
+    gen_server:start(?MODULE, [Name,Size], []).
+
+
 open(Name) ->
-    gen_server:start(?MODULE, [Name], []).
+    gen_server:start(?MODULE, [Name,2048], []).


 add(Ref,Key,Data) ->
@@ -39,15 +50,16 @@ close(Ref) ->
 %%%


-init([Name]) ->
+init([Name,Size]) ->

 %    io:format("got name: ~p~n", [Name]),

     {ok, IdxFile} = file:open( fractal_btree_util:index_file_name(Name),
                                [raw, exclusive, write, delayed_write]),
+    {ok, BloomFilter} = ebloom:new(Size, 0.01, 123),
     {ok, #state{ name=Name,
-                 index_file_pos=0, index_file=IdxFile
+                 index_file_pos=0, index_file=IdxFile,
+                 bloom = BloomFilter
               }}.

 handle_cast({add, Key, Data}, State) when is_binary(Key), is_binary(Data) ->
@@ -79,9 +91,12 @@ code_change(_OldVsn, State, _Extra) ->



-flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos }=State) ->
+flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos, bloom=Ref }=State) ->

-    Trailer = << 0:8, LastNodePos:64/unsigned >>,
+    Bloom = ebloom:serialize(Ref),
+    BloomSize = byte_size(Bloom),
+
+    Trailer = << 0:32, Bloom/binary, BloomSize:32/unsigned, LastNodePos:64/unsigned >>,

     IdxFile = State#state.index_file,

     ok = file:write(IdxFile, Trailer),
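The index file now ends with the serialized filter, its 4-byte length, and the 8-byte root position, which is exactly what the reader change above peels off by seeking backwards from the end of the file. The sketch below re-derives those offsets; it mirrors the preads in the reader hunk and is only here to make the trailer layout explicit.

    %% Illustrative only: re-reads the trailer written by flush_nodes/1 above.
    read_trailer_example(File, FileSize) ->
        {ok, <<RootPos:64/unsigned>>}   = file:pread(File, FileSize - 8, 8),
        {ok, <<BloomSize:32/unsigned>>} = file:pread(File, FileSize - 12, 4),
        {ok, BloomData} = file:pread(File, FileSize - 12 - BloomSize, BloomSize),
        {ok, Bloom} = ebloom:deserialize(BloomData),
        {RootPos, Bloom}.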
@@ -112,6 +127,8 @@ add_record(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, s

     NewSize = NodeSize + fractal_btree_util:estimate_node_size_increment(List, Key, Value),

+    ebloom:insert( State#state.bloom, Key ),
+
     NodeMembers = [{Key,Value} | List],
     if
        NewSize >= ?NODE_SIZE ->
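Every key added through the writer is now inserted into the in-memory filter, and the filter is flushed into the trailer when the writer is closed. A short end-to-end usage sketch of the new writer API follows; the base name is arbitrary, and the return values of add/3 and close/1 are left unmatched because the diff does not show them.

    %% Illustrative only. The expected key count passed to open/2 sizes the bloom filter.
    write_example() ->
        {ok, W} = fractal_btree_writer:open("example_btree", 1000),
        fractal_btree_writer:add(W, <<"a">>, <<"apple">>),
        fractal_btree_writer:add(W, <<"b">>, <<"banana">>),
        fractal_btree_writer:close(W).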
@@ -83,7 +83,7 @@ merge_test() ->
     ok = fractal_btree_writer:close(BT2),


-    {Time,{ok,Count}} = timer:tc(fractal_btree_merger, merge, ["test1", "test2", "test3"]),
+    {Time,{ok,Count}} = timer:tc(fractal_btree_merger, merge, ["test1", "test2", "test3", 8]),

     error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),