Basic stuff to add new flus via 'pending' dir

This commit is contained in:
Scott Lystig Fritchie 2015-12-09 14:48:46 +09:00
parent 7301c8308e
commit 65eec61f82
5 changed files with 110 additions and 15 deletions

View file

@ -16,6 +16,10 @@
%% Default = 10
%% {metadata_manager_count, 2},
%% Platform vars (mirror of reltool packaging)
{platform_data_dir, "{{platform_data_dir}}"},
{platform_etc_dir, "{{platform_etc_dir}}"},
%% Do not delete, do not put Machi config items after this line.
{final_comma_stopper, do_not_delete}
]

View file

@ -95,6 +95,8 @@
{template, "files/app.config", "etc/app.config"},
{mkdir, "etc/chain-config"},
{mkdir, "etc/flu-config"},
{mkdir, "etc/pending"},
{mkdir, "etc/rejected"},
{mkdir, "lib/basho-patches"}
%% {copy, "../apps/machi/ebin/etop_txt.beam", "lib/basho-patches"}
]}.

View file

@ -180,5 +180,8 @@ get_env(Setting, Default) ->
get_data_dir(Props) ->
case proplists:get_value(data_dir, Props) of
Path when is_list(Path) ->
Path
Path;
undefined ->
{ok, Dir} = application:get_env(machi, flu_data_dir),
Dir
end.

View file

@ -47,7 +47,9 @@
-endif. %TEST
%% API
-export([start_link/0, load_rc_d_files_from_dir/1]).
-export([start_link/0,
get_initial_flus/0, load_rc_d_files_from_dir/1,
sanitize_p_srvr_records/1]).
%% Supervisor callbacks
-export([init/1]).
@ -81,22 +83,22 @@ get_initial_flus() ->
DoesNotExist ->
DoesNotExist;
Dir ->
ok = filelib:ensure_dir(Dir ++ "/unused"),
Dir
end,
sanitize_p_srvr_records(load_rc_d_files_from_dir(ConfigDir)).
Ps = [P || {_File, P} <- load_rc_d_files_from_dir(ConfigDir)],
sanitize_p_srvr_records(Ps).
-endif. % PULSE
load_rc_d_files_from_dir(Dir) ->
Files = filelib:wildcard(Dir ++ "/*"),
lists:append([case file:consult(File) of
{ok, X} ->
X;
_ ->
lager:warning("Error parsing file '~s', ignoring",
[File]),
[]
end || File <- Files]).
[case file:consult(File) of
{ok, [X]} ->
{File, X};
_ ->
lager:warning("Error parsing file '~s', ignoring",
[File]),
{File, []}
end || File <- Files].
sanitize_p_srvr_records(Ps) ->
{Sane, _} = lists:foldl(fun sanitize_p_srvr_rec/2, {[], dict:new()}, Ps),

View file

@ -189,7 +189,8 @@
-include("machi_projection.hrl").
%% API
-export([start_link/0]).
-export([start_link/0,
process_pending/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -205,12 +206,20 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
process_pending() ->
gen_server:call(?SERVER, {process_pending}, infinity).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([]) ->
self() ! finish_init,
{ok, #state{}}.
handle_call({process_pending}, _From, State) ->
{Reply, NewState} = do_process_pending(State),
{reply, Reply, NewState};
handle_call(_Request, _From, State) ->
Reply = ok,
Reply = 'whatwatwha????????????????????',
{reply, Reply, State}.
handle_cast(_Msg, State) ->
@ -227,7 +236,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
finish_init(S) ->
%% machi_flu_sup will start all FLUs that have a valid definition
@ -335,6 +344,81 @@ bootstrap_chain(#chain_def_v1{name=ChainName, mode=CMode, full=Full,
ok
end.
do_process_pending(S) ->
PendingDir = get_pending_dir(S),
%% PendingFiles = get_pending_files(PendingDir, S),
PendingParsed = machi_flu_sup:load_rc_d_files_from_dir(PendingDir),
P_FLUs = [X || {_File, #p_srvr{}}=X <- PendingParsed],
P_Chains = [X || {_File, #chain_def_v1{}}=X <- PendingParsed],
BadFiles = [File || {File, []} <- PendingParsed],
S2 = process_pending_flus(P_FLUs, S),
S3 = process_pending_chains(P_Chains, S2),
S4 = process_bad_files(BadFiles, S3),
{{P_FLUs, P_Chains}, S4}.
get_pending_dir(_S) ->
{ok, EtcDir} = application:get_env(machi, platform_etc_dir),
EtcDir ++ "/pending".
get_rejected_dir(_S) ->
{ok, EtcDir} = application:get_env(machi, platform_etc_dir),
EtcDir ++ "/rejected".
get_flu_config_dir(_S) ->
{ok, Dir} = application:get_env(machi, flu_config_dir),
Dir.
get_chain_config_dir(_S) ->
{ok, Dir} = application:get_env(machi, chain_config_dir),
Dir.
process_pending_flus(P_FLUs, S) ->
lists:foldl(fun process_pending_flu/2, S, P_FLUs).
process_pending_flu({File, P}, S) ->
#p_srvr{name=FLU} = P,
CurrentPs = machi_flu_sup:get_initial_flus(),
Valid_Ps = machi_flu_sup:sanitize_p_srvr_records(CurrentPs ++ [P]),
case lists:member(P, Valid_Ps)
andalso
(not lists:keymember(FLU, #p_srvr.name, CurrentPs)) of
false ->
lager:error("Pending FLU config file ~s has been rejected\n",
[File]),
_ = move_to_rejected(File, S),
S;
true ->
{ok, SupPid} = machi_flu_psup:start_flu_package(P),
lager:info("Started FLU ~w with supervisor pid ~p\n",
[FLU, SupPid]),
_ = move_to_flu_config(FLU, File, S),
S
end.
process_pending_chains(P_Chains, S) ->
lists:foldl(fun process_pending_chain/2, S, P_Chains).
process_pending_chain({_File, #chain_def_v1{}}, S) ->
io:format(user, "TODO: ~s LINE ~p\n", [?MODULE, ?LINE]),
S.
process_bad_files(Files, S) ->
lists:foldl(fun move_to_rejected/2, S, Files).
move_to_rejected(File, S) ->
lager:error("Pending unknown config file ~s has been rejected\n", [File]),
Dst = get_rejected_dir(S),
Suffix = lists:flatten(io_lib:format("~w,~w,~w",
tuple_to_list(os:timestamp()))),
ok = file:rename(File, Dst ++ "/" ++ filename:basename(File) ++ Suffix),
S.
move_to_flu_config(FLU, File, S) ->
lager:info("Creating FLU config file ~w\n", [FLU]),
Dst = get_flu_config_dir(S),
ok = file:rename(File, Dst ++ "/" ++ atom_to_list(FLU)),
S.
%% 1. Don't worry about partial writes to config dir: that's a Policy
%% implementation worry: create a temp file, fsync(2), then rename(2)
%% and possibly fsync(2) on the dir.