From 10c655ebfe738da3dfd527b70a54c72210257271 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 9 Sep 2015 23:59:40 +0900 Subject: [PATCH] WIP: fix one source of problems, now shift back to 'TODO this clause needs more review' --- src/machi_fitness.erl | 46 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl index 5b6658f..8aed2ce 100644 --- a/src/machi_fitness.erl +++ b/src/machi_fitness.erl @@ -31,6 +31,8 @@ -define(LWWREG, riak_dt_lwwreg). -define(MAP, riak_dt_map). +-define(DELAY_TIME_MS, 300). % TODO make configurable! + %% API -export([start_link/1, get_unfit_list/1, update_local_down_list/3, @@ -100,6 +102,38 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) -> Added_to_new = NewUnfit -- ActiveUnfit, Dropped_from_new = ActiveUnfit -- NewUnfit, io:format(user, "adjust_down_list: ~w: adjust ~w: add ~p drop ~p\n", [S#state.my_flu_name, FLU, Added_to_new, Dropped_from_new]), + %% We need to schedule a new round of adjustment messages. They might + %% be redundant, or they might not. Here's a case where the current + %% code needs the extra: + %% + %% SET partitions = [{a,c},{b,c},{c,b}] (11 of 26) at {23,37,44} + %% We are stable spam/gossip at: + %% [{a,problem_with,b},{b,problem_with,c}, + %% {c,problem_with,a},{c,problem_with,b}] + %% So everyone agrees unfit=[c]. + %% + %% SET partitions = [{c,a}] (12 of 26) at {23,37,48} + %% We are stable spam/gossip at: + %% [{a,problem_with,c},{c,problem_with,a}] + %% So everyone *ought* to agree that unfit=[a]. + %% + %% In this case, when the partition list changes to [{c,a}], + %% then we will discover via spam gossip that reports by B & C will + %% change. However, our calc_unfit() via + %% make_unfit_list() algorithm will decide that *a* is the bad guy + %% and needs to go into our active_unfit list! And the only way + %% to get added is via an {adjust_down_list,...} message. The + %% usual place for generating them isn't wise enough because it + %% doesn't call make_unfit_list(). + %% + %% The cost is that there will (at least) a 2x delay to the + %% ?DELAY_TIME_MS waiting period to detect all partitions. + %% + %% Aside: for all I know right now, there may be a corner case + %% hiding where we need this extra round of messages to *remove* a + %% FLU from the active_unfit list? + + schedule_adjust_messages(lists:usort(Added_to_new ++ Dropped_from_new)), case {lists:member(FLU,Added_to_new), lists:member(FLU,Dropped_from_new)} of {true, true} -> error({bad, ?MODULE, ?LINE, FLU, ActiveUnfit, NewUnfit}); @@ -224,22 +258,24 @@ do_incoming_spam(Author, Map, do_map_change(NewMap, DontSendList, MembersDict, #state{my_flu_name=_MyFluName, pending_map=OldMap}=S) -> send_spam(NewMap, DontSendList, MembersDict, S), - ChangedServers = find_changed_servers(OldMap, NewMap), - DelayTimeMS = 300, % TODO make configurable! - [erlang:send_after(DelayTimeMS, self(), {adjust_down_list, FLU}) || - FLU <- ChangedServers], + ChangedServers = find_changed_servers(OldMap, NewMap, _MyFluName), + schedule_adjust_messages(ChangedServers), %% _OldMapV = map_value(OldMap), %% _MapV = map_value(NewMap), %% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n" %% " ~p\n ~p\n",[_MyFluName,ChangedServers,_OldMapV,_MapV]), S#state{pending_map=NewMap, members_dict=MembersDict}. -find_changed_servers(OldMap, NewMap) -> +find_changed_servers(OldMap, NewMap, _MyFluName) -> AddBad = fun({_Who, {_Time, BadList, _Props}}, Acc) -> BadList ++ Acc end, OldBad = map_fold(AddBad, [], OldMap), NewBad = map_fold(AddBad, [], NewMap), lists:usort((OldBad -- NewBad) ++ (NewBad -- OldBad)). +schedule_adjust_messages(FLU_list) -> + [erlang:send_after(?DELAY_TIME_MS, self(), {adjust_down_list, FLU}) || + FLU <- FLU_list]. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% map_set(Actor, Map, Key, ValTerm) ->