Merge branch 'slf/cp-mode-adjustments' to 'master'

This commit is contained in:
Scott Lystig Fritchie 2015-09-22 16:19:58 +09:00
commit 3fb3890788
5 changed files with 161 additions and 85 deletions

View file

@ -477,41 +477,58 @@ do_cl_write_public_proj(Proj, S) ->
cl_write_public_proj(Epoch, Proj, S) ->
cl_write_public_proj(Epoch, Proj, false, S).
cl_write_public_proj_skip_local_error(Epoch, Proj, S) ->
cl_write_public_proj_ignore_written_error(Epoch, Proj, S) ->
cl_write_public_proj(Epoch, Proj, true, S).
cl_write_public_proj(Epoch, Proj, SkipLocalWriteErrorP, S) ->
%% Write to local public projection store first, and if it succeeds,
cl_write_public_proj(Epoch, Proj, IgnoreWrittenErrorP, S) ->
%% OLD: Write to local public projection store first, and if it succeeds,
%% then write to all remote public projection stores.
cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, S).
cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP,
#ch_mgr{name=MyName}=S) ->
%% NEW: Hypothesis: The OLD idea is a bad idea and causes needless retries
%% via racing with other wrier
%% NEW: Let's see what kind of trouble we can get ourselves into if
%% we abort our writing efforts if we encounter an
%% {error,written} status.
%% Heh, that doesn't work too well: if we have random uniform write
%% delays of 0-1,500 msec, then we end up abandoning this epoch
%% number at the first sign of trouble and then re-iterate to suggest
%% a new epoch number ... which causes *more* thrash, not less.
{_UpNodes, Partitions, S2} = calc_up_nodes(S),
Res0 = perhaps_call_t(
S, Partitions, MyName,
fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end),
Continue = fun() ->
FLUs = Proj#projection_v1.all_members -- [MyName],
cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S)
end,
case Res0 of
ok ->
{XX, SS} = Continue(),
{{local_write_result, ok, XX}, SS};
Else when SkipLocalWriteErrorP ->
{XX, SS} = Continue(),
{{local_write_result, Else, XX}, SS};
Else ->
{Else, S2}
end.
FLUs = Proj#projection_v1.all_members,
cl_write_public_proj2(FLUs, Partitions, Epoch, Proj,
IgnoreWrittenErrorP, S2).
cl_write_public_proj_remote(FLUs, Partitions, _Epoch, Proj, S) ->
cl_write_public_proj2(FLUs, Partitions, Epoch, Proj, IgnoreWrittenErrorP, S) ->
%% We're going to be very care-free about this write because we'll rely
%% on the read side to do any read repair.
DoIt = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end,
Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} ||
FLU <- FLUs],
%% Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} ||
%% FLU <- FLUs],
Rs = lists:foldl(
fun(FLU, {false=_KeepGoingP, Acc}) ->
{false, [{FLU,skipped}|Acc]};
(FLU, {true=_KeepGoingP, Acc}) ->
case perhaps_call_t(S, Partitions, FLU, DoIt) of
{error,written}=Written when IgnoreWrittenErrorP ->
%% io:format(user, "\nTried but written and ignoring written\n", []),
{true, [{FLU,Written}|Acc]};
{error,written}=Written when not IgnoreWrittenErrorP ->
%% Brute-force read-repair-like good enough.
DoRead = fun(Pid) -> ?FLU_PC:read_projection(
Pid, public, Epoch, ?TO)
end,
{ok, Proj2} = perhaps_call_t(S, Partitions, FLU,
DoRead),
DoIt2 = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj2, ?TO) end,
_Rs=[_ = perhaps_call_t(S, Partitions, Fl, DoIt2)||
Fl <- FLUs],
%% io:format(user, "\nTried ~w ~W but repairing with ~w ~W: ~w\n", [Epoch, Proj#projection_v1.epoch_csum, 4, Epoch, Proj2#projection_v1.epoch_csum, 4, _Rs]),
{false, [{FLU,Written}|Acc]};
Else ->
%% io:format(user, "\nTried but got an Else ~p\n", [Else]),
{true, [{FLU,Else}|Acc]}
end
end, {true, []}, FLUs),
%% io:format(user, "\nWrite public ~w by ~w: ~w\n", [Epoch, S#ch_mgr.name, Rs]),
{{remote_write_results, Rs}, S}.
do_cl_read_latest_public_projection(ReadRepairP,
@ -596,6 +613,9 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType,
NotBestPsEpochFilt =
[Proj || Proj <- Ps, Proj /= BestProj,
Proj#projection_v1.epoch_number == BestEpoch],
BadAnswers2 = [Answer || {_FLU, Answer} <- FLUsRs,
not is_record(Answer, projection_v1)
orelse Answer /= BestProj],
%% Wow, I'm not sure how long this bug has been here, but it's
%% likely 5 months old (April 2015). I just now noticed a problem
%% where BestProj was epoch 1194, but NotBestPs contained a
@ -620,7 +640,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType,
{not_unanimous_flus, All_queried_list --
(Best_FLUs ++ BadAnswerFLUs)},
{bad_answer_flus, BadAnswerFLUs},
{bad_answers, BadAnswers},
{bad_answers, BadAnswers2},
{not_best_ps, NotBestPs},
{not_best_ps_epoch_filt, NotBestPsEpochFilt}|Extra],
{UnanimousTag, BestProj, Extra2, S}
@ -641,7 +661,7 @@ do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) ->
%% We're doing repair, so use the flavor that will
%% continue to all others even if there is an
%% error_written on the local FLU.
{_DontCare, _S2}=Res = cl_write_public_proj_skip_local_error(
{_DontCare, _S2}=Res = cl_write_public_proj_ignore_written_error(
Epoch, BestProj, S),
Res
end.
@ -661,25 +681,29 @@ calc_projection(#ch_mgr{name=MyName, consistency_mode=CMode,
if CMode == ap_mode ->
calc_projection2(P_current, RelativeToServer, AllHosed, Dbg, S);
CMode == cp_mode ->
#projection_v1{epoch_number=OldEpochNum,
all_members=AllMembers,
upi=OldUPI_list
} = P_current,
UPI_length_ok_p =
length(OldUPI_list) >= full_majority_size(AllMembers),
case {OldEpochNum, UPI_length_ok_p} of
{0, _} ->
calc_projection2(P_current, RelativeToServer, AllHosed,
Dbg, S);
{_, true} ->
calc_projection2(P_current, RelativeToServer, AllHosed,
Dbg, S);
{_, false} ->
{Up, _Partitions, RunEnv2} = calc_up_nodes(
MyName, AllMembers, RunEnv),
%% We can't improve on the current projection.
{P_current, S#ch_mgr{runenv=RunEnv2}, Up}
end
calc_projection2(P_current, RelativeToServer, AllHosed, Dbg, S)
%% TODO EXPERIMENT 2015-09-21 DELETE-ME???
%% #projection_v1{epoch_number=OldEpochNum,
%% all_members=AllMembers,
%% upi=OldUPI_list,
%% upi=OldRepairing_list
%% } = P_current,
%% OldUPI_and_Repairing = OldUPI_list ++ OldRepairing_list,
%% UPI_and_Repairing_length_ok_p =
%% length(OldUPI_and_Repairing) >= full_majority_size(AllMembers),
%% case {OldEpochNum, UPI_length_ok_p} of
%% {0, _} ->
%% calc_projection2(P_current, RelativeToServer, AllHosed,
%% Dbg, S);
%% {_, true} ->
%% calc_projection2(P_current, RelativeToServer, AllHosed,
%% Dbg, S);
%% {_, false} ->
%% {Up, _Partitions, RunEnv2} = calc_up_nodes(
%% MyName, AllMembers, RunEnv),
%% %% We can't improve on the current projection.
%% {P_current, S#ch_mgr{runenv=RunEnv2}, Up}
%% end
end.
%% AllHosed: FLUs that we must treat as if they are down, e.g., we are
@ -798,7 +822,9 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
P2 = if CMode == cp_mode ->
UpWitnesses = [W || W <- Up, lists:member(W, OldWitness_list)],
Majority = full_majority_size(AllMembers),
SoFar = length(NewUPI),
%% A repairing node can also contribute to the quorum
%% majority required to attest to the history of the UPI.
SoFar = length(NewUPI ++ NewRepairing),
if SoFar >= Majority ->
?REACT({calc,?LINE,[]}),
P;
@ -1715,10 +1741,40 @@ react_to_env_C100(P_newprop,
?REACT(c100),
P_cur_for_sanity = if CMode == cp_mode ->
P_current_calc;
%% Assume E = P_latest's epoch #. P_current
%% may be at epoch E-delta but P_current_calc
%% is at exactly E because E-delta is stale,
%% and the CP world has changed while we were
%% napping. But the "exactly epoch E" will
%% cause problems for the strictly monotonic
%% epoch check in our sanity checking. So we
%% fake the epoch number here to work around
%% the strictly-SI check.
%%
%% If we don't fake the epoch here, then (I
%% have observed today) that we go through
%% several insane projection attempts and then
%% reset and eventually come to the right
%% answer ... but this churn is avoidable, and
%% for CP mode this is an ok safeguard to bend
%% expressly because of the make_zerf() aspect
%% of CP's chain processing.
E_c = erlang:min(
P_current#projection_v1.epoch_number,
P_current_calc#projection_v1.epoch_number),
P_current_calc#projection_v1{epoch_number=E_c};
CMode == ap_mode ->
P_current
end,
?REACT({c100, ?LINE,
[{current_epoch, P_cur_for_sanity#projection_v1.epoch_number},
{current_author, P_cur_for_sanity#projection_v1.author_server},
{current_upi, P_cur_for_sanity#projection_v1.upi},
{current_repairing, P_cur_for_sanity#projection_v1.repairing},
{latest_epoch, P_latest#projection_v1.epoch_number},
{latest_author, P_latest#projection_v1.author_server},
{latest_upi, P_latest#projection_v1.upi},
{latest_repairing, P_latest#projection_v1.repairing}]}),
Sane = projection_transition_is_sane(P_cur_for_sanity, P_latest, MyName),
if Sane == true ->
ok;
@ -1959,12 +2015,12 @@ react_to_env_C200(Retries, P_latest, S) ->
react_to_env_C210(Retries, #ch_mgr{name=MyName, proj=Proj} = S) ->
?REACT(c210),
sleep_ranked_order(10, 100, MyName, Proj#projection_v1.all_members),
sleep_ranked_order(250, 500, MyName, Proj#projection_v1.all_members),
react_to_env_C220(Retries, S).
react_to_env_C220(Retries, S) ->
?REACT(c220),
react_to_env_A20(Retries + 1, S).
react_to_env_A20(Retries + 1, manage_last_down_list(S)).
react_to_env_C300(#projection_v1{epoch_number=_Epoch_newprop}=P_newprop,
#projection_v1{epoch_number=_Epoch_latest}=_P_latest, S) ->
@ -1975,11 +2031,11 @@ react_to_env_C300(#projection_v1{epoch_number=_Epoch_newprop}=P_newprop,
react_to_env_C310(P_newprop, S) ->
?REACT(c310),
Epoch = P_newprop#projection_v1.epoch_number,
{WriteRes, S2} = cl_write_public_proj_skip_local_error(Epoch, P_newprop, S),
{WriteRes, S2} = cl_write_public_proj(Epoch, P_newprop, S),
?REACT({c310, ?LINE,
[{newprop, machi_projection:make_summary(P_newprop)},
{write_result, WriteRes}]}),
react_to_env_A10(S2).
react_to_env_A10(manage_last_down_list(S2)).
projection_transitions_are_sane(Ps, RelativeToServer) ->
projection_transitions_are_sane(Ps, RelativeToServer, false).
@ -2213,11 +2269,16 @@ projection_transition_is_sane_except_si_epoch(
%% CP mode extra sanity checks
if CMode1 == cp_mode ->
Majority = full_majority_size(All_list2),
UPI2_and_Repairing2 = UPI_list2 ++ Repairing_list2,
if length(UPI_list2) == 0 ->
ok; % none projection
length(UPI_list2) >= Majority ->
%% We have at least one non-witness
true = (length(UPI_list2 -- Witness_list2) > 0);
length(UPI2_and_Repairing2) >= Majority ->
%% We are assuming here that the client side is smart
%% enough to do the *safe* thing when the
%% length(UPI_list2) < Majority ... the client must use
%% the repairing nodes both as witnesses to check the
%% current epoch.
ok;
true ->
error({majority_not_met, UPI_list2})
end;
@ -2738,7 +2799,9 @@ make_zerf2(OldEpochNum, Up, MajoritySize, MyName, AllMembers, OldWitness_list,
try
#projection_v1{epoch_number=Epoch} = Proj =
zerf_find_last_common(MajoritySize, Up, S),
Proj2 = Proj#projection_v1{dbg2=[{make_zerf,Epoch}]},
Proj2 = Proj#projection_v1{dbg2=[{make_zerf,Epoch},
{yyy_hack, get(yyy_hack)},
{up,Up},{maj,MajoritySize}]},
%% io:format(user, "ZERF ~w\n",[machi_projection:make_summary(Proj2)]),
Proj2
catch
@ -2799,7 +2862,9 @@ zerf_find_last_annotated(FLU, MajoritySize, S) ->
(catch put(yyy_hack, [{FLU, Epoch, ok2}|get(yyy_hack)])),
Proj
end,
if length(Px#projection_v1.upi) >= MajoritySize ->
UPI_and_Repairing = Px#projection_v1.upi ++
Px#projection_v1.repairing,
if length(UPI_and_Repairing) >= MajoritySize ->
(catch put(yyy_hack, [{FLU, Epoch, yay}|get(yyy_hack)])),
Px;
true ->

View file

@ -86,7 +86,7 @@ send_spam_to_everyone(Pid) ->
init([{MyFluName}|Args]) ->
RegName = machi_flu_psup:make_fitness_regname(MyFluName),
register(RegName, self()),
timer:send_interval(1000, dump),
timer:send_interval(5000, dump),
UseSimulatorP = proplists:get_value(use_partition_simulator, Args, false),
{ok, #state{my_flu_name=MyFluName, reg_name=RegName,
partition_simulator_p=UseSimulatorP,
@ -184,8 +184,8 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) ->
end;
handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit,
pending_map=Map}=S) ->
io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]),
%% io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
%% io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]),
io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
{noreply, S};
handle_info(_Info, S) ->
{noreply, S}.

View file

@ -137,6 +137,7 @@ write(PidSpec, ProjType, Proj, Timeout)
is_record(Proj, projection_v1),
is_integer(Proj#projection_v1.epoch_number),
Proj#projection_v1.epoch_number >= 0 ->
testing_sleep_perhaps(),
g_call(PidSpec, {write, ProjType, Proj}, Timeout).
%% @doc Fetch all projection records of type `ProjType'.
@ -176,7 +177,6 @@ set_consistency_mode(PidSpec, CMode)
%%%%%%%%%%%%%%%%%%%%%%%%%%%
g_call(PidSpec, Arg, Timeout) ->
testing_sleep_perhaps(),
LC1 = lclock_get(),
{Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout),
lclock_update(LC2),
@ -437,7 +437,9 @@ testing_sleep_perhaps() ->
try
[{_,Max}] = ets:lookup(?TEST_ETS_TABLE, projection_store_sleep_time),
MSec = random:uniform(Max),
timer:sleep(MSec * 5),
io:format(user, "{", []),
timer:sleep(MSec),
io:format(user, "}", []),
ok
catch _X:_Y ->
ok

View file

@ -201,8 +201,6 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
false -> ap_mode
end,
%% ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, 25}),
try
[{_, Ma}|_] = MgrNamez,
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
@ -220,6 +218,7 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
random:seed(now()),
[begin
erlang:yield(),
perhaps_adjust_pstore_sleep(),
S_max_rand = random:uniform(
S_max + 1),
%% io:format(user, "{t}", []),
@ -251,14 +250,16 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
[a_chmgr,b_chmgr,c_chmgr,d_chmgr,e_chmgr,f_chmgr,g_chmgr,h_chmgr,i_chmgr,j_chmgr] ++
[a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ++
[a_fitness,b_fitness,c_fitness,d_fitness,e_fitness,f_fitness,g_fitness,h_fitness,i_fitness,j_fitness] ],
[begin
timer:sleep(2*1000),
case whereis(XX) of
undefined -> ok;
XXPid -> {_, XXbin} = process_info(XXPid, backtrace),
io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin])
end
end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore], _ <- lists:seq(1,5)],
[begin
[begin
case whereis(XX) of
undefined -> ok;
XXPid -> {_, XXbin} = process_info(XXPid, backtrace),
io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin])
end
end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ],
timer:sleep(20)
end || _ <- lists:seq(1,30)],
exit({icky_timeout, M_name})
end || {ThePid,M_name} <- Pids]
end,
@ -421,19 +422,19 @@ make_partition_list(All_list) ->
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
X /= A, X /= C, A /= C],
_X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] ||
X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
E <- All_list, F <- All_list, E /= F,
X /= A, X /= C, X /= E, A /= C, A /= E,
C /= E],
%% _X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] ||
%% X <- All_list, Y <- All_list, X /= Y,
%% A <- All_list, B <- All_list, A /= B,
%% C <- All_list, D <- All_list, C /= D,
%% E <- All_list, F <- All_list, E /= F,
%% X /= A, X /= C, X /= E, A /= C, A /= E,
%% C /= E],
%% Concat = _X_Ys1,
%% Concat = _X_Ys2,
%% Concat = _X_Ys1 ++ _X_Ys2,
%% %% Concat = _X_Ys3,
%% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4,
Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
%% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4,
NoPartitions = lists:duplicate(trunc(length(Concat) * 0.1), []),
uniq_reverse(random_sort(lists:usort([lists:sort(L) || L <- Concat])
++ NoPartitions)).
@ -870,6 +871,15 @@ uniq_c([H|T], Count, H) ->
uniq_c([H|T], Count, Last) ->
[{Count, Last}|uniq_c(T, 1, H)].
perhaps_adjust_pstore_sleep() ->
try
{ok, Bin} = file:read_file("/tmp/pstore_sleep_msec"),
{MSec,_} = string:to_integer(binary_to_list(Bin)),
ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, MSec})
catch _:_ ->
ok
end.
%% MaxIters = NumFLUs * (NumFLUs + 1) * 6,
%% Stable = fun(S_Namez) ->

View file

@ -313,8 +313,7 @@ smoke1_test() ->
{error, partition} -> timer:sleep(500);
_ -> ok
end,
{local_write_result, ok,
{remote_write_results, [{b,ok},{c,ok}]}} =
{remote_write_results,{true,[{c,ok},{b,ok},{a,ok}]}} =
?MGR:test_write_public_projection(M0, P1),
{unanimous, P1, Extra1} = ?MGR:test_read_latest_public_projection(M0, false),