Configure FLUs and chains with "rc.d" style configuration #56

Merged
slfritchie merged 43 commits from slf/flu-config-rcd-style into master 2015-12-18 06:46:05 +00:00
32 changed files with 2517 additions and 164 deletions

1
.gitignore vendored
View file

@ -25,5 +25,6 @@ rel/machi
*.patch *.patch
current_counterexample.eqc current_counterexample.eqc
foo* foo*
RUNLOG*
typescript* typescript*
*.swp *.swp

View file

@ -35,6 +35,9 @@ deps:
clean: clean:
$(REBAR) -r clean $(REBAR) -r clean
edoc: edoc-clean
$(REBAR) skip_deps=true doc
edoc-clean: edoc-clean:
rm -f edoc/*.png edoc/*.html edoc/*.css edoc/edoc-info rm -f edoc/*.png edoc/*.html edoc/*.css edoc/edoc-info

View file

@ -0,0 +1,620 @@
FLU and Chain Life Cycle Management -*- mode: org; -*-
#+STARTUP: lognotedone hidestars indent showall inlineimages
#+COMMENT: To generate the outline section: egrep '^\*[*]* ' doc/flu-and-chain-lifecycle.org | egrep -v '^\* Outline' | sed -e 's/^\*\*\* / + /' -e 's/^\*\* / + /' -e 's/^\* /+ /'
* FLU and Chain Life Cycle Management
In an ideal world, we (the Machi development team) would have a full
vision of how Machi would be managed, down to the last detail of
beautiful CLI character and network protocol bit. Our vision isn't
complete yet, so we are working one small step at a time.
* Outline
+ FLU and Chain Life Cycle Management
+ Terminology review
+ Terminology: Machi run-time components/services/thingies
+ Terminology: Machi data structures
+ Terminology: Cluster-of-cluster (CoC) data structures
+ Overview of administrative life cycles
+ Cluster-of-clusters (CoC) administrative life cycle
+ Chain administrative life cycle
+ FLU server administrative life cycle
+ Quick admin: declarative management of Machi FLU and chain life cycles
+ Quick admin uses the "rc.d" config scheme for life cycle management
+ Quick admin's declarative "language": an Erlang-flavored AST
+ Term 'host': define a new host for FLU services
+ Term 'flu': define a new FLU
+ Term 'chain': define or reconfigure a chain
+ Executing quick admin AST files via the 'machi-admin' utility
+ Checking the syntax of an AST file
+ Executing an AST file
+ Using quick admin to manage multiple machines
+ The "rc.d" style configuration file scheme
+ Riak had a similar configuration file editing problem (and its solution)
+ Machi's "rc.d" file scheme.
+ FLU life cycle management using "rc.d" style files
+ The key configuration components of a FLU
+ Chain life cycle management using "rc.d" style files
+ The key configuration components of a chain
* Terminology review
** Terminology: Machi run-time components/services/thingies
+ FLU: a basic Machi server, responsible for managing a collection of
files.
+ Chain: a small collection of FLUs that maintain replicas of the same
collection of files. A chain is usually small, 1-3 servers, where
more than 3 would be used only in cases when availability of
certain data is critical despite failures of several machines.
+ The length of a chain is directly proportional to its
replication factor, e.g., a chain length=3 will maintain
(nominally) 3 replicas of each file.
+ To maintain file availability when ~F~ failures have occurred, a
chain must be at least ~F+1~ members long. (In comparison, the
quorum replication technique requires ~2F+1~ members in the
general case.)
+ Cluster: this word can be used interchangeably with "chain".
+ Cluster-of-clusters: A collection of Machi clusters where files are
horizontally partitioned/sharded/distributed across
** Terminology: Machi data structures
+ Projection: used to define a single chain: the chain's consistency
mode (strong or eventual consistency), all members (from an
administrative point of view), all active members (from a runtime,
automatically-managed point of view), repairing/file-syncing members
(also runtime, auto-managed), and so on
+ Epoch: A version number of a projection. The epoch number is used
by both clients & servers to manage transitions from one projection
to another, e.g., when the chain is temporarily shortened by the
failure of a member FLU server.
** Terminology: Cluster-of-cluster (CoC) data structures
+ Namespace: A collection of human-friendly names that are mapped to
groups of Machi chains that provide the same type of storage
service: consistency mode, replication policy, etc.
+ A single namespace name, e.g. ~normal-ec~, is paired with a single
CoC chart (see below).
+ Example: ~normal-ec~ might be a collection of Machi chains in
eventually-consistent mode that are of length=3.
+ Example: ~risky-ec~ might be a collection of Machi chains in
eventually-consistent mode that are of length=1.
+ Example: ~mgmt-critical~ might be a collection of Machi chains in
strongly-consistent mode that are of length=7.
+ CoC chart: Encodes the rules which partition/shard/distribute a
particular namespace across a group of chains that collectively
store the namespace's files.
+ "chart: noun, a geographical map or plan, especially on used for
navigation by sea or air."
+ Chain weight: A value assigned to each chain within a CoC chart
structure that defines the relative storage capacity of a chain
within the namespace. For example, a chain weight=150 has 50% more
capacity than a chain weight=100.
+ CoC chart epoch: The version number assigned to a CoC chart.
* Overview of administrative life cycles
** Cluster-of-clusters (CoC) administrative life cycle
+ CoC is first created
+ CoC adds namespaces (e.g. consistency policy + chain length policy)
+ CoC adds/removes chains to a namespace to increase/decrease the
namespace's storage capacity.
+ CoC adjusts chain weights within a namespace, e.g., to shift files
within the namespace to chains with greater storage capacity
resources and/or runtime I/O resources.
A CoC "file migration" is the process of moving files from one
namespace member chain to another for purposes of shifting &
re-balancing storage capacity and/or runtime I/O capacity.
** Chain administrative life cycle
+ A chain is created with an initial FLU membership list.
+ Chain may be administratively modified zero or more times to
add/remove member FLU servers.
+ A chain may be decommissioned.
See also: http://basho.github.io/machi/edoc/machi_lifecycle_mgr.html
** FLU server administrative life cycle
+ A FLU is created after an administrator chooses the FLU's runtime
location is selected by the administrator: which machine/virtual
machine, IP address and TCP port allocation, etc.
+ An unassigned FLU may be added to a chain by chain administrative
policy.
+ A FLU that is assigned to a chain may be removed from that chain by
chain administrative policy.
+ In the current implementation, the FLU's Erlang processes will be
halted. Then the FLU's data and metadata files will be moved to
another area of the disk for safekeeping. Later, a "garbage
collection" process can be used for reclaiming disk space used by
halted FLU servers.
See also: http://basho.github.io/machi/edoc/machi_lifecycle_mgr.html
* Quick admin: declarative management of Machi FLU and chain life cycles
The "quick admin" scheme is a temporary (?) tool for managing Machi
FLU server and chain life cycles in a declarative manner. The API is
described in this section.
** Quick admin uses the "rc.d" config scheme for life cycle management
As described at the top of
http://basho.github.io/machi/edoc/machi_lifecycle_mgr.html, the "rc.d"
config files do not manage "policy". "Policy" is doing the right
thing with a Machi cluster-of-clusters from a systems administrator's
point of view. The "rc.d" config files can only implement decisions
made according to policy.
The "quick admin" tool is a first attempt at automating policy
decisions in a safe way (we hope) that is also easy to implement (we
hope) with a variety of systems management tools, e.g. Chef, Puppet,
Ansible, Saltstack, or plain-old-human-at-a-keyboard.
** Quick admin's declarative "language": an Erlang-flavored AST
The "language" that an administrator uses to express desired policy
changes is not (yet) a true language. As a quick implementation hack,
the current language is an Erlang-flavored abstract syntax tree
(AST). The tree isn't very deep, either, frequently just one
element tall. (Not much of a tree, is it?)
There are three terms in the language currently:
+ ~host~, define a new host that can execute FLU servers
+ ~flu~, define a new FLU
+ ~chain~, define a new chain or re-configure an existing chain with
the same name
*** Term 'host': define a new host for FLU services
In this context, a host is a machine, virtual machine, or container
that can execute the Machi application and can therefore provide FLU
services, i.e. file service, Humming Consensus management.
Two formats may be used to define a new host:
#+BEGIN_SRC
{host, Name, Props}.
{host, Name, AdminI, ClientI, Props}.
#+END_SRC
The shorter tuple is shorthand notation for the latter. If the
shorthand form is used, then it will be converted automatically to the
long form as:
#+BEGIN_SRC
{host, Name, AdminI=Name, ClientI=Name, Props}.
#+END_SRC
Type information, description, and restrictions:
+ ~Name::string()~ The ~Name~ attribute must be unique. Note that it
is possible to define two different hosts, one using a DNS hostname
and one using an IP address. The user must avoid this
double-definition because it is not enforced by quick admin.
+ The ~Name~ field is used for cross-reference purposes with other
terms, e.g., ~flu~ and ~chain~.
+ There is no syntax yet for removing a host definition.
+ ~AdminI::string()~ A DNS hostname or IP address for cluster
administration purposes, e.g. SSH access.
+ This field is unused at the present time.
+ ~ClientI::string()~ A DNS hostname or IP address for Machi's client
protocol access, e.g., Protocol Buffers network API service.
+ This field is unused at the present time.
+ ~props::proplist()~ is an Erlang-style property list for specifying
additional configuration options, debugging information, sysadmin
comments, etc.
+ A full-featured admin tool should also include managing several
other aspects of configuration related to a "host". For example,
for any single IP address, quick admin assumes that there will be
exactly one Erlang VM that is running the Machi application. Of
course, it is possible to have dozens of Erlang VMs on the same
(let's assume for clarity) hardware machine and all running Machi
... but there are additional aspects of such a machine that quick
admin does not account for
+ multiple IP addresses per machine
+ multiple Machi package installation paths
+ multiple Machi config files (e.g. cuttlefish config, ~etc.conf~,
~vm.args~)
+ multiple data directories/file system mount points
+ This is also a management problem for quick admin for a single
Machi package on a machine to take advantage of bulk data
storage using multiple multiple file system mount points.
+ multiple Erlang VM host names, required for distributed Erlang,
which is used for communication with ~machi~ and ~machi-admin~
command line utilities.
+ and others....
*** Term 'flu': define a new FLU
A new FLU is defined relative to a previously-defined ~host~ entities;
an exception will be thrown if the ~host~ cannot be cross-referenced.
#+BEGIN_SRC
{flu, Name, HostName, Port, Props}
#+END_SRC
Type information, description, and restrictions:
+ ~Name::atom()~ The name of the FLU, as a human-friendly name and
also for internal management use; please note the ~atom()~ type.
This name must be unique.
+ The ~Name~ field is used for cross-reference purposes with the
~chain~ term.
+ There is no syntax yet for removing a FLU definition.
+ ~Hostname::string()~ The cross-reference name of the ~host~ that
this FLU should run on.
+ ~Port::non_neg_integer()~ The TCP port used by this FLU server's
Protocol Buffers network API listener service
+ ~props::proplist()~ is an Erlang-style property list for specifying
additional configuration options, debugging information, sysadmin
comments, etc.
*** Term 'chain': define or reconfigure a chain
A chain is defined relative to zero or more previously-defined ~flu~
entities; an exception will be thrown if any ~flu~ cannot be
cross-referenced.
Two formats may be used to define/reconfigure a chain:
#+BEGIN_SRC
{chain, Name, FullList, Props}.
{chain, Name, CMode, FullList, Witnesses, Props}.
#+END_SRC
The shorter tuple is shorthand notation for the latter. If the
shorthand form is used, then it will be converted automatically to the
long form as:
#+BEGIN_SRC
{chain, Name, ap_mode, FullList, [], Props}.
#+END_SRC
Type information, description, and restrictions:
+ ~Name::atom()~ The name of the chain, as a human-friendly name and
also for internal management use; please note the ~atom()~ type.
This name must be unique.
+ There is no syntax yet for removing a chain definition.
+ ~CMode::'ap_mode'|'cp_mode'~ Defines the consistency mode of the
chain, either eventual consistency or strong consistency,
respectively.
+ A chain cannot change consistency mode, e.g., from
strong~->~eventual consistency.
+ ~FullList::list(atom())~ Specifies the list of full-service FLU
servers, i.e. servers that provide file data & metadata services as
well as Humming Consensus. Each atom in the list must
cross-reference with a previously defined ~chain~; an exception will
be thrown if any ~flu~ cannot be cross-referenced.
+ ~Witnesses::list(atom())~ Specifies the list of witness-only
servers, i.e. servers that only participate in Humming Consensus.
Each atom in the list must cross-reference with a previously defined
~chain~; an exception will be thrown if any ~flu~ cannot be
cross-referenced.
+ This list must be empty for eventual consistency chains.
+ ~props::proplist()~ is an Erlang-style property list for specifying
additional configuration options, debugging information, sysadmin
comments, etc.
+ If this term specifies a new ~chain~ name, then all of the member
FLU servers (full & witness types) will be bootstrapped to a
starting configuration.
+ If this term specifies a previously-defined ~chain~ name, then all
of the member FLU servers (full & witness types, respectively) will
be adjusted to add or remove members, as appropriate.
+ Any FLU servers added to either list must not be assigned to any
other chain, or they must be a member of this specific chain.
+ Any FLU servers removed from either list will be halted.
(See the "FLU server administrative life cycle" section above.)
** Executing quick admin AST files via the 'machi-admin' utility
Examples of quick admin AST files can be found in the
~priv/quick-admin/examples~ directory. Below is an example that will
define a new host ( ~"localhost"~ ), three new FLU servers ( ~f1~ & ~f2~
and ~f3~ ), and an eventually consistent chain ( ~c1~ ) that uses the new
FLU servers:
#+BEGIN_SRC
{host, "localhost", []}.
{flu,f1,"localhost",20401,[]}.
{flu,f2,"localhost",20402,[]}.
{flu,f3,"localhost",20403,[]}.
{chain,c1,[f1,f2,f3],[]}.
#+END_SRC
*** Checking the syntax of an AST file
Given an AST config file, ~/path/to/ast/file~, its basic syntax and
correctness can be checked without executing it.
#+BEGIN_SRC
./rel/machi/bin/machi-admin quick-admin-check /path/to/ast/file
#+END_SRC
+ The utility will exit with status zero and output ~ok~ if the syntax
and proposed configuration appears to be correct.
+ If there is an error, the utility will exit with status one, and an
error message will be printed.
*** Executing an AST file
Given an AST config file, ~/path/to/ast/file~, it can be executed
using the command:
#+BEGIN_SRC
./rel/machi/bin/machi-admin quick-admin-apply /path/to/ast/file RelativeHost
#+END_SRC
... where the last argument, ~RelativeHost~, should be the exact
spelling of one of the previously defined AST ~host~ entities,
*and also* is the same host that the ~machi-admin~ utility is being
executed on.
Restrictions and warnings:
+ This is alpha quality software.
+ There is no "undo".
+ Of course there is, but you need to resort to doing things like
using ~machi attach~ to attach to the server's CLI to then execute
magic Erlang incantations to stop FLUs, unconfigure chains, etc.
+ Oh, and delete some files with magic paths, also.
** Using quick admin to manage multiple machines
A quick sketch follows:
1. Create the AST file to specify all of the changes that you wish to
make to all hosts, FLUs, and/or chains, e.g., ~/tmp/ast.txt~.
2. Check the basic syntax with the ~quick-admin-check~ argument to
~machi-admin~.
3. If the syntax is good, then copy ~/tmp/ast.txt~ to all hosts in the
cluster, using the same path, ~/tmp/ast.txt~.
4. For each machine in the cluster, run:
#+BEGIN_SRC
./rel/machi/bin/machi-admin quick-admin-apply /tmp/ast.txt RelativeHost
#+END_SRC
... where RelativeHost is the AST ~host~ name of the machine that you
are executing the ~machi-admin~ command on. The command should be
successful, with exit status 0 and outputting the string ~ok~.
Finally, for each machine in the cluster, a listing of all files in
the directory ~rel/machi/etc/quick-admin-archive~ should show exactly
the same files, one for each time that ~quick-admin-apply~ has been
run successfully on that machine.
* The "rc.d" style configuration file scheme
This configuration scheme is inspired by BSD UNIX's ~init(8)~ process
manager's configuration style, called "rc.d" after the name of the
directory where these files are stored, ~/etc/rc.d~. The ~init~
process is responsible for (among other things) starting UNIX
processes at machine boot time and stopping them when the machine is
shut down.
The original scheme used by ~init~ to start processes at boot time was
a single Bourne shell script called ~/etc/rc~. When a new software
package was installed that required a daemon to be started at boot
time, text was added to the ~/etc/rc~ file. Uninstalling packages was
much trickier, because it meant removing lines from a file that
*is a computer program (run by the Bourne shell, a Turing-complete
programming language)*. Error-free editing of the ~/etc/rc~ script
was impossible in all cases.
Later, ~init~'s configuration was split into a few master Bourne shell
scripts and a subdirectory, ~/etc/rc.d~. The subdirectory contained
shell scripts that were responsible for boot time starting of a single
daemon or service, e.g. NFS or an HTTP server. When a new software
package was added, a new file was added to the ~rc.d~ subdirectory.
When a package was removed, the corresponding file in ~rc.d~ was
removed. With this simple scheme, addition & removal of boot time
scripts was vastly simplified.
** Riak had a similar configuration file editing problem (and its solution)
Another software product from Basho Technologies, Riak, had a similar
configuration file editing problem. One file in particular,
~app.config~, had a syntax that made it difficult both for human
systems administrators and also computer programs to edit the file in
a syntactically correct manner.
Later releases of Riak switched to an alternative configuration file
format, one inspired by the BSD UNIX ~sysctl(8)~ utility and
~sysctl.conf(5)~ file syntax. The ~sysctl.conf~ format is much easier
to manage by computer programs to add items. Removing items is not
100% simple, however: the correct lines must be identified and then
removed (e.g. with Perl or a text editor or combination of ~grep -v~
and ~mv~), but removing any comment lines that "belong" to the removed
config item(s) is not any easy for a 1-line shell script to do 100%
correctly.
Machi will use the ~sysctl.conf~ style configuration for some
application configuration variables. However, adding & removing FLUs
and chains will be managed using the "rc.d" style because of the
"rc.d" scheme's simplicity and tolerance of mistakes by administrators
(human or computer).
** Machi's "rc.d" file scheme.
Machi will use a single subdirectory that will contain configuration
files for some life cycle management task, e.g. a single FLU or a
single chain.
The contents of the file should be a single Erlang term, serialized in
ASCII form as Erlang source code statement, i.e. a single Erlang term
~T~ that is formatted by ~io:format("~w.",[T]).~. This file must be
parseable by the Erlang function ~file:consult()~.
Later versions of Machi may change the file format to be more familiar
to administrators who are unaccustomed to Erlang language syntax.
** FLU life cycle management using "rc.d" style files
*** The key configuration components of a FLU
1. The machine (or virtual machine) to run it on.
2. The Machi software package's artifacts to execute.
3. The disk device(s) used to store Machi file data & metadata, "rc.d"
style config files, etc.
4. The name, IP address and TCP port assigned to the FLU service.
5. Its chain assignment.
Notes:
+ Items 1-3 are currently outside of the scope of this life cycle
document. We assume that human administrators know how to do these
things.
+ Item 4's properties are explicitly managed by a FLU-defining "rc.d"
style config file.
+ Item 5 is managed by the chain life cycle management system.
Here is an example of a properly formatted FLU config file:
#+BEGIN_SRC
{p_srvr,f1,machi_flu1_client,"192.168.72.23",20401,[]}.
#+END_SRC
... which corresponds to the following Erlang record definition:
#+BEGIN_SRC
-record(p_srvr, {
name :: atom(),
proto_mod = 'machi_flu1_client' :: atom(), % Module name
address :: term(), % Protocol-specific
port :: term(), % Protocol-specific
props = [] :: list() % proplist for other related info
}).
#+END_SRC
+ ~name~ is ~f1~. This is name of the FLU. This name should be
unique over the lifetime of the administrative domain and thus
managed by external policy. This name must be the same as the name
of the config file that defines the FLU.
+ ~proto_mod~ is used for internal management purposes and should be
considered a mandatory constant.
+ ~address~ is "192.168.72.23". The DNS hostname or IP address used
by other servers to communicate with this FLU. This must be a valid
IP address, previously assigned to this machine/VM using the
appropriate operating system-specific procedure.
+ ~port~ is TCP port 20401. The TCP port number that the FLU listens
to for incoming Protocol Buffers-serialized communication. This TCP
port must not be in use (now or in the future) by another Machi FLU
or any other process running on this machine/VM.
+ ~props~ is an Erlang-style property list for specifying additional
configuration options, debugging information, sysadmin comments,
etc.
** Chain life cycle management using "rc.d" style files
Unlike FLUs, chains have a self-management aspect that makes a chain
life cycle different from a single FLU server. Machi's chains are
self-managing, via Humming Consensus; see the
https://github.com/basho/machi/tree/master/doc/ directory for much
more detail about Humming Consensus. After FLUs have received their
initial chain configuration for Humming Consensus, the FLUs will
manage the chain (and each other) by themselves.
However, Humming Consensus does not handle three chain management
problems:
1. Specifying the very first chain configuration,
2. Altering the membership of the chain (i.e. adding/removing FLUs
from the chain),
3. Stopping the chain permanently.
A chain "rc.d" file will only be used to bootstrap a newly-defined FLU
server. It's like a piece of glue information to introduce the new
FLU to the Humming Consensus group that is managing the chain's
dynamic state (e.g. which members are up or down). In all other
respects, chain config files are ignored by life cycle management code.
However, to mimic the life cycle of the FLU server's "rc.d" config
files, a chain "rc.d" files is not deleted until the chain has been
decommissioned (i.e. defined with length=0).
*** The key configuration components of a chain
1. The name of the chain.
2. Consistency mode: eventually consistent or strongly consistent.
3. The membership list of all FLU servers in the chain.
+ Remember, all servers in a single chain will manage full replicas
of the same collection of Machi files.
4. If the chain is defined to use strongly consistent mode, then a
list of "witness servers" may also be defined. See the
[https://github.com/basho/machi/tree/master/doc/] documentation for
more information on witness servers.
+ The witness list must be empty for all chains in eventual
consistency mode.
Here is an example of a properly formatted chain config file:
#+BEGIN_SRC
{chain_def_v1,c1,ap_mode,
[{p_srvr,f1,machi_flu1_client,"localhost",20401,[]},
{p_srvr,f2,machi_flu1_client,"localhost",20402,[]},
{p_srvr,f3,machi_flu1_client,"localhost",20403,[]}],
[],[],[],
[f1,f2,f3],
[],[]}.
#+END_SRC
... which corresponds to the following Erlang record definition:
#+BEGIN_SRC
-record(chain_def_v1, {
name :: atom(), % chain name
mode :: 'ap_mode' | 'cp_mode',
full = [] :: [p_srvr()],
witnesses = [] :: [p_srvr()],
old_full = [] :: [atom()], % guard against some races
old_witnesses=[] :: [atom()], % guard against some races
local_run = [] :: [atom()], % must be tailored to each machine!
local_stop = [] :: [atom()], % must be tailored to each machine!
props = [] :: list() % proplist for other related info
}).
#+END_SRC
+ ~name~ is ~c1~, the name of the chain. This name should be unique
over the lifetime of the administrative domain and thus managed by
external policy. This name must be the same as the name of the
config file that defines the chain.
+ ~mode~ is ~ap_mode~, an internal code symbol for eventual
consistency mode.
+ ~full~ is a list of Erlang ~#p_srvr{}~ records for full-service
members of the chain, i.e., providing Machi file data & metadata
storage services.
+ ~witnesses~ is a list of Erlang ~#p_srvr{}~ records for witness-only
FLU servers, i.e., providing only Humming Consensus service.
+ The next four fields are used for internal management only.
+ ~props~ is an Erlang-style property list for specifying additional
configuration options, debugging information, sysadmin comments,
etc.

View file

@ -1,6 +1,6 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved. %% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%% %%
%% This file is provided to you under the Apache License, %% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file %% Version 2.0 (the "License"); you may not use this file
@ -22,10 +22,11 @@
-define(MACHI_PROJECTION_HRL, true). -define(MACHI_PROJECTION_HRL, true).
-type pv1_consistency_mode() :: 'ap_mode' | 'cp_mode'. -type pv1_consistency_mode() :: 'ap_mode' | 'cp_mode'.
-type pv1_chain_name():: atom().
-type pv1_csum() :: binary(). -type pv1_csum() :: binary().
-type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}. -type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}.
-type pv1_epoch_n() :: non_neg_integer(). -type pv1_epoch_n() :: non_neg_integer().
-type pv1_server() :: atom() | binary(). -type pv1_server() :: atom().
-type pv1_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}. -type pv1_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
-record(p_srvr, { -record(p_srvr, {
@ -55,6 +56,7 @@
epoch_number :: pv1_epoch_n() | ?SPAM_PROJ_EPOCH, epoch_number :: pv1_epoch_n() | ?SPAM_PROJ_EPOCH,
epoch_csum :: pv1_csum(), epoch_csum :: pv1_csum(),
author_server :: pv1_server(), author_server :: pv1_server(),
chain_name = ch_not_def_yet :: pv1_chain_name(),
all_members :: [pv1_server()], all_members :: [pv1_server()],
witnesses = [] :: [pv1_server()], witnesses = [] :: [pv1_server()],
creation_time :: pv1_timestamp(), creation_time :: pv1_timestamp(),
@ -75,4 +77,16 @@
%% create a consistent projection ranking score. %% create a consistent projection ranking score.
-define(MAX_CHAIN_LENGTH, 64). -define(MAX_CHAIN_LENGTH, 64).
-record(chain_def_v1, {
name :: atom(), % chain name
mode :: pv1_consistency_mode(),
full = [] :: [p_srvr()],
witnesses = [] :: [p_srvr()],
old_full = [] :: [pv1_server()], % guard against some races
old_witnesses=[] :: [pv1_server()], % guard against some races
local_run = [] :: [pv1_server()], % must be tailored to each machine!
local_stop = [] :: [pv1_server()], % must be tailored to each machine!
props = [] :: list() % proplist for other related info
}).
-endif. % !MACHI_PROJECTION_HRL -endif. % !MACHI_PROJECTION_HRL

View file

@ -0,0 +1 @@
{host, "localhost", []}.

View file

@ -0,0 +1,4 @@
{flu,f1,"localhost",20401,[]}.
{flu,f2,"localhost",20402,[]}.
{flu,f3,"localhost",20403,[]}.
{chain,c1,[f1,f2,f3],[]}.

View file

@ -0,0 +1,4 @@
{flu,f4,"localhost",20404,[]}.
{flu,f5,"localhost",20405,[]}.
{flu,f6,"localhost",20406,[]}.
{chain,c2,[f4,f5,f6],[]}.

View file

@ -1,25 +1,31 @@
[ [
{machi, [ {machi, [
%% Data directory for all FLUs. %% Data directory for all FLUs.
{flu_data_dir, "{{platform_data_dir}}"}, {flu_data_dir, "{{platform_data_dir}}/flu"},
%% FLU config directory
{flu_config_dir, "{{platform_etc_dir}}/flu-config"},
%% Chain config directory
{chain_config_dir, "{{platform_etc_dir}}/chain-config"},
%% FLUs to start at app start. %% FLUs to start at app start.
{initial_flus, [ %% This task has moved to machi_flu_sup and machi_lifecycle_mgr.
%% Remember, this is a list, so separate all tuples
%% with a comma.
%%
%% {Name::atom(), Port::pos_integer(), proplist()}
%%
%% For example: {my_name_is_a, 12500, []}
]},
%% Number of metadata manager processes to run per FLU. %% Number of metadata manager processes to run per FLU.
%% Default = 10 %% Default = 10
%% {metadata_manager_count, 2}, %% {metadata_manager_count, 2},
%% Platform vars (mirror of reltool packaging)
{platform_data_dir, "{{platform_data_dir}}"},
{platform_etc_dir, "{{platform_etc_dir}}"},
%% Do not delete, do not put Machi config items after this line. %% Do not delete, do not put Machi config items after this line.
{final_comma_stopper, do_not_delete} {final_comma_stopper, do_not_delete}
] ]
},
{lager, [
{error_logger_hwm, 5000} % lager's default of 50/sec is too low
]
} }
]. ].

View file

@ -22,23 +22,41 @@ cd $RUNNER_BASE_DIR
SCRIPT=`basename $0` SCRIPT=`basename $0`
usage() { usage() {
echo "Usage: $SCRIPT { test | " echo "Usage: $SCRIPT { quick-admin-check | quick-admin-apply | "
echo " top }" echo " top }"
} }
case "$1" in case "$1" in
test) quick-admin-check)
# Make sure the local node IS running # Make sure the local node IS running
node_up_check node_up_check
shift shift
# Parse out the node name to pass to the client NODE_NAME=${NAME_ARG#* } # target machi server node name
NODE_NAME=${NAME_ARG#* } IN_FILE="$1"
$ERTS_PATH/erl -noshell $NAME_PARAM machi_test$NAME_HOST $COOKIE_ARG \ $ERTS_PATH/erl -noshell -noinput $NAME_PARAM machi_test$NAME_HOST $COOKIE_ARG \
-pa $RUNNER_LIB_DIR/basho-patches \ -remsh $NODE_NAME \
-eval "case catch(machi:client_test(\"$NODE_NAME\")) of \ -eval "Me = self(), spawn('"$NODE_NAME"', fun() -> X = (catch(machi_lifecycle_mgr:quick_admin_sanity_check(\"$IN_FILE\"))), Me ! {res, X} end), XX = receive {res, Res} -> Res after 10*1000 -> timeout end, io:format(user, \"Result: ~p\n\", [XX]), case XX of \
ok -> init:stop(); \
_ -> init:stop(1) \
end."
;;
quick-admin-apply)
# Make sure the local node IS running
node_up_check
shift
NODE_NAME=${NAME_ARG#* } # target machi server node name
IN_FILE="$1"
RELATIVE_HOST="$2"
$ERTS_PATH/erl -noshell -noinput $NAME_PARAM machi_test$NAME_HOST $COOKIE_ARG \
-remsh $NODE_NAME \
-eval "Me = self(), spawn('"$NODE_NAME"', fun() -> X = (catch(machi_lifecycle_mgr:quick_admin_apply(\"$IN_FILE\", \"$RELATIVE_HOST\"))), Me ! {res, X} end), XX = receive {res, Res} -> Res after 10*1000 -> timeout end, io:format(user, \"Result: ~p\n\", [XX]), case XX of \
ok -> init:stop(); \ ok -> init:stop(); \
_ -> init:stop(1) \ _ -> init:stop(1) \
end." end."

View file

@ -47,6 +47,7 @@
{overlay, [ {overlay, [
{mkdir, "data"}, {mkdir, "data"},
{mkdir, "data/^PRESERVE"},
{mkdir, "log"}, {mkdir, "log"},
%% Copy base files for starting and interacting w/ node %% Copy base files for starting and interacting w/ node
@ -93,6 +94,19 @@
{template, "files/vm.args", "etc/vm.args"}, {template, "files/vm.args", "etc/vm.args"},
{template, "files/app.config", "etc/app.config"}, {template, "files/app.config", "etc/app.config"},
{mkdir, "etc/chain-config"},
{mkdir, "etc/flu-config"},
{mkdir, "etc/pending"},
{mkdir, "etc/rejected"},
%% Experiment: quick-admin
{mkdir, "etc/quick-admin-archive"},
{mkdir, "priv"},
{mkdir, "priv/quick-admin-examples"},
{copy, "../priv/quick-admin-examples/000", "priv/quick-admin-examples"},
{copy, "../priv/quick-admin-examples/001", "priv/quick-admin-examples"},
{copy, "../priv/quick-admin-examples/002", "priv/quick-admin-examples"},
{mkdir, "lib/basho-patches"} {mkdir, "lib/basho-patches"}
%% {copy, "../apps/machi/ebin/etop_txt.beam", "lib/basho-patches"} %% {copy, "../apps/machi/ebin/etop_txt.beam", "lib/basho-patches"}
]}. ]}.

View file

@ -330,18 +330,17 @@ message Mpb_ProjectionV1 {
required uint32 epoch_number = 1; required uint32 epoch_number = 1;
required bytes epoch_csum = 2; required bytes epoch_csum = 2;
required string author_server = 3; required string author_server = 3;
repeated string all_members = 4; required string chain_name = 4;
repeated string witnesses = 5; repeated string all_members = 5;
required Mpb_Now creation_time = 6; repeated string witnesses = 6;
required Mpb_Mode mode = 7; required Mpb_Now creation_time = 7;
repeated string upi = 8; required Mpb_Mode mode = 8;
repeated string repairing = 9; repeated string upi = 9;
repeated string down = 10; repeated string repairing = 10;
optional bytes opaque_flap = 11; repeated string down = 11;
optional bytes opaque_inner = 12; required bytes opaque_dbg = 12;
required bytes opaque_dbg = 13; required bytes opaque_dbg2 = 13;
required bytes opaque_dbg2 = 14; repeated Mpb_MembersDictEntry members_dict = 14;
repeated Mpb_MembersDictEntry members_dict = 15;
} }
////////////////////////////////////////// //////////////////////////////////////////

View file

@ -108,7 +108,7 @@
%% API %% API
-export([start_link/2, start_link/3, stop/1, ping/1, -export([start_link/2, start_link/3, stop/1, ping/1,
set_chain_members/2, set_chain_members/3, set_active/2, set_chain_members/2, set_chain_members/6, set_active/2,
trigger_react_to_env/1]). trigger_react_to_env/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, format_status/2, code_change/3]). terminate/2, format_status/2, code_change/3]).
@ -168,13 +168,22 @@ ping(Pid) ->
%% with lowest rank, i.e. name z* first, name a* last. %% with lowest rank, i.e. name z* first, name a* last.
set_chain_members(Pid, MembersDict) -> set_chain_members(Pid, MembersDict) ->
set_chain_members(Pid, MembersDict, []). set_chain_members(Pid, ch0_name, 0, ap_mode, MembersDict, []).
set_chain_members(Pid, MembersDict, Witness_list) -> set_chain_members(Pid, ChainName, OldEpoch, CMode, MembersDict, Witness_list)
case lists:all(fun(Witness) -> orddict:is_key(Witness, MembersDict) end, when is_atom(ChainName) andalso
Witness_list) of is_integer(OldEpoch) andalso OldEpoch >= 0 andalso
(CMode == ap_mode orelse CMode == cp_mode) andalso
is_list(MembersDict) andalso
is_list(Witness_list) ->
case lists:all(fun({X, #p_srvr{name=X}}) -> true;
(_) -> false
end, MembersDict)
andalso
lists:all(fun(Witness) -> orddict:is_key(Witness, MembersDict) end,
Witness_list) of
true -> true ->
Cmd = {set_chain_members, MembersDict, Witness_list}, Cmd = {set_chain_members, ChainName, OldEpoch, CMode, MembersDict, Witness_list},
gen_server:call(Pid, Cmd, infinity); gen_server:call(Pid, Cmd, infinity);
false -> false ->
{error, bad_arg} {error, bad_arg}
@ -281,7 +290,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
last_down=[no_such_server_initial_value_only], last_down=[no_such_server_initial_value_only],
fitness_svr=machi_flu_psup:make_fitness_regname(MyName) fitness_svr=machi_flu_psup:make_fitness_regname(MyName)
}, Proj), }, Proj),
{_, S2} = do_set_chain_members_dict(MembersDict, S), S2 = do_set_chain_members_dict(MembersDict, S),
S3 = if ActiveP == false -> S3 = if ActiveP == false ->
S2; S2;
ActiveP == true -> ActiveP == true ->
@ -291,12 +300,17 @@ init({MyName, InitMembersDict, MgrOpts}) ->
handle_call({ping}, _From, S) -> handle_call({ping}, _From, S) ->
{reply, pong, S}; {reply, pong, S};
handle_call({set_chain_members, MembersDict, Witness_list}, _From, handle_call({set_chain_members, SetChainName, SetOldEpoch, CMode,
MembersDict, Witness_list}, _From,
#ch_mgr{name=MyName, #ch_mgr{name=MyName,
proj=#projection_v1{all_members=OldAll_list, proj=#projection_v1{all_members=OldAll_list,
epoch_number=OldEpoch, epoch_number=OldEpoch,
chain_name=ChainName,
upi=OldUPI}=OldProj}=S) -> upi=OldUPI}=OldProj}=S) ->
{Reply, S2} = do_set_chain_members_dict(MembersDict, S), true = (OldEpoch == 0) % in this case we want unconditional set of ch name
orelse
(SetOldEpoch == OldEpoch andalso SetChainName == ChainName),
S2 = do_set_chain_members_dict(MembersDict, S),
%% TODO: should there be any additional sanity checks? Right now, %% TODO: should there be any additional sanity checks? Right now,
%% if someone does something bad, then do_react_to_env() will %% if someone does something bad, then do_react_to_env() will
%% crash, which will crash us, and we'll restart in a sane & old %% crash, which will crash us, and we'll restart in a sane & old
@ -310,10 +324,10 @@ handle_call({set_chain_members, MembersDict, Witness_list}, _From,
{NUPI, All_list -- NUPI} {NUPI, All_list -- NUPI}
end, end,
NewEpoch = OldEpoch + ?SET_CHAIN_MEMBERS_EPOCH_SKIP, NewEpoch = OldEpoch + ?SET_CHAIN_MEMBERS_EPOCH_SKIP,
CMode = calc_consistency_mode(Witness_list),
ok = set_consistency_mode(machi_flu_psup:make_proj_supname(MyName), CMode), ok = set_consistency_mode(machi_flu_psup:make_proj_supname(MyName), CMode),
NewProj = machi_projection:update_checksum( NewProj = machi_projection:update_checksum(
OldProj#projection_v1{author_server=MyName, OldProj#projection_v1{author_server=MyName,
chain_name=SetChainName,
creation_time=now(), creation_time=now(),
mode=CMode, mode=CMode,
epoch_number=NewEpoch, epoch_number=NewEpoch,
@ -325,7 +339,11 @@ handle_call({set_chain_members, MembersDict, Witness_list}, _From,
members_dict=MembersDict}), members_dict=MembersDict}),
S3 = set_proj(S2#ch_mgr{proj_history=queue:new(), S3 = set_proj(S2#ch_mgr{proj_history=queue:new(),
consistency_mode=CMode}, NewProj), consistency_mode=CMode}, NewProj),
{_QQ, S4} = do_react_to_env(S3), {Res, S4} = do_react_to_env(S3),
Reply = case Res of
{_,_,_} -> ok;
_ -> Res
end,
{reply, Reply, S4}; {reply, Reply, S4};
handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) -> handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) ->
case {Boolean, TRef} of case {Boolean, TRef} of
@ -357,8 +375,8 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) ->
{reply, Res, S2}; {reply, Res, S2};
handle_call({trigger_react_to_env}=Call, _From, S) -> handle_call({trigger_react_to_env}=Call, _From, S) ->
gobble_calls(Call), gobble_calls(Call),
{TODOtodo, S2} = do_react_to_env(S), {Res, S2} = do_react_to_env(S),
{reply, TODOtodo, S2}; {reply, Res, S2};
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
io:format(user, "\nBad call to ~p: ~p\n", [S#ch_mgr.name, _Call]), io:format(user, "\nBad call to ~p: ~p\n", [S#ch_mgr.name, _Call]),
{reply, whaaaaaaaaaa, S}. {reply, whaaaaaaaaaa, S}.
@ -535,6 +553,7 @@ cl_write_public_proj2(FLUs, Partitions, Epoch, Proj, IgnoreWrittenErrorP, S) ->
end end
end, {true, []}, FLUs), end, {true, []}, FLUs),
%% io:format(user, "\nWrite public ~w by ~w: ~w\n", [Epoch, S#ch_mgr.name, Rs]), %% io:format(user, "\nWrite public ~w by ~w: ~w\n", [Epoch, S#ch_mgr.name, Rs]),
%% io:format(user, "mgr ~w epoch ~w Rs ~p\n", [S#ch_mgr.name, Epoch, Rs]),
{{remote_write_results, Rs}, S}. {{remote_write_results, Rs}, S}.
do_cl_read_latest_public_projection(ReadRepairP, do_cl_read_latest_public_projection(ReadRepairP,
@ -556,12 +575,41 @@ do_cl_read_latest_public_projection(ReadRepairP,
read_latest_projection_call_only(ProjectionType, AllHosed, read_latest_projection_call_only(ProjectionType, AllHosed,
#ch_mgr{proj=CurrentProj}=S) -> #ch_mgr{proj=CurrentProj}=S) ->
#projection_v1{all_members=All_list} = CurrentProj, #projection_v1{all_members=All_list} = CurrentProj,
All_queried_list = All_list -- AllHosed, All_queried_list = lists:sort(All_list -- AllHosed),
read_latest_projection_call_only1(ProjectionType, AllHosed,
All_queried_list, S).
{Rs, S2} = read_latest_projection_call_only2(ProjectionType, read_latest_projection_call_only1(ProjectionType, AllHosed,
All_queried_list, S), All_queried_list, S) ->
FLUsRs = lists:zip(All_queried_list, Rs), {Rs_tmp, S2} = read_latest_projection_call_only2(ProjectionType,
{All_queried_list, FLUsRs, S2}. All_queried_list, S),
New_all_maybe =
lists:usort(
lists:flatten(
[A_l || #projection_v1{all_members=A_l} <- Rs_tmp])) -- AllHosed,
case New_all_maybe -- All_queried_list of
[] ->
FLUsRs = lists:zip(All_queried_list, Rs_tmp),
{All_queried_list, FLUsRs, S2};
[AnotherFLU|_] ->
%% Stop AnotherFLU proxy, in unexpected case where it's open
try
Proxy = proxy_pid(AnotherFLU, S2),
?FLU_PC:stop_proxies([Proxy])
catch _:_ -> ok
end,
MD = orddict:from_list(
lists:usort(
lists:flatten(
[orddict:to_list(D) || #projection_v1{members_dict=D} <- Rs_tmp]))),
Another_P_srvr = orddict:fetch(AnotherFLU, MD),
{ok, Proxy2} = ?FLU_PC:start_link(Another_P_srvr),
S3 = S2#ch_mgr{proxies_dict=orddict:store(AnotherFLU, Proxy2,
S2#ch_mgr.proxies_dict)},
read_latest_projection_call_only1(
ProjectionType, AllHosed,
lists:usort([AnotherFLU|All_queried_list]), S3)
end.
read_latest_projection_call_only2(ProjectionType, All_queried_list, S) -> read_latest_projection_call_only2(ProjectionType, All_queried_list, S) ->
{_UpNodes, Partitions, S2} = calc_up_nodes(S), {_UpNodes, Partitions, S2} = calc_up_nodes(S),
@ -601,6 +649,8 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType,
Witness_list = CurrentProj#projection_v1.witnesses, Witness_list = CurrentProj#projection_v1.witnesses,
NoneProj = make_none_projection(0, MyName, [], Witness_list, NoneProj = make_none_projection(0, MyName, [], Witness_list,
orddict:new()), orddict:new()),
ChainName = CurrentProj#projection_v1.chain_name,
NoneProj2 = NoneProj#projection_v1{chain_name=ChainName},
Extra2 = [{all_members_replied, true}, Extra2 = [{all_members_replied, true},
{all_queried_list, All_queried_list}, {all_queried_list, All_queried_list},
{flus_rs, FLUsRs}, {flus_rs, FLUsRs},
@ -609,7 +659,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType,
{bad_answer_flus, BadAnswerFLUs}, {bad_answer_flus, BadAnswerFLUs},
{bad_answers, BadAnswers}, {bad_answers, BadAnswers},
{not_unanimous_answers, []}], {not_unanimous_answers, []}],
{not_unanimous, NoneProj, Extra2, S}; {not_unanimous, NoneProj2, Extra2, S};
ProjectionType == public, UnwrittenRs /= [] -> ProjectionType == public, UnwrittenRs /= [] ->
{needs_repair, FLUsRs, [flarfus], S}; {needs_repair, FLUsRs, [flarfus], S};
true -> true ->
@ -723,13 +773,14 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
runenv=RunEnv1, runenv=RunEnv1,
repair_final_status=RepairFS}=S) -> repair_final_status=RepairFS}=S) ->
#projection_v1{epoch_number=OldEpochNum, #projection_v1{epoch_number=OldEpochNum,
chain_name=ChainName,
members_dict=MembersDict, members_dict=MembersDict,
witnesses=OldWitness_list, witnesses=OldWitness_list,
upi=OldUPI_list, upi=OldUPI_list,
repairing=OldRepairing_list repairing=OldRepairing_list
} = LastProj, } = LastProj,
LastUp = lists:usort(OldUPI_list ++ OldRepairing_list), LastUp = lists:usort(OldUPI_list ++ OldRepairing_list),
AllMembers = (S#ch_mgr.proj)#projection_v1.all_members, AllMembers = CurrentProj#projection_v1.all_members,
{Up0, Partitions, RunEnv2} = calc_up_nodes(MyName, {Up0, Partitions, RunEnv2} = calc_up_nodes(MyName,
AllMembers, RunEnv1), AllMembers, RunEnv1),
Up = Up0 -- AllHosed, Up = Up0 -- AllHosed,
@ -821,10 +872,11 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
end, end,
?REACT({calc,?LINE,[{new_upi, NewUPI},{new_rep, NewRepairing}]}), ?REACT({calc,?LINE,[{new_upi, NewUPI},{new_rep, NewRepairing}]}),
P = machi_projection:new(OldEpochNum + 1, P0 = machi_projection:new(OldEpochNum + 1,
MyName, MembersDict, Down, NewUPI, NewRepairing, MyName, MembersDict, Down, NewUPI, NewRepairing,
D_foo ++ D_foo ++
Dbg ++ [{ps, Partitions},{nodes_up, Up}]), Dbg ++ [{ps, Partitions},{nodes_up, Up}]),
P1 = P0#projection_v1{chain_name=ChainName},
P2 = if CMode == cp_mode -> P2 = if CMode == cp_mode ->
UpWitnesses = [W || W <- Up, lists:member(W, OldWitness_list)], UpWitnesses = [W || W <- Up, lists:member(W, OldWitness_list)],
Majority = full_majority_size(AllMembers), Majority = full_majority_size(AllMembers),
@ -833,7 +885,7 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
SoFar = length(NewUPI ++ NewRepairing), SoFar = length(NewUPI ++ NewRepairing),
if SoFar >= Majority -> if SoFar >= Majority ->
?REACT({calc,?LINE,[]}), ?REACT({calc,?LINE,[]}),
P; P1;
true -> true ->
Need = Majority - SoFar, Need = Majority - SoFar,
UpWitnesses = [W || W <- Up, UpWitnesses = [W || W <- Up,
@ -842,7 +894,7 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
Ws = lists:sublist(UpWitnesses, Need), Ws = lists:sublist(UpWitnesses, Need),
?REACT({calc,?LINE,[{ws, Ws}]}), ?REACT({calc,?LINE,[{ws, Ws}]}),
machi_projection:update_checksum( machi_projection:update_checksum(
P#projection_v1{upi=Ws++NewUPI}); P1#projection_v1{upi=Ws++NewUPI});
true -> true ->
?REACT({calc,?LINE,[]}), ?REACT({calc,?LINE,[]}),
P_none0 = make_none_projection( P_none0 = make_none_projection(
@ -855,6 +907,7 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
"Not enough witnesses are available now" "Not enough witnesses are available now"
end, end,
P_none1 = P_none0#projection_v1{ P_none1 = P_none0#projection_v1{
chain_name=ChainName,
%% Stable creation time! %% Stable creation time!
creation_time={1,2,3}, creation_time={1,2,3},
dbg=[{none_projection,true}, dbg=[{none_projection,true},
@ -875,7 +928,7 @@ calc_projection2(LastProj, RelativeToServer, AllHosed, Dbg,
end; end;
CMode == ap_mode -> CMode == ap_mode ->
?REACT({calc,?LINE,[]}), ?REACT({calc,?LINE,[]}),
P P1
end, end,
P3 = machi_projection:update_checksum( P3 = machi_projection:update_checksum(
P2#projection_v1{mode=CMode, witnesses=OldWitness_list}), P2#projection_v1{mode=CMode, witnesses=OldWitness_list}),
@ -1027,31 +1080,33 @@ rank_projection(#projection_v1{author_server=_Author,
do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)-> do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)->
_ = ?FLU_PC:stop_proxies(OldProxiesDict), _ = ?FLU_PC:stop_proxies(OldProxiesDict),
ProxiesDict = ?FLU_PC:start_proxies(MembersDict), ProxiesDict = ?FLU_PC:start_proxies(MembersDict),
{ok, S#ch_mgr{members_dict=MembersDict, S#ch_mgr{members_dict=MembersDict,
proxies_dict=ProxiesDict}}. proxies_dict=ProxiesDict}.
do_react_to_env(#ch_mgr{name=MyName, do_react_to_env(#ch_mgr{name=MyName,
proj=#projection_v1{epoch_number=Epoch, proj=#projection_v1{epoch_number=Epoch,
members_dict=[]=OldDict}=OldProj, members_dict=[]=OldDict}=OldProj,
opts=Opts}=S) -> opts=Opts}=S) ->
put(ttt, [?LINE]),
%% Read from our local *public* projection store. If some other %% Read from our local *public* projection store. If some other
%% chain member has written something there, and if we are a %% chain member has written something there, and if we are a
%% member of that chain, then we'll adopt that projection and then %% member of that chain, then we'll adopt that projection and then
%% start actively humming in that chain. %% start actively humming in that chain.
{NewMembersDict, NewProj} = {NewMD, NewProj} =
get_my_public_proj_boot_info(Opts, OldDict, OldProj), get_my_public_proj_boot_info(Opts, OldDict, OldProj),
case orddict:is_key(MyName, NewMembersDict) of case orddict:is_key(MyName, NewMD) of
false -> false ->
{{empty_members_dict, [], Epoch}, S}; {{empty_members_dict1, [], Epoch}, S};
true -> true ->
{_, S2} = do_set_chain_members_dict(NewMembersDict, S), CMode = NewProj#projection_v1.mode,
CMode = calc_consistency_mode(NewProj#projection_v1.witnesses), S2 = do_set_chain_members_dict(NewMD, S),
{{empty_members_dict, [], Epoch}, {Reply, S3} = react_to_env_C110(NewProj,
set_proj(S2#ch_mgr{members_dict=NewMembersDict, S2#ch_mgr{members_dict=NewMD,
consistency_mode=CMode}, NewProj)} consistency_mode=CMode}),
{Reply, S3}
end; end;
do_react_to_env(S) -> do_react_to_env(S) ->
put(ttt, [?LINE]), put(ttt, [?LINE]),
%% The not_sanes manager counting dictionary is not strictly %% The not_sanes manager counting dictionary is not strictly
%% limited to flapping scenarios. (Though the mechanism first %% limited to flapping scenarios. (Though the mechanism first
%% started as a way to deal with rare flapping scenarios.) %% started as a way to deal with rare flapping scenarios.)
@ -1150,7 +1205,7 @@ react_to_env_A10(S) ->
?REACT(a10), ?REACT(a10),
react_to_env_A20(0, poll_private_proj_is_upi_unanimous(S)). react_to_env_A20(0, poll_private_proj_is_upi_unanimous(S)).
react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) -> react_to_env_A20(Retries, #ch_mgr{name=MyName, proj=P_current}=S) ->
?REACT(a20), ?REACT(a20),
init_remember_down_list(), init_remember_down_list(),
{UnanimousTag, P_latest, ReadExtra, S2} = {UnanimousTag, P_latest, ReadExtra, S2} =
@ -1178,17 +1233,34 @@ react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) ->
false when P_latest#projection_v1.epoch_number /= LastComplaint, false when P_latest#projection_v1.epoch_number /= LastComplaint,
P_latest#projection_v1.all_members /= [] -> P_latest#projection_v1.all_members /= [] ->
put(rogue_server_epoch, P_latest#projection_v1.epoch_number), put(rogue_server_epoch, P_latest#projection_v1.epoch_number),
error_logger:info_msg("Chain manager ~p found latest public " error_logger:info_msg("Chain manager ~w found latest public "
"projection ~p has author ~p has a " "projection ~w with author ~w has a "
"members list ~p that does not include me.\n", "members list ~w that does not include me. "
"We assume this is a result of administrator "
"action and will thus wedge ourselves until "
"we are re-added to the chain or shutdown.\n",
[S#ch_mgr.name, [S#ch_mgr.name,
P_latest#projection_v1.epoch_number, P_latest#projection_v1.epoch_number,
P_latest#projection_v1.author_server, P_latest#projection_v1.author_server,
P_latest#projection_v1.all_members]); P_latest#projection_v1.all_members]),
EpochID = machi_projection:make_epoch_id(P_current),
ProjStore = get_projection_store_pid_or_regname(S),
{ok, NotifyPid} = machi_projection_store:get_wedge_notify_pid(ProjStore),
_QQ = machi_flu1:update_wedge_state(NotifyPid, true, EpochID),
#projection_v1{epoch_number=Epoch,
chain_name=ChainName,
all_members=All_list,
witnesses=Witness_list,
members_dict=MembersDict} = P_current,
P_none0 = make_none_projection(Epoch,
MyName, All_list, Witness_list, MembersDict),
P_none = P_none0#projection_v1{chain_name=ChainName},
{{now_using,[],Epoch}, set_proj(S2, P_none)};
_ -> _ ->
ok react_to_env_A21(Retries, UnanimousTag, P_latest, ReadExtra, S2)
end, end.
react_to_env_A21(Retries, UnanimousTag, P_latest, ReadExtra, S) ->
%% The UnanimousTag isn't quite sufficient for our needs. We need %% The UnanimousTag isn't quite sufficient for our needs. We need
%% to determine if *all* of the UPI+Repairing FLUs are members of %% to determine if *all* of the UPI+Repairing FLUs are members of
%% the unanimous server replies. All Repairing FLUs should be up %% the unanimous server replies. All Repairing FLUs should be up
@ -1233,7 +1305,7 @@ react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) ->
true -> true ->
exit({badbad, UnanimousTag}) exit({badbad, UnanimousTag})
end, end,
react_to_env_A29(Retries, P_latest, LatestUnanimousP, ReadExtra, S2). react_to_env_A29(Retries, P_latest, LatestUnanimousP, ReadExtra, S).
react_to_env_A29(Retries, P_latest, LatestUnanimousP, _ReadExtra, react_to_env_A29(Retries, P_latest, LatestUnanimousP, _ReadExtra,
#ch_mgr{consistency_mode=CMode, #ch_mgr{consistency_mode=CMode,
@ -1267,7 +1339,6 @@ react_to_env_A29(Retries, P_latest, LatestUnanimousP, _ReadExtra,
?REACT({a29, ?LINE, ?REACT({a29, ?LINE,
[{zerf_backstop, true}, [{zerf_backstop, true},
{zerf_in, machi_projection:make_summary(Zerf)}]}), {zerf_in, machi_projection:make_summary(Zerf)}]}),
%% io:format(user, "zerf_in: A29: ~p: ~w\n\t~p\n", [MyName, machi_projection:make_summary(Zerf), get(yyy_hack)]),
#projection_v1{dbg=ZerfDbg} = Zerf, #projection_v1{dbg=ZerfDbg} = Zerf,
Backstop = if Zerf#projection_v1.upi == [] -> Backstop = if Zerf#projection_v1.upi == [] ->
[]; [];
@ -1287,7 +1358,8 @@ react_to_env_A29(Retries, P_latest, LatestUnanimousP, _ReadExtra,
end. end.
react_to_env_A30(Retries, P_latest, LatestUnanimousP, P_current_calc, react_to_env_A30(Retries, P_latest, LatestUnanimousP, P_current_calc,
#ch_mgr{name=MyName, consistency_mode=CMode} = S) -> #ch_mgr{name=MyName, proj=P_current,
consistency_mode=CMode} = S) ->
V = case file:read_file("/tmp/moomoo."++atom_to_list(S#ch_mgr.name)) of {ok,_} -> true; _ -> false end, V = case file:read_file("/tmp/moomoo."++atom_to_list(S#ch_mgr.name)) of {ok,_} -> true; _ -> false end,
if V -> io:format(user, "A30: ~w: ~p\n", [S#ch_mgr.name, get(react)]); true -> ok end, if V -> io:format(user, "A30: ~w: ~p\n", [S#ch_mgr.name, get(react)]); true -> ok end,
?REACT(a30), ?REACT(a30),
@ -1307,15 +1379,17 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, P_current_calc,
P = #projection_v1{down=Down} = P = #projection_v1{down=Down} =
make_none_projection(Epoch + 1, MyName, All_list, make_none_projection(Epoch + 1, MyName, All_list,
Witness_list, MembersDict), Witness_list, MembersDict),
ChainName = P_current#projection_v1.chain_name,
P1 = P#projection_v1{chain_name=ChainName},
P_newprop = if CMode == ap_mode -> P_newprop = if CMode == ap_mode ->
%% Not really none proj: just myself, AP style %% Not really none proj: just myself, AP style
machi_projection:update_checksum( machi_projection:update_checksum(
P#projection_v1{upi=[MyName], P1#projection_v1{upi=[MyName],
down=Down -- [MyName], down=Down -- [MyName],
dbg=[{hosed_list,AllHosed}]}); dbg=[{hosed_list,AllHosed}]});
CMode == cp_mode -> CMode == cp_mode ->
machi_projection:update_checksum( machi_projection:update_checksum(
P#projection_v1{dbg=[{hosed_list,AllHosed}]}) P1#projection_v1{dbg=[{hosed_list,AllHosed}]})
end, end,
react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP, react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
P_current_calc, true, S); P_current_calc, true, S);
@ -1388,13 +1462,22 @@ react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
%% we have a disagreement. %% we have a disagreement.
not ordsets:is_disjoint(P_latest_s, Down_s) not ordsets:is_disjoint(P_latest_s, Down_s)
end, end,
AmExcludedFromLatestAll_p =
P_latest#projection_v1.epoch_number /= 0
andalso
(not lists:member(MyName, P_latest#projection_v1.all_members)),
?REACT({a40, ?LINE, ?REACT({a40, ?LINE,
[{latest_author, P_latest#projection_v1.author_server}, [{latest_author, P_latest#projection_v1.author_server},
{am_excluded_from_latest_all_p, AmExcludedFromLatestAll_p},
{author_is_down_p, LatestAuthorDownP}, {author_is_down_p, LatestAuthorDownP},
{rank_latest, Rank_latest}, {rank_latest, Rank_latest},
{rank_newprop, Rank_newprop}]}), {rank_newprop, Rank_newprop}]}),
if if
AmExcludedFromLatestAll_p ->
?REACT({a40, ?LINE, [{latest,machi_projection:make_summary(P_latest)}]}),
react_to_env_A50(P_latest, [], S);
AmHosedP -> AmHosedP ->
ExpectedUPI = if CMode == cp_mode -> []; ExpectedUPI = if CMode == cp_mode -> [];
CMode == ap_mode -> [MyName] CMode == ap_mode -> [MyName]
@ -1560,12 +1643,10 @@ react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
end, end,
if GoTo50_p -> if GoTo50_p ->
?REACT({a40, ?LINE, []}), ?REACT({a40, ?LINE, []}),
%% io:format(user, "CONFIRM debug question line ~w\n", [?LINE]),
FinalProps = [{throttle_seconds, 0}], FinalProps = [{throttle_seconds, 0}],
react_to_env_A50(P_latest, FinalProps, S); react_to_env_A50(P_latest, FinalProps, S);
true -> true ->
?REACT({a40, ?LINE, []}), ?REACT({a40, ?LINE, []}),
io:format(user, "CONFIRM debug question line ~w\n", [?LINE]),
react_to_env_C300(P_newprop, P_latest, S) react_to_env_C300(P_newprop, P_latest, S)
end end
end. end.
@ -1575,7 +1656,6 @@ react_to_env_A50(P_latest, FinalProps, #ch_mgr{proj=P_current}=S) ->
?REACT({a50, ?LINE, [{current_epoch, P_current#projection_v1.epoch_number}, ?REACT({a50, ?LINE, [{current_epoch, P_current#projection_v1.epoch_number},
{latest_epoch, P_latest#projection_v1.epoch_number}, {latest_epoch, P_latest#projection_v1.epoch_number},
{final_props, FinalProps}]}), {final_props, FinalProps}]}),
%% if S#ch_mgr.name == c -> io:format(user, "A50: ~w: ~p\n", [S#ch_mgr.name, get(react)]); true -> ok end,
V = case file:read_file("/tmp/moomoo."++atom_to_list(S#ch_mgr.name)) of {ok,_} -> true; _ -> false end, V = case file:read_file("/tmp/moomoo."++atom_to_list(S#ch_mgr.name)) of {ok,_} -> true; _ -> false end,
if V -> io:format(user, "A50: ~w: ~p\n", [S#ch_mgr.name, get(react)]); true -> ok end, if V -> io:format(user, "A50: ~w: ~p\n", [S#ch_mgr.name, get(react)]); true -> ok end,
{{no_change, FinalProps, P_current#projection_v1.epoch_number}, S}. {{no_change, FinalProps, P_current#projection_v1.epoch_number}, S}.
@ -1850,7 +1930,9 @@ react_to_env_C103(#projection_v1{epoch_number=_Epoch_newprop} = _P_newprop,
members_dict=MembersDict} = P_current, members_dict=MembersDict} = P_current,
P_none0 = make_none_projection(Epoch_latest, P_none0 = make_none_projection(Epoch_latest,
MyName, All_list, Witness_list, MembersDict), MyName, All_list, Witness_list, MembersDict),
P_none1 = P_none0#projection_v1{dbg=[{none_projection,true}]}, ChainName = P_current#projection_v1.chain_name,
P_none1 = P_none0#projection_v1{chain_name=ChainName,
dbg=[{none_projection,true}]},
P_none = machi_projection:update_checksum(P_none1), P_none = machi_projection:update_checksum(P_none1),
?REACT({c103, ?LINE, ?REACT({c103, ?LINE,
[{current_epoch, P_current#projection_v1.epoch_number}, [{current_epoch, P_current#projection_v1.epoch_number},
@ -2206,6 +2288,7 @@ projection_transition_is_sane_except_si_epoch(
creation_time=CreationTime1, creation_time=CreationTime1,
mode=CMode1, mode=CMode1,
author_server=AuthorServer1, author_server=AuthorServer1,
chain_name=ChainName1,
all_members=All_list1, all_members=All_list1,
witnesses=Witness_list1, witnesses=Witness_list1,
down=Down_list1, down=Down_list1,
@ -2217,6 +2300,7 @@ projection_transition_is_sane_except_si_epoch(
creation_time=CreationTime2, creation_time=CreationTime2,
mode=CMode2, mode=CMode2,
author_server=AuthorServer2, author_server=AuthorServer2,
chain_name=ChainName2,
all_members=All_list2, all_members=All_list2,
witnesses=Witness_list2, witnesses=Witness_list2,
down=Down_list2, down=Down_list2,
@ -2237,7 +2321,8 @@ projection_transition_is_sane_except_si_epoch(
true = is_binary(CSum1) andalso is_binary(CSum2), true = is_binary(CSum1) andalso is_binary(CSum2),
{_,_,_} = CreationTime1, {_,_,_} = CreationTime1,
{_,_,_} = CreationTime2, {_,_,_} = CreationTime2,
true = is_atom(AuthorServer1) andalso is_atom(AuthorServer2), % todo type may change? true = is_atom(AuthorServer1) andalso is_atom(AuthorServer2),
true = is_atom(ChainName1) andalso is_atom(ChainName2),
true = is_list(All_list1) andalso is_list(All_list2), true = is_list(All_list1) andalso is_list(All_list2),
true = is_list(Witness_list1) andalso is_list(Witness_list2), true = is_list(Witness_list1) andalso is_list(Witness_list2),
true = is_list(Down_list1) andalso is_list(Down_list2), true = is_list(Down_list1) andalso is_list(Down_list2),
@ -2249,6 +2334,9 @@ projection_transition_is_sane_except_si_epoch(
%% projection_transition_is_sane_with_si_epoch(). %% projection_transition_is_sane_with_si_epoch().
true = Epoch2 >= Epoch1, true = Epoch2 >= Epoch1,
%% Don't change chain names in the middle of the stream.
true = (ChainName1 == ChainName2),
%% No duplicates %% No duplicates
true = lists:sort(Witness_list2) == lists:usort(Witness_list2), true = lists:sort(Witness_list2) == lists:usort(Witness_list2),
true = lists:sort(Down_list2) == lists:usort(Down_list2), true = lists:sort(Down_list2) == lists:usort(Down_list2),
@ -2256,7 +2344,7 @@ projection_transition_is_sane_except_si_epoch(
true = lists:sort(Repairing_list2) == lists:usort(Repairing_list2), true = lists:sort(Repairing_list2) == lists:usort(Repairing_list2),
%% Disjoint-ness %% Disjoint-ness
All_list1 = All_list2, % todo will probably change %% %% %% %% %% %% %% %% All_list1 = All_list2, % todo will probably change
%% true = lists:sort(All_list2) == lists:sort(Down_list2 ++ UPI_list2 ++ %% true = lists:sort(All_list2) == lists:sort(Down_list2 ++ UPI_list2 ++
%% Repairing_list2), %% Repairing_list2),
[] = [X || X <- Witness_list2, not lists:member(X, All_list2)], [] = [X || X <- Witness_list2, not lists:member(X, All_list2)],
@ -2361,8 +2449,7 @@ poll_private_proj_is_upi_unanimous_sleep(Count, #ch_mgr{runenv=RunEnv}=S) ->
S2 S2
end. end.
poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current, poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current} = S) ->
opts=MgrOpts} = S) ->
UPI = P_current#projection_v1.upi, UPI = P_current#projection_v1.upi,
EpochID = machi_projection:make_epoch_id(P_current), EpochID = machi_projection:make_epoch_id(P_current),
{Rs, S2} = read_latest_projection_call_only2(private, UPI, S), {Rs, S2} = read_latest_projection_call_only2(private, UPI, S),
@ -2395,12 +2482,7 @@ poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current,
Annotation = make_annotation(EpochID, Now), Annotation = make_annotation(EpochID, Now),
NewDbg2 = [Annotation|P_currentFull#projection_v1.dbg2], NewDbg2 = [Annotation|P_currentFull#projection_v1.dbg2],
NewProj = P_currentFull#projection_v1{dbg2=NewDbg2}, NewProj = P_currentFull#projection_v1{dbg2=NewDbg2},
ProjStore = case get_projection_store_regname(MgrOpts) of ProjStore = get_projection_store_pid_or_regname(S),
undefined ->
machi_flu_psup:make_proj_supname(MyName);
PStr ->
PStr
end,
#projection_v1{epoch_number=_EpochRep, #projection_v1{epoch_number=_EpochRep,
epoch_csum= <<_CSumRep:4/binary,_/binary>>, epoch_csum= <<_CSumRep:4/binary,_/binary>>,
upi=_UPIRep, upi=_UPIRep,
@ -2420,8 +2502,6 @@ poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current,
S2 S2
end; end;
_Else -> _Else ->
%% io:format(user, "poll by ~w: want ~W got ~W\n",
%% [MyName, EpochID, 6, _Else, 8]),
S2 S2
end. end.
@ -2518,8 +2598,8 @@ do_repair(#ch_mgr{name=MyName,
T1 = os:timestamp(), T1 = os:timestamp(),
RepairId = proplists:get_value(repair_id, Opts, id1), RepairId = proplists:get_value(repair_id, Opts, id1),
error_logger:info_msg( error_logger:info_msg(
"Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n", "Repair ~w start: tail ~p of ~p -> ~p, ~p\n",
[MyName, UPI0, Repairing, RepairMode, RepairId]), [RepairId, MyName, UPI0, Repairing, RepairMode]),
UPI = UPI0 -- Witness_list, UPI = UPI0 -- Witness_list,
Res = machi_chain_repair:repair(RepairMode, MyName, Repairing, UPI, Res = machi_chain_repair:repair(RepairMode, MyName, Repairing, UPI,
@ -2532,10 +2612,9 @@ do_repair(#ch_mgr{name=MyName,
end, end,
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys], Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
error_logger:info_msg( error_logger:info_msg(
"Repair ~s: tail ~p of ~p finished ~p repair ID ~w: " "Repair ~w ~s: tail ~p of ~p finished ~p: "
"~p\nStats ~p\n", "~p Stats: ~p\n",
[Summary, MyName, UPI0, RepairMode, RepairId, [RepairId, Summary, MyName, UPI0, RepairMode, Res, Stats]),
Res, Stats]),
ets:delete(ETS), ets:delete(ETS),
exit({repair_final_status, Res}); exit({repair_final_status, Res});
_ -> _ ->
@ -2772,6 +2851,7 @@ full_majority_size(L) when is_list(L) ->
full_majority_size(length(L)). full_majority_size(length(L)).
make_zerf(#projection_v1{epoch_number=OldEpochNum, make_zerf(#projection_v1{epoch_number=OldEpochNum,
chain_name=ChainName,
all_members=AllMembers, all_members=AllMembers,
members_dict=MembersDict, members_dict=MembersDict,
witnesses=OldWitness_list witnesses=OldWitness_list
@ -2794,7 +2874,8 @@ make_zerf(#projection_v1{epoch_number=OldEpochNum,
MyName, AllMembers, OldWitness_list, MyName, AllMembers, OldWitness_list,
MembersDict), MembersDict),
machi_projection:update_checksum( machi_projection:update_checksum(
P#projection_v1{mode=cp_mode, P#projection_v1{chain_name=ChainName,
mode=cp_mode,
dbg2=[zerf_none,{up,Up},{maj,MajoritySize}]}); dbg2=[zerf_none,{up,Up},{maj,MajoritySize}]});
true -> true ->
make_zerf2(OldEpochNum, Up, MajoritySize, MyName, make_zerf2(OldEpochNum, Up, MajoritySize, MyName,
@ -2809,7 +2890,6 @@ make_zerf2(OldEpochNum, Up, MajoritySize, MyName, AllMembers, OldWitness_list,
Proj2 = Proj#projection_v1{dbg2=[{make_zerf,Epoch}, Proj2 = Proj#projection_v1{dbg2=[{make_zerf,Epoch},
{yyy_hack, get(yyy_hack)}, {yyy_hack, get(yyy_hack)},
{up,Up},{maj,MajoritySize}]}, {up,Up},{maj,MajoritySize}]},
%% io:format(user, "ZERF ~w\n",[machi_projection:make_summary(Proj2)]),
Proj2 Proj2
catch catch
throw:{zerf,no_common} -> throw:{zerf,no_common} ->
@ -2916,11 +2996,6 @@ perhaps_verbose_c111(P_latest2, S) ->
ok ok
end. end.
calc_consistency_mode(_Witness_list = []) ->
ap_mode;
calc_consistency_mode(_Witness_list) ->
cp_mode.
set_proj(S, Proj) -> set_proj(S, Proj) ->
S#ch_mgr{proj=Proj, proj_unanimous=false}. S#ch_mgr{proj=Proj, proj_unanimous=false}.
@ -2953,3 +3028,10 @@ get_unfit_list(FitnessServer) ->
[] []
end. end.
get_projection_store_pid_or_regname(#ch_mgr{name=MyName, opts=MgrOpts}) ->
case get_projection_store_regname(MgrOpts) of
undefined ->
machi_flu_psup:make_proj_supname(MyName);
PStr ->
PStr
end.

View file

@ -103,7 +103,8 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end, Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end,
OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption! OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption!
RepairMode = proplists:get_value(repair_mode, Opts, repair), RepairMode = proplists:get_value(repair_mode, Opts, repair),
Verb = proplists:get_value(verbose, Opts, true), Verb = proplists:get_value(verbose, Opts, false),
RepairId = proplists:get_value(repair_id, Opts, id1),
Res = try Res = try
_ = [begin _ = [begin
{ok, Proxy} = machi_proxy_flu1_client:start_link(P), {ok, Proxy} = machi_proxy_flu1_client:start_link(P),
@ -116,31 +117,38 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
get_file_lists(Proxy, FLU, Dict) get_file_lists(Proxy, FLU, Dict)
end, D, ProxiesDict), end, D, ProxiesDict),
MissingFileSummary = make_missing_file_summary(D2, OurFLUs), MissingFileSummary = make_missing_file_summary(D2, OurFLUs),
?VERB("MissingFileSummary ~p\n", [MissingFileSummary]), %% ?VERB("~w MissingFileSummary ~p\n",[RepairId,MissingFileSummary]),
lager:info("Repair ~w MissingFileSummary ~p\n",
[RepairId, MissingFileSummary]),
[ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs], [ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs],
%% Repair files from perspective of Src, i.e. tail(UPI). %% Repair files from perspective of Src, i.e. tail(UPI).
SrcProxy = orddict:fetch(Src, ProxiesDict), SrcProxy = orddict:fetch(Src, ProxiesDict),
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id( {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(
SrcProxy, ?SHORT_TIMEOUT), SrcProxy, ?SHORT_TIMEOUT),
?VERB("Make repair directives: "), %% ?VERB("Make repair directives: "),
Ds = Ds =
[{File, make_repair_directives( [{File, make_repair_directives(
ConsistencyMode, RepairMode, File, Size, EpochID, ConsistencyMode, RepairMode, File, Size, EpochID,
Verb, Verb,
Src, OurFLUs, ProxiesDict, ETS)} || Src, OurFLUs, ProxiesDict, ETS)} ||
{File, {Size, _MissingList}} <- MissingFileSummary], {File, {Size, _MissingList}} <- MissingFileSummary],
?VERB(" done\n"), %% ?VERB(" done\n"),
lager:info("Repair ~w repair directives finished\n", [RepairId]),
[begin [begin
[{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}), [{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}),
?VERB("Out-of-sync data for FLU ~p: ~s MBytes\n", %% ?VERB("Out-of-sync data for FLU ~p: ~s MBytes\n",
[FLU, mbytes(Bytes)]) %% [FLU, mbytes(Bytes)]),
lager:info("Repair ~w "
"Out-of-sync data for FLU ~p: ~s MBytes\n",
[RepairId, FLU, mbytes(Bytes)])
end || FLU <- OurFLUs], end || FLU <- OurFLUs],
?VERB("Execute repair directives: "), %% ?VERB("Execute repair directives: "),
ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID, ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID,
Verb, OurFLUs, ProxiesDict, ETS), Verb, OurFLUs, ProxiesDict, ETS),
?VERB(" done\n"), %% ?VERB(" done\n"),
lager:info("Repair ~w repair directives finished\n", [RepairId]),
ok ok
catch catch
What:Why -> What:Why ->

View file

@ -332,7 +332,7 @@ runthru(_L, _O, _P) ->
false. false.
%% @doc If you want to find an overlap among two areas [x, y] and [a, %% @doc If you want to find an overlap among two areas [x, y] and [a,
%% b] where x < y and a < b; if (a-y)*(b-x) < 0 then there's a %% b] where x &lt; y and a &lt; b; if (a-y)*(b-x) &lt; 0 then there's a
%% overlap, else, > 0 then there're no overlap. border condition = 0 %% overlap, else, > 0 then there're no overlap. border condition = 0
%% is not overlap in this offset-size case. %% is not overlap in this offset-size case.
inclusion_match_spec(Offset, Size) -> inclusion_match_spec(Offset, Size) ->

View file

@ -44,7 +44,7 @@
-type projection() :: #projection_v1{}. -type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'. -type projection_type() :: 'public' | 'private'.
%% @doc Tags that stand for how that checksum was generated. See %% Tags that stand for how that checksum was generated. See
%% machi_util:make_tagged_csum/{1,2} for further documentation and %% machi_util:make_tagged_csum/{1,2} for further documentation and
%% implementation. %% implementation.
-type csum_tag() :: none | client_sha | server_sha | server_regen_sha. -type csum_tag() :: none | client_sha | server_sha | server_regen_sha.

View file

@ -39,7 +39,8 @@
get_unfit_list/1, update_local_down_list/3, get_unfit_list/1, update_local_down_list/3,
add_admin_down/3, delete_admin_down/2, add_admin_down/3, delete_admin_down/2,
send_fitness_update_spam/3, send_fitness_update_spam/3,
send_spam_to_everyone/1]). send_spam_to_everyone/1,
trigger_early_adjustment/2]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -81,6 +82,13 @@ send_fitness_update_spam(Pid, FromName, Dict) ->
send_spam_to_everyone(Pid) -> send_spam_to_everyone(Pid) ->
gen_server:call(Pid, {send_spam_to_everyone}, infinity). gen_server:call(Pid, {send_spam_to_everyone}, infinity).
%% @doc For testing purposes, we don't want a test to wait for
%% wall-clock time to elapse before the fitness server makes a
%% down->up status decision.
trigger_early_adjustment(Pid, FLU) ->
Pid ! {adjust_down_list, FLU}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([{MyFluName}|Args]) -> init([{MyFluName}|Args]) ->

View file

@ -83,6 +83,8 @@
%% Supervisor callbacks %% Supervisor callbacks
-export([init/1]). -export([init/1]).
make_package_spec(#p_srvr{name=FluName, port=TcpPort, props=Props}) when is_list(Props) ->
make_package_spec({FluName, TcpPort, Props});
make_package_spec({FluName, TcpPort, Props}) when is_list(Props) -> make_package_spec({FluName, TcpPort, Props}) when is_list(Props) ->
FluDataDir = get_env(flu_data_dir, undefined_is_invalid), FluDataDir = get_env(flu_data_dir, undefined_is_invalid),
MyDataDir = filename:join(FluDataDir, atom_to_list(FluName)), MyDataDir = filename:join(FluDataDir, atom_to_list(FluName)),
@ -94,7 +96,7 @@ make_package_spec(FluName, TcpPort, DataDir, Props) ->
permanent, ?SHUTDOWN, supervisor, []}. permanent, ?SHUTDOWN, supervisor, []}.
start_flu_package(#p_srvr{name=FluName, port=TcpPort, props=Props}) -> start_flu_package(#p_srvr{name=FluName, port=TcpPort, props=Props}) ->
DataDir = get_data_dir(Props), DataDir = get_data_dir(FluName, Props),
start_flu_package(FluName, TcpPort, DataDir, Props). start_flu_package(FluName, TcpPort, DataDir, Props).
start_flu_package(FluName, TcpPort, DataDir, Props) -> start_flu_package(FluName, TcpPort, DataDir, Props) ->
@ -175,8 +177,11 @@ get_env(Setting, Default) ->
{ok, V} -> V {ok, V} -> V
end. end.
get_data_dir(Props) -> get_data_dir(FluName, Props) ->
case proplists:get_value(data_dir, Props) of case proplists:get_value(data_dir, Props) of
Path when is_list(Path) -> Path when is_list(Path) ->
Path Path;
undefined ->
{ok, Dir} = application:get_env(machi, flu_data_dir),
Dir ++ "/" ++ atom_to_list(FluName)
end. end.

View file

@ -21,6 +21,9 @@
%% @doc Supervisor for Machi FLU servers and their related support %% @doc Supervisor for Machi FLU servers and their related support
%% servers. %% servers.
%% %%
%% Responsibility for managing FLU and chain lifecycle after the initial
%% application startup is delegated to {@link machi_lifecycle_mgr}.
%%
%% See {@link machi_flu_psup} for an illustration of the entire Machi %% See {@link machi_flu_psup} for an illustration of the entire Machi
%% application process structure. %% application process structure.
@ -29,8 +32,11 @@
-behaviour(supervisor). -behaviour(supervisor).
-include("machi.hrl"). -include("machi.hrl").
-include("machi_projection.hrl").
-include("machi_verbose.hrl"). -include("machi_verbose.hrl").
-ifdef(TEST).
-compile(export_all).
-ifdef(PULSE). -ifdef(PULSE).
-compile({parse_transform, pulse_instrument}). -compile({parse_transform, pulse_instrument}).
-include_lib("pulse_otp/include/pulse_otp.hrl"). -include_lib("pulse_otp/include/pulse_otp.hrl").
@ -38,9 +44,12 @@
-else. -else.
-define(SHUTDOWN, 5000). -define(SHUTDOWN, 5000).
-endif. -endif.
-endif. %TEST
%% API %% API
-export([start_link/0]). -export([start_link/0,
get_initial_flus/0, load_rc_d_files_from_dir/1,
sanitize_p_srvr_records/1]).
%% Supervisor callbacks %% Supervisor callbacks
-export([init/1]). -export([init/1]).
@ -69,5 +78,66 @@ get_initial_flus() ->
[]. [].
-else. % PULSE -else. % PULSE
get_initial_flus() -> get_initial_flus() ->
application:get_env(machi, initial_flus, []). DoesNotExist = "/tmp/does/not/exist",
ConfigDir = case application:get_env(machi, flu_config_dir, DoesNotExist) of
DoesNotExist ->
DoesNotExist;
Dir ->
Dir
end,
Ps = [P || {_File, P} <- load_rc_d_files_from_dir(ConfigDir)],
sanitize_p_srvr_records(Ps).
-endif. % PULSE -endif. % PULSE
load_rc_d_files_from_dir(Dir) ->
Files = filelib:wildcard(Dir ++ "/*"),
[case file:consult(File) of
{ok, [X]} ->
{File, X};
_ ->
lager:warning("Error parsing file '~s', ignoring",
[File]),
{File, []}
end || File <- Files].
sanitize_p_srvr_records(Ps) ->
{Sane, _} = lists:foldl(fun sanitize_p_srvr_rec/2, {[], dict:new()}, Ps),
Sane.
sanitize_p_srvr_rec(Whole, {Acc, D}) ->
try
#p_srvr{name=Name,
proto_mod=PMod,
address=Address,
port=Port,
props=Props} = Whole,
true = is_atom(Name),
NameK = {name, Name},
error = dict:find(NameK, D),
true = is_atom(PMod),
case code:is_loaded(PMod) of
{file, _} ->
ok;
_ ->
{module, _} = code:load_file(PMod),
ok
end,
if is_list(Address) -> ok;
is_tuple(Address) -> ok % Erlang-style IPv4 or IPv6
end,
true = is_integer(Port) andalso Port >= 1024 andalso Port =< 65534,
PortK = {port, Port},
error = dict:find(PortK, D),
true = is_list(Props),
%% All is sane enough.
D2 = dict:store(NameK, Name,
dict:store(PortK, Port, D)),
{[Whole|Acc], D2}
catch _:_ ->
_ = lager:log(error, self(),
"~s: Bad (or duplicate name/port) p_srvr record, "
"skipping: ~P\n",
[?MODULE, Whole, 15]),
{Acc, D}
end.

1016
src/machi_lifecycle_mgr.erl Normal file

File diff suppressed because it is too large Load diff

View file

@ -58,7 +58,7 @@
count=0 :: non_neg_integer() count=0 :: non_neg_integer()
}). }).
%% @doc official error types that is specific in Machi %% Official error types that is specific in Machi
-type machi_client_error_reason() :: bad_arg | wedged | bad_checksum | -type machi_client_error_reason() :: bad_arg | wedged | bad_checksum |
partition | not_written | written | partition | not_written | written |
trimmed | no_such_file | partial_read | trimmed | no_such_file | partial_read |
@ -145,7 +145,7 @@ read_chunk(PidSpec, File, Offset, Size, Options, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout). send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout).
%% @doc Trims arbitrary binary range of any file. If a specified range %% @doc Trims arbitrary binary range of any file. If a specified range
%% has any byte trimmed, it fails and returns `{error, trimmed}`. %% has any byte trimmed, it fails and returns `{error, trimmed}'.
%% Otherwise it trims all bytes in that range. If there are %% Otherwise it trims all bytes in that range. If there are
%% overlapping chunks with client-specified checksum, they will cut %% overlapping chunks with client-specified checksum, they will cut
%% off and checksum are re-calculated in server side. TODO: Add %% off and checksum are re-calculated in server side. TODO: Add

View file

@ -811,6 +811,7 @@ conv_to_epoch_id(#mpb_epochid{epoch_number=Epoch,
conv_to_projection_v1(#mpb_projectionv1{epoch_number=Epoch, conv_to_projection_v1(#mpb_projectionv1{epoch_number=Epoch,
epoch_csum=CSum, epoch_csum=CSum,
author_server=Author, author_server=Author,
chain_name=ChainName,
all_members=AllMembers, all_members=AllMembers,
witnesses=Witnesses, witnesses=Witnesses,
creation_time=CTime, creation_time=CTime,
@ -824,6 +825,7 @@ conv_to_projection_v1(#mpb_projectionv1{epoch_number=Epoch,
#projection_v1{epoch_number=Epoch, #projection_v1{epoch_number=Epoch,
epoch_csum=CSum, epoch_csum=CSum,
author_server=to_atom(Author), author_server=to_atom(Author),
chain_name=to_atom(ChainName),
all_members=[to_atom(X) || X <- AllMembers], all_members=[to_atom(X) || X <- AllMembers],
witnesses=[to_atom(X) || X <- Witnesses], witnesses=[to_atom(X) || X <- Witnesses],
creation_time=conv_to_now(CTime), creation_time=conv_to_now(CTime),
@ -953,7 +955,7 @@ conv_from_status({error, partial_read}) ->
conv_from_status({error, bad_epoch}) -> conv_from_status({error, bad_epoch}) ->
'BAD_EPOCH'; 'BAD_EPOCH';
conv_from_status(_OOPS) -> conv_from_status(_OOPS) ->
io:format(user, "HEY, ~s:~w got ~w\n", [?MODULE, ?LINE, _OOPS]), io:format(user, "HEY, ~s:~w got ~p\n", [?MODULE, ?LINE, _OOPS]),
'BAD_JOSS'. 'BAD_JOSS'.
conv_to_boolean(undefined) -> conv_to_boolean(undefined) ->
@ -971,6 +973,7 @@ conv_from_boolean(true) ->
conv_from_projection_v1(#projection_v1{epoch_number=Epoch, conv_from_projection_v1(#projection_v1{epoch_number=Epoch,
epoch_csum=CSum, epoch_csum=CSum,
author_server=Author, author_server=Author,
chain_name=ChainName,
all_members=AllMembers, all_members=AllMembers,
witnesses=Witnesses, witnesses=Witnesses,
creation_time=CTime, creation_time=CTime,
@ -984,6 +987,7 @@ conv_from_projection_v1(#projection_v1{epoch_number=Epoch,
#mpb_projectionv1{epoch_number=Epoch, #mpb_projectionv1{epoch_number=Epoch,
epoch_csum=CSum, epoch_csum=CSum,
author_server=to_list(Author), author_server=to_list(Author),
chain_name=to_list(ChainName),
all_members=[to_list(X) || X <- AllMembers], all_members=[to_list(X) || X <- AllMembers],
witnesses=[to_list(X) || X <- Witnesses], witnesses=[to_list(X) || X <- Witnesses],
creation_time=conv_from_now(CTime), creation_time=conv_from_now(CTime),

View file

@ -174,6 +174,7 @@ make_summary(#projection_v1{epoch_number=EpochNum,
repairing=Repairing_list, repairing=Repairing_list,
dbg=Dbg, dbg2=Dbg2}) -> dbg=Dbg, dbg2=Dbg2}) ->
[{epoch,EpochNum}, {csum,_CSum4}, [{epoch,EpochNum}, {csum,_CSum4},
{all, _All_list},
{author,Author}, {mode,CMode},{witnesses, Witness_list}, {author,Author}, {mode,CMode},{witnesses, Witness_list},
{upi,UPI_list},{repair,Repairing_list},{down,Down_list}] ++ {upi,UPI_list},{repair,Repairing_list},{down,Down_list}] ++
[{d,Dbg}, {d2,Dbg2}]. [{d,Dbg}, {d2,Dbg2}].

View file

@ -321,7 +321,7 @@ do_proj_write3(ProjType, #projection_v1{epoch_number=Epoch,
end. end.
do_proj_write4(ProjType, Proj, Path, Epoch, #state{consistency_mode=CMode}=S) -> do_proj_write4(ProjType, Proj, Path, Epoch, #state{consistency_mode=CMode}=S) ->
{ok, FH} = file:open(Path, [write, raw, binary]), {{ok, FH}, Epoch, Path} = {file:open(Path, [write, raw, binary]), Epoch, Path},
ok = file:write(FH, term_to_binary(Proj)), ok = file:write(FH, term_to_binary(Proj)),
ok = file:sync(FH), ok = file:sync(FH),
ok = file:close(FH), ok = file:close(FH),
@ -387,7 +387,6 @@ wait_for_liveness(PidSpec, StartTime, WaitTime) ->
undefined -> undefined ->
case timer:now_diff(os:timestamp(), StartTime) div 1000 of case timer:now_diff(os:timestamp(), StartTime) div 1000 of
X when X < WaitTime -> X when X < WaitTime ->
io:format(user, "\nYOO ~p ~p\n", [PidSpec, lists:sort(registered())]),
timer:sleep(1), timer:sleep(1),
wait_for_liveness(PidSpec, StartTime, WaitTime) wait_for_liveness(PidSpec, StartTime, WaitTime)
end; end;

View file

@ -62,9 +62,8 @@ init([]) ->
ServerSup = ServerSup =
{machi_flu_sup, {machi_flu_sup, start_link, []}, {machi_flu_sup, {machi_flu_sup, start_link, []},
Restart, Shutdown, Type, []}, Restart, Shutdown, Type, []},
LifecycleMgr =
{machi_lifecycle_mgr, {machi_lifecycle_mgr, start_link, []},
Restart, Shutdown, worker, []},
{ok, {SupFlags, [ServerSup]}}. {ok, {SupFlags, [ServerSup, LifecycleMgr]}}.
%% AChild = {'AName', {'AModule', start_link, []},
%% Restart, Shutdown, Type, ['AModule']},
%% {ok, {SupFlags, [AChild]}}.

View file

@ -407,8 +407,8 @@ stabilize(0, _T) ->
stabilize(_CmdsLen, #target{flu_names=FLUNames, mgr_names=MgrNames, stabilize(_CmdsLen, #target{flu_names=FLUNames, mgr_names=MgrNames,
verbose=Verbose}) -> verbose=Verbose}) ->
machi_partition_simulator:no_partitions(), machi_partition_simulator:no_partitions(),
wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames, true = wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames,
100, Verbose), 100, Verbose),
ok. ok.
chain_state_all_ok(FLUNames) -> chain_state_all_ok(FLUNames) ->

View file

@ -187,15 +187,18 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
end || #p_srvr{name=Name}=P <- Ps], end || #p_srvr{name=Name}=P <- Ps],
MembersDict = machi_projection:make_members_dict(Ps), MembersDict = machi_projection:make_members_dict(Ps),
Witnesses = proplists:get_value(witnesses, MgrOpts, []), Witnesses = proplists:get_value(witnesses, MgrOpts, []),
CMode = case {Witnesses, proplists:get_value(consistency_mode, MgrOpts,
ap_mode)} of
{[_|_], _} -> cp_mode;
{_, cp_mode} -> cp_mode;
{_, ap_mode} -> ap_mode
end,
MgrNamez = [begin MgrNamez = [begin
MgrName = machi_flu_psup:make_mgr_supname(Name), MgrName = machi_flu_psup:make_mgr_supname(Name),
ok = ?MGR:set_chain_members(MgrName,MembersDict,Witnesses), ok = ?MGR:set_chain_members(MgrName, ch_demo, 0, CMode,
MembersDict,Witnesses),
{Name, MgrName} {Name, MgrName}
end || #p_srvr{name=Name} <- Ps], end || #p_srvr{name=Name} <- Ps],
CpApMode = case Witnesses /= [] of
true -> cp_mode;
false -> ap_mode
end,
try try
[{_, Ma}|_] = MgrNamez, [{_, Ma}|_] = MgrNamez,
@ -303,9 +306,9 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} || [{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane_retrospective(Psx, FLU)} ||
{FLU, Psx} <- PrivProjs] {FLU, Psx} <- PrivProjs]
catch catch
_Err:_What when CpApMode == cp_mode -> _Err:_What when CMode == cp_mode ->
io:format(user, "none proj skip detected, TODO? ", []); io:format(user, "none proj skip detected, TODO? ", []);
_Err:_What when CpApMode == ap_mode -> _Err:_What when CMode == ap_mode ->
io:format(user, "PrivProjs ~p\n", [PrivProjs]), io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What}) exit({line, ?LINE, _Err, _What})
end, end,
@ -371,9 +374,9 @@ timer:sleep(1234),
{FLU, Psx} <- PrivProjs], {FLU, Psx} <- PrivProjs],
io:format(user, "\nAll sanity checks pass, hooray!\n", []) io:format(user, "\nAll sanity checks pass, hooray!\n", [])
catch catch
_Err:_What when CpApMode == cp_mode -> _Err:_What when CMode == cp_mode ->
io:format(user, "none proj skip detected, TODO? ", []); io:format(user, "none proj skip detected, TODO? ", []);
_Err:_What when CpApMode == ap_mode -> _Err:_What when CMode == ap_mode ->
io:format(user, "Report ~p\n", [Report]), io:format(user, "Report ~p\n", [Report]),
io:format(user, "PrivProjs ~p\n", [PrivProjs]), io:format(user, "PrivProjs ~p\n", [PrivProjs]),
exit({line, ?LINE, _Err, _What}) exit({line, ?LINE, _Err, _What})

View file

@ -273,6 +273,17 @@ make_prop_ets() ->
-endif. % EQC -endif. % EQC
make_advance_fun(FitList, FLUList, MgrList, Num) ->
fun() ->
[begin
[catch machi_fitness:trigger_early_adjustment(Fit, Tgt) ||
Fit <- FitList,
Tgt <- FLUList ],
[catch ?MGR:trigger_react_to_env(Mgr) || Mgr <- MgrList],
ok
end || _ <- lists:seq(1, Num)]
end.
smoke0_test() -> smoke0_test() ->
{ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50), {ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50),
Host = "localhost", Host = "localhost",
@ -332,25 +343,39 @@ smoke1_test2() ->
ok = machi_partition_simulator:stop() ok = machi_partition_simulator:stop()
end. end.
nonunanimous_setup_and_fix_test() -> nonunanimous_setup_and_fix_test_() ->
os:cmd("rm -f /tmp/moomoo.*"),
{timeout, 1*60, fun() -> nonunanimous_setup_and_fix_test2() end}.
nonunanimous_setup_and_fix_test2() ->
error_logger:tty(false),
machi_partition_simulator:start_link({1,2,3}, 100, 0), machi_partition_simulator:start_link({1,2,3}, 100, 0),
TcpPort = 62877, TcpPort = 62877,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}], FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"},
{c,TcpPort+2,"./data.c"}],
P_s = [#p_srvr{name=Name, address="localhost", port=Port} || P_s = [#p_srvr{name=Name, address="localhost", port=Port} ||
{Name,Port,_Dir} <- FluInfo], {Name,Port,_Dir} <- FluInfo],
[machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo], [machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo],
{ok, SupPid} = machi_flu_sup:start_link(), {ok, SupPid} = machi_flu_sup:start_link(),
Opts = [{active_mode, false}], Opts = [{active_mode, false}, {initial_wedged, true}],
%% {ok, Mb} = ?MGR:start_link(b, MembersDict, [{active_mode, false}]++XX), ChainName = my_little_chain,
[{ok,_}=machi_flu_psup:start_flu_package(Name, Port, Dir, Opts) || [{ok,_}=machi_flu_psup:start_flu_package(Name, Port, Dir, Opts) ||
{Name,Port,Dir} <- FluInfo], {Name,Port,Dir} <- FluInfo],
[Proxy_a, Proxy_b] = Proxies = Proxies = [Proxy_a, Proxy_b, Proxy_c] =
[element(2,?FLU_PC:start_link(P)) || P <- P_s], [element(2,?FLU_PC:start_link(P)) || P <- P_s],
MembersDict = machi_projection:make_members_dict(P_s), %% MembersDict = machi_projection:make_members_dict(P_s),
[Ma,Mb] = [a_chmgr, b_chmgr], MembersDict = machi_projection:make_members_dict(lists:sublist(P_s, 2)),
ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []), Mgrs = [Ma,Mb,Mc] = [a_chmgr, b_chmgr, c_chmgr],
ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []), MgrProxies = [{Ma, Proxy_a}, {Mb, Proxy_b}, {Mc, Proxy_c}],
Advance = make_advance_fun([a_fitness,b_fitness,c_fitness],
[a,b,c],
[Mgr || {Mgr,_Proxy} <- MgrProxies],
3),
ok = machi_chain_manager1:set_chain_members(Ma, ChainName, 0, ap_mode,
MembersDict, []),
ok = machi_chain_manager1:set_chain_members(Mb, ChainName, 0, ap_mode,
MembersDict, []),
try try
{ok, P1} = ?MGR:test_calc_projection(Ma, false), {ok, P1} = ?MGR:test_calc_projection(Ma, false),
@ -387,16 +412,121 @@ nonunanimous_setup_and_fix_test() ->
{ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private), {ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private),
P2 = P2pb#projection_v1{dbg2=[]}, P2 = P2pb#projection_v1{dbg2=[]},
%% Pspam = machi_projection:update_checksum( %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% P1b#projection_v1{epoch_number=?SPAM_PROJ_EPOCH, io:format("\nSTEP: Add a 3rd member to the chain.\n", []),
%% dbg=[hello_spam]}),
%% ok = ?FLU_PC:write_projection(Proxy_b, public, Pspam),
MembersDict3 = machi_projection:make_members_dict(P_s),
ok = machi_chain_manager1:set_chain_members(
Ma, ChainName, EpochNum_a, ap_mode, MembersDict3, []),
Advance(),
{_, _, TheEpoch_3} = ?MGR:trigger_react_to_env(Ma),
{_, _, TheEpoch_3} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_3} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[a,b], repairing=[c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- Proxies],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Remove 'a' from the chain.\n", []),
MembersDict4 = machi_projection:make_members_dict(tl(P_s)),
ok = machi_chain_manager1:set_chain_members(
Mb, ChainName, TheEpoch_3, ap_mode, MembersDict4, []),
Advance(),
{ok, {true, _}} = ?FLU_PC:wedge_status(Proxy_a),
{_, _, TheEpoch_4} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_4} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[b], repairing=[c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- tl(Proxies)],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Add a to the chain again (a is running).\n", []),
MembersDict5 = machi_projection:make_members_dict(P_s),
ok = machi_chain_manager1:set_chain_members(
Mb, ChainName, TheEpoch_4, ap_mode, MembersDict5, []),
Advance(),
{_, _, TheEpoch_5} = ?MGR:trigger_react_to_env(Ma),
{_, _, TheEpoch_5} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_5} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[b], repairing=[a,c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- Proxies],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Stop a while a chain member, advance b&c.\n", []),
ok = machi_flu_psup:stop_flu_package(a),
Advance(),
{_, _, TheEpoch_6} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_6} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[b], repairing=[c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- tl(Proxies)],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Remove 'a' from the chain.\n", []),
MembersDict7 = machi_projection:make_members_dict(tl(P_s)),
ok = machi_chain_manager1:set_chain_members(
Mb, ChainName, TheEpoch_6, ap_mode, MembersDict7, []),
Advance(),
{_, _, TheEpoch_7} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_7} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[b], repairing=[c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- tl(Proxies)],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Start a, advance.\n", []),
[{ok,_}=machi_flu_psup:start_flu_package(Name, Port, Dir, Opts) ||
{Name,Port,Dir} <- [hd(FluInfo)]],
Advance(),
{ok, {true, _}} = ?FLU_PC:wedge_status(Proxy_a),
{ok, {false, EpochID_8}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, EpochID_8}} = ?FLU_PC:wedge_status(Proxy_c),
[{ok, #projection_v1{upi=[b], repairing=[c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- tl(Proxies)],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Stop a, delete a's data, leave it stopped\n", []),
ok = machi_flu_psup:stop_flu_package(a),
Advance(),
{_,_,Dir_a} = hd(FluInfo),
[machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- [hd(FluInfo)]],
{ok, {false, _}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, _}} = ?FLU_PC:wedge_status(Proxy_c),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Add a to the chain again (a is stopped).\n", []),
MembersDict9 = machi_projection:make_members_dict(P_s),
{_, _, TheEpoch_9} = ?MGR:trigger_react_to_env(Mb),
ok = machi_chain_manager1:set_chain_members(
Mb, ChainName, TheEpoch_9, ap_mode, MembersDict9, []),
Advance(),
{_, _, TheEpoch_9b} = ?MGR:trigger_react_to_env(Mb),
true = (TheEpoch_9b > TheEpoch_9),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("STEP: Start a, and it joins like it ought to\n", []),
[{ok,_}=machi_flu_psup:start_flu_package(Name, Port, Dir, Opts) ||
{Name,Port,Dir} <- [hd(FluInfo)]],
Advance(),
{ok, {false, {TheEpoch10,_}}} = ?FLU_PC:wedge_status(Proxy_a),
{ok, {false, {TheEpoch10,_}}} = ?FLU_PC:wedge_status(Proxy_b),
{ok, {false, {TheEpoch10,_}}} = ?FLU_PC:wedge_status(Proxy_c),
[{ok, #projection_v1{upi=[b], repairing=[c,a]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- Proxies],
ok ok
after after
exit(SupPid, normal), exit(SupPid, normal),
[ok = ?FLU_PC:quit(X) || X <- Proxies], [ok = ?FLU_PC:quit(X) || X <- Proxies],
ok = machi_partition_simulator:stop() ok = machi_partition_simulator:stop(),
error_logger:tty(true)
end. end.
unanimous_report_test() -> unanimous_report_test() ->

View file

@ -58,9 +58,15 @@ setup_smoke_test(Host, PortBase, Os, Witness_list) ->
%% 4. Wait until all others are using epoch id from #3. %% 4. Wait until all others are using epoch id from #3.
%% %%
%% Damn, this is a pain to make 100% deterministic, bleh. %% Damn, this is a pain to make 100% deterministic, bleh.
ok = machi_chain_manager1:set_chain_members(a_chmgr, D, Witness_list), CMode = if Witness_list == [] -> ap_mode;
ok = machi_chain_manager1:set_chain_members(b_chmgr, D, Witness_list), Witness_list /= [] -> cp_mode
ok = machi_chain_manager1:set_chain_members(c_chmgr, D, Witness_list), end,
ok = machi_chain_manager1:set_chain_members(a_chmgr, ch0, 0, CMode,
D, Witness_list),
ok = machi_chain_manager1:set_chain_members(b_chmgr, ch0, 0, CMode,
D, Witness_list),
ok = machi_chain_manager1:set_chain_members(c_chmgr, ch0, 0, CMode,
D, Witness_list),
run_ticks([a_chmgr,b_chmgr,c_chmgr]), run_ticks([a_chmgr,b_chmgr,c_chmgr]),
%% Everyone is settled on the same damn epoch id. %% Everyone is settled on the same damn epoch id.
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0, {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0,

View file

@ -30,6 +30,22 @@
-define(FLU, machi_flu1). -define(FLU, machi_flu1).
-define(FLU_C, machi_flu1_client). -define(FLU_C, machi_flu1_client).
get_env_vars(App, Ks) ->
Raw = [application:get_env(App, K) || K <- Ks],
Old = lists:zip(Ks, Raw),
{App, Old}.
clean_up_env_vars({App, Old}) ->
[case Res of
undefined ->
application:unset_env(App, K);
{ok, V} ->
application:set_env(App, K, V)
end || {K, Res} <- Old].
filter_env_var({ok, V}) -> V;
filter_env_var(Else) -> Else.
clean_up_data_dir(DataDir) -> clean_up_data_dir(DataDir) ->
[begin [begin
Fs = filelib:wildcard(DataDir ++ Glob), Fs = filelib:wildcard(DataDir ++ Glob),

View file

@ -173,6 +173,19 @@ partial_stop_restart2() ->
ok ok
end. end.
p_srvr_rec_test() ->
P = #p_srvr{name=a, address="localhost", port=1024, props=[yo]},
[P] = machi_flu_sup:sanitize_p_srvr_records([P]),
[P] = machi_flu_sup:sanitize_p_srvr_records([P,P]),
[] = machi_flu_sup:sanitize_p_srvr_records([nope]),
[] = machi_flu_sup:sanitize_p_srvr_records([#p_srvr{proto_mod=does_not_exist}]),
[] = machi_flu_sup:sanitize_p_srvr_records([#p_srvr{proto_mod="lists"}]),
[] = machi_flu_sup:sanitize_p_srvr_records([#p_srvr{address=7}]),
[] = machi_flu_sup:sanitize_p_srvr_records([#p_srvr{port=5}]),
[] = machi_flu_sup:sanitize_p_srvr_records([#p_srvr{port=foo}]),
[] = machi_flu_sup:sanitize_p_srvr_records([#p_srvr{props=foo}]),
ok.
-endif. % !PULSE -endif. % !PULSE
-endif. % TEST -endif. % TEST

View file

@ -0,0 +1,307 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_lifecycle_mgr_test).
-compile(export_all).
-ifdef(TEST).
-ifndef(PULSE).
-include_lib("eunit/include/eunit.hrl").
-include("machi.hrl").
-include("machi_projection.hrl").
-define(MGR, machi_chain_manager1).
setup() ->
catch application:stop(machi),
{ok, SupPid} = machi_sup:start_link(),
error_logger:tty(false),
Dir = "./" ++ atom_to_list(?MODULE) ++ ".datadir",
machi_flu1_test:clean_up_data_dir(Dir ++ "/*/*"),
machi_flu1_test:clean_up_data_dir(Dir),
Envs = [{flu_data_dir, Dir ++ "/data/flu"},
{flu_config_dir, Dir ++ "/etc/flu-config"},
{chain_config_dir, Dir ++ "/etc/chain-config"},
{platform_data_dir, Dir ++ "/data"},
{platform_etc_dir, Dir ++ "/etc"},
{not_used_pending, Dir ++ "/etc/pending"}
],
EnvKeys = [K || {K,_V} <- Envs],
undefined = application:get_env(machi, yo),
Cleanup = machi_flu1_test:get_env_vars(machi, EnvKeys ++ [yo]),
[begin
filelib:ensure_dir(V ++ "/unused"),
application:set_env(machi, K, V)
end || {K, V} <- Envs],
{SupPid, Dir, Cleanup}.
cleanup({SupPid, Dir, Cleanup}) ->
exit(SupPid, normal),
machi_util:wait_for_death(SupPid, 100),
error_logger:tty(true),
catch application:stop(machi),
machi_flu1_test:clean_up_data_dir(Dir ++ "/*/*"),
machi_flu1_test:clean_up_data_dir(Dir),
machi_flu1_test:clean_up_env_vars(Cleanup),
undefined = application:get_env(machi, yo),
ok.
smoke_test_() ->
{timeout, 60, fun() -> smoke_test2() end}.
smoke_test2() ->
YoCleanup = setup(),
try
Prefix = <<"pre">>,
Chunk1 = <<"yochunk">>,
Host = "localhost",
PortBase = 60120,
Pa = #p_srvr{name=a,address="localhost",port=PortBase+0},
Pb = #p_srvr{name=b,address="localhost",port=PortBase+1},
Pc = #p_srvr{name=c,address="localhost",port=PortBase+2},
%% Pstore_a = machi_flu1:make_projection_server_regname(a),
%% Pstore_b = machi_flu1:make_projection_server_regname(b),
%% Pstore_c = machi_flu1:make_projection_server_regname(c),
Pstores = [Pstore_a, Pstore_b, Pstore_c] =
[machi_flu1:make_projection_server_regname(a),
machi_flu1:make_projection_server_regname(b),
machi_flu1:make_projection_server_regname(c)],
ChMgrs = [ChMgr_a, ChMgr_b, ChMgr_c] =
[machi_chain_manager1:make_chmgr_regname(a),
machi_chain_manager1:make_chmgr_regname(b),
machi_chain_manager1:make_chmgr_regname(c)],
Fits = [Fit_a, Fit_b, Fit_c] =
[machi_flu_psup:make_fitness_regname(a),
machi_flu_psup:make_fitness_regname(b),
machi_flu_psup:make_fitness_regname(c)],
Advance = machi_chain_manager1_test:make_advance_fun(
Fits, [a,b,c], ChMgrs, 3),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("\nSTEP: Start 3 FLUs, no chain.\n", []),
[machi_lifecycle_mgr:make_pending_config(P) || P <- [Pa,Pb,Pc] ],
{[_,_,_],[]} = machi_lifecycle_mgr:process_pending(),
[{ok, #projection_v1{epoch_number=0}} =
machi_projection_store:read_latest_projection(PSTORE, private)
|| PSTORE <- Pstores],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("\nSTEP: Start chain = [a,b,c]\n", []),
C1 = #chain_def_v1{name=cx, mode=ap_mode, full=[Pa,Pb,Pc],
local_run=[a,b,c]},
machi_lifecycle_mgr:make_pending_config(C1),
{[],[_]} = machi_lifecycle_mgr:process_pending(),
Advance(),
[{ok, #projection_v1{all_members=[a,b,c]}} =
machi_projection_store:read_latest_projection(PSTORE, private)
|| PSTORE <- Pstores],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("\nSTEP: Reset chain = [b,c]\n", []),
C2 = #chain_def_v1{name=cx, mode=ap_mode, full=[Pb,Pc],
old_full=[a,b,c], old_witnesses=[],
local_stop=[a], local_run=[b,c]},
machi_lifecycle_mgr:make_pending_config(C2),
{[],[_]} = machi_lifecycle_mgr:process_pending(),
Advance(),
%% a should be down
{'EXIT', _} = (catch machi_projection_store:read_latest_projection(
hd(Pstores), private)),
[{ok, #projection_v1{all_members=[b,c]}} =
machi_projection_store:read_latest_projection(PSTORE, private)
|| PSTORE <- tl(Pstores)],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format("\nSTEP: Reset chain = []\n", []),
C3 = #chain_def_v1{name=cx, mode=ap_mode, full=[],
old_full=[b,c], old_witnesses=[],
local_stop=[b,c], local_run=[]},
machi_lifecycle_mgr:make_pending_config(C3),
{[],[_]} = machi_lifecycle_mgr:process_pending(),
Advance(),
%% a,b,c should be down
[{'EXIT', _} = (catch machi_projection_store:read_latest_projection(
PSTORE, private))
|| PSTORE <- Pstores],
ok
after
cleanup(YoCleanup)
end.
ast_tuple_syntax_test() ->
T = fun(L) -> machi_lifecycle_mgr:check_ast_tuple_syntax(L) end,
Canon1 = [ {host, "localhost", []},
{host, "localhost", [{client_interface, "1.2.3.4"},
{admin_interface, "5.6.7.8"}]},
{flu, 'fx', "foohost", 4000, []},
switch_old_and_new,
{chain, 'cy', ['fx', 'fy'], [{foo,"yay"},{bar,baz}]} ],
{_Good,[]=_Bad} = T(Canon1),
Canon1_norm = machi_lifecycle_mgr:normalize_ast_tuple_syntax(Canon1),
true = (length(Canon1) == length(Canon1_norm)),
{Canon1_norm_b, []} = T(Canon1_norm),
true = (length(Canon1_norm) == length(Canon1_norm_b)),
{[],[_,_,_,_]} =
T([ {host, 'localhost', []},
{host, 'localhost', yo},
{host, "localhost", [{client_interface, 77.88293829832}]},
{host, "localhost", [{client_interface, "1.2.3.4"},
{bummer, "5.6.7.8"}]} ]),
{[],[_,_,_,_,_,_]} =
T([ {flu, 'fx', 'foohost', 4000, []},
{flu, 'fx', <<"foohost">>, 4000, []},
{flu, 'fx', "foohost", -4000, []},
{flu, 'fx', "foohost", 40009999, []},
{flu, 'fx', "foohost", 4000, gack},
{flu, 'fx', "foohost", 4000, [22]} ]),
{[],[_,_,_]} =
T([ {chain, 'cy', ["fx", "fy"], [foo,{bar,baz}]},
yoloyolo,
{chain, "cy", ["fx", 27], oops,arity,way,way,way,too,big,x}
]).
ast_run_test() ->
PortBase = 20300,
R1 = [
{host, "localhost", "localhost", "localhost", []},
{flu, 'f0', "localhost", PortBase+0, []},
{flu, 'f1', "localhost", PortBase+1, []},
{chain, 'ca', ['f0'], []},
{chain, 'cb', ['f1'], []},
switch_old_and_new,
{flu, 'f2', "localhost", PortBase+2, []},
{flu, 'f3', "localhost", PortBase+3, []},
{flu, 'f4', "localhost", PortBase+4, []},
{chain, 'ca', ['f0', 'f2'], []},
{chain, 'cc', ['f3', 'f4'], []}
],
{ok, Env1} = machi_lifecycle_mgr:run_ast(R1),
%% Uncomment to examine the Env trees.
%% Y1 = {lists:sort(gb_trees:to_list(element(1, Env1))),
%% lists:sort(gb_trees:to_list(element(2, Env1))),
%% element(3, Env1)},
%% io:format(user, "\nY1 ~p\n", [Y1]),
Negative_after_R1 =
[
{host, "localhost", "foo", "foo", []}, % dupe host
{flu, 'f1', "other", PortBase+9999999, []}, % bogus port # (syntax)
{flu, 'f1', "other", PortBase+888, []}, % dupe flu name
{flu, 'f7', "localhost", PortBase+1, []}, % dupe host+port
{chain, 'ca', ['f7'], []}, % unknown flu
{chain, 'cc', ['f0'], []}, % flu previously assigned
{chain, 'ca', cp_mode, ['f0', 'f1', 'f2'], [], []} % mode change
],
[begin
%% io:format(user, "dbg: Neg ~p\n", [Neg]),
{error, _} = machi_lifecycle_mgr:run_ast(R1 ++ [Neg])
end || Neg <- Negative_after_R1],
%% The 'run' phase doesn't blow smoke. What about 'diff'?
{X1a, X1b} = machi_lifecycle_mgr:diff_env(Env1, "localhost"),
%% There's only one host, "localhost", so 'all' should be exactly equal.
{X1a, X1b} = machi_lifecycle_mgr:diff_env(Env1, all),
%% io:format(user, "X1b: ~p\n", [X1b]),
%% Append to the R1 scenario: for chain cc: add f5, remove f4
%% Expect: see pattern matching below on X2b.
R2 = (R1 -- [switch_old_and_new]) ++
[switch_old_and_new,
{flu, 'f5', "localhost", PortBase+5, []},
{chain, 'cc', ['f3','f5'], []}],
{ok, Env2} = machi_lifecycle_mgr:run_ast(R2),
{_X2a, X2b} = machi_lifecycle_mgr:diff_env(Env2, "localhost"),
%% io:format(user, "X2b: ~p\n", [X2b]),
F5_port = PortBase+5,
[#p_srvr{name='f5',address="localhost",port=F5_port},
#chain_def_v1{name='cc',
full=[#p_srvr{name='f3'},#p_srvr{name='f5'}], witnesses=[],
old_full=[f3,f4], old_witnesses=[],
local_run=[f5], local_stop=[f4]}] = X2b,
ok.
ast_then_apply_test_() ->
{timeout, 60, fun() -> ast_then_apply_test2() end}.
ast_then_apply_test2() ->
YoCleanup = setup(),
try
PortBase = 20400,
NumChains = 4,
ChainLen = 3,
FLU_num = NumChains * ChainLen,
FLU_defs = [{flu, list_to_atom("f"++integer_to_list(X)),
"localhost", PortBase+X, []} || X <- lists:seq(1,FLU_num)],
FLU_names = [FLU || {flu,FLU,_,_,_} <- FLU_defs],
Ch_defs = [{chain, list_to_atom("c"++integer_to_list(X)),
lists:sublist(FLU_names, X, 3),
[]} || X <- lists:seq(1, FLU_num, 3)],
R1 = [switch_old_and_new,
{host, "localhost", "localhost", "localhost", []}]
++ FLU_defs ++ Ch_defs,
{ok, Env1} = machi_lifecycle_mgr:run_ast(R1),
{_X1a, X1b} = machi_lifecycle_mgr:diff_env(Env1, "localhost"),
%% io:format(user, "X1b ~p\n", [X1b]),
[machi_lifecycle_mgr:make_pending_config(X) || X <- X1b],
{PassFLUs, PassChains} = machi_lifecycle_mgr:process_pending(),
true = (length(PassFLUs) == length(FLU_defs)),
true = (length(PassChains) == length(Ch_defs)),
%% Kick the chain managers into doing something useful right now.
Pstores = [list_to_atom(atom_to_list(X) ++ "_pstore") || X <- FLU_names],
Fits = [list_to_atom(atom_to_list(X) ++ "_fitness") || X <- FLU_names],
ChMgrs = [list_to_atom(atom_to_list(X) ++ "_chmgr") || X <- FLU_names],
Advance = machi_chain_manager1_test:make_advance_fun(
Fits, FLU_names, ChMgrs, 3),
Advance(),
%% Sanity check: everyone is configured properly.
[begin
{ok, #projection_v1{epoch_number=Epoch, all_members=All,
chain_name=ChainName, upi=UPI}} =
machi_projection_store:read_latest_projection(PStore, private),
%% io:format(user, "~p: epoch ~p all ~p\n", [PStore, Epoch, All]),
true = Epoch > 0,
ChainLen = length(All),
true = (length(UPI) > 0),
{chain, _, Full, []} = lists:keyfind(ChainName, 2, Ch_defs),
true = lists:sort(Full) == lists:sort(All)
end || PStore <- Pstores],
ok
after
cleanup(YoCleanup)
end.
-endif. % !PULSE
-endif. % TEST

View file

@ -49,6 +49,8 @@ smoke_test2() ->
end || P <- Ps], end || P <- Ps],
ok = machi_chain_manager1:set_chain_members(a_chmgr, D), ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
[machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], [machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
{ok, PQQ} = machi_projection_store:read_latest_projection(a_pstore, public),
io:format(user, "a's proj: ~w\n", [machi_projection:make_summary(PQQ)]),
{ok, Clnt} = ?C:start_link(Ps), {ok, Clnt} = ?C:start_link(Ps),
try try