Write public proj in all_members order only

This commit is contained in:
Scott Lystig Fritchie 2015-09-21 15:09:16 +09:00
parent 58b19e76be
commit 69a304102e
4 changed files with 82 additions and 53 deletions

View file

@ -477,41 +477,58 @@ do_cl_write_public_proj(Proj, S) ->
cl_write_public_proj(Epoch, Proj, S) ->
cl_write_public_proj(Epoch, Proj, false, S).
cl_write_public_proj_skip_local_error(Epoch, Proj, S) ->
cl_write_public_proj_ignore_written_error(Epoch, Proj, S) ->
cl_write_public_proj(Epoch, Proj, true, S).
cl_write_public_proj(Epoch, Proj, SkipLocalWriteErrorP, S) ->
%% Write to local public projection store first, and if it succeeds,
cl_write_public_proj(Epoch, Proj, IgnoreWrittenErrorP, S) ->
%% OLD: Write to local public projection store first, and if it succeeds,
%% then write to all remote public projection stores.
cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, S).
cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP,
#ch_mgr{name=MyName}=S) ->
%% NEW: Hypothesis: The OLD idea is a bad idea and causes needless retries
%% via racing with other wrier
%% NEW: Let's see what kind of trouble we can get ourselves into if
%% we abort our writing efforts if we encounter an
%% {error,written} status.
%% Heh, that doesn't work too well: if we have random uniform write
%% delays of 0-1,500 msec, then we end up abandoning this epoch
%% number at the first sign of trouble and then re-iterate to suggest
%% a new epoch number ... which causes *more* thrash, not less.
{_UpNodes, Partitions, S2} = calc_up_nodes(S),
Res0 = perhaps_call_t(
S, Partitions, MyName,
fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end),
Continue = fun() ->
FLUs = Proj#projection_v1.all_members -- [MyName],
cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S)
end,
case Res0 of
ok ->
{XX, SS} = Continue(),
{{local_write_result, ok, XX}, SS};
Else when SkipLocalWriteErrorP ->
{XX, SS} = Continue(),
{{local_write_result, Else, XX}, SS};
Else ->
{Else, S2}
end.
FLUs = Proj#projection_v1.all_members,
cl_write_public_proj2(FLUs, Partitions, Epoch, Proj,
IgnoreWrittenErrorP, S2).
cl_write_public_proj_remote(FLUs, Partitions, _Epoch, Proj, S) ->
cl_write_public_proj2(FLUs, Partitions, Epoch, Proj, IgnoreWrittenErrorP, S) ->
%% We're going to be very care-free about this write because we'll rely
%% on the read side to do any read repair.
DoIt = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj, ?TO) end,
Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} ||
FLU <- FLUs],
%% Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end)} ||
%% FLU <- FLUs],
Rs = lists:foldl(
fun(FLU, {false=_KeepGoingP, Acc}) ->
{false, [{FLU,skipped}|Acc]};
(FLU, {true=_KeepGoingP, Acc}) ->
case perhaps_call_t(S, Partitions, FLU, DoIt) of
{error,written}=Written when IgnoreWrittenErrorP ->
%% io:format(user, "\nTried but written and ignoring written\n", []),
{true, [{FLU,Written}|Acc]};
{error,written}=Written when not IgnoreWrittenErrorP ->
%% Brute-force read-repair-like good enough.
DoRead = fun(Pid) -> ?FLU_PC:read_projection(
Pid, public, Epoch, ?TO)
end,
{ok, Proj2} = perhaps_call_t(S, Partitions, FLU,
DoRead),
DoIt2 = fun(Pid) -> ?FLU_PC:write_projection(Pid, public, Proj2, ?TO) end,
_Rs=[_ = perhaps_call_t(S, Partitions, Fl, DoIt2)||
Fl <- FLUs],
%% io:format(user, "\nTried ~w ~W but repairing with ~w ~W: ~w\n", [Epoch, Proj#projection_v1.epoch_csum, 4, Epoch, Proj2#projection_v1.epoch_csum, 4, _Rs]),
{false, [{FLU,Written}|Acc]};
Else ->
%% io:format(user, "\nTried but got an Else ~p\n", [Else]),
{true, [{FLU,Else}|Acc]}
end
end, {true, []}, FLUs),
%% io:format(user, "\nWrite public ~w by ~w: ~w\n", [Epoch, S#ch_mgr.name, Rs]),
{{remote_write_results, Rs}, S}.
do_cl_read_latest_public_projection(ReadRepairP,
@ -641,7 +658,7 @@ do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) ->
%% We're doing repair, so use the flavor that will
%% continue to all others even if there is an
%% error_written on the local FLU.
{_DontCare, _S2}=Res = cl_write_public_proj_skip_local_error(
{_DontCare, _S2}=Res = cl_write_public_proj_ignore_written_error(
Epoch, BestProj, S),
Res
end.
@ -1975,7 +1992,7 @@ react_to_env_C300(#projection_v1{epoch_number=_Epoch_newprop}=P_newprop,
react_to_env_C310(P_newprop, S) ->
?REACT(c310),
Epoch = P_newprop#projection_v1.epoch_number,
{WriteRes, S2} = cl_write_public_proj_skip_local_error(Epoch, P_newprop, S),
{WriteRes, S2} = cl_write_public_proj(Epoch, P_newprop, S),
?REACT({c310, ?LINE,
[{newprop, machi_projection:make_summary(P_newprop)},
{write_result, WriteRes}]}),

View file

@ -86,7 +86,7 @@ send_spam_to_everyone(Pid) ->
init([{MyFluName}|Args]) ->
RegName = machi_flu_psup:make_fitness_regname(MyFluName),
register(RegName, self()),
timer:send_interval(1000, dump),
timer:send_interval(5000, dump),
UseSimulatorP = proplists:get_value(use_partition_simulator, Args, false),
{ok, #state{my_flu_name=MyFluName, reg_name=RegName,
partition_simulator_p=UseSimulatorP,
@ -184,8 +184,8 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) ->
end;
handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit,
pending_map=Map}=S) ->
io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]),
%% io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
%% io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]),
io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
{noreply, S};
handle_info(_Info, S) ->
{noreply, S}.

