From 9249663ff6e352eb76a404e77d0d6b64cd35de21 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 30 Apr 2015 15:15:43 +0900 Subject: [PATCH 01/12] TODO updates for machi_flu1.erl based on doc rewriting work --- src/machi_flu1.erl | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 459aad3..54bf08e 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -37,13 +37,27 @@ %% %% The FLU is named after the CORFU server "FLU" or "FLash Unit" server. %% -%% TODO There is one major missing feature in this FLU implementation: +%% TODO There is a major missing feature in this FLU implementation: %% there is no "write-once" enforcement for any position in a Machi %% file. At the moment, we rely on correct behavior of the client %% & the sequencer to avoid overwriting data. In the Real World, %% however, all Machi file data is supposed to be exactly write-once %% to avoid problems with bugs, wire protocol corruption, malicious %% clients, etc. +%% +%% TODO The per-file metadata tuple store is missing from this implementation. +%% +%% TODO Section 4.1 ("The FLU") of the Machi design doc suggests that +%% the FLU keep track of the epoch number of the last file write (and +%% perhaps last metadata write), as an optimization for inter-FLU data +%% replication/chain repair. +%% +%% TODO Section 4.2 ("The Sequencer") says that the sequencer must +%% change its file assignments to new & unique names whenever we move +%% to wedge state. This is not yet implemented. In the current +%% Erlang process scheme (which will probably be changing soon), a +%% simple implementation would stop all existing processes that are +%% running run_seq_append_server(). -module(machi_flu1). From 442e79e4f15a732729e8cddde964f551bf73a47d Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 30 Apr 2015 17:28:43 +0900 Subject: [PATCH 02/12] Add machi_flu_psup.erl to supervise all 3 FLU processes (see below) Introduce machi_flu_psup:start_flu_package/4 as a way to start all related FLU processes * The projection store * The chain manager * The FLU itself ... as well as linked processes. http://www.snookles.com/scotttmp/flu-tree-20150430.png shows one FLU running, "a". The process registered "a" is the append server, "some-prefix" for the sequencer & writer for the current <<"some-prefix">> file, and a process each for 3 active TCP connections to that FLU. --- src/machi.app.src | 2 +- src/machi_chain_manager1.erl | 17 ++++- src/machi_flu1.erl | 93 +++++++++++++++++---------- src/machi_flu_psup.erl | 76 ++++++++++++++++++++++ src/machi_flu_sup.erl | 11 +--- src/machi_projection.erl | 17 ++++- test/machi_proxy_flu1_client_test.erl | 2 +- 7 files changed, 167 insertions(+), 51 deletions(-) create mode 100644 src/machi_flu_psup.erl diff --git a/src/machi.app.src b/src/machi.app.src index 7a2866b..64d9a3a 100644 --- a/src/machi.app.src +++ b/src/machi.app.src @@ -7,7 +7,7 @@ {env, [ {flu_list, [ - {flu_a, 32900, "./data.flu_a"} + %%%%%% {flu_a, 32900, "./data.flu_a"} ]} ]} ]}. diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 3bcb462..7244822 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -95,7 +95,8 @@ start_link(MyName, MembersDict) -> start_link(MyName, MembersDict, []). start_link(MyName, MembersDict, MgrOpts) -> - gen_server:start_link(?MODULE, {MyName, MembersDict, MgrOpts}, []). + gen_server:start_link({local, make_regname(MyName)}, ?MODULE, + {MyName, MembersDict, MgrOpts}, []). stop(Pid) -> gen_server:call(Pid, {stop}, infinity). @@ -206,7 +207,10 @@ handle_cast(_Cast, S) -> {noreply, S}. handle_info(Msg, S) -> - exit({bummer, Msg}), + case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]); + _ -> ok + end, + put(todo_bummer, true), {noreply, S}. terminate(_Reason, _S) -> @@ -1562,7 +1566,9 @@ calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> Index = length(Front) + 1, NumNodes = length(FLU_list), SleepIndex = NumNodes - Index, - SleepChunk = MaxSleep div NumNodes, + SleepChunk = if NumNodes == 0 -> 0; + true -> MaxSleep div NumNodes + end, MinSleep + (SleepChunk * SleepIndex). my_find_minmost([]) -> @@ -1648,6 +1654,11 @@ inner_projection_or_self(P) -> P_inner end. +make_regname(A) when is_atom(A) -> + list_to_atom(atom_to_list(A) ++ "_chmgr"); +make_regname(B) when is_binary(B) -> + list_to_atom(binary_to_list(B) ++ "_chmgr"). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% perhaps_call_t(S, Partitions, FLU, DoIt) -> diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 54bf08e..91ff10a 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -69,7 +69,7 @@ -export([start_link/1, stop/1]). -record(state, { - reg_name :: atom(), + flu_name :: atom(), proj_store :: pid(), append_pid :: pid(), tcp_port :: non_neg_integer(), @@ -95,15 +95,22 @@ stop(Pid) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%% -main2(RegName, TcpPort, DataDir, Rest) -> - S0 = #state{reg_name=RegName, +main2(FluName, TcpPort, DataDir, Rest) -> + S0 = #state{flu_name=FluName, tcp_port=TcpPort, data_dir=DataDir, props=Rest}, AppendPid = start_append_server(S0), - ProjRegName = make_projection_server_regname(RegName), - {ok, ProjectionPid} = - machi_projection_store:start_link(ProjRegName, DataDir, AppendPid), + {_ProjRegName, ProjectionPid} = + case proplists:get_value(projection_store_registered_name, Rest) of + undefined -> + RN = make_projection_server_regname(FluName), + {ok, PP} = + machi_projection_store:start_link(RN, DataDir, AppendPid), + {RN, PP}; + RN -> + {RN, whereis(RN)} + end, S1 = S0#state{append_pid=AppendPid, proj_store=ProjectionPid}, S2 = case proplists:get_value(dbg, Rest) of @@ -123,7 +130,7 @@ main2(RegName, TcpPort, DataDir, Rest) -> Projection_e = machi_util:make_projection_filename(DataDir, "unused"), ok = filelib:ensure_dir(Projection_e), - put(flu_reg_name, RegName), + put(flu_flu_name, FluName), put(flu_append_pid, AppendPid), put(flu_projection_pid, ProjectionPid), put(flu_listen_pid, ListenPid), @@ -134,44 +141,48 @@ main2(RegName, TcpPort, DataDir, Rest) -> ok. start_listen_server(S) -> - spawn_link(fun() -> run_listen_server(S) end). + proc_lib:spawn_link(fun() -> run_listen_server(S) end). start_append_server(S) -> - spawn_link(fun() -> run_append_server(S) end). + FluPid = self(), + proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end). %% start_projection_server(S) -> %% spawn_link(fun() -> run_projection_server(S) end). -run_listen_server(#state{tcp_port=TcpPort}=S) -> +run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> + register(make_listener_regname(FluName), self()), SockOpts = [{reuseaddr, true}, {mode, binary}, {active, false}, {packet, line}], {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), listen_server_loop(LSock, S). -run_append_server(#state{reg_name=Name}=S) -> +run_append_server(FluPid, #state{flu_name=Name}=S) -> register(Name, self()), - append_server_loop(S). + append_server_loop(FluPid, S). listen_server_loop(LSock, S) -> {ok, Sock} = gen_tcp:accept(LSock), spawn_link(fun() -> net_server_loop(Sock, S) end), listen_server_loop(LSock, S). -append_server_loop(#state{data_dir=DataDir}=S) -> +append_server_loop(FluPid, #state{data_dir=DataDir}=S) -> + AppendServerPid = self(), receive {seq_append, From, Prefix, Chunk, CSum} -> spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, - DataDir) end), - append_server_loop(S); + DataDir, AppendServerPid) end), + %% DataDir, FluPid) end), + append_server_loop(FluPid, S); {wedge_state_change, Boolean} -> - append_server_loop(S#state{wedge=Boolean}) + append_server_loop(FluPid, S#state{wedge=Boolean}) end. -define(EpochIDSpace, (4+20)). -net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> +net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0, 60*1000) of + case gen_tcp:recv(Sock, 0, 600*1000) of {ok, Line} -> %% machi_util:verb("Got: ~p\n", [Line]), PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1, @@ -185,7 +196,7 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> _EpochIDRaw:(?EpochIDSpace)/binary, LenHex:8/binary, Prefix:PrefixLenLF/binary, "\n">> -> - do_net_server_append(RegName, Sock, LenHex, Prefix); + do_net_server_append(FluName, Sock, LenHex, Prefix); <<"R ", _EpochIDRaw:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, @@ -233,16 +244,16 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> exit(normal) end. -append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) -> - Pid = write_server_get_pid(Prefix, DataDir), +append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, LinkPid) -> + Pid = write_server_get_pid(Prefix, DataDir, LinkPid), Pid ! {seq_append, From, Prefix, Chunk, CSum}, exit(normal). -do_net_server_append(RegName, Sock, LenHex, Prefix) -> +do_net_server_append(FluName, Sock, LenHex, Prefix) -> %% TODO: robustify against other invalid path characters such as NUL case sanitize_file_string(Prefix) of ok -> - do_net_server_append2(RegName, Sock, LenHex, Prefix); + do_net_server_append2(FluName, Sock, LenHex, Prefix); _ -> ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>) end. @@ -255,13 +266,13 @@ sanitize_file_string(Str) -> error end. -do_net_server_append2(RegName, Sock, LenHex, Prefix) -> +do_net_server_append2(FluName, Sock, LenHex, Prefix) -> <> = machi_util:hexstr_to_bin(LenHex), ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000), CSum = machi_util:checksum_chunk(Chunk), try - RegName ! {seq_append, self(), Prefix, Chunk, CSum} + FluName ! {seq_append, self(), Prefix, Chunk, CSum} catch error:badarg -> error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]) end, @@ -503,22 +514,30 @@ do_net_server_truncate_hackityhack2(Sock, File, DataDir) -> ok = gen_tcp:send(Sock, "ERROR\n") end. -write_server_get_pid(Prefix, DataDir) -> +write_server_get_pid(Prefix, DataDir, LinkPid) -> case write_server_find_pid(Prefix) of undefined -> - start_seq_append_server(Prefix, DataDir), + start_seq_append_server(Prefix, DataDir, LinkPid), timer:sleep(1), - write_server_get_pid(Prefix, DataDir); + write_server_get_pid(Prefix, DataDir, LinkPid); Pid -> Pid end. write_server_find_pid(Prefix) -> - RegName = machi_util:make_regname(Prefix), - whereis(RegName). + FluName = machi_util:make_regname(Prefix), + whereis(FluName). -start_seq_append_server(Prefix, DataDir) -> - spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end). +start_seq_append_server(Prefix, DataDir, AppendServerPid) -> + proc_lib:spawn_link(fun() -> + %% The following is only necessary to + %% make nice process relationships in + %% 'appmon' and related tools. + put('$ancestors', [AppendServerPid]), + put('$initial_call', {x,y,3}), + link(AppendServerPid), + run_seq_append_server(Prefix, DataDir) + end). run_seq_append_server(Prefix, DataDir) -> true = register(machi_util:make_regname(Prefix), self()), @@ -586,8 +605,9 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> after 30*1000 -> ok = file:close(FHd), ok = file:close(FHc), - machi_util:info_msg("stop: ~p server at file ~w offset ~w\n", - [Prefix, FileNum, Offset]), + machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", + [Prefix, self(), FileNum, Offset]), + machi_util:info_msg("links by ~p = ~p\n", [self(), process_info(self(), links)]), exit(normal) end. @@ -633,5 +653,8 @@ handle_projection_command({list_all_projections, ProjType}, handle_projection_command(Else, _S) -> {error, unknown_cmd, Else}. +make_listener_regname(BaseName) -> + list_to_atom(atom_to_list(BaseName) ++ "_listener"). + make_projection_server_regname(BaseName) -> - list_to_atom(atom_to_list(BaseName) ++ "_projection"). + list_to_atom(atom_to_list(BaseName) ++ "_pstore2"). diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl new file mode 100644 index 0000000..13ce1b4 --- /dev/null +++ b/src/machi_flu_psup.erl @@ -0,0 +1,76 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Supervisor for Machi FLU servers and their related support +%% servers. + +-module(machi_flu_psup). + +-behaviour(supervisor). + +%% External API +-export([start_flu_package/4]). +%% Internal API +-export([start_link/4]). + +%% Supervisor callbacks +-export([init/1]). + +start_flu_package(FluName, TcpPort, DataDir, Props) -> + Spec = {FluName, {machi_flu_psup, start_link, + [FluName, TcpPort, DataDir, Props]}, + permanent, 5000, supervisor, []}, + {ok, _SupPid} = supervisor:start_child(machi_flu_sup, Spec). + + +start_link(FluName, TcpPort, DataDir, Props) -> + supervisor:start_link({local, make_p_regname(FluName)}, ?MODULE, + [FluName, TcpPort, DataDir, Props]). + +init([FluName, TcpPort, DataDir, Props0]) -> + RestartStrategy = one_for_all, + MaxRestarts = 1000, + MaxSecondsBetweenRestarts = 3600, + SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, + + ProjRegName = make_proj_supname(FluName), + ProjSpec = {ProjRegName, + {machi_projection_store, start_link, + [ProjRegName, DataDir, zarfus_todo]}, + permanent, 5000, worker, []}, + MgrSpec = {make_mgr_supname(FluName), + {machi_chain_manager1, start_link, + [FluName, []]}, + permanent, 5000, worker, []}, + Props = [{projection_store_registered_name, ProjRegName}|Props0], + FluSpec = {FluName, + {machi_flu1, start_link, + [ [{FluName, TcpPort, DataDir}|Props] ]}, + permanent, 5000, worker, []}, + {ok, {SupFlags, [ProjSpec, MgrSpec, FluSpec]}}. + +make_p_regname(FluName) when is_atom(FluName) -> + list_to_atom("flusup_" ++ atom_to_list(FluName)). + +make_mgr_supname(MgrName) when is_atom(MgrName) -> + list_to_atom(atom_to_list(MgrName) ++ "_s"). + +make_proj_supname(ProjName) when is_atom(ProjName) -> + list_to_atom(atom_to_list(ProjName) ++ "_pstore"). diff --git a/src/machi_flu_sup.erl b/src/machi_flu_sup.erl index ce29502..f74ea07 100644 --- a/src/machi_flu_sup.erl +++ b/src/machi_flu_sup.erl @@ -40,15 +40,6 @@ init([]) -> RestartStrategy = one_for_one, MaxRestarts = 1000, MaxSecondsBetweenRestarts = 3600, - SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, + {ok, {SupFlags, []}}. - Restart = permanent, - Shutdown = 5000, - Type = worker, - - {ok, FluList} = application:get_env(machi, flu_list), - FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]}, - Restart, Shutdown, Type, []} || - {FluName, _Port, _Dir}=FluArgs <- FluList], - {ok, {SupFlags, FluSpecs}}. diff --git a/src/machi_projection.erl b/src/machi_projection.erl index 5f97f94..8e48966 100644 --- a/src/machi_projection.erl +++ b/src/machi_projection.erl @@ -51,7 +51,7 @@ new(EpochNum, MyName, MemberDict, Down_list, UPI_list, Repairing_list, Dbg) -> %% or it may be simply `list(p_srvr())', in which case we'll convert it %% to a `p_srvr_dict()'. -new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list, +new(EpochNum, MyName, [_|_] = MembersDict0, Down_list, UPI_list, Repairing_list, Dbg, Dbg2) when is_integer(EpochNum), EpochNum >= 0, is_atom(MyName) orelse is_binary(MyName), @@ -85,6 +85,21 @@ new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list, repairing=Repairing_list, dbg=Dbg }, + update_dbg2(update_checksum(P), Dbg2); +new(EpochNum, MyName, [] = _MembersDict0, _Down_list, _UPI_list,_Repairing_list, + Dbg, Dbg2) + when is_integer(EpochNum), EpochNum >= 0, + is_atom(MyName) orelse is_binary(MyName) -> + P = #projection_v1{epoch_number=EpochNum, + creation_time=now(), + author_server=MyName, + all_members=[], + members_dict=[], + down=[], + upi=[], + repairing=[], + dbg=Dbg + }, update_dbg2(update_checksum(P), Dbg2). %% @doc Update the checksum element of a projection record. diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index a22456e..7b1dcbc 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -65,7 +65,7 @@ api_smoke_test() -> %% Alright, now for the rest of the API, whee BadFile = <<"no-such-file">>, {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile), - {ok, [_]} = ?MUT:list_files(Prox1, FakeEpoch), + {ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch), {ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public), {error, not_written} = ?MUT:read_latest_projection(Prox1, public), {error, not_written} = ?MUT:read_projection(Prox1, public, 44), From 99fd7e7fe12e4f01860483a667b603a0912492cb Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 30 Apr 2015 21:20:21 +0900 Subject: [PATCH 03/12] Add test/machi_flu_psup_test.erl, small cleanups --- TODO-shortterm.org | 4 +++ src/machi_flu1.erl | 1 - src/machi_flu_psup.erl | 11 ++++-- src/machi_projection_store.erl | 1 - test/machi_admin_util_test.erl | 2 ++ test/machi_flu_psup_test.erl | 62 ++++++++++++++++++++++++++++++++++ 6 files changed, 77 insertions(+), 4 deletions(-) create mode 100644 test/machi_flu_psup_test.erl diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 8dc7a75..46e440c 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -33,6 +33,10 @@ func, and pattern match Erlang style in that func. *** TODO Attempt to remove cruft items in flapping_i? ** TODO Finish OTP'izing the Chain Manager with FLU & proj store processes +** TODO Add gproc and get rid of registered name rendezvous +*** TODO Fixes the atom table leak +*** TODO Fixes the problem of having active sequencer for the same prefix + on two FLUS in the same VM ** TODO Change all protocol ops to enforce the epoch ID ** TODO Add projection wedging logic to each FLU. diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 91ff10a..95c5a0b 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -607,7 +607,6 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> ok = file:close(FHc), machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", [Prefix, self(), FileNum, Offset]), - machi_util:info_msg("links by ~p = ~p\n", [self(), process_info(self(), links)]), exit(normal) end. diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index 13ce1b4..fca5ff1 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -26,7 +26,7 @@ -behaviour(supervisor). %% External API --export([start_flu_package/4]). +-export([start_flu_package/4, stop_flu_package/1]). %% Internal API -export([start_link/4]). @@ -39,6 +39,13 @@ start_flu_package(FluName, TcpPort, DataDir, Props) -> permanent, 5000, supervisor, []}, {ok, _SupPid} = supervisor:start_child(machi_flu_sup, Spec). +stop_flu_package(FluName) -> + case supervisor:terminate_child(machi_flu_sup, FluName) of + ok -> + ok = supervisor:delete_child(machi_flu_sup, FluName); + Else -> + Else + end. start_link(FluName, TcpPort, DataDir, Props) -> supervisor:start_link({local, make_p_regname(FluName)}, ?MODULE, @@ -57,7 +64,7 @@ init([FluName, TcpPort, DataDir, Props0]) -> permanent, 5000, worker, []}, MgrSpec = {make_mgr_supname(FluName), {machi_chain_manager1, start_link, - [FluName, []]}, + [FluName, [], Props0]}, permanent, 5000, worker, []}, Props = [{projection_store_registered_name, ProjRegName}|Props0], FluSpec = {FluName, diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 4a68aa1..ffe8786 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -36,7 +36,6 @@ %% `private' type); the value is a projection data structure %% (`projection_v1()' type). - -module(machi_projection_store). -include("machi_projection.hrl"). diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 8555959..ded534a 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -23,6 +23,8 @@ -ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + -include("machi.hrl"). -include("machi_projection.hrl"). diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl new file mode 100644 index 0000000..8131712 --- /dev/null +++ b/test/machi_flu_psup_test.erl @@ -0,0 +1,62 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc The Machi write-once projection store service. +%% +%% This API is gen_server-style message passing, intended for use +%% within a single Erlang node to glue together the projection store +%% server with the node-local process that implements Machi's TCP +%% client access protocol (on the "server side" of the TCP connection). +%% +%% All Machi client access to the projection store SHOULD NOT use this +%% module's API. +%% +%% The projection store is implemented by an Erlang/OTP `gen_server' +%% process that is associated with each FLU. Conceptually, the +%% projection store is an array of write-once registers. For each +%% projection store register, the key is a 2-tuple of an epoch number +%% (`non_neg_integer()' type) and a projection type (`public' or +%% `private' type); the value is a projection data structure +%% (`projection_v1()' type). + +-module(machi_flu_psup_test). + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +smoke_test() -> + {ok, PidA} = machi_flu_sup:start_link(), + try + {ok, _} = machi_flu_psup:start_flu_package(a, 5555, "./data.a", + [{active_mode,false}]), + {ok, _} = machi_flu_psup:start_flu_package(b, 5556, "./data.b", + [{active_mode,false}]), + {ok, _} = machi_flu_psup:start_flu_package(c, 5557, "./data.c", + [{active_mode,false}]), + ok + after + [ok = machi_flu_psup:stop_flu_package(X) || X <- [a,b,c]] + end. + +-endif. % TEST + + + From 7bafc1c28af2981a400de2695ab2d0837e9fb8e5 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 30 Apr 2015 23:16:08 +0900 Subject: [PATCH 04/12] WIP: stop for the night, we are broken --- src/machi_chain_manager1.erl | 72 ++++++++++++++++++++++++++++++++---- src/machi_flu_psup.erl | 5 ++- test/machi_flu_psup_test.erl | 11 +++++- 3 files changed, 77 insertions(+), 11 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 7244822..fbe3f42 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -131,10 +131,31 @@ test_react_to_env(Pid) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +get_my_private_projection_members_dict(MgrOpts) -> + EmptyDict = orddict:new(), + case proplists:get_value(projection_store_registered_name, MgrOpts) of + undefined -> + EmptyDict; + Store -> + case machi_projection_store:read_latest_projection(Store, private) of + {error, not_written} -> + EmptyDict; + {ok, P} -> + P#projection_v1.members_dict + end + end. + init({MyName, MembersDict, MgrOpts}) -> - All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], + Dx = case MembersDict of + [] -> + get_my_private_projection_members_dict(MgrOpts); + _ -> + MembersDict + end, + All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(Dx)], Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end, RunEnv = [{seed, Opt(seed, now())}, + {use_partition_simulator, Opt(use_partition_simulator, true)}, {network_partitions, Opt(network_partitions, [])}, {network_islands, Opt(network_islands, [])}, {flapping_i, Opt(flapping, [])}, @@ -277,16 +298,21 @@ cl_write_public_proj_remote(FLUs, Partitions, _Epoch, Proj, S) -> {{remote_write_results, Rs}, S}. do_cl_read_latest_public_projection(ReadRepairP, - #ch_mgr{proj=Proj1} = S) -> + #ch_mgr{name=MyName, proj=Proj1} = S) -> _Epoch1 = Proj1#projection_v1.epoch_number, case cl_read_latest_projection(public, S) of {needs_repair, FLUsRs, Extra, S3} -> if not ReadRepairP -> - {not_unanimous, todoxyz, [{results, FLUsRs}|Extra], S3}; + {not_unanimous, todoxyz, [{unanimous_flus, []}, + {results, FLUsRs}|Extra], S3}; true -> {_Status, S4} = do_read_repair(FLUsRs, Extra, S3), do_cl_read_latest_public_projection(ReadRepairP, S4) end; + {error_unwritten, FLUsRs, Extra, S3} -> + NoneProj = make_none_projection(MyName, [], orddict:new()), + {not_unanimous, NoneProj, [{unanimous_flus, []}, + {results, FLUsRs}|Extra], S3}; {UnanimousTag, Proj2, Extra, S3}=_Else -> {UnanimousTag, Proj2, Extra, S3} end. @@ -319,7 +345,7 @@ cl_read_latest_projection(ProjectionType, AllHosed, S) -> rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, S2). rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, - #ch_mgr{proj=CurrentProj}=S) -> + #ch_mgr{name=MyName,proj=CurrentProj}=S) -> UnwrittenRs = [x || {_, error_unwritten} <- FLUsRs], Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)], BadAnswerFLUs = [FLU || {FLU, Answer} <- FLUsRs, @@ -328,7 +354,17 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, if All_queried_list == [] orelse length(UnwrittenRs) == length(FLUsRs) -> - {error_unwritten, FLUsRs, [todo_fix_caller_perhaps], S}; + NoneProj = make_none_projection(MyName, [], orddict:new()), + Extra2 = [{all_members_replied, true}, + {all_queried_list, All_queried_list}, + {flus_rs, FLUsRs}, + {unanimous_flus,[]}, + {not_unanimous_flus, []}, + {bad_answer_flus, BadAnswerFLUs}, + {not_unanimous_answers, []}, + {trans_all_hosed, []}, + {trans_all_flap_counts, []}], + {not_unanimous, NoneProj, Extra2, S}; UnwrittenRs /= [] -> {needs_repair, FLUsRs, [flarfus], S}; true -> @@ -493,7 +529,13 @@ calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) -> {UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}. calc_up_nodes(MyName, AllMembers, RunEnv1) -> - {Partitions2, Islands2} = machi_partition_simulator:get(AllMembers), + {Partitions2, Islands2} = + case proplists:get_value(use_partition_simulator, RunEnv1) of + true -> + machi_partition_simulator:get(AllMembers); + false -> + {[], [AllMembers]} + end, catch ?REACT({partitions,Partitions2}), catch ?REACT({islands,Islands2}), UpNodes = lists:sort( @@ -581,6 +623,9 @@ react_to_env_A20(Retries, S) -> %% The UnanimousTag isn't quite sufficient for our needs. We need %% to determine if *all* of the UPI+Repairing FLUs are members of %% the unanimous server replies. +io:format(user, "\nReact ~P\n", [lists:reverse(get(react)), 10]), +io:format(user, "\nReadExtra ~p\n", [ReadExtra]), +io:format(user, "\nP_latest ~p\n", [P_latest]), UnanimousFLUs = lists:sort(proplists:get_value(unanimous_flus, ReadExtra)), UPI_Repairing_FLUs = lists:sort(P_latest#projection_v1.upi ++ P_latest#projection_v1.repairing), @@ -614,6 +659,9 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra, ?REACT({a30, ?LINE, [{newprop1, machi_projection:make_summary(P_newprop1)}]}), %% Are we flapping yet? +io:format(user, "React 2 ~P\n", [lists:reverse(get(react)), 109999]), +io:format(user, "NewProp1 ~p\n", [P_newprop1]), +io:format(user, "Current ~p\n", [P_current]), {P_newprop2, S3} = calculate_flaps(P_newprop1, P_current, FlapLimit, S2), %% Move the epoch number up ... originally done in C300. @@ -1186,9 +1234,13 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, P#projection_v1.repairing, P#projection_v1.down} || P <- Ps]), +QQQ = {_WhateverUnanimous, BestP, Props, _S} = cl_read_latest_projection(private, S), - NotBestPs = proplists:get_value(not_unanimous_answers, Props), + NotBestPs = proplists:get_value(not_unanimous_answers, Props, []), +io:format(user, "QQQ ~p\n", [QQQ]), +io:format(user, "BestP ~p\n", [BestP]), +io:format(user, "NotBestPs ~p\n", [NotBestPs]), DownUnion = lists:usort( lists:flatten( [P#projection_v1.down || @@ -1628,6 +1680,8 @@ merge_flap_counts([FlapCount|Rest], D1) -> end, D1, D2), merge_flap_counts(Rest, D3). +%% proxy_pid(Name, #ch_mgr{proxies_dict=[]}) -> +%% throw(empty_proxies_dict); proxy_pid(Name, #ch_mgr{proxies_dict=ProxiesDict}) -> orddict:fetch(Name, ProxiesDict). @@ -1672,6 +1726,7 @@ perhaps_call_t(S, Partitions, FLU, DoIt) -> perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> ProxyPid = proxy_pid(FLU, S), RemoteFLU_p = FLU /= MyName, + try case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of false -> Res = DoIt(ProxyPid), @@ -1685,6 +1740,9 @@ perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> _ -> (catch put(react, [{timeout1,me,MyName,to,FLU,RemoteFLU_p,Partitions}|get(react)])), exit(timeout) + end + catch throw:empty_proxies_dict -> + asdflkjweoiasd end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index fca5ff1..ecfaca4 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -58,15 +58,16 @@ init([FluName, TcpPort, DataDir, Props0]) -> SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, ProjRegName = make_proj_supname(FluName), + Props = [{projection_store_registered_name, ProjRegName}, + {use_partition_simulator,false}|Props0], ProjSpec = {ProjRegName, {machi_projection_store, start_link, [ProjRegName, DataDir, zarfus_todo]}, permanent, 5000, worker, []}, MgrSpec = {make_mgr_supname(FluName), {machi_chain_manager1, start_link, - [FluName, [], Props0]}, + [FluName, [], Props]}, permanent, 5000, worker, []}, - Props = [{projection_store_registered_name, ProjRegName}|Props0], FluSpec = {FluName, {machi_flu1, start_link, [ [{FluName, TcpPort, DataDir}|Props] ]}, diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 8131712..03f9d6f 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -43,7 +43,7 @@ -include_lib("eunit/include/eunit.hrl"). smoke_test() -> - {ok, PidA} = machi_flu_sup:start_link(), + {ok, SupPid} = machi_flu_sup:start_link(), try {ok, _} = machi_flu_psup:start_flu_package(a, 5555, "./data.a", [{active_mode,false}]), @@ -51,9 +51,16 @@ smoke_test() -> [{active_mode,false}]), {ok, _} = machi_flu_psup:start_flu_package(c, 5557, "./data.c", [{active_mode,false}]), + + [begin + QQ = machi_chain_manager1:test_react_to_env(a_chmgr), + io:format(user, "QQ ~p\n", [QQ]) + end || _ <- [1,2,3]], ok after - [ok = machi_flu_psup:stop_flu_package(X) || X <- [a,b,c]] + [ok = machi_flu_psup:stop_flu_package(X) || X <- [a,b,c]], + unlink(SupPid), + exit(SupPid, stop_please) end. -endif. % TEST From 53f6a753f4f5ad1ad7f50a0754a5a6b0b3ad73c2 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 1 May 2015 14:51:42 +0900 Subject: [PATCH 05/12] WIP: tests pass, but not finished yet --- src/machi_chain_manager1.erl | 76 ++++++++++++++++++++++++------------ src/machi_util.erl | 11 ++++++ test/machi_flu_psup_test.erl | 15 +++---- 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index fbe3f42..b9ba138 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -131,28 +131,31 @@ test_react_to_env(Pid) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -get_my_private_projection_members_dict(MgrOpts) -> - EmptyDict = orddict:new(), - case proplists:get_value(projection_store_registered_name, MgrOpts) of - undefined -> - EmptyDict; - Store -> - case machi_projection_store:read_latest_projection(Store, private) of - {error, not_written} -> - EmptyDict; - {ok, P} -> - P#projection_v1.members_dict - end - end. +%% Bootstrapping is a hassle ... when when isn't it? +%% +%% If InitMembersDict == [], then we don't know anything about the chain +%% that we'll be participating in. We'll have to wait for directions from +%% our sysadmin later. +%% +%% If InitMembersDict /= [], then we do know what chain we're +%% participating in. It's probably test code, since that's about the +%% only time that we know so much at init() time. +%% +%% In either case, we'll try to create & store an epoch 0 projection +%% and store it to both projections stores. This is tricky if +%% InitMembersDict == [] because InitMembersDict usually contains the +%% #p_svrv records that we need to *write* to the projection store, +%% even our own private store! For test code, we get the store +%% manager's pid in MgrOpts and use direct gen_server calls to the +%% local projection store. -init({MyName, MembersDict, MgrOpts}) -> - Dx = case MembersDict of - [] -> - get_my_private_projection_members_dict(MgrOpts); - _ -> - MembersDict - end, - All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(Dx)], +init({MyName, InitMembersDict, MgrOpts}) -> + ZeroAll_list = [P#p_srvr.name || {_,P} <- orddict:to_list(InitMembersDict)], + ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict), + ok = store_zeroth_projection_maybe(ZeroProj, MgrOpts), + + MembersDict = get_my_private_projection_members_dict(MgrOpts, InitMembersDict), + All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end, RunEnv = [{seed, Opt(seed, now())}, {use_partition_simulator, Opt(use_partition_simulator, true)}, @@ -247,6 +250,29 @@ make_none_projection(MyName, All_list, MembersDict) -> UPI_list = [], machi_projection:new(MyName, MembersDict, UPI_list, Down_list, [], []). +get_my_private_projection_members_dict(MgrOpts, DefaultDict) -> + case proplists:get_value(projection_store_registered_name, MgrOpts) of + undefined -> + DefaultDict; + Store -> + {ok, P} = machi_projection_store:read_latest_projection(Store, + private), + P#projection_v1.members_dict + end. + +%% Write the epoch 0 projection store, to assist bootstrapping. If the +%% 0th epoch is already written, there's no problem. + +store_zeroth_projection_maybe(ZeroProj, MgrOpts) -> + case proplists:get_value(projection_store_registered_name, MgrOpts) of + undefined -> + ok; + Store -> + _ = machi_projection_store:write(Store, public, ZeroProj), + _ = machi_projection_store:write(Store, private, ZeroProj), + ok + end. + set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), @@ -607,6 +633,8 @@ rank_projection(#projection_v1{author_server=Author, ( N * length(Repairing_list)) + (N*N * length(UPI_list)). +do_react_to_env(#ch_mgr{proj=#projection_v1{members_dict=[]}}=S) -> + {empty_members_dict, S}; do_react_to_env(S) -> put(react, []), react_to_env_A10(S). @@ -1238,9 +1266,9 @@ QQQ = {_WhateverUnanimous, BestP, Props, _S} = cl_read_latest_projection(private, S), NotBestPs = proplists:get_value(not_unanimous_answers, Props, []), -io:format(user, "QQQ ~p\n", [QQQ]), -io:format(user, "BestP ~p\n", [BestP]), -io:format(user, "NotBestPs ~p\n", [NotBestPs]), +%% io:format(user, "QQQ ~p\n", [QQQ]), +%% io:format(user, "BestP ~p\n", [BestP]), +%% io:format(user, "NotBestPs ~p\n", [NotBestPs]), DownUnion = lists:usort( lists:flatten( [P#projection_v1.down || diff --git a/src/machi_util.erl b/src/machi_util.erl index 6025c25..aedf4c6 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -239,6 +239,17 @@ info_msg(Fmt, Args) -> _ -> error_logger:info_msg(Fmt, Args) end. +wait_for_death(Pid, 0) -> + exit({not_dead_yet, Pid}); +wait_for_death(Pid, Iters) when is_pid(Pid) -> + case erlang:is_process_alive(Pid) of + false -> + ok; + true -> + timer:sleep(1), + wait_for_death(Pid, Iters-1) + end. + %%%%%%%%%%%%%%%%% %% @doc Create a TCP connection to a remote Machi server. diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 03f9d6f..f812120 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -43,6 +43,7 @@ -include_lib("eunit/include/eunit.hrl"). smoke_test() -> + [os:cmd("rm -rf " ++ X) || X <- ["./data.a", "./data.b", "/data.c"] ], {ok, SupPid} = machi_flu_sup:start_link(), try {ok, _} = machi_flu_psup:start_flu_package(a, 5555, "./data.a", @@ -51,16 +52,16 @@ smoke_test() -> [{active_mode,false}]), {ok, _} = machi_flu_psup:start_flu_package(c, 5557, "./data.c", [{active_mode,false}]), - + [begin - QQ = machi_chain_manager1:test_react_to_env(a_chmgr), - io:format(user, "QQ ~p\n", [QQ]) - end || _ <- [1,2,3]], + _QQ = machi_chain_manager1:test_react_to_env(a_chmgr), + ok + end || _ <- lists:seq(1,5)], ok after - [ok = machi_flu_psup:stop_flu_package(X) || X <- [a,b,c]], - unlink(SupPid), - exit(SupPid, stop_please) + exit(SupPid, normal), + machi_util:wait_for_death(SupPid, 100), + ok end. -endif. % TEST From 65993dfcb6c70b42dde734b720947556384815f8 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 1 May 2015 14:52:19 +0900 Subject: [PATCH 06/12] WIP: tests pass, but not finished yet --- test/machi_flu_psup_test.erl | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index f812120..b709fc0 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -18,24 +18,6 @@ %% %% ------------------------------------------------------------------- -%% @doc The Machi write-once projection store service. -%% -%% This API is gen_server-style message passing, intended for use -%% within a single Erlang node to glue together the projection store -%% server with the node-local process that implements Machi's TCP -%% client access protocol (on the "server side" of the TCP connection). -%% -%% All Machi client access to the projection store SHOULD NOT use this -%% module's API. -%% -%% The projection store is implemented by an Erlang/OTP `gen_server' -%% process that is associated with each FLU. Conceptually, the -%% projection store is an array of write-once registers. For each -%% projection store register, the key is a 2-tuple of an epoch number -%% (`non_neg_integer()' type) and a projection type (`public' or -%% `private' type); the value is a projection data structure -%% (`projection_v1()' type). - -module(machi_flu_psup_test). -ifdef(TEST). From 16750201505668d56cf078a2a4c77f609170e225 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 2 May 2015 00:33:49 +0900 Subject: [PATCH 07/12] WIP, tests pass again, includign the newest one --- TODO-shortterm.org | 1 + src/machi_chain_manager1.erl | 117 ++++++++++++++++---------- src/machi_proxy_flu1_client.erl | 2 +- test/machi_flu_psup_test.erl | 37 ++++++-- test/machi_proxy_flu1_client_test.erl | 2 +- 5 files changed, 105 insertions(+), 54 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 46e440c..44eed7b 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -37,6 +37,7 @@ func, and pattern match Erlang style in that func. *** TODO Fixes the atom table leak *** TODO Fixes the problem of having active sequencer for the same prefix on two FLUS in the same VM +** TODO Eliminate the timeout exception for the client: just {error,timeout} ret ** TODO Change all protocol ops to enforce the epoch ID ** TODO Add projection wedging logic to each FLU. diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index b9ba138..5cb962a 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -65,7 +65,8 @@ -define(REACT(T), put(react, [T|get(react)])). %% API --export([start_link/2, start_link/3, stop/1, ping/1]). +-export([start_link/2, start_link/3, stop/1, ping/1, + set_chain_members/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -104,6 +105,9 @@ stop(Pid) -> ping(Pid) -> gen_server:call(Pid, {ping}, infinity). +set_chain_members(Pid, MembersDict) -> + gen_server:call(Pid, {set_chain_members, MembersDict}, infinity). + -ifdef(TEST). %% Test/debugging code only. @@ -150,6 +154,7 @@ test_react_to_env(Pid) -> %% local projection store. init({MyName, InitMembersDict, MgrOpts}) -> + init_remember_partition_hack(), ZeroAll_list = [P#p_srvr.name || {_,P} <- orddict:to_list(InitMembersDict)], ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict), ok = store_zeroth_projection_maybe(ZeroProj, MgrOpts), @@ -163,35 +168,30 @@ init({MyName, InitMembersDict, MgrOpts}) -> {network_islands, Opt(network_islands, [])}, {flapping_i, Opt(flapping, [])}, {up_nodes, Opt(up_nodes, not_init_yet)}], - ActiveP = Opt(active_mode, true), - NoneProj = make_none_projection(MyName, All_list, MembersDict), - Proxies = orddict:fold( - fun(K, P, Acc) -> - {ok, Pid} = ?FLU_PC:start_link(P), - [{K, Pid}|Acc] - end, [], MembersDict), + ActiveP = Opt(active_mode, false), S = #ch_mgr{name=MyName, %% TODO 2015-03-04: revisit, should this constant be bigger? %% Yes, this should be bigger, but it's a hack. There is %% no guarantee that all parties will advance to a minimum %% flap awareness in the amount of time that this mgr will. flap_limit=length(All_list) + 50, - proj=NoneProj, timer='undefined', proj_history=queue:new(), runenv=RunEnv, - opts=MgrOpts, - members_dict=MembersDict, - proxies_dict=orddict:from_list(Proxies)}, - S2 = if ActiveP == false -> - S; + opts=MgrOpts}, + {_, S2} = do_set_chain_members(MembersDict, S), + S3 = if ActiveP == false -> + S2; ActiveP == true -> - set_active_timer(S) + set_active_timer(S2) end, - {ok, S2}. + {ok, S3}. handle_call({ping}, _From, S) -> {reply, pong, S}; +handle_call({set_chain_members, MembersDict}, _From, S) -> + {Reply, S2} = do_set_chain_members(MembersDict, S), + {reply, Reply, S2}; handle_call({stop}, _From, S) -> {stop, normal, ok, S}; handle_call({test_calc_projection, KeepRunenvP}, _From, @@ -311,7 +311,7 @@ cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, Else when SkipLocalWriteErrorP -> {XX, SS} = Continue(), {{local_write_result, Else, XX}, SS}; - Else when Else == error_written; Else == timeout; Else == t_timeout -> + Else -> {Else, S2} end. @@ -350,13 +350,16 @@ read_latest_projection_call_only(ProjectionType, AllHosed, {_UpNodes, Partitions, S2} = calc_up_nodes(S), DoIt = fun(Pid) -> - case ?FLU_PC:read_latest_projection(Pid, ProjectionType, ?TO) of + case (?FLU_PC:read_latest_projection(Pid, ProjectionType, ?TO)) of {ok, P} -> P; Else -> Else end end, +%% io:format(user, "All_queried_list ~p\n", [All_queried_list]), Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) || FLU <- All_queried_list], + %% Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) || + %% FLU <- All_queried_list], FLUsRs = lists:zip(All_queried_list, Rs), {All_queried_list, FLUsRs, S2}. @@ -555,15 +558,17 @@ calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) -> {UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}. calc_up_nodes(MyName, AllMembers, RunEnv1) -> - {Partitions2, Islands2} = - case proplists:get_value(use_partition_simulator, RunEnv1) of - true -> - machi_partition_simulator:get(AllMembers); - false -> - {[], [AllMembers]} - end, - catch ?REACT({partitions,Partitions2}), - catch ?REACT({islands,Islands2}), + case proplists:get_value(use_partition_simulator, RunEnv1) of + true -> + calc_up_nodes_sim(MyName, AllMembers, RunEnv1); + false -> + {AllMembers -- get(remember_partition_hack), [], RunEnv1} + end. + +calc_up_nodes_sim(MyName, AllMembers, RunEnv1) -> + {Partitions2, Islands2} = machi_partition_simulator:get(AllMembers), + catch ?REACT({calc_up_nodes,?LINE,[{partitions,Partitions2}, + {islands,Islands2}]}), UpNodes = lists:sort( [Node || Node <- AllMembers, not lists:member({MyName, Node}, Partitions2), @@ -633,6 +638,23 @@ rank_projection(#projection_v1{author_server=Author, ( N * length(Repairing_list)) + (N*N * length(UPI_list)). +do_set_chain_members(MembersDict, + #ch_mgr{name=MyName, proxies_dict=OldProxiesDict}=S) -> + catch orddict:fold( + fun(_K, Pid, _Acc) -> + _ = (catch ?FLU_PC:quit(Pid)) + end, [], OldProxiesDict), + All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], + NoneProj = make_none_projection(MyName, All_list, MembersDict), + Proxies = orddict:fold( + fun(K, P, Acc) -> + {ok, Pid} = ?FLU_PC:start_link(P), + [{K, Pid}|Acc] + end, [], MembersDict), + {ok, S#ch_mgr{proj=NoneProj, + members_dict=MembersDict, + proxies_dict=orddict:from_list(Proxies)}}. + do_react_to_env(#ch_mgr{proj=#projection_v1{members_dict=[]}}=S) -> {empty_members_dict, S}; do_react_to_env(S) -> @@ -645,15 +667,15 @@ react_to_env_A10(S) -> react_to_env_A20(Retries, S) -> ?REACT(a20), + init_remember_partition_hack(), {UnanimousTag, P_latest, ReadExtra, S2} = do_cl_read_latest_public_projection(true, S), %% The UnanimousTag isn't quite sufficient for our needs. We need %% to determine if *all* of the UPI+Repairing FLUs are members of - %% the unanimous server replies. -io:format(user, "\nReact ~P\n", [lists:reverse(get(react)), 10]), -io:format(user, "\nReadExtra ~p\n", [ReadExtra]), -io:format(user, "\nP_latest ~p\n", [P_latest]), + %% the unanimous server replies. All Repairing FLUs should be up + %% now (because if they aren't then they cannot be repairing), so + %% all Repairing FLUs have no non-race excuse not to be in UnanimousFLUs. UnanimousFLUs = lists:sort(proplists:get_value(unanimous_flus, ReadExtra)), UPI_Repairing_FLUs = lists:sort(P_latest#projection_v1.upi ++ P_latest#projection_v1.repairing), @@ -687,9 +709,6 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra, ?REACT({a30, ?LINE, [{newprop1, machi_projection:make_summary(P_newprop1)}]}), %% Are we flapping yet? -io:format(user, "React 2 ~P\n", [lists:reverse(get(react)), 109999]), -io:format(user, "NewProp1 ~p\n", [P_newprop1]), -io:format(user, "Current ~p\n", [P_current]), {P_newprop2, S3} = calculate_flaps(P_newprop1, P_current, FlapLimit, S2), %% Move the epoch number up ... originally done in C300. @@ -1166,7 +1185,10 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo), MyNamePid = proxy_pid(MyName, S), - ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO), + %% This is the local projection store. Use a larger timeout, so + %% that things locally are pretty horrible if we're killed by a + %% timeout exception. + ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of true -> {_,_,C} = os:timestamp(), @@ -1262,7 +1284,7 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, P#projection_v1.repairing, P#projection_v1.down} || P <- Ps]), -QQQ = + _QQQ = {_WhateverUnanimous, BestP, Props, _S} = cl_read_latest_projection(private, S), NotBestPs = proplists:get_value(not_unanimous_answers, Props, []), @@ -1708,8 +1730,6 @@ merge_flap_counts([FlapCount|Rest], D1) -> end, D1, D2), merge_flap_counts(Rest, D3). -%% proxy_pid(Name, #ch_mgr{proxies_dict=[]}) -> -%% throw(empty_proxies_dict); proxy_pid(Name, #ch_mgr{proxies_dict=ProxiesDict}) -> orddict:fetch(Name, ProxiesDict). @@ -1748,16 +1768,23 @@ perhaps_call_t(S, Partitions, FLU, DoIt) -> perhaps_call(S, Partitions, FLU, DoIt) catch exit:timeout -> - t_timeout + {error, partition}; + exit:{timeout,_} -> + {error, partition} end. perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> ProxyPid = proxy_pid(FLU, S), RemoteFLU_p = FLU /= MyName, - try + erase(bad_sock), case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of false -> Res = DoIt(ProxyPid), + if Res == {error, partition} -> + remember_partition_hack(FLU); + true -> + ok + end, case RemoteFLU_p andalso lists:member({FLU, MyName}, Partitions) of false -> Res; @@ -1768,10 +1795,12 @@ perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> _ -> (catch put(react, [{timeout1,me,MyName,to,FLU,RemoteFLU_p,Partitions}|get(react)])), exit(timeout) - end - catch throw:empty_proxies_dict -> - asdflkjweoiasd end. -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +init_remember_partition_hack() -> + put(remember_partition_hack, []). +remember_partition_hack(FLU) -> + put(remember_partition_hack, [FLU|get(remember_partition_hack)]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index ba3a3d2..f69d573 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -250,7 +250,7 @@ do_req(Req, S) -> end end; false -> - {{error, not_connected}, S2} + {{error, partition}, S2} end. make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) -> diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index b709fc0..6786c5e 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -24,17 +24,37 @@ -include_lib("eunit/include/eunit.hrl"). -smoke_test() -> - [os:cmd("rm -rf " ++ X) || X <- ["./data.a", "./data.b", "/data.c"] ], +-include("machi_projection.hrl"). + +%% smoke_test2() will try repeatedly to make a TCP connection to ports +%% on localhost that have no listener. +%% If you use 'sysctl -w net.inet.icmp.icmplim=3' before running this +%% test, you'll get to exercise some timeout handling in +%% machi_chain_manager1:perhaps_call_t(). +%% The default for net.inet.icmp.icmplim is 50. + +smoke_test_() -> + {timeout, 5*60, fun() -> smoke_test2() end}. + +smoke_test2() -> + Ps = [{a,#p_srvr{name=a, address="localhost", port=5555, props="./data.a"}}, + {b,#p_srvr{name=b, address="localhost", port=5556, props="./data.b"}}, + {c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}} + ], + [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), try - {ok, _} = machi_flu_psup:start_flu_package(a, 5555, "./data.a", - [{active_mode,false}]), - {ok, _} = machi_flu_psup:start_flu_package(b, 5556, "./data.b", - [{active_mode,false}]), - {ok, _} = machi_flu_psup:start_flu_package(c, 5557, "./data.c", - [{active_mode,false}]), + [begin + #p_srvr{name=Name, port=Port, props=Dir} = P, + {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) + end || {_,P} <- [hd(Ps)]], + %% end || {_,P} <- Ps], + [begin + _QQ = machi_chain_manager1:test_react_to_env(a_chmgr), + ok + end || _ <- lists:seq(1,5)], + machi_chain_manager1:set_chain_members(a_chmgr, orddict:from_list(Ps)), [begin _QQ = machi_chain_manager1:test_react_to_env(a_chmgr), ok @@ -42,6 +62,7 @@ smoke_test() -> ok after exit(SupPid, normal), + [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], machi_util:wait_for_death(SupPid, 100), ok end. diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 7b1dcbc..5de23e8 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -49,7 +49,7 @@ api_smoke_test() -> {error,_} = ?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, <<"data">>, infinity), - {error,not_connected} = ?MUT:append_chunk(Prox1, + {error,partition} = ?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, <<"data">>, infinity), %% Start the FLU again, we should be able to do stuff immediately From a7bd8e43d3b1cc1ac07f3426f764b1e6654f2cca Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 2 May 2015 16:59:28 +0900 Subject: [PATCH 08/12] Clean up machi_flu_psup_test.erl --- src/machi_chain_manager1.erl | 66 +++++++++++++----------------------- src/machi_flu1.erl | 8 +++++ src/machi_flu_psup.erl | 5 +-- test/machi_flu_psup_test.erl | 59 +++++++++++++++++++++++++++----- 4 files changed, 86 insertions(+), 52 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 5cb962a..40465f9 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -66,18 +66,17 @@ %% API -export([start_link/2, start_link/3, stop/1, ping/1, - set_chain_members/2]). + set_chain_members/2, set_active/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([projection_transitions_are_sane/2]). +-export([make_chmgr_regname/1, projection_transitions_are_sane/2]). -ifdef(TEST). -export([test_calc_projection/2, test_write_public_projection/2, test_read_latest_public_projection/2, - test_set_active/2, test_react_to_env/1, get_all_hosed/1]). @@ -96,7 +95,7 @@ start_link(MyName, MembersDict) -> start_link(MyName, MembersDict, []). start_link(MyName, MembersDict, MgrOpts) -> - gen_server:start_link({local, make_regname(MyName)}, ?MODULE, + gen_server:start_link({local, make_chmgr_regname(MyName)}, ?MODULE, {MyName, MembersDict, MgrOpts}, []). stop(Pid) -> @@ -108,6 +107,9 @@ ping(Pid) -> set_chain_members(Pid, MembersDict) -> gen_server:call(Pid, {set_chain_members, MembersDict}, infinity). +set_active(Pid, Boolean) when Boolean == true; Boolean == false -> + gen_server:call(Pid, {set_active, Boolean}, infinity). + -ifdef(TEST). %% Test/debugging code only. @@ -125,9 +127,6 @@ test_read_latest_public_projection(Pid, ReadRepairP) -> gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP}, infinity). -test_set_active(Pid, Boolean) when Boolean == true; Boolean == false -> - gen_server:call(Pid, {test_set_active, Boolean}, infinity). - test_react_to_env(Pid) -> gen_server:call(Pid, {test_react_to_env}, infinity). @@ -192,6 +191,17 @@ handle_call({ping}, _From, S) -> handle_call({set_chain_members, MembersDict}, _From, S) -> {Reply, S2} = do_set_chain_members(MembersDict, S), {reply, Reply, S2}; +handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) -> + case {Boolean, TRef} of + {true, undefined} -> + S2 = set_active_timer(S), + {reply, ok, S2}; + {false, _} -> + (catch timer:cancel(TRef)), + {reply, ok, S#ch_mgr{timer=undefined}}; + _ -> + {reply, error, S} + end; handle_call({stop}, _From, S) -> {stop, normal, ok, S}; handle_call({test_calc_projection, KeepRunenvP}, _From, @@ -209,17 +219,6 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) -> do_cl_read_latest_public_projection(ReadRepairP, S), Res = {Perhaps, Val, ExtraInfo}, {reply, Res, S2}; -handle_call({test_set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) -> - case {Boolean, TRef} of - {true, undefined} -> - S2 = set_active_timer(S), - {reply, ok, S2}; - {false, _} -> - (catch timer:cancel(TRef)), - {reply, ok, S#ch_mgr{timer=undefined}}; - _ -> - {reply, error, S} - end; handle_call({test_react_to_env}, _From, S) -> {TODOtodo, S2} = do_react_to_env(S), {reply, TODOtodo, S2}; @@ -324,7 +323,7 @@ cl_write_public_proj_remote(FLUs, Partitions, _Epoch, Proj, S) -> {{remote_write_results, Rs}, S}. do_cl_read_latest_public_projection(ReadRepairP, - #ch_mgr{name=MyName, proj=Proj1} = S) -> + #ch_mgr{proj=Proj1} = S) -> _Epoch1 = Proj1#projection_v1.epoch_number, case cl_read_latest_projection(public, S) of {needs_repair, FLUsRs, Extra, S3} -> @@ -335,12 +334,8 @@ do_cl_read_latest_public_projection(ReadRepairP, {_Status, S4} = do_read_repair(FLUsRs, Extra, S3), do_cl_read_latest_public_projection(ReadRepairP, S4) end; - {error_unwritten, FLUsRs, Extra, S3} -> - NoneProj = make_none_projection(MyName, [], orddict:new()), - {not_unanimous, NoneProj, [{unanimous_flus, []}, - {results, FLUsRs}|Extra], S3}; - {UnanimousTag, Proj2, Extra, S3}=_Else -> - {UnanimousTag, Proj2, Extra, S3} + {_UnanimousTag, _Proj2, _Extra, _S3}=Else -> + Else end. read_latest_projection_call_only(ProjectionType, AllHosed, @@ -1569,6 +1564,8 @@ projection_transition_is_sane( if UPI_2_suffix == UPI_2_concat -> ok; true -> + %% 'make dialyzer' will believe that this can never succeed. + %% 'make dialyzer-test' will not complain, however. if RetrospectiveP -> %% We are in retrospective mode. But there are %% some transitions that are difficult to find @@ -1673,13 +1670,6 @@ calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> end, MinSleep + (SleepChunk * SleepIndex). -my_find_minmost([]) -> - 0; -my_find_minmost([{_,_}|_] = TransFlapCounts0) -> - lists:min([FlapCount || {_T, {_FlTime, FlapCount}} <- TransFlapCounts0]); -my_find_minmost(TransFlapCounts0) -> - lists:min(TransFlapCounts0). - get_raw_flapping_i(#projection_v1{dbg=Dbg}) -> proplists:get_value(flapping_i, Dbg, []). @@ -1689,14 +1679,6 @@ get_flap_count(P) -> get_all_flap_counts(P) -> proplists:get_value(all_flap_counts, get_raw_flapping_i(P), []). -get_all_flap_counts_counts(P) -> - case get_all_flap_counts(P) of - [] -> - []; - [{_,{_,_}}|_] = Cs -> - [Count || {_FLU, {_Time, Count}} <- Cs] - end. - get_all_hosed(P) when is_record(P, projection_v1)-> proplists:get_value(all_hosed, get_raw_flapping_i(P), []). @@ -1756,9 +1738,9 @@ inner_projection_or_self(P) -> P_inner end. -make_regname(A) when is_atom(A) -> +make_chmgr_regname(A) when is_atom(A) -> list_to_atom(atom_to_list(A) ++ "_chmgr"); -make_regname(B) when is_binary(B) -> +make_chmgr_regname(B) when is_binary(B) -> list_to_atom(binary_to_list(B) ++ "_chmgr"). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 95c5a0b..7b06f62 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -67,6 +67,7 @@ -include("machi_projection.hrl"). -export([start_link/1, stop/1]). +-export([make_listener_regname/1, make_projection_server_regname/1]). -record(state, { flu_name :: atom(), @@ -655,5 +656,12 @@ handle_projection_command(Else, _S) -> make_listener_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_listener"). +%% This is the name of the projection store that is spawned by the +%% *flu*, for use primarily in testing scenarios. In normal use, we +%% ought to be using the OTP style of managing processes, via +%% supervisors, namely via machi_flu_psup.erl, which uses a +%% *different* naming convention for the projection store name that it +%% registers. + make_projection_server_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_pstore2"). diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index ecfaca4..4f9d408 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -28,7 +28,8 @@ %% External API -export([start_flu_package/4, stop_flu_package/1]). %% Internal API --export([start_link/4]). +-export([start_link/4, + make_p_regname/1, make_mgr_supname/1, make_proj_supname/1]). %% Supervisor callbacks -export([init/1]). @@ -78,7 +79,7 @@ make_p_regname(FluName) when is_atom(FluName) -> list_to_atom("flusup_" ++ atom_to_list(FluName)). make_mgr_supname(MgrName) when is_atom(MgrName) -> - list_to_atom(atom_to_list(MgrName) ++ "_s"). + machi_chain_manager1:make_chmgr_regname(MgrName). make_proj_supname(ProjName) when is_atom(ProjName) -> list_to_atom(atom_to_list(ProjName) ++ "_pstore"). diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 6786c5e..59ea017 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -44,21 +44,64 @@ smoke_test2() -> [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), try + %% Only run a, don't run b & c so we have 100% failures talking to them [begin #p_srvr{name=Name, port=Port, props=Dir} = P, {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) end || {_,P} <- [hd(Ps)]], - %% end || {_,P} <- Ps], - [begin - _QQ = machi_chain_manager1:test_react_to_env(a_chmgr), - ok - end || _ <- lists:seq(1,5)], + [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], machi_chain_manager1:set_chain_members(a_chmgr, orddict:from_list(Ps)), + [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], + ok + after + exit(SupPid, normal), + [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], + machi_util:wait_for_death(SupPid, 100), + ok + end. + +smoke2_test_() -> + {timeout, 5*60, fun() -> smoke2_test2() end}. + +smoke2_test2() -> + Ps = [{a,#p_srvr{name=a, address="localhost", port=5555, props="./data.a"}}, + {b,#p_srvr{name=b, address="localhost", port=5556, props="./data.b"}}, + {c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}} + ], + [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], + {ok, SupPid} = machi_flu_sup:start_link(), + try [begin - _QQ = machi_chain_manager1:test_react_to_env(a_chmgr), - ok - end || _ <- lists:seq(1,5)], + #p_srvr{name=Name, port=Port, props=Dir} = P, + {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, + [{active_mode,false}]) + end || {_,P} <- Ps], + + ChMgrs = [machi_flu_psup:make_mgr_supname(P#p_srvr.name) || {_,P} <-Ps], + PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps], + Dict = orddict:from_list(Ps), + [machi_chain_manager1:set_chain_members(ChMgr, Dict) || + ChMgr <- ChMgrs ], + + {now_using,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), + [begin + _QQa = machi_chain_manager1:test_react_to_env(ChMgr) + end || _ <- lists:seq(1,25), ChMgr <- ChMgrs], + + %% All chain maanagers & projection stores should be using the + %% same projection which is max projection in each store. + {no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env( + hd(ChMgrs)), + [{no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env( + ChMgr )|| ChMgr <- ChMgrs], + {ok, Proj_z} = machi_projection_store:read_latest_projection( + hd(PStores), public), + [begin + {ok, Proj_z} = machi_projection_store:read_latest_projection( + PStore, ProjType) + end || ProjType <- [public, private], PStore <- PStores ], + Epoch_z = Proj_z#projection_v1.epoch_number, ok after exit(SupPid, normal), From aeb2e4ef9e3bbbb5bd860ff665118c3add2f9e4c Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 6 May 2015 11:41:04 +0900 Subject: [PATCH 09/12] WIP: partial refactoring of chmgr 2nd start code, one test broken --- TODO-shortterm.org | 1 - src/machi_chain_manager1.erl | 48 +++++++++++++++++++++++------------- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 44eed7b..a539b98 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -46,6 +46,5 @@ func, and pattern match Erlang style in that func. ** TODO Move prototype/chain-manager code to "top" of source tree *** TODO Preserve current test code (leave as-is? tiny changes?) *** TODO Make chain manager code flexible enough to run "real world" or "sim" -** TODO Replace registered name use from FLU write/append dispatcher ** TODO Move the FLU server to gen_server behavior? ** TODO Implement real data repair, orchestrated by the chain manager diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 40465f9..5cf9699 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -158,7 +158,8 @@ init({MyName, InitMembersDict, MgrOpts}) -> ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict), ok = store_zeroth_projection_maybe(ZeroProj, MgrOpts), - MembersDict = get_my_private_projection_members_dict(MgrOpts, InitMembersDict), + {MembersDict, Proj} = + get_my_private_proj_boot_info(MgrOpts, InitMembersDict, ZeroProj), All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end, RunEnv = [{seed, Opt(seed, now())}, @@ -167,8 +168,9 @@ init({MyName, InitMembersDict, MgrOpts}) -> {network_islands, Opt(network_islands, [])}, {flapping_i, Opt(flapping, [])}, {up_nodes, Opt(up_nodes, not_init_yet)}], - ActiveP = Opt(active_mode, false), + ActiveP = Opt(active_mode, true), S = #ch_mgr{name=MyName, + proj=Proj, %% TODO 2015-03-04: revisit, should this constant be bigger? %% Yes, this should be bigger, but it's a hack. There is %% no guarantee that all parties will advance to a minimum @@ -229,6 +231,15 @@ handle_cast(_Cast, S) -> ?D({cast_whaaaaaaaaaaa, _Cast}), {noreply, S}. +handle_info(yo_yo_tick, S) -> + {{_Delta, Props, _Epoch}, S2} = do_react_to_env(S), + case proplists:get_value(throttle_seconds, Props) of + N when is_integer(N), N > 0 -> + timer:sleep(N * 1000); + _ -> + ok + end, + {noreply, S2}; handle_info(Msg, S) -> case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]); _ -> ok @@ -249,14 +260,14 @@ make_none_projection(MyName, All_list, MembersDict) -> UPI_list = [], machi_projection:new(MyName, MembersDict, UPI_list, Down_list, [], []). -get_my_private_projection_members_dict(MgrOpts, DefaultDict) -> +get_my_private_proj_boot_info(MgrOpts, DefaultDict, DefaultProj) -> case proplists:get_value(projection_store_registered_name, MgrOpts) of undefined -> - DefaultDict; + {DefaultDict, DefaultProj}; Store -> {ok, P} = machi_projection_store:read_latest_projection(Store, private), - P#projection_v1.members_dict + {P#projection_v1.members_dict, P} end. %% Write the epoch 0 projection store, to assist bootstrapping. If the @@ -275,7 +286,8 @@ store_zeroth_projection_maybe(ZeroProj, MgrOpts) -> set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), - {ok, TRef} = timer:send_interval(USec, yo_yo_yo_todo), +io:format(user, "USec ~p for ~p (FLU_list ~p)\n", [USec, MyName, FLU_list]), + {ok, TRef} = timer:send_interval(USec, yo_yo_tick), S#ch_mgr{timer=TRef}. do_cl_write_public_proj(Proj, S) -> @@ -640,18 +652,17 @@ do_set_chain_members(MembersDict, _ = (catch ?FLU_PC:quit(Pid)) end, [], OldProxiesDict), All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], - NoneProj = make_none_projection(MyName, All_list, MembersDict), Proxies = orddict:fold( fun(K, P, Acc) -> {ok, Pid} = ?FLU_PC:start_link(P), [{K, Pid}|Acc] end, [], MembersDict), - {ok, S#ch_mgr{proj=NoneProj, - members_dict=MembersDict, + {ok, S#ch_mgr{members_dict=MembersDict, proxies_dict=orddict:from_list(Proxies)}}. -do_react_to_env(#ch_mgr{proj=#projection_v1{members_dict=[]}}=S) -> - {empty_members_dict, S}; +do_react_to_env(#ch_mgr{proj=#projection_v1{epoch_number=Epoch, + members_dict=[]}}=S) -> + {{empty_members_dict, [], Epoch}, S}; do_react_to_env(S) -> put(react, []), react_to_env_A10(S). @@ -1183,7 +1194,11 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> %% This is the local projection store. Use a larger timeout, so %% that things locally are pretty horrible if we're killed by a %% timeout exception. - ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), + %% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), + Goo = P_latest2#projection_v1.epoch_number, + io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), + + {ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo}, case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of true -> {_,_,C} = os:timestamp(), @@ -1661,14 +1676,13 @@ sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> USec. calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> - Front = lists:takewhile(fun(X) -> X /=FLU end, FLU_list), - Index = length(Front) + 1, + Front = lists:takewhile(fun(X) -> X /= FLU end, lists:sort(FLU_list)), + Index = length(Front), NumNodes = length(FLU_list), - SleepIndex = NumNodes - Index, SleepChunk = if NumNodes == 0 -> 0; - true -> MaxSleep div NumNodes + true -> (MaxSleep - MinSleep) div NumNodes end, - MinSleep + (SleepChunk * SleepIndex). + MinSleep + (SleepChunk * Index). get_raw_flapping_i(#projection_v1{dbg=Dbg}) -> proplists:get_value(flapping_i, Dbg, []). From 517941aaaaac1b00e90ec55f76785dfe61030984 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 7 May 2015 17:52:16 +0900 Subject: [PATCH 10/12] Finish chain manager restart & membership changing --- include/machi_chain_manager.hrl | 1 + src/machi_chain_manager1.erl | 71 +++++++++++++++++++++++---------- 2 files changed, 52 insertions(+), 20 deletions(-) diff --git a/include/machi_chain_manager.hrl b/include/machi_chain_manager.hrl index 9382fa6..1fb4b5e 100644 --- a/include/machi_chain_manager.hrl +++ b/include/machi_chain_manager.hrl @@ -30,6 +30,7 @@ proj :: projection(), %% timer :: 'undefined' | timer:tref(), + ignore_timer :: boolean(), proj_history :: queue:queue(), flaps=0 :: integer(), flap_start = ?NOT_FLAPPING diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 5cf9699..ef02f97 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -180,7 +180,7 @@ init({MyName, InitMembersDict, MgrOpts}) -> proj_history=queue:new(), runenv=RunEnv, opts=MgrOpts}, - {_, S2} = do_set_chain_members(MembersDict, S), + {_, S2} = do_set_chain_members_dict(MembersDict, S), S3 = if ActiveP == false -> S2; ActiveP == true -> @@ -190,9 +190,33 @@ init({MyName, InitMembersDict, MgrOpts}) -> handle_call({ping}, _From, S) -> {reply, pong, S}; -handle_call({set_chain_members, MembersDict}, _From, S) -> - {Reply, S2} = do_set_chain_members(MembersDict, S), - {reply, Reply, S2}; +handle_call({set_chain_members, MembersDict}, _From, + #ch_mgr{name=MyName, + proj=#projection_v1{all_members=OldAll_list, + epoch_number=OldEpoch, + upi=OldUPI}=OldProj}=S) -> + {Reply, S2} = do_set_chain_members_dict(MembersDict, S), + %% TODO: should there be any additional sanity checks? Right now, + %% if someone does something bad, then do_react_to_env() will + %% crash, which will crash us, and we'll restart in a sane & old + %% config. + All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], + MissingInNew = OldAll_list -- All_list, + NewUPI = OldUPI -- MissingInNew, + NewDown = All_list -- NewUPI, + NewEpoch = OldEpoch + 1111, + NewProj = machi_projection:update_checksum( + OldProj#projection_v1{author_server=MyName, + creation_time=now(), + epoch_number=NewEpoch, + all_members=All_list, + upi=NewUPI, + repairing=[], + down=NewDown, + members_dict=MembersDict}), + S3 = S2#ch_mgr{proj=NewProj}, + {_QQ, S4} = do_react_to_env(S3), + {reply, Reply, S4}; handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) -> case {Boolean, TRef} of {true, undefined} -> @@ -231,15 +255,24 @@ handle_cast(_Cast, S) -> ?D({cast_whaaaaaaaaaaa, _Cast}), {noreply, S}. -handle_info(yo_yo_tick, S) -> +handle_info(tick_check_environment, #ch_mgr{ignore_timer=true}=S) -> + {noreply, S}; +handle_info(tick_check_environment, S) -> {{_Delta, Props, _Epoch}, S2} = do_react_to_env(S), case proplists:get_value(throttle_seconds, Props) of N when is_integer(N), N > 0 -> - timer:sleep(N * 1000); + %% We are flapping. Set ignore_timer=true and schedule a + %% reminder to stop ignoring. This slows down the rate of + %% flapping. If/when the yo:tell_author_yo() function in + %% state C200 is ever implemented, then it should be + %% implemented via the test_react_to_env style. + erlang:send_after(N*1000, self(), stop_ignoring_timer), + {noreply, S#ch_mgr{ignore_timer=true}}; _ -> - ok - end, - {noreply, S2}; + {noreply, S2} + end; +handle_info(stop_ignoring_timer, S) -> + {noreply, S#ch_mgr{ignore_timer=false}}; handle_info(Msg, S) -> case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]); _ -> ok @@ -286,8 +319,7 @@ store_zeroth_projection_maybe(ZeroProj, MgrOpts) -> set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), -io:format(user, "USec ~p for ~p (FLU_list ~p)\n", [USec, MyName, FLU_list]), - {ok, TRef} = timer:send_interval(USec, yo_yo_tick), + {ok, TRef} = timer:send_interval(USec, tick_check_environment), S#ch_mgr{timer=TRef}. do_cl_write_public_proj(Proj, S) -> @@ -487,7 +519,7 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, {NewUPI_list3, Repairing_list3, RunEnv3} = case {NewUp, Repairing_list2} of {[], []} -> -D_foo=[], + D_foo=[], {NewUPI_list, [], RunEnv2}; {[], [H|T]} when RelativeToServer == hd(NewUPI_list) -> %% The author is head of the UPI list. Let's see if @@ -501,14 +533,14 @@ D_foo=[], tl(NewUPI_list) ++ Repairing_list2, S#ch_mgr.proj, Partitions, S), if not SameEpoch_p -> -D_foo=[], + D_foo=[], {NewUPI_list, OldRepairing_list, RunEnv2}; true -> -D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], + D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], {NewUPI_list ++ [H], T, RunEnv2} end; {_, _} -> -D_foo=[], + D_foo=[], {NewUPI_list, OldRepairing_list, RunEnv2} end, Repairing_list4 = case NewUp of @@ -587,7 +619,8 @@ calc_up_nodes_sim(MyName, AllMembers, RunEnv1) -> {UpNodes, Partitions2, RunEnv2}. replace(PropList, Items) -> - proplists:compact(Items ++ PropList). + Tmp = Items ++ PropList, + [{K, proplists:get_value(K, Tmp)} || K <- proplists:get_keys(Tmp)]. rank_and_sort_projections([], CurrentProj) -> rank_projections([CurrentProj], CurrentProj); @@ -645,13 +678,11 @@ rank_projection(#projection_v1{author_server=Author, ( N * length(Repairing_list)) + (N*N * length(UPI_list)). -do_set_chain_members(MembersDict, - #ch_mgr{name=MyName, proxies_dict=OldProxiesDict}=S) -> +do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)-> catch orddict:fold( fun(_K, Pid, _Acc) -> _ = (catch ?FLU_PC:quit(Pid)) end, [], OldProxiesDict), - All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], Proxies = orddict:fold( fun(K, P, Acc) -> {ok, Pid} = ?FLU_PC:start_link(P), @@ -1196,7 +1227,7 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> %% timeout exception. %% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), Goo = P_latest2#projection_v1.epoch_number, - io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), + %% io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), {ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo}, case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of From 14fc37bd0df12de21067698f58179cd740b5a041 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 7 May 2015 18:39:39 +0900 Subject: [PATCH 11/12] Add ability to start FLUs at application startup --- src/machi_chain_manager1.erl | 8 ++++++++ src/machi_flu_psup.erl | 11 +++++++---- src/machi_flu_sup.erl | 8 +++++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index ef02f97..9afa807 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -104,6 +104,14 @@ stop(Pid) -> ping(Pid) -> gen_server:call(Pid, {ping}, infinity). +%% @doc Set chain members list. +%% +%% NOTE: This implementation is a bit brittle, in that an author with +%% higher rank may try to re-suggest the old membership list if it +%% races with an author of lower rank. For now, we suggest calling +%% set_chain_members() first on the author of highest rank and finish +%% with lowest rank, i.e. name z* first, name a* last. + set_chain_members(Pid, MembersDict) -> gen_server:call(Pid, {set_chain_members, MembersDict}, infinity). diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index 4f9d408..4405bf6 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -26,7 +26,7 @@ -behaviour(supervisor). %% External API --export([start_flu_package/4, stop_flu_package/1]). +-export([make_package_spec/4, start_flu_package/4, stop_flu_package/1]). %% Internal API -export([start_link/4, make_p_regname/1, make_mgr_supname/1, make_proj_supname/1]). @@ -34,10 +34,13 @@ %% Supervisor callbacks -export([init/1]). +make_package_spec(FluName, TcpPort, DataDir, Props) -> + {FluName, {machi_flu_psup, start_link, + [FluName, TcpPort, DataDir, Props]}, + permanent, 5000, supervisor, []}. + start_flu_package(FluName, TcpPort, DataDir, Props) -> - Spec = {FluName, {machi_flu_psup, start_link, - [FluName, TcpPort, DataDir, Props]}, - permanent, 5000, supervisor, []}, + Spec = make_package_spec(FluName, TcpPort, DataDir, Props), {ok, _SupPid} = supervisor:start_child(machi_flu_sup, Spec). stop_flu_package(FluName) -> diff --git a/src/machi_flu_sup.erl b/src/machi_flu_sup.erl index f74ea07..51efd87 100644 --- a/src/machi_flu_sup.erl +++ b/src/machi_flu_sup.erl @@ -41,5 +41,11 @@ init([]) -> MaxRestarts = 1000, MaxSecondsBetweenRestarts = 3600, SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, - {ok, {SupFlags, []}}. + + Ps = application:get_env(machi, initial_flus, []), + FLU_specs = [machi_flu_psup:make_package_spec(FluName, TcpPort, + DataDir, Props) || + {FluName, TcpPort, DataDir, Props} <- Ps], + + {ok, {SupFlags, FLU_specs}}. From 238c8472cdbf87c2c82ff081bcba820a48b5128b Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 7 May 2015 18:43:51 +0900 Subject: [PATCH 12/12] WIP: timeout comments --- TODO-shortterm.org | 32 ++++++++++----------- src/machi_chain_manager1.erl | 10 ++----- test/machi_chain_manager1_converge_demo.erl | 2 +- 3 files changed, 20 insertions(+), 24 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index a539b98..9dbd35f 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -25,6 +25,22 @@ func, and pattern match Erlang style in that func. *** DONE Do it. +** DONE Finish OTP'izing the Chain Manager with FLU & proj store processes +** DONE Eliminate the timeout exception for the client: just {error,timeout} ret +** DONE Move prototype/chain-manager code to "top" of source tree +*** DONE Preserve current test code (leave as-is? tiny changes?) +*** DONE Make chain manager code flexible enough to run "real world" or "sim" +** TODO Implement real data repair, orchestrated by the chain manager +** TODO Change all protocol ops to enforce the epoch ID +** TODO Add projection wedging logic to each FLU. + +- Add no-wedging state to make testing easier? + +** TODO Move the FLU server to gen_server behavior? +** TODO Add gproc and get rid of registered name rendezvous +*** TODO Fixes the atom table leak +*** TODO Fixes the problem of having active sequencer for the same prefix + on two FLUS in the same VM ** TODO Fix all known bugs with Chain Manager *** DONE Fix known bugs @@ -32,19 +48,3 @@ func, and pattern match Erlang style in that func. *** TODO Re-add verification step of stable epochs, including inner projections! *** TODO Attempt to remove cruft items in flapping_i? -** TODO Finish OTP'izing the Chain Manager with FLU & proj store processes -** TODO Add gproc and get rid of registered name rendezvous -*** TODO Fixes the atom table leak -*** TODO Fixes the problem of having active sequencer for the same prefix - on two FLUS in the same VM -** TODO Eliminate the timeout exception for the client: just {error,timeout} ret -** TODO Change all protocol ops to enforce the epoch ID -** TODO Add projection wedging logic to each FLU. - -- Add no-wedging state to make testing easier? - -** TODO Move prototype/chain-manager code to "top" of source tree -*** TODO Preserve current test code (leave as-is? tiny changes?) -*** TODO Make chain manager code flexible enough to run "real world" or "sim" -** TODO Move the FLU server to gen_server behavior? -** TODO Implement real data repair, orchestrated by the chain manager diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 9afa807..a983ea7 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -1333,13 +1333,9 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, P#projection_v1.repairing, P#projection_v1.down} || P <- Ps]), - _QQQ = {_WhateverUnanimous, BestP, Props, _S} = cl_read_latest_projection(private, S), NotBestPs = proplists:get_value(not_unanimous_answers, Props, []), -%% io:format(user, "QQQ ~p\n", [QQQ]), -%% io:format(user, "BestP ~p\n", [BestP]), -%% io:format(user, "NotBestPs ~p\n", [NotBestPs]), DownUnion = lists:usort( lists:flatten( [P#projection_v1.down || @@ -1414,9 +1410,9 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, %% %% So b's working on epoch 451 at the same time that d's latest %% public projection is only epoch 441. But there's enough - %% lag so that b can "see" that a's bad=[c] (due to t_timeout!) - %% and c's bad=[a]. So voila, b magically knows about both - %% problem FLUs. Weird/cool. + %% lag so that b can "see" that a's bad=[c] (due to + %% {error,partition}!) and c's bad=[a]. So voila, b + %% magically knows about both problem FLUs. Weird/cool. AllFlapCounts = TempAllFlapCounts, AllHosed = lists:usort(DownUnion ++ HosedTransUnion ++ BadFLUs); diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index de6db4b..30e7b23 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -310,7 +310,7 @@ convergence_demo_testfun(NumFLUs) -> io:format(user, "We should see convergence to 1 correct chain.\n", []), machi_partition_simulator:no_partitions(), [DoIt(50, 10, 100) || _ <- [1]], - io:format(user, "Sweet, finishing early\n", []), exit(yoyoyo_testing_hack), + io:format(user, "Sweet, finishing early\n", []), exit(yoyoyo_testing_hack_finishing_early), %% WARNING: In asymmetric partitions, private_projections_are_stable() %% will never be true; code beyond this point on the -exp3 %% branch is bit-rotted, sorry!