WIP: modify chain still a bit broken
This commit is contained in:
parent
65eec61f82
commit
2871f8397c
3 changed files with 117 additions and 45 deletions
|
@ -82,7 +82,9 @@
|
|||
mode :: pv1_consistency_mode(),
|
||||
full :: [p_srvr()],
|
||||
witnesses :: [p_srvr()],
|
||||
props = [] :: list() % proplist for other related info
|
||||
old_all :: [pv1_server()],
|
||||
old_witnesses :: [pv1_server()],
|
||||
props = [] :: list() % proplist for other related info
|
||||
}).
|
||||
|
||||
-endif. % !MACHI_PROJECTION_HRL
|
||||
|
|
|
@ -1255,7 +1255,7 @@ react_to_env_A20(Retries, #ch_mgr{name=MyName, proj=P_current}=S) ->
|
|||
P_none0 = make_none_projection(Epoch,
|
||||
MyName, All_list, Witness_list, MembersDict),
|
||||
P_none = P_none0#projection_v1{chain_name=ChainName},
|
||||
{{x,y,z,42,77}, set_proj(S2, P_none)};
|
||||
{{now_using,[],Epoch}, set_proj(S2, P_none)};
|
||||
_ ->
|
||||
react_to_env_A21(Retries, UnanimousTag, P_latest, ReadExtra, S2)
|
||||
end.
|
||||
|
|
|
@ -126,6 +126,9 @@
|
|||
%% </li>
|
||||
%% </ul>
|
||||
%%
|
||||
%% A FLU may be created or removed (via implicit or explicit policy).
|
||||
%% An existing FLU may not be reconfigured.
|
||||
%%
|
||||
%% == The Chain Lifecycle ==
|
||||
%%
|
||||
%% If a FLU on the local machine is expected to participate in a
|
||||
|
@ -176,6 +179,17 @@
|
|||
%% </li>
|
||||
%% </ul>
|
||||
%%
|
||||
%% A chain may be created, removed, or modified.
|
||||
%%
|
||||
%% A modification request writes a `#chain_def_v1{}' record with the
|
||||
%% same name but different `full' and/or `witnesses' list to each
|
||||
%% machine that hosts a FLU in the chain (in both the old and new
|
||||
%% versions of the chain).
|
||||
%%
|
||||
%% A deletion request writes a `#chain_def_v1{}' record with the same
|
||||
%% name but empty lists for `full' and `witnesses' to each machine
|
||||
%% that hosts a FLU in the chain (in the old version of the chain).
|
||||
%%
|
||||
%% == Conflicts with TCP ports, FLU & chain names, etc ==
|
||||
%%
|
||||
%% This manager is not responsible for managing conflicts in resource
|
||||
|
@ -273,7 +287,9 @@ get_initial_chains() ->
|
|||
ok = filelib:ensure_dir(Dir ++ "/unused"),
|
||||
Dir
|
||||
end,
|
||||
sanitize_chain_def_records(machi_flu_sup:load_rc_d_files_from_dir(ConfigDir)).
|
||||
CDefs = [CDef || {_File, CDef} <- machi_flu_sup:load_rc_d_files_from_dir(
|
||||
ConfigDir)],
|
||||
sanitize_chain_def_records(CDefs).
|
||||
|
||||
sanitize_chain_def_records(Ps) ->
|
||||
{Sane, _} = lists:foldl(fun sanitize_chain_def_rec/2, {[], dict:new()}, Ps),
|
||||
|
@ -285,23 +301,22 @@ sanitize_chain_def_rec(Whole, {Acc, D}) ->
|
|||
mode=Mode,
|
||||
full=Full,
|
||||
witnesses=Witnesses} = Whole,
|
||||
true = is_atom(Name),
|
||||
{true, 10} = {is_atom(Name), 10},
|
||||
NameK = {name, Name},
|
||||
error = dict:find(NameK, D),
|
||||
true = (Mode == ap_mode orelse Mode == cp_mode),
|
||||
{error, 20} = {dict:find(NameK, D), 20},
|
||||
{true, 30} = {(Mode == ap_mode orelse Mode == cp_mode), 30},
|
||||
IsPSrvr = fun(X) when is_record(X, p_srvr) -> true;
|
||||
(_) -> false
|
||||
end,
|
||||
true = lists:all(IsPSrvr, Full),
|
||||
true = lists:all(IsPSrvr, Witnesses),
|
||||
{true, 40} = {lists:all(IsPSrvr, Full), 40},
|
||||
{true, 50} = {lists:all(IsPSrvr, Witnesses), 50},
|
||||
|
||||
%% All is sane enough.
|
||||
D2 = dict:store(NameK, Name, D),
|
||||
{[Whole|Acc], D2}
|
||||
catch _X:_Y ->
|
||||
_ = lager:log(error, self(),
|
||||
"~s: Bad chain_def record (~w ~w), skipping: ~P\n",
|
||||
[?MODULE, _X, _Y, Whole, 15]),
|
||||
catch X:Y ->
|
||||
lager:error("~s: Bad chain_def record (~w ~w), skipping: ~P\n",
|
||||
[?MODULE, X, Y, Whole, 15]),
|
||||
{Acc, D}
|
||||
end.
|
||||
|
||||
|
@ -323,25 +338,62 @@ perhaps_bootstrap_chains([CD|ChainDefs], LocalFLUs_at_zero, LocalFLUs) ->
|
|||
perhaps_bootstrap_chains(ChainDefs, LocalFLUs_at_zero, LocalFLUs);
|
||||
[FLU1|_]=FLUs ->
|
||||
%% One FLU is enough: Humming Consensus will config the remaining
|
||||
bootstrap_chain(CD, FLU1),
|
||||
_ = bootstrap_chain(CD, FLU1),
|
||||
perhaps_bootstrap_chains(ChainDefs, LocalFLUs_at_zero -- FLUs,
|
||||
LocalFLUs -- FLUs)
|
||||
end.
|
||||
|
||||
%% @doc Try to apply chain definition `CD' via the local FLU `FLU'.
%%
%% Delegates to `bootstrap_chain2/3' with a budget of 20 attempts.
%% Returns `{ok, AddedFLUs, RemovedFLUs}' on success or `failed' when
%% every attempt has been used up.
bootstrap_chain(CD, FLU) ->
    bootstrap_chain2(CD, FLU, 20).

