WIP: fix one source of problems, now shift back to 'TODO this clause needs more review'
This commit is contained in:
parent
b7aa33c617
commit
10c655ebfe
1 changed files with 41 additions and 5 deletions
|
@ -31,6 +31,8 @@
|
||||||
-define(LWWREG, riak_dt_lwwreg).
|
-define(LWWREG, riak_dt_lwwreg).
|
||||||
-define(MAP, riak_dt_map).
|
-define(MAP, riak_dt_map).
|
||||||
|
|
||||||
|
-define(DELAY_TIME_MS, 300). % TODO make configurable!
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/1,
|
-export([start_link/1,
|
||||||
get_unfit_list/1, update_local_down_list/3,
|
get_unfit_list/1, update_local_down_list/3,
|
||||||
|
@ -100,6 +102,38 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) ->
|
||||||
Added_to_new = NewUnfit -- ActiveUnfit,
|
Added_to_new = NewUnfit -- ActiveUnfit,
|
||||||
Dropped_from_new = ActiveUnfit -- NewUnfit,
|
Dropped_from_new = ActiveUnfit -- NewUnfit,
|
||||||
io:format(user, "adjust_down_list: ~w: adjust ~w: add ~p drop ~p\n", [S#state.my_flu_name, FLU, Added_to_new, Dropped_from_new]),
|
io:format(user, "adjust_down_list: ~w: adjust ~w: add ~p drop ~p\n", [S#state.my_flu_name, FLU, Added_to_new, Dropped_from_new]),
|
||||||
|
%% We need to schedule a new round of adjustment messages. They might
|
||||||
|
%% be redundant, or they might not. Here's a case where the current
|
||||||
|
%% code needs the extra:
|
||||||
|
%%
|
||||||
|
%% SET partitions = [{a,c},{b,c},{c,b}] (11 of 26) at {23,37,44}
|
||||||
|
%% We are stable spam/gossip at:
|
||||||
|
%% [{a,problem_with,b},{b,problem_with,c},
|
||||||
|
%% {c,problem_with,a},{c,problem_with,b}]
|
||||||
|
%% So everyone agrees unfit=[c].
|
||||||
|
%%
|
||||||
|
%% SET partitions = [{c,a}] (12 of 26) at {23,37,48}
|
||||||
|
%% We are stable spam/gossip at:
|
||||||
|
%% [{a,problem_with,c},{c,problem_with,a}]
|
||||||
|
%% So everyone *ought* to agree that unfit=[a].
|
||||||
|
%%
|
||||||
|
%% In this case, when the partition list changes to [{c,a}],
|
||||||
|
%% then we will discover via spam gossip that reports by B & C will
|
||||||
|
%% change. However, our calc_unfit() via
|
||||||
|
%% make_unfit_list() algorithm will decide that *a* is the bad guy
|
||||||
|
%% and needs to go into our active_unfit list! And the only way
|
||||||
|
%% to get added is via an {adjust_down_list,...} message. The
|
||||||
|
%% usual place for generating them isn't wise enough because it
|
||||||
|
%% doesn't call make_unfit_list().
|
||||||
|
%%
|
||||||
|
%% The cost is that there will (at least) a 2x delay to the
|
||||||
|
%% ?DELAY_TIME_MS waiting period to detect all partitions.
|
||||||
|
%%
|
||||||
|
%% Aside: for all I know right now, there may be a corner case
|
||||||
|
%% hiding where we need this extra round of messages to *remove* a
|
||||||
|
%% FLU from the active_unfit list?
|
||||||
|
|
||||||
|
schedule_adjust_messages(lists:usort(Added_to_new ++ Dropped_from_new)),
|
||||||
case {lists:member(FLU,Added_to_new), lists:member(FLU,Dropped_from_new)} of
|
case {lists:member(FLU,Added_to_new), lists:member(FLU,Dropped_from_new)} of
|
||||||
{true, true} ->
|
{true, true} ->
|
||||||
error({bad, ?MODULE, ?LINE, FLU, ActiveUnfit, NewUnfit});
|
error({bad, ?MODULE, ?LINE, FLU, ActiveUnfit, NewUnfit});
|
||||||
|
@ -224,22 +258,24 @@ do_incoming_spam(Author, Map,
|
||||||
do_map_change(NewMap, DontSendList, MembersDict,
|
do_map_change(NewMap, DontSendList, MembersDict,
|
||||||
#state{my_flu_name=_MyFluName, pending_map=OldMap}=S) ->
|
#state{my_flu_name=_MyFluName, pending_map=OldMap}=S) ->
|
||||||
send_spam(NewMap, DontSendList, MembersDict, S),
|
send_spam(NewMap, DontSendList, MembersDict, S),
|
||||||
ChangedServers = find_changed_servers(OldMap, NewMap),
|
ChangedServers = find_changed_servers(OldMap, NewMap, _MyFluName),
|
||||||
DelayTimeMS = 300, % TODO make configurable!
|
schedule_adjust_messages(ChangedServers),
|
||||||
[erlang:send_after(DelayTimeMS, self(), {adjust_down_list, FLU}) ||
|
|
||||||
FLU <- ChangedServers],
|
|
||||||
%% _OldMapV = map_value(OldMap),
|
%% _OldMapV = map_value(OldMap),
|
||||||
%% _MapV = map_value(NewMap),
|
%% _MapV = map_value(NewMap),
|
||||||
%% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n"
|
%% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n"
|
||||||
%% " ~p\n ~p\n",[_MyFluName,ChangedServers,_OldMapV,_MapV]),
|
%% " ~p\n ~p\n",[_MyFluName,ChangedServers,_OldMapV,_MapV]),
|
||||||
S#state{pending_map=NewMap, members_dict=MembersDict}.
|
S#state{pending_map=NewMap, members_dict=MembersDict}.
|
||||||
|
|
||||||
find_changed_servers(OldMap, NewMap) ->
|
find_changed_servers(OldMap, NewMap, _MyFluName) ->
|
||||||
AddBad = fun({_Who, {_Time, BadList, _Props}}, Acc) -> BadList ++ Acc end,
|
AddBad = fun({_Who, {_Time, BadList, _Props}}, Acc) -> BadList ++ Acc end,
|
||||||
OldBad = map_fold(AddBad, [], OldMap),
|
OldBad = map_fold(AddBad, [], OldMap),
|
||||||
NewBad = map_fold(AddBad, [], NewMap),
|
NewBad = map_fold(AddBad, [], NewMap),
|
||||||
lists:usort((OldBad -- NewBad) ++ (NewBad -- OldBad)).
|
lists:usort((OldBad -- NewBad) ++ (NewBad -- OldBad)).
|
||||||
|
|
||||||
|
schedule_adjust_messages(FLU_list) ->
|
||||||
|
[erlang:send_after(?DELAY_TIME_MS, self(), {adjust_down_list, FLU}) ||
|
||||||
|
FLU <- FLU_list].
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
map_set(Actor, Map, Key, ValTerm) ->
|
map_set(Actor, Map, Key, ValTerm) ->
|
||||||
|
|
Loading…
Reference in a new issue