Merge branch 'slf/chmgr-model-overhaul'

This commit is contained in:
Scott Lystig Fritchie 2015-06-15 17:22:19 +09:00
commit 2e94ccc84e
18 changed files with 573 additions and 421 deletions

View file

@ -29,7 +29,7 @@ edoc-clean:
pulse: compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit -v
APPS = kernel stdlib sasl erts ssl compiler eunit crypto
PLT = $(HOME)/.machi_dialyzer_plt
@ -41,6 +41,8 @@ dialyzer: deps compile
dialyzer -Wno_return --plt $(PLT) ebin
dialyzer-test: deps compile
echo Force rebar to recompile .eunit dir w/o running tests > /dev/null
rebar skip_deps=true eunit suite=lamport_clock
dialyzer -Wno_return --plt $(PLT) .eunit
clean_plt:

View file

@ -41,7 +41,7 @@ func, and pattern match Erlang style in that func.
** DONE Add major comment sections to the CR-impl client
** DONE Simple basho_bench driver, put some unscientific chalk on the benchtop
** TODO Create parallel PULSE test for basic API plus chain manager repair
** TODO Add client-side vs. server-side checksum type, expand client API?
** DONE Add client-side vs. server-side checksum type, expand client API?
** TODO Add gproc and get rid of registered name rendezvous
*** TODO Fixes the atom table leak
*** TODO Fixes the problem of having active sequencer for the same prefix

View file

