WIP: Stand back, I'm going to try math! ... It works, {redacted}!
This commit is contained in:
parent
27891bc5e9
commit
38ea36fc1c
1 changed files with 45 additions and 27 deletions
|
@ -45,7 +45,7 @@
|
||||||
reg_name :: atom(),
|
reg_name :: atom(),
|
||||||
local_down=[] :: list(),
|
local_down=[] :: list(),
|
||||||
members_dict=orddict:new() :: orddict:orddict(),
|
members_dict=orddict:new() :: orddict:orddict(),
|
||||||
active_down=[] :: list(),
|
active_unfit=[] :: list(),
|
||||||
pending_map=?MAP:new() :: ?MAP:riak_dt_map()
|
pending_map=?MAP:new() :: ?MAP:riak_dt_map()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -69,11 +69,10 @@ init([{MyFluName}|_Args]) ->
|
||||||
register(RegName, self()),
|
register(RegName, self()),
|
||||||
{ok, #state{my_flu_name=MyFluName, reg_name=RegName}}.
|
{ok, #state{my_flu_name=MyFluName, reg_name=RegName}}.
|
||||||
|
|
||||||
handle_call({get_unfit_list}, _From, S) ->
|
handle_call({get_unfit_list}, _From, #state{active_unfit=ActiveUnfit}=S) ->
|
||||||
_Unfit = make_unfit_list(S),
|
Reply = ActiveUnfit,
|
||||||
IncompleteAnswer = S#state.local_down,
|
io:format(user, "Unfit answer @ ~w: ~p\n", [S#state.my_flu_name, ActiveUnfit]),
|
||||||
io:format(user, "Unfit incomplete answer @ ~w: ~p\n", [S#state.my_flu_name, IncompleteAnswer]),
|
{reply, Reply, S};
|
||||||
{reply, IncompleteAnswer, S};
|
|
||||||
handle_call({update_local_down_list, Down, MembersDict}, _From,
|
handle_call({update_local_down_list, Down, MembersDict}, _From,
|
||||||
#state{my_flu_name=MyFluName, pending_map=OldMap,
|
#state{my_flu_name=MyFluName, pending_map=OldMap,
|
||||||
local_down=OldDown, members_dict=OldMembersDict}=S) ->
|
local_down=OldDown, members_dict=OldMembersDict}=S) ->
|
||||||
|
@ -96,6 +95,21 @@ handle_call(_Request, _From, S) ->
|
||||||
handle_cast(_Msg, S) ->
|
handle_cast(_Msg, S) ->
|
||||||
{noreply, S}.
|
{noreply, S}.
|
||||||
|
|
||||||
|
handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) ->
|
||||||
|
io:format(user, "ADJUST: ~w: {adjust_down_list, ~w}\n", [S#state.my_flu_name,FLU]),
|
||||||
|
NewUnfit = make_unfit_list(S),
|
||||||
|
Added_to_new = NewUnfit -- ActiveUnfit,
|
||||||
|
Dropped_from_new = ActiveUnfit -- NewUnfit,
|
||||||
|
case {lists:member(FLU,Added_to_new), lists:member(FLU,Dropped_from_new)} of
|
||||||
|
{true, true} ->
|
||||||
|
error({bad, ?MODULE, ?LINE, FLU, ActiveUnfit, NewUnfit});
|
||||||
|
{true, false} ->
|
||||||
|
{noreply, S#state{active_unfit=lists:usort(ActiveUnfit ++ [FLU])}};
|
||||||
|
{false, true} ->
|
||||||
|
{noreply, S#state{active_unfit=ActiveUnfit -- [FLU]}};
|
||||||
|
{false, false} ->
|
||||||
|
{noreply, S}
|
||||||
|
end;
|
||||||
handle_info(_Info, S) ->
|
handle_info(_Info, S) ->
|
||||||
{noreply, S}.
|
{noreply, S}.
|
||||||
|
|
||||||
|
@ -107,7 +121,7 @@ code_change(_OldVsn, S, _Extra) ->
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
make_unfit_list(S) ->
|
make_unfit_list(#state{members_dict=MembersDict}=S) ->
|
||||||
Now = erlang:now(),
|
Now = erlang:now(),
|
||||||
F = fun({Server, {UpdateTime, DownList, _Props}}, Acc) ->
|
F = fun({Server, {UpdateTime, DownList, _Props}}, Acc) ->
|
||||||
case timer:now_diff(Now, UpdateTime) div (1000*1000) of
|
case timer:now_diff(Now, UpdateTime) div (1000*1000) of
|
||||||
|
@ -118,20 +132,15 @@ make_unfit_list(S) ->
|
||||||
[Probs|Acc]
|
[Probs|Acc]
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
QQ2 = (catch lists:flatten(map_fold(F, [], S#state.pending_map))),
|
Problems = (catch lists:flatten(map_fold(F, [], S#state.pending_map))),
|
||||||
io:format(user, "QQ2 ~w: ~p\n", [S#state.my_flu_name, QQ2]),
|
All_list = [K || {K,_V} <- orddict:to_list(MembersDict)],
|
||||||
{todo_finish, QQ2}.
|
Unfit = calc_unfit(All_list, Problems),
|
||||||
|
Unfit.
|
||||||
|
|
||||||
store_in_map(Map, Name, Now, Down, Props) ->
|
store_in_map(Map, Name, Now, Down, Props) ->
|
||||||
Val = {Now, Down, Props},
|
Val = {Now, Down, Props},
|
||||||
map_set(Name, Map, Name, Val).
|
map_set(Name, Map, Name, Val).
|
||||||
|
|
||||||
digraph_magic(All_list, HosedAnnotations) ->
|
|
||||||
G = digraph:new(),
|
|
||||||
[digraph:add_vertex(G, V) || V <- All_list],
|
|
||||||
[digraph:add_edge(G, V1, V2) || {V1, problem_with, V2} <- HosedAnnotations],
|
|
||||||
calc_magic_down(lists:sort(digraph:vertices(G)), G).
|
|
||||||
|
|
||||||
send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
|
send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
|
||||||
Send = fun(FLU, #p_srvr{address=Host, port=TcpPort}) ->
|
Send = fun(FLU, #p_srvr{address=Host, port=TcpPort}) ->
|
||||||
SpamProj = machi_projection:update_checksum(
|
SpamProj = machi_projection:update_checksum(
|
||||||
|
@ -166,25 +175,31 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) ->
|
||||||
io:format(user, "send_spam: ~w: sent to ~w\n", [MyFluName, Sent]),
|
io:format(user, "send_spam: ~w: sent to ~w\n", [MyFluName, Sent]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
calc_magic_down([], G) ->
|
calc_unfit(All_list, HosedAnnotations) ->
|
||||||
|
G = digraph:new(),
|
||||||
|
[digraph:add_vertex(G, V) || V <- All_list],
|
||||||
|
[digraph:add_edge(G, V1, V2) || {V1, problem_with, V2} <- HosedAnnotations],
|
||||||
|
calc_unfit2(lists:sort(digraph:vertices(G)), G).
|
||||||
|
|
||||||
|
calc_unfit2([], G) ->
|
||||||
digraph:delete(G),
|
digraph:delete(G),
|
||||||
[];
|
[];
|
||||||
calc_magic_down([H|T], G) ->
|
calc_unfit2([H|T], G) ->
|
||||||
case digraph:in_degree(G, H) of
|
case digraph:in_degree(G, H) of
|
||||||
0 ->
|
0 ->
|
||||||
calc_magic_down(T, G);
|
calc_unfit2(T, G);
|
||||||
1 ->
|
1 ->
|
||||||
Neighbors = digraph:in_neighbours(G, H),
|
Neighbors = digraph:in_neighbours(G, H),
|
||||||
case [V || V <- Neighbors, digraph:in_degree(G, V) == 1] of
|
case [V || V <- Neighbors, digraph:in_degree(G, V) == 1] of
|
||||||
[AlsoOne|_] ->
|
[AlsoOne|_] ->
|
||||||
%% TODO: be smarter here about the choice of which is down.
|
%% TODO: be smarter here about the choice of which is down.
|
||||||
[H|calc_magic_down(T -- [AlsoOne], G)];
|
[H|calc_unfit2(T -- [AlsoOne], G)];
|
||||||
[] ->
|
[] ->
|
||||||
%% H is "on the end", e.g. 1-2-1, so it's OK.
|
%% H is "on the end", e.g. 1-2-1, so it's OK.
|
||||||
calc_magic_down(T, G)
|
calc_unfit2(T, G)
|
||||||
end;
|
end;
|
||||||
N when N > 1 ->
|
N when N > 1 ->
|
||||||
[H|calc_magic_down(T, G)]
|
[H|calc_unfit2(T, G)]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_incoming_spam(Author, Map,
|
do_incoming_spam(Author, Map,
|
||||||
|
@ -205,13 +220,16 @@ do_incoming_spam(Author, Map,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_map_change(NewMap, DontSendList, MembersDict,
|
do_map_change(NewMap, DontSendList, MembersDict,
|
||||||
#state{my_flu_name=MyFluName, pending_map=OldMap}=S) ->
|
#state{my_flu_name=_MyFluName, pending_map=OldMap}=S) ->
|
||||||
send_spam(NewMap, DontSendList, MembersDict, S),
|
send_spam(NewMap, DontSendList, MembersDict, S),
|
||||||
ChangedServers = find_changed_servers(OldMap, NewMap),
|
ChangedServers = find_changed_servers(OldMap, NewMap),
|
||||||
_OldMapV = map_value(OldMap),
|
DelayTimeMS = 300, % TODO make configurable!
|
||||||
_MapV = map_value(NewMap),
|
[erlang:send_after(DelayTimeMS, self(), {adjust_down_list, FLU}) ||
|
||||||
io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n"
|
FLU <- ChangedServers],
|
||||||
" ~p\n ~p\n", [MyFluName, ChangedServers, _OldMapV, _MapV]),
|
%% _OldMapV = map_value(OldMap),
|
||||||
|
%% _MapV = map_value(NewMap),
|
||||||
|
%% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n"
|
||||||
|
%% " ~p\n ~p\n",[_MyFluName,ChangedServers,_OldMapV,_MapV]),
|
||||||
S#state{pending_map=NewMap, members_dict=MembersDict}.
|
S#state{pending_map=NewMap, members_dict=MembersDict}.
|
||||||
|
|
||||||
find_changed_servers(OldMap, NewMap) ->
|
find_changed_servers(OldMap, NewMap) ->
|
||||||
|
|
Loading…
Reference in a new issue