From efe6ce7894ba86a6f9fa4e6eea7be300f83b02f8 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 11 Sep 2015 16:03:49 +0900 Subject: [PATCH] WIP: small refactoring to prepare for fitness server 'use' of partition simulator --- src/machi_chain_manager1.erl | 9 +++++---- src/machi_fitness.erl | 20 +++++++++++++------- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 84ed393..d113012 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -125,6 +125,7 @@ -export([test_calc_projection/2, test_write_public_projection/2, test_read_latest_public_projection/2]). +-export([perhaps_call/5]). % for partition simulator use w/machi_fitness -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -2241,9 +2242,10 @@ sanitize_repair_state(S) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -perhaps_call_t(S, Partitions, FLU, DoIt) -> +perhaps_call_t(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> try - perhaps_call(S, Partitions, FLU, DoIt) + ProxyPid = proxy_pid(FLU, S), + perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt) catch exit:timeout -> update_remember_down_list(FLU), @@ -2253,8 +2255,7 @@ perhaps_call_t(S, Partitions, FLU, DoIt) -> {error, partition} end. -perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> - ProxyPid = proxy_pid(FLU, S), +perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt) -> RemoteFLU_p = FLU /= MyName, erase(bad_sock), case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl index 9f75b90..e770149 100644 --- a/src/machi_fitness.erl +++ b/src/machi_fitness.erl @@ -50,7 +50,8 @@ admin_down=[] :: list({term(),term()}), members_dict=orddict:new() :: orddict:orddict(), active_unfit=[] :: list(), - pending_map=?MAP:new() :: ?MAP:riak_dt_map() + pending_map=?MAP:new() :: ?MAP:riak_dt_map(), + partition_simulator=false :: boolean() }). start_link(Args) -> @@ -79,6 +80,7 @@ send_fitness_update_spam(Pid, FromName, Dict) -> init([{MyFluName}|_Args]) -> RegName = machi_flu_psup:make_fitness_regname(MyFluName), register(RegName, self()), +io:format(user, "Starting fitness for ~w, args ~p\n", [MyFluName, _Args]), timer:send_interval(1000, dump), {ok, #state{my_flu_name=MyFluName, reg_name=RegName}}. @@ -208,7 +210,7 @@ store_in_map(Map, Name, Now, Down, AdminDown, Props) -> map_set(Name, Map, Name, Val). send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) -> - Send = fun(_FLU, #p_srvr{address=Host, port=TcpPort}) -> + Send = fun(FLU, #p_srvr{address=Host, port=TcpPort}) -> SpamProj = machi_projection:update_checksum( #projection_v1{epoch_number=?SPAM_PROJ_EPOCH, author_server=MyFluName, @@ -223,8 +225,7 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) -> members_dict=[] }), %% Best effort, don't care about failure. spawn(fun() -> - machi_flu1_client:write_projection( - Host, TcpPort, public, SpamProj) + send_projection(FLU, Host, TcpPort, SpamProj) end) end, F = fun(FLU, P_srvr, Acc) -> @@ -236,9 +237,15 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) -> [FLU|Acc] end end, - Sent = orddict:fold(F, [], MembersDict), + _Sent = orddict:fold(F, [], MembersDict), ok. +send_projection(_FLU, Host, TcpPort, SpamProj) -> + %% At the moment, we're using utterly-temporary-hack method of tunneling + %% our messages through the write_projection API. Eventually the PB + %% API should be expanded to accomodate this new fitness service. + machi_flu1_client:write_projection(Host, TcpPort, public, SpamProj). + calc_unfit(All_list, HosedAnnotations) -> G = digraph:new(), [digraph:add_vertex(G, V) || V <- All_list], @@ -309,8 +316,7 @@ schedule_adjust_messages(FLU_list) -> finish_admin_down(Time, Down, NewAdminDown, Props, #state{my_flu_name=MyFluName, local_down=Down, pending_map=OldMap, members_dict=MembersDict}=S) -> - NewMap = store_in_map(OldMap, MyFluName, erlang:now(), Down, - NewAdminDown, Props), + NewMap = store_in_map(OldMap, MyFluName, Time, Down, NewAdminDown, Props), S2 = S#state{admin_down=NewAdminDown}, do_map_change(NewMap, [MyFluName], MembersDict, S2).