WIP: changes to annotation management

This commit is contained in:
Scott Lystig Fritchie 2015-08-27 17:58:43 +09:00
parent 8a61a85ae0
commit 65cd18939c
2 changed files with 85 additions and 27 deletions

View file

@ -61,7 +61,7 @@
name :: pv1_server(),
flap_limit :: non_neg_integer(),
proj :: projection(),
proj_unanimous :: boolean(),
proj_unanimous :: 'false' | erlang:timestamp(),
%%
timer :: 'undefined' | timer:tref(),
ignore_timer :: boolean(),
@ -70,7 +70,7 @@
flap_start=?NOT_FLAPPING_START
:: {{'epk', integer()}, erlang:timestamp()},
flap_last_up=[] :: list(),
flap_last_up_change=now() :: erlang:now(),
flap_last_up_change=now() :: erlang:timestamp(),
flap_counts_last=[] :: list(),
not_sanes :: orddict:orddict(),
sane_transitions = 0 :: non_neg_integer(),
@ -553,6 +553,18 @@ read_latest_projection_call_only2(ProjectionType, All_queried_list, S) ->
FLU <- All_queried_list],
{Rs, S2}.
read_projection_call_only2(ProjectionType, Epoch, All_queried_list, S) ->
{_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(Pid) ->
case (?FLU_PC:read_projection(Pid, ProjectionType, Epoch, ?TO)) of
{ok, P} -> P;
Else -> Else
end
end,
Rs = [(catch perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)) ||
FLU <- All_queried_list],
{Rs, S2}.
cl_read_latest_projection(ProjectionType, S) ->
AllHosed = [],
cl_read_latest_projection(ProjectionType, AllHosed, S).
@ -808,6 +820,11 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
P_none0 = make_none_projection(
MyName, AllMembers, OldWitness_list,
MembersDict),
Why = if NewUPI == [] ->
no_real_servers;
true ->
not_enough_witnesses
end,
P_none1 = P_none0#projection_v1{
epoch_number=OldEpochNum + 1,
dbg=[{none_projection,true},
@ -821,7 +838,7 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
{tent_upi, TentativeUPI},
{new_upi, NewUPI},
{up_witnesses, UpWitnesses},
{not_enough_witnesses,true}]},
{why_none, Why}]},
machi_projection:update_checksum(P_none1)
end
end;
@ -1965,11 +1982,28 @@ react_to_env_C103(#projection_v1{epoch_number=_Epoch_newprop} = _P_newprop,
%% looping C100->C103->C100.
react_to_env_C100(P_none, P_none, clear_flapping_state(S)).
react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
react_to_env_C110(P_latest, #ch_mgr{name=MyName, proj=P_current,
proj_unanimous=ProjUnanimous} = S) ->
?REACT(c110),
?REACT({c110, ?LINE, [{latest_epoch,P_latest#projection_v1.epoch_number}]}),
Extra_todo = [{react,get(react)}],
P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo),
Extra1 = case inner_projection_exists(P_current) andalso
inner_projection_exists(P_latest) andalso
(machi_projection:get_epoch_id(
inner_projection_or_self(P_current)) ==
machi_projection:get_epoch_id(
inner_projection_or_self(P_latest)))
andalso ProjUnanimous /= false of
true ->
EpochID = machi_projection:get_epoch_id(
inner_projection_or_self(P_latest)),
UnanimousTime = ProjUnanimous,
A = make_annotation(EpochID, UnanimousTime),
[A];
false ->
[]
end,
Extra2 = [{react,get(react)}],
P_latest2 = machi_projection:update_dbg2(P_latest, Extra1 ++ Extra2),
MyNamePid = proxy_pid(MyName, S),
Goo = P_latest2#projection_v1.epoch_number,
@ -2034,6 +2068,7 @@ react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H,
diversion_c120_verbose_goop(P_latest, S),
?REACT({c120, [{latest, machi_projection:make_summary(P_latest)}]}),
io:format(user, "C120: ~w wrote ~w ~W\n", [S#ch_mgr.name, P_latest#projection_v1.epoch_number, P_latest#projection_v1.epoch_csum, 6]),
{{now_using, FinalProps, P_latest#projection_v1.epoch_number},
set_proj(S#ch_mgr{proj_history=H2,
sane_transitions=Xtns + 1}, P_latest)}.
@ -2606,7 +2641,7 @@ projection_transition_is_sane_except_si_epoch(
poll_private_proj_is_upi_unanimous(#ch_mgr{consistency_mode=ap_mode} = S) ->
S;
poll_private_proj_is_upi_unanimous(#ch_mgr{consistency_mode=cp_mode,
proj_unanimous=true} = S) ->
proj_unanimous={_,_,_}} = S) ->
S;
poll_private_proj_is_upi_unanimous(#ch_mgr{consistency_mode=cp_mode,
proj_unanimous=false,
@ -2624,17 +2659,17 @@ poll_private_proj_is_upi_unanimous_sleep(Count, S) when Count > 2 ->
poll_private_proj_is_upi_unanimous_sleep(Count, S) ->
timer:sleep((Count * Count) * 50),
case poll_private_proj_is_upi_unanimous3(S) of
#ch_mgr{proj_unanimous=true} = S2 ->
S2;
#ch_mgr{proj_unanimous=false} = S2 ->
poll_private_proj_is_upi_unanimous_sleep(Count + 1, S2);
S2 ->
poll_private_proj_is_upi_unanimous_sleep(Count + 1, S2)
S2
end.
poll_private_proj_is_upi_unanimous3(#ch_mgr{name=_MyName, proj=Proj0,
poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current,
opts=MgrOpts} = S) ->
Proj = inner_projection_or_self(Proj0),
UPI = Proj#projection_v1.upi,
EpochID = machi_projection:make_epoch_id(Proj),
Proj_ios = inner_projection_or_self(P_current),
UPI = Proj_ios#projection_v1.upi,
EpochID = machi_projection:make_epoch_id(Proj_ios),
{Rs, S2} = read_latest_projection_call_only2(private, UPI, S),
Rs2 = [if is_record(P, projection_v1) ->
machi_projection:make_epoch_id(P);
@ -2643,21 +2678,41 @@ poll_private_proj_is_upi_unanimous3(#ch_mgr{name=_MyName, proj=Proj0,
end || #projection_v1{}=P <- Rs],
case lists:usort(Rs2) of
[EID] when EID == EpochID ->
Annotation = {private_proj_is_upi_unanimous, {EpochID, now()}},
%% Careful, use the outer projection here!
NewDbg2 = [Annotation|Proj0#projection_v1.dbg2],
NewProj = Proj0#projection_v1{dbg2=NewDbg2},
ProjStore = get_projection_store_regname(MgrOpts),
Now = os:timestamp(),
Annotation = make_annotation(EpochID, Now),
NewDbg2 = [Annotation|P_current#projection_v1.dbg2],
NewProj = P_current#projection_v1{dbg2=NewDbg2},
ProjStore = case get_projection_store_regname(MgrOpts) of
undefined ->
machi_flu_psup:make_proj_supname(MyName);
PStr ->
PStr
end,
[io:format(user, "whereis(~w) ~w, ", [X, whereis(X)]) ||
X <- [a_pstore, a_pstore2]],
io:format(user, "POLL: ~w updates ~w ~W\n", [S#ch_mgr.name, NewProj#projection_v1.epoch_number, NewProj#projection_v1.epoch_csum, 6]),
ok = machi_projection_store:write(ProjStore, private, NewProj),
%% Unwedge our FLU.
io:format(user, "\nUnwedge ~w @ ~W\n", [_MyName, EpochID, 7]),
io:format(user, "\nUnwedge ~w @ ~W\n", [MyName, EpochID, 7]),
{ok, NotifyPid} = machi_projection_store:get_wedge_notify_pid(ProjStore),
_ = machi_flu1:update_wedge_state(NotifyPid, false, EpochID),
S2#ch_mgr{proj_unanimous=true};
S2#ch_mgr{proj_unanimous=Now};
_Else ->
io:format(user, "poll by ~w: want ~W got ~W\n", [MyName, EpochID, 6, _Else, 8]),
S2
end.
poll_read_private_projections(#projection_v1{inner=undefined,
epoch_number=Epoch,
upi=UPI}=_P_current, S) ->
read_projection_call_only2(private, Epoch, UPI, S);
poll_read_private_projections(#projection_v1{inner=_not_undefined,
upi=UPI}=_P_current, S) ->
%% For inner projections, we are (by definition) flapping, and the
%% outer epoch numbers are (by definition) unstable. However, any
%% observed use of the the inner proj epoch # is what we need.
read_latest_projection_call_only2(private, UPI, S).
sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
USec = calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list),
timer:sleep(USec),
@ -3104,8 +3159,7 @@ make_zerf2(OldEpochNum, Up, MajoritySize, MyName, AllMembers, OldWitness_list,
MembersDict, OldFlap, S) ->
try
Proj = zerf_find_last_common(MajoritySize, Up, S),
Proj2 =
Proj#projection_v1{flap=OldFlap}
Proj2 = Proj#projection_v1{flap=OldFlap, dbg2=[]}
, io:format(user, "ZERF ~w\n", [machi_projection:make_summary(Proj2)]),
Proj2
catch
@ -3140,7 +3194,8 @@ zerf_find_last_common(MajoritySize, Up, S) ->
[zerf_find_last_annotated(FLU,MajoritySize,S) || FLU <- Up]))) of
[] ->
throw({zerf,no_common});
[P|_] ->
[P|_]=_TheList ->
io:format(user, "Zerf results: ~P\n", [ [machi_projection:make_summary(X) || X <- _TheList], 20]),
%% TODO is this simple sort really good enough?
P
end.
@ -3149,7 +3204,7 @@ zerf_find_last_annotated(FLU, MajoritySize, S) ->
Proxy = proxy_pid(FLU, S),
{ok, Epochs} = ?FLU_PC:list_all_projections(Proxy, private, 60*1000),
P = lists:foldl(
fun(Epoch, #projection_v1{}=Proj) ->
fun(_Epoch, #projection_v1{}=Proj) ->
Proj;
(Epoch, Acc) ->
{ok, Proj} = ?FLU_PC:read_projection(Proxy, private,
@ -3174,7 +3229,7 @@ zerf_find_last_annotated(FLU, MajoritySize, S) ->
Acc
end
end
end, first_accumulator, Epochs),
end, first_accumulator, lists:reverse(Epochs)),
if is_record(P, projection_v1) ->
P;
true ->
@ -3303,3 +3358,6 @@ calc_consistency_mode(_Witness_list) ->
set_proj(S, Proj) ->
S#ch_mgr{proj=Proj, proj_unanimous=false}.
make_annotation(EpochID, Time) ->
{private_proj_is_upi_unanimous, {EpochID, Time}}.

View file

@ -837,7 +837,7 @@ make_listener_regname(BaseName) ->
%% registers.
make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_pstore2").
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
%% internal storage by the FLU.