%% @doc Worker for `bootstrap_chain/2'; `N' is the number of attempts left.
%%
%% Each attempt reads the latest private projection from the FLU's
%% projection store, then asks `set_chain_members/12' to move from the
%% old membership to the one described by the chain definition.  Any
%% result other than `ok' is logged, followed by a 555 msec pause and a
%% retry with `N-1'.
bootstrap_chain2(CD, FLU, 0) ->
    lager:warning("Failed all attempts to bootstrap chain ~w via FLU ~w ",
                  [CD,FLU]),
    failed;
bootstrap_chain2(#chain_def_v1{name=NewChainName, mode=CMode, full=Full,
                               witnesses=Witnesses, props=Props}=CD,
                 FLU, N) ->
    All_p_srvrs = Witnesses ++ Full,
    L = [{Name, P_srvr} || #p_srvr{name=Name}=P_srvr <- All_p_srvrs],
    MembersDict = orddict:from_list(L),
    NewAll_list = [Name || #p_srvr{name=Name} <- All_p_srvrs],
    %% The chain definition carries witnesses as #p_srvr{} records, but
    %% they are compared against the projection's `witnesses' field below.
    %% NOTE(review): assumes that field holds FLU names, like
    %% `all_members' does -- TODO confirm.
    NewWitnesses = [Name || #p_srvr{name=Name} <- Witnesses],
    Mgr = machi_chain_manager1:make_chmgr_regname(FLU),
    PStore = machi_flu1:make_projection_server_regname(FLU),
    %% Assertive match: crash if the store cannot give us a projection.
    {ok, #projection_v1{epoch_number=OldEpoch, chain_name=OldChainName,
                        all_members=OldAll_list, witnesses=OldWitnesses}} =
        machi_projection_store:read_latest_projection(PStore, private),
    case set_chain_members(OldChainName, NewChainName,
                           OldAll_list, OldWitnesses,
                           NewAll_list, NewWitnesses,
                           Mgr, NewChainName, OldEpoch,
                           CMode, MembersDict, Props) of
        ok ->
            lager:info("Bootstrapped chain ~w via FLU ~w to "
                       "mode=~w all=~w witnesses=~w\n",
                       [NewChainName, FLU, CMode, NewAll_list, NewWitnesses]),
            AddedFLUs = NewAll_list -- OldAll_list,
            RemovedFLUs = OldAll_list -- NewAll_list,
            {ok, AddedFLUs, RemovedFLUs};
        Else ->
            lager:error("Attempt to bootstrap chain ~w via FLU ~w "
                        "failed: ~w (defn ~w)\n",
                        [NewChainName, FLU, Else, CD]),
            timer:sleep(555),
            bootstrap_chain2(CD, FLU, N-1)
    end.
|
||||
|
||||
%% @doc Decide whether a chain membership change is needed and allowed.
%%
%% Returns `ok' without contacting the chain manager when the chain name,
%% member list and witness list are all unchanged.  When the old epoch is
%% 0 or the chain name is unchanged, forwards the request to
%% `machi_chain_manager1:set_chain_members/6' and returns its result.
%% Otherwise returns `chain_bad_arg'.  The final argument is not used.
set_chain_members(OldChainName, NewChainName,
                  OldAll_list, OldWitnesses, NewAll_list, NewWitnesses,
                  Mgr, ChainName, OldEpoch, CMode, MembersDict, _Props) ->
    SameName = (OldChainName == NewChainName),
    NothingChanged = SameName
        andalso OldAll_list == NewAll_list
        andalso OldWitnesses == NewWitnesses,
    ChangeAllowed = (OldEpoch == 0) orelse SameName,
    case {NothingChanged, ChangeAllowed} of
        {true, _} ->
            ok;
        {false, true} ->
            machi_chain_manager1:set_chain_members(Mgr, ChainName, OldEpoch,
                                                   CMode,
                                                   MembersDict, NewWitnesses);
        {false, false} ->
            chain_bad_arg
    end.
