Merge branch 'slf/chain-manager/cp-mode2'

This commit is contained in:
Scott Lystig Fritchie 2015-07-21 14:24:08 +09:00
commit 6ed5767e06
6 changed files with 261 additions and 148 deletions

View file

@ -68,7 +68,8 @@
flaps=0 :: integer(),
flap_start = ?NOT_FLAPPING_START
:: {{'epk', integer()}, erlang:timestamp()},
flap_not_sanes :: orddict:orddict(),
not_sanes :: orddict:orddict(),
sane_transitions = 0 :: non_neg_integer(),
repair_worker :: 'undefined' | pid(),
repair_start :: 'undefined' | erlang:timestamp(),
repair_final_status :: 'undefined' | term(),
@ -213,6 +214,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end,
RunEnv = [{seed, Opt(seed, now())},
{use_partition_simulator, Opt(use_partition_simulator, false)},
{simulate_repair, Opt(simulate_repair, true)},
{network_partitions, Opt(network_partitions, [])},
{network_islands, Opt(network_islands, [])},
{up_nodes, Opt(up_nodes, not_init_yet)}],
@ -226,7 +228,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
flap_limit=length(All_list) + 50,
timer='undefined',
proj_history=queue:new(),
flap_not_sanes=orddict:new(),
not_sanes=orddict:new(),
runenv=RunEnv,
opts=MgrOpts},
{_, S2} = do_set_chain_members_dict(MembersDict, S),
@ -567,6 +569,7 @@ calc_projection(#ch_mgr{proj=LastProj, runenv=RunEnv} = S,
calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
RelativeToServer, AllHosed, Dbg,
#ch_mgr{name=MyName,
proj=CurrentProj,
runenv=RunEnv1,
repair_final_status=RepairFS}=S) ->
#projection_v1{epoch_number=OldEpochNum,
@ -584,12 +587,18 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
Down = AllMembers -- Up,
NewUPI_list = [X || X <- OldUPI_list, lists:member(X, Up)],
#projection_v1{upi=CurrentUPI_list} = CurrentProj,
LastInCurrentUPI = case CurrentUPI_list of
[] -> does_not_exist_because_upi_is_empty;
[_|_] -> lists:last(CurrentUPI_list)
end,
LastInNewUPI = case NewUPI_list of
[] -> does_not_exist_because_upi_is_empty;
[_|_] -> lists:last(NewUPI_list)
end,
Repairing_list2 = [X || X <- OldRepairing_list, lists:member(X, Up)],
Simulator_p = proplists:get_value(use_partition_simulator, RunEnv2, false),
SimRepair_p = proplists:get_value(simulate_repair, RunEnv2, true),
{NewUPI_list3, Repairing_list3, RunEnv3} =
case {NewUp, Repairing_list2} of
{[], []} ->
@ -606,10 +615,11 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
SameEpoch_p = check_latest_private_projections_same_epoch(
NewUPI_list ++ Repairing_list2,
S#ch_mgr.proj, Partitions, S),
if Simulator_p andalso SameEpoch_p ->
if Simulator_p andalso SimRepair_p andalso
SameEpoch_p andalso RelativeToServer == LastInCurrentUPI ->
D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
{NewUPI_list ++ [H], T, RunEnv2};
not Simulator_p
not (Simulator_p andalso SimRepair_p)
andalso
RepairFS == {repair_final_status, ok} ->
D_foo=[{repair_done, {repair_final_status, ok, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
@ -805,8 +815,56 @@ do_react_to_env(#ch_mgr{name=MyName,
S2#ch_mgr{proj=NewProj, members_dict=NewMembersDict}}
end;
do_react_to_env(S) ->
%% The not_sanes manager counting dictionary is not strictly
%% limited to flapping scenarios. (Though the mechanism first
%% started as a way to deal with rare flapping scenarios.)
%%
%% I believe that the problem cannot happen in real life, but it can
%% happen in simulated environments, especially if the simulation for
%% repair can be approximately infinitely fast.
%%
%% For example:
%% P_current: epoch=1135, UPI=[b,e,a], Repairing=[c,d], author=e
%%
%% Now a partition happens, a & b are on an island, c & d & e on
%% the other island.
%%
%% P_newprop: epoch=1136, UPI=[e,c], Repairing=[d], author=e
%%
%% Why does e think that this is feasible? Well, the old UPI was
%% [b,e,a], and we know that a & b are partitioned away from e.
%% Therefore e chooses the best UPI, [e]. However, the simulator
%% now also says, hey, there are nodes in the repairing list, so
%% let's simulate a repair ... and the repair goes infinitely
%% quickly ...and the epoch is stable during the repair period
%% (i.e., both e/repairer and c/repairee remained in the same
%% epoch 1135) ... so e decides that the simulated repair is
%% "finished" and it's time to add the repairee to the tail of the
%% UPI ... so that's why 1136's UPI=[e,c].
%%
%% I'll try to add a condition to the simulated repair to try to
%% make slightly fewer assumptions in a row. However, I believe
%% it's a good idea to keep this too-many-not_sane-transition-
%% attempts counter very generic (i.e., not specific for flapping
%% as it once was).
%%
%% The not_sanes counter dict should be reset when we have had at
%% least 3 state transitions that did not have a not_sane
%% suggested projection transition or whenever we fall back to the
%% none_projection.
%%
%% We'll probably implement a very simple counter that may/will be
%% *inaccurate* by at most one -- so any reset test should ignore
%% counter values of 0 & 1.
%%
put(react, []),
react_to_env_A10(S).
if S#ch_mgr.sane_transitions > 3 -> % TODO review this constant
%% ?V("Skr,~w,", [S#ch_mgr.name]),
react_to_env_A10(S#ch_mgr{not_sanes=orddict:new()});
true ->
%% ?V("Sk,~w,~w,", [S#ch_mgr.name, S#ch_mgr.sane_transitions]),
react_to_env_A10(S)
end.
react_to_env_A10(S) ->
?REACT(a10),
@ -986,7 +1044,7 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
?REACT({a30, ?LINE, [{inner_summary,
machi_projection:make_summary(P_inner2)}]}),
%% Adjust the outer projection's #flap_i info.
?V("~w,", [{'YOYO',MyName,NewEpoch}]),
%% ?V("~w,", [{'FLAP',MyName,NewEpoch}]),
#projection_v1{flap=OldFlap} = P_newprop3,
NewFlap = OldFlap#flap_i{flapping_me=true},
?REACT({a30, ?LINE, [flap_continue,
@ -1282,7 +1340,7 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
{flap_limit, FlapLimit}]}),
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true ->
?V("{FLAP: ~w flaps ~w}! ", [S#ch_mgr.name, P_newprop_flap_count]);
ok; %% ?V("{FLAP: ~w flaps ~w}! ", [S#ch_mgr.name, P_newprop_flap_count]);
_ ->
ok
end,
@ -1360,7 +1418,7 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
react_to_env_C100(P_newprop, #projection_v1{author_server=Author_latest,
flap=Flap_latest0}=P_latest,
#ch_mgr{name=MyName, proj=P_current,
flap_not_sanes=NotSanesDict0}=S) ->
not_sanes=NotSanesDict0}=S) ->
?REACT(c100),
Sane = projection_transition_is_sane(P_current, P_latest, MyName),
@ -1382,19 +1440,16 @@ react_to_env_C100(P_newprop, #projection_v1{author_server=Author_latest,
_ when P_current#projection_v1.epoch_number == 0 ->
%% Epoch == 0 is reserved for first-time, just booting conditions.
?REACT({c100, ?LINE, [first_write]}),
erase(perhaps_reset_loop),
if Sane == true -> ok; true -> ?V("insane-~w-~w@~w,", [MyName, P_newprop#projection_v1.epoch_number, ?LINE]) end, %%% DELME!!!
react_to_env_C110(P_latest, S);
true ->
?REACT({c100, ?LINE, [sane]}),
erase(perhaps_reset_loop),
if Sane == true -> ok; true -> ?V("insane-~w-~w@~w,", [MyName, P_newprop#projection_v1.epoch_number, ?LINE]) end, %%% DELME!!!
react_to_env_C110(P_latest, S);
%% 20150715: I've seen this loop happen with {expected_author2,X}
%% where nobody agrees, weird.
false when Author_latest == MyName andalso
is_record(Flap_latest, flap_i) andalso
Flap_latest#flap_i.flapping_me == true ->
?REACT({c100, ?LINE}),
?V("\n\n1YOYO ~w breaking the cycle of ~p\n", [MyName, machi_projection:make_summary(P_latest)]),
DoctorSays ->
?REACT({c100, ?LINE, [{not_sane, DoctorSays}]}),
%% This is a fun case. We had just enough asymmetric partition
%% to cause the chain to fragment into two *incompatible* and
%% *overlapping membership* chains, but the chain fragmentation
@ -1436,69 +1491,37 @@ react_to_env_C100(P_newprop, #projection_v1{author_server=Author_latest,
%%
%% So, we're going to keep track in #ch_mgr state for the number
%% of times that this insane judgement has happened.
%%
%% See also: comment in do_react_to_env() about
%% non-flapping-scenario that can also cause us to want to
%% collapse to the none_projection to break a
%% livelock/infinite loop.
react_to_env_C100_inner(Author_latest, NotSanesDict0, MyName,
P_newprop, P_latest, S);
{expected_author2,_}=_ExpectedErr when Author_latest == MyName andalso
is_record(Flap_latest, flap_i) andalso
Flap_latest#flap_i.flapping_me == true ->
?REACT({c100, ?LINE}),
react_to_env_C100_inner(Author_latest, NotSanesDict0, MyName,
P_newprop, P_latest, S);
{expected_author2,_ExpectedAuthor2}=_ExpectedErr ->
case get(perhaps_reset_loop) of
undefined ->
put(perhaps_reset_loop, 1),
?REACT({c100, ?LINE, [not_sane, get(why2), _ExpectedErr]}),
react_to_env_C300(P_newprop, P_latest, S);
X when X > ?TOO_FREQUENT_BREAKER ->
%% Ha, yes, this is possible. For example:
%% outer: author=e,upi=[b,a,d],repair=[c,e]
%% inner: author=e,upi=[b,e], repair=[]
%% In this case, the transition from inner to outer by A30
%% has chosen the wrong author. We have two choices.
%% 1. Accept this transition, because it really was the
%% safe & transition-approved UPI+repeairing that we
%% were using while we were flapping. I'm 99% certain
%% that this is safe. TODO: Verify
%% 2. I'm not yet 100% certain that #1 is safe, so instead
%% we fall back to the one thing that we know is safe:
%% the 'none' projection, which lets the chain rebuild
%% itself normally during future iterations.
?REACT({c100, ?LINE}),
react_to_env_C103(P_latest, S);
X ->
put(perhaps_reset_loop, X+1),
?REACT({c100, ?LINE, [not_sane, get(why2), _ExpectedErr]}),
react_to_env_C300(P_newprop, P_latest, S)
end;
_AnyOtherReturnValue ->
%% P_latest is not sane.
%% By process of elimination, P_newprop is best,
%% so let's write it.
?REACT({c100, ?LINE, [not_sane, get(why2), _AnyOtherReturnValue]}),
erase(perhaps_reset_loop),
react_to_env_C300(P_newprop, P_latest, S)
P_newprop, P_latest, S)
end.
react_to_env_C100_inner(Author_latest, NotSanesDict0, MyName,
P_newprop, P_latest, S) ->
NotSanesDict = orddict:update_counter(Author_latest, 1, NotSanesDict0),
S2 = S#ch_mgr{flap_not_sanes=NotSanesDict},
S2 = S#ch_mgr{not_sanes=NotSanesDict, sane_transitions=0},
case orddict:fetch(Author_latest, NotSanesDict) of
N when N > ?TOO_FREQUENT_BREAKER ->
?V("\n\nYOYO ~w breaking the cycle of ~p\n", [MyName, machi_projection:make_summary(P_latest)]),
?REACT({c100, ?LINE, [{not_sanes_author_count, N}]}),
react_to_env_C103(P_latest, S2);
N ->
?V("YOYO,~w,~w,~w,",[MyName, P_latest#projection_v1.epoch_number,N]),
?REACT({c100, ?LINE, [{not_sanes_author_count, N}]}),
%% P_latest is not sane.
%% By process of elimination, P_newprop is best,
%% so let's write it.
react_to_env_C300(P_newprop, P_latest, S2)
end.
react_to_env_C103(#projection_v1{epoch_number=Epoch_latest,
all_members=All_list,
members_dict=MembersDict} = P_latest,
#ch_mgr{name=MyName}=S) ->
all_members=All_list,
members_dict=MembersDict} = P_latest,
#ch_mgr{name=MyName, proj=P_current}=S) ->
#projection_v1{epoch_number=Epoch_latest,
all_members=All_list,
members_dict=MembersDict} = P_latest,
@ -1507,9 +1530,16 @@ react_to_env_C103(#projection_v1{epoch_number=Epoch_latest,
dbg=[{none_projection,true}]},
P_none = machi_projection:update_checksum(P_none1),
%% Use it, darn it, because it's 100% safe. And exit flapping state.
?REACT({c103, ?LINE,
[{current_epoch, P_current#projection_v1.epoch_number},
{none_projection_epoch, Epoch_latest}]}),
%% Reset the not_sanes count dictionary here, or else an already
%% ?TOO_FREQUENT_BREAKER count for an author might prevent a
%% transition from C100_inner()->C300, which can lead to infinite
%% looping C100->C103->C100.
react_to_env_C100(P_none, P_none, S#ch_mgr{flaps=0,
flap_start=?NOT_FLAPPING_START,
flap_not_sanes=orddict:new()}).
not_sanes=orddict:new()}).
react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
?REACT(c110),
@ -1544,22 +1574,28 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
P_latest2x = P_latest2#projection_v1{dbg2=[]}, % limit verbose len.
case inner_projection_exists(P_latest2) of
false ->
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true ->
Last2 = get(last_verbose),
Summ2 = machi_projection:make_summary(P_latest2x),
case proplists:get_value(private_write_verbose,
S#ch_mgr.opts) of
true when Summ2 /= Last2 ->
put(last_verbose, Summ2),
?V("\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses plain: ~w\n",
[HH,MM,SS,MSec, S#ch_mgr.name,
machi_projection:make_summary(P_latest2x)]);
[HH,MM,SS,MSec, S#ch_mgr.name, Summ2]);
_ ->
ok
end;
true ->
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true ->
P_inner = inner_projection_or_self(P_latest2),
P_innerx = P_inner#projection_v1{dbg2=[]}, % limit verbose len.
Last2 = get(last_verbose),
P_inner = inner_projection_or_self(P_latest2),
P_innerx = P_inner#projection_v1{dbg2=[]}, % limit verbose len.
Summ2 = machi_projection:make_summary(P_innerx),
case proplists:get_value(private_write_verbose,
S#ch_mgr.opts) of
true when Summ2 /= Last2 ->
put(last_verbose, Summ2),
?V("\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses inner: ~w\n",
[HH,MM,SS,MSec, S#ch_mgr.name,
machi_projection:make_summary(P_innerx)]);
[HH,MM,SS,MSec, S#ch_mgr.name, Summ2]);
_ ->
ok
end
@ -1569,7 +1605,8 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
end,
react_to_env_C120(P_latest, [], S).
react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H}=S) ->
react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H,
sane_transitions=Xtns}=S) ->
?REACT(c120),
H2 = queue:in(P_latest, H),
H3 = case queue:len(H2) of
@ -1588,7 +1625,7 @@ react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H}=S) ->
?REACT({c120, [{latest, machi_projection:make_summary(P_latest)}]}),
{{now_using, FinalProps, P_latest#projection_v1.epoch_number},
S#ch_mgr{proj=P_latest, proj_history=H3}}.
S#ch_mgr{proj=P_latest, proj_history=H3, sane_transitions=Xtns + 1}}.
react_to_env_C200(Retries, P_latest, S) ->
?REACT(c200),
@ -1633,8 +1670,7 @@ react_to_env_C310(P_newprop, S) ->
calculate_flaps(P_newprop, _P_current, _FlapLimit,
#ch_mgr{name=MyName, proj_history=H, flap_start=FlapStart,
flaps=Flaps, flap_not_sanes=NotSanesDict0,
runenv=RunEnv1}=S) ->
flaps=Flaps, runenv=RunEnv1}=S) ->
HistoryPs = queue:to_list(H),
Ps = HistoryPs ++ [P_newprop],
UniqueProposalSummaries = lists:usort([{P#projection_v1.upi,
@ -1696,7 +1732,6 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
true ->
NewFlapStart = FlapStart
end,
NotSanesDict = NotSanesDict0,
%% Wow, this behavior is almost spooky.
%%
@ -1726,7 +1761,6 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
{_N, _} ->
NewFlaps = 0,
NewFlapStart = ?NOT_FLAPPING_START,
NotSanesDict = orddict:new(),
AllFlapCounts = [],
AllHosed = []
end,
@ -1749,8 +1783,7 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
%% It isn't doing what I'd originally intended. Fix it.
{machi_projection:update_checksum(P_newprop#projection_v1{
flap=FlappingI}),
S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart,
flap_not_sanes=NotSanesDict, runenv=RunEnv1}}.
S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv1}}.
make_flapping_i() ->
make_flapping_i({{epk,-1},?NOT_FLAPPING}, 0, [], [], []).
@ -2071,66 +2104,85 @@ gobble_calls(StaticCall) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
perhaps_start_repair(
#ch_mgr{name=MyName,
repair_worker=undefined,
proj=#projection_v1{creation_time=Start,
upi=[_|_]=UPI,
repairing=[_|_]}}=S) ->
RepairId = {MyName, os:timestamp()},
RepairOpts = [{repair_mode, repair}, verbose, {repair_id, RepairId}],
%% RepairOpts = [{repair_mode, check}, verbose],
RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end,
LastUPI = lists:last(UPI),
IgnoreStabilityTime_p = proplists:get_value(ignore_stability_time,
S#ch_mgr.opts, false),
case timer:now_diff(os:timestamp(), Start) div 1000000 of
N when MyName == LastUPI andalso
(IgnoreStabilityTime_p orelse
N >= ?REPAIR_START_STABILITY_TIME) ->
{WorkerPid, _Ref} = spawn_monitor(RepairFun),
S#ch_mgr{repair_worker=WorkerPid,
repair_start=os:timestamp(),
repair_final_status=undefined};
perhaps_start_repair(#ch_mgr{name=MyName,
repair_worker=undefined,
proj=P_current}=S) ->
case inner_projection_or_self(P_current) of
#projection_v1{creation_time=Start,
upi=[_|_]=UPI,
repairing=[_|_]} ->
RepairId = {MyName, os:timestamp()},
RepairOpts = [{repair_mode,repair}, verbose, {repair_id,RepairId}],
%% RepairOpts = [{repair_mode, check}, verbose],
RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end,
LastUPI = lists:last(UPI),
IgnoreStabilityTime_p = proplists:get_value(ignore_stability_time,
S#ch_mgr.opts, false),
case timer:now_diff(os:timestamp(), Start) div 1000000 of
N when MyName == LastUPI andalso
(IgnoreStabilityTime_p orelse
N >= ?REPAIR_START_STABILITY_TIME) ->
{WorkerPid, _Ref} = spawn_monitor(RepairFun),
S#ch_mgr{repair_worker=WorkerPid,
repair_start=os:timestamp(),
repair_final_status=undefined};
_ ->
S
end;
_ ->
S
end;
perhaps_start_repair(S) ->
S.
do_repair(
#ch_mgr{name=MyName,
proj=#projection_v1{upi=UPI,
repairing=[_|_]=Repairing,
members_dict=MembersDict}}=_S_copy,
Opts, ap_mode=RepairMode) ->
?V("RePaiR-~w,", [self()]),
T1 = os:timestamp(),
RepairId = proplists:get_value(repair_id, Opts, id1),
error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n",
[MyName, UPI, Repairing, RepairMode, RepairId]),
do_repair(#ch_mgr{name=MyName,
proj=#projection_v1{upi=UPI,
repairing=[_|_]=Repairing,
members_dict=MembersDict}}=S,
Opts, ap_mode=RepairMode) ->
ETS = ets:new(repair_stats, [private, set]),
ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes,
t_out_files, t_out_chunks, t_out_bytes,
t_bad_chunks, t_elapsed_seconds],
[ets:insert(ETS, {K, 0}) || K <- ETS_T_Keys],
Res = machi_chain_repair:repair(ap_mode, MyName, Repairing, UPI,
MembersDict, ETS, Opts),
T2 = os:timestamp(),
Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000,
ets:insert(ETS, {t_elapsed_seconds, Elapsed}),
Summary = case Res of ok -> "success";
{ok, MyProj} = ?FLU_PC:read_latest_projection(proxy_pid(MyName, S),
private),
MyEpochID = machi_projection:get_epoch_id(MyProj),
RepairEpochIDs = [case ?FLU_PC:read_latest_projection(proxy_pid(Rep, S),
private) of
{ok, Proj} ->
machi_projection:get_epoch_id(Proj);
_ ->
unknown
end || Rep <- Repairing],
case lists:usort(RepairEpochIDs) of
[MyEpochID] ->
T1 = os:timestamp(),
RepairId = proplists:get_value(repair_id, Opts, id1),
error_logger:info_msg(
"Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n",
[MyName, UPI, Repairing, RepairMode, RepairId]),
Res = machi_chain_repair:repair(ap_mode, MyName, Repairing, UPI,
MembersDict, ETS, Opts),
T2 = os:timestamp(),
Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000,
ets:insert(ETS, {t_elapsed_seconds, Elapsed}),
Summary = case Res of ok -> "success";
_ -> "FAILURE"
end,
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
error_logger:info_msg("Repair ~s: tail ~p of ~p finished ~p repair ID ~w: "
"~p\nStats ~p\n",
[Summary, MyName, UPI, RepairMode, RepairId,
Res, Stats]),
ets:delete(ETS),
exit({repair_final_status, Res}).
end,
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
error_logger:info_msg(
"Repair ~s: tail ~p of ~p finished ~p repair ID ~w: "
"~p\nStats ~p\n",
[Summary, MyName, UPI, RepairMode, RepairId,
Res, Stats]),
ets:delete(ETS),
exit({repair_final_status, Res});
_ ->
exit(not_all_in_same_epoch)
end.
sanitize_repair_state(#ch_mgr{repair_final_status=Res,
proj=#projection_v1{upi=[_|_]}}=S)

View file

@ -29,6 +29,7 @@
update_checksum/1,
update_dbg2/2,
compare/2,
get_epoch_id/1,
make_summary/1,
make_members_dict/1
]).
@ -138,6 +139,11 @@ compare(#projection_v1{epoch_number=E1},
E1 > E2 -> 1
end.
%% @doc Return the epoch_id of the projection.
get_epoch_id(#projection_v1{epoch_number=Epoch, epoch_csum=CSum}) ->
{Epoch, CSum}.
%% @doc Create a proplist-style summary of a projection record.
make_summary(#projection_v1{epoch_number=EpochNum,

View file

@ -223,8 +223,26 @@ write_projection(PidSpec, ProjType, Proj) ->
%% @doc Write a projection `Proj' of type `ProjType'.
write_projection(PidSpec, ProjType, Proj, Timeout) ->
gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}},
Timeout).
case gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}},
Timeout) of
{error, written}=Err ->
Epoch = Proj#projection_v1.epoch_number,
case read_projection(PidSpec, ProjType, Epoch, Timeout) of
{ok, Proj2} when Proj2 == Proj ->
%% The proxy made (at least) two attempts to write
%% this projection. An earlier one appeared to
%% have failed, so the proxy retried. The later
%% attempt returned to us {error,written} because
%% the earlier attempt was actually received &
%% processed by the server. So, we consider this
%% a successful write.
ok;
_ ->
Err
end;
Else ->
Else
end.
%% @doc Get all projections from the FLU's projection store.
@ -277,8 +295,19 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk) ->
%% with `Prefix' at `Offset'.
write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}},
Timeout).
case gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}},
Timeout) of
{error, written}=Err ->
case read_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) of
{ok, Chunk2} when Chunk2 == Chunk ->
%% See equivalent comment inside write_projection().
ok;
_ ->
Err
end;
Else ->
Else
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -129,6 +129,8 @@ long_doc() ->
n of a naive/1st draft detection algorithm.
".
%% ' silly Emacs syntax highlighting....
%% convergence_demo_test_() ->
%% {timeout, 98*300, fun() -> convergence_demo_testfun() end}.
@ -157,7 +159,9 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
ok = filelib:ensure_dir("/tmp/c/not-used"),
FluInfo = [{a,TcpPort+0,"/tmp/c/data.a"}, {b,TcpPort+1,"/tmp/c/data.b"},
{c,TcpPort+2,"/tmp/c/data.c"}, {d,TcpPort+3,"/tmp/c/data.d"},
{e,TcpPort+4,"/tmp/c/data.e"}, {f,TcpPort+5,"/tmp/c/data.f"}],
{e,TcpPort+4,"/tmp/c/data.e"}, {f,TcpPort+5,"/tmp/c/data.f"},
{g,TcpPort+6,"/tmp/c/data.g"}, {h,TcpPort+7,"/tmp/c/data.h"},
{i,TcpPort+8,"/tmp/c/data.i"}, {j,TcpPort+9,"/tmp/c/data.j"}],
FLU_biglist = [X || {X,_,_} <- FluInfo],
All_list = lists:sublist(FLU_biglist, NumFLUs),
io:format(user, "\nSET # of FLUs = ~w members ~w).\n",
@ -444,19 +448,16 @@ private_projections_are_stable(Namez, PollFunc) ->
Private1 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez],
PollFunc(5, 1, 10),
Private2 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez],
if Private1 == Private2 ->
ok;
true ->
io:format(user, "Private1: ~p, ", [Private1]),
io:format(user, "Private2: ~p, ", [Private2])
end,
Private1 == Private2.
Is = [Inner_p || {_,_,_,_,Inner_p} <- Private1],
%% We want either all true or all false (inner or not).
Private1 == Private2 andalso length(lists:usort(Is)) == 1.
get_latest_inner_proj_summ(FLU) ->
{ok, Proj} = ?FLU_PC:read_latest_projection(FLU, private),
#projection_v1{epoch_number=E, upi=UPI, repairing=Repairing, down=Down} =
machi_chain_manager1:inner_projection_or_self(Proj),
{E, UPI, Repairing, Down}.
Inner_p = machi_chain_manager1:inner_projection_exists(Proj),
{E, UPI, Repairing, Down, Inner_p}.
random_sort(L) ->
random:seed(now()),