@ -36,6 +36,13 @@
props = [] :: list() % proplist for other related info
}).
%% Flapping information carried in the `flap' field of #projection_v1{}.
%% Instances are constructed by make_flapping_i/5 in the chain manager.
-record(flap_i, {
%% {FlapStart, FlapCount} pair; the default built by make_flapping_i/0
%% is {{{epk,-1},?NOT_FLAPPING}, 0}.
flap_count :: {term(), term()},
%% The AllHosed list handed to make_flapping_i/5.
%% NOTE(review): presumably the servers considered "hosed" -- confirm.
all_hosed :: list(),
%% Flap counts, stored sorted (make_flapping_i/5 applies lists:sort/1).
all_flap_counts :: list(),
%% The BadFLUs list handed to make_flapping_i/5.
%% NOTE(review): presumably FLUs that gave a bad/no answer -- confirm.
bad :: list()
}).
-type p_srvr() :: #p_srvr{}.
-type p_srvr_dict() :: orddict:orddict().
@ -51,6 +58,8 @@
upi :: [pv1_server()],
repairing :: [pv1_server()],
down :: [pv1_server()],
flap :: 'undefined' | #flap_i{}, % flapping information
inner :: 'undefined' | #projection_v1{},
dbg :: list(), %proplist(), is checksummed
dbg2 :: list(), %proplist(), is not checksummed
members_dict :: p_srvr_dict()

View file

@ -102,7 +102,7 @@ run(append, KeyGen, ValueGen, #m{conn=Conn}=S) ->
{ok, S};
{error, _}=Err ->
?ERROR("append ~w bytes to prefix ~w: ~p\n",
[iolist_size(ValueGen), Prefix, Err]),
[iolist_size(Value), Prefix, Err]),
{error, Err, S}
end;
run(read, KeyGen, ValueGen, #m{max_key=undefined}=S) ->

View file

@ -203,7 +203,6 @@ init({MyName, InitMembersDict, MgrOpts}) ->
{use_partition_simulator, Opt(use_partition_simulator, false)},
{network_partitions, Opt(network_partitions, [])},
{network_islands, Opt(network_islands, [])},
{flapping_i, Opt(flapping, [])},
{up_nodes, Opt(up_nodes, not_init_yet)}],
ActiveP = Opt(active_mode, true),
S = #ch_mgr{name=MyName,
@ -252,8 +251,7 @@ handle_call({set_chain_members, MembersDict}, _From,
down=NewDown,
members_dict=MembersDict}),
%% Reset all flapping state.
NewProj2 = NewProj#projection_v1{dbg=replace(NewProj#projection_v1.dbg,
[make_flapping_i()])},
NewProj2 = NewProj#projection_v1{flap=make_flapping_i()},
S3 = S2#ch_mgr{proj=NewProj2,
proj_history=queue:new()},
{_QQ, S4} = do_react_to_env(S3),
@ -594,7 +592,7 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
%% TODO create a real API call for fetching this info?
SameEpoch_p = check_latest_private_projections_same_epoch(
tl(NewUPI_list) ++ Repairing_list2,
NewUPI_list ++ Repairing_list2,
S#ch_mgr.proj, Partitions, S),
if Simulator_p andalso SameEpoch_p ->
D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
@ -636,6 +634,10 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
{P, S#ch_mgr{runenv=RunEnv3}}.
check_latest_private_projections_same_epoch(FLUs, MyProj, Partitions, S) ->
%% NOTE: The caller must provide us with the FLUs list for all
%% FLUs that must be up & available right now. So any
%% failure of perhaps_call_t() means that we must return
%% false.
FoldFun = fun(_FLU, false) ->
false;
(FLU, true) ->
@ -896,17 +898,15 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
down=P_i#projection_v1.all_members
-- [MyName]}
end,
FinalInnerEpoch =
case inner_projection_exists(P_current) of
false ->
FinalCreation = P_newprop3#projection_v1.creation_time,
AllFlapCounts_epk =
[Epk || {{Epk,_FlTime}, _FlCount} <-
get_all_flap_counts(P_newprop3)],
case AllFlapCounts_epk of
[] ->
%% HRM, distrust?...
%% P_newprop3#projection_v1.epoch_number;
P_newprop3#projection_v1.epoch_number;
[_|_] ->
lists:max(AllFlapCounts_epk)
@ -921,8 +921,10 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
andalso
P_oldinner#projection_v1.down ==
P_inner#projection_v1.down ->
FinalCreation = P_oldinner#projection_v1.creation_time,
P_oldinner#projection_v1.epoch_number;
true ->
FinalCreation = P_newprop3#projection_v1.creation_time,
P_oldinner#projection_v1.epoch_number + 1
end
end,
@ -932,12 +934,15 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
%% up nodes > 1, repair is required there! In the
%% current simulator, repair is not simulated and
%% finished (and then growing the UPI list). Fix.
P_inner2 = P_inner#projection_v1{epoch_number=FinalInnerEpoch},
InnerInfo = [{inner_summary, machi_projection:make_summary(P_inner2)},
{inner_projection, P_inner2}],
P_inner2 = machi_projection:update_checksum(
P_inner#projection_v1{epoch_number=FinalInnerEpoch,
creation_time=FinalCreation}),
InnerInfo = [{inner_summary,
machi_projection:make_summary(P_inner2)}],
DbgX = replace(P_newprop3#projection_v1.dbg, InnerInfo),
?REACT({a30, ?LINE, [qqqwww|DbgX]}),
{P_newprop3#projection_v1{dbg=DbgX}, S_i};
{P_newprop3#projection_v1{dbg=DbgX,
inner=P_inner2}, S_i};
_ ->
{P_newprop3, S3}
end,
@ -1222,7 +1227,12 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
{newprop_flap_count, P_newprop_flap_count},
{flap_limit, FlapLimit}]}),
_B10Hack = get(b10_hack),
io:format(user, "{FLAP: ~w flaps ~w}!\n", [S#ch_mgr.name, P_newprop_flap_count]),
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true ->
io:format(user, "{FLAP: ~w flaps ~w}! ", [S#ch_mgr.name, P_newprop_flap_count]);
_ ->
ok
end,
if
%% MEANWHILE, we have learned some things about this
@ -1309,31 +1319,21 @@ react_to_env_C100(P_newprop, P_latest,
I_am_UPI_in_newprop_p = lists:member(MyName, P_newprop#projection_v1.upi),
I_am_Repairing_in_latest_p = lists:member(MyName,
P_latest#projection_v1.repairing),
ShortCircuit_p =
P_latest#projection_v1.epoch_number > P_current#projection_v1.epoch_number
andalso
I_am_UPI_in_newprop_p
andalso
I_am_Repairing_in_latest_p,
Current_sane_p = projection_transition_is_sane(P_current, P_latest,
MyName),
case {ShortCircuit_p, Current_sane_p} of
put(xxx_hack, [{p_current, machi_projection:make_summary(P_current)},
{epoch_compare, P_latest#projection_v1.epoch_number > P_current#projection_v1.epoch_number},
{i_am_upi_in_newprop_p, I_am_UPI_in_newprop_p},
{i_am_repairing_in_latest_p, I_am_Repairing_in_latest_p}]),
case Current_sane_p of
_ when P_current#projection_v1.epoch_number == 0 ->
%% Epoch == 0 is reserved for first-time, just booting conditions.
?REACT({c100, ?LINE, [first_write]}),
react_to_env_C110(P_latest, S);
{true, _} ->
%% Someone else believes that I am repairing. We assume
%% that nobody is being Byzantine, so we'll believe that I
%% am/should be repairing. We ignore our proposal and try
%% to go with the latest.
?REACT({c100, ?LINE, [repairing_short_circuit]}),
react_to_env_C110(P_latest, S);
{_, true} ->
true ->
?REACT({c100, ?LINE, [sane]}),
react_to_env_C110(P_latest, S);
{_, _AnyOtherReturnValue} ->
_AnyOtherReturnValue ->
%% P_latest is not sane.
%% By process of elimination, P_newprop is best,
%% so let's write it.
@ -1343,7 +1343,9 @@ react_to_env_C100(P_newprop, P_latest,
react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
?REACT(c110),
Extra_todo = [],
%% Extra_todo = [],
Extra_todo = get(xxx_hack),
%% Extra_todo = [{hee, lists:reverse(get(react))}],
P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo),
MyNamePid = proxy_pid(MyName, S),
@ -1362,14 +1364,24 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
{HH,MM,SS} = time(),
case inner_projection_exists(P_latest2) of
false ->
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true ->
io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses plain: ~w\n",
[HH,MM,SS,MSec, S#ch_mgr.name,
machi_projection:make_summary(P_latest2)]);
_ ->
ok
end;
true ->
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true ->
P_inner = inner_projection_or_self(P_latest2),
io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses inner: ~w\n",
[HH,MM,SS,MSec, S#ch_mgr.name,
machi_projection:make_summary(P_inner)])
machi_projection:make_summary(P_inner)]);
_ ->
ok
end
end;
_ ->
ok
@ -1438,12 +1450,12 @@ react_to_env_C310(P_newprop, S) ->
?REACT({c310, ?LINE,
[{newprop, machi_projection:make_summary(P_newprop)},
{write_result, WriteRes}]}),
%% io:format(user, "HEE310 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]),
react_to_env_A10(S2).
calculate_flaps(P_newprop, _P_current, _FlapLimit,
#ch_mgr{name=MyName, proj_history=H, flap_start=FlapStart,
flaps=Flaps, runenv=RunEnv0} = S) ->
RunEnv1 = replace(RunEnv0, [{flapping_i, []}]),
flaps=Flaps, runenv=RunEnv1} = S) ->
HistoryPs = queue:to_list(H),
Ps = HistoryPs ++ [P_newprop],
UniqueProposalSummaries = lists:usort([{P#projection_v1.upi,
@ -1460,8 +1472,6 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
HosedTransUnion = proplists:get_value(trans_all_hosed, Props),
TransFlapCounts0 = proplists:get_value(trans_all_flap_counts, Props),
_Unanimous = proplists:get_value(unanimous_flus, Props),
_NotUnanimous = proplists:get_value(not_unanimous_flus, Props),
%% NOTE: bad_answer_flus are probably due to timeout or some other network
%% glitch, i.e., anything other than {ok, P::projection()}
%% response from machi_flu0:proj_read_latest().
@ -1542,11 +1552,6 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
FlappingI = make_flapping_i(NewFlapStart, NewFlaps, AllHosed,
AllFlapCounts, BadFLUs),
Dbg2 = [FlappingI|P_newprop#projection_v1.dbg],
%% TODO: 2015-03-04: I'm growing increasingly suspicious of
%% the 'runenv' variable that's threaded through all this code.
%% It isn't doing what I'd originally intended. Fix it.
RunEnv2 = replace(RunEnv1, [FlappingI]),
%% NOTE: Just because we increment flaps here, there's no correlation
%% to successful public proj store writes! For example,
%% if we loop through states C2xx a few times, we would incr
@ -1558,17 +1563,21 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
%% a large local flaps count gives no concrete guarantee that any
%% communication has been successful with any other part of the
%% cluster.
{machi_projection:update_checksum(P_newprop#projection_v1{dbg=Dbg2}),
S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv2}}.
%% TODO: 2015-03-04: I'm growing increasingly suspicious of
%% the 'runenv' variable that's threaded through all this code.
%% It isn't doing what I'd originally intended. Fix it.
{machi_projection:update_checksum(P_newprop#projection_v1{
flap=FlappingI}),
S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv1}}.
%% Build the default flapping-info term by calling make_flapping_i/5 with
%% a flap start of {{epk,-1},?NOT_FLAPPING}, a flap count of 0, and empty
%% lists for the three remaining arguments.
make_flapping_i() ->
make_flapping_i({{epk,-1},?NOT_FLAPPING}, 0, [], [], []).
make_flapping_i(NewFlapStart, NewFlaps, AllHosed, AllFlapCounts, BadFLUs) ->
{flapping_i, [{flap_count, {NewFlapStart, NewFlaps}},
{all_hosed, AllHosed},
{all_flap_counts, lists:sort(AllFlapCounts)},
{bad,BadFLUs}]}.
#flap_i{flap_count={NewFlapStart, NewFlaps},
all_hosed=AllHosed,
all_flap_counts=lists:sort(AllFlapCounts),
bad=BadFLUs}.
%% Convenience wrapper: delegates to projection_transitions_are_sane/3
%% with `false' as the third argument.
%% NOTE(review): the third argument is presumably the "retrospective"
%% flag -- confirm against the /3 definition.
projection_transitions_are_sane(Ps, RelativeToServer) ->
projection_transitions_are_sane(Ps, RelativeToServer, false).
@ -1665,6 +1674,7 @@ projection_transition_is_sane(
%% Additions to the UPI chain may only be at the tail
UPI_common_prefix = find_common_prefix(UPI_list1, UPI_list2),
true =
if UPI_common_prefix == [] ->
if UPI_list1 == [] orelse UPI_list2 == [] ->
%% If the common prefix is empty, then one of the
@ -1691,7 +1701,7 @@ projection_transition_is_sane(
%% [] [b]
%%
%% ... where RelativeToServer=b. In this case, b
%% has been partitions for a while and has only
%% has been partitioned for a while and has only
%% now just learned of several epoch transitions.
%% If the author of both is also in the UPI of
%% both, then those authors would not have allowed
@ -1716,15 +1726,47 @@ projection_transition_is_sane(
not (lists:member(RelativeToServer, Down_list2) orelse
lists:member(RelativeToServer, Repairing_list2)),
UPIs_are_disjointP = ordsets:is_disjoint(ordsets:from_list(UPI_list1),
ordsets:from_list(UPI_list2)),
case UPI_2_suffix -- UPI_list1 of
[] ->
true;
[_|_] = _Added_by_2 ->
if RetrospectiveP ->
%% Any servers added to the UPI must be added from the
%% repairing list ... but in retrospective mode (where
%% we're checking only the transitions where all
%% UPI+repairing participants have unanimous private
%% projections!), and if we're under asymmetric
%% partition/churn, then we may not see the repairing
%% list. So we will not check that condition here.
true;
not RetrospectiveP ->
%% We're not retrospective. So, if some server was
%% added to the UPI, then that means that it was
%% added by repair. And repair is coordinated by the
%% UPI tail/last.
%io:format(user, "g: UPI_list1=~w, UPI_list2=~w, UPI_2_suffix=~w, ",
% [UPI_list1, UPI_list2, UPI_2_suffix]),
%io:format(user, "g", []),
true = UPI_list1 == [] orelse
UPIs_are_disjointP orelse
(lists:last(UPI_list1) == AuthorServer2)
end
end,
if not MoreCheckingP ->
ok;
MoreCheckingP ->
%% Where did elements in UPI_2_suffix come from?
%% Only two sources are permitted.
Oops_check_UPI_2_suffix =
[lists:member(X, Repairing_list1) % X added after repair done
orelse
lists:member(X, UPI_list1) % X in UPI_list1 after common pref
|| X <- UPI_2_suffix],
%% Grrrrr, ok, so this check isn't good, at least at bootstrap time.
%% TODO: false = lists:member(false, Oops_check_UPI_2_suffix),
%% The UPI_2_suffix must exactly be equal to: ordered items from
%% UPI_list1 concat'ed with ordered items from Repairing_list1.
@ -1780,30 +1822,77 @@ projection_transition_is_sane(
%% The retrospective view by
%% machi_chain_manager1_pulse.erl just can't
%% reason correctly about this situation. We
%% will instead rely on the non-introspective
%% will instead rely on the non-retrospective
%% sanity checking that each FLU does before it
%% writes to its private projection store and
%% then adopts that projection (and unwedges
%% itself, etc etc).
exit({todo, revisit, ?MODULE, ?LINE}),
if UPIs_are_disjointP ->
true;
true ->
exit({todo, revisit, ?MODULE, ?LINE,
[
{oops_check_UPI_2_suffix, Oops_check_UPI_2_suffix},
{upi_2_suffix, UPI_2_suffix},
{upi_2_concat, UPI_2_concat},
{retrospectivep, RetrospectiveP}
]}),
io:format(user, "|~p,~p TODO revisit|",
[?MODULE, ?LINE]),
ok;
ok
end;
true ->
%% The following is OK: We're shifting from a
%% normal projection to an inner one. The old
%% normal has a UPI that has nothing to do with
%% RelativeToServer a.k.a. me.
%% Or else the UPI_list1 is empty, and I'm
%% the only member of UPI_list2.
%% But the new/suffix is definitely me.
%% from:
%% {epoch,847},{author,c},{upi,[c]},{repair,[]},
%% {down,[a,b,d]}
%% to:
%% {epoch,848},{author,a},{upi,[a]},{repair,[]},
%% {down,[b,c,d]}
if UPI_2_suffix == [AuthorServer2] ->
FirstCase_p = (UPI_2_suffix == [AuthorServer2])
andalso
((inner_projection_exists(P1) == false
andalso
inner_projection_exists(P2) == true)
orelse UPI_list1 == []),
%% Here's another case that's alright:
%%
%% {a,{err,exit,
%% {upi_2_suffix_error,[c]}, ....
%%
%% from:
%% {epoch,937},{author,a},{upi,[a,b]},{repair,[]},
%% {down,[c]}
%% to:
%% {epoch,943},{author,a},{upi,[a,b,c]},{repair,[]},
%% {down,[]}
%% The author server doesn't matter. However,
%% there were two other epochs in between, 939
%% and 941, where there wasn't universal agreement
%% of private projections. The repair controller
%% at the tail, 'b', had decided that the repair
%% of 'c' was finished @ epoch 941.
SecondCase_p = ((UPI_2_suffix -- Repairing_list1)
== []),
if FirstCase_p ->
true;
not RetrospectiveP ->
SecondCase_p ->
true;
UPIs_are_disjointP ->
%% If there's no overlap at all between
%% UPI_list1 & UPI_list2, then we're OK
%% here.
true;
true ->
exit({upi_2_suffix_error, UPI_2_suffix})
end
end
@ -1844,17 +1933,23 @@ calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
end,
MinSleep + (SleepChunk * Index).
get_raw_flapping_i(#projection_v1{dbg=Dbg}) ->
proplists:get_value(flapping_i, Dbg, []).
get_raw_flapping_i(#projection_v1{flap=F}) ->
F.
get_flap_count(P) ->
proplists:get_value(flap_count, get_raw_flapping_i(P), {0,0}).
case get_raw_flapping_i(P) of undefined -> {0, 0};
F -> F#flap_i.flap_count
end.
get_all_flap_counts(P) ->
proplists:get_value(all_flap_counts, get_raw_flapping_i(P), []).
case get_raw_flapping_i(P) of undefined -> [];
F -> F#flap_i.all_flap_counts
end.
get_all_hosed(P) when is_record(P, projection_v1)->
proplists:get_value(all_hosed, get_raw_flapping_i(P), []).
case get_raw_flapping_i(P) of undefined -> [];
F -> F#flap_i.all_hosed
end.
%% Entry point: delegates to merge_flap_counts/2, passing a fresh empty
%% orddict (orddict:new()) as the second argument.
merge_flap_counts(FlapCounts) ->
merge_flap_counts(FlapCounts, orddict:new()).
@ -1896,20 +1991,17 @@ gimme_random_uniform(N, S) ->
RunEnv2 = [{seed, Seed2}|lists:keydelete(seed, 1, RunEnv1)],
{X, S#ch_mgr{runenv=RunEnv2}}.
inner_projection_exists(P) ->
case proplists:get_value(inner_projection, P#projection_v1.dbg) of
undefined ->
inner_projection_exists(#projection_v1{inner=undefined}) ->
false;
_ ->
true
end.
inner_projection_exists(#projection_v1{inner=_}) ->
true.
inner_projection_or_self(P) ->
case proplists:get_value(inner_projection, P#projection_v1.dbg) of
undefined ->
case inner_projection_exists(P) of
false ->
P;
P_inner ->
P_inner
true ->
P#projection_v1.inner
end.
make_chmgr_regname(A) when is_atom(A) ->

View file

@ -135,7 +135,7 @@
-define(TIMEOUT, 2*1000).
-define(DEFAULT_TIMEOUT, 10*1000).
-define(MAX_RUNTIME, 8*1000).
-define(WORST_PROJ, #projection_v1{epoch_number=-1,epoch_csum= <<>>,
-define(WORST_PROJ, #projection_v1{epoch_number=0,epoch_csum= <<>>,
members_dict=[]}).
-record(state, {
@ -433,10 +433,13 @@ do_read_chunk2(File, Offset, Size, Depth, STime,
%% UPI+repairing.
%% If all FLUs in UPI++Repairing are not_written, then do nothing.
read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth,
STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1,
STime, S);
%% Never matches because Depth is always incremented beyond 0 prior to
%% getting here.
%%
%% read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth,
%% STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
%% read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1,
%% STime, S);
read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth,
STime, #state{proj=P}=S) ->
sleep_a_while(Depth),
@ -490,7 +493,7 @@ read_repair2(ap_mode=ConsistencyMode,
ToRepair = mutation_flus(P) -- [GotItFrom],
read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File,
Offset, Size, Depth, STime, S);
{ok, BadChunk} ->
{ok, BadChunk, _GotItFrom} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
Offset, Size, got, byte_size(BadChunk)});
{error, bad_checksum}=BadCS ->
@ -510,10 +513,13 @@ read_repair3([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S) ->
read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, 0=Depth, STime, S) ->
read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth + 1, STime, S);
%% Never matches because Depth is always incremented beyond 0 prior to
%% getting here.
%%
%% read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
%% Size, 0=Depth, STime, S) ->
%% read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
%% Size, Depth + 1, STime, S);
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read_repair3 sleep1,", []),

View file

@ -98,8 +98,8 @@ init([FluName, TcpPort, DataDir, Props0]) ->
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
ProjRegName = make_proj_supname(FluName),
Props = [{projection_store_registered_name, ProjRegName},
{use_partition_simulator,false}|Props0],
Props = Props0 ++ [{projection_store_registered_name, ProjRegName},
{use_partition_simulator,false}],
ProjSpec = {ProjRegName,
{machi_projection_store, start_link,
[ProjRegName, DataDir, FluName]},

View file

@ -218,7 +218,7 @@ handle_call({{list_all_projections, ProjType}, LC1}, _From, S) ->
handle_call({set_wedge_notify_pid, NotifyWedgeStateChanges}, _From, S) ->
{reply, ok, S#state{wedge_notify_pid=NotifyWedgeStateChanges}};
handle_call(_Request, _From, S) ->
Reply = whaaaaaaaaaaaaa,
Reply = {whaaaaaaaaaaaaazz, _Request},
{reply, Reply, S}.
handle_cast(_Msg, S) ->

View file

@ -64,8 +64,10 @@
chunk_size
}).
-type chunk() :: binary() | iolist(). % client can use either
-type chunk_csum() :: {file_offset(), chunk_size(), binary()}.
-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}.
-type chunk_bin() :: binary() | iolist(). % client can use either
-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum
%% -type chunk_summary() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
@ -81,7 +83,7 @@
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type port_wrap() :: {w,atom(),term()}.
-type port_wrap() :: #yessir{}. % yessir non-standard!
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
@ -118,7 +120,7 @@ append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) ->
%% `write_chunk()' API.
-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
{ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}.
append_chunk_extra(#yessir{name=Name,start_bin=StartBin},
_EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
@ -128,7 +130,7 @@ append_chunk_extra(#yessir{name=Name,start_bin=StartBin},
N -> N
end,
put({Name,offset,File}, Pos + size(Chunk) + ChunkExtra),
{ok, {File, Pos}}.
{ok, {Pos, iolist_size(Chunk), Prefix}}.
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' and also request an additional `Extra' bytes.
@ -140,7 +142,7 @@ append_chunk_extra(#yessir{name=Name,start_bin=StartBin},
-spec append_chunk_extra(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
{ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}.
append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE}),
@ -291,9 +293,7 @@ get_latest_epoch(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
case read_latest_projection(Sock, ProjType) of
{ok, P} ->
{ok, {P#projection_v1.epoch_number, P#projection_v1.epoch_csum}};
_ ->
{ok, {0, <<"no such checksum">>}}
{ok, {P#projection_v1.epoch_number, P#projection_v1.epoch_csum}}
end.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
@ -431,9 +431,7 @@ list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
case get_all_projections(Sock, ProjType) of
{ok, Ps} ->
{ok, [P#projection_v1.epoch_number || P <- Ps]};
_ ->
{error, not_written}
{ok, [P#projection_v1.epoch_number || P <- Ps]}
end.
%% @doc Get all epoch numbers from the FLU's projection store.
@ -558,7 +556,7 @@ connect(#p_srvr{name=Name, props=Props})->
%% Add fake dict entries for these files
[begin
Prefix = list_to_binary(io_lib:format("fake~w", [X])),
{ok, _} = append_chunk_extra(Sock, unused, Prefix, <<>>, FileSize)
{ok, _} = append_chunk_extra(Sock, {1,<<"unused">>}, Prefix, <<>>, FileSize)
end || X <- lists:seq(1, NumFiles)],
Sock.

View file

@ -22,6 +22,7 @@
-compile(export_all).
-ifdef(TEST).
-ifndef(PULSE).
-include_lib("eunit/include/eunit.hrl").
@ -72,5 +73,6 @@ verify_file_checksums_test() ->
ok = ?FLU:stop(FLU1)
end.
-endif. % !PULSE
-endif. % TEST

View file

@ -34,6 +34,7 @@
-compile(export_all).
-ifdef(TEST).
-ifndef(PULSE).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
@ -151,7 +152,7 @@ convergence_demo_testfun(NumFLUs) ->
{e,TcpPort+4,"./data.e"}, {f,TcpPort+5,"./data.f"}],
FLU_biglist = [X || {X,_,_} <- FluInfo],
All_list = lists:sublist(FLU_biglist, NumFLUs),
io:format(user, "\nSET # of FLus = ~w members ~w).\n",
io:format(user, "\nSET # of FLUs = ~w members ~w).\n",
[NumFLUs, All_list]),
machi_partition_simulator:start_link({111,222,33}, 0, 100),
_ = machi_partition_simulator:get(All_list),
@ -167,7 +168,8 @@ convergence_demo_testfun(NumFLUs) ->
{Name, PPid}
end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs],
MembersDict = machi_projection:make_members_dict(Ps),
MgrOpts = [private_write_verbose, {active_mode,false},
%% MgrOpts = [private_write_verbose, {active_mode,false},
MgrOpts = [{active_mode,false},
{use_partition_simulator, true}],
MgrNamez =
[begin
@ -186,29 +188,23 @@ convergence_demo_testfun(NumFLUs) ->
Parent = self(),
DoIt = fun(Iters, S_min, S_max) ->
io:format(user, "\nDoIt: top\n\n", []),
%% io:format(user, "\nDoIt: top\n\n", []),
io:format(user, "DoIt, ", []),
Pids = [spawn(fun() ->
random:seed(now()),
[begin
erlang:yield(),
S_max_rand = random:uniform(
S_max + 1),
io:format(user, "{t}", []),
%% io:format(user, "{t}", []),
Elapsed =
?MGR:sleep_ranked_order(
S_min, S_max_rand,
M_name, All_list),
_ = ?MGR:test_react_to_env(MMM),
%% if M_name == d ->
%% [_ = ?MGR:test_react_to_env(MMM) ||
%% _ <- lists:seq(1,3)],
%% superunfair;
%% true ->
%% ok
%% end,
%% Be more unfair by not
%% sleeping here.
%% timer:sleep(S_max - Elapsed),
% timer:sleep(S_max - Elapsed),
Elapsed
end || _ <- lists:seq(1, Iters)],
Parent ! done
@ -221,100 +217,55 @@ convergence_demo_testfun(NumFLUs) ->
end || _ <- Pids]
end,
_XandYs1 = [[{X,Y}] || X <- All_list, Y <- All_list, X /= Y],
_XandYs2 = [[{X,Y}, {A,B}] || X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
X /= A],
_XandYs3 = [[{X,Y}, {A,B}, {C,D}] || X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
X /= A, X /= C, A /= C],
%% AllPartitionCombinations = _XandYs1 ++ _XandYs2,
%% AllPartitionCombinations = _XandYs3,
AllPartitionCombinations = _XandYs1 ++ _XandYs2 ++ _XandYs3,
?D({?LINE, length(AllPartitionCombinations)}),
machi_partition_simulator:reset_thresholds(10, 50),
io:format(user, "\nLet loose the dogs of war!\n", []),
DoIt(30, 0, 0),
AllPs = make_partition_list(All_list),
PartitionCounts = lists:zip(AllPs, lists:seq(1, length(AllPs))),
FLUFudge = if NumFLUs < 4 ->
2;
true ->
13
end,
[begin
%% io:format(user, "\nSET partitions = ~w.\n", [ [] ]),machi_partition_simulator:no_partitions(),
%% [DoIt(50, 10, 100) || _ <- [1,2,3]],
io:format(user, "\nLet loose the dogs of war!\n", []),
DoIt(30, 0, 0),
io:format(user, "\nSET partitions = ~w.\n", [ [] ]),machi_partition_simulator:no_partitions(),
[DoIt(10, 10, 100) || _ <- [1]],
%% machi_partition_simulator:reset_thresholds(10, 50),
%% io:format(user, "\nLet loose the dogs of war!\n", []),
%% DoIt(30, 0, 0),
machi_partition_simulator:always_these_partitions(Partition),
io:format(user, "\nSET partitions = ~w.\n", [Partition]),
[DoIt(50, 10, 100) || _ <- [1,2,3,4] ],
_PPP =
[begin
{ok, PPPallPubs} = ?FLU_PC:list_all_projections(FLU,public),
[begin
{ok, Pr} = todo_why_does_this_crash_sometimes(
FLUName, FLU, PPPepoch),
{Pr#projection_v1.epoch_number, FLUName, Pr}
end || PPPepoch <- PPPallPubs]
end || {FLUName, FLU} <- Namez],
%% io:format(user, "PPP ~p\n", [lists:sort(lists:append(_PPP))]),
io:format(user, "\nSET partitions = ~w (~w of ~w) at ~w\n",
[Partition, Count, length(AllPs), time()]),
[DoIt(40, 10, 50) || _ <- lists:seq(0, trunc(NumFLUs*FLUFudge)) ],
%%%%%%%% {stable,true} = {stable,private_projections_are_stable(Namez, DoIt)},
{hosed_ok,true} = {hosed_ok,all_hosed_lists_are_identical(Namez, Partition)},
io:format(user, "\nSweet, all_hosed are identical-or-islands-inconclusive.\n", []),
timer:sleep(1000),
{stable,true} = {stable,private_projections_are_stable(Namez, DoIt)},
io:format(user, "\nSweet, private projections are stable\n", []),
io:format(user, "Rolling sanity check ... ", []),
PrivProjs = [{Name, begin
{ok, Ps8} = ?FLU_PC:get_all_projections(FLU,
private),
Ps9 = if length(Ps8) < 5*1000 ->
Ps8;
true ->
io:format(user, "trunc a bit... ", []),
lists:nthtail(3*1000, Ps8)
end,
[P || P <- Ps9,
P#projection_v1.epoch_number /= 0]
end} || {Name, FLU} <- Namez],
try
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} ||
{FLU, Psx} <- PrivProjs]
catch _Err:_What ->
io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What})
end,
io:format(user, "Yay!\n", []),
timer:sleep(1250),
ok
%% end || Partition <- AllPartitionCombinations
%% end || Partition <- [ [{a,b},{b,d},{c,b}],
%% [{a,b},{b,d},{c,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ]
%% end || Partition <- [ [{a,b}, {b,c}],
%% [{a,b}, {c,b}] ]
%% end || Partition <- [ [{a,b}, {b,c}] ] %% hosed-not-equal @ 3 FLUs
%% end || Partition <- [ [{b,d}] ]
%% end || Partition <- [ [{a,b}, {b,a}] ]
%% end || Partition <- [ [{a,b}, {b,a}, {a,c},{c,a}] ]
end || Partition <- [ [{a,b}],
[{b,a}] ]
%% end || Partition <- [ [{a,b}, {c,b}],
%% [{a,b}, {b,c}] ]
%% end || Partition <- [ [{a,b}, {b,c}, {c,d}],
%% [{a,b}, {b,c},{b,d}, {c,d}],
%% [{b,a}, {b,c}, {c,d}],
%% [{a,b}, {c,b}, {c,d}],
%% [{a,b}, {b,c}, {d,c}] ]
%% end || Partition <- [ [{a,b}, {b,c}, {c,d}, {d,e}],
%% [{b,a}, {b,c}, {c,d}, {d,e}],
%% [{a,b}, {c,b}, {c,d}, {d,e}],
%% [{a,b}, {b,c}, {d,c}, {d,e}],
%% [{a,b}, {b,c}, {c,d}, {e,d}] ]
%% end || Partition <- [ [{c,a}] ]
%% end || Partition <- [ [{c,a}], [{c,b}, {a, b}] ]
%% end || Partition <- [ [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {b,c}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {c,d}] ]
%% end || Partition <- [ [{a,b}],
%% [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% [{a,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ]
end || {Partition, Count} <- PartitionCounts
],
%% exit(end_experiment),
io:format(user, "\nSET partitions = []\n", []),
io:format(user, "We should see convergence to 1 correct chain.\n", []),
machi_partition_simulator:no_partitions(),
[DoIt(50, 10, 100) || _ <- [1]],
io:format(user, "Sweet, finishing early\n", []), exit(yoyoyo_testing_hack_finishing_early),
%% WARNING: In asymmetric partitions, private_projections_are_stable()
%% will never be true; code beyond this point on the -exp3
%% branch is bit-rotted, sorry!
[DoIt(50, 10, 50) || _ <- [1]],
true = private_projections_are_stable(Namez, DoIt),
io:format(user, "~s\n", [os:cmd("date")]),
@ -331,24 +282,21 @@ convergence_demo_testfun(NumFLUs) ->
%% unique chains are disjoint.
true = machi_chain_manager1_test:all_reports_are_disjoint(Report),
%% Given the report, we flip it around so that we observe the
%% sets of chain transitions relative to each FLU.
R_Chains = [machi_chain_manager1_test:extract_chains_relative_to_flu(
FLU, Report) || FLU <- All_list],
%% ?D(R_Chains),
R_Projs = [{FLU, [machi_chain_manager1_test:chain_to_projection(
FLU, Epoch, UPI, Repairing, All_list) ||
{Epoch, UPI, Repairing} <- E_Chains]} ||
{FLU, E_Chains} <- R_Chains],
%% For each chain transition experienced by a particular FLU,
%% confirm that each state transition is OK.
PrivProjs = [{Name, begin
{ok, Ps9} = ?FLU_PC:get_all_projections(FLU,
private),
[P || P <- Ps9,
P#projection_v1.epoch_number /= 0]
end} || {Name, FLU} <- Namez],
try
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane(Psx, FLU)} ||
{FLU, Psx} <- R_Projs],
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} ||
{FLU, Psx} <- PrivProjs],
io:format(user, "\nAll sanity checks pass, hooray!\n", [])
catch _Err:_What ->
io:format(user, "Report ~p\n", [Report]),
io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What})
end,
%% ?D(R_Projs),
@ -366,6 +314,72 @@ convergence_demo_testfun(NumFLUs) ->
ok = machi_partition_simulator:stop()
end.
%% Many of the static partition lists below have been problematic at one
%% time or another.....
%%
%% Uncomment *one* of the following make_partition_list() bodies.
%% @doc Build the list of partition scenarios to run.
%%
%% A scenario is a list of directed {From, To} pairs (From /= To) drawn
%% from All_list.  Scenarios of one, two and three pairs are generated; in
%% the two- and three-pair scenarios no two pairs share the same first
%% element.  Each scenario is sorted, duplicate scenarios are removed, and
%% the survivors are shuffled by random_sort/1.
make_partition_list(All_list) ->
    Pairs = [{X,Y} || X <- All_list, Y <- All_list, X /= Y],
    %% The _X_Ys names keep their leading underscore so that any of the
    %% alternative "Concat = ..." lines below can be swapped in without
    %% unused-variable warnings.
    _X_Ys1 = [[Pair] || Pair <- Pairs],
    _X_Ys2 = [[{X,Y}, {A,B}] || {X,Y} <- Pairs,
                                {A,B} <- Pairs,
                                X /= A],
    _X_Ys3 = [[{X,Y}, {A,B}, {C,D}] || {X,Y} <- Pairs,
                                       {A,B} <- Pairs,
                                       {C,D} <- Pairs,
                                       X /= A, X /= C, A /= C],
    %% Concat = _X_Ys1 ++ _X_Ys2.
    %% Concat = _X_Ys3.
    Concat = lists:append([_X_Ys1, _X_Ys2, _X_Ys3]),
    Canonical = lists:usort([lists:sort(Scenario) || Scenario <- Concat]),
    random_sort(Canonical).
%% [ [{a,b},{b,d},{c,b}],
%% [{a,b},{b,d},{c,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ].
%% [ [{a,b}, {b,c}],
%% [{a,b}, {c,b}] ].
%% [{a,b}, {b,c}] ]. %% hosed-not-equal @ 3 FLUs
%% [{b,d}] ].
%% [ [{a,b}, {b,a}] ].
%% [ [{a,b},{b,c},{c,a}],
%% [{a,b}, {b,a}, {a,c},{c,a}] ].
%% [{a,b}, {c,b}],
%% [{a,b}, {b,c}] ].
%% [ [{a,b}, {b,c}, {c,d}],
%% [{a,b}, {b,c},{b,d}, {c,d}],
%% [{b,a}, {b,c}, {c,d}],
%% [{a,b}, {c,b}, {c,d}],
%% [{a,b}, {b,c}, {d,c}] ].
%% [ [{a,b}, {b,c}, {c,d}, {d,e}],
%% [{b,a}, {b,c}, {c,d}, {d,e}],
%% [{a,b}, {c,b}, {c,d}, {d,e}],
%% [{a,b}, {b,c}, {d,c}, {d,e}],
%% [{a,b}, {b,c}, {c,d}, {e,d}] ].
%% [ [{c,a}] ].
%% [ [{c,a}], [{c,b}, {a, b}] ].
%% [ [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {b,c}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {c,d}] ].
%% [ [{a,b}],
%% [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% [{a,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ].
todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) ->
try
{ok, _}=Res = ?FLU_PC:read_projection(FLU, public, PPPepoch),
@ -374,75 +388,32 @@ todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) ->
io:format(user, "QQQ Whoa, it crashed this time for ~p at epoch ~p\n",
[FLUName, PPPepoch]),
timer:sleep(1000),
exit(still_a_problem),
?FLU_PC:read_projection(FLU, public, PPPepoch)
end.
%% @doc Assert that the private projections have stopped changing.
%%
%% Takes a summary of every FLU's latest private projection (see
%% get_latest_inner_proj_summ/1), calls PollFunc(5, 1, 10), then takes a
%% second summary.  If the two snapshots differ, both are printed and the
%% final match crashes with badmatch; otherwise returns true.
%%
%% Namez is a list of {Name, FLU} pairs; PollFunc is a fun of arity 3.
private_projections_are_stable(Namez, PollFunc) ->
    %% Each snapshot is bound exactly once.  Binding the same variable
    %% first to get_latest_epochid/2 replies and then to inner-projection
    %% summaries can never match.
    Private1 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez],
    PollFunc(5, 1, 10),
    Private2 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez],
    if Private1 == Private2 ->
            ok;
       true ->
            io:format(user, "Oops: Private1: ~p\n", [Private1]),
            io:format(user, "Oops: Private2: ~p\n", [Private2])
    end,
    true = (Private1 == Private2).
%% @doc Decide whether the FLUs' all_hosed lists agree, or whether the
%% current partition excuses a disagreement.
%%
%% Namez is a list of {Name, FLU} pairs and Partition0 a list of {X, Y}
%% pairs as understood by machi_partition_simulator.
%%
%% Returns true when:
%%   * every successfully read latest private projection yields the same
%%     machi_chain_manager1:get_all_hosed/1 value, or
%%   * partitions2num_islands/2 reports 'many', or
%%   * some ordered pair of members lacks a direct connection in at least
%%     one direction.
%% Returns false otherwise.
all_hosed_lists_are_identical(Namez, Partition0) ->
    Partition = lists:usort(Partition0),
    %% Keep the complete reply tuples: the {ok, P} generator filter below
    %% is what selects the successful reads.  Unwrapping the replies with
    %% element(2, ...) first would make that filter reject every element.
    Ps = [?FLU_PC:read_latest_projection(FLU, private) ||
             {_Name, FLU} <- Namez],
    UniqueAllHoseds = lists:usort([machi_chain_manager1:get_all_hosed(P) ||
                                      {ok, P} <- Ps]),
    Members = [M || {M, _Pid} <- Namez],
    Islands = machi_partition_simulator:partitions2num_islands(
                Members, Partition),
    %% io:format(user, "all_hosed_lists_are_identical:\n", []),
    %% io:format(user, "  Uniques = ~p Islands ~p\n  Partition ~p\n",
    %%           [UniqueAllHoseds, Islands, Partition]),
    case UniqueAllHoseds of
        [_] ->
            true;
        %% TODO: With the addition of the digraph stuff below, the clause
        %%       below probably isn't necessary anymore, since the
        %%       digraph calculation should catch complete partition islands?
        _ when Islands == 'many' ->
            %% There are at least two partitions, so yes, it's quite
            %% possible that the all_hosed lists may differ.
            %% TODO Fix this up to be smarter about fully-isolated
            %%      islands of partition.
            true;
        _ ->
            DG = digraph:new(),
            try
                Connection = machi_partition_simulator:partition2connection(
                               Members, Partition),
                [digraph:add_vertex(DG, X) || X <- Members],
                [digraph:add_edge(DG, X, Y) || {X,Y} <- Connection],
                %% A path list longer than two vertices, or no path at
                %% all, means From cannot reach To directly.
                NoDirectPath =
                    fun(From, To) ->
                            case digraph:get_short_path(DG, From, To) of
                                false -> true;
                                Path  -> length(Path) > 2
                            end
                    end,
                %% There must be a shortest path of length two in both
                %% directions, otherwise the read projection call will
                %% fail.  And it's that failure that we're interested in
                %% here: if any pair lacks it, non-identical all_hosed
                %% lists are expected (true); otherwise there's no
                %% excuse, buddy (false).
                lists:any(
                  fun(X) ->
                          lists:any(
                            fun(Y) ->
                                    NoDirectPath(X, Y) orelse
                                        NoDirectPath(Y, X)
                            end, Members -- [X])
                  end, Members)
            after
                %% Always release the digraph, even if a call above crashes.
                digraph:delete(DG)
            end
    end.
%% @doc Summarize FLU's latest private projection as the tuple
%% {EpochNumber, UPI, Repairing, Down}.
%%
%% The projection read from the FLU is first passed through
%% machi_chain_manager1:inner_projection_or_self/1; the summary fields
%% come from whatever #projection_v1{} that call returns.  Crashes with
%% badmatch if the read does not return {ok, _}.
get_latest_inner_proj_summ(FLU) ->
    {ok, Latest} = ?FLU_PC:read_latest_projection(FLU, private),
    Inner = machi_chain_manager1:inner_projection_or_self(Latest),
    #projection_v1{epoch_number=Epoch,
                   upi=UPI,
                   repairing=Repairing,
                   down=Down} = Inner,
    {Epoch, UPI, Repairing, Down}.
%% @doc Return the elements of L in a pseudo-random order.
%%
%% Reseeds the calling process's `random' state from now(), tags every
%% element with a key drawn from 1..99999, sorts the tagged list and then
%% drops the keys again.
random_sort(L) ->
    random:seed(now()),
    Keyed = [{random:uniform(99999), Elem} || Elem <- L],
    {_Keys, Shuffled} = lists:unzip(lists:sort(Keyed)),
    Shuffled.
-endif. % !PULSE
-endif. % TEST

View file

@ -26,6 +26,7 @@
-compile(export_all).
-include("machi_projection.hrl").
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
@ -37,7 +38,7 @@
-compile({pulse_replace_module, [{application, pulse_application}]}).
%% The following functions contains side_effects but are run outside
%% PULSE, i.e. PULSE needs to leave them alone
-compile({pulse_skip,[{prop_pulse_test_,0}]}).
-compile({pulse_skip,[{prop_pulse_test_,0}, {shutdown_hard,0}]}).
-compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}).
%% Used for output within EUnit...
@ -50,6 +51,7 @@
-define(MGR, machi_chain_manager1).
-define(MGRTEST, machi_chain_manager1_test).
-define(FLU_PC, machi_proxy_flu1_client).
-record(state, {
step=0,
@ -62,7 +64,7 @@ initial_state() ->
#state{}.
%% @doc Generator for how many servers to use in a test run: an integer
%% from 2 up to the number of entries in all_list_extra().  The upper
%% bound follows the server table instead of a hard-coded constant.
gen_num_pids() ->
    choose(2, length(all_list_extra())).
%% @doc Generator for a three-integer seed tuple.  Each component is
%% chosen from 1..10000, and the whole tuple is wrapped in noshrink/1.
gen_seed() ->
    %% A generator may be reused; each occurrence is generated separately.
    SeedPart = choose(1, 10000),
    noshrink({SeedPart, SeedPart, SeedPart}).
@ -80,7 +82,7 @@ command(S) ->
{ 1, {call, ?MODULE, change_partitions,
[gen_old_threshold(), gen_no_partition_threshold()]}},
{50, {call, ?MODULE, do_ticks,
[choose(5, 100), S#state.pids,
[choose(5, 200), S#state.pids,
gen_old_threshold(), gen_no_partition_threshold()]}}
]).
@ -100,34 +102,66 @@ next_state2(S, _Res, {call, _, _Func, _Args}) ->
%% eqc_statem postcondition callback.  Every {call, _, _, _} command is
%% accepted unconditionally: the state, the call and its result are all
%% ignored and the callback always returns true.
postcondition(_S, {call, _, _Func, _Args}, _Res) ->
true.
%% @doc The full table of servers available to the test, as a list of
%% {#p_srvr{}, DataDir} pairs.  Each #p_srvr{} carries the server name,
%% address "localhost", a TCP port, and a {chmgr, Name} property that
%% get_chmgr/1 reads back.
all_list_extra() ->
    %% Generators assume that this list has at least 2 items.
    Specs = [{a, 7400, a_chmgr, "./data.pulse.a"},
             {b, 7401, b_chmgr, "./data.pulse.b"},
             {c, 7402, c_chmgr, "./data.pulse.c"},
             {d, 7403, d_chmgr, "./data.pulse.d"},
             {e, 7404, e_chmgr, "./data.pulse.e"}],
    [{#p_srvr{name=Name, address="localhost", port=Port,
              props=[{chmgr, ChMgr}]}, DataDir}
     || {Name, Port, ChMgr, DataDir} <- Specs].
%% @doc Names of all servers listed in all_list_extra(), in table order.
%% Deriving the names from that table keeps the two lists in step.
all_list() ->
    [P#p_srvr.name || {P, _Dir} <- all_list_extra()].
%% @doc Start a fresh test environment using the first Num servers of
%% all_list_extra().
%%
%% Steps: silence the tty error logger, wipe each server's data
%% directory, start the partition simulator with Seed, start the FLU
%% supervisor and one FLU package per server, install the chain
%% membership on every chain manager, drive a number of
%% test_react_to_env/1 calls, and finally start the FLU proxies.
%%
%% Returns {PSimPid, SupPid, ProxiesDict, All_listE}.  The same tuple is
%% stored in the process dictionary under manager_pids_hack, where
%% do_ticks/4 and the other command functions look it up.
setup(Num, Seed) ->
    ?QC_FMT("\nsetup(~w", [Num]),
    error_logger:tty(false),
    All_list = lists:sublist(all_list(), Num),
    All_listE = lists:sublist(all_list_extra(), Num),
    %% shutdown_hard() has taken care of killing all relevant procs.
    [machi_flu1_test:clean_up_data_dir(Dir) || {_P, Dir} <- All_listE],
    ?QC_FMT(",z~w", [?LINE]),

    %% Start partition simulator
    {ok, PSimPid} = machi_partition_simulator:start_link(Seed, 0, 100),
    _Partitions = machi_partition_simulator:get(All_list),
    ?QC_FMT(",z~w", [?LINE]),

    %% Start FLUs and their associated procs
    {ok, SupPid} = machi_flu_sup:start_link(),
    FluOpts = [{use_partition_simulator, true}, {active_mode, false}],
    [begin
         #p_srvr{name=Name, port=Port} = P,
         {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, FluOpts)
     end || {P, Dir} <- All_listE],

    %% Set up the chain
    Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- All_listE]),
    ?QC_FMT(",z~w", [?LINE]),
    [machi_chain_manager1:set_chain_members(get_chmgr(P), Dict) ||
        {P, _Dir} <- All_listE],

    %% Trigger some environment reactions for humming consensus: first
    %% run all reactions for one server back-to-back before moving to
    %% the next server, then round-robin evenly across servers.
    [begin
         _QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P))
     end || {P, _Dir} <- All_listE, _I <- lists:seq(1,20), _Repeat <- [1,2]],
    ?QC_FMT(",z~w", [?LINE]),
    [begin
         _QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P))
     end || _I <- lists:seq(1,20), {P, _Dir} <- All_listE, _Repeat <- [1,2]],
    ?QC_FMT(",z~w", [?LINE]),

    ProxiesDict = ?FLU_PC:start_proxies(Dict),

    Res = {PSimPid, SupPid, ProxiesDict, All_listE},
    put(manager_pids_hack, Res),
    ?QC_FMT("),", []),
    Res.
change_partitions(OldThreshold, NoPartitionThreshold) ->
@ -137,9 +171,9 @@ change_partitions(OldThreshold, NoPartitionThreshold) ->
%% Command wrapper: delegates straight to
%% machi_partition_simulator:always_last_partitions/0 and returns
%% whatever that call returns.
always_last_partitions() ->
machi_partition_simulator:always_last_partitions().
private_stable_check(FLUs) ->
{_FLU_pids, Mgr_pids} = get(manager_pids_hack),
Res = private_projections_are_stable_check(FLUs, Mgr_pids),
private_stable_check() ->
{_PSimPid, _SupPid, ProxiesDict, All_listE} = get(manager_pids_hack),
Res = private_projections_are_stable_check(ProxiesDict, All_listE),
if not Res ->
io:format(user, "BUMMER: private stable check failed!\n", []);
true ->
@ -149,7 +183,8 @@ private_stable_check(FLUs) ->
do_ticks(Num, PidsMaybe, OldThreshold, NoPartitionThreshold) ->
io:format(user, "~p,~p,~p|", [Num, OldThreshold, NoPartitionThreshold]),
{_FLU_pids, Mgr_pids} = case PidsMaybe of
{_PSimPid, _SupPid, ProxiesDict, All_listE} =
case PidsMaybe of
undefined -> get(manager_pids_hack);
_ -> PidsMaybe
end,
@ -157,48 +192,52 @@ do_ticks(Num, PidsMaybe, OldThreshold, NoPartitionThreshold) ->
machi_partition_simulator:reset_thresholds(OldThreshold,
NoPartitionThreshold);
true ->
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number()]),
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]),
machi_partition_simulator:no_partitions()
end,
Res = exec_ticks(Num, Mgr_pids),
Res = exec_ticks(Num, All_listE),
if not is_integer(OldThreshold) ->
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number()]);
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]);
true ->
ok
end,
Res.
%% @doc Return the largest "latest private epoch number" reported by any
%% FLU proxy in ProxiesDict (an orddict of Name -> Proxy).
%%
%% Crashes with badmatch if any proxy does not answer
%% {ok, {Epoch, _}}, and with function_clause if ProxiesDict is empty.
get_biggest_private_epoch_number(ProxiesDict) ->
    lists:max(
      [begin
           {ok, {Epoch, _}} = ?FLU_PC:get_latest_epochid(Proxy, private),
           Epoch
       end || {_Name, Proxy} <- orddict:to_list(ProxiesDict)]).
