Initial work-in-progress

Kresten Krab Thorup 2012-01-04 15:05:31 +01:00
parent 0a71c6ee5c
commit 6e13f55044
12 changed files with 830 additions and 0 deletions

4
.gitignore vendored Normal file

@@ -0,0 +1,4 @@
ebin
deps
*~
.eunit

BIN
rebar vendored Executable file

Binary file not shown.

6
rebar.config Normal file

@@ -0,0 +1,6 @@
{cover_enabled, true}.
{deps, [
{plain_fsm, "1.1.*", {git, "git://github.com/uwiger/plain_fsm", {branch, "master"}}}
]}.

12
src/fractal_btree.app.src Normal file

@@ -0,0 +1,12 @@
{application, fractal_btree,
[
{description, ""},
{vsn, "1"},
{registered, []},
{applications, [
kernel,
stdlib
]},
{mod, { fractal_btree_app, []}},
{env, []}
]}.

16
src/fractal_btree_app.erl Normal file

@@ -0,0 +1,16 @@
-module(fractal_btree_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
%% ===================================================================
%% Application callbacks
%% ===================================================================
start(_StartType, _StartArgs) ->
fractal_btree_sup:start_link().
stop(_State) ->
ok.

294
src/fractal_btree_level.erl Normal file

@@ -0,0 +1,294 @@
-module(fractal_btree_level).
%%
%% Manages a "pair" of fractal_index (or rathern, 0, 1 or 2), and governs
%% the process of injecting/merging parent trees into this pair.
%%
-behavior(plain_fsm).
-export([data_vsn/0, code_change/3]).
-export([open/2, lookup/2, inject/2, close/1]).
-include_lib("kernel/include/file.hrl").
-record(state, {
a, b, next, dir, level, inject_done_ref
}).
open(Dir,Level) ->
plain_fsm:spawn_link(?MODULE,
fun() ->
process_flag(trap_exit,true),
initialize(#state{dir=Dir,level=Level})
end).
lookup(Ref, Key) ->
call(Ref, {lookup, Key}).
inject(Ref, FileName) ->
call(Ref, {inject, FileName}).
close(Ref) ->
call(Ref, close).
data_vsn() ->
5.
code_change(_OldVsn, _State, _Extra) ->
{ok, {#state{}, data_vsn()}}.
-define(REQ(From,Msg), {'$req', From, Msg}).
-define(REPLY(Ref,Msg), {'$rep', Ref, Msg}).
send_request(PID, Request) ->
Ref = erlang:monitor(process, PID),
PID ! ?REQ({self(), Ref}, Request),
Ref.
receive_reply(MRef) ->
receive
?REPLY(MRef, Reply) ->
erlang:demonitor(MRef, [flush]),
Reply;
{'DOWN', MRef, _, _, Reason} ->
exit(Reason)
end.
call(PID,Request) ->
Ref = send_request(PID, Request),
receive_reply(Ref).
reply({PID,Ref}, Reply) ->
erlang:send(PID, ?REPLY(Ref, Reply)),
ok.
initialize(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
CFileName = filename("C",State),
case file:read_file_info(CFileName) of
{ok, _} ->
%% recover from post-merge crash
file:delete(AFileName),
file:delete(BFileName),
ok = file:rename(CFileName, AFileName),
{ok, BT} = fractal_btree_reader:open(AFileName),
main_loop(State#state{ a= BT, b=undefined });
{error, enoent} ->
case file:read_file_info(BFileName) of
{ok, _} ->
{ok, BT1} = fractal_btree_reader:open(AFileName),
{ok, BT2} = fractal_btree_reader:open(BFileName),
main_loop(State#state{ a=BT1, b=BT2 });
{error, enoent} ->
case file:read_file_info(AFileName) of
{ok, _} ->
{ok, BT1} = fractal_btree_reader:open(AFileName),
main_loop(State#state{ a=BT1 });
{error, enoent} ->
main_loop(State)
end
end
end.
main_loop(State = #state{ a=undefined, b=undefined }) ->
Parent = plain_fsm:info(parent),
receive
?REQ(From, {lookup, _})=Msg ->
case State#state.next of
undefined ->
reply(From, notfound);
Next ->
Next ! Msg
end,
main_loop(State);
?REQ(From, {inject, FileName}) ->
{ok, BT} = fractal_btree_reader:open(FileName),
reply(From, ok),
main_loop(State#state{ a=BT });
?REQ(From, close) ->
reply(From, ok),
ok;
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
end;
main_loop(State = #state{ a=BT1, b=undefined, next=Next }) ->
Parent = plain_fsm:info(parent),
receive
?REQ(From, {lookup, Key})=Req ->
case fractal_btree_reader:lookup(BT1, Key) of
{ok, deleted} ->
reply(From, notfound);
{ok, _}=Reply ->
reply(From, Reply);
notfound when Next =:= undefined ->
reply(From, notfound);
notfound ->
Next ! Req
end,
main_loop(State);
?REQ(From, {inject, FileName}) ->
{ok, BT2} = fractal_btree_reader:open(FileName),
reply(From, ok),
ok = begin_merge(State),
main_loop(State#state{ b=BT2 });
?REQ(From, close) ->
fractal_btree_reader:close(BT1),
reply(From, ok),
ok;
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
end;
main_loop(State = #state{ next=Next }) ->
Parent = plain_fsm:info(parent),
receive
?REQ(From, {lookup, Key})=Req ->
case fractal_btree_reader:lookup(State#state.b, Key) of
{ok, deleted} ->
reply(From, notfound),
main_loop(State);
{ok, _}=Reply ->
reply(From, Reply),
main_loop(State);
_ ->
case fractal_btree_reader:lookup(State#state.a, Key) of
{ok, deleted} ->
reply(From, notfound);
{ok, _}=Reply ->
reply(From, Reply);
notfound when Next =:= undefined ->
reply(From, notfound);
notfound ->
Next ! Req
end,
main_loop(State)
end;
?REQ(From, close) ->
fractal_btree_reader:close(State#state.a),
fractal_btree_reader:close(State#state.b),
%% TODO: stop merger, if any?
reply(From, ok),
ok;
%%
%% The merge produced a file with no more entries than this level's
%% threshold (#entries =< level), so we keep it at this level
%%
{merge_done, Count, OutFileName} when Count =< State#state.level ->
% first, rename the tmp file to C, so recovery will pick it up
CFileName = filename("C",State),
ok = file:rename(OutFileName, CFileName),
% then delete A and B (if we crash now, C will become the A file)
{ok, State2} = close_a_and_b(State),
% then, rename C to A, and open it
AFileName = filename("A",State2),
ok = file:rename(CFileName, AFileName),
{ok, BT} = fractal_btree_reader:open(AFileName),
main_loop(State2#state{ a=BT, b=undefined });
%%
%% We need to push the output of merging to the next level
%%
{merge_done, _, OutFileName} ->
State1 =
if Next =:= undefined ->
PID = open(State#state.dir, State#state.level * 2),
State#state{ next=PID };
true ->
State
end,
MRef = send_request(State1#state.next, {inject, OutFileName}),
main_loop(State1#state{ inject_done_ref = MRef });
%%
%% Our successor accepted the inject
%%
?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref ->
erlang:demonitor(MRef, [flush]),
{ok, State2} = close_a_and_b(State),
main_loop(State2#state{ inject_done_ref=undefined, a=undefined, b=undefined });
%%
%% Our successor died!
%%
{'DOWN', MRef, _, _, Reason} when MRef =:= State#state.inject_done_ref ->
exit(Reason);
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
end.
begin_merge(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
XFileName = filename("X",State),
Owner = self(),
spawn_link(fun() ->
{ok, OutCount} = fractal_btree_merger:merge(AFileName, BFileName, XFileName),
Owner ! {merge_done, OutCount, XFileName}
end),
ok.
close_a_and_b(State) ->
AFileName = filename("A",State),
BFileName = filename("B",State),
ok = fractal_btree_reader:close(State#state.a),
ok = fractal_btree_reader:close(State#state.b),
ok = file:delete(AFileName),
ok = file:delete(BFileName),
{ok, State#state{a=undefined, b=undefined}}.
filename(PFX, State) ->
filename:join(State#state.dir, PFX ++ integer_to_list(State#state.level) ++ ".data").
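
A minimal, hypothetical usage sketch of the level API above (open/2, inject/2, lookup/2, close/1). The directory and file name are assumptions, and the injected file must have been produced by fractal_btree_writer.

%% Hypothetical usage sketch, not part of the commit.
level_demo() ->
    Level = fractal_btree_level:open("data", 1024),               %% pid of the level process
    ok = fractal_btree_level:inject(Level, "data/run-1.data"),
    Result = fractal_btree_level:lookup(Level, <<"some key">>),   %% {ok, Value} | notfound
    ok = fractal_btree_level:close(Level),
    Result.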

109
src/fractal_btree_merger.erl Normal file

@@ -0,0 +1,109 @@
-module(fractal_btree_merger).
%%
%% Naive merge of two B-trees. A better implementation should iterate over leaves, not individual KVs.
%%
-export([merge/3]).
-record(state, { out, a_pid, b_pid }).
merge(A,B,C) ->
{ok, Out} = fractal_btree_writer:open(C),
Owner = self(),
PID1 = spawn_link(fun() -> scan(Owner, A) end),
PID2 = spawn_link(fun() -> scan(Owner, B) end),
%% "blocks" until both scans are done ...
{ok, Count} = receive_both(undefined, undefined, #state{ out=Out, a_pid=PID1, b_pid=PID2 }, 0),
%% finish stream tree
ok = fractal_btree_writer:close(Out),
{ok, Count}.
scan(SendTo,FileName) ->
%% yes, we need a separate file handle for the scan, since pread doesn't do read-ahead
{ok, File} = fractal_btree_reader:open(FileName),
fractal_btree_reader:fold(fun(K,V,_) ->
SendTo ! {ok, self(), K, V}
end,
ok,
File),
fractal_btree_reader:close(File),
SendTo ! {eod, self()},
ok.
receive_both(undefined, BVal, #state{a_pid=PID1}=State, Count) ->
receive
{ok, PID1, Key1, Value1} ->
receive_both({Key1,Value1}, BVal, State, Count);
{eod, PID1} ->
case BVal of
{Key2, Value2} ->
fractal_btree_writer:add(State#state.out, Key2, Value2),
receive_bonly(State, Count+1);
undefined ->
receive_bonly(State, Count)
end
end;
receive_both({Key1,Value1}=AValue, undefined, #state{ b_pid=PID2 }=State, Count) ->
receive
{ok, PID2, Key2, Value2} ->
receive_both(AValue, {Key2,Value2}, State, Count);
{eod, PID2} ->
ok = fractal_btree_writer:add(State#state.out, Key1, Value1),
receive_aonly(State, Count+1)
end;
receive_both(AValue={Key1,Value1}, BValue={Key2,Value2}, State, Count) ->
if Key1 < Key2 ->
ok = fractal_btree_writer:add(State#state.out, Key1, Value1),
receive_both(undefined, BValue, State, Count+1);
Key2 < Key1 ->
ok = fractal_btree_writer:add(State#state.out, Key2, Value2),
receive_both(AValue, undefined, State, Count+1);
Key1 == Key2 ->
%% TODO: eliminate tombstones, right now they just bubble down
ok = fractal_btree_writer:add(State#state.out, Key2, Value2),
receive_both(undefined, undefined, State, Count+1)
end.
%%
%% Reached the end of the "B File" ... now just stream everything from A to OUT
%%
receive_aonly(#state{a_pid=PID1}=State, Count) ->
receive
{ok, PID1, Key1, Value1} ->
ok = fractal_btree_writer:add(State#state.out,
Key1,
Value1),
receive_aonly(State, Count+1);
{eod, PID1} ->
{ok, Count}
end.
%%
%% Reached the end of the "A File" ... now just stream everything from B to OUT
%%
receive_bonly(#state{b_pid=PID2}=State, Count) ->
receive
{ok, PID2, Key2, Value2} ->
ok = fractal_btree_writer:add(State#state.out,
Key2,
Value2),
receive_bonly(State, Count+1);
{eod, PID2} ->
{ok, Count}
end.
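
A minimal, hypothetical driver for merge/3 above. All three file names are assumptions; the two inputs must have been written by fractal_btree_writer. Note that on equal keys the receive_both/4 clause above keeps the B-side value.

%% Hypothetical usage sketch, not part of the commit.
merge_demo() ->
    {ok, Count} = fractal_btree_merger:merge("a.data", "b.data", "a_plus_b.data"),
    Count.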

94
src/fractal_btree_reader.erl Normal file

@@ -0,0 +1,94 @@
-module(fractal_btree_reader).
-include_lib("kernel/include/file.hrl").
-export([open/1,close/1,lookup/2,fold/3]).
-record(node, { level, members=[] }).
-record(index, {file, root}).
open(Name) ->
{ok, File} = file:open(Name, [raw,read,read_ahead,binary]),
{ok, FileInfo} = file:read_file_info(Name),
%% read root position
{ok, <<RootPos:64/unsigned>>} = file:pread(File, FileInfo#file_info.size-8, 8),
%% suck in the root
{ok, Root} = read_node(File, RootPos),
{ok, #index{file=File, root=Root}}.
fold(Fun, Acc0, #index{file=File}) ->
{ok, Node} = read_node(File,0),
fold0(File,fun({K,V},Acc) -> Fun(K,V,Acc) end,Node,Acc0).
fold0(File,Fun,#node{level=0,members=List},Acc0) ->
Acc1 = lists:foldl(Fun,Acc0,List),
fold1(File,Fun,Acc1);
fold0(File,Fun,_InnerNode,Acc0) ->
fold1(File,Fun,Acc0).
fold1(File,Fun,Acc0) ->
case read_node(File) of
eof ->
Acc0;
{ok, Node} ->
fold0(File,Fun,Node,Acc0)
end.
close(#index{file=File}) ->
file:close(File).
lookup(#index{file=File, root=Node},Key) ->
lookup_in_node(File,Node,Key).
lookup_in_node(_File,#node{level=0,members=Members},Key) ->
case lists:keyfind(Key,1,Members) of
false ->
notfound;
{_,Value} ->
{ok, Value}
end;
lookup_in_node(File,#node{members=Members},Key) ->
case find(Key, Members) of
{ok, Pos} ->
{ok, Node} = read_node(File, Pos),
lookup_in_node(File, Node, Key);
notfound ->
notfound
end.
find(K, [{K1,V},{K2,_}|_]) when K >= K1, K < K2 ->
{ok, V};
find(K, [{K1,V}]) when K >= K1 ->
{ok, V};
find(K, [_|T]) ->
find(K,T);
find(_, _) ->
notfound.
read_node(File,Pos) ->
{ok, Pos} = file:position(File, Pos),
Result = read_node(File),
% error_logger:info_msg("decoded ~p ~p~n", [Pos, Result]),
Result.
read_node(File) ->
{ok, <<Len:32>>} = file:read(File, 4),
case Len of
0 -> eof;
_ ->
{ok, Data} = file:read(File, Len),
{ok, Node} = fractal_btree_util:decode_index_node(Data),
{ok, Node}
end.
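
A minimal, hypothetical usage sketch of the reader API above; "testdata" is assumed to be a file produced by fractal_btree_writer.

%% Hypothetical usage sketch, not part of the commit.
reader_demo() ->
    {ok, Tree} = fractal_btree_reader:open("testdata"),
    Found = fractal_btree_reader:lookup(Tree, <<"A">>),           %% {ok, Value} | notfound
    Count = fractal_btree_reader:fold(fun(_K, _V, N) -> N + 1 end, 0, Tree),
    ok = fractal_btree_reader:close(Tree),
    {Found, Count}.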

28
src/fractal_btree_sup.erl Normal file

@@ -0,0 +1,28 @@
-module(fractal_btree_sup).
-behaviour(supervisor).
%% API
-export([start_link/0]).
%% Supervisor callbacks
-export([init/1]).
%% Helper macro for declaring children of supervisor
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
%% ===================================================================
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
{ok, { {one_for_one, 5, 10}, []} }.

31
src/fractal_btree_util.erl Normal file

@@ -0,0 +1,31 @@
-module(fractal_btree_util).
-compile(export_all).
index_file_name(Name) ->
Name.
estimate_node_size_increment(_KVList,Key,Value) ->
byte_size(Key)
+ 10
+ if
is_integer(Value) ->
5;
is_binary(Value) ->
5 + byte_size(Value);
is_atom(Value) ->
8
end.
encode_index_node(Level, KVList) ->
Data = %zlib:zip(
erlang:term_to_binary({Level, KVList})
% )
,
Size = byte_size(Data),
{ok, Size+4, [ <<Size:32>> | Data ] }.
decode_index_node(Data) ->
{Level,KVList} = erlang:binary_to_term(Data), %zlib:unzip(Data)),
{ok, {node, Level, KVList}}.
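
A small, hypothetical round-trip sketch of the node encoding above: a node is term_to_binary({Level, KVList}) prefixed by its 32-bit byte length, and decode_index_node/1 expects the bytes after that prefix.

%% Hypothetical round-trip sketch, not part of the commit.
encode_decode_demo() ->
    KVs = [{<<"a">>, <<"1">>}, {<<"b">>, <<"2">>}],
    {ok, Len, IoData} = fractal_btree_util:encode_index_node(0, KVs),
    <<_Size:32, Body/binary>> = iolist_to_binary(IoData),
    Len = byte_size(Body) + 4,
    {ok, {node, 0, KVs}} = fractal_btree_util:decode_index_node(Body),
    ok.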

138
src/fractal_btree_writer.erl Normal file

@@ -0,0 +1,138 @@
-module(fractal_btree_writer).
-define(NODE_SIZE, 2*1024).
-behavior(gen_server).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([open/1, add/3,close/1]).
-record(node, { level, members=[], size=0 }).
-record(state, { index_file,
index_file_pos,
last_node_pos :: pos_integer(),
nodes = [] :: [ #node{} ],
name :: string()
}).
%%% PUBLIC API
open(Name) ->
gen_server:start(?MODULE, [Name], []).
add(Ref,Key,Data) ->
gen_server:cast(Ref, {add, Key, Data}).
close(Ref) ->
gen_server:call(Ref, close).
%%%
init([Name]) ->
% io:format("got name: ~p~n", [Name]),
{ok, IdxFile} = file:open( fractal_btree_util:index_file_name(Name),
[raw, exclusive, write, delayed_write]),
{ok, #state{ name=Name,
index_file_pos=0, index_file=IdxFile
}}.
handle_cast({add, Key, Data}, State) when is_binary(Key), is_binary(Data) ->
{ok, State2} = add_record(0, Key, Data, State),
{noreply, State2}.
handle_call(close, _From, State) ->
{ok, State2} = flush_nodes(State),
{stop,normal,ok,State2}.
handle_info(Info,State) ->
error_logger:error_msg("Unknown info ~p~n", [Info]),
{stop,bad_msg,State}.
terminate(normal,_State) ->
ok;
%% premature delete -> cleanup
terminate(_Reason,State) ->
file:close( State#state.index_file ),
file:delete( fractal_btree_util:index_file_name(State#state.name) ).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%%% INTERNAL FUNCTIONS
flush_nodes(#state{ nodes=[], last_node_pos=LastNodePos }=State) ->
Trailer = << 0:8, LastNodePos:64/unsigned >>,
IdxFile = State#state.index_file,
ok = file:write(IdxFile, Trailer),
ok = file:close(IdxFile),
{ok, State#state{ index_file=undefined }};
flush_nodes(State=#state{ nodes=[#node{level=N, members=[_]}] }) when N>0 ->
flush_nodes(State#state{ nodes=[] });
flush_nodes(State) ->
{ok, State2} = close_node(State),
flush_nodes(State2).
add_record(Level, Key, Value, #state{ nodes=[ #node{level=Level, members=List, size=NodeSize}=CurrNode |RestNodes] }=State) ->
%% assert that keys are increasing
case List of
[] -> ok;
[{PrevKey,_}|_] ->
if
(Key >= PrevKey) -> ok;
true ->
error_logger:error_msg("keys not ascending ~p < ~p~n", [PrevKey, Key]),
exit({badarg, Key})
end
end,
NewSize = NodeSize + fractal_btree_util:estimate_node_size_increment(List, Key, Value),
NodeMembers = [{Key,Value} | List],
if
NewSize >= ?NODE_SIZE ->
close_node(State#state{ nodes=[CurrNode#node{ members=NodeMembers, size=NewSize} | RestNodes] });
true ->
{ok, State#state{ nodes=[ CurrNode#node{ members=NodeMembers, size=NewSize } | RestNodes ] }}
end;
add_record(Level, Key, Value, #state{ nodes=Nodes }=State) ->
add_record(Level, Key, Value, State#state{ nodes = [ #node{ level=Level, members=[] } | Nodes ] }).
close_node(#state{nodes=[#node{ level=Level, members=NodeMembers }|RestNodes]} = State) ->
OrderedMembers = lists:reverse(NodeMembers),
{ok, DataSize, Data} = fractal_btree_util:encode_index_node(Level, OrderedMembers),
NodePos = State#state.index_file_pos,
ok = file:write(State#state.index_file, Data),
{FirstKey, _} = hd(OrderedMembers),
add_record(Level+1, FirstKey, NodePos,
State#state{ nodes = RestNodes,
index_file_pos = NodePos + DataSize,
last_node_pos = NodePos}).
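
The trailer appended by flush_nodes/1 above ends in the 64-bit file offset of the root node, which is what fractal_btree_reader:open/1 seeks to. A hypothetical helper (not part of the commit) that reads that offset back out of a finished file:

%% Hypothetical sketch, not part of the commit: read the root offset from
%% the trailer <<0:8, RootPos:64/unsigned>> written by flush_nodes/1.
read_root_pos(FileName) ->
    Size = filelib:file_size(FileName),
    {ok, File} = file:open(FileName, [raw, read, binary]),
    {ok, <<RootPos:64/unsigned>>} = file:pread(File, Size - 8, 8),
    ok = file:close(File),
    RootPos.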

98
test/fractal_btree_tests.erl Normal file

@@ -0,0 +1,98 @@
-module(fractal_btree_tests).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-compile(export_all).
simple_test() ->
{ok, BT} = fractal_btree_writer:open("testdata"),
ok = fractal_btree_writer:add(BT, <<"A">>, <<"Avalue">>),
ok = fractal_btree_writer:add(BT, <<"B">>, <<"Bvalue">>),
ok = fractal_btree_writer:close(BT),
{ok, IN} = fractal_btree_reader:open("testdata"),
{ok, <<"Avalue">>} = fractal_btree_reader:lookup(IN, <<"A">>),
ok = fractal_btree_reader:close(IN),
ok = file:delete("testdata").
simple1_test() ->
{ok, BT} = fractal_btree_writer:open("testdata"),
Max = 30*1024,
Seq = lists:seq(0, Max),
{Time1,_} = timer:tc(
fun() ->
lists:foreach(
fun(Int) ->
ok = fractal_btree_writer:add(BT, <<Int:128>>, <<"valuevalue/", Int:128>>)
end,
Seq),
ok = fractal_btree_writer:close(BT)
end,
[]),
error_logger:info_msg("time to insert: ~p/sec~n", [1000000/(Time1/Max)]),
{ok, IN} = fractal_btree_reader:open("testdata"),
{ok, <<"valuevalue/", 2048:128>>} = fractal_btree_reader:lookup(IN, <<2048:128>>),
{Time2,Count} = timer:tc(
fun() -> fractal_btree_reader:fold(fun(Key, <<"valuevalue/", Key/binary>>, N) ->
N+1
end,
0,
IN)
end,
[]),
error_logger:info_msg("time to scan: ~p/sec~n", [1000000/(Time2/Max)]),
Max = Count-1,
ok = fractal_btree_reader:close(IN),
ok = file:delete("testdata").
merge_test() ->
{ok, BT1} = fractal_btree_writer:open("test1"),
lists:foldl(fun(N,_) ->
ok = fractal_btree_writer:add(BT1, <<N:128>>, <<"data",N:128>>)
end,
ok,
lists:seq(1,10000,2)),
ok = fractal_btree_writer:close(BT1),
{ok, BT2} = fractal_btree_writer:open("test2"),
lists:foldl(fun(N,_) ->
ok = fractal_btree_writer:add(BT2, <<N:128>>, <<"data",N:128>>)
end,
ok,
lists:seq(2,5001,1)),
ok = fractal_btree_writer:close(BT2),
{Time,{ok,Count}} = timer:tc(fractal_btree_merger, merge, ["test1", "test2", "test3"]),
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
ok = file:delete("test1"),
ok = file:delete("test2"),
ok = file:delete("test3"),
ok.