WIP: small refactoring to prepare for fitness server 'use' of partition simulator
This commit is contained in:
parent
35e8efeb96
commit
efe6ce7894
2 changed files with 18 additions and 11 deletions
|
@ -125,6 +125,7 @@
|
||||||
-export([test_calc_projection/2,
|
-export([test_calc_projection/2,
|
||||||
test_write_public_projection/2,
|
test_write_public_projection/2,
|
||||||
test_read_latest_public_projection/2]).
|
test_read_latest_public_projection/2]).
|
||||||
|
-export([perhaps_call/5]). % for partition simulator use w/machi_fitness
|
||||||
|
|
||||||
-ifdef(EQC).
|
-ifdef(EQC).
|
||||||
-include_lib("eqc/include/eqc.hrl").
|
-include_lib("eqc/include/eqc.hrl").
|
||||||
|
@ -2241,9 +2242,10 @@ sanitize_repair_state(S) ->
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
perhaps_call_t(S, Partitions, FLU, DoIt) ->
|
perhaps_call_t(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
|
||||||
try
|
try
|
||||||
perhaps_call(S, Partitions, FLU, DoIt)
|
ProxyPid = proxy_pid(FLU, S),
|
||||||
|
perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt)
|
||||||
catch
|
catch
|
||||||
exit:timeout ->
|
exit:timeout ->
|
||||||
update_remember_down_list(FLU),
|
update_remember_down_list(FLU),
|
||||||
|
@ -2253,8 +2255,7 @@ perhaps_call_t(S, Partitions, FLU, DoIt) ->
|
||||||
{error, partition}
|
{error, partition}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
|
perhaps_call(ProxyPid, MyName, Partitions, FLU, DoIt) ->
|
||||||
ProxyPid = proxy_pid(FLU, S),
|
|
||||||
RemoteFLU_p = FLU /= MyName,
|
RemoteFLU_p = FLU /= MyName,
|
||||||
erase(bad_sock),
|
erase(bad_sock),
|
||||||
case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of
|
case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of
|
||||||
|
|
|
@ -50,7 +50,8 @@
|
||||||
admin_down=[] :: list({term(),term()}),
|
admin_down=[] :: list({term(),term()}),
|
||||||
members_dict=orddict:new() :: orddict:orddict(),
|
members_dict=orddict:new() :: orddict:orddict(),
|
||||||
active_unfit=[] :: list(),
|
active_unfit=[] :: list(),
|
||||||
pending_map=?MAP:new() :: ?MAP:riak_dt_map()
|
pending_map=?MAP:new() :: ?MAP:riak_dt_map(),
|
||||||
|
partition_simulator=false :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
start_link(Args) ->
|
start_link(Args) ->
|
||||||
|
@ -79,6 +80,7 @@ send_fitness_update_spam(Pid, FromName, Dict) ->
|
||||||
init([{MyFluName}|_Args]) ->
|
init([{MyFluName}|_Args]) ->
|
||||||
RegName = machi_flu_psup:make_fitness_regname(MyFluName),
|
RegName = machi_flu_psup:make_fitness_regname(MyFluName),
|
||||||
register(RegName, self()),
|
register(RegName, self()),
|
||||||
|
io:format(user, "Starting fitness for ~w, args ~p\n", [MyFluName, _Args]),
|
||||||
timer:send_interval(1000, dump),
|
timer:send_interval(1000, dump),
|
||||||
{ok, #state{my_flu_name=MyFluName, reg_name=RegName}}.
|
{ok, #state{my_flu_name=MyFluName, reg_name=RegName}}.
|
||||||
|
|
||||||
|
@ -208,7 +210,7 @@ store_in_map(Map, Name, Now, Down, AdminDown, Props) ->
|
||||||
map_set(Name, Map, Name, Val).
|
map_set(Name, Map, Name, Val).
|
||||||
|
|
||||||
send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
|
send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
|
||||||
Send = fun(_FLU, #p_srvr{address=Host, port=TcpPort}) ->
|
Send = fun(FLU, #p_srvr{address=Host, port=TcpPort}) ->
|
||||||
SpamProj = machi_projection:update_checksum(
|
SpamProj = machi_projection:update_checksum(
|
||||||
#projection_v1{epoch_number=?SPAM_PROJ_EPOCH,
|
#projection_v1{epoch_number=?SPAM_PROJ_EPOCH,
|
||||||
author_server=MyFluName,
|
author_server=MyFluName,
|
||||||
|
@ -223,8 +225,7 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
|
||||||
members_dict=[] }),
|
members_dict=[] }),
|
||||||
%% Best effort, don't care about failure.
|
%% Best effort, don't care about failure.
|
||||||
spawn(fun() ->
|
spawn(fun() ->
|
||||||
machi_flu1_client:write_projection(
|
send_projection(FLU, Host, TcpPort, SpamProj)
|
||||||
Host, TcpPort, public, SpamProj)
|
|
||||||
end)
|
end)
|
||||||
end,
|
end,
|
||||||
F = fun(FLU, P_srvr, Acc) ->
|
F = fun(FLU, P_srvr, Acc) ->
|
||||||
|
@ -236,9 +237,15 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
|
||||||
[FLU|Acc]
|
[FLU|Acc]
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
Sent = orddict:fold(F, [], MembersDict),
|
_Sent = orddict:fold(F, [], MembersDict),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
send_projection(_FLU, Host, TcpPort, SpamProj) ->
|
||||||
|
%% At the moment, we're using utterly-temporary-hack method of tunneling
|
||||||
|
%% our messages through the write_projection API. Eventually the PB
|
||||||
|
%% API should be expanded to accomodate this new fitness service.
|
||||||
|
machi_flu1_client:write_projection(Host, TcpPort, public, SpamProj).
|
||||||
|
|
||||||
calc_unfit(All_list, HosedAnnotations) ->
|
calc_unfit(All_list, HosedAnnotations) ->
|
||||||
G = digraph:new(),
|
G = digraph:new(),
|
||||||
[digraph:add_vertex(G, V) || V <- All_list],
|
[digraph:add_vertex(G, V) || V <- All_list],
|
||||||
|
@ -309,8 +316,7 @@ schedule_adjust_messages(FLU_list) ->
|
||||||
finish_admin_down(Time, Down, NewAdminDown, Props,
|
finish_admin_down(Time, Down, NewAdminDown, Props,
|
||||||
#state{my_flu_name=MyFluName, local_down=Down,
|
#state{my_flu_name=MyFluName, local_down=Down,
|
||||||
pending_map=OldMap, members_dict=MembersDict}=S) ->
|
pending_map=OldMap, members_dict=MembersDict}=S) ->
|
||||||
NewMap = store_in_map(OldMap, MyFluName, erlang:now(), Down,
|
NewMap = store_in_map(OldMap, MyFluName, Time, Down, NewAdminDown, Props),
|
||||||
NewAdminDown, Props),
|
|
||||||
S2 = S#state{admin_down=NewAdminDown},
|
S2 = S#state{admin_down=NewAdminDown},
|
||||||
do_map_change(NewMap, [MyFluName], MembersDict, S2).
|
do_map_change(NewMap, [MyFluName], MembersDict, S2).
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue