Add CP mode unwedge. All eunit tests are passing again.

This commit is contained in:
Scott Lystig Fritchie 2015-08-26 18:47:39 +09:00
parent 9222881689
commit 28335a1310
4 changed files with 111 additions and 32 deletions

View file

@ -61,6 +61,7 @@
name :: pv1_server(),
flap_limit :: non_neg_integer(),
proj :: projection(),
proj_unanimous :: boolean(),
%%
timer :: 'undefined' | timer:tref(),
ignore_timer :: boolean(),
@ -272,8 +273,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
{network_islands, Opt(network_islands, [])},
{up_nodes, Opt(up_nodes, not_init_yet)}],
ActiveP = Opt(active_mode, true),
S = #ch_mgr{name=MyName,
proj=Proj,
S = set_proj(#ch_mgr{name=MyName,
%% TODO 2015-03-04: revisit, should this constant be bigger?
%% Yes, this should be bigger, but it's a hack. There is
%% no guarantee that all parties will advance to a minimum
@ -284,7 +284,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
not_sanes=orddict:new(),
consistency_mode=CMode,
runenv=RunEnv,
opts=MgrOpts},
opts=MgrOpts}, Proj),
{_, S2} = do_set_chain_members_dict(MembersDict, S),
S3 = if ActiveP == false ->
S2;
@ -310,11 +310,7 @@ handle_call({set_chain_members, MembersDict, Witness_list}, _From,
NewUPI = OldUPI -- MissingInNew,
NewDown = All_list -- NewUPI,
NewEpoch = OldEpoch + ?SET_CHAIN_MEMBERS_EPOCH_SKIP,
CMode = if Witness_list == [] ->
ap_mode;
Witness_list /= [] ->
cp_mode
end,
CMode = calc_consistency_mode(Witness_list),
ok = set_consistency_mode(machi_flu_psup:make_proj_supname(MyName), CMode),
NewProj = machi_projection:update_checksum(
OldProj#projection_v1{author_server=MyName,
@ -329,8 +325,8 @@ handle_call({set_chain_members, MembersDict, Witness_list}, _From,
members_dict=MembersDict}),
%% Reset all flapping state.
NewProj2 = NewProj#projection_v1{flap=make_flapping_i()},
S3 = clear_flapping_state(S2#ch_mgr{proj=NewProj2,
proj_history=queue:new()}),
S3 = clear_flapping_state(set_proj(S2#ch_mgr{proj_history=queue:new()},
NewProj2)),
{_QQ, S4} = do_react_to_env(S3),
{reply, Reply, S4};
handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) ->
@ -461,7 +457,7 @@ store_zeroth_projection_maybe(ZeroProj, MgrOpts) ->
get_projection_store_regname(MgrOpts) ->
proplists:get_value(projection_store_registered_name, MgrOpts).
set_consistency_mode(undefined, CMode) ->
set_consistency_mode(undefined, _CMode) ->
ok;
set_consistency_mode(ProjStore, CMode) ->
machi_projection_store:set_consistency_mode(ProjStore, CMode).
@ -540,6 +536,12 @@ read_latest_projection_call_only(ProjectionType, AllHosed,
#projection_v1{all_members=All_list} = CurrentProj,
All_queried_list = All_list -- AllHosed,
{Rs, S2} = read_latest_projection_call_only2(ProjectionType,
All_queried_list, S),
FLUsRs = lists:zip(All_queried_list, Rs),
{All_queried_list, FLUsRs, S2}.
read_latest_projection_call_only2(ProjectionType, All_queried_list, S) ->
{_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(Pid) ->
case (?FLU_PC:read_latest_projection(Pid, ProjectionType, ?TO)) of
@ -547,12 +549,9 @@ read_latest_projection_call_only(ProjectionType, AllHosed,
Else -> Else
end
end,
Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
Rs = [(catch perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)) ||
FLU <- All_queried_list],
%% Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
%% FLU <- All_queried_list],
FLUsRs = lists:zip(All_queried_list, Rs),
{All_queried_list, FLUsRs, S2}.
{Rs, S2}.
cl_read_latest_projection(ProjectionType, S) ->
AllHosed = [],
@ -985,8 +984,10 @@ do_react_to_env(#ch_mgr{name=MyName,
{{empty_members_dict, [], Epoch}, S};
true ->
{_, S2} = do_set_chain_members_dict(NewMembersDict, S),
CMode = calc_consistency_mode(NewProj#projection_v1.witnesses),
{{empty_members_dict, [], Epoch},
S2#ch_mgr{proj=NewProj, members_dict=NewMembersDict}}
set_proj(S2#ch_mgr{members_dict=NewMembersDict,
consistency_mode=CMode}, NewProj)}
end;
do_react_to_env(S) ->
%% The not_sanes manager counting dictionary is not strictly
@ -1033,13 +1034,15 @@ do_react_to_env(S) ->
%%
put(react, []),
try
if S#ch_mgr.sane_transitions > 3 -> % TODO review this constant
%% ?V("Skr,~w,", [S#ch_mgr.name]),
react_to_env_A10(S#ch_mgr{not_sanes=orddict:new()});
true ->
%% ?V("Sk,~w,~w,", [S#ch_mgr.name, S#ch_mgr.sane_transitions]),
react_to_env_A10(S)
end
S2 = if S#ch_mgr.sane_transitions > 3 -> % TODO review this constant
S#ch_mgr{not_sanes=orddict:new()};
true ->
S
end,
%% When in CP mode, we call the poll function twice: once before
%% reacting & once after. This call is the 2nd.
{Res, S3} = react_to_env_A10(S2),
{Res, poll_private_proj_is_upi_unanimous(S3)}
catch
throw:{zerf,_}=_Throw ->
Proj = S#ch_mgr.proj,
@ -1049,7 +1052,7 @@ io:format(user, "zerf ~p caught ~p\n", [S#ch_mgr.name, _Throw]),
react_to_env_A10(S) ->
?REACT(a10),
react_to_env_A20(0, S).
react_to_env_A20(0, poll_private_proj_is_upi_unanimous(S)).
react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) ->
?REACT(a20),
@ -1150,7 +1153,7 @@ react_to_env_A29(Retries, P_latest, LatestUnanimousP, ReadExtra,
%% io:format(user, "zerf_in @ A29: ~p: ~w\n", [MyName, machi_projection:make_summary(Zerf)]),
P_current2 = Zerf#projection_v1{
flap=P_current#projection_v1.flap},
%% io:format(user, "A29 ~w cur_flap ~W, ", [S#ch_mgr.name, P_current#projection_v1.flap, 8]),
%% Do not use the usual set_proj() wrapper here.
react_to_env_A30(Retries, P_latest, LatestUnanimousP,
ReadExtra, S#ch_mgr{proj=P_current2});
Zerf ->
@ -1629,6 +1632,7 @@ react_to_env_A49(_P_latest, FinalProps, #ch_mgr{name=MyName,
members_dict=MembersDict} = P_current,
P_none = make_none_projection(MyName, All_list, Witness_list,
MembersDict),
%% Do not use the usual set_proj() wrapper here.
react_to_env_A50(P_none, FinalProps, S#ch_mgr{proj=P_none}).
react_to_env_A50(P_latest, FinalProps, #ch_mgr{proj=P_current}=S) ->
@ -2031,8 +2035,8 @@ react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H,
diversion_c120_verbose_goop(P_latest, S),
?REACT({c120, [{latest, machi_projection:make_summary(P_latest)}]}),
{{now_using, FinalProps, P_latest#projection_v1.epoch_number},
S#ch_mgr{proj=P_latest, proj_history=H2,
sane_transitions=Xtns + 1}}.
set_proj(S#ch_mgr{proj_history=H2,
sane_transitions=Xtns + 1}, P_latest)}.
add_and_trunc_history(P_latest, H, MaxLength) ->
H2 = if P_latest#projection_v1.epoch_number > 0 ->
@ -2599,6 +2603,61 @@ projection_transition_is_sane_except_si_epoch(
stack, Trace}
end.
poll_private_proj_is_upi_unanimous(#ch_mgr{consistency_mode=ap_mode} = S) ->
S;
poll_private_proj_is_upi_unanimous(#ch_mgr{consistency_mode=cp_mode,
proj_unanimous=true} = S) ->
S;
poll_private_proj_is_upi_unanimous(#ch_mgr{consistency_mode=cp_mode,
proj_unanimous=false,
proj=Proj} = S) ->
if Proj#projection_v1.upi == [] % Nobody to poll?
orelse
Proj#projection_v1.epoch_number == 0 -> % Skip polling for epoch 0?
S;
true ->
poll_private_proj_is_upi_unanimous_sleep(0, S)
end.
poll_private_proj_is_upi_unanimous_sleep(Count, S) when Count > 2 ->
S;
poll_private_proj_is_upi_unanimous_sleep(Count, S) ->
timer:sleep((Count * Count) * 50),
case poll_private_proj_is_upi_unanimous3(S) of
#ch_mgr{proj_unanimous=true} = S2 ->
S2;
S2 ->
poll_private_proj_is_upi_unanimous_sleep(Count + 1, S2)
end.
poll_private_proj_is_upi_unanimous3(#ch_mgr{name=_MyName, proj=Proj0,
opts=MgrOpts} = S) ->
Proj = inner_projection_or_self(Proj0),
UPI = Proj#projection_v1.upi,
EpochID = machi_projection:make_epoch_id(Proj),
{Rs, S2} = read_latest_projection_call_only2(private, UPI, S),
Rs2 = [if is_record(P, projection_v1) ->
machi_projection:make_epoch_id(P);
true ->
P
end || #projection_v1{}=P <- Rs],
case lists:usort(Rs2) of
[EID] when EID == EpochID ->
Annotation = {private_proj_is_upi_unanimous, {EpochID, now()}},
%% Careful, use the outer projection here!
NewDbg2 = [Annotation|Proj0#projection_v1.dbg2],
NewProj = Proj0#projection_v1{dbg2=NewDbg2},
ProjStore = get_projection_store_regname(MgrOpts),
ok = machi_projection_store:write(ProjStore, private, NewProj),
%% Unwedge our FLU.
io:format(user, "\nUnwedge ~w @ ~W\n", [_MyName, EpochID, 7]),
{ok, NotifyPid} = machi_projection_store:get_wedge_notify_pid(ProjStore),
_ = machi_flu1:update_wedge_state(NotifyPid, false, EpochID),
S2#ch_mgr{proj_unanimous=true};
_Else ->
S2
end.
sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
USec = calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list),
timer:sleep(USec),
@ -3255,3 +3314,11 @@ calc_magic_down([H|T], G) ->
search_last_flap_counts(FLU, FlapCountsLast) ->
proplists:get_value(FLU, FlapCountsLast, undefined).
calc_consistency_mode(_Witness_list = []) ->
ap_mode;
calc_consistency_mode(_Witness_list) ->
cp_mode.
set_proj(S, Proj) ->
S#ch_mgr{proj=Proj, proj_unanimous=false}.

View file

@ -31,7 +31,8 @@
compare/2,
get_epoch_id/1,
make_summary/1,
make_members_dict/1
make_members_dict/1,
make_epoch_id/1
]).
%% @doc Create a new projection record.
@ -201,3 +202,6 @@ make_members_dict(Ps) ->
exit({badarg, {make_members_dict, lists:filter(F_neither, Ps)}})
end
end.
make_epoch_id(#projection_v1{epoch_number=Epoch, epoch_csum=CSum}) ->
{Epoch, CSum}.

View file

@ -59,7 +59,8 @@
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3
]).
-export([set_wedge_notify_pid/2, set_consistency_mode/2]).
-export([set_wedge_notify_pid/2, get_wedge_notify_pid/1,
set_consistency_mode/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -163,6 +164,10 @@ set_wedge_notify_pid(PidSpec, NotifyWedgeStateChanges) ->
gen_server:call(PidSpec, {set_wedge_notify_pid, NotifyWedgeStateChanges},
infinity).
get_wedge_notify_pid(PidSpec) ->
gen_server:call(PidSpec, {get_wedge_notify_pid},
infinity).
set_consistency_mode(PidSpec, CMode)
when CMode == ap_mode; CMode == cp_mode ->
gen_server:call(PidSpec, {set_consistency_mode, CMode}, infinity).
@ -230,6 +235,8 @@ handle_call({{list_all_projections, ProjType}, LC1}, _From, S) ->
{reply, {{ok, find_all(Dir)}, LC2}, S};
handle_call({set_wedge_notify_pid, NotifyWedgeStateChanges}, _From, S) ->
{reply, ok, S#state{wedge_notify_pid=NotifyWedgeStateChanges}};
handle_call({get_wedge_notify_pid}, _From, S) ->
{reply, {ok, S#state.wedge_notify_pid}, S};
handle_call({set_consistency_mode, CMode}, _From, S) ->
{reply, ok, S#state{consistency_mode=CMode}};
handle_call(_Request, _From, S) ->

View file

@ -205,7 +205,7 @@ witness_smoke_test2() ->
%% Whew ... ok, now start some damn tests.
{ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]),
machi_cr_client:append_chunk(C1, Prefix, Chunk1),
{ok, _} = machi_cr_client:append_chunk(C1, Prefix, Chunk1),
{ok, {Off1,Size1,File1}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1),
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
@ -215,7 +215,8 @@ witness_smoke_test2() ->
%% Stop 'b' and let the chain reset.
ok = machi_flu_psup:stop_flu_package(b),
run_ticks([a_chmgr,c_chmgr]),
%% Run ticks enough times to force auto-unwedge of both a & c.
[run_ticks([a_chmgr,c_chmgr]) || _ <- [1,2,3,4] ],
%% The chain should now be [a,c].
%% Let's wedge OurWitness and see what happens: timeout/partition.