diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl
index ebdb0bd..83a5a93 100644
--- a/src/machi_chain_manager1.erl
+++ b/src/machi_chain_manager1.erl
@@ -477,41 +477,58 @@ do_cl_write_public_proj(Proj, S) ->
 cl_write_public_proj(Epoch, Proj, S) ->
     cl_write_public_proj(Epoch, Proj, false, S).
 
-cl_write_public_proj_skip_local_error(Epoch, Proj, S) ->
+cl_write_public_proj_ignore_written_error(Epoch, Proj, S) ->
     cl_write_public_proj(Epoch, Proj, true, S).
 
-cl_write_public_proj(Epoch, Proj, SkipLocalWriteErrorP, S) ->
-    %% Write to local public projection store first, and if it succeeds,
+cl_write_public_proj(Epoch, Proj, IgnoreWrittenErrorP, S) ->
+    %% OLD: Write to local public projection store first, and if it succeeds,
     %% then write to all remote public projection stores.
-    cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, S).
-
-cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP,
-                           #ch_mgr{name=MyName}=S) ->
+    %% NEW: Hypothesis: The OLD idea is a bad idea and causes needless retries
+    %%      via racing with other writers.
+    %% NEW: Let's see what kind of trouble we can get ourselves into if
+    %%      we abort our writing efforts if we encounter an
+    %%      {error,written} status.
+    %% Heh, that doesn't work too well: if we have random uniform write
+    %% delays of 0-1,500 msec, then we end up abandoning this epoch
+    %% number at the first sign of trouble and then re-iterate to suggest
+    %% a new epoch number ... which causes *more* thrash, not less.
     {_UpNodes, Partitions, S2} = calc_up_nodes(S),
-    Res0 = perhaps_call_t(
-             S, Partitions, MyName,
-             fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end),
-    Continue = fun() ->
-                  FLUs = Proj#projection_v1.all_members -- [MyName],
-                  cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S)
-               end,
-    case Res0 of
-        ok ->
-            {XX, SS} = Continue(),
-            {{local_write_result, ok, XX}, SS};
-        Else when SkipLocalWriteErrorP ->
-            {XX, SS} = Continue(),
-            {{local_write_result, Else, XX}, SS};
-        Else ->
-            {Else, S2}
-    end.
+    FLUs = Proj#projection_v1.all_members,
+    cl_write_public_proj2(FLUs, Partitions, Epoch, Proj,
+                          IgnoreWrittenErrorP, S2).
 
-cl_write_public_proj_remote(FLUs, Partitions, _Epoch, Proj, S) ->
+cl_write_public_proj2(FLUs, Partitions, Epoch, Proj, IgnoreWrittenErrorP, S) ->
     %% We're going to be very care-free about this write because we'll rely
     %% on the read side to do any read repair.
     DoIt = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end,
-    Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} ||
-             FLU <- FLUs],
+    %% Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} ||
+    %%          FLU <- FLUs],
+    Rs = lists:foldl(
+           fun(FLU, {false=_KeepGoingP, Acc}) ->
+                   {false, [{FLU,skipped}|Acc]};
+              (FLU, {true=_KeepGoingP, Acc}) ->
+                   case perhaps_call_t(S, Partitions, FLU, DoIt) of
+                       {error,written}=Written when IgnoreWrittenErrorP ->
+                           %% io:format(user, "\nTried but written and ignoring written\n", []),
+                           {true, [{FLU,Written}|Acc]};
+                       {error,written}=Written when not IgnoreWrittenErrorP ->
+                           %% Brute-force, read-repair-like; good enough.
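+                           %% Another writer beat us to this epoch.  Read
+                           %% back whichever projection actually won, write
+                           %% that winner to every member, then stop
+                           %% iterating (KeepGoingP = false).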
+ DoRead = fun(Pid) -> ?FLU_PC:read_projection( + Pid, public, Epoch, ?TO) + end, + {ok, Proj2} = perhaps_call_t(S, Partitions, FLU, + DoRead), + DoIt2 = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj2, ?TO) end, + _Rs=[_ = perhaps_call_t(S, Partitions, Fl, DoIt2)|| + Fl <- FLUs], + %% io:format(user, "\nTried ~w ~W but repairing with ~w ~W: ~w\n", [Epoch, Proj#projection_v1.epoch_csum, 4, Epoch, Proj2#projection_v1.epoch_csum, 4, _Rs]), + {false, [{FLU,Written}|Acc]}; + Else -> + %% io:format(user, "\nTried but got an Else ~p\n", [Else]), + {true, [{FLU,Else}|Acc]} + end + end, {true, []}, FLUs), + %% io:format(user, "\nWrite public ~w by ~w: ~w\n", [Epoch, S#ch_mgr.name, Rs]), {{remote_write_results, Rs}, S}. do_cl_read_latest_public_projection(ReadRepairP, @@ -596,6 +613,9 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType, NotBestPsEpochFilt = [Proj || Proj <- Ps, Proj /= BestProj, Proj#projection_v1.epoch_number == BestEpoch], + BadAnswers2 = [Answer || {_FLU, Answer} <- FLUsRs, + not is_record(Answer, projection_v1) + orelse Answer /= BestProj], %% Wow, I'm not sure how long this bug has been here, but it's %% likely 5 months old (April 2015). I just now noticed a problem %% where BestProj was epoch 1194, but NotBestPs contained a @@ -620,7 +640,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType, {not_unanimous_flus, All_queried_list -- (Best_FLUs ++ BadAnswerFLUs)}, {bad_answer_flus, BadAnswerFLUs}, - {bad_answers, BadAnswers}, + {bad_answers, BadAnswers2}, {not_best_ps, NotBestPs}, {not_best_ps_epoch_filt, NotBestPsEpochFilt}|Extra], {UnanimousTag, BestProj, Extra2, S} @@ -641,7 +661,7 @@ do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) -> %% We're doing repair, so use the flavor that will %% continue to all others even if there is an %% error_written on the local FLU. - {_DontCare, _S2}=Res = cl_write_public_proj_skip_local_error( + {_DontCare, _S2}=Res = cl_write_public_proj_ignore_written_error( Epoch, BestProj, S), Res end. @@ -661,25 +681,29 @@ calc_projection(#ch_mgr{name=MyName, consistency_mode=CMode, if CMode == ap_mode -> calc_projection2(P_current, RelativeToServer, AllHosed, Dbg, S); CMode == cp_mode -> - #projection_v1{epoch_number=OldEpochNum, - all_members=AllMembers, - upi=OldUPI_list - } = P_current, - UPI_length_ok_p = - length(OldUPI_list) >= full_majority_size(AllMembers), - case {OldEpochNum, UPI_length_ok_p} of - {0, _} -> - calc_projection2(P_current, RelativeToServer, AllHosed, - Dbg, S); - {_, true} -> - calc_projection2(P_current, RelativeToServer, AllHosed, - Dbg, S); - {_, false} -> - {Up, _Partitions, RunEnv2} = calc_up_nodes( - MyName, AllMembers, RunEnv), - %% We can't improve on the current projection. - {P_current, S#ch_mgr{runenv=RunEnv2}, Up} - end + calc_projection2(P_current, RelativeToServer, AllHosed, Dbg, S) + %% TODO EXPERIMENT 2015-09-21 DELETE-ME??? 
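+            %% The disabled guard below refused to calculate a new projection
+            %% whenever the current UPI had fallen below quorum majority.
+            %% The NEW code above always calls calc_projection2() and instead
+            %% counts repairing nodes toward the majority (see the
+            %% calc_projection2() and sanity-check hunks below).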
+            %% #projection_v1{epoch_number=OldEpochNum,
+            %%                all_members=AllMembers,
+            %%                upi=OldUPI_list,
+            %%                repairing=OldRepairing_list
+            %%               } = P_current,
+            %% OldUPI_and_Repairing = OldUPI_list ++ OldRepairing_list,
+            %% UPI_and_Repairing_length_ok_p =
+            %%     length(OldUPI_and_Repairing) >= full_majority_size(AllMembers),
+            %% case {OldEpochNum, UPI_and_Repairing_length_ok_p} of
+            %%     {0, _} ->
+            %%         calc_projection2(P_current, RelativeToServer, AllHosed,
+            %%                          Dbg, S);
+            %%     {_, true} ->
+            %%         calc_projection2(P_current, RelativeToServer, AllHosed,
+            %%                          Dbg, S);
+            %%     {_, false} ->
+            %%         {Up, _Partitions, RunEnv2} = calc_up_nodes(
+            %%                                        MyName, AllMembers, RunEnv),
+            %%         %% We can't improve on the current projection.
+            %%         {P_current, S#ch_mgr{runenv=RunEnv2}, Up}
+            %% end
     end.
 
 %% AllHosed: FLUs that we must treat as if they are down, e.g., we are
@@ -798,7 +822,9 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
     P2 = if CMode == cp_mode ->
                  UpWitnesses = [W || W <- Up, lists:member(W, OldWitness_list)],
                  Majority = full_majority_size(AllMembers),
-                 SoFar = length(NewUPI),
+                 %% A repairing node can also contribute to the quorum
+                 %% majority required to attest to the history of the UPI.
+                 SoFar = length(NewUPI ++ NewRepairing),
                  if SoFar >= Majority ->
                          ?REACT({calc,?LINE,[]}),
                          P;
@@ -1715,10 +1741,40 @@ react_to_env_C100(P_newprop,
     ?REACT(c100),
 
     P_cur_for_sanity = if CMode == cp_mode ->
-                               P_current_calc;
+                               %% Assume E = P_latest's epoch #.  P_current
+                               %% may be at epoch E-delta but P_current_calc
+                               %% is at exactly E because E-delta is stale,
+                               %% and the CP world has changed while we were
+                               %% napping.  But the "exactly epoch E" will
+                               %% cause problems for the strictly monotonic
+                               %% epoch check in our sanity checking.  So we
+                               %% fake the epoch number here to work around
+                               %% the strictly-SI check.
+                               %%
+                               %% If we don't fake the epoch here, then (as I
+                               %% have observed today) we go through several
+                               %% insane projection attempts and then reset
+                               %% and eventually come to the right answer
+                               %% ... but this churn is avoidable, and for
+                               %% CP mode it is an acceptable safeguard to
+                               %% bend, expressly because of the make_zerf()
+                               %% aspect of CP's chain processing.
+                               E_c = erlang:min(
+                                       P_current#projection_v1.epoch_number,
+                                       P_current_calc#projection_v1.epoch_number),
+                               P_current_calc#projection_v1{epoch_number=E_c};
                           CMode == ap_mode ->
                                P_current
                        end,
+    ?REACT({c100, ?LINE,
+            [{current_epoch, P_cur_for_sanity#projection_v1.epoch_number},
+             {current_author, P_cur_for_sanity#projection_v1.author_server},
+             {current_upi, P_cur_for_sanity#projection_v1.upi},
+             {current_repairing, P_cur_for_sanity#projection_v1.repairing},
+             {latest_epoch, P_latest#projection_v1.epoch_number},
+             {latest_author, P_latest#projection_v1.author_server},
+             {latest_upi, P_latest#projection_v1.upi},
+             {latest_repairing, P_latest#projection_v1.repairing}]}),
     Sane = projection_transition_is_sane(P_cur_for_sanity, P_latest, MyName),
     if Sane == true ->
             ok;
@@ -1959,12 +2015,12 @@ react_to_env_C200(Retries, P_latest, S) ->
 
 react_to_env_C210(Retries, #ch_mgr{name=MyName, proj=Proj} = S) ->
     ?REACT(c210),
-    sleep_ranked_order(10, 100, MyName, Proj#projection_v1.all_members),
+    sleep_ranked_order(250, 500, MyName, Proj#projection_v1.all_members),
     react_to_env_C220(Retries, S).
 
 react_to_env_C220(Retries, S) ->
     ?REACT(c220),
-    react_to_env_A20(Retries + 1, S).
+    react_to_env_A20(Retries + 1, manage_last_down_list(S)).
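The erlang:min() clamp in the C100 hunk above deserves a worked example. A toy
sketch (the epoch numbers are illustrative, borrowed from the 1194 example in
the comment further up):

    %% P_current is stale at epoch 1190; P_current_calc has caught up to
    %% 1194, the same epoch as P_latest.  A 1194 -> 1194 transition would
    %% violate the strictly-monotonic epoch check, so clamp to the minimum.
    epoch_clamp_example() ->
        P_current_epoch = 1190,
        P_current_calc_epoch = 1194,
        E_c = erlang:min(P_current_epoch, P_current_calc_epoch),
        %% The sanity check now sees 1190 -> 1194, a strict increase.
        1190 = E_c.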
 react_to_env_C300(#projection_v1{epoch_number=_Epoch_newprop}=P_newprop,
                   #projection_v1{epoch_number=_Epoch_latest}=_P_latest, S) ->
@@ -1975,11 +2031,11 @@ react_to_env_C300(#projection_v1{epoch_number=_Epoch_newprop}=P_newprop,
 react_to_env_C310(P_newprop, S) ->
     ?REACT(c310),
     Epoch = P_newprop#projection_v1.epoch_number,
-    {WriteRes, S2} = cl_write_public_proj_skip_local_error(Epoch, P_newprop, S),
+    {WriteRes, S2} = cl_write_public_proj(Epoch, P_newprop, S),
     ?REACT({c310, ?LINE,
             [{newprop, machi_projection:make_summary(P_newprop)},
              {write_result, WriteRes}]}),
-    react_to_env_A10(S2).
+    react_to_env_A10(manage_last_down_list(S2)).
 
 projection_transitions_are_sane(Ps, RelativeToServer) ->
     projection_transitions_are_sane(Ps, RelativeToServer, false).
@@ -2213,11 +2269,16 @@ projection_transition_is_sane_except_si_epoch(
     %% CP mode extra sanity checks
     if CMode1 == cp_mode ->
             Majority = full_majority_size(All_list2),
+            UPI2_and_Repairing2 = UPI_list2 ++ Repairing_list2,
             if length(UPI_list2) == 0 ->
                     ok;                             % none projection
-               length(UPI_list2) >= Majority ->
-                    %% We have at least one non-witness
-                    true = (length(UPI_list2 -- Witness_list2) > 0);
+               length(UPI2_and_Repairing2) >= Majority ->
+                    %% We are assuming here that the client side is smart
+                    %% enough to do the *safe* thing when the
+                    %% length(UPI_list2) < Majority ... the client must use
+                    %% the repairing nodes as witnesses to check the
+                    %% current epoch.
+                    ok;
                true ->
                     error({majority_not_met, UPI_list2})
             end;
@@ -2738,7 +2799,9 @@ make_zerf2(OldEpochNum, Up, MajoritySize, MyName, AllMembers, OldWitness_list,
     try
         #projection_v1{epoch_number=Epoch} = Proj =
             zerf_find_last_common(MajoritySize, Up, S),
-        Proj2 = Proj#projection_v1{dbg2=[{make_zerf,Epoch}]},
+        Proj2 = Proj#projection_v1{dbg2=[{make_zerf,Epoch},
+                                         {yyy_hack, get(yyy_hack)},
+                                         {up,Up},{maj,MajoritySize}]},
         %% io:format(user, "ZERF ~w\n",[machi_projection:make_summary(Proj2)]),
         Proj2
     catch
@@ -2799,7 +2862,9 @@ zerf_find_last_annotated(FLU, MajoritySize, S) ->
                       (catch put(yyy_hack, [{FLU, Epoch, ok2}|get(yyy_hack)])),
                       Proj
               end,
-        if length(Px#projection_v1.upi) >= MajoritySize ->
+        UPI_and_Repairing = Px#projection_v1.upi ++
+                            Px#projection_v1.repairing,
+        if length(UPI_and_Repairing) >= MajoritySize ->
                 (catch put(yyy_hack, [{FLU, Epoch, yay}|get(yyy_hack)])),
                 Px;
            true ->
diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl
index 515ad49..67cf21b 100644
--- a/src/machi_fitness.erl
+++ b/src/machi_fitness.erl
@@ -86,7 +86,7 @@ send_spam_to_everyone(Pid) ->
 init([{MyFluName}|Args]) ->
     RegName = machi_flu_psup:make_fitness_regname(MyFluName),
     register(RegName, self()),
-timer:send_interval(1000, dump),
+timer:send_interval(5000, dump),
     UseSimulatorP = proplists:get_value(use_partition_simulator, Args, false),
     {ok, #state{my_flu_name=MyFluName, reg_name=RegName,
                 partition_simulator_p=UseSimulatorP,
@@ -184,8 +184,8 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) ->
     end;
 handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit,
                          pending_map=Map}=S) ->
-    io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]),
-    %% io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
+    %% io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]),
+    io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
     {noreply, S};
 handle_info(_Info, S) ->
     {noreply, S}.
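The same UPI-plus-repairing quorum test now appears in three hunks above
(calc_projection2(), the CP-mode sanity check, and zerf_find_last_annotated()).
A minimal sketch of the arithmetic, assuming full_majority_size/1 is the usual
strict-majority formula; its actual definition is not shown in this diff:

    %% Assumed definition; not confirmed by this diff.
    full_majority_size(AllMembers) when is_list(AllMembers) ->
        (length(AllMembers) div 2) + 1.

    %% With 5 members the majority is 3.  UPI [a,b] alone fails the old
    %% length(UPI) >= Majority test, but UPI ++ Repairing = [a,b,c] passes
    %% the new one, so a chain with a node still in repair can keep running.
    quorum_example() ->
        AllMembers = [a, b, c, d, e],
        Majority = full_majority_size(AllMembers),          % = 3
        UPI = [a, b],
        Repairing = [c],
        true = (length(UPI ++ Repairing) >= Majority),      % new test: passes
        false = (length(UPI) >= Majority).                  % old test: fails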
diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 68277a2..ba50544 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -137,6 +137,7 @@ write(PidSpec, ProjType, Proj, Timeout) is_record(Proj, projection_v1), is_integer(Proj#projection_v1.epoch_number), Proj#projection_v1.epoch_number >= 0 -> + testing_sleep_perhaps(), g_call(PidSpec, {write, ProjType, Proj}, Timeout). %% @doc Fetch all projection records of type `ProjType'. @@ -176,7 +177,6 @@ set_consistency_mode(PidSpec, CMode) %%%%%%%%%%%%%%%%%%%%%%%%%%% g_call(PidSpec, Arg, Timeout) -> - testing_sleep_perhaps(), LC1 = lclock_get(), {Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout), lclock_update(LC2), @@ -437,7 +437,9 @@ testing_sleep_perhaps() -> try [{_,Max}] = ets:lookup(?TEST_ETS_TABLE, projection_store_sleep_time), MSec = random:uniform(Max), - timer:sleep(MSec * 5), + io:format(user, "{", []), + timer:sleep(MSec), + io:format(user, "}", []), ok catch _X:_Y -> ok diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index 86f27dc..782e7be 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -201,8 +201,6 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) -> false -> ap_mode end, - %% ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, 25}), - try [{_, Ma}|_] = MgrNamez, {ok, P1} = ?MGR:test_calc_projection(Ma, false), @@ -220,6 +218,7 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) -> random:seed(now()), [begin erlang:yield(), + perhaps_adjust_pstore_sleep(), S_max_rand = random:uniform( S_max + 1), %% io:format(user, "{t}", []), @@ -251,14 +250,16 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) -> [a_chmgr,b_chmgr,c_chmgr,d_chmgr,e_chmgr,f_chmgr,g_chmgr,h_chmgr,i_chmgr,j_chmgr] ++ [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ++ [a_fitness,b_fitness,c_fitness,d_fitness,e_fitness,f_fitness,g_fitness,h_fitness,i_fitness,j_fitness] ], - [begin - timer:sleep(2*1000), - case whereis(XX) of - undefined -> ok; - XXPid -> {_, XXbin} = process_info(XXPid, backtrace), - io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin]) - end - end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore], _ <- lists:seq(1,5)], + [begin + [begin + case whereis(XX) of + undefined -> ok; + XXPid -> {_, XXbin} = process_info(XXPid, backtrace), + io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin]) + end + end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ], + timer:sleep(20) + end || _ <- lists:seq(1,30)], exit({icky_timeout, M_name}) end || {ThePid,M_name} <- Pids] end, @@ -421,19 +422,19 @@ make_partition_list(All_list) -> A <- All_list, B <- All_list, A /= B, C <- All_list, D <- All_list, C /= D, X /= A, X /= C, A /= C], - _X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] || - X <- All_list, Y <- All_list, X /= Y, - A <- All_list, B <- All_list, A /= B, - C <- All_list, D <- All_list, C /= D, - E <- All_list, F <- All_list, E /= F, - X /= A, X /= C, X /= E, A /= C, A /= E, - C /= E], + %% _X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] || + %% X <- All_list, Y <- All_list, X /= Y, + %% A <- All_list, B <- All_list, A /= B, + %% C <- All_list, D <- All_list, C /= D, + %% E <- All_list, F <- All_list, E /= F, + %% X /= A, X /= C, X /= E, A /= C, A /= E, + %% C /= E], %% Concat = _X_Ys1, %% Concat = _X_Ys2, %% Concat = _X_Ys1 ++ _X_Ys2, %% %% Concat 
= _X_Ys3, - %% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3, - Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4, + Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3, + %% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4, NoPartitions = lists:duplicate(trunc(length(Concat) * 0.1), []), uniq_reverse(random_sort(lists:usort([lists:sort(L) || L <- Concat]) ++ NoPartitions)). @@ -870,6 +871,15 @@ uniq_c([H|T], Count, H) -> uniq_c([H|T], Count, Last) -> [{Count, Last}|uniq_c(T, 1, H)]. +perhaps_adjust_pstore_sleep() -> + try + {ok, Bin} = file:read_file("/tmp/pstore_sleep_msec"), + {MSec,_} = string:to_integer(binary_to_list(Bin)), + ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, MSec}) + catch _:_ -> + ok + end. + %% MaxIters = NumFLUs * (NumFLUs + 1) * 6, %% Stable = fun(S_Namez) -> diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index 2390078..df02269 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -313,8 +313,7 @@ smoke1_test() -> {error, partition} -> timer:sleep(500); _ -> ok end, - {local_write_result, ok, - {remote_write_results, [{b,ok},{c,ok}]}} = + {remote_write_results,{true,[{c,ok},{b,ok},{a,ok}]}} = ?MGR:test_write_public_projection(M0, P1), {unanimous, P1, Extra1} = ?MGR:test_read_latest_public_projection(M0, false),
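One operational note on the new test knob above: perhaps_adjust_pstore_sleep()
re-reads /tmp/pstore_sleep_msec on every test iteration and stores the value in
ETS, and testing_sleep_perhaps() then sleeps random:uniform(MSec) before each
projection store write. A usage sketch (the 250/1 values are illustrative):

    %% In an Erlang shell on the node running the converge demo:
    1> file:write_file("/tmp/pstore_sleep_msec", <<"250">>).
    ok
    %% ... later, dial the random write delay back down to ~1 msec:
    2> file:write_file("/tmp/pstore_sleep_msec", <<"1">>).
    ok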