Add machi_flu_psup.erl to supervise all 3 FLU processes (see below)

Introduce machi_flu_psup:start_flu_package/4 as a way to start all
related FLU processes
    * The projection store
    * The chain manager
    * The FLU itself

... as well as linked processes.
http://www.snookles.com/scotttmp/flu-tree-20150430.png shows one FLU
running, "a".  The process registered "a" is the append server,
"some-prefix" for the sequencer & writer for the current <<"some-prefix">>
file, and a process each for 3 active TCP connections to that FLU.
This commit is contained in:
Scott Lystig Fritchie 2015-04-30 17:28:43 +09:00
parent 9249663ff6
commit 442e79e4f1
7 changed files with 167 additions and 51 deletions

View file

@ -7,7 +7,7 @@
{env, [
{flu_list,
[
{flu_a, 32900, "./data.flu_a"}
%%%%%% {flu_a, 32900, "./data.flu_a"}
]}
]}
]}.

View file

@ -95,7 +95,8 @@ start_link(MyName, MembersDict) ->
start_link(MyName, MembersDict, []).
start_link(MyName, MembersDict, MgrOpts) ->
gen_server:start_link(?MODULE, {MyName, MembersDict, MgrOpts}, []).
gen_server:start_link({local, make_regname(MyName)}, ?MODULE,
{MyName, MembersDict, MgrOpts}, []).
stop(Pid) ->
gen_server:call(Pid, {stop}, infinity).
@ -206,7 +207,10 @@ handle_cast(_Cast, S) ->
{noreply, S}.
handle_info(Msg, S) ->
exit({bummer, Msg}),
case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]);
_ -> ok
end,
put(todo_bummer, true),
{noreply, S}.
terminate(_Reason, _S) ->
@ -1562,7 +1566,9 @@ calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
Index = length(Front) + 1,
NumNodes = length(FLU_list),
SleepIndex = NumNodes - Index,
SleepChunk = MaxSleep div NumNodes,
SleepChunk = if NumNodes == 0 -> 0;
true -> MaxSleep div NumNodes
end,
MinSleep + (SleepChunk * SleepIndex).
my_find_minmost([]) ->
@ -1648,6 +1654,11 @@ inner_projection_or_self(P) ->
P_inner
end.
make_regname(A) when is_atom(A) ->
list_to_atom(atom_to_list(A) ++ "_chmgr");
make_regname(B) when is_binary(B) ->
list_to_atom(binary_to_list(B) ++ "_chmgr").
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
perhaps_call_t(S, Partitions, FLU, DoIt) ->

View file