|
||||
|
||||
do_process_pending(S) ->
|
||||
|
@ -398,7 +450,47 @@ process_pending_flu({File, P}, S) ->
|
|||
%% @doc Fold `process_pending_chain/2' over every pending chain entry,
%% threading the state `S' through from left to right.
process_pending_chains(P_Chains, S) ->
    Step = fun(PendingChain, StateAcc) ->
                   process_pending_chain(PendingChain, StateAcc)
           end,
    lists:foldl(Step, S, P_Chains).
|
||||
|
||||
%% @doc Process one pending chain definition, `{File, CD}'.
%%
%% The definition is first run through `sanitize_chain_def_records/1'.
%% If it survives and at least one of its FLUs (full or witness) is in
%% the list returned by `get_local_running_flus/0', the change is
%% attempted with `bootstrap_chain/2' via the first such FLU, and on
%% success handed to `process_pending_chain2/6'.  In every other case an
%% error is logged and `move_to_rejected/2' is called for `File'.
%% The state `S' is returned unchanged on all rejection paths.
process_pending_chain({File, CD}, S) ->
    #chain_def_v1{full=Full, witnesses=Witnesses} = CD,
    case sanitize_chain_def_records([CD]) of
        [CD] ->
            RunningFLUs = get_local_running_flus(),
            AllNames = [FLUName || #p_srvr{name=FLUName} <- Full ++ Witnesses],
            case ordsets:intersection(ordsets:from_list(AllNames),
                                      ordsets:from_list(RunningFLUs)) of
                [] ->
                    lager:error("Pending chain config file ~s "
                                "has no local FLUs on this machine, rejected\n",
                                [File]),
                    _ = move_to_rejected(File, S),
                    S;
                [FLU|_]=LocalFLUs ->
                    %% TODO: Between the successful chain change inside of
                    %% bootstrap_chain() (and before it returns to us!) and
                    %% the return of process_pending_chain2(), we have a race
                    %% window if this process crashes.
                    case bootstrap_chain(CD, FLU) of
                        {ok, AddedFLUs, RemovedFLUs} ->
                            process_pending_chain2(File, CD, LocalFLUs,
                                                   AddedFLUs, RemovedFLUs, S);
                        Else ->
                            %% Argument order must follow the format
                            %% string: file name (~s) first, reason (~w)
                            %% second.
                            lager:error("Pending chain config file ~s "
                                        "has failed (~w), rejected\n",
                                        [File, Else]),
                            _ = move_to_rejected(File, S),
                            S
                    end
            end;
        [] ->
            lager:error("Pending chain config file ~s has been rejected\n",
                        [File]),
            _ = move_to_rejected(File, S),
            S
    end.
|
||||
|
||||
%% @doc Placeholder for the follow-up work after a successful chain change.
%%
%% Currently only prints a TODO marker to the `user' device and returns
%% the state `S' unchanged; none of the other arguments is used yet.
process_pending_chain2(_File, _CD, _LocalFLUs, _AddedFLUs, _RemovedFLUs, S) ->
    %% AddedFLUs: no extra work is needed.
    %% RemovedFLUs: stop the FLU, then move its config and its data to
    %% the just-in-case (j-i-c) area.
    io:format(user, "TODO: ~s LINE ~p\n", [?MODULE, ?LINE]),
    S.
|
||||
|
||||
|
@ -418,25 +510,3 @@ move_to_flu_config(FLU, File, S) ->
|
|||
Dst = get_flu_config_dir(S),
|
||||
ok = file:rename(File, Dst ++ "/" ++ atom_to_list(FLU)),
|
||||
S.
|
||||
|
||||
%% 1. Don't worry about partial writes to config dir: that's a Policy
|
||||
%% implementation worry: create a temp file, fsync(2), then rename(2)
|
||||
%% and possibly fsync(2) on the dir.
|
||||
%%
|
||||
%% 2. Add a periodic re-scan to check FLUs & chains (in that order).
|
||||
%%
|
||||
%% 3. Add a forced re-scan of FLUs *or* chains. (Separate is better?)
|
||||
%%
|
||||
%% 4. For chain modification: add "change" directory? Scan looks in
|
||||
%% "change"?
|
||||
%%
|
||||
%% 5. Easy comparison for current vs. new chain config.
|
||||
%% - identify new FLUs: no action required, HC will deal with it?
|
||||
%% - identify removed FLUs: set_chain_members() to change HC
|
||||
%% : stop FLU package (after short pause)
|
||||
%% : move config file -> just-in-case
|
||||
%% : move data files -> just-in-case
|
||||
%% - identify j-i-c dir in case resuming from interruption??? to avoid problem of putting config & data files in different places?
|
||||
%% - move data files itself may be interrupted?
|
||||
%% - move chain def'n from change -> chain_config_dir
|
||||
|
||||
|
|
Loading…
Reference in a new issue