From 69a304102ee1a39f77a4a627b403127a9837798a Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 21 Sep 2015 15:09:16 +0900 Subject: [PATCH] Write public proj in all_members order only --- src/machi_chain_manager1.erl | 75 +++++++++++++-------- src/machi_fitness.erl | 6 +- src/machi_projection_store.erl | 6 +- test/machi_chain_manager1_converge_demo.erl | 48 +++++++------ 4 files changed, 82 insertions(+), 53 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index ebdb0bd..472d223 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -477,41 +477,58 @@ do_cl_write_public_proj(Proj, S) -> cl_write_public_proj(Epoch, Proj, S) -> cl_write_public_proj(Epoch, Proj, false, S). -cl_write_public_proj_skip_local_error(Epoch, Proj, S) -> +cl_write_public_proj_ignore_written_error(Epoch, Proj, S) -> cl_write_public_proj(Epoch, Proj, true, S). -cl_write_public_proj(Epoch, Proj, SkipLocalWriteErrorP, S) -> - %% Write to local public projection store first, and if it succeeds, +cl_write_public_proj(Epoch, Proj, IgnoreWrittenErrorP, S) -> + %% OLD: Write to local public projection store first, and if it succeeds, %% then write to all remote public projection stores. - cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, S). - -cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, - #ch_mgr{name=MyName}=S) -> + %% NEW: Hypothesis: The OLD idea is a bad idea and causes needless retries + %% via racing with other wrier + %% NEW: Let's see what kind of trouble we can get ourselves into if + %% we abort our writing efforts if we encounter an + %% {error,written} status. + %% Heh, that doesn't work too well: if we have random uniform write + %% delays of 0-1,500 msec, then we end up abandoning this epoch + %% number at the first sign of trouble and then re-iterate to suggest + %% a new epoch number ... which causes *more* thrash, not less. {_UpNodes, Partitions, S2} = calc_up_nodes(S), - Res0 = perhaps_call_t( - S, Partitions, MyName, - fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end), - Continue = fun() -> - FLUs = Proj#projection_v1.all_members -- [MyName], - cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) - end, - case Res0 of - ok -> - {XX, SS} = Continue(), - {{local_write_result, ok, XX}, SS}; - Else when SkipLocalWriteErrorP -> - {XX, SS} = Continue(), - {{local_write_result, Else, XX}, SS}; - Else -> - {Else, S2} - end. + FLUs = Proj#projection_v1.all_members, + cl_write_public_proj2(FLUs, Partitions, Epoch, Proj, + IgnoreWrittenErrorP, S2). -cl_write_public_proj_remote(FLUs, Partitions, _Epoch, Proj, S) -> +cl_write_public_proj2(FLUs, Partitions, Epoch, Proj, IgnoreWrittenErrorP, S) -> %% We're going to be very care-free about this write because we'll rely %% on the read side to do any read repair. DoIt = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end, - Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} || - FLU <- FLUs], + %% Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} || + %% FLU <- FLUs], + Rs = lists:foldl( + fun(FLU, {false=_KeepGoingP, Acc}) -> + {false, [{FLU,skipped}|Acc]}; + (FLU, {true=_KeepGoingP, Acc}) -> + case perhaps_call_t(S, Partitions, FLU, DoIt) of + {error,written}=Written when IgnoreWrittenErrorP -> + %% io:format(user, "\nTried but written and ignoring written\n", []), + {true, [{FLU,Written}|Acc]}; + {error,written}=Written when not IgnoreWrittenErrorP -> + %% Brute-force read-repair-like good enough. + DoRead = fun(Pid) -> ?FLU_PC:read_projection( + Pid, public, Epoch, ?TO) + end, + {ok, Proj2} = perhaps_call_t(S, Partitions, FLU, + DoRead), + DoIt2 = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj2, ?TO) end, + _Rs=[_ = perhaps_call_t(S, Partitions, Fl, DoIt2)|| + Fl <- FLUs], + %% io:format(user, "\nTried ~w ~W but repairing with ~w ~W: ~w\n", [Epoch, Proj#projection_v1.epoch_csum, 4, Epoch, Proj2#projection_v1.epoch_csum, 4, _Rs]), + {false, [{FLU,Written}|Acc]}; + Else -> + %% io:format(user, "\nTried but got an Else ~p\n", [Else]), + {true, [{FLU,Else}|Acc]} + end + end, {true, []}, FLUs), + %% io:format(user, "\nWrite public ~w by ~w: ~w\n", [Epoch, S#ch_mgr.name, Rs]), {{remote_write_results, Rs}, S}. do_cl_read_latest_public_projection(ReadRepairP, @@ -641,7 +658,7 @@ do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) -> %% We're doing repair, so use the flavor that will %% continue to all others even if there is an %% error_written on the local FLU. - {_DontCare, _S2}=Res = cl_write_public_proj_skip_local_error( + {_DontCare, _S2}=Res = cl_write_public_proj_ignore_written_error( Epoch, BestProj, S), Res end. @@ -1975,7 +1992,7 @@ react_to_env_C300(#projection_v1{epoch_number=_Epoch_newprop}=P_newprop, react_to_env_C310(P_newprop, S) -> ?REACT(c310), Epoch = P_newprop#projection_v1.epoch_number, - {WriteRes, S2} = cl_write_public_proj_skip_local_error(Epoch, P_newprop, S), + {WriteRes, S2} = cl_write_public_proj(Epoch, P_newprop, S), ?REACT({c310, ?LINE, [{newprop, machi_projection:make_summary(P_newprop)}, {write_result, WriteRes}]}), diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl index 515ad49..67cf21b 100644 --- a/src/machi_fitness.erl +++ b/src/machi_fitness.erl @@ -86,7 +86,7 @@ send_spam_to_everyone(Pid) -> init([{MyFluName}|Args]) -> RegName = machi_flu_psup:make_fitness_regname(MyFluName), register(RegName, self()), -timer:send_interval(1000, dump), +timer:send_interval(5000, dump), UseSimulatorP = proplists:get_value(use_partition_simulator, Args, false), {ok, #state{my_flu_name=MyFluName, reg_name=RegName, partition_simulator_p=UseSimulatorP, @@ -184,8 +184,8 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) -> end; handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit, pending_map=Map}=S) -> - io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]), - %% io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]), + %% io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]), + io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]), {noreply, S}; handle_info(_Info, S) -> {noreply, S}. diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 68277a2..ba50544 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -137,6 +137,7 @@ write(PidSpec, ProjType, Proj, Timeout) is_record(Proj, projection_v1), is_integer(Proj#projection_v1.epoch_number), Proj#projection_v1.epoch_number >= 0 -> + testing_sleep_perhaps(), g_call(PidSpec, {write, ProjType, Proj}, Timeout). %% @doc Fetch all projection records of type `ProjType'. @@ -176,7 +177,6 @@ set_consistency_mode(PidSpec, CMode) %%%%%%%%%%%%%%%%%%%%%%%%%%% g_call(PidSpec, Arg, Timeout) -> - testing_sleep_perhaps(), LC1 = lclock_get(), {Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout), lclock_update(LC2), @@ -437,7 +437,9 @@ testing_sleep_perhaps() -> try [{_,Max}] = ets:lookup(?TEST_ETS_TABLE, projection_store_sleep_time), MSec = random:uniform(Max), - timer:sleep(MSec * 5), + io:format(user, "{", []), + timer:sleep(MSec), + io:format(user, "}", []), ok catch _X:_Y -> ok diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index 86f27dc..782e7be 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -201,8 +201,6 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) -> false -> ap_mode end, - %% ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, 25}), - try [{_, Ma}|_] = MgrNamez, {ok, P1} = ?MGR:test_calc_projection(Ma, false), @@ -220,6 +218,7 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) -> random:seed(now()), [begin erlang:yield(), + perhaps_adjust_pstore_sleep(), S_max_rand = random:uniform( S_max + 1), %% io:format(user, "{t}", []), @@ -251,14 +250,16 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) -> [a_chmgr,b_chmgr,c_chmgr,d_chmgr,e_chmgr,f_chmgr,g_chmgr,h_chmgr,i_chmgr,j_chmgr] ++ [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ++ [a_fitness,b_fitness,c_fitness,d_fitness,e_fitness,f_fitness,g_fitness,h_fitness,i_fitness,j_fitness] ], - [begin - timer:sleep(2*1000), - case whereis(XX) of - undefined -> ok; - XXPid -> {_, XXbin} = process_info(XXPid, backtrace), - io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin]) - end - end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore], _ <- lists:seq(1,5)], + [begin + [begin + case whereis(XX) of + undefined -> ok; + XXPid -> {_, XXbin} = process_info(XXPid, backtrace), + io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin]) + end + end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ], + timer:sleep(20) + end || _ <- lists:seq(1,30)], exit({icky_timeout, M_name}) end || {ThePid,M_name} <- Pids] end, @@ -421,19 +422,19 @@ make_partition_list(All_list) -> A <- All_list, B <- All_list, A /= B, C <- All_list, D <- All_list, C /= D, X /= A, X /= C, A /= C], - _X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] || - X <- All_list, Y <- All_list, X /= Y, - A <- All_list, B <- All_list, A /= B, - C <- All_list, D <- All_list, C /= D, - E <- All_list, F <- All_list, E /= F, - X /= A, X /= C, X /= E, A /= C, A /= E, - C /= E], + %% _X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] || + %% X <- All_list, Y <- All_list, X /= Y, + %% A <- All_list, B <- All_list, A /= B, + %% C <- All_list, D <- All_list, C /= D, + %% E <- All_list, F <- All_list, E /= F, + %% X /= A, X /= C, X /= E, A /= C, A /= E, + %% C /= E], %% Concat = _X_Ys1, %% Concat = _X_Ys2, %% Concat = _X_Ys1 ++ _X_Ys2, %% %% Concat = _X_Ys3, - %% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3, - Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4, + Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3, + %% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4, NoPartitions = lists:duplicate(trunc(length(Concat) * 0.1), []), uniq_reverse(random_sort(lists:usort([lists:sort(L) || L <- Concat]) ++ NoPartitions)). @@ -870,6 +871,15 @@ uniq_c([H|T], Count, H) -> uniq_c([H|T], Count, Last) -> [{Count, Last}|uniq_c(T, 1, H)]. +perhaps_adjust_pstore_sleep() -> + try + {ok, Bin} = file:read_file("/tmp/pstore_sleep_msec"), + {MSec,_} = string:to_integer(binary_to_list(Bin)), + ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, MSec}) + catch _:_ -> + ok + end. + %% MaxIters = NumFLUs * (NumFLUs + 1) * 6, %% Stable = fun(S_Namez) ->