diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl index c56c244..735aaa9 100644 --- a/src/machi_fitness.erl +++ b/src/machi_fitness.erl @@ -45,7 +45,7 @@ reg_name :: atom(), local_down=[] :: list(), members_dict=orddict:new() :: orddict:orddict(), - active_down=[] :: list(), + active_unfit=[] :: list(), pending_map=?MAP:new() :: ?MAP:riak_dt_map() }). @@ -69,11 +69,10 @@ init([{MyFluName}|_Args]) -> register(RegName, self()), {ok, #state{my_flu_name=MyFluName, reg_name=RegName}}. -handle_call({get_unfit_list}, _From, S) -> - _Unfit = make_unfit_list(S), - IncompleteAnswer = S#state.local_down, - io:format(user, "Unfit incomplete answer @ ~w: ~p\n", [S#state.my_flu_name, IncompleteAnswer]), - {reply, IncompleteAnswer, S}; +handle_call({get_unfit_list}, _From, #state{active_unfit=ActiveUnfit}=S) -> + Reply = ActiveUnfit, + io:format(user, "Unfit answer @ ~w: ~p\n", [S#state.my_flu_name, ActiveUnfit]), + {reply, Reply, S}; handle_call({update_local_down_list, Down, MembersDict}, _From, #state{my_flu_name=MyFluName, pending_map=OldMap, local_down=OldDown, members_dict=OldMembersDict}=S) -> @@ -96,6 +95,21 @@ handle_call(_Request, _From, S) -> handle_cast(_Msg, S) -> {noreply, S}. +handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) -> + io:format(user, "ADJUST: ~w: {adjust_down_list, ~w}\n", [S#state.my_flu_name,FLU]), + NewUnfit = make_unfit_list(S), + Added_to_new = NewUnfit -- ActiveUnfit, + Dropped_from_new = ActiveUnfit -- NewUnfit, + case {lists:member(FLU,Added_to_new), lists:member(FLU,Dropped_from_new)} of + {true, true} -> + error({bad, ?MODULE, ?LINE, FLU, ActiveUnfit, NewUnfit}); + {true, false} -> + {noreply, S#state{active_unfit=lists:usort(ActiveUnfit ++ [FLU])}}; + {false, true} -> + {noreply, S#state{active_unfit=ActiveUnfit -- [FLU]}}; + {false, false} -> + {noreply, S} + end; handle_info(_Info, S) -> {noreply, S}. @@ -107,7 +121,7 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -make_unfit_list(S) -> +make_unfit_list(#state{members_dict=MembersDict}=S) -> Now = erlang:now(), F = fun({Server, {UpdateTime, DownList, _Props}}, Acc) -> case timer:now_diff(Now, UpdateTime) div (1000*1000) of @@ -118,20 +132,15 @@ make_unfit_list(S) -> [Probs|Acc] end end, - QQ2 = (catch lists:flatten(map_fold(F, [], S#state.pending_map))), - io:format(user, "QQ2 ~w: ~p\n", [S#state.my_flu_name, QQ2]), - {todo_finish, QQ2}. + Problems = (catch lists:flatten(map_fold(F, [], S#state.pending_map))), + All_list = [K || {K,_V} <- orddict:to_list(MembersDict)], + Unfit = calc_unfit(All_list, Problems), + Unfit. store_in_map(Map, Name, Now, Down, Props) -> Val = {Now, Down, Props}, map_set(Name, Map, Name, Val). -digraph_magic(All_list, HosedAnnotations) -> - G = digraph:new(), - [digraph:add_vertex(G, V) || V <- All_list], - [digraph:add_edge(G, V1, V2) || {V1, problem_with, V2} <- HosedAnnotations], - calc_magic_down(lists:sort(digraph:vertices(G)), G). - send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) -> Send = fun(FLU, #p_srvr{address=Host, port=TcpPort}) -> SpamProj = machi_projection:update_checksum( @@ -166,25 +175,31 @@ send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}) -> io:format(user, "send_spam: ~w: sent to ~w\n", [MyFluName, Sent]), ok. -calc_magic_down([], G) -> +calc_unfit(All_list, HosedAnnotations) -> + G = digraph:new(), + [digraph:add_vertex(G, V) || V <- All_list], + [digraph:add_edge(G, V1, V2) || {V1, problem_with, V2} <- HosedAnnotations], + calc_unfit2(lists:sort(digraph:vertices(G)), G). + +calc_unfit2([], G) -> digraph:delete(G), []; -calc_magic_down([H|T], G) -> +calc_unfit2([H|T], G) -> case digraph:in_degree(G, H) of 0 -> - calc_magic_down(T, G); + calc_unfit2(T, G); 1 -> Neighbors = digraph:in_neighbours(G, H), case [V || V <- Neighbors, digraph:in_degree(G, V) == 1] of [AlsoOne|_] -> %% TODO: be smarter here about the choice of which is down. - [H|calc_magic_down(T -- [AlsoOne], G)]; + [H|calc_unfit2(T -- [AlsoOne], G)]; [] -> %% H is "on the end", e.g. 1-2-1, so it's OK. - calc_magic_down(T, G) + calc_unfit2(T, G) end; N when N > 1 -> - [H|calc_magic_down(T, G)] + [H|calc_unfit2(T, G)] end. do_incoming_spam(Author, Map, @@ -205,13 +220,16 @@ do_incoming_spam(Author, Map, end. do_map_change(NewMap, DontSendList, MembersDict, - #state{my_flu_name=MyFluName, pending_map=OldMap}=S) -> + #state{my_flu_name=_MyFluName, pending_map=OldMap}=S) -> send_spam(NewMap, DontSendList, MembersDict, S), ChangedServers = find_changed_servers(OldMap, NewMap), - _OldMapV = map_value(OldMap), - _MapV = map_value(NewMap), - io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n" - " ~p\n ~p\n", [MyFluName, ChangedServers, _OldMapV, _MapV]), + DelayTimeMS = 300, % TODO make configurable! + [erlang:send_after(DelayTimeMS, self(), {adjust_down_list, FLU}) || + FLU <- ChangedServers], + %% _OldMapV = map_value(OldMap), + %% _MapV = map_value(NewMap), + %% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n" + %% " ~p\n ~p\n",[_MyFluName,ChangedServers,_OldMapV,_MapV]), S#state{pending_map=NewMap, members_dict=MembersDict}. find_changed_servers(OldMap, NewMap) ->