diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl index 3d4f527..03127a2 100644 --- a/src/machi_fitness.erl +++ b/src/machi_fitness.erl @@ -51,6 +51,7 @@ local_down=[] :: list(), admin_down=[] :: list({term(),term()}), members_dict=orddict:new() :: orddict:orddict(), + proxies_dict=orddict:new() :: orddict:orddict(), active_unfit=[] :: list(), pending_map=?MAP:new() :: ?MAP:riak_dt_map(), partition_simulator_p :: boolean() @@ -250,24 +251,26 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}=S) -> _Sent = orddict:fold(F, [], MembersDict), ok. -send_projection(FLU, Host, TcpPort, SpamProj, +send_projection(FLU, _Host, _TcpPort, SpamProj, #state{my_flu_name=MyFluName, members_dict=MembersDict, - partition_simulator_p=SimulatorP}) -> + partition_simulator_p=SimulatorP}=S) -> %% At the moment, we're using utterly-temporary-hack method of tunneling %% our messages through the write_projection API. Eventually the PB %% API should be expanded to accomodate this new fitness service. - DoIt = fun(_ProxyPid) -> - machi_flu1_client:write_projection(Host, TcpPort, - public, SpamProj) + %% This is "best effort" only, use catch to ignore failures. + ProxyPid = (catch proxy_pid(FLU, S)), + DoIt = fun(_ArgIgnored) -> + machi_proxy_flu1_client:write_projection(ProxyPid, + public, SpamProj) end, ProxyPidPlaceholder = proxy_pid_unused, if SimulatorP -> AllMembers = [K || {K,_V} <- orddict:to_list(MembersDict)], {Partitions, _Islands} = machi_partition_simulator:get(AllMembers), machi_chain_manager1:init_remember_down_list(), - Res = machi_chain_manager1:perhaps_call(ProxyPidPlaceholder, - MyFluName, - Partitions, FLU, DoIt), + Res = (catch machi_chain_manager1:perhaps_call(ProxyPidPlaceholder, + MyFluName, + Partitions, FLU, DoIt)), %% case machi_chain_manager1:get_remember_down_list() of %% [] -> %% ok; @@ -277,9 +280,12 @@ send_projection(FLU, Host, TcpPort, SpamProj, %% end, Res; true -> - DoIt(ProxyPidPlaceholder) + (catch DoIt(ProxyPidPlaceholder)) end. +proxy_pid(Name, #state{proxies_dict=ProxiesDict}) -> + orddict:fetch(Name, ProxiesDict). + calc_unfit(All_list, HosedAnnotations) -> G = digraph:new(), [digraph:add_vertex(G, V) || V <- All_list], @@ -342,7 +348,17 @@ do_map_change(NewMap, DontSendList, MembersDict, %% _MapV = map_value(NewMap), %% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n" %% " ~p\n ~p\n",[_MyFluName,ChangedServers,_OldMapV,_MapV]), - S#state{pending_map=NewMap, members_dict=MembersDict}. + S2 = perhaps_adjust_members_proxies_dicts(MembersDict, S), + S2#state{pending_map=NewMap}. + +perhaps_adjust_members_proxies_dicts(SameMembersDict, + #state{members_dict=SameMembersDict}=S) -> + S; +perhaps_adjust_members_proxies_dicts(MembersDict, + #state{proxies_dict=OldProxiesDict}=S) -> + _ = machi_proxy_flu1_client:stop_proxies(OldProxiesDict), + ProxiesDict = machi_proxy_flu1_client:start_proxies(MembersDict), + S#state{members_dict=MembersDict, proxies_dict=ProxiesDict}. find_changed_servers(OldMap, NewMap, _MyFluName) -> AddBad = fun({_Who, {_Time, BadList, AdminDown, _Props}}, Acc) -> diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 68ceef4..909d9b3 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -687,7 +687,7 @@ filter_sock_error_result(Error) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% -w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})-> +w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props}=_P)-> try case proplists:get_value(session_proto, Props, tcp) of tcp -> @@ -705,7 +705,7 @@ w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})-> {w,ssl,SslSock} end catch - _:_ -> + _X:_Y -> undefined end.