From 442e79e4f15a732729e8cddde964f551bf73a47d Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 30 Apr 2015 17:28:43 +0900 Subject: [PATCH] Add machi_flu_psup.erl to supervise all 3 FLU processes (see below) Introduce machi_flu_psup:start_flu_package/4 as a way to start all related FLU processes * The projection store * The chain manager * The FLU itself ... as well as linked processes. http://www.snookles.com/scotttmp/flu-tree-20150430.png shows one FLU running, "a". The process registered "a" is the append server, "some-prefix" for the sequencer & writer for the current <<"some-prefix">> file, and a process each for 3 active TCP connections to that FLU. --- src/machi.app.src | 2 +- src/machi_chain_manager1.erl | 17 ++++- src/machi_flu1.erl | 93 +++++++++++++++++---------- src/machi_flu_psup.erl | 76 ++++++++++++++++++++++ src/machi_flu_sup.erl | 11 +--- src/machi_projection.erl | 17 ++++- test/machi_proxy_flu1_client_test.erl | 2 +- 7 files changed, 167 insertions(+), 51 deletions(-) create mode 100644 src/machi_flu_psup.erl diff --git a/src/machi.app.src b/src/machi.app.src index 7a2866b..64d9a3a 100644 --- a/src/machi.app.src +++ b/src/machi.app.src @@ -7,7 +7,7 @@ {env, [ {flu_list, [ - {flu_a, 32900, "./data.flu_a"} + %%%%%% {flu_a, 32900, "./data.flu_a"} ]} ]} ]}. diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 3bcb462..7244822 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -95,7 +95,8 @@ start_link(MyName, MembersDict) -> start_link(MyName, MembersDict, []). start_link(MyName, MembersDict, MgrOpts) -> - gen_server:start_link(?MODULE, {MyName, MembersDict, MgrOpts}, []). + gen_server:start_link({local, make_regname(MyName)}, ?MODULE, + {MyName, MembersDict, MgrOpts}, []). stop(Pid) -> gen_server:call(Pid, {stop}, infinity). @@ -206,7 +207,10 @@ handle_cast(_Cast, S) -> {noreply, S}. handle_info(Msg, S) -> - exit({bummer, Msg}), + case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]); + _ -> ok + end, + put(todo_bummer, true), {noreply, S}. terminate(_Reason, _S) -> @@ -1562,7 +1566,9 @@ calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> Index = length(Front) + 1, NumNodes = length(FLU_list), SleepIndex = NumNodes - Index, - SleepChunk = MaxSleep div NumNodes, + SleepChunk = if NumNodes == 0 -> 0; + true -> MaxSleep div NumNodes + end, MinSleep + (SleepChunk * SleepIndex). my_find_minmost([]) -> @@ -1648,6 +1654,11 @@ inner_projection_or_self(P) -> P_inner end. +make_regname(A) when is_atom(A) -> + list_to_atom(atom_to_list(A) ++ "_chmgr"); +make_regname(B) when is_binary(B) -> + list_to_atom(binary_to_list(B) ++ "_chmgr"). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% perhaps_call_t(S, Partitions, FLU, DoIt) -> diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 54bf08e..91ff10a 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -69,7 +69,7 @@ -export([start_link/1, stop/1]). -record(state, { - reg_name :: atom(), + flu_name :: atom(), proj_store :: pid(), append_pid :: pid(), tcp_port :: non_neg_integer(), @@ -95,15 +95,22 @@ stop(Pid) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%% -main2(RegName, TcpPort, DataDir, Rest) -> - S0 = #state{reg_name=RegName, +main2(FluName, TcpPort, DataDir, Rest) -> + S0 = #state{flu_name=FluName, tcp_port=TcpPort, data_dir=DataDir, props=Rest}, AppendPid = start_append_server(S0), - ProjRegName = make_projection_server_regname(RegName), - {ok, ProjectionPid} = - machi_projection_store:start_link(ProjRegName, DataDir, AppendPid), + {_ProjRegName, ProjectionPid} = + case proplists:get_value(projection_store_registered_name, Rest) of + undefined -> + RN = make_projection_server_regname(FluName), + {ok, PP} = + machi_projection_store:start_link(RN, DataDir, AppendPid), + {RN, PP}; + RN -> + {RN, whereis(RN)} + end, S1 = S0#state{append_pid=AppendPid, proj_store=ProjectionPid}, S2 = case proplists:get_value(dbg, Rest) of @@ -123,7 +130,7 @@ main2(RegName, TcpPort, DataDir, Rest) -> Projection_e = machi_util:make_projection_filename(DataDir, "unused"), ok = filelib:ensure_dir(Projection_e), - put(flu_reg_name, RegName), + put(flu_flu_name, FluName), put(flu_append_pid, AppendPid), put(flu_projection_pid, ProjectionPid), put(flu_listen_pid, ListenPid), @@ -134,44 +141,48 @@ main2(RegName, TcpPort, DataDir, Rest) -> ok. start_listen_server(S) -> - spawn_link(fun() -> run_listen_server(S) end). + proc_lib:spawn_link(fun() -> run_listen_server(S) end). start_append_server(S) -> - spawn_link(fun() -> run_append_server(S) end). + FluPid = self(), + proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end). %% start_projection_server(S) -> %% spawn_link(fun() -> run_projection_server(S) end). -run_listen_server(#state{tcp_port=TcpPort}=S) -> +run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> + register(make_listener_regname(FluName), self()), SockOpts = [{reuseaddr, true}, {mode, binary}, {active, false}, {packet, line}], {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), listen_server_loop(LSock, S). -run_append_server(#state{reg_name=Name}=S) -> +run_append_server(FluPid, #state{flu_name=Name}=S) -> register(Name, self()), - append_server_loop(S). + append_server_loop(FluPid, S). listen_server_loop(LSock, S) -> {ok, Sock} = gen_tcp:accept(LSock), spawn_link(fun() -> net_server_loop(Sock, S) end), listen_server_loop(LSock, S). -append_server_loop(#state{data_dir=DataDir}=S) -> +append_server_loop(FluPid, #state{data_dir=DataDir}=S) -> + AppendServerPid = self(), receive {seq_append, From, Prefix, Chunk, CSum} -> spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, - DataDir) end), - append_server_loop(S); + DataDir, AppendServerPid) end), + %% DataDir, FluPid) end), + append_server_loop(FluPid, S); {wedge_state_change, Boolean} -> - append_server_loop(S#state{wedge=Boolean}) + append_server_loop(FluPid, S#state{wedge=Boolean}) end. -define(EpochIDSpace, (4+20)). -net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> +net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0, 60*1000) of + case gen_tcp:recv(Sock, 0, 600*1000) of {ok, Line} -> %% machi_util:verb("Got: ~p\n", [Line]), PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1, @@ -185,7 +196,7 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> _EpochIDRaw:(?EpochIDSpace)/binary, LenHex:8/binary, Prefix:PrefixLenLF/binary, "\n">> -> - do_net_server_append(RegName, Sock, LenHex, Prefix); + do_net_server_append(FluName, Sock, LenHex, Prefix); <<"R ", _EpochIDRaw:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, @@ -233,16 +244,16 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> exit(normal) end. -append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) -> - Pid = write_server_get_pid(Prefix, DataDir), +append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, LinkPid) -> + Pid = write_server_get_pid(Prefix, DataDir, LinkPid), Pid ! {seq_append, From, Prefix, Chunk, CSum}, exit(normal). -do_net_server_append(RegName, Sock, LenHex, Prefix) -> +do_net_server_append(FluName, Sock, LenHex, Prefix) -> %% TODO: robustify against other invalid path characters such as NUL case sanitize_file_string(Prefix) of ok -> - do_net_server_append2(RegName, Sock, LenHex, Prefix); + do_net_server_append2(FluName, Sock, LenHex, Prefix); _ -> ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>) end. @@ -255,13 +266,13 @@ sanitize_file_string(Str) -> error end. -do_net_server_append2(RegName, Sock, LenHex, Prefix) -> +do_net_server_append2(FluName, Sock, LenHex, Prefix) -> <> = machi_util:hexstr_to_bin(LenHex), ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000), CSum = machi_util:checksum_chunk(Chunk), try - RegName ! {seq_append, self(), Prefix, Chunk, CSum} + FluName ! {seq_append, self(), Prefix, Chunk, CSum} catch error:badarg -> error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]) end, @@ -503,22 +514,30 @@ do_net_server_truncate_hackityhack2(Sock, File, DataDir) -> ok = gen_tcp:send(Sock, "ERROR\n") end. -write_server_get_pid(Prefix, DataDir) -> +write_server_get_pid(Prefix, DataDir, LinkPid) -> case write_server_find_pid(Prefix) of undefined -> - start_seq_append_server(Prefix, DataDir), + start_seq_append_server(Prefix, DataDir, LinkPid), timer:sleep(1), - write_server_get_pid(Prefix, DataDir); + write_server_get_pid(Prefix, DataDir, LinkPid); Pid -> Pid end. write_server_find_pid(Prefix) -> - RegName = machi_util:make_regname(Prefix), - whereis(RegName). + FluName = machi_util:make_regname(Prefix), + whereis(FluName). -start_seq_append_server(Prefix, DataDir) -> - spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end). +start_seq_append_server(Prefix, DataDir, AppendServerPid) -> + proc_lib:spawn_link(fun() -> + %% The following is only necessary to + %% make nice process relationships in + %% 'appmon' and related tools. + put('$ancestors', [AppendServerPid]), + put('$initial_call', {x,y,3}), + link(AppendServerPid), + run_seq_append_server(Prefix, DataDir) + end). run_seq_append_server(Prefix, DataDir) -> true = register(machi_util:make_regname(Prefix), self()), @@ -586,8 +605,9 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> after 30*1000 -> ok = file:close(FHd), ok = file:close(FHc), - machi_util:info_msg("stop: ~p server at file ~w offset ~w\n", - [Prefix, FileNum, Offset]), + machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", + [Prefix, self(), FileNum, Offset]), + machi_util:info_msg("links by ~p = ~p\n", [self(), process_info(self(), links)]), exit(normal) end. @@ -633,5 +653,8 @@ handle_projection_command({list_all_projections, ProjType}, handle_projection_command(Else, _S) -> {error, unknown_cmd, Else}. +make_listener_regname(BaseName) -> + list_to_atom(atom_to_list(BaseName) ++ "_listener"). + make_projection_server_regname(BaseName) -> - list_to_atom(atom_to_list(BaseName) ++ "_projection"). + list_to_atom(atom_to_list(BaseName) ++ "_pstore2"). diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl new file mode 100644 index 0000000..13ce1b4 --- /dev/null +++ b/src/machi_flu_psup.erl @@ -0,0 +1,76 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Supervisor for Machi FLU servers and their related support +%% servers. + +-module(machi_flu_psup). + +-behaviour(supervisor). + +%% External API +-export([start_flu_package/4]). +%% Internal API +-export([start_link/4]). + +%% Supervisor callbacks +-export([init/1]). + +start_flu_package(FluName, TcpPort, DataDir, Props) -> + Spec = {FluName, {machi_flu_psup, start_link, + [FluName, TcpPort, DataDir, Props]}, + permanent, 5000, supervisor, []}, + {ok, _SupPid} = supervisor:start_child(machi_flu_sup, Spec). + + +start_link(FluName, TcpPort, DataDir, Props) -> + supervisor:start_link({local, make_p_regname(FluName)}, ?MODULE, + [FluName, TcpPort, DataDir, Props]). + +init([FluName, TcpPort, DataDir, Props0]) -> + RestartStrategy = one_for_all, + MaxRestarts = 1000, + MaxSecondsBetweenRestarts = 3600, + SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, + + ProjRegName = make_proj_supname(FluName), + ProjSpec = {ProjRegName, + {machi_projection_store, start_link, + [ProjRegName, DataDir, zarfus_todo]}, + permanent, 5000, worker, []}, + MgrSpec = {make_mgr_supname(FluName), + {machi_chain_manager1, start_link, + [FluName, []]}, + permanent, 5000, worker, []}, + Props = [{projection_store_registered_name, ProjRegName}|Props0], + FluSpec = {FluName, + {machi_flu1, start_link, + [ [{FluName, TcpPort, DataDir}|Props] ]}, + permanent, 5000, worker, []}, + {ok, {SupFlags, [ProjSpec, MgrSpec, FluSpec]}}. + +make_p_regname(FluName) when is_atom(FluName) -> + list_to_atom("flusup_" ++ atom_to_list(FluName)). + +make_mgr_supname(MgrName) when is_atom(MgrName) -> + list_to_atom(atom_to_list(MgrName) ++ "_s"). + +make_proj_supname(ProjName) when is_atom(ProjName) -> + list_to_atom(atom_to_list(ProjName) ++ "_pstore"). diff --git a/src/machi_flu_sup.erl b/src/machi_flu_sup.erl index ce29502..f74ea07 100644 --- a/src/machi_flu_sup.erl +++ b/src/machi_flu_sup.erl @@ -40,15 +40,6 @@ init([]) -> RestartStrategy = one_for_one, MaxRestarts = 1000, MaxSecondsBetweenRestarts = 3600, - SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, + {ok, {SupFlags, []}}. - Restart = permanent, - Shutdown = 5000, - Type = worker, - - {ok, FluList} = application:get_env(machi, flu_list), - FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]}, - Restart, Shutdown, Type, []} || - {FluName, _Port, _Dir}=FluArgs <- FluList], - {ok, {SupFlags, FluSpecs}}. diff --git a/src/machi_projection.erl b/src/machi_projection.erl index 5f97f94..8e48966 100644 --- a/src/machi_projection.erl +++ b/src/machi_projection.erl @@ -51,7 +51,7 @@ new(EpochNum, MyName, MemberDict, Down_list, UPI_list, Repairing_list, Dbg) -> %% or it may be simply `list(p_srvr())', in which case we'll convert it %% to a `p_srvr_dict()'. -new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list, +new(EpochNum, MyName, [_|_] = MembersDict0, Down_list, UPI_list, Repairing_list, Dbg, Dbg2) when is_integer(EpochNum), EpochNum >= 0, is_atom(MyName) orelse is_binary(MyName), @@ -85,6 +85,21 @@ new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list, repairing=Repairing_list, dbg=Dbg }, + update_dbg2(update_checksum(P), Dbg2); +new(EpochNum, MyName, [] = _MembersDict0, _Down_list, _UPI_list,_Repairing_list, + Dbg, Dbg2) + when is_integer(EpochNum), EpochNum >= 0, + is_atom(MyName) orelse is_binary(MyName) -> + P = #projection_v1{epoch_number=EpochNum, + creation_time=now(), + author_server=MyName, + all_members=[], + members_dict=[], + down=[], + upi=[], + repairing=[], + dbg=Dbg + }, update_dbg2(update_checksum(P), Dbg2). %% @doc Update the checksum element of a projection record. diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index a22456e..7b1dcbc 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -65,7 +65,7 @@ api_smoke_test() -> %% Alright, now for the rest of the API, whee BadFile = <<"no-such-file">>, {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile), - {ok, [_]} = ?MUT:list_files(Prox1, FakeEpoch), + {ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch), {ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public), {error, not_written} = ?MUT:read_latest_projection(Prox1, public), {error, not_written} = ?MUT:read_projection(Prox1, public, 44),