dump_state() ->
try
?QC_FMT("dump_state(", []),
{FLU_pids, _Mgr_pids} = get(manager_pids_hack),
Namez = zip(all_list(), FLU_pids),
Report = ?MGRTEST:unanimous_report(Namez),
{_PSimPid, _SupPid, ProxiesDict, _AlE} = get(manager_pids_hack),
Report = ?MGRTEST:unanimous_report(ProxiesDict),
Namez = ProxiesDict,
%% ?QC_FMT("Report ~p\n", [Report]),
Diag1 = [begin
Ps = machi_flu0:proj_get_all(FLU, Type),
[io_lib:format("~p ~p ~p: ~w\n", [FLUName, Type, P#projection.epoch_number, ?MGR:make_projection_summary(P)]) || P <- Ps]
end || {FLUName, FLU} <- Namez,
Type <- [public] ],
%% Diag1 = [begin
%% {ok, Ps} = ?FLU_PC:get_all_projections(Proxy, Type),
%% [io_lib:format("~p ~p ~p: ~w\n", [FLUName, Type, P#projection_v1.epoch_number, machi_projection:make_summary(P)]) || P <- Ps]
%% end || {FLUName, Proxy} <- orddict:to_list(ProxiesDict),
%% Type <- [public] ],
UniquePrivateEs =
lists:usort(lists:flatten(
[machi_flu0:proj_list_all(FLU, private) ||
{_FLUName, FLU} <- Namez])),
P_lists0 = [{FLUName, Type, machi_flu0:proj_get_all(FLUPid, Type)} ||
{FLUName, FLUPid} <- Namez, Type <- [public,private]],
[element(2,?FLU_PC:list_all_projections(Proxy,private)) ||
{_FLUName, Proxy} <- orddict:to_list(ProxiesDict)])),
P_lists0 = [{FLUName, Type,
element(2,?FLU_PC:get_all_projections(Proxy, Type))} ||
{FLUName, Proxy} <- orddict:to_list(ProxiesDict),
Type <- [public,private]],
P_lists = [{FLUName, Type, P} || {FLUName, Type, Ps} <- P_lists0,
P <- Ps],
AllDict = lists:foldl(fun({FLU, Type, P}, D) ->
K = {FLU, Type, P#projection.epoch_number},
K = {FLU, Type, P#projection_v1.epoch_number},
dict:store(K, P, D)
end, dict:new(), lists:flatten(P_lists)),
DumbFinderBackward =
@ -208,28 +247,35 @@ dump_state() ->
{ok, T} -> T;
error -> error_unwritten
end;
%% case machi_flu0:proj_read(FLU, E, private) of
%% {ok, T} -> T;
%% Else -> Else
%% end;
(_E, Acc) ->
Acc
end
end,
Diag2 = [[
io_lib:format("~p private: ~w\n",
[FLUName,
?MGR:make_projection_summary(
lists:foldl(DumbFinderBackward(FLUName),
error_unwritten,
lists:seq(Epoch, 0, -1)))])
|| {FLUName, _FLU} <- Namez]
|| Epoch <- UniquePrivateEs],
%% Diag2 = [[
%% io_lib:format("~p private: ~w\n",
%% [FLUName,
%% machi_projection:make_summary(
%% lists:foldl(DumbFinderBackward(FLUName),
%% error_unwritten,
%% lists:seq(Epoch, 0, -1)))])
%% || {FLUName, _FLU} <- Namez]
%% || Epoch <- UniquePrivateEs],
PrivProjs = [{Name, begin
{ok, Ps} = ?FLU_PC:get_all_projections(Proxy,
private),
[P || P <- Ps,
P#projection_v1.epoch_number /= 0]
end} || {Name, Proxy} <- ProxiesDict],
?QC_FMT(")", []),
{Report, lists:flatten([Diag1, Diag2])}
Diag1 = Diag2 = "skip_diags",
{Report, PrivProjs, lists:flatten([Diag1, Diag2])}
catch XX:YY ->
?QC_FMT("OUCH: ~p ~p @ ~p\n", [XX, YY, erlang:get_stacktrace()])
?QC_FMT("OUCH: ~p ~p @ ~p\n", [XX, YY, erlang:get_stacktrace()]),
?QC_FMT("Exiting now to move to manual post-mortem....\n", []),
erlang:halt(0),
false
end.
prop_pulse() ->
@ -245,9 +291,9 @@ prop_pulse() ->
Stabilize1 = [{set,{var,99999995},
{call, ?MODULE, always_last_partitions, []}}],
Stabilize2 = [{set,{var,99999996},
{call, ?MODULE, private_stable_check, [all_list()]}}],
{call, ?MODULE, private_stable_check, []}}],
LastTriggerTicks = {set,{var,99999997},
{call, ?MODULE, do_ticks, [25, undefined, no, no]}},
{call, ?MODULE, do_ticks, [123, undefined, no, no]}},
Cmds1 = lists:duplicate(2, LastTriggerTicks),
%% Cmds1 = lists:duplicate(length(all_list())*2, LastTriggerTicks),
Cmds = Cmds0 ++
@ -260,9 +306,14 @@ prop_pulse() ->
{_H, _S, _R} = run_commands(?MODULE, Cmds)
end, [{seed, Seed},
{strategy, unfair}]),
ok = shutdown_hard(),
{Report, Diag} = S2#state.dump_state,
%% ?QC_FMT("S2 ~p\n", [S2]),
case S2#state.dump_state of
undefined ->
?QC_FMT("BUMMER Cmds = ~p\n", [Cmds]);
_ ->
ok
end,
{Report, PrivProjs, Diag} = S2#state.dump_state,
%% Report is ordered by Epoch. For each private projection
%% written during any given epoch, confirm that all chain
@ -270,21 +321,12 @@ prop_pulse() ->
%% unique chains are disjoint.
AllDisjointP = ?MGRTEST:all_reports_are_disjoint(Report),
%% Given the report, we flip it around so that we observe the
%% sets of chain transitions relative to each FLU.
R_Chains = [?MGRTEST:extract_chains_relative_to_flu(FLU, Report) ||
FLU <- all_list()],
R_Projs = [{FLU, [?MGRTEST:chain_to_projection(
FLU, Epoch, UPI, Repairing, all_list()) ||
{Epoch, UPI, Repairing} <- E_Chains]} ||
{FLU, E_Chains} <- R_Chains],
%% For each chain transition experienced by a particular FLU,
%% confirm that each state transition is OK.
Sane =
[{FLU,_SaneRes} = {FLU,?MGR:projection_transitions_are_sane_retrospective(
Ps, FLU)} ||
{FLU, Ps} <- R_Projs],
{FLU, Ps} <- PrivProjs],
SaneP = lists:all(fun({_FLU, SaneRes}) -> SaneRes == true end, Sane),
%% The final report item should say that all are agreed_membership.
@ -299,13 +341,17 @@ prop_pulse() ->
LastRepXs
end,
ok = shutdown_hard(),
?WHENFAIL(
begin
?QC_FMT("Cmds = ~p\n", [Cmds]),
?QC_FMT("Res = ~p\n", [Res]),
?QC_FMT("Diag = ~s\n", [Diag]),
?QC_FMT("Report = ~p\n", [Report]),
?QC_FMT("PrivProjs = ~p\n", [PrivProjs]),
?QC_FMT("Sane = ~p\n", [Sane]),
?QC_FMT("SingleChainNoRepair failure =\n ~p\n", [SingleChainNoRepair])
,erlang:halt(0)
end,
conjunction([{res, Res == true orelse Res == ok},
{all_disjoint, AllDisjointP},
@ -331,49 +377,60 @@ prop_pulse_test_() ->
end}.
%% @doc Best-effort teardown of the partition simulator and the FLU
%% supervisor.  Always returns ok.
%%
%% For each of the two registered names the escalation is: ask the
%% module to stop (from a throwaway process, so a hung stop/0 cannot
%% block the caller), unlink, exit(Pid, shutdown), and finally
%% exit(Pid, kill).  Every step is wrapped in catch because the process
%% may already be gone, in which case whereis/1 returns undefined.
shutdown_hard() ->
    ?QC_FMT("shutdown(", []),
    (catch unlink(whereis(machi_partition_simulator))),
    [begin
         Pid = whereis(X),
         spawn(fun() -> (catch X:stop()) end),
         timer:sleep(50),
         (catch unlink(Pid)),
         timer:sleep(10),
         (catch exit(Pid, shutdown)),
         timer:sleep(1),
         (catch exit(Pid, kill))
     end || X <- [machi_partition_simulator, machi_flu_sup] ],
    timer:sleep(1),
    ?QC_FMT(")", []),
    ok.
%% @doc Run Num chain-manager "ticks" for every server in All_listE (a
%% list of {#p_srvr{}, Dir} pairs), concurrently.
%%
%% One linked worker is spawned per server.  On each tick a worker
%% yields, calls ?MGR:sleep_ranked_order/4, calls
%% ?MGR:test_react_to_env/1 on the server's chain manager, and then
%% sleeps for whatever remains of the Max-millisecond slot.  When a
%% worker has finished its ticks it sends `done' to the caller.
%%
%% Returns ok once one `done' per worker has been received.  There is
%% deliberately no receive timeout (see the commented-out `after').
exec_ticks(Num, All_listE) ->
    Parent = self(),
    Pids = [spawn_link(fun() ->
                               [begin
                                    erlang:yield(),
                                    M_name = P#p_srvr.name,
                                    Max = 10,
                                    Elapsed =
                                        ?MGR:sleep_ranked_order(1, Max, M_name, all_list()),
                                    Res = ?MGR:test_react_to_env(get_chmgr(P)),
                                    timer:sleep(erlang:max(0, Max - Elapsed)),
                                    Res=Res %% ?D({self(), Res})
                                end || _ <- lists:seq(1,Num)],
                               Parent ! done
                       end) || {P, _Dir} <- All_listE],
    [receive
         done ->
             ok
     %% after 500*1000 ->
     %%         exit(icky_timeout)
     end || _Pid <- Pids],
    ok.
%% @doc Check whether the latest private epoch IDs have stopped changing.
%%
%% Runs 40 ticks, snapshots ?FLU_PC:get_latest_epochid(Proxy, private)
%% for every proxy in ProxiesDict, runs 5 more ticks, and snapshots
%% again.  Returns true when the two snapshots are equal, false
%% otherwise.
private_projections_are_stable_check(ProxiesDict, All_listE) ->
    %% TODO: extend the check to look not only for latest num, but
    %% also check for flapping, and if yes, to see if all_hosed are
    %% all exactly equal.

    _ = exec_ticks(40, All_listE),
    Private1 = [?FLU_PC:get_latest_epochid(Proxy, private) ||
                   {_FLU, Proxy} <- orddict:to_list(ProxiesDict)],
    _ = exec_ticks(5, All_listE),
    Private2 = [?FLU_PC:get_latest_epochid(Proxy, private) ||
                   {_FLU, Proxy} <- orddict:to_list(ProxiesDict)],
    (Private1 == Private2).
%% @doc Look up the `chmgr' entry in the props list of a #p_srvr{}
%% record.  Returns `undefined' when the entry is missing.
get_chmgr(#p_srvr{} = Srvr) ->
    Props = Srvr#p_srvr.props,
    proplists:get_value(chmgr, Props).
-endif. % PULSE

View file

@ -45,17 +45,32 @@
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
%% Example:
%% [{1,{ok_disjoint,[{agreed_membership,{[a],[b,c]}}]}},
%% {3,{ok_disjoint,[{agreed_membership,{[a],[b,c]}}]}},
%% {8,
%% {ok_disjoint,[{not_agreed,{[a],[b,c]},
%% [{b,not_in_this_epoch},
%% <<65,159,66,113,232,15,156,244,197,
%% 210,39,82,229,84,192,19,27,45,161,38>>]}]}},
%% {10,{ok_disjoint,[{agreed_membership,{[c],[]}}]}},
%% ...]
%% @doc Build one unanimous_report/2 entry for every distinct private
%% epoch listed by the FLUs in Namez (a list of {FLUName, FLU} pairs).
%% Entries follow ascending epoch order; epoch 0 is skipped.
unanimous_report(Namez) ->
    UniquePrivateEs =
        lists:usort(lists:flatten(
                      [element(2, ?FLU_PC:list_all_projections(FLU, private)) ||
                          {_FLUName, FLU} <- Namez])),
    [unanimous_report(Epoch, Namez) || Epoch <- UniquePrivateEs,
                                       Epoch /= 0].
unanimous_report(Epoch, Namez) ->
Projs = [{FLUName, case ?FLU_PC:read_projection(FLU, private, Epoch) of
{ok, T} -> T;
_Else -> not_in_this_epoch
Projs = [{FLUName,
case ?FLU_PC:read_projection(FLU, private, Epoch) of
{ok, T} ->
machi_chain_manager1:inner_projection_or_self(T);
_Else ->
{FLUName, not_in_this_epoch}
end} || {FLUName, FLU} <- Namez],
UPI_R_Sums = [{Proj#projection_v1.upi, Proj#projection_v1.repairing,
Proj#projection_v1.epoch_csum} ||
@ -71,31 +86,12 @@ unanimous_report(Epoch, Namez) ->
%% that all FLUs are in agreement.
{UPI, Repairing, _CSum} =
lists:keyfind(UPI, 1, UPI_R_Sums),
%% TODO: make certain that this subtlety doesn't get
%% last in later implementations.
%% So, this is a bit of a tricky thing. If we're at
%% upi=[c] and repairing=[a,b], then the transition
%% (eventually!) to upi=[c,a] does not currently depend
%% on b being an active participant in the repair.
%%
%% Yes, b's state is very important for making certain
%% that all repair operations succeed both to a & b.
%% However, in this simulation, we only consider that
%% the head(Repairing) is sane. Therefore, we use only
%% the "HeadOfRepairing" in our considerations here.
HeadOfRepairing = case Repairing of
[H_Rep|_] ->
[H_Rep];
_ ->
[]
end,
Tmp = [{FLU, case proplists:get_value(FLU, Projs) of
P when is_record(P, projection_v1) ->
P#projection_v1.epoch_csum;
Else ->
Else
end} || FLU <- UPI ++ HeadOfRepairing],
end} || FLU <- UPI ++ Repairing],
case lists:usort([CSum || {_FLU, CSum} <- Tmp]) of
[_] ->
{agreed_membership, {UPI, Repairing}};
@ -103,7 +99,7 @@ unanimous_report(Epoch, Namez) ->
{not_agreed, {UPI, Repairing}, Else2}
end;
_Else ->
{UPI, not_unique, Epoch, _Else}
{not_agreed, {undefined, undefined}, Projs}
end
end || UPI <- UniqueUPIs],
AgreedResUPI_Rs = [UPI++Repairing ||
@ -128,10 +124,11 @@ extract_chains_relative_to_flu(FLU, Report) ->
lists:member(FLU, UPI) orelse lists:member(FLU, Repairing)]}.
%% @doc Fabricate a projection for MyName at Epoch from a bare chain
%% description.
%%
%% The member dictionary maps every name in All_list to a #p_srvr{} that
%% has only its name set.  Members that appear in neither UPI_list nor
%% Repairing_list are passed as the down list.  The debug proplist
%% records that this module built the projection artificially.
chain_to_projection(MyName, Epoch, UPI_list, Repairing_list, All_list) ->
    MemberDict = orddict:from_list([{FLU, #p_srvr{name=FLU}} ||
                                       FLU <- All_list]),
    machi_projection:new(Epoch, MyName, MemberDict,
                         All_list -- (UPI_list ++ Repairing_list),
                         UPI_list, Repairing_list, [{artificial_by, ?MODULE}]).
-ifndef(PULSE).
@ -260,5 +257,5 @@ nonunanimous_setup_and_fix_test() ->
ok = machi_partition_simulator:stop()
end.
-endif. % not PULSE
-endif. % !PULSE
-endif. % TEST

View file

@ -24,8 +24,9 @@
-include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-ifndef(PULSE).
-include_lib("eunit/include/eunit.hrl").
%% EUnit test generator: runs smoke_test2/0 under a {timeout, 1*60, _}
%% wrapper, i.e. with a 60-second limit instead of EUnit's default.
smoke_test_() -> {timeout, 1*60, fun() -> smoke_test2() end}.
@ -141,4 +142,5 @@ smoke_test2() ->
exit(SupPid, normal)
end.
-endif. % !PULSE
-endif. % TEST.

View file

@ -22,6 +22,7 @@
-compile(export_all).
-ifdef(TEST).
-include("machi.hrl").
-include("machi_projection.hrl").
-include_lib("eunit/include/eunit.hrl").
@ -29,6 +30,15 @@
-define(FLU, machi_flu1).
-define(FLU_C, machi_flu1_client).
%% @doc Best-effort removal of DataDir and everything up to four levels
%% below it.  Always returns ok; individual delete failures are ignored.
%%
%% The glob patterns are processed deepest first, so files and empty
%% directories are removed before their parents.  Each match is tried
%% first as a file and then as a directory; whichever call does not
%% apply simply fails and is ignored.
%%
%% Patterns are joined to DataDir with filename:join/2.  Plain string
%% concatenation would turn "./data" ++ "*" into "./data*", which also
%% matches (and deletes) siblings such as "./data.other".
clean_up_data_dir(DataDir) ->
    [begin
         Fs = filelib:wildcard(filename:join(DataDir, Glob)),
         [file:delete(F) || F <- Fs],
         [file:del_dir(F) || F <- Fs]
     end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
    _ = file:del_dir(DataDir),
    ok.
%% Convenience wrapper: same as setup_test_flu/4 called with an empty
%% list as the fourth (DbgProps) argument.
setup_test_flu(RegName, TcpPort, DataDir) ->
setup_test_flu(RegName, TcpPort, DataDir, []).
@ -48,6 +58,8 @@ setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
timer:sleep(10),
FLU1.
-ifndef(PULSE).
flu_smoke_test() ->
Host = "localhost",
TcpPort = 32957,
@ -141,7 +153,8 @@ flu_smoke_test() ->
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort,
?DUMMY_PV1_EPOCH, BadFile),
ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort))
ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host,
port=TcpPort}))
after
ok = ?FLU:stop(FLU1)
end.
@ -197,13 +210,5 @@ bad_checksum_test() ->
ok = ?FLU:stop(FLU1)
end.
%% @doc Best-effort removal of DataDir and everything up to four levels
%% below it.  Always returns ok; individual delete failures are ignored.
%%
%% The glob patterns are processed deepest first, so files and empty
%% directories are removed before their parents.  Each match is tried
%% first as a file and then as a directory; whichever call does not
%% apply simply fails and is ignored.
%%
%% Patterns are joined to DataDir with filename:join/2.  Plain string
%% concatenation would turn "./data" ++ "*" into "./data*", which also
%% matches (and deletes) siblings such as "./data.other".
clean_up_data_dir(DataDir) ->
    [begin
         Fs = filelib:wildcard(filename:join(DataDir, Glob)),
         [file:delete(F) || F <- Fs],
         [file:del_dir(F) || F <- Fs]
     end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
    _ = file:del_dir(DataDir),
    ok.
-endif. % !PULSE
-endif. % TEST

View file

@ -21,6 +21,7 @@
-module(machi_flu_psup_test).
-ifdef(TEST).
-ifndef(PULSE).
-include_lib("eunit/include/eunit.hrl").
@ -101,7 +102,7 @@ partial_stop_restart2() ->
{_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
[begin
_QQa = machi_chain_manager1:test_react_to_env(ChMgr)
end || _ <- lists:seq(1,25), ChMgr <- ChMgrs],
end || _ <- lists:seq(1,125), ChMgr <- ChMgrs],
%% All chain managers & projection stores should be using the
%% same projection which is max projection in each store.
@ -112,8 +113,10 @@ partial_stop_restart2() ->
{ok, Proj_m} = machi_projection_store:read_latest_projection(
hd(PStores), public),
[begin
{ok, Proj_m} = machi_projection_store:read_latest_projection(
PStore, ProjType)
{ok, Proj_m2} = machi_projection_store:read_latest_projection(
PStore, ProjType),
true = (machi_projection:update_dbg2(Proj_m, []) ==
machi_projection:update_dbg2(Proj_m2, []))
end || ProjType <- [public, private], PStore <- PStores ],
Epoch_m = Proj_m#projection_v1.epoch_number,
%% Confirm that all FLUs are *not* wedged, with correct proj & epoch
@ -130,8 +133,10 @@ partial_stop_restart2() ->
ok = machi_flu_psup:stop_flu_package(FluName_a),
{ok, _} = Start(hd(Ps)),
%% Remember: 'a' is not in active mode.
{ok, Proj_m} = machi_projection_store:read_latest_projection(
{ok, Proj_m3} = machi_projection_store:read_latest_projection(
hd(PStores), private),
true = (machi_projection:update_dbg2(Proj_m, []) ==
machi_projection:update_dbg2(Proj_m, [])),
%% Confirm that 'a' is wedged
{error, wedged} = Append(hd(Ps)),
{_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
@ -158,6 +163,7 @@ partial_stop_restart2() ->
ok
end.
-endif. % !PULSE
-endif. % TEST

View file

@ -21,6 +21,8 @@
-module(machi_projection_test).
-ifdef(TEST).
-ifndef(PULSE).
-compile(export_all).
-include("machi_projection.hrl").
@ -83,4 +85,5 @@ try_it(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) ->
false
end.
-endif. % !PULSE
-endif. % TEST

View file

@ -28,6 +28,7 @@
-define(MUT, machi_proxy_flu1_client).
-ifdef(TEST).
-ifndef(PULSE).
api_smoke_test() ->
RegName = api_smoke_flu,
@ -279,4 +280,5 @@ flu_restart_test() ->
[catch machi_flu1:stop(Pid) || Pid <- get(flu_pid)]
end.
-endif. % !PULSE
-endif. % TEST