View file

@ -172,6 +172,7 @@ all_list() ->
setup(Num, Seed) ->
?V("\nsetup(~w,~w", [Num, Seed]),
[catch erlang:garbage_collect(P) || P <- processes()],
All_list = lists:sublist(all_list(), Num),
All_listE = lists:sublist(all_list_extra(), Num),
%% shutdown_hard() has taken care of killing all relevant procs.
@ -322,7 +323,7 @@ prop_pulse() ->
prop_pulse(Style) when Style == new; Style == regression ->
_ = application:start(crypto),
?FORALL({Cmds0, Seed}, {gen_commands(Style), pulse:seed()},
?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 11,
?IMPLIES(length(Cmds0) < 11,
begin
ok = shutdown_hard(),
%% PULSE can be really unfair, of course, including having exec_ticks

View file

@ -121,12 +121,14 @@ flu_restart_test() ->
{ok, Prox1} = ?MUT:start_link(I),
try
FakeEpoch = ?DUMMY_PV1_EPOCH,
Data = <<"data!">>,
Data = <<"data!">>,
Dataxx = <<"Fake!">>,
{ok, {Off1,Size1,File1}} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, Data,
infinity),
P_a = #p_srvr{name=a, address="localhost", port=6622},
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
P1xx = P1#projection_v1{dbg2=["not exactly the same as P1!!!"]},
EpochID = {P1#projection_v1.epoch_number,
P1#projection_v1.epoch_csum},
ok = ?MUT:write_projection(Prox1, public, P1),
@ -182,18 +184,31 @@ flu_restart_test() ->
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_projection(Prox1, private, 7)
end,
fun(run) -> {error, written} =
fun(run) -> ok =
?MUT:write_projection(Prox1, public, P1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, public, P1)
end,
fun(run) -> {error, written} =
fun(run) -> ok =
?MUT:write_projection(Prox1, private, P1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, private, P1)
end,
fun(run) -> {error, written} =
?MUT:write_projection(Prox1, public, P1xx),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, public, P1xx)
end,
fun(run) -> {error, written} =
?MUT:write_projection(Prox1, private, P1xx),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, private, P1xx)
end,
fun(run) -> {ok, [_]} =
?MUT:get_all_projections(Prox1, public),
ok;
@ -249,9 +264,7 @@ flu_restart_test() ->
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:wedge_status(Prox1)
end,
%% NOTE: When write-once enforcement is enabled, this test
%% will fail: change ok -> {error, written}
fun(run) -> %% {error, written} =
fun(run) ->
ok =
?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Data, infinity),
@ -259,6 +272,17 @@ flu_restart_test() ->
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Data, infinity)
end,
%% NOTE: When write-once enforcement is enabled, this test
%% will fail: change ok -> {error, written}
fun(run) -> %% {error, written} =
ok =
?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Dataxx, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Dataxx, infinity)
end
],