Top-level functionality fractal_btree "works"

There is a single unit test for the aggregate
functionality, so basic interactions work.

[Too many log messages right now]
This commit is contained in:
Kresten Krab Thorup 2012-01-05 11:48:14 +01:00
parent da65b9abb1
commit cdadb88ebf
4 changed files with 267 additions and 50 deletions

162
src/fractal_btree.erl Normal file
View file

@ -0,0 +1,162 @@
-module(fractal_btree).
-behavior(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([open/1, close/1, lookup/2, delete/2, put/3]).
-include_lib("kernel/include/file.hrl").
-record(state, { top, nursery = gb_trees:empty(), dir }).
%% smallest levels are 32 entries
-define(TOP_LEVEL, 5).
-define(TOP_SIZE, (1 bsl ?TOP_LEVEL)).
%% PUBLIC API
open(Dir) ->
gen_server:start(?MODULE, [Dir], []).
close(Ref) ->
gen_server:call(Ref, close).
lookup(Ref,Key) when is_binary(Key) ->
gen_server:call(Ref, {lookup, Key}).
delete(Ref,Key) when is_binary(Key) ->
gen_server:call(Ref, {delete, Key}).
put(Ref,Key,Value) when is_binary(Key), is_binary(Value) ->
gen_server:call(Ref, {put, Key, Value}).
init([Dir]) ->
case file:read_file_info(Dir) of
{ok, #file_info{ type=directory }} ->
{ok, TopLevel} = open_levels(Dir);
{error, E} when E =:= enoent ->
ok = file:make_dir(Dir),
{ok, TopLevel} = fractal_btree_level:open(Dir, ?TOP_LEVEL, undefined)
end,
{ok, #state{ top=TopLevel, dir=Dir }}.
open_levels(Dir) ->
{ok, Files} = file:list_dir(Dir),
%% parse file names and find max level
{MinLevel,MaxLevel} =
lists:foldl(fun(FileName, {MinLevel,MaxLevel}) ->
case parse_level(FileName) of
{ok, Level} ->
{ erlang:min(MinLevel, Level),
erlang:max(MaxLevel, Level) };
_ ->
{MinLevel,MaxLevel}
end
end,
{?TOP_LEVEL, ?TOP_LEVEL},
Files),
error_logger:info_msg("found files ... {~p,~p}~n", [MinLevel, MaxLevel]),
TopLevel =
lists:foldl( fun(LevelNo, Prev) ->
{ok, Level} = fractal_btree_level:open(Dir,LevelNo,Prev),
Level
end,
undefined,
lists:seq(MaxLevel, MinLevel, -1)),
{ok, TopLevel}.
parse_level(FileName) ->
case re:run(FileName, "^[^\\d]+-(\\d+)\\.data$", [{capture,all_but_first,list}]) of
{match,[StringVal]} ->
{ok, list_to_integer(StringVal)};
_ ->
nomatch
end.
handle_info(Info,State) ->
error_logger:error_msg("Unknown info ~p~n", [Info]),
{stop,bad_msg,State}.
handle_cast(Info,State) ->
error_logger:error_msg("Unknown cast ~p~n", [Info]),
{stop,bad_msg,State}.
%% premature delete -> cleanup
terminate(Reason,State) ->
error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]),
% flush_nursery(State),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) ->
{ok, State2} = do_put(Key, Value, State),
{reply, ok, State2};
handle_call({delete, Key}, _From, State) when is_binary(Key) ->
{ok, State2} = do_put(Key, deleted, State),
{reply, ok, State2};
handle_call({lookup, Key}, _From, State=#state{ top=Top, nursery=Nursery } ) when is_binary(Key) ->
case gb_trees:lookup(Key, Nursery) of
{value, deleted} ->
{reply, notfound, State};
{value, Value} when is_binary(Value) ->
{reply, {ok, Value}, State};
none ->
Reply = fractal_btree_level:lookup(Top, Key),
{reply, Reply, State}
end;
handle_call(close, _From, State) ->
{ok, State2} = flush_nursery(State),
{stop, normal, ok, State2}.
do_put(Key, Value, State=#state{ nursery=Tree }) ->
Tree2 = gb_trees:enter(Key, Value, Tree),
TreeSize = gb_trees:size(Tree2),
if TreeSize >= (1 bsl ?TOP_LEVEL) ->
flush_nursery(State#state{ nursery=Tree2 });
true ->
{ok, State#state{ nursery=Tree2 }}
end.
flush_nursery(State=#state{nursery=Tree, top=Top}) ->
TreeSize = gb_trees:size( Tree ),
if TreeSize > 0 ->
error_logger:info_msg("flushing to top=~p, alive=~p~n", [Top, erlang:is_process_alive(Top)]),
FileName = filename:join(State#state.dir, "nursery.data"),
{ok, BT} = fractal_btree_writer:open(FileName, (1 bsl ?TOP_LEVEL)),
lists:foreach( fun({Key2,Value2}) ->
ok = fractal_btree_writer:add(BT, Key2, Value2)
end,
gb_trees:to_list(Tree)),
ok = fractal_btree_writer:close(BT),
ok = fractal_btree_level:inject(Top, FileName),
{ok, State#state{ nursery=gb_trees:empty() } };
true ->
{ok, State}
end.

View file

@ -5,36 +5,45 @@
%% the process of injecting/merging parent trees into this pair.
%%
%%
%% For now, we use plain_fsm, because we *want* selective receive to postpone
%% pending injects while we're merging. That's a lot simpler than maintaining
%% to corresponding merge queue.
%%
-behavior(plain_fsm).
-export([data_vsn/0, code_change/3]).
-export([open/2, lookup/2, inject/2, close/1]).
-export([open/3, lookup/2, inject/2, close/1]).
-include_lib("kernel/include/file.hrl").
-record(state, {
a, b, next, dir, level, inject_done_ref
a, b, next, dir, level, inject_done_ref, merge_pid
}).
%%%%% PUBLIC OPERATIONS
open(Dir,Level) ->
plain_fsm:spawn_link(?MODULE,
open(Dir,Level,Next) when Level>0 ->
{ok, plain_fsm:spawn_link(?MODULE,
fun() ->
process_flag(trap_exit,true),
initialize(#state{dir=Dir,level=Level})
end).
initialize(#state{dir=Dir,level=Level,next=Next})
end)}.
lookup(Ref, Key) ->
call(Ref, {lookup, Key}).
inject(Ref, FileName) ->
call(Ref, {inject, FileName}).
Result = call(Ref, {inject, FileName}),
Result.
close(Ref) ->
call(Ref, close).
%%%%% INTERNAL
data_vsn() ->
5.
@ -46,7 +55,7 @@ code_change(_OldVsn, _State, _Extra) ->
-define(REPLY(Ref,Msg), {'$rep', Ref, Msg}).
send_request(PID, Request) ->
Ref = erlang:monitor(PID),
Ref = erlang:monitor(process, PID),
PID ! ?REQ({self(), Ref}, Request),
Ref.
@ -56,6 +65,7 @@ receive_reply(MRef) ->
erlang:demonitor(MRef, [flush]),
Reply;
{'DOWN', MRef, _, _, Reason} ->
error_logger:info_msg("Level dies, reason=~p~n", [Reason]),
exit(Reason)
end.
@ -69,6 +79,9 @@ reply({PID,Ref}, Reply) ->
initialize(State) ->
error_logger:info_msg("in ~p level=~p~n", [self(), State]),
AFileName = filename("A",State),
BFileName = filename("B",State),
CFileName = filename("C",State),
@ -82,60 +95,73 @@ initialize(State) ->
ok = file:rename(CFileName, AFileName),
{ok, BT} = fractal_btree_reader:open(CFileName),
main_loop(State#state{ a= BT, b=undefined });
main_loop1(State#state{ a= BT, b=undefined });
{error, enoent} ->
case file:read_file_info(BFileName) of
{ok, _} ->
{ok, BT1} = fractal_btree_reader:open(AFileName),
{ok, BT2} = fractal_btree_reader:open(BFileName),
main_loop(State#state{ a=BT1, b=BT2 });
{ok, MergePID} = begin_merge(State),
main_loop2(State#state{ a=BT1, b=BT2, merge_pid=MergePID });
{error, enoent} ->
case file:read_file_info(AFileName) of
{ok, _} ->
{ok, BT1} = fractal_btree_reader:open(AFileName),
main_loop(State#state{ a=BT1 });
main_loop1(State#state{ a=BT1 });
{error, enoent} ->
main_loop(State)
main_loop0(State)
end
end
end.
main_loop(State = #state{ a=undefined, b=undefined }) ->
main_loop0(State = #state{ a=undefined, b=undefined }) ->
Parent = plain_fsm:info(parent),
error_logger:info_msg("in main_loop0~n", []),
receive
?REQ(From, {lookup, _})=Msg ->
error_logger:info_msg("in main_loop0, msg=~p~n", [Msg]),
case State#state.next of
undefined ->
reply(From, notfound);
Next ->
Next ! Msg
end,
main_loop(State);
main_loop0(State);
?REQ(From, {inject, FileName}) ->
{ok, BT} = fractal_btree_reader:open(FileName),
?REQ(From, {inject, FileName})=_Msg ->
error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
AFileName = filename("A",State),
ok = file:rename(FileName, AFileName),
{ok, BT} = fractal_btree_reader:open(AFileName),
reply(From, ok),
main_loop(State#state{ a=BT });
main_loop1(State#state{ a=BT });
?REQ(From, close) ->
?REQ(From, close)=_Msg ->
error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
reply(From, ok),
ok;
%% gen_fsm handling
{system, From, Req} ->
{system, From, Req}=_Msg ->
error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
end;
From, Req, State, fun(S1) -> main_loop0(S1) end);
main_loop(State = #state{ a=BT1, b=undefined, next=Next }) ->
{'EXIT', Parent, Reason}=_Msg ->
error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]),
plain_fsm:parent_EXIT(Reason, State)
end.
main_loop1(State = #state{ a=BT1, b=undefined, next=Next }) ->
Parent = plain_fsm:info(parent),
error_logger:info_msg("in main_loop1~n", []),
receive
?REQ(From, {lookup, Key})=Req ->
case fractal_btree_reader:lookup(BT1, Key) of
@ -148,13 +174,15 @@ main_loop(State = #state{ a=BT1, b=undefined, next=Next }) ->
notfound ->
Next ! Req
end,
main_loop(State);
main_loop1(State);
?REQ(From, {inject, FileName}) ->
{ok, BT2} = fractal_btree_reader:open(FileName),
BFileName = filename("B",State),
ok = file:rename(FileName, BFileName),
{ok, BT2} = fractal_btree_reader:open(BFileName),
reply(From, ok),
ok = begin_merge(State),
main_loop(State#state{ b=BT2 });
{ok, MergePID} = begin_merge(State),
main_loop2(State#state{ b=BT2, merge_pid=MergePID });
?REQ(From, close) ->
fractal_btree_reader:close(BT1),
@ -164,22 +192,23 @@ main_loop(State = #state{ a=BT1, b=undefined, next=Next }) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
From, Req, State, fun(S1) -> main_loop1(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
end;
end.
main_loop(State = #state{ next=Next }) ->
main_loop2(State = #state{ next=Next }) ->
Parent = plain_fsm:info(parent),
error_logger:info_msg("in main_loop2~n", []),
receive
?REQ(From, {lookup, Key})=Req ->
case fractal_btree_reader:lookup(State#state.b, Key) of
{ok, deleted} ->
reply(From, notfound),
main_loop(State);
main_loop2(State);
{ok, _}=Reply ->
reply(From, Reply),
main_loop(State);
main_loop2(State);
_ ->
case fractal_btree_reader:lookup(State#state.a, Key) of
{ok, deleted} ->
@ -191,13 +220,19 @@ main_loop(State = #state{ next=Next }) ->
notfound ->
Next ! Req
end,
main_loop(State)
main_loop2(State)
end;
?REQ(From, close) ->
fractal_btree_reader:close(State#state.a),
fractal_btree_reader:close(State#state.b),
MergePID = State#state.merge_pid,
if MergePID =:= undefined ->
ok;
true ->
erlang:exit(State#state.merge_pid, shutdown)
end,
%% TODO: stop merger, if any?
reply(From, ok),
ok;
@ -206,7 +241,7 @@ main_loop(State = #state{ next=Next }) ->
%% The outcome of merging resulted in a file with less than
%% level #entries, so we keep it at this level
%%
{merge_done, Count, OutFileName} when Count =< State#state.level ->
{merge_done, Count, OutFileName} when Count =< (1 bsl State#state.level) ->
% first, rename the tmp file to C, so recovery will pick it up
CFileName = filename("C",State),
@ -220,7 +255,7 @@ main_loop(State = #state{ next=Next }) ->
ok = file:rename(CFileName, AFileName),
{ok, BT} = fractal_btree_reader:open(AFileName),
main_loop(State2#state{ a=BT, b=undefined });
main_loop1(State2#state{ a=BT, b=undefined, merge_pid=undefined });
%%
%% We need to push the output of merging to the next level
@ -228,14 +263,14 @@ main_loop(State = #state{ next=Next }) ->
{merge_done, _, OutFileName} ->
State1 =
if Next =:= undefined ->
PID = open(State#state.dir, State#state.level * 2),
{ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined),
State#state{ next=PID };
true ->
State
end,
MRef = send_request(State1#state.next, {inject, OutFileName}),
main_loop(State1#state{ inject_done_ref = MRef });
main_loop2(State1#state{ inject_done_ref = MRef, merge_pid=undefined });
%%
%% Our successor accepted the inject
@ -243,7 +278,7 @@ main_loop(State = #state{ next=Next }) ->
?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref ->
erlang:demonitor(MRef, [flush]),
{ok, State2} = close_a_and_b(State),
main_loop(State2#state{ inject_done_ref=undefined, a=undefined, b=undefined });
main_loop0(State2#state{ inject_done_ref=undefined });
%%
%% Our successor died!
@ -254,7 +289,7 @@ main_loop(State = #state{ next=Next }) ->
%% gen_fsm handling
{system, From, Req} ->
plain_fsm:handle_system_msg(
From, Req, State, fun(S1) -> main_loop(S1) end);
From, Req, State, fun(S1) -> main_loop2(S1) end);
{'EXIT', Parent, Reason} ->
plain_fsm:parent_EXIT(Reason, State)
@ -268,13 +303,18 @@ begin_merge(State) ->
XFileName = filename("X",State),
Owner = self(),
spawn_link(fun() ->
file:delete(XFileName),
MergePID = spawn_link(fun() ->
{ok, OutCount} = fractal_btree_merger:merge(AFileName, BFileName, XFileName,
State#state.level * 2),
State#state.level + 1),
error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]),
Owner ! {merge_done, OutCount, XFileName}
end),
ok.
{ok, MergePID}.
close_a_and_b(State) ->
@ -291,5 +331,8 @@ close_a_and_b(State) ->
filename(PFX, State) ->
filename:join(State#state.dir, PFX ++ integer_to_list(State#state.level) ++ ".data").
filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data").
size(State) ->
(1 bsl State#state.level).

View file

@ -34,11 +34,11 @@
%%% PUBLIC API
open(Name,Size) ->
gen_server:start(?MODULE, [Name,Size], []).
gen_server:start_link(?MODULE, [Name,Size], []).
open(Name) ->
gen_server:start(?MODULE, [Name,2048], []).
gen_server:start_link(?MODULE, [Name,2048], []).
add(Ref,Key,Data) ->

View file

@ -87,7 +87,6 @@ merge_test() ->
error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]),
ok = file:delete("test1"),
ok = file:delete("test2"),
ok = file:delete("test3"),
@ -95,4 +94,17 @@ merge_test() ->
ok.
tree_test() ->
application:start(sasl),
{ok, Tree} = fractal_btree:open("simple"),
lists:foldl(fun(N,_) ->
ok = fractal_btree:put(Tree, <<N:128>>, <<"data",N:128>>)
end,
ok,
lists:seq(2,10000,1)),
ok = fractal_btree:close(Tree).