View file

@ -137,6 +137,7 @@ write(PidSpec, ProjType, Proj, Timeout)
is_record(Proj, projection_v1),
is_integer(Proj#projection_v1.epoch_number),
Proj#projection_v1.epoch_number >= 0 ->
testing_sleep_perhaps(),
g_call(PidSpec, {write, ProjType, Proj}, Timeout).
%% @doc Fetch all projection records of type `ProjType'.
@ -176,7 +177,6 @@ set_consistency_mode(PidSpec, CMode)
%%%%%%%%%%%%%%%%%%%%%%%%%%%
g_call(PidSpec, Arg, Timeout) ->
testing_sleep_perhaps(),
LC1 = lclock_get(),
{Res, LC2} = gen_server:call(PidSpec, {Arg, LC1}, Timeout),
lclock_update(LC2),
@ -437,7 +437,9 @@ testing_sleep_perhaps() ->
try
[{_,Max}] = ets:lookup(?TEST_ETS_TABLE, projection_store_sleep_time),
MSec = random:uniform(Max),
timer:sleep(MSec * 5),
io:format(user, "{", []),
timer:sleep(MSec),
io:format(user, "}", []),
ok
catch _X:_Y ->
ok

View file

@ -201,8 +201,6 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
false -> ap_mode
end,
%% ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, 25}),
try
[{_, Ma}|_] = MgrNamez,
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
@ -220,6 +218,7 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
random:seed(now()),
[begin
erlang:yield(),
perhaps_adjust_pstore_sleep(),
S_max_rand = random:uniform(
S_max + 1),
%% io:format(user, "{t}", []),
@ -251,14 +250,16 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
[a_chmgr,b_chmgr,c_chmgr,d_chmgr,e_chmgr,f_chmgr,g_chmgr,h_chmgr,i_chmgr,j_chmgr] ++
[a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ++
[a_fitness,b_fitness,c_fitness,d_fitness,e_fitness,f_fitness,g_fitness,h_fitness,i_fitness,j_fitness] ],
[begin
timer:sleep(2*1000),
case whereis(XX) of
undefined -> ok;
XXPid -> {_, XXbin} = process_info(XXPid, backtrace),
io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin])
end
end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore], _ <- lists:seq(1,5)],
[begin
[begin
case whereis(XX) of
undefined -> ok;
XXPid -> {_, XXbin} = process_info(XXPid, backtrace),
io:format(user, "BACK ~w: ~w\n~s\n", [XX, time(), XXbin])
end
end || XX <- [a_pstore,b_pstore,c_pstore,d_pstore,e_pstore,f_pstore,g_pstore,h_pstore,i_pstore,j_pstore] ],
timer:sleep(20)
end || _ <- lists:seq(1,30)],
exit({icky_timeout, M_name})
end || {ThePid,M_name} <- Pids]
end,
@ -421,19 +422,19 @@ make_partition_list(All_list) ->
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
X /= A, X /= C, A /= C],
_X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] ||
X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
E <- All_list, F <- All_list, E /= F,
X /= A, X /= C, X /= E, A /= C, A /= E,
C /= E],
%% _X_Ys4 = [[{X,Y}, {A,B}, {C,D}, {E,F}] ||
%% X <- All_list, Y <- All_list, X /= Y,
%% A <- All_list, B <- All_list, A /= B,
%% C <- All_list, D <- All_list, C /= D,
%% E <- All_list, F <- All_list, E /= F,
%% X /= A, X /= C, X /= E, A /= C, A /= E,
%% C /= E],
%% Concat = _X_Ys1,
%% Concat = _X_Ys2,
%% Concat = _X_Ys1 ++ _X_Ys2,
%% %% Concat = _X_Ys3,
%% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4,
Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
%% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3 ++ _X_Ys4,
NoPartitions = lists:duplicate(trunc(length(Concat) * 0.1), []),
uniq_reverse(random_sort(lists:usort([lists:sort(L) || L <- Concat])
++ NoPartitions)).
@ -870,6 +871,15 @@ uniq_c([H|T], Count, H) ->
uniq_c([H|T], Count, Last) ->
[{Count, Last}|uniq_c(T, 1, H)].
perhaps_adjust_pstore_sleep() ->
try
{ok, Bin} = file:read_file("/tmp/pstore_sleep_msec"),
{MSec,_} = string:to_integer(binary_to_list(Bin)),
ets:insert(?TEST_ETS_TABLE, {projection_store_sleep_time, MSec})
catch _:_ ->
ok
end.
%% MaxIters = NumFLUs * (NumFLUs + 1) * 6,
%% Stable = fun(S_Namez) ->