@ -69,7 +69,7 @@
-export([start_link/1, stop/1]).
-record(state, {
reg_name :: atom(),
flu_name :: atom(),
proj_store :: pid(),
append_pid :: pid(),
tcp_port :: non_neg_integer(),
@ -95,15 +95,22 @@ stop(Pid) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
main2(RegName, TcpPort, DataDir, Rest) ->
S0 = #state{reg_name=RegName,
main2(FluName, TcpPort, DataDir, Rest) ->
S0 = #state{flu_name=FluName,
tcp_port=TcpPort,
data_dir=DataDir,
props=Rest},
AppendPid = start_append_server(S0),
ProjRegName = make_projection_server_regname(RegName),
{ok, ProjectionPid} =
machi_projection_store:start_link(ProjRegName, DataDir, AppendPid),
{_ProjRegName, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Rest) of
undefined ->
RN = make_projection_server_regname(FluName),
{ok, PP} =
machi_projection_store:start_link(RN, DataDir, AppendPid),
{RN, PP};
RN ->
{RN, whereis(RN)}
end,
S1 = S0#state{append_pid=AppendPid,
proj_store=ProjectionPid},
S2 = case proplists:get_value(dbg, Rest) of
@ -123,7 +130,7 @@ main2(RegName, TcpPort, DataDir, Rest) ->
Projection_e = machi_util:make_projection_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Projection_e),
put(flu_reg_name, RegName),
put(flu_flu_name, FluName),
put(flu_append_pid, AppendPid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenPid),
@ -134,44 +141,48 @@ main2(RegName, TcpPort, DataDir, Rest) ->
ok.
start_listen_server(S) ->
spawn_link(fun() -> run_listen_server(S) end).
proc_lib:spawn_link(fun() -> run_listen_server(S) end).
start_append_server(S) ->
spawn_link(fun() -> run_append_server(S) end).
FluPid = self(),
proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end).
%% start_projection_server(S) ->
%% spawn_link(fun() -> run_projection_server(S) end).
run_listen_server(#state{tcp_port=TcpPort}=S) ->
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
register(make_listener_regname(FluName), self()),
SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}],
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S).
run_append_server(#state{reg_name=Name}=S) ->
run_append_server(FluPid, #state{flu_name=Name}=S) ->
register(Name, self()),
append_server_loop(S).
append_server_loop(FluPid, S).
listen_server_loop(LSock, S) ->
{ok, Sock} = gen_tcp:accept(LSock),
spawn_link(fun() -> net_server_loop(Sock, S) end),
listen_server_loop(LSock, S).
append_server_loop(#state{data_dir=DataDir}=S) ->
append_server_loop(FluPid, #state{data_dir=DataDir}=S) ->
AppendServerPid = self(),
receive
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir) end),
append_server_loop(S);
DataDir, AppendServerPid) end),
%% DataDir, FluPid) end),
append_server_loop(FluPid, S);
{wedge_state_change, Boolean} ->
append_server_loop(S#state{wedge=Boolean})
append_server_loop(FluPid, S#state{wedge=Boolean})
end.
-define(EpochIDSpace, (4+20)).
net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0, 60*1000) of
case gen_tcp:recv(Sock, 0, 600*1000) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
@ -185,7 +196,7 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
_EpochIDRaw:(?EpochIDSpace)/binary,
LenHex:8/binary,
Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(RegName, Sock, LenHex, Prefix);
do_net_server_append(FluName, Sock, LenHex, Prefix);
<<"R ",
_EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
@ -233,16 +244,16 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
exit(normal)
end.
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) ->
Pid = write_server_get_pid(Prefix, DataDir),
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, LinkPid) ->
Pid = write_server_get_pid(Prefix, DataDir, LinkPid),
Pid ! {seq_append, From, Prefix, Chunk, CSum},
exit(normal).
do_net_server_append(RegName, Sock, LenHex, Prefix) ->
do_net_server_append(FluName, Sock, LenHex, Prefix) ->
%% TODO: robustify against other invalid path characters such as NUL
case sanitize_file_string(Prefix) of
ok ->
do_net_server_append2(RegName, Sock, LenHex, Prefix);
do_net_server_append2(FluName, Sock, LenHex, Prefix);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
end.
@ -255,13 +266,13 @@ sanitize_file_string(Str) ->
error
end.
do_net_server_append2(RegName, Sock, LenHex, Prefix) ->
do_net_server_append2(FluName, Sock, LenHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum_chunk(Chunk),
try
RegName ! {seq_append, self(), Prefix, Chunk, CSum}
FluName ! {seq_append, self(), Prefix, Chunk, CSum}
catch error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
end,
@ -503,22 +514,30 @@ do_net_server_truncate_hackityhack2(Sock, File, DataDir) ->
ok = gen_tcp:send(Sock, "ERROR\n")
end.
write_server_get_pid(Prefix, DataDir) ->
write_server_get_pid(Prefix, DataDir, LinkPid) ->
case write_server_find_pid(Prefix) of
undefined ->
start_seq_append_server(Prefix, DataDir),
start_seq_append_server(Prefix, DataDir, LinkPid),
timer:sleep(1),
write_server_get_pid(Prefix, DataDir);
write_server_get_pid(Prefix, DataDir, LinkPid);
Pid ->
Pid
end.
write_server_find_pid(Prefix) ->
RegName = machi_util:make_regname(Prefix),
whereis(RegName).
FluName = machi_util:make_regname(Prefix),
whereis(FluName).
start_seq_append_server(Prefix, DataDir) ->
spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end).
start_seq_append_server(Prefix, DataDir, AppendServerPid) ->
proc_lib:spawn_link(fun() ->
%% The following is only necessary to
%% make nice process relationships in
%% 'appmon' and related tools.
put('$ancestors', [AppendServerPid]),
put('$initial_call', {x,y,3}),
link(AppendServerPid),
run_seq_append_server(Prefix, DataDir)
end).
run_seq_append_server(Prefix, DataDir) ->
true = register(machi_util:make_regname(Prefix), self()),
@ -586,8 +605,9 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
after 30*1000 ->
ok = file:close(FHd),
ok = file:close(FHc),
machi_util:info_msg("stop: ~p server at file ~w offset ~w\n",
[Prefix, FileNum, Offset]),
machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n",
[Prefix, self(), FileNum, Offset]),
machi_util:info_msg("links by ~p = ~p\n", [self(), process_info(self(), links)]),
exit(normal)
end.
@ -633,5 +653,8 @@ handle_projection_command({list_all_projections, ProjType},
handle_projection_command(Else, _S) ->
{error, unknown_cmd, Else}.
make_listener_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_listener").
make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_projection").
list_to_atom(atom_to_list(BaseName) ++ "_pstore2").

76
src/machi_flu_psup.erl Normal file
View file

@ -0,0 +1,76 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Supervisor for Machi FLU servers and their related support
%% servers.
-module(machi_flu_psup).
-behaviour(supervisor).
%% External API
-export([start_flu_package/4]).
%% Internal API
-export([start_link/4]).
%% Supervisor callbacks
-export([init/1]).
start_flu_package(FluName, TcpPort, DataDir, Props) ->
Spec = {FluName, {machi_flu_psup, start_link,
[FluName, TcpPort, DataDir, Props]},
permanent, 5000, supervisor, []},
{ok, _SupPid} = supervisor:start_child(machi_flu_sup, Spec).
start_link(FluName, TcpPort, DataDir, Props) ->
supervisor:start_link({local, make_p_regname(FluName)}, ?MODULE,
[FluName, TcpPort, DataDir, Props]).
init([FluName, TcpPort, DataDir, Props0]) ->
RestartStrategy = one_for_all,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
ProjRegName = make_proj_supname(FluName),
ProjSpec = {ProjRegName,
{machi_projection_store, start_link,
[ProjRegName, DataDir, zarfus_todo]},
permanent, 5000, worker, []},
MgrSpec = {make_mgr_supname(FluName),
{machi_chain_manager1, start_link,
[FluName, []]},
permanent, 5000, worker, []},
Props = [{projection_store_registered_name, ProjRegName}|Props0],
FluSpec = {FluName,
{machi_flu1, start_link,
[ [{FluName, TcpPort, DataDir}|Props] ]},
permanent, 5000, worker, []},
{ok, {SupFlags, [ProjSpec, MgrSpec, FluSpec]}}.
make_p_regname(FluName) when is_atom(FluName) ->
list_to_atom("flusup_" ++ atom_to_list(FluName)).
make_mgr_supname(MgrName) when is_atom(MgrName) ->
list_to_atom(atom_to_list(MgrName) ++ "_s").
make_proj_supname(ProjName) when is_atom(ProjName) ->
list_to_atom(atom_to_list(ProjName) ++ "_pstore").

View file

@ -40,15 +40,6 @@ init([]) ->
RestartStrategy = one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
{ok, {SupFlags, []}}.
Restart = permanent,
Shutdown = 5000,
Type = worker,
{ok, FluList} = application:get_env(machi, flu_list),
FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]},
Restart, Shutdown, Type, []} ||
{FluName, _Port, _Dir}=FluArgs <- FluList],
{ok, {SupFlags, FluSpecs}}.

