diff --git a/Makefile b/Makefile index b91d653..a118de4 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ edoc-clean: pulse: compile env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile - env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit + env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit -v APPS = kernel stdlib sasl erts ssl compiler eunit crypto PLT = $(HOME)/.machi_dialyzer_plt @@ -41,6 +41,8 @@ dialyzer: deps compile dialyzer -Wno_return --plt $(PLT) ebin dialyzer-test: deps compile + echo Force rebar to recompile .eunit dir w/o running tests > /dev/null + rebar skip_deps=true eunit suite=lamport_clock dialyzer -Wno_return --plt $(PLT) .eunit clean_plt: diff --git a/TODO-shortterm.org b/TODO-shortterm.org index dbb6ee4..2a69ece 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -41,7 +41,7 @@ func, and pattern match Erlang style in that func. ** DONE Add major comment sections to the CR-impl client ** DONE Simple basho_bench driver, put some unscientific chalk on the benchtop ** TODO Create parallel PULSE test for basic API plus chain manager repair -** TODO Add client-side vs. server-side checksum type, expand client API? +** DONE Add client-side vs. server-side checksum type, expand client API? ** TODO Add gproc and get rid of registered name rendezvous *** TODO Fixes the atom table leak *** TODO Fixes the problem of having active sequencer for the same prefix diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl index ba0b8fd..a94210f 100644 --- a/include/machi_projection.hrl +++ b/include/machi_projection.hrl @@ -36,6 +36,13 @@ props = [] :: list() % proplist for other related info }). +-record(flap_i, { + flap_count :: {term(), term()}, + all_hosed :: list(), + all_flap_counts :: list(), + bad :: list() + }). + -type p_srvr() :: #p_srvr{}. -type p_srvr_dict() :: orddict:orddict(). @@ -51,6 +58,8 @@ upi :: [pv1_server()], repairing :: [pv1_server()], down :: [pv1_server()], + flap :: 'undefined' | #flap_i{}, % flapping information + inner :: 'undefined' | #projection_v1{}, dbg :: list(), %proplist(), is checksummed dbg2 :: list(), %proplist(), is not checksummed members_dict :: p_srvr_dict() diff --git a/src/machi_basho_bench_driver.erl b/src/machi_basho_bench_driver.erl index 0652df1..090f3a6 100644 --- a/src/machi_basho_bench_driver.erl +++ b/src/machi_basho_bench_driver.erl @@ -102,7 +102,7 @@ run(append, KeyGen, ValueGen, #m{conn=Conn}=S) -> {ok, S}; {error, _}=Err -> ?ERROR("append ~w bytes to prefix ~w: ~p\n", - [iolist_size(ValueGen), Prefix, Err]), + [iolist_size(Value), Prefix, Err]), {error, Err, S} end; run(read, KeyGen, ValueGen, #m{max_key=undefined}=S) -> diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 572bd8f..9bbae42 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -203,7 +203,6 @@ init({MyName, InitMembersDict, MgrOpts}) -> {use_partition_simulator, Opt(use_partition_simulator, false)}, {network_partitions, Opt(network_partitions, [])}, {network_islands, Opt(network_islands, [])}, - {flapping_i, Opt(flapping, [])}, {up_nodes, Opt(up_nodes, not_init_yet)}], ActiveP = Opt(active_mode, true), S = #ch_mgr{name=MyName, @@ -252,8 +251,7 @@ handle_call({set_chain_members, MembersDict}, _From, down=NewDown, members_dict=MembersDict}), %% Reset all flapping state. - NewProj2 = NewProj#projection_v1{dbg=replace(NewProj#projection_v1.dbg, - [make_flapping_i()])}, + NewProj2 = NewProj#projection_v1{flap=make_flapping_i()}, S3 = S2#ch_mgr{proj=NewProj2, proj_history=queue:new()}, {_QQ, S4} = do_react_to_env(S3), @@ -594,7 +592,7 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, %% TODO create a real API call for fetching this info? SameEpoch_p = check_latest_private_projections_same_epoch( - tl(NewUPI_list) ++ Repairing_list2, + NewUPI_list ++ Repairing_list2, S#ch_mgr.proj, Partitions, S), if Simulator_p andalso SameEpoch_p -> D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], @@ -636,6 +634,10 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, {P, S#ch_mgr{runenv=RunEnv3}}. check_latest_private_projections_same_epoch(FLUs, MyProj, Partitions, S) -> + %% NOTE: The caller must provide us with the FLUs list for all + %% FLUs that must be up & available right now. So any + %% failure of perhaps_call_t() means that we must return + %% false. FoldFun = fun(_FLU, false) -> false; (FLU, true) -> @@ -896,17 +898,15 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra, down=P_i#projection_v1.all_members -- [MyName]} end, - FinalInnerEpoch = case inner_projection_exists(P_current) of false -> + FinalCreation = P_newprop3#projection_v1.creation_time, AllFlapCounts_epk = [Epk || {{Epk,_FlTime}, _FlCount} <- get_all_flap_counts(P_newprop3)], case AllFlapCounts_epk of [] -> - %% HRM, distrust?... - %% P_newprop3#projection_v1.epoch_number; P_newprop3#projection_v1.epoch_number; [_|_] -> lists:max(AllFlapCounts_epk) @@ -921,8 +921,10 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra, andalso P_oldinner#projection_v1.down == P_inner#projection_v1.down -> + FinalCreation = P_oldinner#projection_v1.creation_time, P_oldinner#projection_v1.epoch_number; true -> + FinalCreation = P_newprop3#projection_v1.creation_time, P_oldinner#projection_v1.epoch_number + 1 end end, @@ -932,12 +934,15 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra, %% up nodes > 1, repair is required there! In the %% current simulator, repair is not simulated and %% finished (and then growing the UPI list). Fix. - P_inner2 = P_inner#projection_v1{epoch_number=FinalInnerEpoch}, - InnerInfo = [{inner_summary, machi_projection:make_summary(P_inner2)}, - {inner_projection, P_inner2}], + P_inner2 = machi_projection:update_checksum( + P_inner#projection_v1{epoch_number=FinalInnerEpoch, + creation_time=FinalCreation}), + InnerInfo = [{inner_summary, + machi_projection:make_summary(P_inner2)}], DbgX = replace(P_newprop3#projection_v1.dbg, InnerInfo), ?REACT({a30, ?LINE, [qqqwww|DbgX]}), - {P_newprop3#projection_v1{dbg=DbgX}, S_i}; + {P_newprop3#projection_v1{dbg=DbgX, + inner=P_inner2}, S_i}; _ -> {P_newprop3, S3} end, @@ -1222,7 +1227,12 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP, {newprop_flap_count, P_newprop_flap_count}, {flap_limit, FlapLimit}]}), _B10Hack = get(b10_hack), - io:format(user, "{FLAP: ~w flaps ~w}!\n", [S#ch_mgr.name, P_newprop_flap_count]), + case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of + true -> + io:format(user, "{FLAP: ~w flaps ~w}! ", [S#ch_mgr.name, P_newprop_flap_count]); + _ -> + ok + end, if %% MEANWHILE, we have learned some things about this @@ -1309,31 +1319,21 @@ react_to_env_C100(P_newprop, P_latest, I_am_UPI_in_newprop_p = lists:member(MyName, P_newprop#projection_v1.upi), I_am_Repairing_in_latest_p = lists:member(MyName, P_latest#projection_v1.repairing), - ShortCircuit_p = - P_latest#projection_v1.epoch_number > P_current#projection_v1.epoch_number - andalso - I_am_UPI_in_newprop_p - andalso - I_am_Repairing_in_latest_p, - Current_sane_p = projection_transition_is_sane(P_current, P_latest, MyName), - case {ShortCircuit_p, Current_sane_p} of + put(xxx_hack, [{p_current, machi_projection:make_summary(P_current)}, + {epoch_compare, P_latest#projection_v1.epoch_number > P_current#projection_v1.epoch_number}, + {i_am_upi_in_newprop_p, I_am_UPI_in_newprop_p}, + {i_am_repairing_in_latest_p, I_am_Repairing_in_latest_p}]), + case Current_sane_p of _ when P_current#projection_v1.epoch_number == 0 -> %% Epoch == 0 is reserved for first-time, just booting conditions. ?REACT({c100, ?LINE, [first_write]}), react_to_env_C110(P_latest, S); - {true, _} -> - %% Someone else believes that I am repairing. We assume - %% that nobody is being Byzantine, so we'll believe that I - %% am/should be repairing. We ignore our proposal and try - %% to go with the latest. - ?REACT({c100, ?LINE, [repairing_short_circuit]}), - react_to_env_C110(P_latest, S); - {_, true} -> + true -> ?REACT({c100, ?LINE, [sane]}), react_to_env_C110(P_latest, S); - {_, _AnyOtherReturnValue} -> + _AnyOtherReturnValue -> %% P_latest is not sane. %% By process of elimination, P_newprop is best, %% so let's write it. @@ -1343,7 +1343,9 @@ react_to_env_C100(P_newprop, P_latest, react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> ?REACT(c110), - Extra_todo = [], + %% Extra_todo = [], + Extra_todo = get(xxx_hack), + %% Extra_todo = [{hee, lists:reverse(get(react))}], P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo), MyNamePid = proxy_pid(MyName, S), @@ -1362,14 +1364,24 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> {HH,MM,SS} = time(), case inner_projection_exists(P_latest2) of false -> - io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses plain: ~w\n", + case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of + true -> + io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses plain: ~w\n", [HH,MM,SS,MSec, S#ch_mgr.name, machi_projection:make_summary(P_latest2)]); + _ -> + ok + end; true -> - P_inner = inner_projection_or_self(P_latest2), - io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses inner: ~w\n", + case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of + true -> + P_inner = inner_projection_or_self(P_latest2), + io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses inner: ~w\n", [HH,MM,SS,MSec, S#ch_mgr.name, - machi_projection:make_summary(P_inner)]) + machi_projection:make_summary(P_inner)]); + _ -> + ok + end end; _ -> ok @@ -1438,12 +1450,12 @@ react_to_env_C310(P_newprop, S) -> ?REACT({c310, ?LINE, [{newprop, machi_projection:make_summary(P_newprop)}, {write_result, WriteRes}]}), +%% io:format(user, "HEE310 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), react_to_env_A10(S2). calculate_flaps(P_newprop, _P_current, _FlapLimit, #ch_mgr{name=MyName, proj_history=H, flap_start=FlapStart, - flaps=Flaps, runenv=RunEnv0} = S) -> - RunEnv1 = replace(RunEnv0, [{flapping_i, []}]), + flaps=Flaps, runenv=RunEnv1} = S) -> HistoryPs = queue:to_list(H), Ps = HistoryPs ++ [P_newprop], UniqueProposalSummaries = lists:usort([{P#projection_v1.upi, @@ -1460,8 +1472,6 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, HosedTransUnion = proplists:get_value(trans_all_hosed, Props), TransFlapCounts0 = proplists:get_value(trans_all_flap_counts, Props), - _Unanimous = proplists:get_value(unanimous_flus, Props), - _NotUnanimous = proplists:get_value(not_unanimous_flus, Props), %% NOTE: bad_answer_flus are probably due to timeout or some other network %% glitch, i.e., anything other than {ok, P::projection()} %% response from machi_flu0:proj_read_latest(). @@ -1542,11 +1552,6 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, FlappingI = make_flapping_i(NewFlapStart, NewFlaps, AllHosed, AllFlapCounts, BadFLUs), - Dbg2 = [FlappingI|P_newprop#projection_v1.dbg], - %% TODO: 2015-03-04: I'm growing increasingly suspicious of - %% the 'runenv' variable that's threaded through all this code. - %% It isn't doing what I'd originally intended. Fix it. - RunEnv2 = replace(RunEnv1, [FlappingI]), %% NOTE: Just because we increment flaps here, there's no correlation %% to successful public proj store writes! For example, %% if we loop through states C2xx a few times, we would incr @@ -1558,17 +1563,21 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, %% a large local flaps count gives no concrete guarantee that any %% communication has been successful with any other part of the %% cluster. - {machi_projection:update_checksum(P_newprop#projection_v1{dbg=Dbg2}), - S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv2}}. + %% TODO: 2015-03-04: I'm growing increasingly suspicious of + %% the 'runenv' variable that's threaded through all this code. + %% It isn't doing what I'd originally intended. Fix it. + {machi_projection:update_checksum(P_newprop#projection_v1{ + flap=FlappingI}), + S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv1}}. make_flapping_i() -> make_flapping_i({{epk,-1},?NOT_FLAPPING}, 0, [], [], []). make_flapping_i(NewFlapStart, NewFlaps, AllHosed, AllFlapCounts, BadFLUs) -> - {flapping_i, [{flap_count, {NewFlapStart, NewFlaps}}, - {all_hosed, AllHosed}, - {all_flap_counts, lists:sort(AllFlapCounts)}, - {bad,BadFLUs}]}. + #flap_i{flap_count={NewFlapStart, NewFlaps}, + all_hosed=AllHosed, + all_flap_counts=lists:sort(AllFlapCounts), + bad=BadFLUs}. projection_transitions_are_sane(Ps, RelativeToServer) -> projection_transitions_are_sane(Ps, RelativeToServer, false). @@ -1665,7 +1674,8 @@ projection_transition_is_sane( %% Additions to the UPI chain may only be at the tail UPI_common_prefix = find_common_prefix(UPI_list1, UPI_list2), - if UPI_common_prefix == [] -> + true = + if UPI_common_prefix == [] -> if UPI_list1 == [] orelse UPI_list2 == [] -> %% If the common prefix is empty, then one of the %% inputs must be empty. @@ -1691,7 +1701,7 @@ projection_transition_is_sane( %% [] [b] %% %% ... where RelativeToServer=b. In this case, b - %% has been partitions for a while and has only + %% has been partitioned for a while and has only %% now just learned of several epoch transitions. %% If the author of both is also in the UPI of %% both, then those authors would not have allowed @@ -1716,15 +1726,47 @@ projection_transition_is_sane( not (lists:member(RelativeToServer, Down_list2) orelse lists:member(RelativeToServer, Repairing_list2)), + UPIs_are_disjointP = ordsets:is_disjoint(ordsets:from_list(UPI_list1), + ordsets:from_list(UPI_list2)), + case UPI_2_suffix -- UPI_list1 of + [] -> + true; + [_|_] = _Added_by_2 -> + if RetrospectiveP -> + %% Any servers added to the UPI must be added from the + %% repairing list ... but in retrospective mode (where + %% we're checking only the transitions where all + %% UPI+repairing participants have unanimous private + %% projections!), and if we're under asymmetric + %% partition/churn, then we may not see the repairing + %% list. So we will not check that condition here. + true; + not RetrospectiveP -> + %% We're not retrospective. So, if some server was + %% added by to the UPI, then that means that it was + %% added by repair. And repair is coordinated by the + %% UPI tail/last. +%io:format(user, "g: UPI_list1=~w, UPI_list2=~w, UPI_2_suffix=~w, ", +% [UPI_list1, UPI_list2, UPI_2_suffix]), +%io:format(user, "g", []), + true = UPI_list1 == [] orelse + UPIs_are_disjointP orelse + (lists:last(UPI_list1) == AuthorServer2) + end + end, + if not MoreCheckingP -> ok; MoreCheckingP -> %% Where did elements in UPI_2_suffix come from? %% Only two sources are permitted. - [lists:member(X, Repairing_list1) % X added after repair done - orelse - lists:member(X, UPI_list1) % X in UPI_list1 after common pref - || X <- UPI_2_suffix], + Oops_check_UPI_2_suffix = + [lists:member(X, Repairing_list1) % X added after repair done + orelse + lists:member(X, UPI_list1) % X in UPI_list1 after common pref + || X <- UPI_2_suffix], + %% Grrrrr, ok, so this check isn't good, at least at bootstrap time. + %% TODO: false = lists:member(false, Oops_check_UPI_2_suffix), %% The UPI_2_suffix must exactly be equal to: ordered items from %% UPI_list1 concat'ed with ordered items from Repairing_list1. @@ -1780,30 +1822,77 @@ projection_transition_is_sane( %% The retrospective view by %% machi_chain_manager1_pulse.erl just can't %% reason correctly about this situation. We - %% will instead rely on the non-introspective + %% will instead rely on the non-retrospective %% sanity checking that each FLU does before it %% writes to its private projection store and %% then adopts that projection (and unwedges %% itself, etc etc). - exit({todo, revisit, ?MODULE, ?LINE}), - io:format(user, "|~p,~p TODO revisit|", - [?MODULE, ?LINE]), - ok; + if UPIs_are_disjointP -> + true; + true -> + exit({todo, revisit, ?MODULE, ?LINE, + [ + {oops_check_UPI_2_suffix, Oops_check_UPI_2_suffix}, + {upi_2_suffix, UPI_2_suffix}, + {upi_2_concat, UPI_2_concat}, + {retrospectivep, RetrospectiveP} + ]}), + io:format(user, "|~p,~p TODO revisit|", + [?MODULE, ?LINE]), + ok + end; true -> %% The following is OK: We're shifting from a %% normal projection to an inner one. The old %% normal has a UPI that has nothing to do with %% RelativeToServer a.k.a. me. + %% Or else the UPI_list1 is empty, and I'm + %% the only member of UPI_list2 + %% But the new/suffix is definitely me. %% from: %% {epoch,847},{author,c},{upi,[c]},{repair,[]}, %% {down,[a,b,d]} %% to: %% {epoch,848},{author,a},{upi,[a]},{repair,[]}, %% {down,[b,c,d]} - if UPI_2_suffix == [AuthorServer2] -> + FirstCase_p = (UPI_2_suffix == [AuthorServer2]) + andalso + ((inner_projection_exists(P1) == false + andalso + inner_projection_exists(P2) == true) + orelse UPI_list1 == []), + + %% Here's another case that's alright: + %% + %% {a,{err,exit, + %% {upi_2_suffix_error,[c]}, .... + %% + %% from: + %% {epoch,937},{author,a},{upi,[a,b]},{repair,[]}, + %% {down,[c]} + %% to: + %% {epoch,943},{author,a},{upi,{a,b,c},{repair,[]}, + %% {down,[]} + + %% The author server doesn't matter. However, + %% there were two other epochs in between, 939 + %% and 941, where there wasn't universal agreement + %% of private projections. The repair controller + %% at the tail, 'b', had decided that the repair + %% of 'c' was finished @ epoch 941. + SecondCase_p = ((UPI_2_suffix -- Repairing_list1) + == []), + if FirstCase_p -> true; - not RetrospectiveP -> + SecondCase_p -> + true; + UPIs_are_disjointP -> + %% If there's no overlap at all between + %% UPI_list1 & UPI_list2, then we're OK + %% here. + true; + true -> exit({upi_2_suffix_error, UPI_2_suffix}) end end @@ -1844,17 +1933,23 @@ calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> end, MinSleep + (SleepChunk * Index). -get_raw_flapping_i(#projection_v1{dbg=Dbg}) -> - proplists:get_value(flapping_i, Dbg, []). +get_raw_flapping_i(#projection_v1{flap=F}) -> + F. get_flap_count(P) -> - proplists:get_value(flap_count, get_raw_flapping_i(P), {0,0}). + case get_raw_flapping_i(P) of undefined -> {0, 0}; + F -> F#flap_i.flap_count + end. get_all_flap_counts(P) -> - proplists:get_value(all_flap_counts, get_raw_flapping_i(P), []). + case get_raw_flapping_i(P) of undefined -> []; + F -> F#flap_i.all_flap_counts + end. get_all_hosed(P) when is_record(P, projection_v1)-> - proplists:get_value(all_hosed, get_raw_flapping_i(P), []). + case get_raw_flapping_i(P) of undefined -> []; + F -> F#flap_i.all_hosed + end. merge_flap_counts(FlapCounts) -> merge_flap_counts(FlapCounts, orddict:new()). @@ -1896,20 +1991,17 @@ gimme_random_uniform(N, S) -> RunEnv2 = [{seed, Seed2}|lists:keydelete(seed, 1, RunEnv1)], {X, S#ch_mgr{runenv=RunEnv2}}. -inner_projection_exists(P) -> - case proplists:get_value(inner_projection, P#projection_v1.dbg) of - undefined -> - false; - _ -> - true - end. +inner_projection_exists(#projection_v1{inner=undefined}) -> + false; +inner_projection_exists(#projection_v1{inner=_}) -> + true. inner_projection_or_self(P) -> - case proplists:get_value(inner_projection, P#projection_v1.dbg) of - undefined -> + case inner_projection_exists(P) of + false -> P; - P_inner -> - P_inner + true -> + P#projection_v1.inner end. make_chmgr_regname(A) when is_atom(A) -> diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index bbb2bbc..ac7134b 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -135,7 +135,7 @@ -define(TIMEOUT, 2*1000). -define(DEFAULT_TIMEOUT, 10*1000). -define(MAX_RUNTIME, 8*1000). --define(WORST_PROJ, #projection_v1{epoch_number=-1,epoch_csum= <<>>, +-define(WORST_PROJ, #projection_v1{epoch_number=0,epoch_csum= <<>>, members_dict=[]}). -record(state, { @@ -433,10 +433,13 @@ do_read_chunk2(File, Offset, Size, Depth, STime, %% UPI+repairing. %% If all FLUs in UPI++Repairing are not_written, then do nothing. -read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth, - STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty - read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1, - STime, S); +%% Never matches because Depth is always incremented beyond 0 prior to +%% getting here. +%% +%% read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth, +%% STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty +%% read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1, +%% STime, S); read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> sleep_a_while(Depth), @@ -490,7 +493,7 @@ read_repair2(ap_mode=ConsistencyMode, ToRepair = mutation_flus(P) -- [GotItFrom], read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File, Offset, Size, Depth, STime, S); - {ok, BadChunk} -> + {ok, BadChunk, _GotItFrom} -> exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, got, byte_size(BadChunk)}); {error, bad_checksum}=BadCS -> @@ -510,10 +513,13 @@ read_repair3([], ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S) -> read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S); -read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, - Size, 0=Depth, STime, S) -> - read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth + 1, STime, S); +%% Never matches because Depth is always incremented beyond 0 prior to +%% getting here. +%% +%% read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, +%% Size, 0=Depth, STime, S) -> +%% read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, +%% Size, Depth + 1, STime, S); read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "read_repair3 sleep1,", []), diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index f7bbf87..b02f64d 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -98,8 +98,8 @@ init([FluName, TcpPort, DataDir, Props0]) -> SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, ProjRegName = make_proj_supname(FluName), - Props = [{projection_store_registered_name, ProjRegName}, - {use_partition_simulator,false}|Props0], + Props = Props0 ++ [{projection_store_registered_name, ProjRegName}, + {use_partition_simulator,false}], ProjSpec = {ProjRegName, {machi_projection_store, start_link, [ProjRegName, DataDir, FluName]}, diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 4eed3ed..4c2664b 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -218,7 +218,7 @@ handle_call({{list_all_projections, ProjType}, LC1}, _From, S) -> handle_call({set_wedge_notify_pid, NotifyWedgeStateChanges}, _From, S) -> {reply, ok, S#state{wedge_notify_pid=NotifyWedgeStateChanges}}; handle_call(_Request, _From, S) -> - Reply = whaaaaaaaaaaaaa, + Reply = {whaaaaaaaaaaaaazz, _Request}, {reply, Reply, S}. handle_cast(_Msg, S) -> diff --git a/src/machi_yessir_client.erl b/src/machi_yessir_client.erl index c3cedad..c7e4872 100644 --- a/src/machi_yessir_client.erl +++ b/src/machi_yessir_client.erl @@ -64,8 +64,10 @@ chunk_size }). --type chunk() :: binary() | iolist(). % client can use either --type chunk_csum() :: {file_offset(), chunk_size(), binary()}. +-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}. +-type chunk_bin() :: binary() | iolist(). % client can use either +-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum +%% -type chunk_summary() :: {file_offset(), chunk_size(), binary()}. -type chunk_s() :: binary(). % server always uses binary() -type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. -type chunk_size() :: non_neg_integer(). @@ -81,7 +83,7 @@ -type file_prefix() :: binary() | list(). -type inet_host() :: inet:ip_address() | inet:hostname(). -type inet_port() :: inet:port_number(). --type port_wrap() :: {w,atom(),term()}. +-type port_wrap() :: #yessir{}. % yessir non-standard! -type projection() :: #projection_v1{}. -type projection_type() :: 'public' | 'private'. @@ -118,7 +120,7 @@ append_chunk(_Host, _TcpPort, EpochID, Prefix, Chunk) -> %% `write_chunk()' API. -spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. + {ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}. append_chunk_extra(#yessir{name=Name,start_bin=StartBin}, _EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> @@ -128,7 +130,7 @@ append_chunk_extra(#yessir{name=Name,start_bin=StartBin}, N -> N end, put({Name,offset,File}, Pos + size(Chunk) + ChunkExtra), - {ok, {File, Pos}}. + {ok, {Pos, iolist_size(Chunk), Prefix}}. %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix' and also request an additional `Extra' bytes. @@ -140,7 +142,7 @@ append_chunk_extra(#yessir{name=Name,start_bin=StartBin}, -spec append_chunk_extra(inet_host(), inet_port(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> - {ok, chunk_pos()} | {error, error_general()} | {error, term()}. + {ok, chunk_pos()}. %%%% | {error, error_general()} | {error, term()}. append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> Sock = connect(#p_srvr{proto_mod=?MODULE}), @@ -291,9 +293,7 @@ get_latest_epoch(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> case read_latest_projection(Sock, ProjType) of {ok, P} -> - {ok, {P#projection_v1.epoch_number, P#projection_v1.epoch_csum}}; - _ -> - {ok, {0, <<"no such checksum">>}} + {ok, {P#projection_v1.epoch_number, P#projection_v1.epoch_csum}} end. %% @doc Get the latest epoch number + checksum from the FLU's projection store. @@ -431,9 +431,7 @@ list_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> case get_all_projections(Sock, ProjType) of {ok, Ps} -> - {ok, [P#projection_v1.epoch_number || P <- Ps]}; - _ -> - {error, not_written} + {ok, [P#projection_v1.epoch_number || P <- Ps]} end. %% @doc Get all epoch numbers from the FLU's projection store. @@ -558,7 +556,7 @@ connect(#p_srvr{name=Name, props=Props})-> %% Add fake dict entries for these files [begin Prefix = list_to_binary(io_lib:format("fake~w", [X])), - {ok, _} = append_chunk_extra(Sock, unused, Prefix, <<>>, FileSize) + {ok, _} = append_chunk_extra(Sock, {1,<<"unused">>}, Prefix, <<>>, FileSize) end || X <- lists:seq(1, NumFiles)], Sock. diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 9ce952f..4d2c37e 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -22,6 +22,7 @@ -compile(export_all). -ifdef(TEST). +-ifndef(PULSE). -include_lib("eunit/include/eunit.hrl"). @@ -72,5 +73,6 @@ verify_file_checksums_test() -> ok = ?FLU:stop(FLU1) end. +-endif. % !PULSE -endif. % TEST diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index 93e73eb..f0ff2af 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -34,6 +34,7 @@ -compile(export_all). -ifdef(TEST). +-ifndef(PULSE). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -151,7 +152,7 @@ convergence_demo_testfun(NumFLUs) -> {e,TcpPort+4,"./data.e"}, {f,TcpPort+5,"./data.f"}], FLU_biglist = [X || {X,_,_} <- FluInfo], All_list = lists:sublist(FLU_biglist, NumFLUs), - io:format(user, "\nSET # of FLus = ~w members ~w).\n", + io:format(user, "\nSET # of FLUs = ~w members ~w).\n", [NumFLUs, All_list]), machi_partition_simulator:start_link({111,222,33}, 0, 100), _ = machi_partition_simulator:get(All_list), @@ -167,7 +168,8 @@ convergence_demo_testfun(NumFLUs) -> {Name, PPid} end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs], MembersDict = machi_projection:make_members_dict(Ps), - MgrOpts = [private_write_verbose, {active_mode,false}, + %% MgrOpts = [private_write_verbose, {active_mode,false}, + MgrOpts = [{active_mode,false}, {use_partition_simulator, true}], MgrNamez = [begin @@ -186,29 +188,23 @@ convergence_demo_testfun(NumFLUs) -> Parent = self(), DoIt = fun(Iters, S_min, S_max) -> - io:format(user, "\nDoIt: top\n\n", []), + %% io:format(user, "\nDoIt: top\n\n", []), + io:format(user, "DoIt, ", []), Pids = [spawn(fun() -> random:seed(now()), [begin erlang:yield(), S_max_rand = random:uniform( S_max + 1), - io:format(user, "{t}", []), + %% io:format(user, "{t}", []), Elapsed = ?MGR:sleep_ranked_order( S_min, S_max_rand, M_name, All_list), _ = ?MGR:test_react_to_env(MMM), - %% if M_name == d -> - %% [_ = ?MGR:test_react_to_env(MMM) || - %% _ <- lists:seq(1,3)], - %% superunfair; - %% true -> - %% ok - %% end, %% Be more unfair by not %% sleeping here. - %% timer:sleep(S_max - Elapsed), + % timer:sleep(S_max - Elapsed), Elapsed end || _ <- lists:seq(1, Iters)], Parent ! done @@ -221,100 +217,55 @@ convergence_demo_testfun(NumFLUs) -> end || _ <- Pids] end, - _XandYs1 = [[{X,Y}] || X <- All_list, Y <- All_list, X /= Y], - _XandYs2 = [[{X,Y}, {A,B}] || X <- All_list, Y <- All_list, X /= Y, - A <- All_list, B <- All_list, A /= B, - X /= A], - _XandYs3 = [[{X,Y}, {A,B}, {C,D}] || X <- All_list, Y <- All_list, X /= Y, - A <- All_list, B <- All_list, A /= B, - C <- All_list, D <- All_list, C /= D, - X /= A, X /= C, A /= C], - %% AllPartitionCombinations = _XandYs1 ++ _XandYs2, - %% AllPartitionCombinations = _XandYs3, - AllPartitionCombinations = _XandYs1 ++ _XandYs2 ++ _XandYs3, - ?D({?LINE, length(AllPartitionCombinations)}), - machi_partition_simulator:reset_thresholds(10, 50), io:format(user, "\nLet loose the dogs of war!\n", []), DoIt(30, 0, 0), + AllPs = make_partition_list(All_list), + PartitionCounts = lists:zip(AllPs, lists:seq(1, length(AllPs))), + FLUFudge = if NumFLUs < 4 -> + 2; + true -> + 13 + end, [begin - %% io:format(user, "\nSET partitions = ~w.\n", [ [] ]),machi_partition_simulator:no_partitions(), - %% [DoIt(50, 10, 100) || _ <- [1,2,3]], - io:format(user, "\nLet loose the dogs of war!\n", []), - DoIt(30, 0, 0), - io:format(user, "\nSET partitions = ~w.\n", [ [] ]),machi_partition_simulator:no_partitions(), - [DoIt(10, 10, 100) || _ <- [1]], - - %% machi_partition_simulator:reset_thresholds(10, 50), - %% io:format(user, "\nLet loose the dogs of war!\n", []), - %% DoIt(30, 0, 0), - machi_partition_simulator:always_these_partitions(Partition), - io:format(user, "\nSET partitions = ~w.\n", [Partition]), - [DoIt(50, 10, 100) || _ <- [1,2,3,4] ], - _PPP = - [begin - {ok, PPPallPubs} = ?FLU_PC:list_all_projections(FLU,public), - [begin - {ok, Pr} = todo_why_does_this_crash_sometimes( - FLUName, FLU, PPPepoch), - {Pr#projection_v1.epoch_number, FLUName, Pr} - end || PPPepoch <- PPPallPubs] - end || {FLUName, FLU} <- Namez], - %% io:format(user, "PPP ~p\n", [lists:sort(lists:append(_PPP))]), + io:format(user, "\nSET partitions = ~w (~w of ~w) at ~w\n", + [Partition, Count, length(AllPs), time()]), + [DoIt(40, 10, 50) || _ <- lists:seq(0, trunc(NumFLUs*FLUFudge)) ], - %%%%%%%% {stable,true} = {stable,private_projections_are_stable(Namez, DoIt)}, - {hosed_ok,true} = {hosed_ok,all_hosed_lists_are_identical(Namez, Partition)}, - io:format(user, "\nSweet, all_hosed are identical-or-islands-inconclusive.\n", []), - timer:sleep(1000), + {stable,true} = {stable,private_projections_are_stable(Namez, DoIt)}, + io:format(user, "\nSweet, private projections are stable\n", []), +io:format(user, "Rolling sanity check ... ", []), +PrivProjs = [{Name, begin + {ok, Ps8} = ?FLU_PC:get_all_projections(FLU, + private), + Ps9 = if length(Ps8) < 5*1000 -> + Ps8; + true -> + io:format(user, "trunc a bit... ", []), + lists:nthtail(3*1000, Ps8) + end, + [P || P <- Ps9, + P#projection_v1.epoch_number /= 0] + end} || {Name, FLU} <- Namez], +try + [{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} || + {FLU, Psx} <- PrivProjs] +catch _Err:_What -> + io:format(user, "PrivProjs ~p\n", [PrivProjs]), + exit({line, ?LINE, _Err, _What}) +end, +io:format(user, "Yay!\n", []), + timer:sleep(1250), ok - %% end || Partition <- AllPartitionCombinations - %% end || Partition <- [ [{a,b},{b,d},{c,b}], - %% [{a,b},{b,d},{c,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}], - %% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}], - %% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}], - %% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ] - %% end || Partition <- [ [{a,b}, {b,c}], - %% [{a,b}, {c,b}] ] - %% end || Partition <- [ [{a,b}, {b,c}] ] %% hosed-not-equal @ 3 FLUs - %% end || Partition <- [ [{b,d}] ] - %% end || Partition <- [ [{a,b}, {b,a}] ] - %% end || Partition <- [ [{a,b}, {b,a}, {a,c},{c,a}] ] - end || Partition <- [ [{a,b}], - [{b,a}] ] - %% end || Partition <- [ [{a,b}, {c,b}], - %% [{a,b}, {b,c}] ] - %% end || Partition <- [ [{a,b}, {b,c}, {c,d}], - %% [{a,b}, {b,c},{b,d}, {c,d}], - %% [{b,a}, {b,c}, {c,d}], - %% [{a,b}, {c,b}, {c,d}], - %% [{a,b}, {b,c}, {d,c}] ] - %% end || Partition <- [ [{a,b}, {b,c}, {c,d}, {d,e}], - %% [{b,a}, {b,c}, {c,d}, {d,e}], - %% [{a,b}, {c,b}, {c,d}, {d,e}], - %% [{a,b}, {b,c}, {d,c}, {d,e}], - %% [{a,b}, {b,c}, {c,d}, {e,d}] ] - %% end || Partition <- [ [{c,a}] ] - %% end || Partition <- [ [{c,a}], [{c,b}, {a, b}] ] - %% end || Partition <- [ [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}], - %% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {b,c}], - %% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {c,d}] ] - %% end || Partition <- [ [{a,b}], - %% [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}], - %% [{a,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}], - %% [{a,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}], - %% [{a,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ] + end || {Partition, Count} <- PartitionCounts ], %% exit(end_experiment), io:format(user, "\nSET partitions = []\n", []), io:format(user, "We should see convergence to 1 correct chain.\n", []), machi_partition_simulator:no_partitions(), - [DoIt(50, 10, 100) || _ <- [1]], - io:format(user, "Sweet, finishing early\n", []), exit(yoyoyo_testing_hack_finishing_early), - %% WARNING: In asymmetric partitions, private_projections_are_stable() - %% will never be true; code beyond this point on the -exp3 - %% branch is bit-rotted, sorry! + [DoIt(50, 10, 50) || _ <- [1]], true = private_projections_are_stable(Namez, DoIt), io:format(user, "~s\n", [os:cmd("date")]), @@ -331,24 +282,21 @@ convergence_demo_testfun(NumFLUs) -> %% unique chains are disjoint. true = machi_chain_manager1_test:all_reports_are_disjoint(Report), - %% Given the report, we flip it around so that we observe the - %% sets of chain transitions relative to each FLU. - R_Chains = [machi_chain_manager1_test:extract_chains_relative_to_flu( - FLU, Report) || FLU <- All_list], - %% ?D(R_Chains), - R_Projs = [{FLU, [machi_chain_manager1_test:chain_to_projection( - FLU, Epoch, UPI, Repairing, All_list) || - {Epoch, UPI, Repairing} <- E_Chains]} || - {FLU, E_Chains} <- R_Chains], - %% For each chain transition experienced by a particular FLU, %% confirm that each state transition is OK. + PrivProjs = [{Name, begin + {ok, Ps9} = ?FLU_PC:get_all_projections(FLU, + private), + [P || P <- Ps9, + P#projection_v1.epoch_number /= 0] + end} || {Name, FLU} <- Namez], try - [{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane(Psx, FLU)} || - {FLU, Psx} <- R_Projs], + [{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} || + {FLU, Psx} <- PrivProjs], io:format(user, "\nAll sanity checks pass, hooray!\n", []) catch _Err:_What -> io:format(user, "Report ~p\n", [Report]), + io:format(user, "PrivProjs ~p\n", [PrivProjs]), exit({line, ?LINE, _Err, _What}) end, %% ?D(R_Projs), @@ -366,6 +314,72 @@ convergence_demo_testfun(NumFLUs) -> ok = machi_partition_simulator:stop() end. +%% Many of the static partition lists below have been problematic at one +%% time or another..... +%% +%% Uncomment *one* of the following make_partition_list() bodies. + +make_partition_list(All_list) -> + _X_Ys1 = [[{X,Y}] || X <- All_list, Y <- All_list, X /= Y], + _X_Ys2 = [[{X,Y}, {A,B}] || X <- All_list, Y <- All_list, X /= Y, + A <- All_list, B <- All_list, A /= B, + X /= A], + _X_Ys3 = [[{X,Y}, {A,B}, {C,D}] || X <- All_list, Y <- All_list, X /= Y, + A <- All_list, B <- All_list, A /= B, + C <- All_list, D <- All_list, C /= D, + X /= A, X /= C, A /= C], + %% Concat = _X_Ys1 ++ _X_Ys2. + %% Concat = _X_Ys3. + Concat = _X_Ys1 ++ _X_Ys2 ++ _X_Ys3, + random_sort(lists:usort([lists:sort(L) || L <- Concat])). + + %% [ [{a,b},{b,d},{c,b}], + %% [{a,b},{b,d},{c,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}], + %% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}], + %% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}], + %% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ]. + + %% [ [{a,b}, {b,c}], + %% [{a,b}, {c,b}] ]. + + %% [{a,b}, {b,c}] ]. %% hosed-not-equal @ 3 FLUs + + %% [{b,d}] ]. + + %% [ [{a,b}, {b,a}] ]. + + %% [ [{a,b},{b,c},{c,a}], + %% [{a,b}, {b,a}, {a,c},{c,a}] ]. + + %% [{a,b}, {c,b}], + %% [{a,b}, {b,c}] ]. + + %% [ [{a,b}, {b,c}, {c,d}], + %% [{a,b}, {b,c},{b,d}, {c,d}], + %% [{b,a}, {b,c}, {c,d}], + %% [{a,b}, {c,b}, {c,d}], + %% [{a,b}, {b,c}, {d,c}] ]. + + %% [ [{a,b}, {b,c}, {c,d}, {d,e}], + %% [{b,a}, {b,c}, {c,d}, {d,e}], + %% [{a,b}, {c,b}, {c,d}, {d,e}], + %% [{a,b}, {b,c}, {d,c}, {d,e}], + %% [{a,b}, {b,c}, {c,d}, {e,d}] ]. + + %% [ [{c,a}] ]. + + %% [ [{c,a}], [{c,b}, {a, b}] ]. + + %% [ [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}], + %% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {b,c}], + %% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {c,d}] ]. + + %% [ [{a,b}], + %% [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}], + %% [{a,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}], + %% [{a,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}], + %% [{a,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ]. + todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) -> try {ok, _}=Res = ?FLU_PC:read_projection(FLU, public, PPPepoch), @@ -374,75 +388,32 @@ todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) -> io:format(user, "QQQ Whoa, it crashed this time for ~p at epoch ~p\n", [FLUName, PPPepoch]), timer:sleep(1000), + exit(still_a_problem), ?FLU_PC:read_projection(FLU, public, PPPepoch) end. private_projections_are_stable(Namez, PollFunc) -> - Private1 = [?FLU_PC:get_latest_epochid(FLU, private) || - {_Name, FLU} <- Namez], + Private1 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez], PollFunc(5, 1, 10), - Private2 = [?FLU_PC:get_latest_epochid(FLU, private) || - {_Name, FLU} <- Namez], + Private2 = [get_latest_inner_proj_summ(FLU) || {_Name, FLU} <- Namez], + if Private1 == Private2 -> + ok; + true -> + io:format(user, "Oops: Private1: ~p\n", [Private1]), + io:format(user, "Oops: Private2: ~p\n", [Private2]) + end, true = (Private1 == Private2). -all_hosed_lists_are_identical(Namez, Partition0) -> - Partition = lists:usort(Partition0), - Ps = [element(2,?FLU_PC:read_latest_projection(FLU, private)) || - {_Name, FLU} <- Namez], - UniqueAllHoseds = lists:usort([machi_chain_manager1:get_all_hosed(P) || - {ok, P} <- Ps]), - Members = [M || {M, _Pid} <- Namez], - Islands = machi_partition_simulator:partitions2num_islands( - Members, Partition), - %% io:format(user, "all_hosed_lists_are_identical:\n", []), - %% io:format(user, " Uniques = ~p Islands ~p\n Partition ~p\n", - %% [Uniques, Islands, Partition]), - case length(UniqueAllHoseds) of - 1 -> - true; - %% TODO: With the addition of the digraph stuff below, the clause - %% below probably isn't necessary anymore, since the - %% digraph calculation should catch complete partition islands? - _ when Islands == 'many' -> - %% There are at least two partitions, so yes, it's quite - %% possible that the all_hosed lists may differ. - %% TODO Fix this up to be smarter about fully-isolated - %% islands of partition. - true; - _ -> - DG = digraph:new(), - Connection = machi_partition_simulator:partition2connection( - Members, Partition), - [digraph:add_vertex(DG, X) || X <- Members], - [digraph:add_edge(DG, X, Y) || {X,Y} <- Connection], - Any = - lists:any( - fun(X) -> - NotX = Members -- [X], - lists:any( - fun(Y) -> - %% There must be a shortest path of length - %% two in both directions, otherwise - %% the read projection call will fail. - %% And it's that failure that we're - %% interested in here. - XtoY = digraph:get_short_path(DG, X, Y), - YtoX = digraph:get_short_path(DG, Y, X), - (XtoY == false orelse - length(XtoY) > 2) - orelse - (YtoX == false orelse - length(YtoX) > 2) - end, NotX) - end, Members), - digraph:delete(DG), - if Any == true -> - %% There's a missing path of length 2 between some - %% two FLUs, so yes, there's going to be - %% non-identical all_hosed lists. - true; - true -> - false % There's no excuse, buddy - end - end. +get_latest_inner_proj_summ(FLU) -> + {ok, Proj} = ?FLU_PC:read_latest_projection(FLU, private), + #projection_v1{epoch_number=E, upi=UPI, repairing=Repairing, down=Down} = + machi_chain_manager1:inner_projection_or_self(Proj), + {E, UPI, Repairing, Down}. + +random_sort(L) -> + random:seed(now()), + L1 = [{random:uniform(99999), X} || X <- L], + [X || {_, X} <- lists:sort(L1)]. + +-endif. % !PULSE -endif. % TEST diff --git a/test/machi_chain_manager1_pulse.erl b/test/machi_chain_manager1_pulse.erl index b95cf00..b7785f9 100644 --- a/test/machi_chain_manager1_pulse.erl +++ b/test/machi_chain_manager1_pulse.erl @@ -26,6 +26,7 @@ -compile(export_all). +-include("machi_projection.hrl"). -include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc_statem.hrl"). @@ -37,7 +38,7 @@ -compile({pulse_replace_module, [{application, pulse_application}]}). %% The following functions contains side_effects but are run outside %% PULSE, i.e. PULSE needs to leave them alone --compile({pulse_skip,[{prop_pulse_test_,0}]}). +-compile({pulse_skip,[{prop_pulse_test_,0}, {shutdown_hard,0}]}). -compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}). %% Used for output within EUnit... @@ -50,6 +51,7 @@ -define(MGR, machi_chain_manager1). -define(MGRTEST, machi_chain_manager1_test). +-define(FLU_PC, machi_proxy_flu1_client). -record(state, { step=0, @@ -62,7 +64,7 @@ initial_state() -> #state{}. gen_num_pids() -> - choose(2, 5). + choose(2, length(all_list_extra())). gen_seed() -> noshrink({choose(1, 10000), choose(1, 10000), choose(1, 10000)}). @@ -80,7 +82,7 @@ command(S) -> { 1, {call, ?MODULE, change_partitions, [gen_old_threshold(), gen_no_partition_threshold()]}}, {50, {call, ?MODULE, do_ticks, - [choose(5, 100), S#state.pids, + [choose(5, 200), S#state.pids, gen_old_threshold(), gen_no_partition_threshold()]}} ]). @@ -100,34 +102,66 @@ next_state2(S, _Res, {call, _, _Func, _Args}) -> postcondition(_S, {call, _, _Func, _Args}, _Res) -> true. +all_list_extra() -> + [ %% Genenerators assume that this list is at least 2 items + {#p_srvr{name=a, address="localhost", port=7400, + props=[{chmgr, a_chmgr}]}, "./data.pulse.a"} + , {#p_srvr{name=b, address="localhost", port=7401, + props=[{chmgr, b_chmgr}]}, "./data.pulse.b"} + , {#p_srvr{name=c, address="localhost", port=7402, + props=[{chmgr, c_chmgr}]}, "./data.pulse.c"} + , {#p_srvr{name=d, address="localhost", port=7403, + props=[{chmgr, d_chmgr}]}, "./data.pulse.d"} + , {#p_srvr{name=e, address="localhost", port=7404, + props=[{chmgr, e_chmgr}]}, "./data.pulse.e"} + ]. + all_list() -> - [a,b,c]. - %% [a,b,c,d,e]. + [P#p_srvr.name || {P, _Dir} <- all_list_extra()]. -setup(_Num, Seed) -> - ?QC_FMT("\nsetup,", []), - All_list = all_list(), - _ = machi_partition_simulator:start_link(Seed, 0, 100), +setup(Num, Seed) -> + ?QC_FMT("\nsetup(~w", [Num]), + error_logger:tty(false), + All_list = lists:sublist(all_list(), Num), + All_listE = lists:sublist(all_list_extra(), Num), + %% shutdown_hard() has taken care of killing all relevant procs. + [machi_flu1_test:clean_up_data_dir(Dir) || {_P, Dir} <- All_listE], + ?QC_FMT(",z~w", [?LINE]), + + %% Start partition simulator + {ok, PSimPid} = machi_partition_simulator:start_link(Seed, 0, 100), _Partitions = machi_partition_simulator:get(All_list), + ?QC_FMT(",z~w", [?LINE]), - FLU_pids = [begin - {ok, FLUPid} = machi_flu0:start_link(Name), - _ = machi_flu0:get_epoch(FLUPid), - FLUPid - end || Name <- All_list], - Namez = lists:zip(All_list, FLU_pids), - Mgr_pids = [begin - {ok, Mgr} = ?MGR:start_link(Name, All_list, FLU_pid), - Mgr - end || {Name, FLU_pid} <- Namez], - timer:sleep(1), - {ok, P1} = ?MGR:test_calc_projection(hd(Mgr_pids), false), - P1Epoch = P1#projection.epoch_number, - [ok = machi_flu0:proj_write(FLU, P1Epoch, public, P1) || FLU <- FLU_pids], - [?MGR:test_react_to_env(Mgr) || Mgr <- Mgr_pids], + %% Start FLUs and their associated procs + {ok, SupPid} = machi_flu_sup:start_link(), + FluOpts = [{use_partition_simulator, true}, {active_mode, false}], + [begin + #p_srvr{name=Name, port=Port} = P, + {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, FluOpts) + end || {P, Dir} <- All_listE], + %% Set up the chain + Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- All_listE]), + ?QC_FMT(",z~w", [?LINE]), + [machi_chain_manager1:set_chain_members(get_chmgr(P), Dict) || + {P, _Dir} <- All_listE], + %% Trigger some environment reactions for humming consensus: first + %% do all the same server first, then round-robin evenly across + %% servers. + [begin + _QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P)) + end || {P, _Dir} <- All_listE, _I <- lists:seq(1,20), _Repeat <- [1,2]], + ?QC_FMT(",z~w", [?LINE]), + [begin + _QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P)) + end || _I <- lists:seq(1,20), {P, _Dir} <- All_listE, _Repeat <- [1,2]], + ?QC_FMT(",z~w", [?LINE]), - Res = {FLU_pids, Mgr_pids}, + ProxiesDict = ?FLU_PC:start_proxies(Dict), + + Res = {PSimPid, SupPid, ProxiesDict, All_listE}, put(manager_pids_hack, Res), + ?QC_FMT("),", []), Res. change_partitions(OldThreshold, NoPartitionThreshold) -> @@ -137,9 +171,9 @@ change_partitions(OldThreshold, NoPartitionThreshold) -> always_last_partitions() -> machi_partition_simulator:always_last_partitions(). -private_stable_check(FLUs) -> - {_FLU_pids, Mgr_pids} = get(manager_pids_hack), - Res = private_projections_are_stable_check(FLUs, Mgr_pids), +private_stable_check() -> + {_PSimPid, _SupPid, ProxiesDict, All_listE} = get(manager_pids_hack), + Res = private_projections_are_stable_check(ProxiesDict, All_listE), if not Res -> io:format(user, "BUMMER: private stable check failed!\n", []); true -> @@ -149,56 +183,61 @@ private_stable_check(FLUs) -> do_ticks(Num, PidsMaybe, OldThreshold, NoPartitionThreshold) -> io:format(user, "~p,~p,~p|", [Num, OldThreshold, NoPartitionThreshold]), - {_FLU_pids, Mgr_pids} = case PidsMaybe of - undefined -> get(manager_pids_hack); - _ -> PidsMaybe - end, + {_PSimPid, _SupPid, ProxiesDict, All_listE} = + case PidsMaybe of + undefined -> get(manager_pids_hack); + _ -> PidsMaybe + end, if is_integer(OldThreshold) -> machi_partition_simulator:reset_thresholds(OldThreshold, NoPartitionThreshold); true -> - ?QC_FMT("{e=~w},", [get_biggest_private_epoch_number()]), + ?QC_FMT("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]), machi_partition_simulator:no_partitions() end, - Res = exec_ticks(Num, Mgr_pids), + Res = exec_ticks(Num, All_listE), if not is_integer(OldThreshold) -> - ?QC_FMT("{e=~w},", [get_biggest_private_epoch_number()]); + ?QC_FMT("{e=~w},", [get_biggest_private_epoch_number(ProxiesDict)]); true -> ok end, Res. -get_biggest_private_epoch_number() -> +get_biggest_private_epoch_number(ProxiesDict) -> lists:last( lists:usort( lists:flatten( - [machi_flu0:proj_list_all(FLU, private) || - FLU <- all_list()]))). + [begin + {ok, {Epoch, _}} = ?FLU_PC:get_latest_epochid(Proxy, private), + Epoch + end || {_Name, Proxy} <- orddict:to_list(ProxiesDict)]))). dump_state() -> try ?QC_FMT("dump_state(", []), - {FLU_pids, _Mgr_pids} = get(manager_pids_hack), - Namez = zip(all_list(), FLU_pids), - Report = ?MGRTEST:unanimous_report(Namez), + {_PSimPid, _SupPid, ProxiesDict, _AlE} = get(manager_pids_hack), + Report = ?MGRTEST:unanimous_report(ProxiesDict), + Namez = ProxiesDict, %% ?QC_FMT("Report ~p\n", [Report]), - Diag1 = [begin - Ps = machi_flu0:proj_get_all(FLU, Type), - [io_lib:format("~p ~p ~p: ~w\n", [FLUName, Type, P#projection.epoch_number, ?MGR:make_projection_summary(P)]) || P <- Ps] - end || {FLUName, FLU} <- Namez, - Type <- [public] ], + %% Diag1 = [begin + %% {ok, Ps} = ?FLU_PC:get_all_projections(Proxy, Type), + %% [io_lib:format("~p ~p ~p: ~w\n", [FLUName, Type, P#projection_v1.epoch_number, machi_projection:make_summary(P)]) || P <- Ps] + %% end || {FLUName, Proxy} <- orddict:to_list(ProxiesDict), + %% Type <- [public] ], UniquePrivateEs = lists:usort(lists:flatten( - [machi_flu0:proj_list_all(FLU, private) || - {_FLUName, FLU} <- Namez])), - P_lists0 = [{FLUName, Type, machi_flu0:proj_get_all(FLUPid, Type)} || - {FLUName, FLUPid} <- Namez, Type <- [public,private]], + [element(2,?FLU_PC:list_all_projections(Proxy,private)) || + {_FLUName, Proxy} <- orddict:to_list(ProxiesDict)])), + P_lists0 = [{FLUName, Type, + element(2,?FLU_PC:get_all_projections(Proxy, Type))} || + {FLUName, Proxy} <- orddict:to_list(ProxiesDict), + Type <- [public,private]], P_lists = [{FLUName, Type, P} || {FLUName, Type, Ps} <- P_lists0, P <- Ps], AllDict = lists:foldl(fun({FLU, Type, P}, D) -> - K = {FLU, Type, P#projection.epoch_number}, + K = {FLU, Type, P#projection_v1.epoch_number}, dict:store(K, P, D) end, dict:new(), lists:flatten(P_lists)), DumbFinderBackward = @@ -208,28 +247,35 @@ dump_state() -> {ok, T} -> T; error -> error_unwritten end; - %% case machi_flu0:proj_read(FLU, E, private) of - %% {ok, T} -> T; - %% Else -> Else - %% end; (_E, Acc) -> Acc end end, - Diag2 = [[ - io_lib:format("~p private: ~w\n", - [FLUName, - ?MGR:make_projection_summary( - lists:foldl(DumbFinderBackward(FLUName), - error_unwritten, - lists:seq(Epoch, 0, -1)))]) - || {FLUName, _FLU} <- Namez] - || Epoch <- UniquePrivateEs], + %% Diag2 = [[ + %% io_lib:format("~p private: ~w\n", + %% [FLUName, + %% machi_projection:make_summary( + %% lists:foldl(DumbFinderBackward(FLUName), + %% error_unwritten, + %% lists:seq(Epoch, 0, -1)))]) + %% || {FLUName, _FLU} <- Namez] + %% || Epoch <- UniquePrivateEs], + + PrivProjs = [{Name, begin + {ok, Ps} = ?FLU_PC:get_all_projections(Proxy, + private), + [P || P <- Ps, + P#projection_v1.epoch_number /= 0] + end} || {Name, Proxy} <- ProxiesDict], ?QC_FMT(")", []), - {Report, lists:flatten([Diag1, Diag2])} + Diag1 = Diag2 = "skip_diags", + {Report, PrivProjs, lists:flatten([Diag1, Diag2])} catch XX:YY -> - ?QC_FMT("OUCH: ~p ~p @ ~p\n", [XX, YY, erlang:get_stacktrace()]) + ?QC_FMT("OUCH: ~p ~p @ ~p\n", [XX, YY, erlang:get_stacktrace()]), + ?QC_FMT("Exiting now to move to manual post-mortem....\n", []), + erlang:halt(0), + false end. prop_pulse() -> @@ -245,9 +291,9 @@ prop_pulse() -> Stabilize1 = [{set,{var,99999995}, {call, ?MODULE, always_last_partitions, []}}], Stabilize2 = [{set,{var,99999996}, - {call, ?MODULE, private_stable_check, [all_list()]}}], + {call, ?MODULE, private_stable_check, []}}], LastTriggerTicks = {set,{var,99999997}, - {call, ?MODULE, do_ticks, [25, undefined, no, no]}}, + {call, ?MODULE, do_ticks, [123, undefined, no, no]}}, Cmds1 = lists:duplicate(2, LastTriggerTicks), %% Cmds1 = lists:duplicate(length(all_list())*2, LastTriggerTicks), Cmds = Cmds0 ++ @@ -260,9 +306,14 @@ prop_pulse() -> {_H, _S, _R} = run_commands(?MODULE, Cmds) end, [{seed, Seed}, {strategy, unfair}]), - ok = shutdown_hard(), - - {Report, Diag} = S2#state.dump_state, + %% ?QC_FMT("S2 ~p\n", [S2]), + case S2#state.dump_state of + undefined -> + ?QC_FMT("BUMMER Cmds = ~p\n", [Cmds]); + _ -> + ok + end, + {Report, PrivProjs, Diag} = S2#state.dump_state, %% Report is ordered by Epoch. For each private projection %% written during any given epoch, confirm that all chain @@ -270,21 +321,12 @@ prop_pulse() -> %% unique chains are disjoint. AllDisjointP = ?MGRTEST:all_reports_are_disjoint(Report), - %% Given the report, we flip it around so that we observe the - %% sets of chain transitions relative to each FLU. - R_Chains = [?MGRTEST:extract_chains_relative_to_flu(FLU, Report) || - FLU <- all_list()], - R_Projs = [{FLU, [?MGRTEST:chain_to_projection( - FLU, Epoch, UPI, Repairing, all_list()) || - {Epoch, UPI, Repairing} <- E_Chains]} || - {FLU, E_Chains} <- R_Chains], - %% For each chain transition experienced by a particular FLU, %% confirm that each state transition is OK. Sane = [{FLU,_SaneRes} = {FLU,?MGR:projection_transitions_are_sane_retrospective( Ps, FLU)} || - {FLU, Ps} <- R_Projs], + {FLU, Ps} <- PrivProjs], SaneP = lists:all(fun({_FLU, SaneRes}) -> SaneRes == true end, Sane), %% The final report item should say that all are agreed_membership. @@ -299,13 +341,17 @@ prop_pulse() -> LastRepXs end, + ok = shutdown_hard(), ?WHENFAIL( begin + ?QC_FMT("Cmds = ~p\n", [Cmds]), ?QC_FMT("Res = ~p\n", [Res]), ?QC_FMT("Diag = ~s\n", [Diag]), ?QC_FMT("Report = ~p\n", [Report]), + ?QC_FMT("PrivProjs = ~p\n", [PrivProjs]), ?QC_FMT("Sane = ~p\n", [Sane]), ?QC_FMT("SingleChainNoRepair failure =\n ~p\n", [SingleChainNoRepair]) +,erlang:halt(0) end, conjunction([{res, Res == true orelse Res == ok}, {all_disjoint, AllDisjointP}, @@ -331,49 +377,60 @@ prop_pulse_test_() -> end}. shutdown_hard() -> - (catch machi_partition_simulator:stop()), - [(catch machi_flu0:stop(X)) || X <- all_list()], + ?QC_FMT("shutdown(", []), + (catch unlink(whereis(machi_partition_simulator))), + [begin + Pid = whereis(X), + spawn(fun() -> (catch X:stop()) end), + timer:sleep(50), + (catch unlink(Pid)), + timer:sleep(10), + (catch exit(Pid, shutdown)), + timer:sleep(1), + (catch exit(Pid, kill)) + end || X <- [machi_partition_simulator, machi_flu_sup] ], timer:sleep(1), - (catch exit(whereis(machi_partition_simulator), kill)), - [(catch exit(whereis(X), kill)) || X <- all_list()], - erlang:yield(), + ?QC_FMT(")", []), ok. -exec_ticks(Num, Mgr_pids) -> +exec_ticks(Num, All_listE) -> Parent = self(), Pids = [spawn_link(fun() -> [begin erlang:yield(), + M_name = P#p_srvr.name, Max = 10, Elapsed = ?MGR:sleep_ranked_order(1, Max, M_name, all_list()), - Res = ?MGR:test_react_to_env(MMM), + Res = ?MGR:test_react_to_env(get_chmgr(P)), timer:sleep(erlang:max(0, Max - Elapsed)), Res=Res %% ?D({self(), Res}) end || _ <- lists:seq(1,Num)], Parent ! done - end) || {M_name, MMM} <- lists:zip(all_list(), Mgr_pids) ], + end) || {P, _Dir} <- All_listE], [receive done -> ok - after 5000 -> - exit(icky_timeout) - end || _ <- Pids], + %% after 500*1000 -> + %% exit(icky_timeout) + end || _Pid <- Pids], ok. -private_projections_are_stable_check(All_list, Mgr_pids) -> +private_projections_are_stable_check(ProxiesDict, All_listE) -> %% TODO: extend the check to look not only for latest num, but %% also check for flapping, and if yes, to see if all_hosed are %% all exactly equal. - _ = exec_ticks(40, Mgr_pids), - Private1 = [machi_flu0:proj_get_latest_num(FLU, private) || - FLU <- All_list], - _ = exec_ticks(5, Mgr_pids), - Private2 = [machi_flu0:proj_get_latest_num(FLU, private) || - FLU <- All_list], + _ = exec_ticks(40, All_listE), + Private1 = [?FLU_PC:get_latest_epochid(Proxy, private) || + {_FLU, Proxy} <- orddict:to_list(ProxiesDict)], + _ = exec_ticks(5, All_listE), + Private2 = [?FLU_PC:get_latest_epochid(Proxy, private) || + {_FLU, Proxy} <- orddict:to_list(ProxiesDict)], (Private1 == Private2). +get_chmgr(#p_srvr{props=Ps}) -> + proplists:get_value(chmgr, Ps). -endif. % PULSE diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index ef9f92f..c4d52d9 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -45,18 +45,33 @@ -include_lib("eunit/include/eunit.hrl"). -compile(export_all). +%% Example: +%% [{1,{ok_disjoint,[{agreed_membership,{[a],[b,c]}}]}}, +%% {3,{ok_disjoint,[{agreed_membership,{[a],[b,c]}}]}}, +%% {8, +%% {ok_disjoint,[{not_agreed,{[a],[b,c]}, +%% [{b,not_in_this_epoch}, +%% <<65,159,66,113,232,15,156,244,197, +%% 210,39,82,229,84,192,19,27,45,161,38>>]}]}}, +%% {10,{ok_disjoint,[{agreed_membership,{[c],[]}}]}}, +%% ...] + unanimous_report(Namez) -> UniquePrivateEs = lists:usort(lists:flatten( [element(2, ?FLU_PC:list_all_projections(FLU, private)) || {_FLUName, FLU} <- Namez])), - [unanimous_report(Epoch, Namez) || Epoch <- UniquePrivateEs]. + [unanimous_report(Epoch, Namez) || Epoch <- UniquePrivateEs, + Epoch /= 0]. unanimous_report(Epoch, Namez) -> - Projs = [{FLUName, case ?FLU_PC:read_projection(FLU, private, Epoch) of - {ok, T} -> T; - _Else -> not_in_this_epoch - end} || {FLUName, FLU} <- Namez], + Projs = [{FLUName, + case ?FLU_PC:read_projection(FLU, private, Epoch) of + {ok, T} -> + machi_chain_manager1:inner_projection_or_self(T); + _Else -> + {FLUName, not_in_this_epoch} + end} || {FLUName, FLU} <- Namez], UPI_R_Sums = [{Proj#projection_v1.upi, Proj#projection_v1.repairing, Proj#projection_v1.epoch_csum} || {_FLUname, Proj} <- Projs, @@ -71,31 +86,12 @@ unanimous_report(Epoch, Namez) -> %% that all FLUs are in agreement. {UPI, Repairing, _CSum} = lists:keyfind(UPI, 1, UPI_R_Sums), - %% TODO: make certain that this subtlety doesn't get - %% last in later implementations. - - %% So, this is a bit of a tricky thing. If we're at - %% upi=[c] and repairing=[a,b], then the transition - %% (eventually!) to upi=[c,a] does not currently depend - %% on b being an active participant in the repair. - %% - %% Yes, b's state is very important for making certain - %% that all repair operations succeed both to a & b. - %% However, in this simulation, we only consider that - %% the head(Repairing) is sane. Therefore, we use only - %% the "HeadOfRepairing" in our considerations here. - HeadOfRepairing = case Repairing of - [H_Rep|_] -> - [H_Rep]; - _ -> - [] - end, Tmp = [{FLU, case proplists:get_value(FLU, Projs) of P when is_record(P, projection_v1) -> P#projection_v1.epoch_csum; Else -> Else - end} || FLU <- UPI ++ HeadOfRepairing], + end} || FLU <- UPI ++ Repairing], case lists:usort([CSum || {_FLU, CSum} <- Tmp]) of [_] -> {agreed_membership, {UPI, Repairing}}; @@ -103,7 +99,7 @@ unanimous_report(Epoch, Namez) -> {not_agreed, {UPI, Repairing}, Else2} end; _Else -> - {UPI, not_unique, Epoch, _Else} + {not_agreed, {undefined, undefined}, Projs} end end || UPI <- UniqueUPIs], AgreedResUPI_Rs = [UPI++Repairing || @@ -128,10 +124,11 @@ extract_chains_relative_to_flu(FLU, Report) -> lists:member(FLU, UPI) orelse lists:member(FLU, Repairing)]}. chain_to_projection(MyName, Epoch, UPI_list, Repairing_list, All_list) -> - exit({todo_broken_fixme,?MODULE,?LINE}), - machi_projection:new(Epoch, MyName, All_list, + MemberDict = orddict:from_list([{FLU, #p_srvr{name=FLU}} || + FLU <- All_list]), + machi_projection:new(Epoch, MyName, MemberDict, All_list -- (UPI_list ++ Repairing_list), - UPI_list, Repairing_list, []). + UPI_list, Repairing_list, [{artificial_by, ?MODULE}]). -ifndef(PULSE). @@ -260,5 +257,5 @@ nonunanimous_setup_and_fix_test() -> ok = machi_partition_simulator:stop() end. --endif. % not PULSE +-endif. % !PULSE -endif. % TEST diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index d22908d..6466294 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -24,8 +24,9 @@ -include("machi_projection.hrl"). -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). +-ifndef(PULSE). +-include_lib("eunit/include/eunit.hrl"). smoke_test_() -> {timeout, 1*60, fun() -> smoke_test2() end}. @@ -141,4 +142,5 @@ smoke_test2() -> exit(SupPid, normal) end. +-endif. % !PULSE -endif. % TEST. diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 034708e..153f65a 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -22,6 +22,7 @@ -compile(export_all). -ifdef(TEST). + -include("machi.hrl"). -include("machi_projection.hrl"). -include_lib("eunit/include/eunit.hrl"). @@ -29,6 +30,15 @@ -define(FLU, machi_flu1). -define(FLU_C, machi_flu1_client). +clean_up_data_dir(DataDir) -> + [begin + Fs = filelib:wildcard(DataDir ++ Glob), + [file:delete(F) || F <- Fs], + [file:del_dir(F) || F <- Fs] + end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], + _ = file:del_dir(DataDir), + ok. + setup_test_flu(RegName, TcpPort, DataDir) -> setup_test_flu(RegName, TcpPort, DataDir, []). @@ -48,6 +58,8 @@ setup_test_flu(RegName, TcpPort, DataDir, DbgProps) -> timer:sleep(10), FLU1. +-ifndef(PULSE). + flu_smoke_test() -> Host = "localhost", TcpPort = 32957, @@ -141,7 +153,8 @@ flu_smoke_test() -> {error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, BadFile), - ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort)) + ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host, + port=TcpPort})) after ok = ?FLU:stop(FLU1) end. @@ -197,13 +210,5 @@ bad_checksum_test() -> ok = ?FLU:stop(FLU1) end. -clean_up_data_dir(DataDir) -> - [begin - Fs = filelib:wildcard(DataDir ++ Glob), - [file:delete(F) || F <- Fs], - [file:del_dir(F) || F <- Fs] - end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], - _ = file:del_dir(DataDir), - ok. - +-endif. % !PULSE -endif. % TEST diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index e47691a..3e1a90c 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -21,6 +21,7 @@ -module(machi_flu_psup_test). -ifdef(TEST). +-ifndef(PULSE). -include_lib("eunit/include/eunit.hrl"). @@ -101,7 +102,7 @@ partial_stop_restart2() -> {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), [begin _QQa = machi_chain_manager1:test_react_to_env(ChMgr) - end || _ <- lists:seq(1,25), ChMgr <- ChMgrs], + end || _ <- lists:seq(1,125), ChMgr <- ChMgrs], %% All chain managers & projection stores should be using the %% same projection which is max projection in each store. @@ -112,8 +113,10 @@ partial_stop_restart2() -> {ok, Proj_m} = machi_projection_store:read_latest_projection( hd(PStores), public), [begin - {ok, Proj_m} = machi_projection_store:read_latest_projection( - PStore, ProjType) + {ok, Proj_m2} = machi_projection_store:read_latest_projection( + PStore, ProjType), + true = (machi_projection:update_dbg2(Proj_m, []) == + machi_projection:update_dbg2(Proj_m2, [])) end || ProjType <- [public, private], PStore <- PStores ], Epoch_m = Proj_m#projection_v1.epoch_number, %% Confirm that all FLUs are *not* wedged, with correct proj & epoch @@ -130,8 +133,10 @@ partial_stop_restart2() -> ok = machi_flu_psup:stop_flu_package(FluName_a), {ok, _} = Start(hd(Ps)), %% Remember: 'a' is not in active mode. - {ok, Proj_m} = machi_projection_store:read_latest_projection( - hd(PStores), private), + {ok, Proj_m3} = machi_projection_store:read_latest_projection( + hd(PStores), private), + true = (machi_projection:update_dbg2(Proj_m, []) == + machi_projection:update_dbg2(Proj_m, [])), %% Confirm that 'a' is wedged {error, wedged} = Append(hd(Ps)), {_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps), @@ -158,6 +163,7 @@ partial_stop_restart2() -> ok end. +-endif. % !PULSE -endif. % TEST diff --git a/test/machi_projection_test.erl b/test/machi_projection_test.erl index 3f2d59a..e2ed93e 100644 --- a/test/machi_projection_test.erl +++ b/test/machi_projection_test.erl @@ -21,6 +21,8 @@ -module(machi_projection_test). -ifdef(TEST). +-ifndef(PULSE). + -compile(export_all). -include("machi_projection.hrl"). @@ -83,4 +85,5 @@ try_it(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) -> false end. +-endif. % !PULSE -endif. % TEST diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index ac64a22..e98b129 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -28,6 +28,7 @@ -define(MUT, machi_proxy_flu1_client). -ifdef(TEST). +-ifndef(PULSE). api_smoke_test() -> RegName = api_smoke_flu, @@ -279,4 +280,5 @@ flu_restart_test() -> [catch machi_flu1:stop(Pid) || Pid <- get(flu_pid)] end. +-endif. % !PULSE -endif. % TEST