2015-09-08 07:11:54 +00:00
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
%%
|
|
|
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
|
|
|
%%
|
|
|
|
%% This file is provided to you under the Apache License,
|
|
|
|
%% Version 2.0 (the "License"); you may not use this file
|
|
|
|
%% except in compliance with the License. You may obtain
|
|
|
|
%% a copy of the License at
|
|
|
|
%%
|
|
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
%%
|
|
|
|
%% Unless required by applicable law or agreed to in writing,
|
|
|
|
%% software distributed under the License is distributed on an
|
|
|
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
|
|
%% KIND, either express or implied. See the License for the
|
|
|
|
%% specific language governing permissions and limitations
|
|
|
|
%% under the License.
|
|
|
|
%%
|
|
|
|
%% -------------------------------------------------------------------
|
|
|
|
|
|
|
|
-module(machi_fitness).
|
|
|
|
|
|
|
|
-behaviour(gen_server).
|
|
|
|
|
2015-09-09 10:09:39 +00:00
|
|
|
-include("machi_projection.hrl").
|
|
|
|
|
2015-09-08 10:13:03 +00:00
|
|
|
-ifdef(TEST).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-endif. % TEST
|
|
|
|
|
|
|
|
-define(LWWREG, riak_dt_lwwreg).
|
|
|
|
-define(MAP, riak_dt_map).
|
|
|
|
|
2015-09-09 14:59:40 +00:00
|
|
|
-define(DELAY_TIME_MS, 300). % TODO make configurable!
|
|
|
|
|
2015-09-08 07:11:54 +00:00
|
|
|
%% API
|
|
|
|
-export([start_link/1,
|
2015-09-09 10:09:39 +00:00
|
|
|
get_unfit_list/1, update_local_down_list/3,
|
2015-09-10 08:30:11 +00:00
|
|
|
add_admin_down/3, delete_admin_down/2,
|
2015-09-09 10:09:39 +00:00
|
|
|
send_fitness_update_spam/3]).
|
2015-09-08 07:11:54 +00:00
|
|
|
|
|
|
|
%% gen_server callbacks
|
|
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
|
|
terminate/2, code_change/3]).
|
|
|
|
|
|
|
|
-record(state, {
|
2015-09-09 10:09:39 +00:00
|
|
|
my_flu_name :: atom() | binary(),
|
|
|
|
reg_name :: atom(),
|
|
|
|
local_down=[] :: list(),
|
2015-09-10 08:30:11 +00:00
|
|
|
admin_down=[] :: list({term(),term()}),
|
2015-09-09 10:09:39 +00:00
|
|
|
members_dict=orddict:new() :: orddict:orddict(),
|
2015-09-09 11:45:57 +00:00
|
|
|
active_unfit=[] :: list(),
|
2015-09-11 07:03:49 +00:00
|
|
|
pending_map=?MAP:new() :: ?MAP:riak_dt_map(),
|
2015-09-11 07:45:29 +00:00
|
|
|
partition_simulator_p :: boolean()
|
2015-09-08 07:11:54 +00:00
|
|
|
}).
|
|
|
|
|
|
|
|
start_link(Args) ->
|
|
|
|
gen_server:start_link(?MODULE, Args, []).
|
|
|
|
|
|
|
|
get_unfit_list(PidSpec) ->
|
|
|
|
gen_server:call(PidSpec, {get_unfit_list}, infinity).
|
|
|
|
|
2015-09-09 10:09:39 +00:00
|
|
|
update_local_down_list(PidSpec, Down, MembersDict) ->
|
|
|
|
gen_server:call(PidSpec, {update_local_down_list, Down, MembersDict},
|
|
|
|
infinity).
|
|
|
|
|
2015-09-10 08:30:11 +00:00
|
|
|
add_admin_down(PidSpec, DownFLU, DownProps) ->
|
|
|
|
gen_server:call(PidSpec, {add_admin_down, DownFLU, DownProps},
|
|
|
|
infinity).
|
|
|
|
|
|
|
|
delete_admin_down(PidSpec, DownFLU) ->
|
|
|
|
gen_server:call(PidSpec, {delete_admin_down, DownFLU},
|
|
|
|
infinity).
|
|
|
|
|
2015-09-09 10:09:39 +00:00
|
|
|
send_fitness_update_spam(Pid, FromName, Dict) ->
|
|
|
|
gen_server:call(Pid, {incoming_spam, FromName, Dict}, infinity).
|
2015-09-08 07:11:54 +00:00
|
|
|
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
|
|
2015-09-11 07:45:29 +00:00
|
|
|
init([{MyFluName}|Args]) ->
|
2015-09-09 10:09:39 +00:00
|
|
|
RegName = machi_flu_psup:make_fitness_regname(MyFluName),
|
2015-09-08 07:11:54 +00:00
|
|
|
register(RegName, self()),
|
2015-09-09 14:10:39 +00:00
|
|
|
timer:send_interval(1000, dump),
|
2015-09-11 07:45:29 +00:00
|
|
|
UseSimulatorP = proplists:get_value(use_partition_simulator, Args, false),
|
|
|
|
{ok, #state{my_flu_name=MyFluName, reg_name=RegName,
|
|
|
|
partition_simulator_p=UseSimulatorP}}.
|
2015-09-08 07:11:54 +00:00
|
|
|
|
2015-09-09 11:45:57 +00:00
|
|
|
handle_call({get_unfit_list}, _From, #state{active_unfit=ActiveUnfit}=S) ->
|
|
|
|
Reply = ActiveUnfit,
|
|
|
|
{reply, Reply, S};
|
2015-09-09 10:09:39 +00:00
|
|
|
handle_call({update_local_down_list, Down, MembersDict}, _From,
|
|
|
|
#state{my_flu_name=MyFluName, pending_map=OldMap,
|
2015-09-10 08:30:11 +00:00
|
|
|
local_down=OldDown, members_dict=OldMembersDict,
|
|
|
|
admin_down=AdminDown}=S) ->
|
|
|
|
NewMap = store_in_map(OldMap, MyFluName, erlang:now(), Down,
|
|
|
|
AdminDown, [props_yo]),
|
2015-09-09 10:09:39 +00:00
|
|
|
S2 = if Down == OldDown, MembersDict == OldMembersDict ->
|
|
|
|
%% Do nothing only if both are equal. If members_dict is
|
|
|
|
%% changing, that's sufficient reason to spam.
|
|
|
|
ok;
|
|
|
|
true ->
|
|
|
|
do_map_change(NewMap, [MyFluName], MembersDict, S)
|
|
|
|
end,
|
|
|
|
{reply, ok, S2#state{local_down=Down}};
|
2015-09-10 08:30:11 +00:00
|
|
|
handle_call({add_admin_down, DownFLU, DownProps}, _From,
|
2015-09-10 09:05:18 +00:00
|
|
|
#state{local_down=OldDown, admin_down=AdminDown}=S) ->
|
2015-09-10 08:30:11 +00:00
|
|
|
NewAdminDown = [{DownFLU,DownProps}|lists:keydelete(DownFLU, 1, AdminDown)],
|
2015-09-10 09:05:18 +00:00
|
|
|
S3 = finish_admin_down(erlang:now(), OldDown, NewAdminDown,
|
|
|
|
[props_yo], S),
|
|
|
|
{reply, ok, S3};
|
|
|
|
handle_call({delete_admin_down, DownFLU}, _From,
|
|
|
|
#state{local_down=OldDown, admin_down=AdminDown}=S) ->
|
|
|
|
NewAdminDown = lists:keydelete(DownFLU, 1, AdminDown),
|
|
|
|
S3 = finish_admin_down(erlang:now(), OldDown, NewAdminDown,
|
|
|
|
[props_yo], S),
|
2015-09-10 08:30:11 +00:00
|
|
|
{reply, ok, S3};
|
2015-09-09 10:09:39 +00:00
|
|
|
handle_call({incoming_spam, Author, Dict}, _From, S) ->
|
|
|
|
{Res, S2} = do_incoming_spam(Author, Dict, S),
|
|
|
|
{reply, Res, S2};
|
2015-09-08 07:11:54 +00:00
|
|
|
handle_call(_Request, _From, S) ->
|
|
|
|
Reply = whhhhhhhhhhhhhhaaaaaaaaaaaaaaa,
|
|
|
|
{reply, Reply, S}.
|
|
|
|
|
|
|
|
handle_cast(_Msg, S) ->
|
|
|
|
{noreply, S}.
|
|
|
|
|
2015-09-09 11:45:57 +00:00
|
|
|
handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) ->
|
|
|
|
NewUnfit = make_unfit_list(S),
|
|
|
|
Added_to_new = NewUnfit -- ActiveUnfit,
|
|
|
|
Dropped_from_new = ActiveUnfit -- NewUnfit,
|
2015-09-09 14:10:39 +00:00
|
|
|
io:format(user, "adjust_down_list: ~w: adjust ~w: add ~p drop ~p\n", [S#state.my_flu_name, FLU, Added_to_new, Dropped_from_new]),
|
2015-09-09 14:59:40 +00:00
|
|
|
%% We need to schedule a new round of adjustment messages. They might
|
|
|
|
%% be redundant, or they might not. Here's a case where the current
|
|
|
|
%% code needs the extra:
|
|
|
|
%%
|
|
|
|
%% SET partitions = [{a,c},{b,c},{c,b}] (11 of 26) at {23,37,44}
|
|
|
|
%% We are stable spam/gossip at:
|
|
|
|
%% [{a,problem_with,b},{b,problem_with,c},
|
|
|
|
%% {c,problem_with,a},{c,problem_with,b}]
|
|
|
|
%% So everyone agrees unfit=[c].
|
|
|
|
%%
|
|
|
|
%% SET partitions = [{c,a}] (12 of 26) at {23,37,48}
|
|
|
|
%% We are stable spam/gossip at:
|
|
|
|
%% [{a,problem_with,c},{c,problem_with,a}]
|
|
|
|
%% So everyone *ought* to agree that unfit=[a].
|
|
|
|
%%
|
|
|
|
%% In this case, when the partition list changes to [{c,a}],
|
|
|
|
%% then we will discover via spam gossip that reports by B & C will
|
|
|
|
%% change. However, our calc_unfit() via
|
|
|
|
%% make_unfit_list() algorithm will decide that *a* is the bad guy
|
|
|
|
%% and needs to go into our active_unfit list! And the only way
|
|
|
|
%% to get added is via an {adjust_down_list,...} message. The
|
|
|
|
%% usual place for generating them isn't wise enough because it
|
|
|
|
%% doesn't call make_unfit_list().
|
|
|
|
%%
|
|
|
|
%% The cost is that there will (at least) a 2x delay to the
|
|
|
|
%% ?DELAY_TIME_MS waiting period to detect all partitions.
|
|
|
|
%%
|
|
|
|
%% Aside: for all I know right now, there may be a corner case
|
|
|
|
%% hiding where we need this extra round of messages to *remove* a
|
|
|
|
%% FLU from the active_unfit list?
|
|
|
|
|
|
|
|
schedule_adjust_messages(lists:usort(Added_to_new ++ Dropped_from_new)),
|
2015-09-09 11:45:57 +00:00
|
|
|
case {lists:member(FLU,Added_to_new), lists:member(FLU,Dropped_from_new)} of
|
|
|
|
{true, true} ->
|
|
|
|
error({bad, ?MODULE, ?LINE, FLU, ActiveUnfit, NewUnfit});
|
|
|
|
{true, false} ->
|
|
|
|
{noreply, S#state{active_unfit=lists:usort(ActiveUnfit ++ [FLU])}};
|
|
|
|
{false, true} ->
|
|
|
|
{noreply, S#state{active_unfit=ActiveUnfit -- [FLU]}};
|
|
|
|
{false, false} ->
|
|
|
|
{noreply, S}
|
|
|
|
end;
|
2015-09-09 14:10:39 +00:00
|
|
|
handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit,
|
|
|
|
pending_map=Map}=S) ->
|
2015-09-11 14:08:47 +00:00
|
|
|
%% io:format(user, "DUMP: ~w: ~p ~w\n", [MyFluName, ActiveUnfit, map_value(Map)]),
|
|
|
|
io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
|
2015-09-09 14:10:39 +00:00
|
|
|
{noreply, S};
|
2015-09-08 07:11:54 +00:00
|
|
|
handle_info(_Info, S) ->
|
|
|
|
{noreply, S}.
|
|
|
|
|
|
|
|
terminate(_Reason, _S) ->
|
|
|
|
ok.
|
|
|
|
|
|
|
|
code_change(_OldVsn, S, _Extra) ->
|
|
|
|
{ok, S}.
|
|
|
|
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
2015-09-08 10:13:03 +00:00
|
|
|
|
2015-09-09 11:45:57 +00:00
|
|
|
make_unfit_list(#state{members_dict=MembersDict}=S) ->
|
2015-09-08 10:13:03 +00:00
|
|
|
Now = erlang:now(),
|
2015-09-10 08:30:11 +00:00
|
|
|
F = fun({Server, {UpdateTime, DownList, AdminDown, _Props}},
|
|
|
|
{ProblemAcc, AdminAcc}) ->
|
2015-09-08 10:13:03 +00:00
|
|
|
case timer:now_diff(Now, UpdateTime) div (1000*1000) of
|
2015-09-10 08:30:11 +00:00
|
|
|
N when N > 900 -> % TODO make configurable
|
|
|
|
{ProblemAcc, AdminAcc};
|
2015-09-08 10:13:03 +00:00
|
|
|
_ ->
|
|
|
|
Probs = [{Server,problem_with,D} || D <- DownList],
|
2015-09-10 08:30:11 +00:00
|
|
|
{[Probs|ProblemAcc], AdminDown++AdminAcc}
|
2015-09-08 10:13:03 +00:00
|
|
|
end
|
|
|
|
end,
|
2015-09-10 08:30:11 +00:00
|
|
|
{Problems0, AdminDown} = map_fold(F, {[], []}, S#state.pending_map),
|
|
|
|
Problems = lists:flatten(Problems0),
|
2015-09-09 11:45:57 +00:00
|
|
|
All_list = [K || {K,_V} <- orddict:to_list(MembersDict)],
|
|
|
|
Unfit = calc_unfit(All_list, Problems),
|
2015-09-10 08:30:11 +00:00
|
|
|
lists:usort(Unfit ++ AdminDown).
|
2015-09-08 10:13:03 +00:00
|
|
|
|
2015-09-10 08:30:11 +00:00
|
|
|
store_in_map(Map, Name, Now, Down, AdminDown, Props) ->
|
|
|
|
{AdminDownServers, AdminDownProps0} = lists:unzip(AdminDown),
|
|
|
|
AdminDownProps = lists:append(AdminDownProps0), % flatten one level
|
|
|
|
Val = {Now, Down, AdminDownServers, Props ++ AdminDownProps},
|
2015-09-08 10:13:03 +00:00
|
|
|
map_set(Name, Map, Name, Val).
|
|
|
|
|
2015-09-11 07:45:29 +00:00
|
|
|
send_spam(NewMap, DontSendList, MembersDict, #state{my_flu_name=MyFluName}=S) ->
|
2015-09-11 07:03:49 +00:00
|
|
|
Send = fun(FLU, #p_srvr{address=Host, port=TcpPort}) ->
|
2015-09-09 10:09:39 +00:00
|
|
|
SpamProj = machi_projection:update_checksum(
|
|
|
|
#projection_v1{epoch_number=?SPAM_PROJ_EPOCH,
|
|
|
|
author_server=MyFluName,
|
|
|
|
dbg=[NewMap],
|
|
|
|
%% stuff only to make PB happy
|
|
|
|
all_members=[],
|
|
|
|
witnesses=[],
|
|
|
|
creation_time={1,2,3},
|
|
|
|
mode=ap_mode,
|
|
|
|
upi=[], repairing=[], down=[],
|
|
|
|
dbg2=[],
|
|
|
|
members_dict=[] }),
|
|
|
|
%% Best effort, don't care about failure.
|
|
|
|
spawn(fun() ->
|
2015-09-11 07:45:29 +00:00
|
|
|
send_projection(FLU, Host, TcpPort, SpamProj, S)
|
2015-09-09 10:09:39 +00:00
|
|
|
end)
|
|
|
|
end,
|
|
|
|
F = fun(FLU, P_srvr, Acc) ->
|
|
|
|
case lists:member(FLU, DontSendList) of
|
|
|
|
true ->
|
|
|
|
Acc;
|
|
|
|
false ->
|
|
|
|
Send(FLU, P_srvr),
|
|
|
|
[FLU|Acc]
|
|
|
|
end
|
|
|
|
end,
|
2015-09-11 07:03:49 +00:00
|
|
|
_Sent = orddict:fold(F, [], MembersDict),
|
2015-09-09 10:09:39 +00:00
|
|
|
ok.
|
|
|
|
|
2015-09-11 07:45:29 +00:00
|
|
|
send_projection(FLU, Host, TcpPort, SpamProj,
|
|
|
|
#state{my_flu_name=MyFluName, members_dict=MembersDict,
|
|
|
|
partition_simulator_p=SimulatorP}) ->
|
2015-09-11 07:03:49 +00:00
|
|
|
%% At the moment, we're using utterly-temporary-hack method of tunneling
|
|
|
|
%% our messages through the write_projection API. Eventually the PB
|
|
|
|
%% API should be expanded to accomodate this new fitness service.
|
2015-09-11 07:45:29 +00:00
|
|
|
DoIt = fun(_ProxyPid) ->
|
|
|
|
machi_flu1_client:write_projection(Host, TcpPort,
|
|
|
|
public, SpamProj)
|
|
|
|
end,
|
|
|
|
ProxyPidPlaceholder = proxy_pid_unused,
|
|
|
|
if SimulatorP ->
|
|
|
|
AllMembers = [K || {K,_V} <- orddict:to_list(MembersDict)],
|
|
|
|
{Partitions, _Islands} = machi_partition_simulator:get(AllMembers),
|
|
|
|
machi_chain_manager1:init_remember_down_list(),
|
|
|
|
Res = machi_chain_manager1:perhaps_call(ProxyPidPlaceholder,
|
|
|
|
MyFluName,
|
|
|
|
Partitions, FLU, DoIt),
|
|
|
|
%% case machi_chain_manager1:get_remember_down_list() of
|
|
|
|
%% [] ->
|
|
|
|
%% ok;
|
|
|
|
%% _ ->
|
|
|
|
%% io:format(user, "fitness error ~w -> ~w\n",
|
|
|
|
%% [MyFluName, FLU])
|
|
|
|
%% end,
|
|
|
|
Res;
|
|
|
|
true ->
|
|
|
|
DoIt(ProxyPidPlaceholder)
|
|
|
|
end.
|
2015-09-11 07:03:49 +00:00
|
|
|
|
2015-09-09 11:45:57 +00:00
|
|
|
calc_unfit(All_list, HosedAnnotations) ->
|
|
|
|
G = digraph:new(),
|
|
|
|
[digraph:add_vertex(G, V) || V <- All_list],
|
|
|
|
[digraph:add_edge(G, V1, V2) || {V1, problem_with, V2} <- HosedAnnotations],
|
|
|
|
calc_unfit2(lists:sort(digraph:vertices(G)), G).
|
|
|
|
|
|
|
|
calc_unfit2([], G) ->
|
2015-09-09 10:09:39 +00:00
|
|
|
digraph:delete(G),
|
|
|
|
[];
|
2015-09-09 11:45:57 +00:00
|
|
|
calc_unfit2([H|T], G) ->
|
2015-09-09 10:09:39 +00:00
|
|
|
case digraph:in_degree(G, H) of
|
|
|
|
0 ->
|
2015-09-09 11:45:57 +00:00
|
|
|
calc_unfit2(T, G);
|
2015-09-09 10:09:39 +00:00
|
|
|
1 ->
|
|
|
|
Neighbors = digraph:in_neighbours(G, H),
|
|
|
|
case [V || V <- Neighbors, digraph:in_degree(G, V) == 1] of
|
|
|
|
[AlsoOne|_] ->
|
|
|
|
%% TODO: be smarter here about the choice of which is down.
|
2015-09-09 11:45:57 +00:00
|
|
|
[H|calc_unfit2(T -- [AlsoOne], G)];
|
2015-09-09 10:09:39 +00:00
|
|
|
[] ->
|
|
|
|
%% H is "on the end", e.g. 1-2-1, so it's OK.
|
2015-09-09 11:45:57 +00:00
|
|
|
calc_unfit2(T, G)
|
2015-09-09 10:09:39 +00:00
|
|
|
end;
|
|
|
|
N when N > 1 ->
|
2015-09-09 11:45:57 +00:00
|
|
|
[H|calc_unfit2(T, G)]
|
2015-09-09 10:09:39 +00:00
|
|
|
end.
|
|
|
|
|
|
|
|
do_incoming_spam(Author, Map,
|
|
|
|
#state{my_flu_name=MyFluName,pending_map=OldMap,
|
|
|
|
members_dict=MembersDict}=S) ->
|
|
|
|
OldMapV = map_value(OldMap),
|
|
|
|
MapV = map_value(Map),
|
|
|
|
if MapV == OldMapV ->
|
|
|
|
{ok, S};
|
|
|
|
true ->
|
|
|
|
%% io:format(user, "YY1 ~p\n", [OldMapV]),
|
|
|
|
%% io:format(user, "YY2 ~p\n", [MapV]),
|
|
|
|
NewMap = map_merge(OldMap, Map),
|
|
|
|
%% NewMapV = map_value(NewMap),
|
|
|
|
%% io:format(user, "YY3 ~p\n", [NewMapV]),
|
|
|
|
S2 = do_map_change(NewMap, [MyFluName, Author], MembersDict, S),
|
|
|
|
{ok, S2}
|
|
|
|
end.
|
|
|
|
|
|
|
|
do_map_change(NewMap, DontSendList, MembersDict,
|
2015-09-09 11:45:57 +00:00
|
|
|
#state{my_flu_name=_MyFluName, pending_map=OldMap}=S) ->
|
2015-09-09 10:09:39 +00:00
|
|
|
send_spam(NewMap, DontSendList, MembersDict, S),
|
2015-09-09 14:59:40 +00:00
|
|
|
ChangedServers = find_changed_servers(OldMap, NewMap, _MyFluName),
|
|
|
|
schedule_adjust_messages(ChangedServers),
|
2015-09-09 11:45:57 +00:00
|
|
|
%% _OldMapV = map_value(OldMap),
|
|
|
|
%% _MapV = map_value(NewMap),
|
|
|
|
%% io:format(user, "TODO: ~w async tick trigger/scheduling... ~w for:\n"
|
|
|
|
%% " ~p\n ~p\n",[_MyFluName,ChangedServers,_OldMapV,_MapV]),
|
2015-09-09 10:09:39 +00:00
|
|
|
S#state{pending_map=NewMap, members_dict=MembersDict}.
|
|
|
|
|
2015-09-09 14:59:40 +00:00
|
|
|
find_changed_servers(OldMap, NewMap, _MyFluName) ->
|
2015-09-10 08:30:11 +00:00
|
|
|
AddBad = fun({_Who, {_Time, BadList, AdminDown, _Props}}, Acc) ->
|
|
|
|
BadList ++ AdminDown ++ Acc
|
|
|
|
end,
|
2015-09-09 10:09:39 +00:00
|
|
|
OldBad = map_fold(AddBad, [], OldMap),
|
|
|
|
NewBad = map_fold(AddBad, [], NewMap),
|
|
|
|
lists:usort((OldBad -- NewBad) ++ (NewBad -- OldBad)).
|
|
|
|
|
2015-09-09 14:59:40 +00:00
|
|
|
schedule_adjust_messages(FLU_list) ->
|
|
|
|
[erlang:send_after(?DELAY_TIME_MS, self(), {adjust_down_list, FLU}) ||
|
|
|
|
FLU <- FLU_list].
|
|
|
|
|
2015-09-10 09:05:18 +00:00
|
|
|
finish_admin_down(Time, Down, NewAdminDown, Props,
|
|
|
|
#state{my_flu_name=MyFluName, local_down=Down,
|
|
|
|
pending_map=OldMap, members_dict=MembersDict}=S) ->
|
2015-09-11 07:03:49 +00:00
|
|
|
NewMap = store_in_map(OldMap, MyFluName, Time, Down, NewAdminDown, Props),
|
2015-09-10 09:05:18 +00:00
|
|
|
S2 = S#state{admin_down=NewAdminDown},
|
|
|
|
do_map_change(NewMap, [MyFluName], MembersDict, S2).
|
|
|
|
|
2015-09-08 10:13:03 +00:00
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
|
|
|
|
|
map_set(Actor, Map, Key, ValTerm) ->
|
|
|
|
Field = {Key, ?LWWREG},
|
|
|
|
Val = term_to_binary(ValTerm),
|
|
|
|
{ok, Map2} = ?MAP:update({update, [{update, Field, {assign, Val}}]},
|
|
|
|
Actor, Map),
|
|
|
|
Map2.
|
|
|
|
|
|
|
|
map_get(Map, Key) ->
|
|
|
|
Field = {Key, ?LWWREG},
|
|
|
|
case lists:keyfind(Field, 1, ?MAP:value(Map)) of
|
|
|
|
false ->
|
|
|
|
error;
|
|
|
|
{Field, ValBin} ->
|
|
|
|
{ok, binary_to_term(ValBin)}
|
|
|
|
end.
|
|
|
|
|
|
|
|
map_fold(Fun, Acc, Map) ->
|
2015-09-09 10:09:39 +00:00
|
|
|
Vs = map_value(Map),
|
2015-09-08 10:13:03 +00:00
|
|
|
lists:foldl(Fun, Acc, lists:sort(Vs)).
|
|
|
|
|
2015-09-09 10:09:39 +00:00
|
|
|
map_value(Map) ->
|
|
|
|
lists:sort([{K, binary_to_term(V)} || {{K, _Type}, V} <- ?MAP:value(Map)]).
|
|
|
|
|
|
|
|
map_merge(Map1, Map2) ->
|
|
|
|
?MAP:merge(Map1, Map2).
|
|
|
|
|
2015-09-08 10:13:03 +00:00
|
|
|
-ifdef(TEST).
|
|
|
|
|
|
|
|
|
|
|
|
dt_understanding_test() ->
|
|
|
|
F1 = {'X', riak_dt_lwwreg},
|
|
|
|
F2 = {'Y', riak_dt_lwwreg},
|
|
|
|
{ok, Map1} = ?MAP:update({update, [{update, F1, {assign, <<"A">>}}]}, a, ?MAP:new()),
|
|
|
|
{ok, Map2} = ?MAP:update({update, [{update, F2, {assign, <<"B2">>}}]}, b, ?MAP:new()),
|
|
|
|
|
|
|
|
%% io:format(user, "\n", []),
|
|
|
|
%% io:format(user, "Merge comparison: ~p\n", [?MAP:merge(Map1, Map2) == ?MAP:merge(Map2, Map1)]),
|
|
|
|
%% io:format(user, "M12 Val: ~p\n", [?MAP:value(?MAP:merge(Map1, Map2))]),
|
|
|
|
%% io:format(user, "M21 Val: ~p\n", [?MAP:value(?MAP:merge(Map2, Map1))]),
|
|
|
|
?MAP:merge(Map1, Map2) == ?MAP:merge(Map2, Map1).
|
|
|
|
|
|
|
|
smoke_test() ->
|
2015-09-09 07:49:27 +00:00
|
|
|
Map1 = map_set(a, ?MAP:new(), k1, val1),
|
|
|
|
Map2 = map_set(a, Map1, k2, val2),
|
2015-09-08 10:13:03 +00:00
|
|
|
{ok, val1} = map_get(Map2, k1),
|
|
|
|
{ok, val2} = map_get(Map2, k2),
|
|
|
|
error = map_get(Map2, does_not_exist),
|
2015-09-09 07:49:27 +00:00
|
|
|
Map3 = map_set(a, Map2, k3, val3),
|
2015-09-08 10:13:03 +00:00
|
|
|
|
|
|
|
[{k3,1},{k2,1},{k1,1}] = map_fold(fun({K,_}, Acc) -> [{K,1}|Acc] end,
|
|
|
|
[], Map3),
|
|
|
|
ok.
|
|
|
|
|
|
|
|
-endif. % TEST
|