Merge slf/chain-manager/cp-mode (fix conflicts)

This commit is contained in:
Scott Lystig Fritchie 2015-07-17 16:39:37 +09:00
commit 19ce841471
26 changed files with 2228 additions and 688 deletions

View file

@ -38,6 +38,7 @@
-record(flap_i, {
flap_count :: {term(), term()},
flapping_me = false :: boolean(),
all_hosed :: list(),
all_flap_counts :: list(),
bad :: list()

31
include/machi_verbose.hrl Normal file
View file

@ -0,0 +1,31 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% Verbose/debug output macros shared by modules that include this header.
%%
%% ?V(Fmt, Args) - formatted output.  When PULSE is defined the call goes
%%                 to pulse:format/2; otherwise it goes to io:format/3
%%                 on the 'user' device.
-ifdef(PULSE).
-define(V(Fmt, Args), pulse:format(Fmt, Args)).
-else. % PULSE
-define(V(Fmt, Args), io:format(user, Fmt, Args)).
-endif. % PULSE
%% ?D(X)  - print the source text of expression X (via ??X
%%          stringification) followed by its value formatted with ~p.
%% ?Dw(X) - same as ?D(X) but the value is formatted with ~w.
-define(D(X), ?V("~s ~p\n", [??X, X])).
-define(Dw(X), ?V("~s ~w\n", [??X, X])).

View file

@ -8,15 +8,24 @@ case PulseBuild of
true ->
PulseOpts =
[{pulse_no_side_effect,
[{erlang,display,1}
]},
[{erlang,display,1},
{os,getenv,1},
{io,format,2},
{io,format,3}
]},
{pulse_side_effect,
[ {does_not_exist_yet, some_func, '_'}
, {machi_flu1_client, '_', '_'}
, {machi_projection_store, '_', '_'}
, {machi_proxy_flu1_client, '_', '_'}
, {machi_pb_translate, '_', '_'}
, {prim_file, '_', '_'}
, {file, '_', '_'}
, {filelib, '_', '_'}
, {os, '_', '_'} ]},
%% , {os, '_', '_'}
]},
{pulse_replace_module,
[ {gen_server, pulse_gen_server}

View file

@ -1,7 +1,7 @@
{application, machi, [
{description, "A village of write-once files."},
{vsn, "0.0.0"},
{applications, [kernel, stdlib, sasl, crypto]},
{applications, [kernel, stdlib, crypto]},
{mod,{machi_app,[]}},
{registered, []},
{env, [

View file

@ -27,6 +27,11 @@
-behaviour(application).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl").
-endif.
%% Application callbacks
-export([start/2, stop/1]).

File diff suppressed because it is too large Load diff

View file

@ -108,6 +108,7 @@
-include("machi.hrl").
-include("machi_projection.hrl").
-include("machi_verbose.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

View file

@ -70,6 +70,8 @@
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-define(V(X,Y), ok).
%% -include("machi_verbose.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -316,7 +318,8 @@ net_server_loop(Sock, S) ->
%% TODO: Weird that sometimes neither catch nor try/catch
%% can prevent OTP's SASL from logging an error here.
%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
_ = (catch gen_tcp:send(Sock, Resp)), timer:sleep(1000),
%% TODO: is this what causes the intermittent PULSE deadlock errors?
%% _ = (catch gen_tcp:send(Sock, Resp)), timer:sleep(1000),
(catch gen_tcp:close(Sock)),
exit(normal)
end.

View file

@ -45,6 +45,11 @@
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl").
-endif.
-define(HARD_TIMEOUT, 2500).
-export([

View file

@ -61,6 +61,16 @@
-behaviour(supervisor).
-include("machi_verbose.hrl").
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl").
-define(SHUTDOWN, infinity).
-else.
-define(SHUTDOWN, 5000).
-endif.
%% External API
-export([make_package_spec/4, start_flu_package/4, stop_flu_package/1]).
%% Internal API
@ -73,7 +83,7 @@
make_package_spec(FluName, TcpPort, DataDir, Props) ->
{FluName, {machi_flu_psup, start_link,
[FluName, TcpPort, DataDir, Props]},
permanent, 5000, supervisor, []}.
permanent, ?SHUTDOWN, supervisor, []}.
start_flu_package(FluName, TcpPort, DataDir, Props) ->
Spec = make_package_spec(FluName, TcpPort, DataDir, Props),
@ -103,15 +113,15 @@ init([FluName, TcpPort, DataDir, Props0]) ->
ProjSpec = {ProjRegName,
{machi_projection_store, start_link,
[ProjRegName, DataDir, FluName]},
permanent, 5000, worker, []},
permanent, ?SHUTDOWN, worker, []},
MgrSpec = {make_mgr_supname(FluName),
{machi_chain_manager1, start_link,
[FluName, [], Props]},
permanent, 5000, worker, []},
permanent, ?SHUTDOWN, worker, []},
FluSpec = {FluName,
{machi_flu1, start_link,
[ [{FluName, TcpPort, DataDir}|Props] ]},
permanent, 5000, worker, []},
permanent, ?SHUTDOWN, worker, []},
{ok, {SupFlags, [ProjSpec, MgrSpec, FluSpec]}}.
make_p_regname(FluName) when is_atom(FluName) ->

View file

@ -28,6 +28,16 @@
-behaviour(supervisor).
-include("machi_verbose.hrl").
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl").
-define(SHUTDOWN, infinity).
-else.
-define(SHUTDOWN, 5000).
-endif.
%% API
-export([start_link/0]).
@ -45,10 +55,17 @@ init([]) ->
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Ps = application:get_env(machi, initial_flus, []),
Ps = get_initial_flus(),
FLU_specs = [machi_flu_psup:make_package_spec(FluName, TcpPort,
DataDir, Props) ||
{FluName, TcpPort, DataDir, Props} <- Ps],
{ok, {SupFlags, FLU_specs}}.
%% get_initial_flus/0: returns the list that init/1 turns into FLU package
%% child specs.  Two compile-time variants exist, selected by PULSE.
-ifdef(PULSE).
%% PULSE build: always start with no FLUs.
get_initial_flus() ->
[].
-else. % PULSE
%% Non-PULSE build: read `initial_flus' from the `machi' application
%% environment, defaulting to the empty list when it is not set.
get_initial_flus() ->
application:get_env(machi, initial_flus, []).
-endif. % PULSE

View file

@ -26,6 +26,11 @@
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl").
-endif.
-export([from_pb_request/1,
from_pb_response/1,
to_pb_request/2,
@ -180,7 +185,7 @@ from_pb_request(#mpb_request{req_id=ReqID,
from_pb_request(#mpb_request{req_id=ReqID}) ->
{ReqID, {high_error, 999966, "Unknown request"}};
from_pb_request(_Else) ->
io:format(user, "\nRRR from_pb_request(~p)\n", [_Else]), timer:sleep(2000),
io:format(user, "\nRRR from_pb_request(~p)\n", [_Else]), %%timer:sleep(2000),
{<<>>, {high_error, 999667, "Unknown PB request"}}.
from_pb_response(#mpb_ll_response{

View file

@ -35,7 +35,7 @@
%% @doc Create a new projection record.
new(MyName, MemberDict, UPI_list, Down_list, Repairing_list, Ps) ->
new(MyName, MemberDict, Down_list, UPI_list, Repairing_list, Ps) ->
new(0, MyName, MemberDict, Down_list, UPI_list, Repairing_list, Ps).
%% @doc Create a new projection record.
@ -141,15 +141,26 @@ compare(#projection_v1{epoch_number=E1},
%% @doc Create a proplist-style summary of a projection record.
make_summary(#projection_v1{epoch_number=EpochNum,
epoch_csum= <<_CSum4:4/binary, _/binary>>=_CSum,
all_members=_All_list,
down=Down_list,
author_server=Author,
upi=UPI_list,
repairing=Repairing_list,
inner=Inner,
flap=Flap,
dbg=Dbg, dbg2=Dbg2}) ->
InnerInfo = if is_record(Inner, projection_v1) ->
[{inner, make_summary(Inner)}];
true ->
[]
end,
[{epoch,EpochNum},{author,Author},
{upi,UPI_list},{repair,Repairing_list},{down,Down_list},
{d,Dbg}, {d2,Dbg2}].
{upi,UPI_list},{repair,Repairing_list},{down,Down_list}] ++
InnerInfo ++
[{flap, Flap}] ++
%% [{flap, lists:flatten(io_lib:format("~p", [Flap]))}] ++
[{d,Dbg}, {d2,Dbg2}].
%% @doc Make a `p_srvr_dict()' out of a list of `p_srvr()' or out of a
%% `p_srvr_dict()'.

View file

@ -41,6 +41,13 @@
-module(machi_projection_store).
-include("machi_projection.hrl").
-define(V(X,Y), ok).
%% -include("machi_verbose.hrl").
%% -ifdef(PULSE).
%% -compile({parse_transform, pulse_instrument}).
%% -include_lib("pulse_otp/include/pulse_otp.hrl").
%% -endif.
%% API
-export([

View file

@ -47,6 +47,10 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl").
-endif.
-endif. % TEST.
-export([start_link/1]).

View file

@ -27,6 +27,14 @@
-behaviour(supervisor).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl").
-define(SHUTDOWN, infinity).
-else.
-define(SHUTDOWN, 5000).
-endif.
%% API
-export([start_link/0]).
@ -39,6 +47,8 @@ start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
init([]) ->
%% {_, Ps} = process_info(self(), links),
%% [unlink(P) || P <- Ps],
RestartStrategy = one_for_one,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
@ -46,7 +56,7 @@ init([]) ->
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent,
Shutdown = 5000,
Shutdown = ?SHUTDOWN,
Type = supervisor,
ServerSup =

View file

@ -38,7 +38,11 @@
info_msg/2, verb/1, verb/2,
mbytes/1,
%% TCP protocol helpers
connect/2, connect/3
connect/2, connect/3,
%% List twiddling
permutations/1, perms/1,
combinations/1, ordered_combinations/1,
mk_order/2
]).
-compile(export_all).
@ -301,3 +305,29 @@ escript_connect(Host, Port, Timeout) when is_integer(Port) ->
{ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary},
{packet, raw}], Timeout),
Sock.
%% @doc Return every permutation of `Items' (alias for perms/1).
permutations(Items) ->
    perms(Items).

%% @doc Return every permutation of `Items', in the order produced by
%% picking each element in turn as the head and permuting the remainder.
%% The empty list has exactly one permutation: the empty list.
perms([]) ->
    [[]];
perms(Items) ->
    [[Head|Rest] || Head <- Items, Rest <- perms(Items -- [Head])].
%% @doc Return the sorted, duplicate-free list of every permutation of
%% `L' together with every permutation of every sub-list obtained by
%% removing elements from `L' (including the empty list).
combinations(L) ->
    Full = perms(L),
    Smaller = [combinations(L -- [Dropped]) || Dropped <- L],
    lists:usort(lists:append([Full | Smaller])).
%% @doc Return only those results of combinations/1 whose elements appear
%% in the same relative order as they do in `Master'.
ordered_combinations(Master) ->
    KeepsOrder = fun(Candidate) -> is_ordered(Candidate, Master) end,
    lists:filter(KeepsOrder, combinations(Master)).
%% @doc True when every element of `L' is found in `Reference' and the
%% positions of those elements within `Reference' are in sorted order.
is_ordered(L, Reference) ->
    Positions = mk_order(L, Reference),
    case lists:all(fun erlang:is_integer/1, Positions) of
        true ->
            Positions == lists:sort(Positions);
        false ->
            false
    end.
%% @doc Map each element of `UPI2' to its 1-based position in `Repair1'.
%% Elements that do not occur in `Repair1' map to the atom `error'.
mk_order(UPI2, Repair1) ->
    Positions = lists:seq(1, length(Repair1)),
    PositionOf = orddict:from_list(lists:zip(Repair1, Positions)),
    Lookup = fun(Member) ->
                     case orddict:find(Member, PositionOf) of
                         {ok, Pos} -> Pos;
                         error     -> error
                     end
             end,
    lists:map(Lookup, UPI2).

View file

@ -0,0 +1,359 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(chain_mgr_legacy).
-compile(export_all).
-include("machi_projection.hrl").
-include("machi_chain_manager.hrl").
-define(RETURN1(X), begin put(why1, [?LINE|get(why1)]), X end).
%% This is old code, and it's broken. We keep it around in case
%% someone wants to execute the QuickCheck property
%% machi_chain_manager1_test:prop_compare_legacy_with_v2_chain_transition_check().
%% In all counterexamples, this legacy code returns 'true' (i.e., the
%% state transition is OK) where the v2 new code correctly returns
%% 'false' (i.e. the state transition is BAD). Fun, good times.
%% Hooray about more systematic/mathematical reasoning, code
%% structure, and property-based testing.
%% @doc Legacy check of whether moving from projection `P1' to projection
%% `P2' is a sane chain transition.
%%
%% Arguments:
%%   P1, P2           - `#projection_v1{}' records (old and new projection).
%%   RelativeToServer - the server from whose point of view the check is
%%                      made.  When it is `undefined', or is in neither
%%                      P2's down list nor P2's repairing list, the extra
%%                      UPI-suffix checks near the end are performed.
%%   RetrospectiveP   - boolean; `true' relaxes several checks (see the
%%                      comments at the points of use).
%%
%% Returns `true' when every check passes.  Any failed match or exception
%% raised inside the `try' is caught and converted into an
%% `{err, Type, Err, from, Summary1, to, Summary2, relative_to, ...}'
%% tuple instead of propagating to the caller.
%%
%% Side effect: resets the process dictionary key `why1' to `[]' and, via
%% ?RETURN1, pushes the source line number of each decision point onto it.
projection_transition_is_sane(
#projection_v1{epoch_number=Epoch1,
epoch_csum=CSum1,
creation_time=CreationTime1,
author_server=AuthorServer1,
all_members=All_list1,
down=Down_list1,
upi=UPI_list1,
repairing=Repairing_list1,
dbg=Dbg1} = P1,
#projection_v1{epoch_number=Epoch2,
epoch_csum=CSum2,
creation_time=CreationTime2,
author_server=AuthorServer2,
all_members=All_list2,
down=Down_list2,
upi=UPI_list2,
repairing=Repairing_list2,
dbg=Dbg2} = P2,
RelativeToServer, RetrospectiveP) ->
try
put(why1, []),
%% General notes:
%%
%% I'm making no attempt to be "efficient" here. All of these data
%% structures are small, and they're not called zillions of times per
%% second.
%%
%% The chain sequence/order checks at the bottom of this function aren't
%% as easy-to-read as they ought to be. However, I'm moderately confident
%% that it isn't buggy. TODO: refactor them for clarity.
%% Basic type/shape assertions; a failed match raises badmatch, which
%% the catch clause at the bottom turns into an {err, ...} tuple.
true = is_integer(Epoch1) andalso is_integer(Epoch2),
true = is_binary(CSum1) andalso is_binary(CSum2),
{_,_,_} = CreationTime1,
{_,_,_} = CreationTime2,
true = is_atom(AuthorServer1) andalso is_atom(AuthorServer2), % todo type may change?
true = is_list(All_list1) andalso is_list(All_list2),
true = is_list(Down_list1) andalso is_list(Down_list2),
true = is_list(UPI_list1) andalso is_list(UPI_list2),
true = is_list(Repairing_list1) andalso is_list(Repairing_list2),
true = is_list(Dbg1) andalso is_list(Dbg2),
%% The epoch number must strictly increase.
true = Epoch2 > Epoch1,
%% Membership must be identical: All_list1 is already bound, so this is
%% an equality assertion, not an assignment.
All_list1 = All_list2, % todo will probably change
%% No duplicates
true = lists:sort(Down_list2) == lists:usort(Down_list2),
true = lists:sort(UPI_list2) == lists:usort(UPI_list2),
true = lists:sort(Repairing_list2) == lists:usort(Repairing_list2),
%% Disjoint-ness
true = lists:sort(All_list2) == lists:sort(Down_list2 ++ UPI_list2 ++
Repairing_list2),
[] = [X || X <- Down_list2, not lists:member(X, All_list2)],
[] = [X || X <- UPI_list2, not lists:member(X, All_list2)],
[] = [X || X <- Repairing_list2, not lists:member(X, All_list2)],
DownS2 = sets:from_list(Down_list2),
UPIS2 = sets:from_list(UPI_list2),
RepairingS2 = sets:from_list(Repairing_list2),
true = sets:is_disjoint(DownS2, UPIS2),
true = sets:is_disjoint(DownS2, RepairingS2),
true = sets:is_disjoint(UPIS2, RepairingS2),
%% Additions to the UPI chain may only be at the tail
UPI_common_prefix = find_common_prefix(UPI_list1, UPI_list2),
true =
if UPI_common_prefix == [] ->
if UPI_list1 == [] orelse UPI_list2 == [] ->
%% If the common prefix is empty, then one of the
%% inputs must be empty.
?RETURN1(true);
true ->
%% Otherwise, we have a case of UPI changing from
%% one of these two situations:
%%
%% UPI_list1 -> UPI_list2
%% -------------------------------------------------
%% [d,c,b,a] -> [c,a]
%% [d,c,b,a] -> [c,a,repair_finished_added_to_tail].
NotUPI2 = (Down_list2 ++ Repairing_list2),
case lists:prefix(UPI_list1 -- NotUPI2, UPI_list2) of
true ->
?RETURN1(true);
false ->
%% Here's a possible failure scenario:
%% UPI_list1 -> UPI_list2
%% Repairing_list1 -> Repairing_list2
%% -----------------------------------
%% [a,b,c] author=a -> [c,a] author=c
%% [] [b]
%%
%% ... where RelativeToServer=b. In this case, b
%% has been partitioned for a while and has only
%% now just learned of several epoch transitions.
%% If the author of both is also in the UPI of
%% both, then those authors would not have allowed
%% a bad transition, so we will assume this
%% transition is OK.
?RETURN1(
lists:member(AuthorServer1, UPI_list1)
andalso
lists:member(AuthorServer2, UPI_list2)
)
end
end;
true ->
?RETURN1(true)
end,
true = lists:prefix(UPI_common_prefix, UPI_list1),
true = lists:prefix(UPI_common_prefix, UPI_list2),
UPI_1_suffix = UPI_list1 -- UPI_common_prefix,
UPI_2_suffix = UPI_list2 -- UPI_common_prefix,
%% Trace marker only: records this line number in why1; the value is
%% discarded.
_ = ?RETURN1(yo),
MoreCheckingP =
RelativeToServer == undefined
orelse
not (lists:member(RelativeToServer, Down_list2) orelse
lists:member(RelativeToServer, Repairing_list2)),
%% Trace marker only (see above).
_ = ?RETURN1(yo),
UPIs_are_disjointP = ordsets:is_disjoint(ordsets:from_list(UPI_list1),
ordsets:from_list(UPI_list2)),
case UPI_2_suffix -- UPI_list1 of
[] ->
?RETURN1(true);
[_|_] = _Added_by_2 ->
if RetrospectiveP ->
%% Any servers added to the UPI must be added from the
%% repairing list ... but in retrospective mode (where
%% we're checking only the transitions where all
%% UPI+repairing participants have unanimous private
%% projections!), and if we're under asymmetric
%% partition/churn, then we may not see the repairing
%% list. So we will not check that condition here.
?RETURN1(true);
not RetrospectiveP ->
%% We're not retrospective. So, if some server was
%% added to the UPI, then that means that it was
%% added by repair. And repair is coordinated by the
%% UPI tail/last.
%io:format(user, "g: UPI_list1=~w, UPI_list2=~w, UPI_2_suffix=~w, ",
% [UPI_list1, UPI_list2, UPI_2_suffix]),
%io:format(user, "g", []),
?RETURN1(true = UPI_list1 == [] orelse
UPIs_are_disjointP orelse
(lists:last(UPI_list1) == AuthorServer2) )
end
end,
if not MoreCheckingP ->
?RETURN1(ok);
MoreCheckingP ->
%% Where did elements in UPI_2_suffix come from?
%% Only two sources are permitted.
Oops_check_UPI_2_suffix =
[lists:member(X, Repairing_list1) % X added after repair done
orelse
lists:member(X, UPI_list1) % X in UPI_list1 after common pref
|| X <- UPI_2_suffix],
%% Grrrrr, ok, so this check isn't good, at least at bootstrap time.
%% TODO: false = lists:member(false, Oops_check_UPI_2_suffix),
%% The UPI_2_suffix must exactly be equal to: ordered items from
%% UPI_list1 concat'ed with ordered items from Repairing_list1.
%% Both temp vars below preserve relative order!
UPI_2_suffix_from_UPI1 = [X || X <- UPI_1_suffix,
lists:member(X, UPI_list2)],
UPI_2_suffix_from_Repairing1 = [X || X <- UPI_2_suffix,
lists:member(X, Repairing_list1)],
%% true?
UPI_2_concat = (UPI_2_suffix_from_UPI1 ++ UPI_2_suffix_from_Repairing1),
if UPI_2_suffix == UPI_2_concat ->
?RETURN1(ok);
true ->
%% 'make dialyzer' will believe that this can never succeed.
%% 'make dialyzer-test' will not complain, however.
if RetrospectiveP ->
%% We are in retrospective mode. But there are
%% some transitions that are difficult to find
%% when standing outside of all of the FLUs and
%% examining their behavior. (In contrast to
%% this same function being called "in the path"
%% of a projection transition by a particular FLU
%% which knows exactly its prior projection and
%% exactly what it intends to do.) Perhaps this
%% exception clause here can go away with
%% better/more clever retrospection analysis?
%%
%% Here's a case that PULSE found:
%% FLU B:
%% E=257: UPI=[c,a], REPAIRING=[b]
%% E=284: UPI=[c,a], REPAIRING=[b]
%% FLU a:
%% E=251: UPI=[c], REPAIRING=[a,b]
%% E=284: UPI=[c,a], REPAIRING=[b]
%% FLU c:
%% E=282: UPI=[c], REPAIRING=[a,b]
%% E=284: UPI=[c,a], REPAIRING=[b]
%%
%% From the perspective of each individual FLU,
%% the unanimous transition at epoch #284 is
%% good. The repair that is done by FLU c -> a
%% is likewise good.
%%
%% From a retrospective point of view (and the
%% current implementation), there's a bad-looking
%% transition from epoch #269 to #284. This is
%% from the point of view of the last two
%% unanimous private projection store epochs:
%%
%% E=269: UPI=[c], REPAIRING=[], DOWN=[a,b]
%% E=284: UPI=[c,a], REPAIRING=[b]
%%
%% The retrospective view by
%% machi_chain_manager1_pulse.erl just can't
%% reason correctly about this situation. We
%% will instead rely on the non-retrospective
%% sanity checking that each FLU does before it
%% writes to its private projection store and
%% then adopts that projection (and unwedges
%% itself, etc etc).
if UPIs_are_disjointP ->
?RETURN1(true);
true ->
?RETURN1(todo),
%% NOTE(review): exit/1 does not return, so the
%% io:format/3 call and the `ok' that follow it
%% are unreachable.
exit({todo, revisit, ?MODULE, ?LINE,
[
{oops_check_UPI_2_suffix, Oops_check_UPI_2_suffix},
{upi_2_suffix, UPI_2_suffix},
{upi_2_concat, UPI_2_concat},
{retrospectivep, RetrospectiveP}
]}),
io:format(user, "|~p,~p TODO revisit|",
[?MODULE, ?LINE]),
ok
end;
true ->
%% The following is OK: We're shifting from a
%% normal projection to an inner one. The old
%% normal has a UPI that has nothing to do with
%% RelativeToServer a.k.a. me.
%% Or else the UPI_list1 is empty, and I'm
%% the only member of UPI_list2
%% But the new/suffix is definitely me.
%% from:
%% {epoch,847},{author,c},{upi,[c]},{repair,[]},
%% {down,[a,b,d]}
%% to:
%% {epoch,848},{author,a},{upi,[a]},{repair,[]},
%% {down,[b,c,d]}
FirstCase_p = (UPI_2_suffix == [AuthorServer2])
andalso
((inner_projection_exists(P1) == false
andalso
inner_projection_exists(P2) == true)
orelse UPI_list1 == []),
%% Here's another case that's alright:
%%
%% {a,{err,exit,
%% {upi_2_suffix_error,[c]}, ....
%%
%% from:
%% {epoch,937},{author,a},{upi,[a,b]},{repair,[]},
%% {down,[c]}
%% to:
%% {epoch,943},{author,a},{upi,[a,b,c]},{repair,[]},
%% {down,[]}
%% The author server doesn't matter. However,
%% there were two other epochs in between, 939
%% and 941, where there wasn't universal agreement
%% of private projections. The repair controller
%% at the tail, 'b', had decided that the repair
%% of 'c' was finished @ epoch 941.
SecondCase_p = ((UPI_2_suffix -- Repairing_list1)
== []),
if FirstCase_p ->
?RETURN1(true);
SecondCase_p ->
?RETURN1(true);
UPIs_are_disjointP ->
%% If there's no overlap at all between
%% UPI_list1 & UPI_list2, then we're OK
%% here.
?RETURN1(true);
true ->
exit({upi_2_suffix_error, UPI_2_suffix})
end
end
end
end,
?RETURN1(true)
catch
%% Every exception class (error, exit, throw) raised above lands here
%% and is reported as an {err, ...} tuple rather than re-raised.
_Type:_Err ->
?RETURN1(oops),
S1 = machi_projection:make_summary(P1),
S2 = machi_projection:make_summary(P2),
Trace = erlang:get_stacktrace(),
%% The `history' element is a fixed placeholder: the sort of a
%% one-element list is just [no_history].
{err, _Type, _Err, from, S1, to, S2, relative_to, RelativeToServer,
history, (catch lists:sort([no_history])),
stack, Trace}
end.
%% @doc Return the longest list that is a prefix of both arguments.
%% The result is `[]' when the first elements differ or either list is
%% empty.
find_common_prefix(ListA, ListB) ->
    find_common_prefix_acc(ListA, ListB, []).

%% Walk both lists in lock step, collecting heads while they are equal;
%% stop at the first mismatch or when either list runs out.
find_common_prefix_acc([Same|RestA], [Same|RestB], Acc) ->
    find_common_prefix_acc(RestA, RestB, [Same|Acc]);
find_common_prefix_acc(_, _, Acc) ->
    lists:reverse(Acc).
%% @doc True when the projection's `inner' field holds anything other
%% than the atom `undefined'.
inner_projection_exists(#projection_v1{inner=Inner}) ->
    Inner =/= undefined.

View file

@ -154,9 +154,10 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
%% Faster test startup, commented: timer:sleep(3000),
TcpPort = 62877,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"},
{c,TcpPort+2,"./data.c"}, {d,TcpPort+3,"./data.d"},
{e,TcpPort+4,"./data.e"}, {f,TcpPort+5,"./data.f"}],
ok = filelib:ensure_dir("/tmp/c/not-used"),
FluInfo = [{a,TcpPort+0,"/tmp/c/data.a"}, {b,TcpPort+1,"/tmp/c/data.b"},
{c,TcpPort+2,"/tmp/c/data.c"}, {d,TcpPort+3,"/tmp/c/data.d"},
{e,TcpPort+4,"/tmp/c/data.e"}, {f,TcpPort+5,"/tmp/c/data.f"}],
FLU_biglist = [X || {X,_,_} <- FluInfo],
All_list = lists:sublist(FLU_biglist, NumFLUs),
io:format(user, "\nSET # of FLUs = ~w members ~w).\n",
@ -175,7 +176,6 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
{Name, PPid}
end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs],
MembersDict = machi_projection:make_members_dict(Ps),
%% MgrOpts = [private_write_verbose, {active_mode,false},
MgrOpts = MgrOpts0 ++ ?DEFAULT_MGR_OPTS,
MgrNamez =
[begin
@ -225,48 +225,64 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
machi_partition_simulator:reset_thresholds(10, 50),
io:format(user, "\nLet loose the dogs of war!\n", []),
DoIt(30, 0, 0),
[DoIt(30, 0, 0) || _ <- lists:seq(1,2)],
AllPs = make_partition_list(All_list),
PartitionCounts = lists:zip(AllPs, lists:seq(1, length(AllPs))),
FLUFudge = if NumFLUs < 4 ->
2;
true ->
13
end,
MaxIters = NumFLUs * (NumFLUs + 1) * 6,
[begin
machi_partition_simulator:always_these_partitions(Partition),
io:format(user, "\nSET partitions = ~w (~w of ~w) at ~w\n",
[Partition, Count, length(AllPs), time()]),
[DoIt(40, 10, 50) || _ <- lists:seq(0, trunc(NumFLUs*FLUFudge)) ],
{stable,true} = {stable,private_projections_are_stable(Namez, DoIt)},
true = lists:foldl(
fun(_, true) ->
true;
(_, _) ->
%% Run a few iterations
[DoIt(10, 10, 50) || _ <- lists:seq(1, 6)],
%% If stable, return true to short circuit remaining
private_projections_are_stable(Namez, DoIt)
end, false, lists:seq(0, MaxIters)),
io:format(user, "\nSweet, private projections are stable\n", []),
io:format(user, "Rolling sanity check ... ", []),
PrivProjs = [{Name, begin
{ok, Ps8} = ?FLU_PC:get_all_projections(FLU, private,
infinity),
Ps9 = if length(Ps8) < 5*1000 ->
Ps8;
true ->
io:format(user, "trunc a bit... ", []),
lists:nthtail(3*1000, Ps8)
end,
[P || P <- Ps9,
P#projection_v1.epoch_number /= 0]
end} || {Name, FLU} <- Namez],
try
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} ||
{FLU, Psx} <- PrivProjs]
catch _Err:_What ->
io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What})
end,
io:format(user, "Yay!\n", []),
io:format(user, "Rolling sanity check ... ", []),
MaxFiles = 3*1000,
PrivProjs = [{Name, begin
{ok, Ps8} = ?FLU_PC:get_all_projections(
FLU, private, infinity),
Ps9 = if length(Ps8) < MaxFiles ->
Ps8;
true ->
lists:nthtail(MaxFiles, Ps8)
end,
[P || P <- Ps9,
P#projection_v1.epoch_number /= 0]
end} || {Name, FLU} <- Namez],
try
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} ||
{FLU, Psx} <- PrivProjs]
catch _Err:_What ->
io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What})
end,
io:format(user, "Yay!\n", []),
ReportXX = machi_chain_manager1_test:unanimous_report(Namez),
true = machi_chain_manager1_test:all_reports_are_disjoint(ReportXX),
io:format(user, "Yay for ReportXX!\n", []),
[begin
Privs = filelib:wildcard(Dir ++ "/projection/private/*"),
FilesToDel1 = lists:sublist(Privs,
max(0, length(Privs)-MaxFiles)),
[_ = file:delete(File) || File <- FilesToDel1],
Pubs = filelib:wildcard(Dir ++ "/projection/public/*"),
FilesToDel2 = lists:sublist(Pubs,
max(0, length(Pubs)-MaxFiles)),
[_ = file:delete(File) || File <- FilesToDel2]
end || Dir <- filelib:wildcard("/tmp/c/data*")],
timer:sleep(1250),
ok
end || {Partition, Count} <- PartitionCounts
],
%% exit(end_experiment),
io:format(user, "\nSET partitions = []\n", []),
io:format(user, "We should see convergence to 1 correct chain.\n", []),
@ -287,6 +303,7 @@ io:format(user, "Yay!\n", []),
%% members appear in only one unique chain, i.e., the sets of
%% unique chains are disjoint.
true = machi_chain_manager1_test:all_reports_are_disjoint(Report),
%% io:format(user, "\nLast Reports: ~p\n", [lists:nthtail(length(Report)-8,Report)]),
%% For each chain transition experienced by a particular FLU,
%% confirm that each state transition is OK.
@ -334,10 +351,12 @@ make_partition_list(All_list) ->
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
X /= A, X /= C, A /= C],
%% Concat = _X_Ys1 ++ _X_Ys2.
%% Concat = _X_Ys3.
Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
random_sort(lists:usort([lists:sort(L) || L <- Concat])).
%% Concat = _X_Ys1,
%% Concat = _X_Ys2,
%% Concat = _X_Ys1 ++ _X_Ys2,
%% %% Concat = _X_Ys3,
%% %% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
%% random_sort(lists:usort([lists:sort(L) || L <- Concat])).
%% [ [{a,b},{b,d},{c,b}],
%% [{a,b},{b,d},{c,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
@ -348,17 +367,29 @@ make_partition_list(All_list) ->
%% [ [{a,b}, {b,c}],
%% [{a,b}, {c,b}] ].
%% [{a,b}, {b,c}] ]. %% hosed-not-equal @ 3 FLUs
%% [ [{a,b}, {b,c}] ]. %% hosed-not-equal @ 3 FLUs
%% [{b,d}] ].
%% [ [{b,d}] ].
%% [ [{a,b}], [], [{a,b}], [], [{a,b}] ].
%% [
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{b,a},{d,e}],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], []
%% ].
%% [ [{a,b}, {b,a}] ].
%% [ [{a,b},{b,c},{c,a}],
%% [{a,b}, {b,a}, {a,c},{c,a}] ].
%% [{a,b}, {c,b}],
%% [{a,b}, {b,c}] ].
%% [ [{a,b}, {c,b}],
%% [{a,b}, {b,c}] ].
%% [ [{a,b}, {b,c}, {c,d}],
%% [{a,b}, {b,c},{b,d}, {c,d}],
@ -366,23 +397,34 @@ make_partition_list(All_list) ->
%% [{a,b}, {c,b}, {c,d}],
%% [{a,b}, {b,c}, {d,c}] ].
[
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
[{a,b}], [], [{a,b}], [], [{a,b}]
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{b,a},{d,e}],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], []
].
%% [ [{a,b}, {b,c}, {c,d}, {d,e}],
%% [{b,a}, {b,c}, {c,d}, {d,e}],
%% [{a,b}, {c,b}, {c,d}, {d,e}],
%% [{a,b}, {b,c}, {d,c}, {d,e}],
%% [{a,b}, {b,c}, {c,d}, {e,d}] ].
%% [ [{c,a}] ].
%% [ [{c,a}] ]. %% TODO double-check for total repair stability at SET=[]!!
%% [ [{c,a}], [{c,b}, {a, b}] ].
%% [ [{c,a}],
%% [{c,b}, {a, b}] ].
%% [ [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {b,c}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {c,d}] ].
%% [ [{a,b}],
%% [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% [ [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% [{a,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b}],
%% [{a,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ].
@ -405,10 +447,10 @@ private_projections_are_stable(Namez, PollFunc) ->
if Private1 == Private2 ->
ok;
true ->
io:format(user, "Oops: Private1: ~p\n", [Private1]),
io:format(user, "Oops: Private2: ~p\n", [Private2])
io:format(user, "Private1: ~p, ", [Private1]),
io:format(user, "Private2: ~p, ", [Private2])
end,
true = (Private1 == Private2).
Private1 == Private2.
get_latest_inner_proj_summ(FLU) ->
{ok, Proj} = ?FLU_PC:read_latest_projection(FLU, private),

View file

@ -0,0 +1,462 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_chain_manager1_converge_demo).
-include("machi.hrl").
-include("machi_projection.hrl").
-define(MGR, machi_chain_manager1).
-define(D(X), io:format(user, "~s ~p\n", [??X, X])).
-define(Dw(X), io:format(user, "~s ~w\n", [??X, X])).
-define(FLU_C, machi_flu1_client).
-define(FLU_PC, machi_proxy_flu1_client).
-compile(export_all).
-ifdef(TEST).
-ifndef(PULSE).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
%% -include_lib("eqc/include/eqc_statem.hrl").
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
-endif.
-include_lib("eunit/include/eunit.hrl").
%% @doc Return the short help text for the demo as a single string: the
%% four steps the demo performs followed by a cheat sheet for the output
%% printed while it runs.  The only reference to it in this module is a
%% commented-out io:format/3 call in convergence_demo_testfun/2.
short_doc() ->
"
A visualization of the convergence behavior of the chain self-management
algorithm for Machi.
1. Set up 4 FLUs and chain manager pairs.
2. Create a number of different network partition scenarios, where
(simulated) partitions may be symmetric or asymmetric. Then halt changing
the partitions and keep the simulated network stable and broken.
3. Run a number of iterations of the algorithm in parallel by poking each
of the manager processes on a random'ish basis.
4. Afterward, fetch the chain transition changes made by each FLU and
verify that no transition was unsafe.
During the iteration periods, the following is a cheatsheet for the output.
See the internal source for interpreting the rest of the output.
'Let loose the dogs of war!' Network instability
'SET partitions = ' Network stability (but broken)
'x uses:' The FLU x has made an internal state transition. The rest of
the line is a dump of internal state.
'{t}' This is a tick event which triggers one of the manager processes
to evaluate its environment and perhaps make a state transition.
A long chain of '{t}{t}{t}{t}' means that the chain state has settled
to a stable configuration, which is the goal of the algorithm.
Press control-c to interrupt....".
%% @doc Return the long help text for the demo as a single string.  It
%% explains each marker that appears in the demo's output ('x uses',
%% 'epoch,E', 'upi'/'repair'/'down', 'ps', 'nodes_up', 'SET partitions'
%% and the FLAP notice).  Nothing in this module prints it; it is meant
%% to be read or printed by hand.
long_doc() ->
"
'Let loose the dogs of war!'
The simulated network is very unstable for a few seconds.
'x uses'
After a single iteration, server x has determined that the chain
should be defined by the upi, repair, and down list in this record.
If all participants reach the same conclusion at the same epoch
number (and checksum, see next item below), then the chain is
stable, fully configured, and can provide full service.
'epoch,E'
The epoch number for this decision is E. The checksum of the full
record is not shown. For purposes of the protocol, a server will
'wedge' itself and refuse service (until a new config is chosen)
whenever: a). it sees a bigger epoch number mentioned somewhere, or
b). it sees the same epoch number but a different checksum. In case
of b), there was a network partition that has healed, and both sides
had chosen to operate with an identical epoch number but different
chain configs.
'upi', 'repair', and 'down'
Members in the chain that are fully in sync and thus preserving the
Update Propagation Invariant, up but under repair (simulated), and
down, respectively.
'ps,[some list]'
The list of asymmetric network partitions. {a,b} means that a
cannot send to b, but b can send to a.
This partition list is recorded for debugging purposes but is *not*
used by the algorithm. The algorithm only 'feels' its effects via
simulated timeout whenever there's a partition in one of the
messaging directions.
'nodes_up,[list]'
The best guess right now of which nodes are up, relative to the
author node, specified by '{author,X}'
'SET partitions = [some list]'
All subsequent iterations should have a stable list of partitions,
i.e. the 'ps' list described should be stable.
'{FLAP: x flaps n}!'
Server x has detected that it's flapping/oscillating after iteration
n of a naive/1st draft detection algorithm.
".
%% convergence_demo_test_() ->
%% {timeout, 98*300, fun() -> convergence_demo_testfun() end}.
%% convergence_demo_testfun() ->
%% convergence_demo_testfun(3).
-define(DEFAULT_MGR_OPTS, [{private_write_verbose, false},
{active_mode,false},
{use_partition_simulator, true}]).
%% Shell-friendly entry points for the demo.
%% t/0 runs it with 3 FLUs and ?DEFAULT_MGR_OPTS.
t() ->
t(3).
%% t/1 runs it with N FLUs and ?DEFAULT_MGR_OPTS.
t(N) ->
t(N, ?DEFAULT_MGR_OPTS).
%% t/2 passes N and MgrOpts unchanged to convergence_demo_testfun/2.
t(N, MgrOpts) ->
convergence_demo_testfun(N, MgrOpts).
%% Run the convergence demo.
%%
%% NumFLUs  - how many servers to start; they are taken from the front of
%%            the fixed list a..f defined in FluInfo below.
%% MgrOpts0 - chain manager options; ?DEFAULT_MGR_OPTS is appended after
%%            them before the managers are started.
%%
%% Phases, in order:
%%   1. Start the partition simulator, the FLUs, one proxy client per FLU
%%      and one chain manager per FLU.
%%   2. Write an initial public projection, then tick the managers while
%%      the simulator produces changing partitions.
%%   3. For every scenario returned by make_partition_list/1: pin that
%%      partition, tick the managers, require that the private
%%      projections have stopped changing, and sanity-check the
%%      transitions recorded so far.
%%   4. Remove all partitions, tick again, and repeat the checks.
%%
%% Returns ok.  Any exception raised inside the try body is printed and
%% converted to exit({bummer, Class, Reason}).  The `after' clause always
%% stops the managers, proxies, FLUs and the partition simulator.
convergence_demo_testfun(NumFLUs, MgrOpts0) ->
timer:sleep(100),
%% Faster test startup, commented: io:format(user, short_doc(), []),
%% Faster test startup, commented: timer:sleep(3000),
%% Fixed port base and data directories under /tmp/c; at most six FLUs
%% (a..f) are available.
TcpPort = 62877,
ok = filelib:ensure_dir("/tmp/c/not-used"),
FluInfo = [{a,TcpPort+0,"/tmp/c/data.a"}, {b,TcpPort+1,"/tmp/c/data.b"},
{c,TcpPort+2,"/tmp/c/data.c"}, {d,TcpPort+3,"/tmp/c/data.d"},
{e,TcpPort+4,"/tmp/c/data.e"}, {f,TcpPort+5,"/tmp/c/data.f"}],
FLU_biglist = [X || {X,_,_} <- FluInfo],
All_list = lists:sublist(FLU_biglist, NumFLUs),
io:format(user, "\nSET # of FLUs = ~w members ~w).\n",
[NumFLUs, All_list]),
machi_partition_simulator:start_link({111,222,33}, 0, 100),
_ = machi_partition_simulator:get(All_list),
%% Ps: one #p_srvr{} per participating FLU; PsDirs pairs each with its
%% data directory.
Ps = [#p_srvr{name=Name,address="localhost",port=Port} ||
{Name,Port,_Dir} <- lists:sublist(FluInfo, NumFLUs)],
PsDirs = lists:zip(Ps,
[Dir || {_,_,Dir} <- lists:sublist(FluInfo, NumFLUs)]),
FLU_pids = [machi_flu1_test:setup_test_flu(Name, Port, Dir) ||
{#p_srvr{name=Name,port=Port}, Dir} <- PsDirs],
%% Namez: [{FLUName, ProxyPid}], one proxy client per FLU.
Namez = [begin
{ok, PPid} = ?FLU_PC:start_link(P),
{Name, PPid}
end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs],
MembersDict = machi_projection:make_members_dict(Ps),
%% Caller-supplied options are placed ahead of the defaults.
%% NOTE(review): presumably so that they win in a proplists-style lookup;
%% confirm against how ?MGR reads its options.
MgrOpts = MgrOpts0 ++ ?DEFAULT_MGR_OPTS,
%% MgrNamez: [{FLUName, ManagerPid}], one chain manager per FLU.
MgrNamez =
[begin
{ok, MPid} = ?MGR:start_link(P#p_srvr.name, MembersDict, MgrOpts),
{P#p_srvr.name, MPid}
end || P <- Ps],
try
%% Ask the first manager for a projection and write it to the public
%% store of the FLUs.
%% NOTE(review): Ma is a manager pid (from MgrNamez) while FLUPid is a
%% proxy pid (from Namez), so the `FLUPid /= Ma' filter looks like it is
%% always true -- confirm the intent.
[{_, Ma}|_] = MgrNamez,
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
[ok = ?FLU_PC:write_projection(FLUPid, public, P1) ||
{_, FLUPid} <- Namez, FLUPid /= Ma],
machi_partition_simulator:reset_thresholds(10, 50),
_ = machi_partition_simulator:get(All_list),
Parent = self(),
%% DoIt(Iters, S_min, S_max): spawn one worker per manager.  Each worker
%% performs Iters rounds of ?MGR:sleep_ranked_order/4 (with a random upper
%% bound in 1..S_max+1) followed by ?MGR:test_react_to_env/1, then sends
%% `done' to the parent.  The parent waits for one `done' per worker and
%% exits with icky_timeout if any wait exceeds 120 seconds.
DoIt = fun(Iters, S_min, S_max) ->
%% io:format(user, "\nDoIt: top\n\n", []),
io:format(user, "DoIt, ", []),
Pids = [spawn(fun() ->
random:seed(now()),
[begin
erlang:yield(),
S_max_rand = random:uniform(
S_max + 1),
%% io:format(user, "{t}", []),
Elapsed =
?MGR:sleep_ranked_order(
S_min, S_max_rand,
M_name, All_list),
_ = ?MGR:test_react_to_env(MMM),
%% Be more unfair by not
%% sleeping here.
% timer:sleep(S_max - Elapsed),
Elapsed
end || _ <- lists:seq(1, Iters)],
Parent ! done
end) || {M_name, MMM} <- MgrNamez ],
[receive
done ->
ok
after 120*1000 ->
exit(icky_timeout)
end || _ <- Pids]
end,
%% Phase 2: tick the managers while the simulator is in its
%% threshold-driven mode (reset_thresholds/2).
machi_partition_simulator:reset_thresholds(10, 50),
io:format(user, "\nLet loose the dogs of war!\n", []),
[DoIt(30, 0, 0) || _ <- lists:seq(1,2)],
AllPs = make_partition_list(All_list),
PartitionCounts = lists:zip(AllPs, lists:seq(1, length(AllPs))),
%% NOTE(review): both branches currently yield 2, so the NumFLUs test has
%% no effect; the commented-out 13 suggests the second branch used to
%% differ.
FLUFudge = if NumFLUs < 4 ->
2;
true ->
2
%% 13
end,
%% Phase 3: one pass per partition scenario.
[begin
machi_partition_simulator:always_these_partitions(Partition),
io:format(user, "\nSET partitions = ~w (~w of ~w) at ~w\n",
[Partition, Count, length(AllPs), time()]),
%% [DoIt(40, 10, 50) || _ <- lists:seq(0, trunc(NumFLUs*FLUFudge)) ],
[DoIt(20, 10, 50) || _ <- lists:seq(0, trunc(NumFLUs*FLUFudge)*2) ],
{stable,true} = {stable,private_projections_are_stable(Namez, DoIt)},
io:format(user, "\nSweet, private projections are stable\n", []),
io:format(user, "Rolling sanity check ... ", []),
%% Fetch every private projection from every FLU.  Once a FLU holds
%% Max or more of them, delete NumToDel projection files from each
%% /tmp/c/data* directory and check only part of the list.
%% NOTE(review): lists:nthtail(Max, Ps8) drops the first Max entries
%% and keeps length(Ps8)-Max of them (none when the length equals
%% Max).  If the goal is to keep the newest Max entries this should
%% probably be lists:nthtail(NumToDel, Ps8) -- confirm.
PrivProjs = [{Name, begin
{ok, Ps8} = ?FLU_PC:get_all_projections(FLU, private,
infinity),
Max = 3*1000,
Ps9 = if length(Ps8) < Max ->
Ps8;
true ->
NumToDel = length(Ps8) - Max,
io:format(user, "trunc a bit... ", []),
[begin
FilesToDel = lists:sublist(filelib:wildcard(Dir ++ "/projection/private/*"), NumToDel),
[_ = file:delete(File) || File <- FilesToDel]
end || Dir <- filelib:wildcard("/tmp/c/data*")],
lists:nthtail(Max, Ps8)
end,
%% Epoch 0 projections are excluded from the sanity check.
[P || P <- Ps9,
P#projection_v1.epoch_number /= 0]
end} || {Name, FLU} <- Namez],
%% Every FLU's sequence of private projections must pass the
%% retrospective sanity check; on failure dump the projections and
%% exit with the line number and the original exception.
try
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} ||
{FLU, Psx} <- PrivProjs]
catch _Err:_What ->
io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What})
end,
io:format(user, "Yay!\n", []),
ReportXX = machi_chain_manager1_test:unanimous_report(Namez),
io:format(user, "ReportXX ~P\n", [ReportXX, 30]),
true = machi_chain_manager1_test:all_reports_are_disjoint(ReportXX),
io:format(user, "Yay for ReportXX!\n", []),
timer:sleep(1250),
ok
end || {Partition, Count} <- PartitionCounts
],
%% Phase 4: no partitions at all.
io:format(user, "\nSET partitions = []\n", []),
io:format(user, "We should see convergence to 1 correct chain.\n", []),
machi_partition_simulator:no_partitions(),
[DoIt(50, 10, 50) || _ <- [1]],
true = private_projections_are_stable(Namez, DoIt),
io:format(user, "~s\n", [os:cmd("date")]),
%% We are stable now ... analyze it.
%% Create a report where at least one FLU has written a
%% private projection.
Report = machi_chain_manager1_test:unanimous_report(Namez),
%% ?D(Report),
%% Report is ordered by Epoch. For each private projection
%% written during any given epoch, confirm that all chain
%% members appear in only one unique chain, i.e., the sets of
%% unique chains are disjoint.
true = machi_chain_manager1_test:all_reports_are_disjoint(Report),
%% io:format(user, "\nLast Reports: ~p\n", [lists:nthtail(length(Report)-8,Report)]),
%% For each chain transition experienced by a particular FLU,
%% confirm that each state transition is OK.
PrivProjs = [{Name, begin
{ok, Ps9} = ?FLU_PC:get_all_projections(FLU,
private),
[P || P <- Ps9,
P#projection_v1.epoch_number /= 0]
end} || {Name, FLU} <- Namez],
try
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} ||
{FLU, Psx} <- PrivProjs],
io:format(user, "\nAll sanity checks pass, hooray!\n", [])
catch _Err:_What ->
io:format(user, "Report ~p\n", [Report]),
io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What})
end,
%% ?D(R_Projs),
ok
catch
%% Print any failure from the body with its stack trace, then re-raise it
%% as an exit.
XX:YY ->
io:format(user, "BUMMER ~p ~p @ ~p\n",
[XX, YY, erlang:get_stacktrace()]),
exit({bummer,XX,YY})
after
%% Always tear down in this order: managers, proxies, FLUs, simulator.
%% Each stop call is asserted to return ok.
[ok = ?MGR:stop(MgrPid) || {_, MgrPid} <- MgrNamez],
[ok = ?FLU_PC:quit(PPid) || {_, PPid} <- Namez],
[ok = machi_flu1:stop(FLUPid) || FLUPid <- FLU_pids],
ok = machi_partition_simulator:stop()
end.
%% Many of the static partition lists below have been problematic at one
%% time or another.....
%%
%% Uncomment *one* of the following make_partition_list() bodies.
%% Return the list of partition scenarios that convergence_demo_testfun/2
%% applies one after another.  Each scenario is a list of {X,Y} pairs that
%% is handed to machi_partition_simulator:always_these_partitions/1; the
%% empty list [] is a scenario with no partitions.
%%
%% All_list is the list of participating FLU names.  With the body that is
%% currently active it only feeds the _X_Ys* bindings below, whose values
%% are not used in the result.
make_partition_list(All_list) ->
%% Every single pair, every set of two pairs with distinct first elements,
%% and every set of three pairs with distinct first elements.  These are
%% referenced only by the commented-out "Concat" alternatives below; the
%% leading underscore keeps the compiler from warning while they are
%% unused.
_X_Ys1 = [[{X,Y}] || X <- All_list, Y <- All_list, X /= Y],
_X_Ys2 = [[{X,Y}, {A,B}] || X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
X /= A],
_X_Ys3 = [[{X,Y}, {A,B}, {C,D}] || X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
X /= A, X /= C, A /= C],
%% Concat = _X_Ys1,
%% Concat = _X_Ys2,
%% Concat = _X_Ys1 ++ _X_Ys2,
%% %% Concat = _X_Ys3,
%% %% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
%% random_sort(lists:usort([lists:sort(L) || L <- Concat])).
%% [ [{a,b},{b,d},{c,b}],
%% [{a,b},{b,d},{c,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ].
%% [ [{a,b}, {b,c}],
%% [{a,b}, {c,b}] ].
%% [ [{a,b}, {b,c}] ]. %% hosed-not-equal @ 3 FLUs
%% [ [{b,d}] ].
%% [ [{a,b}], [], [{a,b}], [], [{a,b}] ].
%% [
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% %% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{b,a},{d,e}],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], []
%% ].
%% [ [{a,b}, {b,a}] ].
%% [ [{a,b},{b,c},{c,a}],
%% [{a,b}, {b,a}, {a,c},{c,a}] ].
%% [ [{a,b}, {c,b}],
%% [{a,b}, {b,c}] ].
%% [ [{a,b}, {b,c}, {c,d}],
%% [{a,b}, {b,c},{b,d}, {c,d}],
%% [{b,a}, {b,c}, {c,d}],
%% [{a,b}, {c,b}, {c,d}],
%% [{a,b}, {b,c}, {d,c}] ].
%% ACTIVE BODY: five scenarios alternating between the single partition
%% {a,b} and no partition at all.
[
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
[{a,b}], [], [{a,b}], [], [{a,b}]
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [],
%% [{b,a},{d,e}],
%% [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], [], [{a,b}], []
].
%% [ [{a,b}, {b,c}, {c,d}, {d,e}],
%% [{b,a}, {b,c}, {c,d}, {d,e}],
%% [{a,b}, {c,b}, {c,d}, {d,e}],
%% [{a,b}, {b,c}, {d,c}, {d,e}],
%% [{a,b}, {b,c}, {c,d}, {e,d}] ].
%% [ [{c,a}] ]. %% TODO double-check for total repair stability at SET=[]!!
%% [ [{c,a}],
%% [{c,b}, {a, b}] ].
%% [ [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {b,c}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {c,d}] ].
% [ [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
% [{a,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
% [{a,b}],
% [{a,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
% [{a,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ].
%% Debugging aid: read the public projection for epoch PPPepoch from FLU
%% through the proxy client.
%%
%% Returns the {ok, _} result of the read.  If the read raises, or returns
%% anything other than {ok, _}, a diagnostic naming FLUName and the epoch
%% is printed, the process sleeps for one second and then exits with
%% reason still_a_problem.
%%
%% The original body had a second read_projection call after exit/1; it
%% could never run and has been removed.
todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) ->
    try
        {ok, _}=Res = ?FLU_PC:read_projection(FLU, public, PPPepoch),
        Res
    catch _:_ ->
            io:format(user, "QQQ Whoa, it crashed this time for ~p at epoch ~p\n",
                      [FLUName, PPPepoch]),
            timer:sleep(1000),
            exit(still_a_problem)
    end.
%% Check whether the private projections have stopped changing.
%%
%% Namez    - list of {Name, FLUProxy} pairs.
%% PollFunc - fun(Iters, S_min, S_max) that ticks the chain managers; it is
%%            called once, as PollFunc(5, 1, 10), between two snapshots.
%%
%% Takes a summary of every FLU's latest private projection, runs
%% PollFunc, takes the summaries again and returns true when the two
%% snapshots are equal, false otherwise.  When they differ, both snapshots
%% are printed first.
%%
%% The function returns the boolean instead of asserting it, so that a
%% caller's own tagged assertion (e.g. {stable,true} = {stable, ...}) is
%% what fails and shows up in the error.
private_projections_are_stable(Namez, PollFunc) ->
    Private1 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez],
    PollFunc(5, 1, 10),
    Private2 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez],
    Stable = (Private1 == Private2),
    case Stable of
        true ->
            ok;
        false ->
            io:format(user, "Oops: Private1: ~p\n", [Private1]),
            io:format(user, "Oops: Private2: ~p\n", [Private2])
    end,
    Stable.
%% Summarize the latest private projection stored by FLU.
%%
%% Reads the latest private projection through the proxy client, passes it
%% through machi_chain_manager1:inner_projection_or_self/1 and returns
%% {EpochNumber, UPI, Repairing, Down} taken from the resulting record.
%% Crashes with badmatch if the read does not return {ok, _}.
get_latest_inner_proj_summ(FLU) ->
    {ok, Latest} = ?FLU_PC:read_latest_projection(FLU, private),
    Inner = machi_chain_manager1:inner_projection_or_self(Latest),
    #projection_v1{epoch_number=Epoch,
                   upi=UPI,
                   repairing=Repairing,
                   down=Down} = Inner,
    {Epoch, UPI, Repairing, Down}.
%% Return the elements of L in a random order.
%%
%% Reseeds the calling process's `random' generator from now(), tags each
%% element with a random integer in 1..99999, sorts the tagged pairs and
%% strips the tags again.  Elements that draw the same tag end up in term
%% order relative to each other.
random_sort(L) ->
    random:seed(now()),
    Tagged = [{random:uniform(99999), Elem} || Elem <- L],
    {_Tags, Shuffled} = lists:unzip(lists:sort(Tagged)),
    Shuffled.
-endif. % !PULSE
-endif. % TEST

View file

@ -0,0 +1,21 @@
***************
*** 348,356 ****
X /= A, X /= C, A /= C],
%% Concat = _X_Ys1,
%% Concat = _X_Ys2,
- Concat = _X_Ys1 ++ _X_Ys2,
%% Concat = _X_Ys3,
- %% Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
random_sort(lists:usort([lists:sort(L) || L <- Concat])).
%% [ [{a,b},{b,d},{c,b}],
--- 353,361 ----
X /= A, X /= C, A /= C],
%% Concat = _X_Ys1,
%% Concat = _X_Ys2,
+ %% Concat = _X_Ys1 ++ _X_Ys2,
%% Concat = _X_Ys3,
+ Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3,
random_sort(lists:usort([lists:sort(L) || L <- Concat])).
%% [ [{a,b},{b,d},{c,b}],

View file

@ -2,7 +2,7 @@
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
@ -27,6 +27,7 @@
-compile(export_all).
-include("machi_projection.hrl").
-include("machi_verbose.hrl").
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
@ -38,7 +39,9 @@
-compile({pulse_replace_module, [{application, pulse_application}]}).
%% The following functions contains side_effects but are run outside
%% PULSE, i.e. PULSE needs to leave them alone
-compile({pulse_skip,[{prop_pulse_test_,0}, {shutdown_hard,0}]}).
-compile({pulse_skip,[{prop_pulse_test_,0}, {prop_pulse_regression_test_,0},
{prop_pulse,1},
{shutdown_hard,0}]}).
-compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}).
%% Used for output within EUnit...
@ -70,11 +73,58 @@ gen_seed() ->
noshrink({choose(1, 10000), choose(1, 10000), choose(1, 10000)}).
gen_old_threshold() ->
noshrink(choose(1, 100)).
noshrink(frequency([
{10, {keep}},
{10, choose(1, 100)},
{10, oneof([{island1}])},
{10, oneof([{asymm1}, {asymm2}, {asymm3}])}
])).
%% QuickCheck generator for the NoPartitionThreshold argument used by the
%% change_partitions and do_ticks commands: choose(1, 100) wrapped in
%% noshrink/1.
gen_no_partition_threshold() ->
noshrink(choose(1, 100)).
%% Command-sequence generator for prop_pulse/1.
%%
%% gen_commands(new) generates a non-empty random command sequence from
%% this module's state machine callbacks.
%%
%% gen_commands(regression) picks one of four fixed command sequences.
%% Each is a setup call followed by one or more do_ticks calls that all
%% refer to the setup result through {var,1}.  These few, very limited
%% sequences have been helpful in turning up bugs in the past, so here
%% QuickCheck is basically just choosing random seeds + PULSE execution to
%% see whether one of the oldies-but-goodies can find another
%% execution/interleaving that still shows a problem.
gen_commands(new) ->
    non_empty(commands(?MODULE));
gen_commands(regression) ->
    %% {set,{var,1}, setup(NumPids, Seed)}
    Setup = fun(NumPids, Seed) ->
                    {set,{var,1},
                     {call,machi_chain_manager1_pulse,setup,[NumPids,Seed]}}
            end,
    %% {set,{var,VarNum}, do_ticks(Num, {var,1}, Old, NoPartition)}
    Ticks = fun(VarNum, {Num, OldThreshold, NoPartitionThreshold}) ->
                    {set,{var,VarNum},
                     {call,machi_chain_manager1_pulse,do_ticks,
                      [Num,{var,1},OldThreshold,NoPartitionThreshold]}}
            end,
    %% A setup command followed by the do_ticks commands, which are
    %% numbered {var,2}, {var,3}, ... in list order.
    Script = fun(NumPids, Seed, TickArgs) ->
                     VarNums = lists:seq(2, length(TickArgs) + 1),
                     [Setup(NumPids, Seed) |
                      lists:zipwith(Ticks, VarNums, TickArgs)]
             end,
    Cmd_a = Script(3, {846,1222,4424}, [{6,13,48}]),
    Cmd_b = Script(4, {354,7401,1237}, [{10,15,77},
                                        {7,92,39}]),
    Cmd_c = Script(2, {5202,467,3157}, [{8,98,3},
                                        {5,56,49},
                                        {10,33,72},
                                        {10,88,20},
                                        {8,67,10},
                                        {5,86,25},
                                        {6,74,88},
                                        {8,78,39}]),
    Cmd_d = Script(5, {436,5950,9085}, [{7,19,80}]),
    noshrink(oneof([Cmd_a, Cmd_b, Cmd_c, Cmd_d])).
command(#state{step=0}) ->
{call, ?MODULE, setup, [gen_num_pids(), gen_seed()]};
command(S) ->
@ -82,7 +132,7 @@ command(S) ->
{ 1, {call, ?MODULE, change_partitions,
[gen_old_threshold(), gen_no_partition_threshold()]}},
{50, {call, ?MODULE, do_ticks,
[choose(5, 100), S#state.pids,
[choose(1, 100), S#state.pids,
gen_old_threshold(), gen_no_partition_threshold()]}}
]).
@ -103,38 +153,49 @@ postcondition(_S, {call, _, _Func, _Args}, _Res) ->
true.
all_list_extra() ->
{PortBase, DirBase} = get_port_dir_base(),
[ %% Genenerators assume that this list is at least 2 items
{#p_srvr{name=a, address="localhost", port=7400,
props=[{chmgr, a_chmgr}]}, "./data.pulse.a"}
, {#p_srvr{name=b, address="localhost", port=7401,
props=[{chmgr, b_chmgr}]}, "./data.pulse.b"}
, {#p_srvr{name=c, address="localhost", port=7402,
props=[{chmgr, c_chmgr}]}, "./data.pulse.c"}
, {#p_srvr{name=d, address="localhost", port=7403,
props=[{chmgr, d_chmgr}]}, "./data.pulse.d"}
, {#p_srvr{name=e, address="localhost", port=7404,
props=[{chmgr, e_chmgr}]}, "./data.pulse.e"}
{#p_srvr{name=a, address="localhost", port=PortBase+0,
props=[{chmgr, a_chmgr}]}, DirBase ++ "/data.pulse.a"}
, {#p_srvr{name=b, address="localhost", port=PortBase+1,
props=[{chmgr, b_chmgr}]}, DirBase ++ "/data.pulse.b"}
, {#p_srvr{name=c, address="localhost", port=PortBase+2,
props=[{chmgr, c_chmgr}]}, DirBase ++ "//data.pulse.c"}
, {#p_srvr{name=d, address="localhost", port=PortBase+3,
props=[{chmgr, d_chmgr}]}, DirBase ++ "/data.pulse.d"}
, {#p_srvr{name=e, address="localhost", port=PortBase+4,
props=[{chmgr, e_chmgr}]}, DirBase ++ "/data.pulse.e"}
].
%% Names of all servers described by all_list_extra/0, in the same order.
all_list() ->
[P#p_srvr.name || {P, _Dir} <- all_list_extra()].
setup(Num, Seed) ->
?QC_FMT("\nsetup(~w", [Num]),
error_logger:tty(false),
?V("\nsetup(~w,~w", [Num, Seed]),
All_list = lists:sublist(all_list(), Num),
All_listE = lists:sublist(all_list_extra(), Num),
%% shutdown_hard() has taken care of killing all relevant procs.
[machi_flu1_test:clean_up_data_dir(Dir) || {_P, Dir} <- All_listE],
?QC_FMT(",z~w", [?LINE]),
[begin
machi_flu1_test:clean_up_data_dir(Dir),
filelib:ensure_dir(Dir ++ "/not-used")
end || {_P, Dir} <- All_listE],
?V(",z~w", [?LINE]),
%% Start partition simulator
{ok, PSimPid} = machi_partition_simulator:start_link(Seed, 0, 100),
%% GRRR, not PULSE: {ok, _} = application:ensure_all_started(machi),
[begin
_QQ = (catch application:start(App))
end || App <- [machi] ],
?V(",z~w", [?LINE]),
SimSpec = {part_sim, {machi_partition_simulator, start_link,
[{0,0,0}, 0, 100]},
permanent, 500, worker, []},
{ok, PSimPid} = supervisor:start_child(machi_sup, SimSpec),
ok = machi_partition_simulator:set_seed(Seed),
_Partitions = machi_partition_simulator:get(All_list),
?QC_FMT(",z~w", [?LINE]),
?V(",z~w", [?LINE]),
%% Start FLUs and their associated procs
{ok, SupPid} = machi_flu_sup:start_link(),
FluOpts = [{use_partition_simulator, true}, {active_mode, false}],
[begin
#p_srvr{name=Name, port=Port} = P,
@ -142,31 +203,49 @@ setup(Num, Seed) ->
end || {P, Dir} <- All_listE],
%% Set up the chain
Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- All_listE]),
?QC_FMT(",z~w", [?LINE]),
[machi_chain_manager1:set_chain_members(get_chmgr(P), Dict) ||
{P, _Dir} <- All_listE],
?V(",z~w", [?LINE]),
%% Trigger some environment reactions for humming consensus: first
%% do all the same server first, then round-robin evenly across
%% servers.
[begin
_QQa = machi_chain_manager1:trigger_react_to_env(get_chmgr(P))
end || {P, _Dir} <- All_listE, _I <- lists:seq(1,20), _Repeat <- [1,2]],
?QC_FMT(",z~w", [?LINE]),
[begin
_QQa = machi_chain_manager1:trigger_react_to_env(get_chmgr(P))
end || _I <- lists:seq(1,20), {P, _Dir} <- All_listE, _Repeat <- [1,2]],
?QC_FMT(",z~w", [?LINE]),
?V(",z~w", [?LINE]),
ProxiesDict = ?FLU_PC:start_proxies(Dict),
Res = {PSimPid, SupPid, ProxiesDict, All_listE},
Res = {PSimPid, 'machi_flu_sup', ProxiesDict, All_listE},
put(manager_pids_hack, Res),
?QC_FMT("),", []),
?V("),", []),
Res.
change_partitions(OldThreshold, NoPartitionThreshold) ->
change_partitions(OldThreshold, NoPartitionThreshold)
when is_integer(OldThreshold) ->
machi_partition_simulator:reset_thresholds(OldThreshold,
NoPartitionThreshold).
NoPartitionThreshold);
change_partitions({keep}, _NoPartitionThreshold) ->
ok;
change_partitions({island1}, _NoPartitionThreshold) ->
AB = [a,b],
NotAB = all_list() -- AB,
Partitions = lists:usort([{X, Y} || X <- AB, Y <- NotAB] ++
[{X, Y} || X <- NotAB, Y <- AB]),
machi_partition_simulator:always_these_partitions(Partitions);
change_partitions({asymm1}, _NoPartitionThreshold) ->
Partitions = [{a,b}],
machi_partition_simulator:always_these_partitions(Partitions);
change_partitions({asymm2}, _NoPartitionThreshold) ->
Partitions = [{a,b},{a,c},{a,d},{a,e},{b,a},{b,c},{b,e},{c,a},{c,b},{c,d},{c,e},{d,a},{d,c},{d,e},{e,a},{e,b},{e,c},{e,d}],
machi_partition_simulator:always_these_partitions(Partitions);
change_partitions({asymm3}, _NoPartitionThreshold) ->
Partitions = [{a,b},{a,c},{a,d},{a,e},{b,a},{b,d},{b,e},{c,d},{d,a},{d,c},{d,e},{e,a},{e,b},{e,d}],
machi_partition_simulator:always_these_partitions(Partitions).
%% Thin wrapper around machi_partition_simulator:always_last_partitions/0,
%% so that it can be called through this module.
always_last_partitions() ->
machi_partition_simulator:always_last_partitions().
@ -175,29 +254,28 @@ private_stable_check() ->
{_PSimPid, _SupPid, ProxiesDict, All_listE} = get(manager_pids_hack),
Res = private_projections_are_stable_check(ProxiesDict, All_listE),
if not Res ->
io:format(user, "BUMMER: private stable check failed!\n", []);
?QC_FMT("BUMMER: private stable check failed!\n", []);
true ->
ok
end,
Res.
do_ticks(Num, PidsMaybe, OldThreshold, NoPartitionThreshold) ->
io:format(user, "~p,~p,~p|", [Num, OldThreshold, NoPartitionThreshold]),
?V("~p,~p,~p|", [Num, OldThreshold, NoPartitionThreshold]),
{_PSimPid, _SupPid, ProxiesDict, All_listE} =
case PidsMaybe of
undefined -> get(manager_pids_hack);
_ -> PidsMaybe
end,
if is_integer(OldThreshold) ->
machi_partition_simulator:reset_thresholds(OldThreshold,
NoPartitionThreshold);
if is_atom(OldThreshold) ->
?V("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]),
machi_partition_simulator:no_partitions();
true ->
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]),
machi_partition_simulator:no_partitions()
change_partitions(OldThreshold, NoPartitionThreshold)
end,
Res = exec_ticks(Num, All_listE),
if not is_integer(OldThreshold) ->
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]);
?V("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]);
true ->
ok
end,
@ -214,73 +292,34 @@ get_biggest_private_epoch_number(ProxiesDict) ->
dump_state() ->
try
?QC_FMT("dump_state(", []),
?V("dump_state(", []),
{_PSimPid, _SupPid, ProxiesDict, _AlE} = get(manager_pids_hack),
Report = ?MGRTEST:unanimous_report(ProxiesDict),
Namez = ProxiesDict,
%% ?QC_FMT("Report ~p\n", [Report]),
%% Diag1 = [begin
%% {ok, Ps} = ?FLU_PC:get_all_projections(Proxy, Type),
%% [io_lib:format("~p ~p ~p: ~w\n", [FLUName, Type, P#projection_v1.epoch_number, machi_projection:make_summary(P)]) || P <- Ps]
%% end || {FLUName, Proxy} <- orddict:to_list(ProxiesDict),
%% Type <- [public] ],
UniquePrivateEs =
lists:usort(lists:flatten(
[element(2,?FLU_PC:list_all_projections(Proxy,private)) ||
{_FLUName, Proxy} <- orddict:to_list(ProxiesDict)])),
P_lists0 = [{FLUName, Type,
element(2,?FLU_PC:get_all_projections(Proxy, Type))} ||
{FLUName, Proxy} <- orddict:to_list(ProxiesDict),
Type <- [public,private]],
P_lists = [{FLUName, Type, P} || {FLUName, Type, Ps} <- P_lists0,
P <- Ps],
AllDict = lists:foldl(fun({FLU, Type, P}, D) ->
K = {FLU, Type, P#projection_v1.epoch_number},
dict:store(K, P, D)
end, dict:new(), lists:flatten(P_lists)),
DumbFinderBackward =
fun(FLUName) ->
fun(E, error_unwritten) ->
case dict:find({FLUName, private, E}, AllDict) of
{ok, T} -> T;
error -> error_unwritten
end;
(_E, Acc) ->
Acc
end
end,
%% Diag2 = [[
%% io_lib:format("~p private: ~w\n",
%% [FLUName,
%% machi_projection:make_summary(
%% lists:foldl(DumbFinderBackward(FLUName),
%% error_unwritten,
%% lists:seq(Epoch, 0, -1)))])
%% || {FLUName, _FLU} <- Namez]
%% || Epoch <- UniquePrivateEs],
PrivProjs = [{Name, begin
{ok, Ps} = ?FLU_PC:get_all_projections(Proxy,
private),
[P || P <- Ps,
P#projection_v1.epoch_number /= 0]
end} || {Name, Proxy} <- ProxiesDict],
?QC_FMT(")", []),
?V("~w", [catch application:stop(machi)]),
?V(")", []),
Diag1 = Diag2 = "skip_diags",
{Report, PrivProjs, lists:flatten([Diag1, Diag2])}
catch XX:YY ->
?QC_FMT("OUCH: ~p ~p @ ~p\n", [XX, YY, erlang:get_stacktrace()]),
?QC_FMT("Exiting now to move to manual post-mortem....\n", []),
erlang:halt(0),
?V("OUCH: ~p ~p @ ~p\n", [XX, YY, erlang:get_stacktrace()]),
?V("Exiting now to move to manual post-mortem....\n", []),
erlang:halt(66),
false
end.
prop_pulse() ->
?FORALL({Cmds0, Seed}, {non_empty(commands(?MODULE)), pulse:seed()},
?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 10,
prop_pulse(new).
prop_pulse(Style) when Style == new; Style == regression ->
_ = application:start(crypto),
?FORALL({Cmds0, Seed}, {gen_commands(Style), pulse:seed()},
?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 11,
begin
ok = shutdown_hard(),
%% PULSE can be really unfair, of course, including having exec_ticks
@ -294,32 +333,32 @@ prop_pulse() ->
{call, ?MODULE, private_stable_check, []}}],
LastTriggerTicks = {set,{var,99999997},
{call, ?MODULE, do_ticks, [123, undefined, no, no]}},
Cmds1 = lists:duplicate(2, LastTriggerTicks),
%% Cmds1 = lists:duplicate(length(all_list())*2, LastTriggerTicks),
Cmds1 = lists:duplicate(4, LastTriggerTicks),
Cmds = Cmds0 ++
Stabilize1 ++
Cmds1 ++
Stabilize2 ++
[{set,{var,99999999}, {call, ?MODULE, dump_state, []}}],
error_logger:tty(false),
pulse:verbose([format]),
{_H2, S2, Res} = pulse:run(
fun() ->
{_H, _S, _R} = run_commands(?MODULE, Cmds)
end, [{seed, Seed},
{strategy, unfair}]),
%% ?QC_FMT("S2 ~p\n", [S2]),
case S2#state.dump_state of
undefined ->
?QC_FMT("BUMMER Cmds = ~p\n", [Cmds]);
_ ->
ok
end,
ok = shutdown_hard(),
{Report, PrivProjs, Diag} = S2#state.dump_state,
%% Report is ordered by Epoch. For each private projection
%% written during any given epoch, confirm that all chain
%% members appear in only one unique chain, i.e., the sets of
%% unique chains are disjoint.
AllDisjointP = ?MGRTEST:all_reports_are_disjoint(Report),
{AllDisjointP, AllDisjointDetail} =
case ?MGRTEST:all_reports_are_disjoint(Report) of
true -> {true, true};
Else -> {false, Else}
end,
%% For each chain transition experienced by a particular FLU,
%% confirm that each state transition is OK.
@ -328,40 +367,82 @@ prop_pulse() ->
Ps, FLU)} ||
{FLU, Ps} <- PrivProjs],
SaneP = lists:all(fun({_FLU, SaneRes}) -> SaneRes == true end, Sane),
%% The final report item should say that all are agreed_membership.
%% On a really bad day, this could trigger a badmatch exception....
{_LastEpoch, {ok_disjoint, LastRepXs}} = lists:last(Report),
AgreedOrNot = lists:usort([element(1, X) || X <- LastRepXs]),
%% TODO: Check that we've converged to a single chain with no repairs.
SingleChainNoRepair = case LastRepXs of
[{agreed_membership,{_UPI,[]}}] ->
true;
_ ->
LastRepXs
end,
{SingleChainNoRepair_p, SingleChainNoRepairDetail} =
case LastRepXs of
[LastUPI] when length(LastUPI) == S2#state.num_pids ->
{true, true};
_ ->
{false, LastRepXs}
end,
ok = shutdown_hard(),
?WHENFAIL(
begin
%% ?QC_FMT("PrivProjs = ~P\n", [PrivProjs, 50]),
?QC_FMT("Report = ~p\n", [Report]),
?QC_FMT("Cmds = ~p\n", [Cmds]),
?QC_FMT("Res = ~p\n", [Res]),
?QC_FMT("Diag = ~s\n", [Diag]),
?QC_FMT("Report = ~p\n", [Report]),
?QC_FMT("PrivProjs = ~p\n", [PrivProjs]),
?QC_FMT("Sane = ~p\n", [Sane]),
?QC_FMT("SingleChainNoRepair failure =\n ~p\n", [SingleChainNoRepair])
,erlang:halt(0)
?QC_FMT("AllDisjointDetail = ~p\n", [AllDisjointDetail]),
?QC_FMT("SingleChainNoRepair failure = ~p\n", [SingleChainNoRepairDetail])
,?QC_FMT("\n\nHalting now!!!!!!!!!!\n\n", []),timer:sleep(500),erlang:halt(1)
end,
conjunction([{res, Res == true orelse Res == ok},
{all_disjoint, AllDisjointP},
{sane, SaneP},
{all_agreed_at_end, AgreedOrNot == [agreed_membership]},
{single_chain_no_repair, SingleChainNoRepair}
{single_chain_no_repair, SingleChainNoRepair_p}
]))
end)).
prop_pulse_test_() ->
%% Wrap FUN in an EUnit {timeout, Seconds, Fun} fixture whose limit is the
%% QuickCheck testing time plus the extra allowance plus 600 (a bit more
%% fudge time).  The body uses the macro's own parameters; previously it
%% ignored them and picked up whatever variables named Timeout and ExtraTO
%% happened to exist at the call site.
-define(FIXTURE(TIMEOUT, EXTRATO, FUN), {timeout, ((TIMEOUT)+(EXTRATO)+600), FUN}).
%% EUnit generator for the `new' style PULSE property.
%%
%% When the PULSE_SKIP_NEW environment variable is unset, returns a
%% ?FIXTURE that asserts do_quickcheck/3 succeeds for style `new', using
%% the values from get_timeouts/0 and get_do_shrink/0.  When the variable
%% is set to anything, returns a 5-second fixture that only sleeps briefly
%% and prints a skip notice.
prop_pulse_new_test_() ->
    {Timeout, ExtraTO} = get_timeouts(),
    DoShrink = get_do_shrink(),
    RunProperty = fun() ->
                          ?assert(do_quickcheck(DoShrink, Timeout, new))
                  end,
    SkipNotice = fun() ->
                         timer:sleep(200),
                         io:format(user, " (skip new style) ", [])
                 end,
    SkipVar = os:getenv("PULSE_SKIP_NEW"),
    if SkipVar =:= false ->
            ?FIXTURE(Timeout, ExtraTO, RunProperty);
       true ->
            {timeout, 5, SkipNotice}
    end.
%% See gen_commands() for more detail on the regression tests.
%% EUnit generator for the `regression' style PULSE property (the fixed
%% command sequences produced by gen_commands(regression)).
%%
%% When the PULSE_SKIP_REGRESSION environment variable is unset, returns a
%% ?FIXTURE that asserts do_quickcheck/3 succeeds for style `regression'.
%% When the variable is set to anything, returns a 5-second fixture that
%% only sleeps briefly and prints a skip notice.
prop_pulse_regression_test_() ->
    {Timeout, ExtraTO} = get_timeouts(),
    DoShrink = get_do_shrink(),
    RunProperty = fun() ->
                          ?assert(do_quickcheck(DoShrink, Timeout, regression))
                  end,
    SkipNotice = fun() ->
                         timer:sleep(200),
                         io:format(user, " (skip regression style) ", [])
                 end,
    SkipVar = os:getenv("PULSE_SKIP_REGRESSION"),
    if SkipVar =:= false ->
            ?FIXTURE(Timeout, ExtraTO, RunProperty);
       true ->
            {timeout, 5, SkipNotice}
    end.
%% Run prop_pulse(Style) under QuickCheck for Timeout seconds of testing
%% time and return the result of eqc:quickcheck/1.
%%
%% do_quickcheck/2 runs with shrinking enabled.  In do_quickcheck/3 the
%% first argument selects shrinking: `true' runs the property as is,
%% `false' wraps it in noshrink/1.
do_quickcheck(Timeout, Style) ->
    do_quickcheck(true, Timeout, Style).

do_quickcheck(true, Timeout, Style) ->
    Prop = ?QC_OUT(prop_pulse(Style)),
    eqc:quickcheck(eqc:testing_time(Timeout, Prop));
do_quickcheck(false, Timeout, Style) ->
    Prop = ?QC_OUT(noshrink(prop_pulse(Style))),
    eqc:quickcheck(eqc:testing_time(Timeout, Prop)).
get_timeouts() ->
Timeout = case os:getenv("PULSE_TIME") of
false -> 60;
Val -> list_to_integer(Val)
@ -370,36 +451,31 @@ prop_pulse_test_() ->
false -> 0;
Val2 -> list_to_integer(Val2)
end,
{timeout, (Timeout+ExtraTO+600), % 600 = a bit more fudge time
fun() ->
?assert(eqc:quickcheck(eqc:testing_time(Timeout,
?QC_OUT(prop_pulse()))))
end}.
{Timeout, ExtraTO}.
%% Decide whether QuickCheck shrinking should be used.
%% Returns true when the environment variable PULSE_NOSHRINK is unset,
%% and false when it is set to any value.
get_do_shrink() ->
    os:getenv("PULSE_NOSHRINK") =:= false.
shutdown_hard() ->
?QC_FMT("shutdown(", []),
(catch unlink(whereis(machi_partition_simulator))),
[begin
Pid = whereis(X),
spawn(fun() -> (catch X:stop()) end),
timer:sleep(50),
(catch unlink(Pid)),
timer:sleep(10),
(catch exit(Pid, shutdown)),
timer:sleep(1),
(catch exit(Pid, kill))
end || X <- [machi_partition_simulator, machi_flu_sup] ],
timer:sleep(1),
?QC_FMT(")", []),
_STOP = application:stop(App)
end || App <- [machi] ],
timer:sleep(100),
ok.
exec_ticks(Num, All_listE) ->
Parent = self(),
Pids = [spawn_link(fun() ->
[begin
erlang:yield(),
M_name = P#p_srvr.name,
Max = 10,
%% Max = 10,
Max = 25,
Elapsed =
?MGR:sleep_ranked_order(1, Max, M_name, all_list()),
Res = ?MGR:trigger_react_to_env(get_chmgr(P)),
@ -438,4 +514,20 @@ private_projections_are_stable_check(ProxiesDict, All_listE) ->
%% Fetch the value stored under the `chmgr' key in the props list of a
%% #p_srvr{} record; yields `undefined' when the key is absent.
get_chmgr(#p_srvr{props=Props}) ->
    proplists:get_value(chmgr, Props, undefined).
%% {PortBase, DirBase} = get_port_dir_base(),
%% Compute the base TCP port and base directory for a PULSE run.
%%
%% The index I is read from the environment variable PULSE_BASE_PORT
%% (default 0) and the directory prefix from PULSE_BASE_DIR (default
%% "/tmp/c/").  Returns {7400 + I*100, Prefix ++ "/" ++ integer_to_list(I)}.
%% Crashes with badarg if PULSE_BASE_PORT is set to a non-integer string.
get_port_dir_base() ->
    OrDefault = fun(false, Default) -> Default;
                   (Value, _Default) -> Value
                end,
    Index = list_to_integer(OrDefault(os:getenv("PULSE_BASE_PORT"), "0")),
    Prefix = OrDefault(os:getenv("PULSE_BASE_DIR"), "/tmp/c/"),
    PortBase = 7400 + (Index * 100),
    DirBase = Prefix ++ "/" ++ integer_to_list(Index),
    {PortBase, DirBase}.
-endif. % PULSE

View file

@ -45,93 +45,234 @@
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
%% @doc Create a summary report of all of the *private* projections of
%% each of the FLUs in the chain, and create a summary for each
%% epoch number.
%%
%% Report format: list({EpochNumber:non_neg_integer(), Report:rpt()})
%% rpt(): {'ok_disjoint', unique_upi_repair_lists()} |
%% {'bummer_NOT_DISJOINT', {flat(), summaries()}}
%% unique_upi_repair_lists(): list(upi_and_repair_lists_concatenated())
%% flat(): debugging term; any duplicate in this list is an invalid FLU.
%% summaries(): list({FLU, ProjectionSummary:string() | 'not_in_this_epoch'})
%%
%% Example:
%% [{1,{ok_disjoint,[{agreed_membership,{[a],[b,c]}}]}},
%% {3,{ok_disjoint,[{agreed_membership,{[a],[b,c]}}]}},
%% {8,
%% {ok_disjoint,[{not_agreed,{[a],[b,c]},
%% [{b,not_in_this_epoch},
%% <<65,159,66,113,232,15,156,244,197,
%% 210,39,82,229,84,192,19,27,45,161,38>>]}]}},
%% {10,{ok_disjoint,[{agreed_membership,{[c],[]}}]}},
%% ...]
%%
%% [{1,{ok_disjoint,[[a,b,c]]}},
%% {4,{ok_disjoint,[[a,b,c]]}},
%% {6,{ok_disjoint,[[a,b,c]]}},
%% {16,{ok_disjoint,[[a,b,c]]}},
%% {22,{ok_disjoint,[[b]]}},
%% {1174,
%% {bummer_NOT_DISJOINT,{[a,a,b],
%% [{a,"[{epoch,1174},{author,a},{upi,[a]},{repair,[]},{down,[b]},{d,[{ps,[{a,b},{b,a}]},{nodes_up,[a]}]},{d2,[]}]"},
%% {b,"[{epoch,1174},{author,b},{upi,[b]},{repair,[a]},{down,[]},{d,[{ps,[]},{nodes_up,[a,b]}]},{d2,[]}]"}]}}},
%% ...]
unanimous_report(Namez) ->
UniquePrivateEs =
lists:usort(lists:flatten(
[element(2, ?FLU_PC:list_all_projections(FLU, private)) ||
{_FLUName, FLU} <- Namez])),
[unanimous_report(Epoch, Namez) || Epoch <- UniquePrivateEs,
Epoch /= 0].
[{Epoch, unanimous_report(Epoch, Namez)} || Epoch <- UniquePrivateEs,
Epoch /= 0].
unanimous_report(Epoch, Namez) ->
Projs = [{FLUName,
case ?FLU_PC:read_projection(FLU, private, Epoch) of
{ok, T} ->
machi_chain_manager1:inner_projection_or_self(T);
_Else ->
{FLUName, not_in_this_epoch}
end} || {FLUName, FLU} <- Namez],
FLU_Projs = [{FLUName,
case ?FLU_PC:read_projection(FLU, private, Epoch) of
{ok, T} ->
machi_chain_manager1:inner_projection_or_self(T);
_Else ->
not_in_this_epoch
end} || {FLUName, FLU} <- Namez],
unanimous_report2(FLU_Projs).
unanimous_report2(FLU_Projs) ->
ProjsSumms = [{FLU, if is_tuple(P) ->
Summ = machi_projection:make_summary(P),
lists:flatten(io_lib:format("~w", [Summ]));
is_atom(P) ->
P
end} || {FLU, P} <- FLU_Projs],
UPI_R_Sums = [{Proj#projection_v1.upi, Proj#projection_v1.repairing,
Proj#projection_v1.epoch_csum} ||
{_FLUname, Proj} <- Projs,
{_FLUname, Proj} <- FLU_Projs,
is_record(Proj, projection_v1)],
UniqueUPIs = lists:usort([UPI || {UPI, _Repairing, _CSum} <- UPI_R_Sums]),
Res =
[begin
case lists:usort([CSum || {U, _Repairing, CSum} <- UPI_R_Sums,
U == UPI]) of
[_1CSum] ->
%% Yay, there's only 1 checksum. Let's check
%% that all FLUs are in agreement.
{UPI, Repairing, _CSum} =
lists:keyfind(UPI, 1, UPI_R_Sums),
Tmp = [{FLU, case proplists:get_value(FLU, Projs) of
P when is_record(P, projection_v1) ->
P#projection_v1.epoch_csum;
Else ->
Else
end} || FLU <- UPI ++ Repairing],
case lists:usort([CSum || {_FLU, CSum} <- Tmp]) of
[_] ->
{agreed_membership, {UPI, Repairing}};
Else2 ->
{not_agreed, {UPI, Repairing}, Else2}
end;
_Else ->
{not_agreed, {undefined, undefined}, Projs}
end
end || UPI <- UniqueUPIs],
AgreedResUPI_Rs = [UPI++Repairing ||
{agreed_membership, {UPI, Repairing}} <- Res],
Tag = case lists:usort(lists:flatten(AgreedResUPI_Rs)) ==
lists:sort(lists:flatten(AgreedResUPI_Rs)) of
true ->
ok_disjoint;
false ->
bummer_NOT_DISJOINT
end,
{Epoch, {Tag, Res}}.
if length(UniqueUPIs) =< 1 ->
{ok_disjoint, UniqueUPIs};
true ->
Flat = lists:flatten(UniqueUPIs),
case lists:usort(Flat) == lists:sort(Flat) of
true ->
{ok_disjoint, UniqueUPIs};
false ->
{bummer_NOT_DISJOINT, {lists:sort(Flat), ProjsSumms}}
end
end.
all_reports_are_disjoint(Report) ->
[] == [X || {_Epoch, Tuple}=X <- Report,
element(1, Tuple) /= ok_disjoint].
extract_chains_relative_to_flu(FLU, Report) ->
{FLU, [{Epoch, UPI, Repairing} ||
{Epoch, {ok_disjoint, Es}} <- Report,
{agreed_membership, {UPI, Repairing}} <- Es,
lists:member(FLU, UPI) orelse lists:member(FLU, Repairing)]}.
chain_to_projection(MyName, Epoch, UPI_list, Repairing_list, All_list) ->
MemberDict = orddict:from_list([{FLU, #p_srvr{name=FLU}} ||
FLU <- All_list]),
machi_projection:new(Epoch, MyName, MemberDict,
All_list -- (UPI_list ++ Repairing_list),
UPI_list, Repairing_list, [{artificial_by, ?MODULE}]).
case [X || {_Epoch, Tuple}=X <- Report,
element(1, Tuple) /= ok_disjoint] of
[] ->
true;
Else ->
Else
end.
-ifndef(PULSE).
%% EUnit generator: run simple_chain_state_transition_is_sane_test2/0
%% with an EUnit timeout value of 60.
simple_chain_state_transition_is_sane_test_() ->
    Body = fun simple_chain_state_transition_is_sane_test2/0,
    {timeout, 60, Body}.
%% Exhaustively exercise check_simple_chain_state_transition_is_sane/2.
%%
%% All:     a list of all FLUs for a particular test
%% UPI1:    some combination of All that represents UPI1
%% Repair1: some combination of (All -- UPI1) that represents Repairing1
%%
%% Every possible {UPI1, Repair1} pair is checked.  Timing notes from the
%% original author's MacBook Pro: the four member lists used here run in
%% about 0.15 seconds; adding a fifth list, [a,b,c,d,e], takes about
%% 4.8 seconds.
simple_chain_state_transition_is_sane_test2() ->
    Alls = [ [a], [a,b], [a,b,c], [a,b,c,d] ],
    Cases = [{UPI1, Repair1} ||
                All <- Alls,
                UPI1 <- machi_util:combinations(All),
                Repair1 <- machi_util:combinations(All -- UPI1)],
    [true = check_simple_chain_state_transition_is_sane(UPI1, Repair1) ||
        {UPI1, Repair1} <- Cases].
%% Given a UPI1 and Repair1 list, calculate every good UPI2 list: an
%% ordered combination of UPI1 followed by an ordered combination of
%% Repair1.  ?MGR:simple_chain_state_transition_is_sane/3 must return
%% true for each good {UPI1, Repair1} -> UPI2 transition and false for
%% every other candidate UPI2.
%%
%% The candidate set is built from the members of UPI1 and Repair1 plus
%% one extra participant, 'bogus', to demonstrate that adding an unknown
%% element is never sane.
%%
%% Returns true; crashes with badmatch on the first disagreement.
check_simple_chain_state_transition_is_sane([], []) ->
    true;
check_simple_chain_state_transition_is_sane(UPI1, Repair1) ->
    IsSane = fun(UPI2) ->
                     ?MGR:simple_chain_state_transition_is_sane(
                        UPI1, Repair1, UPI2)
             end,
    Good_UPI2s = [ X ++ Y || X <- machi_util:ordered_combinations(UPI1),
                             Y <- machi_util:ordered_combinations(Repair1)],
    Universe = lists:usort(UPI1 ++ Repair1) ++ [bogus],
    Bad_UPI2s = machi_util:combinations(Universe) -- Good_UPI2s,
    lists:foreach(fun(UPI2) -> true = IsSane(UPI2) end, Good_UPI2s),
    lists:foreach(fun(UPI2) -> false = IsSane(UPI2) end, Bad_UPI2s),
    true.
-ifdef(EQC).
%% This QuickCheck property is crippled: because the old chain state
%% transition check, chain_mgr_legacy:projection_transition_is_sane(),
%% is so buggy and the new check is (apparently) so much better, I
%% have changed the ?WHENFAIL() criteria to check for either agreement
%% _or_ a case where the legacy check says true but the new check says
%% false.
%%
%% On my MacBook Pro, less than 1000 tests are required to find at
%% least one problem case for the legacy check that the new check is
%% correct for. Running for two seconds can do about 3,500 test
%% cases.
%% Warm-up test: QuickCheck can take a long time to start up (license
%% check and so on), so run the trivial property `true' once under a
%% generous EUnit timeout before the real comparison tests.  Example
%% timing recorded by the original author:
%%   machi_chain_manager1_test: compare_eqc_setup_test...[1.788 s] ok
compare_eqc_setup_test_() ->
    WarmUp = fun() ->
                     TrivialProp = eqc:testing_time(0.1, true),
                     eqc:quickcheck(TrivialProp)
             end,
    {timeout, 30, WarmUp}.
-define(COMPARE_TIMEOUT, 1.2).
%% -define(COMPARE_TIMEOUT, 4.8).
%% EUnit test: run the legacy-vs-v2 chain transition comparison property
%% (style `primitive') for ?COMPARE_TIMEOUT of QuickCheck testing time.
%%
%% eqc:quickcheck/1 reports failure by returning false rather than by
%% raising, so the result is asserted here; without the assertion EUnit
%% would report this test as passing even when a counterexample is found.
compare_legacy_with_v2_chain_transition_check1_test() ->
    ?assert(
       eqc:quickcheck(
         ?QC_OUT(
            eqc:testing_time(
              ?COMPARE_TIMEOUT,
              prop_compare_legacy_with_v2_chain_transition_check(primitive))))).
%% EUnit test: run the legacy-vs-v2 chain transition comparison property
%% for ?COMPARE_TIMEOUT of QuickCheck testing time.
%%
%% eqc:quickcheck/1 reports failure by returning false rather than by
%% raising, so the result is asserted here; without the assertion EUnit
%% would report this test as passing even when a counterexample is found.
%%
%% NOTE(review): this passes `primitive', exactly like the check1 test,
%% although the property also accepts `whole'.  Confirm which style was
%% intended before changing it.
compare_legacy_with_v2_chain_transition_check2_test() ->
    ?assert(
       eqc:quickcheck(
         ?QC_OUT(
            eqc:testing_time(
              ?COMPARE_TIMEOUT,
              prop_compare_legacy_with_v2_chain_transition_check(primitive))))).
%% Convenience wrapper: the comparison property in `primitive' style.
prop_compare_legacy_with_v2_chain_transition_check() ->
    prop_compare_legacy_with_v2_chain_transition_check(primitive).

%% QuickCheck property comparing the legacy transition sanity check with
%% the newer one.
%%
%% Generates a non-empty member list All drawn from [a,b,c,d], then two
%% chain states (author, UPI, repairing) over All, and builds projections
%% P1 (epoch 1) and P2 (epoch 2) from them.  Cases where either state has
%% an empty UPI ++ repairing list are discarded.
%%
%% Style selects the new check:
%%   primitive -> ?MGR:chain_state_transition_is_sane/5
%%   whole     -> machi_chain_manager1:projection_transition_is_sane/4
%%
%% The property holds when both checks agree, or when the legacy check
%% says true while the new check says false.  Each generated case is also
%% recorded in the ETS table `count' if that table exists (see
%% make_prop_ets/0); insertion errors are ignored.
prop_compare_legacy_with_v2_chain_transition_check(Style) ->
    ?FORALL(All, non_empty(some([a,b,c,d])),
    ?FORALL({Author1, UPI1, Repair1x, Author2, UPI2, Repair2x},
            {elements(All),some(All),some(All),elements(All),some(All),some(All)},
    ?IMPLIES((UPI1 ++ Repair1x) =/= [] andalso
             (UPI2 ++ Repair2x) =/= [],
    begin
        %% A FLU listed in the UPI cannot also be repairing.
        Repair1 = Repair1x -- UPI1,
        Repair2 = Repair2x -- UPI2,
        Down1 = All -- (UPI1 ++ Repair1),
        Down2 = All -- (UPI2 ++ Repair2),
        MembersDict = orddict:from_list([{X, #p_srvr{name=X}} || X <- All]),
        P1 = machi_projection:new(1, Author1, MembersDict,
                                  Down1, UPI1, Repair1, []),
        P2 = machi_projection:new(2, Author2, MembersDict,
                                  Down2, UPI2, Repair2, []),
        Old_res = chain_mgr_legacy:projection_transition_is_sane(
                    P1, P2, Author1, false),
        Old_p = (Old_res =:= true),
        New_res =
            case Style of
                primitive ->
                    ?MGR:chain_state_transition_is_sane(
                       Author1, UPI1, Repair1, Author2, UPI2);
                whole ->
                    machi_chain_manager1:projection_transition_is_sane(
                      P1, P2, Author1, false)
            end,
        New_p = (New_res =:= true),
        CaseKey = {Author1, UPI1, Repair1, Author2, UPI2, Repair2},
        (catch ets:insert(count, {CaseKey, true})),
        ?WHENFAIL(io:format(user,
                            "Old_res: ~p/~p (~p)\nNew_res: ~p/~p (why line ~P)\n",
                            [Old_p, Old_res, catch get(why1),
                             New_p, New_res, catch get(why2), 30]),
                  Old_p == New_p orelse (Old_p == true andalso New_p == false))
    end))).
%% QuickCheck generator: a list of elements picked from the generators
%% or values in L via oneof/1, with repeated picks removed by dedupe/1.
some(L) ->
    ?LET(Picks, list(oneof(L)), dedupe(Picks)).
%% Remove duplicate elements from L, keeping the first occurrence of
%% each element and preserving the original order.
dedupe(L) ->
    dedupe(L, []).

%% Same as dedupe/1, but elements already present in Seen are dropped
%% as well.  Membership is tested with lists:member/2.
dedupe(L, Seen) ->
    Step = fun(X, {Kept, SeenSoFar} = Acc) ->
                   case lists:member(X, SeenSoFar) of
                       true  -> Acc;
                       false -> {[X|Kept], [X|SeenSoFar]}
                   end
           end,
    {KeptRev, _AllSeen} = lists:foldl(Step, {[], Seen}, L),
    lists:reverse(KeptRev).
%% Create the public, named ETS set table `count' that
%% prop_compare_legacy_with_v2_chain_transition_check/1 inserts its
%% generated cases into.  Crashes with badarg if the table already exists.
make_prop_ets() ->
    Options = [named_table, set, public],
    ets:new(count, Options).
-endif. % EQC
smoke0_test() ->
{ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50),
Host = "localhost",
@ -185,16 +326,6 @@ smoke1_test() ->
end.
nonunanimous_setup_and_fix_test() ->
%% TODO attack list:
%% __ Add start option to chain manager to be "passive" only, i.e.,
%% not immediately go to work on
%% 1. Start FLUs with full complement of FLU+proj+chmgr.
%% 2. Put each of them under a supervisor?
%% - Sup proc could be a created-specifically-for-test thing, perhaps?
%% Rather than relying on a supervisor with reg name + OTP app started
%% plus plus more more yaddayadda?
%% 3. Add projection catalog/orddict of #p_srvr records??
%% 4. Fix this test, etc etc.
machi_partition_simulator:start_link({1,2,3}, 100, 0),
TcpPort = 62877,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}],
@ -224,13 +355,16 @@ nonunanimous_setup_and_fix_test() ->
ok = ?FLU_PC:write_projection(Proxy_b, public, P1b),
%% ?D(x),
{not_unanimous,_,_}=_XX = ?MGR:test_read_latest_public_projection(Ma, false),
{not_unanimous,_,_}=_XX = ?MGR:test_read_latest_public_projection(
Ma, false),
%% ?Dw(_XX),
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(Ma, true),
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(
Ma, true),
%% The read repair here doesn't automatically trigger the creation of
%% a new projection (to try to create a unanimous projection). So
%% we expect nothing to change when called again.
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(Ma, true),
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(
Ma, true),
{now_using, _, EpochNum_a} = ?MGR:trigger_react_to_env(Ma),
{no_change, _, EpochNum_a} = ?MGR:trigger_react_to_env(Ma),
@ -258,5 +392,54 @@ timer:sleep(3000),
ok = machi_partition_simulator:stop()
end.
%% Unit test for unanimous_report2/1: feeds it hand-built lists of
%% {FLUName, Projection | not_in_this_epoch} pairs, all for epoch 5, and
%% checks the ok_disjoint / bummer_NOT_DISJOINT verdict for each.
unanimous_report_test() ->
TcpPort = 63877,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}],
P_s = [#p_srvr{name=Name, address="localhost", port=Port} ||
{Name,Port,_Dir} <- FluInfo],
MembersDict = machi_projection:make_members_dict(P_s),

%% Baseline projection P5: author a, UPI = [a,b], nothing repairing.
E5 = 5,
UPI5 = [a,b],
Rep5 = [],
Report5 = [UPI5],
P5 = machi_projection:new(E5, a, MembersDict, [], UPI5, Rep5, []),
%% Both FLUs hold the same projection: a single UPI list is reported.
{ok_disjoint, Report5} =
unanimous_report2([{a, P5}, {b, P5}]),
%% A FLU with no projection in this epoch does not change the verdict.
{ok_disjoint, Report5} =
unanimous_report2([{a, not_in_this_epoch}, {b, P5}]),
{ok_disjoint, Report5} =
unanimous_report2([{a, P5}, {b, not_in_this_epoch}]),

%% b holds UPI = [a] while a holds UPI = [a,b]: expected NOT disjoint
%% (a is a member of both UPI lists).
UPI5_b = [a],
Rep5_b = [],
P5_b = machi_projection:new(E5, b, MembersDict, [b], UPI5_b, Rep5_b, []),
{bummer_NOT_DISJOINT, _} = unanimous_report2([{a, P5}, {b, P5_b}]),

%% b holds UPI = [b], repairing = [a]: expected NOT disjoint
%% (b is a member of both UPI lists).
UPI5_c = [b],
Rep5_c = [a],
P5_c = machi_projection:new(E5, b, MembersDict, [], UPI5_c, Rep5_c, []),
{bummer_NOT_DISJOINT, _} =
unanimous_report2([{a, P5}, {b, P5_c}]),

%% Three-member dictionary (adds FLU c) for the remaining cases.
P_s3 = [#p_srvr{name=Name, address="localhost", port=Port} ||
{Name,Port,_Dir} <- FluInfo ++ [{c,TcpPort+0,"./data.c"}]],
MembersDict3 = machi_projection:make_members_dict(P_s3),

%% UPI = [c], repairing = [a] against UPI = [a,b]: expected disjoint,
%% even though a appears in one UPI list and the other repairing list.
UPI5_d = [c],
Rep5_d = [a],
Report5d = [UPI5, UPI5_d],
P5_d = machi_projection:new(E5, b, MembersDict3, [b], UPI5_d, Rep5_d, []),
{ok_disjoint, Report5d} = unanimous_report2([{a, P5}, {b, P5_d}]),

%% UPI = [b], repairing = [c]: NOT disjoint against UPI = [a,b],
%% but disjoint against UPI = [a].
UPI5_e = [b],
Rep5_e = [c],
Report5be = [UPI5_b, UPI5_e],
P5_e = machi_projection:new(E5, b, MembersDict3, [a], UPI5_e, Rep5_e, []),
{bummer_NOT_DISJOINT, _} = unanimous_report2([{a, P5}, {b, P5_e}]),
{ok_disjoint, Report5be} = unanimous_report2([{a, P5_b}, {b, P5_e}]),
ok.
-endif. % !PULSE
-endif. % TEST

View file

@ -66,7 +66,7 @@ smoke_test2() ->
end || Pid <- [a_chmgr,b_chmgr,c_chmgr] ]
end,
_ = lists:foldl(
fun(_, [{c,[a,b,c]}]=Acc) -> Acc;
fun(_, [{a,[a,b,c]}]=Acc) -> Acc;
(_, _Acc) ->
TickAll(), % has some sleep time inside
Xs = [begin

View file

@ -33,7 +33,7 @@
-endif.
-export([start_link/3, stop/0,
get/1, reset_thresholds/2,
get/1, reset_thresholds/2, set_seed/1,
no_partitions/0, always_last_partitions/0, always_these_partitions/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@ -67,6 +67,9 @@ get(Nodes) ->
%% Synchronously ask the server registered as ?MODULE to replace both of
%% its thresholds with the given values.  Waits indefinitely for the reply.
reset_thresholds(OldThreshold, NoPartitionThreshold) ->
    Request = {reset_thresholds, OldThreshold, NoPartitionThreshold},
    gen_server:call(?MODULE, Request, infinity).
%% Synchronously ask the server registered as ?MODULE to store Seed as
%% its seed.  Waits indefinitely for the reply.
set_seed(Seed) ->
    Request = {set_seed, Seed},
    gen_server:call(?MODULE, Request, infinity).
%% Reset the server's thresholds to the fixed pair (-999, 999).
%% NOTE(review): presumably these extreme values stop the simulator from
%% generating any partitions, as the function name suggests; confirm
%% against the threshold handling in the server.
no_partitions() ->
    OldThreshold = -999,
    NoPartitionThreshold = 999,
    reset_thresholds(OldThreshold, NoPartitionThreshold).
@ -98,6 +101,8 @@ handle_call({get, Nodes}, _From, S) ->
handle_call({reset_thresholds, OldThreshold, NoPartitionThreshold}, _From, S) ->
{reply, ok, S#state{old_threshold=OldThreshold,
no_partition_threshold=NoPartitionThreshold}};
handle_call({set_seed, Seed}, _From, S) ->
{reply, ok, S#state{seed=Seed}};
handle_call({always_these_partitions, Parts}, _From, S) ->
{reply, ok, S#state{old_partitions={Parts,[na_reset_by_always]}}};
handle_call({stop}, _From, S) ->

View file

@ -78,9 +78,12 @@ compare_test() ->
try_it(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) ->
try
P = machi_projection:new(MyName, All_list, UPI_list, Down_list,
P = machi_projection:new(MyName, All_list, Down_list, UPI_list,
Repairing_list, Ps),
is_record(P, projection_v1)
Down_list = P#projection_v1.down,
UPI_list = P#projection_v1.upi,
Repairing_list = P#projection_v1.repairing,
true
catch _:_ ->
false
end.