View file

@ -51,7 +51,7 @@ new(EpochNum, MyName, MemberDict, Down_list, UPI_list, Repairing_list, Dbg) ->
%% or it may be simply `list(p_srvr())', in which case we'll convert it
%% to a `p_srvr_dict()'.
new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list,
new(EpochNum, MyName, [_|_] = MembersDict0, Down_list, UPI_list, Repairing_list,
Dbg, Dbg2)
when is_integer(EpochNum), EpochNum >= 0,
is_atom(MyName) orelse is_binary(MyName),
@ -85,6 +85,21 @@ new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list,
repairing=Repairing_list,
dbg=Dbg
},
update_dbg2(update_checksum(P), Dbg2);
new(EpochNum, MyName, [] = _MembersDict0, _Down_list, _UPI_list,_Repairing_list,
Dbg, Dbg2)
when is_integer(EpochNum), EpochNum >= 0,
is_atom(MyName) orelse is_binary(MyName) ->
P = #projection_v1{epoch_number=EpochNum,
creation_time=now(),
author_server=MyName,
all_members=[],
members_dict=[],
down=[],
upi=[],
repairing=[],
dbg=Dbg
},
update_dbg2(update_checksum(P), Dbg2).
%% @doc Update the checksum element of a projection record.

View file

@ -65,7 +65,7 @@ api_smoke_test() ->
%% Alright, now for the rest of the API, whee
BadFile = <<"no-such-file">>,
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
{ok, [_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public),
{error, not_written} = ?MUT:read_latest_projection(Prox1, public),
{error, not_written} = ?MUT:read_projection(Prox1, public, 44),