From cdadb88ebf18dbe1d7e58bc368c14af2dac19e20 Mon Sep 17 00:00:00 2001 From: Kresten Krab Thorup Date: Thu, 5 Jan 2012 11:48:14 +0100 Subject: [PATCH] Top-level functionality fractal_btree "works" There is a single unit test for the aggregate functionality, so basic interactions work. [Too many log messages right now] --- src/fractal_btree.erl | 162 +++++++++++++++++++++++++++++++++++ src/fractal_btree_level.erl | 137 +++++++++++++++++++---------- src/fractal_btree_writer.erl | 4 +- test/fractal_btree_tests.erl | 14 ++- 4 files changed, 267 insertions(+), 50 deletions(-) create mode 100644 src/fractal_btree.erl diff --git a/src/fractal_btree.erl b/src/fractal_btree.erl new file mode 100644 index 0000000..dc8253e --- /dev/null +++ b/src/fractal_btree.erl @@ -0,0 +1,162 @@ +-module(fractal_btree). + +-behavior(gen_server). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([open/1, close/1, lookup/2, delete/2, put/3]). + + + +-include_lib("kernel/include/file.hrl"). + +-record(state, { top, nursery = gb_trees:empty(), dir }). + +%% smallest levels are 32 entries +-define(TOP_LEVEL, 5). +-define(TOP_SIZE, (1 bsl ?TOP_LEVEL)). + +%% PUBLIC API + +open(Dir) -> + gen_server:start(?MODULE, [Dir], []). + +close(Ref) -> + gen_server:call(Ref, close). + +lookup(Ref,Key) when is_binary(Key) -> + gen_server:call(Ref, {lookup, Key}). + +delete(Ref,Key) when is_binary(Key) -> + gen_server:call(Ref, {delete, Key}). + +put(Ref,Key,Value) when is_binary(Key), is_binary(Value) -> + gen_server:call(Ref, {put, Key, Value}). + + + +init([Dir]) -> + + case file:read_file_info(Dir) of + {ok, #file_info{ type=directory }} -> + {ok, TopLevel} = open_levels(Dir); + + {error, E} when E =:= enoent -> + ok = file:make_dir(Dir), + {ok, TopLevel} = fractal_btree_level:open(Dir, ?TOP_LEVEL, undefined) + end, + + {ok, #state{ top=TopLevel, dir=Dir }}. + + + +open_levels(Dir) -> + {ok, Files} = file:list_dir(Dir), + + %% parse file names and find max level + {MinLevel,MaxLevel} = + lists:foldl(fun(FileName, {MinLevel,MaxLevel}) -> + case parse_level(FileName) of + {ok, Level} -> + { erlang:min(MinLevel, Level), + erlang:max(MaxLevel, Level) }; + _ -> + {MinLevel,MaxLevel} + end + end, + {?TOP_LEVEL, ?TOP_LEVEL}, + Files), + + error_logger:info_msg("found files ... {~p,~p}~n", [MinLevel, MaxLevel]), + + TopLevel = + lists:foldl( fun(LevelNo, Prev) -> + {ok, Level} = fractal_btree_level:open(Dir,LevelNo,Prev), + Level + end, + undefined, + lists:seq(MaxLevel, MinLevel, -1)), + + {ok, TopLevel}. + +parse_level(FileName) -> + case re:run(FileName, "^[^\\d]+-(\\d+)\\.data$", [{capture,all_but_first,list}]) of + {match,[StringVal]} -> + {ok, list_to_integer(StringVal)}; + _ -> + nomatch + end. + + +handle_info(Info,State) -> + error_logger:error_msg("Unknown info ~p~n", [Info]), + {stop,bad_msg,State}. + +handle_cast(Info,State) -> + error_logger:error_msg("Unknown cast ~p~n", [Info]), + {stop,bad_msg,State}. + + +%% premature delete -> cleanup +terminate(Reason,State) -> + error_logger:info_msg("got terminate(~p,~p)~n", [Reason,State]), + % flush_nursery(State), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + + +handle_call({put, Key, Value}, _From, State) when is_binary(Key), is_binary(Value) -> + {ok, State2} = do_put(Key, Value, State), + {reply, ok, State2}; + +handle_call({delete, Key}, _From, State) when is_binary(Key) -> + {ok, State2} = do_put(Key, deleted, State), + {reply, ok, State2}; + +handle_call({lookup, Key}, _From, State=#state{ top=Top, nursery=Nursery } ) when is_binary(Key) -> + case gb_trees:lookup(Key, Nursery) of + {value, deleted} -> + {reply, notfound, State}; + {value, Value} when is_binary(Value) -> + {reply, {ok, Value}, State}; + none -> + Reply = fractal_btree_level:lookup(Top, Key), + {reply, Reply, State} + end; + +handle_call(close, _From, State) -> + {ok, State2} = flush_nursery(State), + {stop, normal, ok, State2}. + + + +do_put(Key, Value, State=#state{ nursery=Tree }) -> + Tree2 = gb_trees:enter(Key, Value, Tree), + TreeSize = gb_trees:size(Tree2), + + if TreeSize >= (1 bsl ?TOP_LEVEL) -> + flush_nursery(State#state{ nursery=Tree2 }); + true -> + {ok, State#state{ nursery=Tree2 }} + end. + +flush_nursery(State=#state{nursery=Tree, top=Top}) -> + TreeSize = gb_trees:size( Tree ), + if TreeSize > 0 -> + error_logger:info_msg("flushing to top=~p, alive=~p~n", [Top, erlang:is_process_alive(Top)]), + FileName = filename:join(State#state.dir, "nursery.data"), + {ok, BT} = fractal_btree_writer:open(FileName, (1 bsl ?TOP_LEVEL)), + lists:foreach( fun({Key2,Value2}) -> + ok = fractal_btree_writer:add(BT, Key2, Value2) + end, + gb_trees:to_list(Tree)), + ok = fractal_btree_writer:close(BT), + ok = fractal_btree_level:inject(Top, FileName), + {ok, State#state{ nursery=gb_trees:empty() } }; + true -> + {ok, State} + end. diff --git a/src/fractal_btree_level.erl b/src/fractal_btree_level.erl index cd2f55f..a024fff 100644 --- a/src/fractal_btree_level.erl +++ b/src/fractal_btree_level.erl @@ -5,36 +5,45 @@ %% the process of injecting/merging parent trees into this pair. %% +%% +%% For now, we use plain_fsm, because we *want* selective receive to postpone +%% pending injects while we're merging. That's a lot simpler than maintaining +%% to corresponding merge queue. +%% + -behavior(plain_fsm). -export([data_vsn/0, code_change/3]). --export([open/2, lookup/2, inject/2, close/1]). +-export([open/3, lookup/2, inject/2, close/1]). -include_lib("kernel/include/file.hrl"). -record(state, { - a, b, next, dir, level, inject_done_ref + a, b, next, dir, level, inject_done_ref, merge_pid }). +%%%%% PUBLIC OPERATIONS - -open(Dir,Level) -> - plain_fsm:spawn_link(?MODULE, - fun() -> - process_flag(trap_exit,true), - initialize(#state{dir=Dir,level=Level}) - end). +open(Dir,Level,Next) when Level>0 -> + {ok, plain_fsm:spawn_link(?MODULE, + fun() -> + process_flag(trap_exit,true), + initialize(#state{dir=Dir,level=Level,next=Next}) + end)}. lookup(Ref, Key) -> call(Ref, {lookup, Key}). inject(Ref, FileName) -> - call(Ref, {inject, FileName}). + Result = call(Ref, {inject, FileName}), + Result. close(Ref) -> call(Ref, close). +%%%%% INTERNAL + data_vsn() -> 5. @@ -46,7 +55,7 @@ code_change(_OldVsn, _State, _Extra) -> -define(REPLY(Ref,Msg), {'$rep', Ref, Msg}). send_request(PID, Request) -> - Ref = erlang:monitor(PID), + Ref = erlang:monitor(process, PID), PID ! ?REQ({self(), Ref}, Request), Ref. @@ -56,6 +65,7 @@ receive_reply(MRef) -> erlang:demonitor(MRef, [flush]), Reply; {'DOWN', MRef, _, _, Reason} -> + error_logger:info_msg("Level dies, reason=~p~n", [Reason]), exit(Reason) end. @@ -69,6 +79,9 @@ reply({PID,Ref}, Reply) -> initialize(State) -> + error_logger:info_msg("in ~p level=~p~n", [self(), State]), + + AFileName = filename("A",State), BFileName = filename("B",State), CFileName = filename("C",State), @@ -82,60 +95,73 @@ initialize(State) -> ok = file:rename(CFileName, AFileName), {ok, BT} = fractal_btree_reader:open(CFileName), - main_loop(State#state{ a= BT, b=undefined }); + main_loop1(State#state{ a= BT, b=undefined }); {error, enoent} -> case file:read_file_info(BFileName) of {ok, _} -> {ok, BT1} = fractal_btree_reader:open(AFileName), {ok, BT2} = fractal_btree_reader:open(BFileName), - main_loop(State#state{ a=BT1, b=BT2 }); + + {ok, MergePID} = begin_merge(State), + + main_loop2(State#state{ a=BT1, b=BT2, merge_pid=MergePID }); {error, enoent} -> case file:read_file_info(AFileName) of {ok, _} -> {ok, BT1} = fractal_btree_reader:open(AFileName), - main_loop(State#state{ a=BT1 }); + main_loop1(State#state{ a=BT1 }); {error, enoent} -> - main_loop(State) + main_loop0(State) end end end. -main_loop(State = #state{ a=undefined, b=undefined }) -> +main_loop0(State = #state{ a=undefined, b=undefined }) -> Parent = plain_fsm:info(parent), + error_logger:info_msg("in main_loop0~n", []), receive ?REQ(From, {lookup, _})=Msg -> + error_logger:info_msg("in main_loop0, msg=~p~n", [Msg]), case State#state.next of undefined -> reply(From, notfound); Next -> Next ! Msg end, - main_loop(State); + main_loop0(State); - ?REQ(From, {inject, FileName}) -> - {ok, BT} = fractal_btree_reader:open(FileName), + ?REQ(From, {inject, FileName})=_Msg -> + error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), + AFileName = filename("A",State), + ok = file:rename(FileName, AFileName), + {ok, BT} = fractal_btree_reader:open(AFileName), reply(From, ok), - main_loop(State#state{ a=BT }); + main_loop1(State#state{ a=BT }); - ?REQ(From, close) -> + ?REQ(From, close)=_Msg -> + error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), reply(From, ok), ok; %% gen_fsm handling - {system, From, Req} -> + {system, From, Req}=_Msg -> + error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop(S1) end); - {'EXIT', Parent, Reason} -> - plain_fsm:parent_EXIT(Reason, State) - end; + From, Req, State, fun(S1) -> main_loop0(S1) end); -main_loop(State = #state{ a=BT1, b=undefined, next=Next }) -> + {'EXIT', Parent, Reason}=_Msg -> + error_logger:info_msg("in main_loop0, msg=~p~n", [_Msg]), + plain_fsm:parent_EXIT(Reason, State) + end. + +main_loop1(State = #state{ a=BT1, b=undefined, next=Next }) -> Parent = plain_fsm:info(parent), + error_logger:info_msg("in main_loop1~n", []), receive ?REQ(From, {lookup, Key})=Req -> case fractal_btree_reader:lookup(BT1, Key) of @@ -148,13 +174,15 @@ main_loop(State = #state{ a=BT1, b=undefined, next=Next }) -> notfound -> Next ! Req end, - main_loop(State); + main_loop1(State); ?REQ(From, {inject, FileName}) -> - {ok, BT2} = fractal_btree_reader:open(FileName), + BFileName = filename("B",State), + ok = file:rename(FileName, BFileName), + {ok, BT2} = fractal_btree_reader:open(BFileName), reply(From, ok), - ok = begin_merge(State), - main_loop(State#state{ b=BT2 }); + {ok, MergePID} = begin_merge(State), + main_loop2(State#state{ b=BT2, merge_pid=MergePID }); ?REQ(From, close) -> fractal_btree_reader:close(BT1), @@ -164,22 +192,23 @@ main_loop(State = #state{ a=BT1, b=undefined, next=Next }) -> %% gen_fsm handling {system, From, Req} -> plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop(S1) end); + From, Req, State, fun(S1) -> main_loop1(S1) end); {'EXIT', Parent, Reason} -> plain_fsm:parent_EXIT(Reason, State) - end; + end. -main_loop(State = #state{ next=Next }) -> +main_loop2(State = #state{ next=Next }) -> Parent = plain_fsm:info(parent), + error_logger:info_msg("in main_loop2~n", []), receive ?REQ(From, {lookup, Key})=Req -> case fractal_btree_reader:lookup(State#state.b, Key) of {ok, deleted} -> reply(From, notfound), - main_loop(State); + main_loop2(State); {ok, _}=Reply -> reply(From, Reply), - main_loop(State); + main_loop2(State); _ -> case fractal_btree_reader:lookup(State#state.a, Key) of {ok, deleted} -> @@ -191,13 +220,19 @@ main_loop(State = #state{ next=Next }) -> notfound -> Next ! Req end, - main_loop(State) + main_loop2(State) end; ?REQ(From, close) -> fractal_btree_reader:close(State#state.a), fractal_btree_reader:close(State#state.b), + MergePID = State#state.merge_pid, + if MergePID =:= undefined -> + ok; + true -> + erlang:exit(State#state.merge_pid, shutdown) + end, %% TODO: stop merger, if any? reply(From, ok), ok; @@ -206,7 +241,7 @@ main_loop(State = #state{ next=Next }) -> %% The outcome of merging resulted in a file with less than %% level #entries, so we keep it at this level %% - {merge_done, Count, OutFileName} when Count =< State#state.level -> + {merge_done, Count, OutFileName} when Count =< (1 bsl State#state.level) -> % first, rename the tmp file to C, so recovery will pick it up CFileName = filename("C",State), @@ -220,7 +255,7 @@ main_loop(State = #state{ next=Next }) -> ok = file:rename(CFileName, AFileName), {ok, BT} = fractal_btree_reader:open(AFileName), - main_loop(State2#state{ a=BT, b=undefined }); + main_loop1(State2#state{ a=BT, b=undefined, merge_pid=undefined }); %% %% We need to push the output of merging to the next level @@ -228,14 +263,14 @@ main_loop(State = #state{ next=Next }) -> {merge_done, _, OutFileName} -> State1 = if Next =:= undefined -> - PID = open(State#state.dir, State#state.level * 2), + {ok, PID} = ?MODULE:open(State#state.dir, State#state.level + 1, undefined), State#state{ next=PID }; true -> State end, MRef = send_request(State1#state.next, {inject, OutFileName}), - main_loop(State1#state{ inject_done_ref = MRef }); + main_loop2(State1#state{ inject_done_ref = MRef, merge_pid=undefined }); %% %% Our successor accepted the inject @@ -243,7 +278,7 @@ main_loop(State = #state{ next=Next }) -> ?REPLY(MRef, ok) when MRef =:= State#state.inject_done_ref -> erlang:demonitor(MRef, [flush]), {ok, State2} = close_a_and_b(State), - main_loop(State2#state{ inject_done_ref=undefined, a=undefined, b=undefined }); + main_loop0(State2#state{ inject_done_ref=undefined }); %% %% Our successor died! @@ -254,7 +289,7 @@ main_loop(State = #state{ next=Next }) -> %% gen_fsm handling {system, From, Req} -> plain_fsm:handle_system_msg( - From, Req, State, fun(S1) -> main_loop(S1) end); + From, Req, State, fun(S1) -> main_loop2(S1) end); {'EXIT', Parent, Reason} -> plain_fsm:parent_EXIT(Reason, State) @@ -268,13 +303,18 @@ begin_merge(State) -> XFileName = filename("X",State), Owner = self(), - spawn_link(fun() -> + file:delete(XFileName), + + MergePID = spawn_link(fun() -> {ok, OutCount} = fractal_btree_merger:merge(AFileName, BFileName, XFileName, - State#state.level * 2), + State#state.level + 1), + + error_logger:info_msg("merge done ~p,~p -> ~p~n", [AFileName, BFileName, XFileName]), + Owner ! {merge_done, OutCount, XFileName} end), - ok. + {ok, MergePID}. close_a_and_b(State) -> @@ -291,5 +331,8 @@ close_a_and_b(State) -> filename(PFX, State) -> - filename:join(State#state.dir, PFX ++ integer_to_list(State#state.level) ++ ".data"). + filename:join(State#state.dir, PFX ++ "-" ++ integer_to_list(State#state.level) ++ ".data"). + +size(State) -> + (1 bsl State#state.level). diff --git a/src/fractal_btree_writer.erl b/src/fractal_btree_writer.erl index 2e3139c..0b85d6a 100644 --- a/src/fractal_btree_writer.erl +++ b/src/fractal_btree_writer.erl @@ -34,11 +34,11 @@ %%% PUBLIC API open(Name,Size) -> - gen_server:start(?MODULE, [Name,Size], []). + gen_server:start_link(?MODULE, [Name,Size], []). open(Name) -> - gen_server:start(?MODULE, [Name,2048], []). + gen_server:start_link(?MODULE, [Name,2048], []). add(Ref,Key,Data) -> diff --git a/test/fractal_btree_tests.erl b/test/fractal_btree_tests.erl index 3973eb4..ca2c5b8 100644 --- a/test/fractal_btree_tests.erl +++ b/test/fractal_btree_tests.erl @@ -87,7 +87,6 @@ merge_test() -> error_logger:info_msg("time to merge: ~p/sec (time=~p, count=~p)~n", [1000000/(Time/Count), Time/1000000, Count]), - ok = file:delete("test1"), ok = file:delete("test2"), ok = file:delete("test3"), @@ -95,4 +94,17 @@ merge_test() -> ok. +tree_test() -> + + application:start(sasl), + + {ok, Tree} = fractal_btree:open("simple"), + lists:foldl(fun(N,_) -> + ok = fractal_btree:put(Tree, <>, <<"data",N:128>>) + end, + ok, + lists:seq(2,10000,1)), + + ok = fractal_btree:close(Tree). +