Add partition simulator support to fitness service

This commit is contained in:
Scott Lystig Fritchie 2015-09-11 16:45:29 +09:00
parent efe6ce7894
commit 8df7d58365
3 changed files with 46 additions and 15 deletions

View file

@ -125,7 +125,9 @@
-export([test_calc_projection/2,
test_write_public_projection/2,
test_read_latest_public_projection/2]).
-export([perhaps_call/5]). % for partition simulator use w/machi_fitness
-export([perhaps_call/5, % for partition simulator use w/machi_fitness
init_remember_down_list/0, update_remember_down_list/1,
get_remember_down_list/0]).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
@ -2243,9 +2245,12 @@ sanitize_repair_state(S) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
perhaps_call_t(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
ProxyPid = proxy_pid(FLU, S),
perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt).
perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt) ->
try
ProxyPid = proxy_pid(FLU, S),
perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt)
perhaps_call2(ProxyPid, MyName, Partitions, FLU, DoIt)
catch
exit:timeout ->
update_remember_down_list(FLU),
@ -2255,7 +2260,7 @@ perhaps_call_t(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
{error, partition}
end.
perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt) ->
perhaps_call2(ProxyPid, MyName, Partitions, FLU, DoIt) ->
RemoteFLU_p = FLU /= MyName,
erase(bad_sock),
case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of
@ -2287,8 +2292,8 @@ init_remember_down_list() ->
put(remember_down_list, []).
update_remember_down_list(FLU) ->
put(remember_down_list,
lists:usort([FLU|get_remember_down_list()])).
catch put(remember_down_list,
lists:usort([FLU|get_remember_down_list()])).
get_remember_down_list() ->
get(remember_down_list).

View file

@ -51,7 +51,7 @@
members_dict=orddict:new() :: orddict:orddict(),
active_unfit=[] :: list(),
pending_map=?MAP:new() :: ?MAP:riak_dt_map(),
partition_simulator=false :: boolean()
partition_simulator_p :: boolean()
}).
start_link(Args) ->
@ -77,12 +77,13 @@ send_fitness_update_spam(Pid, FromName, Dict) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([{MyFluName}|_Args]) ->
init([{MyFluName}|Args]) ->
RegName = machi_flu_psup:make_fitness_regname(MyFluName),
register(RegName, self()),
io:format(user, "Starting fitness for ~w, args ~p\n", [MyFluName, _Args]),
timer:send_interval(1000, dump),
{ok, #state{my_flu_name=MyFluName, reg_name=RegName}}.
UseSimulatorP = proplists:get_value(use_partition_simulator, Args, false),
{ok, #state{my_flu_name=MyFluName, reg_name=RegName,
partition_simulator_p=UseSimulatorP}}.
handle_call({get_unfit_list}, _From, #state{active_unfit=ActiveUnfit}=S) ->
Reply = ActiveUnfit,
@ -209,7 +210,7 @@ store_in_map(Map, Name, Now, Down, AdminDown, Props) ->
Val = {Now, Down, AdminDownServers, Props ++ AdminDownProps},
map_set(Name, Map, Name, Val).
send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}=S) ->
Send = fun(FLU, #p_srvr{address=Host, port=TcpPort}) ->
SpamProj = machi_projection:update_checksum(
#projection_v1{epoch_number=?SPAM_PROJ_EPOCH,
@ -225,7 +226,7 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
members_dict=[] }),
%% Best effort, don't care about failure.
spawn(fun() ->
send_projection(FLU, Host, TcpPort, SpamProj)
send_projection(FLU, Host, TcpPort, SpamProj, S)
end)
end,
F = fun(FLU, P_srvr, Acc) ->
@ -240,11 +241,35 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
_Sent = orddict:fold(F, [], MembersDict),
ok.
send_projection(_FLU, Host, TcpPort, SpamProj) ->
send_projection(FLU, Host, TcpPort, SpamProj,
#state{my_flu_name=MyFluName, members_dict=MembersDict,
partition_simulator_p=SimulatorP}) ->
%% At the moment, we're using utterly-temporary-hack method of tunneling
%% our messages through the write_projection API. Eventually the PB
%% API should be expanded to accomodate this new fitness service.
machi_flu1_client:write_projection(Host, TcpPort, public, SpamProj).
DoIt = fun(_ProxyPid) ->
machi_flu1_client:write_projection(Host, TcpPort,
public, SpamProj)
end,
ProxyPidPlaceholder = proxy_pid_unused,
if SimulatorP ->
AllMembers = [K || {K,_V} <- orddict:to_list(MembersDict)],
{Partitions, _Islands} = machi_partition_simulator:get(AllMembers),
machi_chain_manager1:init_remember_down_list(),
Res = machi_chain_manager1:perhaps_call(ProxyPidPlaceholder,
MyFluName,
Partitions, FLU, DoIt),
%% case machi_chain_manager1:get_remember_down_list() of
%% [] ->
%% ok;
%% _ ->
%% io:format(user, "fitness error ~w -> ~w\n",
%% [MyFluName, FLU])
%% end,
Res;
true ->
DoIt(ProxyPidPlaceholder)
end.
calc_unfit(All_list, HosedAnnotations) ->
G = digraph:new(),

View file

@ -272,7 +272,8 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
io:format(user, "\nSweet, private projections are stable\n", []),
io:format(user, "\t~P\n", [get(stable), 14]),
io:format(user, "Rolling sanity check ... ", []),
MaxFiles = 800,
MaxFiles = 130, io:format(user, "Sweet, NOTICE: alternative MaxFiles=~w\n", [MaxFiles]),
%% MaxFiles = 800,
PrivProjs = [{Name, begin
{ok, Ps8} = ?FLU_PC:get_all_projections(
FLU, private, infinity),