Merge branch 'slf/otp-refactoring-step2' (more commentary follows)

It's Friday, so this is an end-of-week merge.  This week has focused
on the chain manager.   I ended up doing more refactoring than I'd
expected in order to lift it out of it's "one node, talk to everything
by distributed Erlang, run inside a not-quite-PULSE-but-still-quite-
restricted simulator" and into some OTP sunlight + communicate by
generic point-to-point TCP connections (same ASCII protocol as
demo day, no change there) + capable of running without all of the
simulator control.

I'm happy to say that it appears to work as well as it does inside
of the simulator.  Having said that, the branch of experimental
work that I chose to integrate has some problems making transitions
when asymmetric network splits happen.  But those appear fixable.
Next week.  ^_^
This commit is contained in:
Scott Lystig Fritchie 2015-04-10 22:54:32 +09:00
commit 7eead876c8
28 changed files with 4079 additions and 223 deletions

5
.gitignore vendored
View file

@ -5,7 +5,4 @@ deps
erl_crash.dump
.concrete/DEV_MODE
.rebar
doc/edoc-info
doc/erlang.png
doc/*.html
doc/stylesheet.css
edoc

View file

@ -3,7 +3,7 @@ ifeq ($(REBAR_BIN),)
REBAR_BIN = ./rebar
endif
.PHONY: rel deps package pkgclean
.PHONY: rel deps package pkgclean edoc
all: deps compile
@ -21,6 +21,12 @@ test: deps compile eunit
eunit:
$(REBAR_BIN) -v skip_deps=true eunit
edoc: edoc-clean
$(REBAR_BIN) skip_deps=true doc
edoc-clean:
rm -f edoc/*.png edoc/*.html edoc/*.css edoc/edoc-info
pulse: compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
env USE_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit

12
doc/README.md Normal file
View file

@ -0,0 +1,12 @@
## Machi Documentation Overview
For a Web-browsable version of a snapshot of the source doc "EDoc"
Erlang documentation, please use this link:
[Machi EDoc snapshot](https://basho.github.io/machi/edoc/).
## Documents in this directory
* __chain-self-management-sketch.org__ is an introduction to the
self-management algorithm proposed for Machi. This algorithm is
(hoped to be) sufficient for managing the Chain Replication state of a
Machi cluster.

170
doc/overview.edoc Normal file
View file

@ -0,0 +1,170 @@
@title Machi: a small village of replicated files
@doc
== About This EDoc Documentation ==
This EDoc-style documentation will concern itself only with Erlang
function APIs and function & data types. Higher-level design and
commentary will remain outside of the Erlang EDoc system; please see
the "Pointers to Other Machi Documentation" section below for more
details.
Readers should beware that this documentation may be out-of-sync with
the source code. When in doubt, use the `make edoc' command to
regenerate all HTML pages.
It is the developer's responsibility to re-generate the documentation
periodically and commit it to the Git repo.
== Machi Code Overview ==
=== Chain Manager ===
The Chain Manager is responsible for managing the state of Machi's
"Chain Replication" state. This role is roughly analogous to the
"Riak Core" application inside of Riak, which takes care of
coordinating replica placement and replica repair.
For each primitive data server in the cluster, a Machi FLU, there is a
Chain Manager process that manages its FLU's role within the Machi
cluster's Chain Replication scheme. Each Chain Manager process
executes locally and independently to manage the distributed state of
a single Machi Chain Replication chain.
<ul>
<li> To contrast with Riak Core ... Riak Core's claimant process is
solely responsible for managing certain critical aspects of
Riak Core distributed state. Machi's Chain Manager process
performs similar tasks as Riak Core's claimant. However, Machi
has several active Chain Manager processes, one per FLU server,
instead of a single active process like Core's claimant. Each
Chain Manager process acts independently; each is constrained
so that it will reach consensus via independent computation
&amp; action.
Full discussion of this distributed consensus is outside the
scope of this document; see the "Pointers to Other Machi
Documentation" section below for more information.
</li>
<li> Machi differs from a Riak Core application because Machi's
replica placement policy is simply, "All Machi servers store
replicas of all Machi files".
Machi is intended to be a primitive building block for creating larger
cluster-of-clusters where files are
distributed/fragmented/sharded across a large pool of
independent Machi clusters.
</li>
<li> See
[https://www.usenix.org/legacy/events/osdi04/tech/renesse.html]
for a copy of the paper, "Chain Replication for Supporting High
Throughput and Availability" by Robbert van Renesse and Fred
B. Schneider.
</li>
</ul>
=== FLU ===
The FLU is the basic storage server for Machi.
<ul>
<li> The name FLU is taken from "flash storage unit" from the paper
"CORFU: A Shared Log Design for Flash Clusters" by
Balakrishnan, Malkhi, Prabhakaran, and Wobber. See
[https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/balakrishnan]
</li>
<li> In CORFU, the sequencer step is a prerequisite step that is
performed by a separate component, the Sequencer.
In Machi, the `append_chunk()' protocol message has
an implicit "sequencer" operation applied by the "head" of the
Machi Chain Replication chain. If a client wishes to write
data that has already been assigned a sequencer position, then
the `write_chunk()' API function is used.
</li>
</ul>
For each FLU, there are three independent tasks that are implemented
using three different Erlang processes:
<ul>
<li> A FLU server, implemented primarily by `machi_flu.erl'.
</li>
<li> A projection store server, implemented primarily by
`machi_projection_store.erl'.
</li>
<li> A chain state manager server, implemented primarily by
`machi_chain_manager1.erl'.
</li>
</ul>
From the perspective of failure detection, it is very convenient that
all three FLU-related services (file server, sequencer server, and
projection server) are accessed using the same single TCP port.
=== Projection (data structure) ===
The projection is a data structure that specifies the current state
of the Machi cluster: all FLUs, which FLUS are considered
up/running or down/crashed/stopped, which FLUs are actively
participants in the Chain Replication protocol, and which FLUs are
under "repair" (i.e., having their data resyncronized when
newly-added to a cluster or when restarting after a crash).
=== Projection Store (server) ===
The projection store is a storage service that is implemented by an
Erlang/OTP `gen_server' process that is associated with each
FLU. Conceptually, the projection store is an array of
write-once registers. For each projection store register, the
key is a 2-tuple of an epoch number (`non_neg_integer()' type)
and a projection type (`public' or `private' type); the value is
a projection data structure (`projection_v1()' type).
=== Client and Proxy Client ===
Machi is intentionally avoiding using distributed Erlang for Machi's
communication. This design decision makes Erlang-side code more
difficult &amp; complex but allows us the freedom of implementing
parts of Machi in other languages without major
protocol&amp;API&amp;glue code changes later in the product's
lifetime.
There are two layers of interface for Machi clients.
<ul>
<li> The `machi_flu1_client' module implements an API that uses a
TCP socket directly.
</li>
<li> The `machi_proxy_flu1_client' module implements an API that
uses a local, long-lived `gen_server' process as a proxy for
the remote, perhaps disconnected-or-crashed Machi FLU server.
</li>
</ul>
The types for both modules ought to be the same. However, due to
rapid code churn, some differences might exist. Any major difference
is (almost by definition) a bug: please open a GitHub issue to request
a correction.
== TODO notes ==
Any use of the string "TODO" in upper/lower/mixed case, anywhere in
the code, is a reminder signal of unfinished work.
== Pointers to Other Machi Documentation ==
<ul>
<li> If you are viewing this document locally, please look in the
`../doc/' directory,
</li>
<li> If you are viewing this document via the Web, please find the
documentation via this link:
[http://github.com/basho/machi/tree/master/doc/]
Please be aware that this link points to the `master' branch
of the Machi source repository and therefore may be
out-of-sync with non-`master' branch code.
</li>
</ul>

View file

@ -0,0 +1,41 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-include("machi_projection.hrl").
-define(NOT_FLAPPING, {0,0,0}).
-type projection() :: #projection_v1{}.
-record(ch_mgr, {
name :: pv1_server(),
flap_limit :: non_neg_integer(),
proj :: projection(),
%%
timer :: 'undefined' | timer:tref(),
proj_history :: queue:queue(),
flaps=0 :: integer(),
flap_start = ?NOT_FLAPPING
:: erlang:timestamp(),
runenv :: list(), %proplist()
opts :: list(), %proplist()
members_dict :: p_srvr_dict(),
proxies_dict :: orddict:orddict()
}).

View file

@ -18,30 +18,15 @@
%%
%% -------------------------------------------------------------------
-ifndef(MACHI_PROJECTION_HRL).
-define(MACHI_PROJECTION_HRL, true).
-type pv1_csum() :: binary().
-type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}.
-type pv1_epoch_n() :: non_neg_integer().
-type pv1_server() :: atom() | binary().
-type pv1_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
-define(DUMMY_PV1_EPOCH, {0,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}).
-record(projection_v1, {
epoch_number :: pv1_epoch_n(),
epoch_csum :: pv1_csum(),
all_members :: [pv1_server()],
member_dict :: orddict:orddict(),
down :: [pv1_server()],
creation_time :: pv1_timestamp(),
author_server :: pv1_server(),
upi :: [pv1_server()],
repairing :: [pv1_server()],
dbg :: list(), %proplist(), is checksummed
dbg2 :: list() %proplist(), is not checksummed
}).
-define(MACHI_DEFAULT_TCP_PORT, 50000).
-record(p_srvr, {
name :: pv1_server(),
proto = 'ipv4' :: 'ipv4' | 'disterl', % disterl? Hrm.
@ -50,4 +35,27 @@
props = [] :: list() % proplist for other related info
}).
-type p_srvr() :: #p_srvr{}.
-type p_srvr_dict() :: orddict:orddict().
-define(DUMMY_PV1_EPOCH, {0,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}).
-record(projection_v1, {
epoch_number :: pv1_epoch_n(),
epoch_csum :: pv1_csum(),
author_server :: pv1_server(),
creation_time :: pv1_timestamp(),
all_members :: [pv1_server()],
down :: [pv1_server()],
upi :: [pv1_server()],
repairing :: [pv1_server()],
dbg :: list(), %proplist(), is checksummed
dbg2 :: list(), %proplist(), is not checksummed
members_dict :: p_srvr_dict()
}).
-define(MACHI_DEFAULT_TCP_PORT, 50000).
-define(SHA_MAX, (1 bsl (20*8))).
-endif. % !MACHI_PROJECTION_HRL

BIN
rebar vendored

Binary file not shown.

View file

@ -1,5 +1,8 @@
{require_otp_vsn, "17"}.
%%% {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}, debug_info]}.
{erl_opts, [{parse_transform, lager_transform}, debug_info]}.
{edoc_opts, [{dir, "./edoc"}]}.
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.0.1"}}}

View file

@ -18,6 +18,8 @@
%%
%% -------------------------------------------------------------------
%% @doc Machi chain replication administration utilities.
-module(machi_admin_util).
%% TODO Move these types to a common header file? (also machi_flu1_client.erl?)
@ -114,7 +116,7 @@ verify_chunk_checksum(File, ReadChunk) ->
fun({Offset, Size, CSum}, Acc) ->
case ReadChunk(File, Offset, Size) of
{ok, Chunk} ->
CSum2 = machi_util:checksum(Chunk),
CSum2 = machi_util:checksum_chunk(Chunk),
if CSum == CSum2 ->
Acc;
true ->

View file

@ -18,6 +18,8 @@
%%
%% -------------------------------------------------------------------
%% @doc Top-level supervisor for the Machi application.
-module(machi_app).
-behaviour(application).

1609
src/machi_chain_manager1.erl Normal file

File diff suppressed because it is too large Load diff

View file

@ -16,9 +16,13 @@
%%%
%%%-------------------------------------------------------------------
%% Consistent hashing library. Also known as "random slicing".
%% Originally from the Hibari DB source code at https://github.com/hibari
%% @doc Consistent hashing library. Also known as "random slicing".
%%
%% This code was originally from the Hibari DB source code at
%% [https://github.com/hibari]
-module(machi_chash).
%% TODO items:
%%
%% 1. Refactor to use bigints instead of floating point numbers. The
@ -26,8 +30,6 @@
%% much wiggle-room for making really small hashing range
%% definitions.
-module(machi_chash).
-define(SMALLEST_SIGNIFICANT_FLOAT_SIZE, 0.1e-12).
-define(SHA_MAX, (1 bsl (20*8))).

View file

@ -18,6 +18,33 @@
%%
%% -------------------------------------------------------------------
%% @doc The Machi FLU file server + file location sequencer.
%%
%% This module implements only the Machi FLU file server and its
%% implicit sequencer.
%% Please see the EDoc "Overview" for details about the FLU as a
%% primitive file server process vs. the larger Machi design of a FLU
%% as a sequencer + file server + chain manager group of processes.
%%
%% For the moment, this module also implements a rudimentary TCP-based
%% protocol as the sole supported access method to the server,
%% sequencer, and projection store. Conceptually, those three
%% services are independent and ought to have their own protocols. As
%% a practical matter, there is no need for wire protocol
%% compatibility. Furthermore, from the perspective of failure
%% detection, it is very convenient that all three FLU-related
%% services are accessed using the same single TCP port.
%%
%% The FLU is named after the CORFU server "FLU" or "FLash Unit" server.
%%
%% TODO There is one major missing feature in this FLU implementation:
%% there is no "write-once" enforcement for any position in a Machi
%% file. At the moment, we rely on correct behavior of the client
%% &amp; the sequencer to avoid overwriting data. In the Real World,
%% however, all Machi file data is supposed to be exactly write-once
%% to avoid problems with bugs, wire protocol corruption, malicious
%% clients, etc.
-module(machi_flu1).
-include_lib("kernel/include/file.hrl").
@ -46,7 +73,7 @@ start_link([{FluName, TcpPort, DataDir}|Rest])
stop(Pid) ->
case erlang:is_process_alive(Pid) of
true ->
Pid ! forever,
Pid ! killme,
ok;
false ->
error
@ -86,7 +113,11 @@ main2(RegName, TcpPort, DataDir, Rest) ->
put(flu_append_pid, AppendPid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenPid),
receive forever -> ok end.
receive killme -> ok end,
(catch exit(AppendPid, kill)),
(catch exit(ProjectionPid, kill)),
(catch exit(ListenPid, kill)),
ok.
start_listen_server(S) ->
spawn_link(fun() -> run_listen_server(S) end).
@ -214,7 +245,7 @@ do_net_server_append2(RegName, Sock, LenHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum(Chunk),
CSum = machi_util:checksum_chunk(Chunk),
try
RegName ! {seq_append, self(), Prefix, Chunk, CSum}
catch error:badarg ->
@ -296,7 +327,7 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
CSum = machi_util:checksum(Chunk),
CSum = machi_util:checksum_chunk(Chunk),
case file:pwrite(FHd, Offset, Chunk) of
ok ->
CSumHex = machi_util:bin_to_hexstr(CSum),
@ -494,11 +525,14 @@ run_seq_append_server2(Prefix, DataDir) ->
end.
seq_append_server_loop(DataDir, Prefix, FileNum) ->
SequencerNameHack = lists:flatten(io_lib:format(
"~.36B~.36B",
-spec seq_name_hack() -> string().
seq_name_hack() ->
lists:flatten(io_lib:format("~.36B~.36B",
[element(3,now()),
list_to_integer(os:getpid())])),
list_to_integer(os:getpid())])).
seq_append_server_loop(DataDir, Prefix, FileNum) ->
SequencerNameHack = seq_name_hack(),
{File, FullPath} = machi_util:make_data_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHd} = file:open(FullPath,
@ -576,12 +610,12 @@ handle_projection_command({read_projection, ProjType, Epoch},
handle_projection_command({write_projection, ProjType, Proj},
#state{proj_store=ProjStore}) ->
machi_projection_store:write(ProjStore, ProjType, Proj);
handle_projection_command({get_all, ProjType},
handle_projection_command({get_all_projections, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_all(ProjStore, ProjType);
handle_projection_command({list_all, ProjType},
machi_projection_store:get_all_projections(ProjStore, ProjType);
handle_projection_command({list_all_projections, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:list_all(ProjStore, ProjType);
machi_projection_store:list_all_projections(ProjStore, ProjType);
handle_projection_command(Else, _S) ->
{error, unknown_cmd, Else}.

View file

@ -18,6 +18,8 @@
%%
%% -------------------------------------------------------------------
%% @doc Erlang API for the Machi FLU TCP protocol version 1.
-module(machi_flu1_client).
-include("machi.hrl").
@ -35,8 +37,8 @@
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
get_all/2, get_all/3,
list_all/2, list_all/3,
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1
@ -54,7 +56,7 @@
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type epoch_csum() :: binary().
-type epoch_num() :: non_neg_integer().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
-type file_info() :: {file_size(), file_name_s()}.
-type file_name() :: binary() | list().
@ -151,19 +153,19 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
catch gen_tcp:close(Sock)
end.
%% @doc Get the latest epoch number from the FLU's projection store.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port(), projection_type()) ->
{ok, -1|non_neg_integer()} | {error, term()}.
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
get_latest_epoch2(Sock, ProjType).
%% @doc Get the latest epoch number from the FLU's projection store.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(inet_host(), inet_port(),
projection_type()) ->
{ok, -1|non_neg_integer()} | {error, term()}.
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
@ -173,7 +175,7 @@ get_latest_epoch(Host, TcpPort, ProjType)
catch gen_tcp:close(Sock)
end.
%% @doc Get the latest epoch number from the FLU's projection store.
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(port(), projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
@ -181,7 +183,7 @@ read_latest_projection(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
read_latest_projection2(Sock, ProjType).
%% @doc Get the latest epoch number from the FLU's projection store.
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(inet_host(), inet_port(),
projection_type()) ->
@ -243,44 +245,44 @@ write_projection(Host, TcpPort, ProjType, Proj)
%% @doc Get all projections from the FLU's projection store.
-spec get_all(port(), projection_type()) ->
-spec get_all_projections(port(), projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all(Sock, ProjType)
get_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
get_all2(Sock, ProjType).
get_all_projections2(Sock, ProjType).
%% @doc Get all projections from the FLU's projection store.
-spec get_all(inet_host(), inet_port(),
-spec get_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all(Host, TcpPort, ProjType)
get_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
get_all2(Sock, ProjType)
get_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all(port(), projection_type()) ->
-spec list_all_projections(port(), projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all(Sock, ProjType)
list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
list_all2(Sock, ProjType).
list_all_projections2(Sock, ProjType).
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all(inet_host(), inet_port(),
-spec list_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all(Host, TcpPort, ProjType)
list_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
list_all2(Sock, ProjType)
list_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
@ -365,9 +367,10 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%
append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
erase(bad_sock),
try
%% TODO: add client-side checksum to the server's protocol
%% _ = crypto:hash(md5, Chunk),
%% _ = machi_util:checksum_chunk(Chunk),
Prefix = machi_util:make_binary(Prefix0),
Chunk = machi_util:make_binary(Chunk0),
Len = iolist_size(Chunk0),
@ -391,12 +394,16 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
read_chunk2(Sock, EpochID, File0, Offset, Size) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
File = machi_util:make_binary(File0),
@ -430,8 +437,16 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
{error, Else2}
end;
_ ->
{error, {whaaa, <<Else/binary, Else2/binary>>}}
{error, {whaaa_todo, <<Else/binary, Else2/binary>>}}
end
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
list2(Sock, EpochID) ->
@ -462,6 +477,7 @@ list3(Else, _Sock) ->
throw({server_protocol_error, Else}).
checksum_list2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
@ -484,8 +500,10 @@ checksum_list2(Sock, EpochID, File) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -515,11 +533,12 @@ checksum_list_finish(Chunks) ->
Line /= <<>>].
write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
%% TODO: add client-side checksum to the server's protocol
%% _ = crypto:hash(md5, Chunk),
%% _ = machi_util:checksum_chunk(Chunk),
File = machi_util:make_binary(File0),
true = (Offset >= ?MINIMUM_OFFSET),
OffsetHex = machi_util:int_to_hexbin(Offset, 64),
@ -542,12 +561,15 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
delete_migration2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
@ -566,12 +588,15 @@ delete_migration2(Sock, EpochID, File) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
trunc_hack2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
@ -590,8 +615,10 @@ trunc_hack2(Sock, EpochID, File) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -611,15 +638,16 @@ write_projection2(Sock, ProjType, Proj) ->
ProjCmd = {write_projection, ProjType, Proj},
do_projection_common(Sock, ProjCmd).
get_all2(Sock, ProjType) ->
ProjCmd = {get_all, ProjType},
get_all_projections2(Sock, ProjType) ->
ProjCmd = {get_all_projections, ProjType},
do_projection_common(Sock, ProjCmd).
list_all2(Sock, ProjType) ->
ProjCmd = {list_all, ProjType},
list_all_projections2(Sock, ProjType) ->
ProjCmd = {list_all_projections, ProjType},
do_projection_common(Sock, ProjCmd).
do_projection_common(Sock, ProjCmd) ->
erase(bad_sock),
try
ProjCmdBin = term_to_binary(ProjCmd),
Len = iolist_size(ProjCmdBin),
@ -641,7 +669,9 @@ do_projection_common(Sock, ProjCmd) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.

View file

@ -18,6 +18,9 @@
%%
%% -------------------------------------------------------------------
%% @doc Supervisor for Machi FLU servers and their related support
%% servers.
-module(machi_flu_sup).
-behaviour(supervisor).

View file

@ -18,47 +18,47 @@
%%
%% -------------------------------------------------------------------
%% @doc API for manipulating Machi projection data structures (i.e., records).
-module(machi_projection).
-include("machi_projection.hrl").
-export([
new/6, new/7, new/8,
update_projection_checksum/1,
update_projection_dbg2/2,
update_checksum/1,
update_dbg2/2,
compare/2,
make_projection_summary/1
make_summary/1,
make_members_dict/1
]).
new(MyName, All_list, UPI_list, Down_list, Repairing_list, Ps) ->
new(0, MyName, All_list, Down_list, UPI_list, Repairing_list, Ps).
%% @doc Create a new projection record.
new(EpochNum, MyName, All_list, Down_list, UPI_list, Repairing_list, Dbg) ->
new(EpochNum, MyName, All_list, Down_list, UPI_list, Repairing_list,
new(MyName, MemberDict, UPI_list, Down_list, Repairing_list, Ps) ->
new(0, MyName, MemberDict, Down_list, UPI_list, Repairing_list, Ps).
%% @doc Create a new projection record.
new(EpochNum, MyName, MemberDict, Down_list, UPI_list, Repairing_list, Dbg) ->
new(EpochNum, MyName, MemberDict, Down_list, UPI_list, Repairing_list,
Dbg, []).
new(EpochNum, MyName, All_list0, Down_list, UPI_list, Repairing_list,
%% @doc Create a new projection record.
%%
%% The `MemberDict0' argument may be a true `p_srvr_dict()' (i.e, it
%% is a well-formed `orddict' with the correct 2-tuple key-value form)
%% or it may be simply `list(p_srvr())', in which case we'll convert it
%% to a `p_srvr_dict()'.
new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list,
Dbg, Dbg2)
when is_integer(EpochNum), EpochNum >= 0,
is_atom(MyName) orelse is_binary(MyName),
is_list(All_list0), is_list(Down_list), is_list(UPI_list),
is_list(MembersDict0), is_list(Down_list), is_list(UPI_list),
is_list(Repairing_list), is_list(Dbg), is_list(Dbg2) ->
{All_list, MemberDict} =
case lists:all(fun(P) when is_record(P, p_srvr) -> true;
(_) -> false
end, All_list0) of
true ->
All = [S#p_srvr.name || S <- All_list0],
TmpL = [{S#p_srvr.name, S} || S <- All_list0],
{All, orddict:from_list(TmpL)};
false ->
All_list1 = lists:zip(All_list0,lists:seq(0,length(All_list0)-1)),
All_list2 = [#p_srvr{name=S, address="localhost",
port=?MACHI_DEFAULT_TCP_PORT+I} ||
{S, I} <- All_list1],
TmpL = [{S#p_srvr.name, S} || S <- All_list2],
{All_list0, orddict:from_list(TmpL)}
end,
MembersDict = make_members_dict(MembersDict0),
All_list = [Name || {Name, _P} <- MembersDict],
true = lists:all(fun(X) when is_atom(X) orelse is_binary(X) -> true;
(_) -> false
end, All_list),
@ -79,23 +79,34 @@ new(EpochNum, MyName, All_list0, Down_list, UPI_list, Repairing_list,
creation_time=now(),
author_server=MyName,
all_members=All_list,
member_dict=MemberDict,
members_dict=MembersDict,
down=Down_list,
upi=UPI_list,
repairing=Repairing_list,
dbg=Dbg
},
update_projection_dbg2(update_projection_checksum(P), Dbg2).
update_dbg2(update_checksum(P), Dbg2).
update_projection_checksum(P) ->
%% @doc Update the checksum element of a projection record.
update_checksum(P) ->
CSum = crypto:hash(sha,
term_to_binary(P#projection_v1{epoch_csum= <<>>,
dbg2=[]})),
P#projection_v1{epoch_csum=CSum}.
update_projection_dbg2(P, Dbg2) when is_list(Dbg2) ->
%% @doc Update the `dbg2' element of a projection record.
update_dbg2(P, Dbg2) when is_list(Dbg2) ->
P#projection_v1{dbg2=Dbg2}.
%% @doc Compare two projection records for equality (assuming that the
%% checksum element has been correctly calculated).
%%
%% The name "compare" is probably too close to "rank"? This
%% comparison has nothing to do with projection ranking.
%% TODO: change the name of this function?
-spec compare(#projection_v1{}, #projection_v1{}) ->
integer().
compare(#projection_v1{epoch_number=E1, epoch_csum=C1},
@ -107,7 +118,9 @@ compare(#projection_v1{epoch_number=E1},
E1 > E2 -> 1
end.
make_projection_summary(#projection_v1{epoch_number=EpochNum,
%% @doc Create a proplist-style summary of a projection record.
make_summary(#projection_v1{epoch_number=EpochNum,
all_members=_All_list,
down=Down_list,
author_server=Author,
@ -117,3 +130,36 @@ make_projection_summary(#projection_v1{epoch_number=EpochNum,
[{epoch,EpochNum},{author,Author},
{upi,UPI_list},{repair,Repairing_list},{down,Down_list},
{d,Dbg}, {d2,Dbg2}].
%% @doc Make a `p_srvr_dict()' out of a list of `p_srvr()' or out of a
%% `p_srvr_dict()'.
%%
%% If `Ps' is a `p_srvr_dict()', then this function is usually a
%% no-op. However, if someone has tampered with the list and screwed
%% up its order, then we should fix it so `orddict' can work
%% correctly.
%%
%% If `Ps' is simply `list(p_srvr())', in which case we'll convert it
%% to a `p_srvr_dict()'.
-spec make_members_dict(list(p_srvr()) | p_srvr_dict()) ->
p_srvr_dict().
make_members_dict(Ps) ->
F_rec = fun(P) when is_record(P, p_srvr) -> true;
(_) -> false
end,
F_tup = fun({_K, P}) when is_record(P, p_srvr) -> true;
(_) -> false
end,
case lists:all(F_rec, Ps) of
true ->
orddict:from_list([{P#p_srvr.name, P} || P <- Ps]);
false ->
case lists:all(F_tup, Ps) of
true ->
orddict:from_list(Ps);
false ->
F_neither = fun(X) -> not (F_rec(X) or F_tup(X)) end,
exit({badarg, {make_members_dict, lists:filter(F_neither, Ps)}})
end
end.

View file

@ -18,6 +18,25 @@
%%
%% -------------------------------------------------------------------
%% @doc The Machi write-once projection store service.
%%
%% This API is gen_server-style message passing, intended for use
%% within a single Erlang node to glue together the projection store
%% server with the node-local process that implements Machi's TCP
%% client access protocol (on the "server side" of the TCP connection).
%%
%% All Machi client access to the projection store SHOULD NOT use this
%% module's API.
%%
%% The projection store is implemented by an Erlang/OTP `gen_server'
%% process that is associated with each FLU. Conceptually, the
%% projection store is an array of write-once registers. For each
%% projection store register, the key is a 2-tuple of an epoch number
%% (`non_neg_integer()' type) and a projection type (`public' or
%% `private' type); the value is a projection data structure
%% (`projection_v1()' type).
-module(machi_projection_store).
-include("machi_projection.hrl").
@ -29,52 +48,79 @@
read_latest_projection/2, read_latest_projection/3,
read/3, read/4,
write/3, write/4,
get_all/2, get_all/3,
list_all/2, list_all/3
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(NO_EPOCH, {-1,<<0:(20*8)/big>>}).
-record(state, {
public_dir = "" :: string(),
private_dir = "" :: string(),
wedged = true :: boolean(),
wedge_notify_pid :: pid() | atom(),
max_public_epoch = -1 :: -1 | non_neg_integer(),
max_private_epoch = -1 :: -1 | non_neg_integer()
max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()},
max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}
}).
%% @doc Start a new projection store server.
%%
%% The `DataDir' argument should be the same directory as specified
%% for use by our companion FLU data server -- all file system paths
%% used by this server are intended to be stored underneath a common
%% file system parent directory as the FLU data server &amp; sequencer
%% servers.
start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
gen_server:start_link({local, RegName},
?MODULE, [DataDir, NotifyWedgeStateChanges], []).
%% @doc Fetch the latest epoch number + checksum for type `ProjType'.
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
%% @doc Fetch the latest epoch number + checksum for type `ProjType'.
%% projection.
get_latest_epoch(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout).
%% @doc Fetch the latest projection record for type `ProjType'.
read_latest_projection(PidSpec, ProjType) ->
read_latest_projection(PidSpec, ProjType, infinity).
%% @doc Fetch the latest projection record for type `ProjType'.
read_latest_projection(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {read_latest_projection, ProjType}, Timeout).
%% @doc Fetch the projection record type `ProjType' for epoch number `Epoch' .
read(PidSpec, ProjType, Epoch) ->
read(PidSpec, ProjType, Epoch, infinity).
%% @doc Fetch the projection record type `ProjType' for epoch number `Epoch' .
read(PidSpec, ProjType, Epoch, Timeout)
when ProjType == 'public' orelse ProjType == 'private',
is_integer(Epoch), Epoch >= 0 ->
g_call(PidSpec, {read, ProjType, Epoch}, Timeout).
%% @doc Write the projection record type `ProjType' for epoch number `Epoch' .
write(PidSpec, ProjType, Proj) ->
write(PidSpec, ProjType, Proj, infinity).
%% @doc Write the projection record type `ProjType' for epoch number `Epoch' .
write(PidSpec, ProjType, Proj, Timeout)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1),
@ -82,19 +128,27 @@ write(PidSpec, ProjType, Proj, Timeout)
Proj#projection_v1.epoch_number >= 0 ->
g_call(PidSpec, {write, ProjType, Proj}, Timeout).
get_all(PidSpec, ProjType) ->
get_all(PidSpec, ProjType, infinity).
%% @doc Fetch all projection records of type `ProjType'.
get_all(PidSpec, ProjType, Timeout)
get_all_projections(PidSpec, ProjType) ->
get_all_projections(PidSpec, ProjType, infinity).
%% @doc Fetch all projection records of type `ProjType'.
get_all_projections(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {get_all, ProjType}, Timeout).
g_call(PidSpec, {get_all_projections, ProjType}, Timeout).
list_all(PidSpec, ProjType) ->
list_all(PidSpec, ProjType, infinity).
%% @doc Fetch all projection epoch numbers of type `ProjType'.
list_all(PidSpec, ProjType, Timeout)
list_all_projections(PidSpec, ProjType) ->
list_all_projections(PidSpec, ProjType, infinity).
%% @doc Fetch all projection epoch numbers of type `ProjType'.
list_all_projections(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {list_all, ProjType}, Timeout).
g_call(PidSpec, {list_all_projections, ProjType}, Timeout).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -124,16 +178,16 @@ init([DataDir, NotifyWedgeStateChanges]) ->
handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Epoch = if ProjType == public -> S#state.max_public_epoch;
EpochT = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
end,
{reply, {{ok, Epoch}, LC2}, S};
{reply, {{ok, EpochT}, LC2}, S};
handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Epoch = if ProjType == public -> S#state.max_public_epoch;
{EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
end,
{Reply, NewS} = do_proj_read(ProjType, Epoch, S),
{Reply, NewS} = do_proj_read(ProjType, EpochNum, S),
{reply, {Reply, LC2}, NewS};
handle_call({{read, ProjType, Epoch}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
@ -143,7 +197,7 @@ handle_call({{write, ProjType, Proj}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{Reply, NewS} = do_proj_write(ProjType, Proj, S),
{reply, {Reply, LC2}, NewS};
handle_call({{get_all, ProjType}, LC1}, _From, S) ->
handle_call({{get_all_projections, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Dir = pick_path(ProjType, S),
Epochs = find_all(Dir),
@ -152,7 +206,7 @@ handle_call({{get_all, ProjType}, LC1}, _From, S) ->
Proj
end || Epoch <- Epochs],
{reply, {{ok, All}, LC2}, S};
handle_call({{list_all, ProjType}, LC1}, _From, S) ->
handle_call({{list_all_projections, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Dir = pick_path(ProjType, S),
{reply, {{ok, find_all(Dir)}, LC2}, S};
@ -176,17 +230,21 @@ code_change(_OldVsn, S, _Extra) ->
do_proj_read(_ProjType, Epoch, S) when Epoch < 0 ->
{{error, not_written}, S};
do_proj_read(ProjType, Epoch, S) ->
Dir = pick_path(ProjType, S),
do_proj_read(ProjType, Epoch, S_or_Dir) ->
Dir = if is_record(S_or_Dir, state) ->
pick_path(ProjType, S_or_Dir);
is_list(S_or_Dir) ->
S_or_Dir
end,
Path = filename:join(Dir, epoch2name(Epoch)),
case file:read_file(Path) of
{ok, Bin} ->
%% TODO and if Bin is corrupt? (even if binary_to_term() succeeds)
{{ok, binary_to_term(Bin)}, S};
{{ok, binary_to_term(Bin)}, S_or_Dir};
{error, enoent} ->
{{error, not_written}, S};
{{error, not_written}, S_or_Dir};
{error, Else} ->
{{error, Else}, S}
{{error, Else}, S_or_Dir}
end.
do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
@ -201,12 +259,15 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
ok = file:write(FH, term_to_binary(Proj)),
ok = file:sync(FH),
ok = file:close(FH),
NewS = if ProjType == public, Epoch > S#state.max_public_epoch ->
io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_public_epoch=Epoch, wedged=true};
ProjType == private, Epoch > S#state.max_private_epoch ->
io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_private_epoch=Epoch, wedged=false};
EpochT = {Epoch, Proj#projection_v1.epoch_csum},
NewS = if ProjType == public,
Epoch > element(1, S#state.max_public_epoch) ->
%io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_public_epoch=EpochT, wedged=true};
ProjType == private,
Epoch > element(1, S#state.max_private_epoch) ->
%io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_private_epoch=EpochT, wedged=false};
true ->
S
end,
@ -233,9 +294,11 @@ find_all(Dir) ->
find_max_epoch(Dir) ->
Fs = lists:sort(filelib:wildcard("*", Dir)),
if Fs == [] ->
-1;
?NO_EPOCH;
true ->
name2epoch(lists:last(Fs))
EpochNum = name2epoch(lists:last(Fs)),
{{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir),
{EpochNum, Proj}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -0,0 +1,309 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Erlang API for the Machi FLU TCP protocol version 1, with a
%% proxy-process style API for hiding messy details such as TCP
%% connection/disconnection with the remote Machi server.
%%
%% Machi is intentionally avoiding using distributed Erlang for
%% Machi's communication. This design decision makes Erlang-side code
%% more difficult &amp; complex, but it's the price to pay for some
%% language independence. Later in Machi's life cycle, we need to
%% (re-)implement some components in a non-Erlang/BEAM-based language.
%%
%% This module implements a "man in the middle" proxy between the
%% Erlang client and Machi server (which is on the "far side" of a TCP
%% connection to somewhere). This proxy process will always execute
%% on the same Erlang node as the Erlang client that uses it. The
%% proxy is intended to be a stable, long-lived process that survives
%% TCP communication problems with the remote server.
-module(machi_proxy_flu1_client).
-behaviour(gen_server).
-include("machi.hrl").
-include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST.
-export([start_link/1]).
%% FLU1 API
-export([
%% File API
append_chunk/4, append_chunk/5,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
%% %% Projection API
get_latest_epoch/2, get_latest_epoch/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(FLU_C, machi_flu1_client).
-record(state, {
i :: #p_srvr{},
sock :: 'undefined' | port()
}).
%% @doc Start a local, long-lived process that will be our steady
%% &amp; reliable communication proxy with the fickle &amp; flaky
%% remote Machi server.
start_link(#p_srvr{}=I) ->
gen_server:start_link(?MODULE, [I], []).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk(PidSpec, EpochID, Prefix, Chunk) ->
append_chunk(PidSpec, EpochID, Prefix, Chunk, infinity).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk, EpochID, Prefix, Chunk}},
Timeout).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size) ->
read_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}},
Timeout).
%% @doc Fetch the list of chunk checksums for `File'.
checksum_list(PidSpec, EpochID, File) ->
checksum_list(PidSpec, EpochID, File, infinity).
%% @doc Fetch the list of chunk checksums for `File'.
checksum_list(PidSpec, EpochID, File, Timeout) ->
gen_server:call(PidSpec, {req, {checksum_list, EpochID, File}},
Timeout).
%% @doc Fetch the list of all files on the remote FLU.
list_files(PidSpec, EpochID) ->
list_files(PidSpec, EpochID, infinity).
%% @doc Fetch the list of all files on the remote FLU.
list_files(PidSpec, EpochID, Timeout) ->
gen_server:call(PidSpec, {req, {list_files, EpochID}},
Timeout).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}},
Timeout).
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
read_latest_projection(PidSpec, ProjType) ->
read_latest_projection(PidSpec, ProjType, infinity).
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
read_latest_projection(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {read_latest_projection, ProjType}},
Timeout).
%% @doc Read a projection `Proj' of type `ProjType'.
read_projection(PidSpec, ProjType, Epoch) ->
read_projection(PidSpec, ProjType, Epoch, infinity).
%% @doc Read a projection `Proj' of type `ProjType'.
read_projection(PidSpec, ProjType, Epoch, Timeout) ->
gen_server:call(PidSpec, {req, {read_projection, ProjType, Epoch}},
Timeout).
%% @doc Write a projection `Proj' of type `ProjType'.
write_projection(PidSpec, ProjType, Proj) ->
write_projection(PidSpec, ProjType, Proj, infinity).
%% @doc Write a projection `Proj' of type `ProjType'.
write_projection(PidSpec, ProjType, Proj, Timeout) ->
gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}},
Timeout).
%% @doc Get all projections from the FLU's projection store.
get_all_projections(PidSpec, ProjType) ->
get_all_projections(PidSpec, ProjType, infinity).
%% @doc Get all projections from the FLU's projection store.
get_all_projections(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_all_projections, ProjType}},
Timeout).
%% @doc Get all epoch numbers from the FLU's projection store.
list_all_projections(PidSpec, ProjType) ->
list_all_projections(PidSpec, ProjType, infinity).
%% @doc Get all epoch numbers from the FLU's projection store.
list_all_projections(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {list_all_projections, ProjType}},
Timeout).
%% @doc Quit &amp; close the connection to remote FLU and stop our
%% proxy process.
quit(PidSpec) ->
gen_server:call(PidSpec, quit, infinity).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([I]) ->
S0 = #state{i=I},
S1 = try_connect(S0),
{ok, S1}.
handle_call({req, Req}, _From, S) ->
{Reply, NewS} = do_req(Req, S),
{reply, Reply, NewS};
handle_call(quit, _From, S) ->
{stop, normal, ok, disconnect(S)};
handle_call(_Request, _From, S) ->
Reply = ok,
{reply, Reply, S}.
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_req(Req, S) ->
S2 = try_connect(S),
Fun = make_req_fun(Req, S2),
case connected_p(S2) of
true ->
case Fun() of
T when element(1, T) == ok ->
{T, S2};
Else ->
case get(bad_sock) of
Bad when Bad == S2#state.sock ->
{Else, disconnect(S2)};
_ ->
{Else, S2}
end
end;
false ->
{{error, not_connected}, S2}
end.
make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->
fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) ->
fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end;
make_req_fun({list_files, EpochID}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_files(Sock, EpochID) end;
make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_latest_projection(Sock, ProjType) end;
make_req_fun({read_projection, ProjType, Epoch}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_projection(Sock, ProjType, Epoch) end;
make_req_fun({write_projection, ProjType, Proj}, #state{sock=Sock}) ->
fun() -> ?FLU_C:write_projection(Sock, ProjType, Proj) end;
make_req_fun({get_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_all_projections(Sock, ProjType) end;
make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end.
connected_p(#state{sock=SockMaybe,
i=#p_srvr{proto=ipv4}=_I}=_S) ->
is_port(SockMaybe);
connected_p(#state{i=#p_srvr{proto=disterl,
name=_NodeName}=_I}=_S) ->
true.
%% case net_adm:ping(NodeName) of
%% ping ->
%% true;
%% _ ->
%% false
%% end.
try_connect(#state{sock=undefined,
i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) ->
try
Sock = machi_util:connect(Host, TcpPort),
S#state{sock=Sock}
catch
_:_ ->
S
end;
try_connect(S) ->
%% If we're connection-based, we're already connected.
%% If we're not connection-based, then there's nothing to do.
S.
disconnect(#state{sock=Sock,
i=#p_srvr{proto=ipv4}=_I}=S) ->
(catch gen_tcp:close(Sock)),
S#state{sock=undefined};
disconnect(S) ->
S.

View file

@ -18,6 +18,9 @@
%%
%% -------------------------------------------------------------------
%% @doc "Mothballed" sequencer code, perhaps to be reused sometime in
%% the future?
-module(machi_sequencer).
-compile(export_all).

View file

@ -18,6 +18,8 @@
%%
%% -------------------------------------------------------------------
%% @doc Top Machi application supervisor.
-module(machi_sup).
-behaviour(supervisor).

View file

@ -18,20 +18,24 @@
%%
%% -------------------------------------------------------------------
%% @doc Miscellaneous utility functions.
-module(machi_util).
-export([
checksum/1,
checksum_chunk/1,
hexstr_to_bin/1, bin_to_hexstr/1,
hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
make_binary/1, make_string/1,
make_regname/1,
make_checksum_filename/2, make_data_filename/2,
make_config_filename/2,
make_checksum_filename/4, make_checksum_filename/2,
make_data_filename/4, make_data_filename/2,
make_projection_filename/2,
read_max_filenum/2, increment_max_filenum/2,
info_msg/2, verb/1, verb/2,
%% TCP protocol helpers
connect/2
connect/2, connect/3
]).
-compile(export_all).
@ -39,33 +43,54 @@
-include("machi_projection.hrl").
-include_lib("kernel/include/file.hrl").
append(Server, Prefix, Chunk) when is_binary(Prefix), is_binary(Chunk) ->
CSum = checksum(Chunk),
Server ! {seq_append, self(), Prefix, Chunk, CSum},
receive
{assignment, Offset, File} ->
{Offset, File}
after 10*1000 ->
bummer
end.
%% @doc Create a registered name atom for FLU sequencer internal
%% rendezvous/message passing use.
-spec make_regname(binary()|list()) ->
atom().
make_regname(Prefix) when is_binary(Prefix) ->
erlang:binary_to_atom(Prefix, latin1);
make_regname(Prefix) when is_list(Prefix) ->
erlang:list_to_atom(Prefix).
%% @doc Calculate a config file path, by common convention.
-spec make_config_filename(string(), string()) ->
string().
make_config_filename(DataDir, Prefix) ->
lists:flatten(io_lib:format("~s/config/~s", [DataDir, Prefix])).
%% @doc Calculate a checksum file path, by common convention.
-spec make_checksum_filename(string(), string(), atom()|string()|binary(), integer()) ->
string().
make_checksum_filename(DataDir, Prefix, SequencerName, FileNum) ->
lists:flatten(io_lib:format("~s/config/~s.~s.~w.csum",
[DataDir, Prefix, SequencerName, FileNum])).
%% @doc Calculate a checksum file path, by common convention.
-spec make_checksum_filename(string(), [] | string() | binary()) ->
string().
make_checksum_filename(DataDir, "") ->
lists:flatten(io_lib:format("~s/config", [DataDir]));
make_checksum_filename(DataDir, FileName) ->
lists:flatten(io_lib:format("~s/config/~s.csum", [DataDir, FileName])).
%% @doc Calculate a file data file path, by common convention.
-spec make_data_filename(string(), string(), atom()|string()|binary(), integer()) ->
{binary(), string()}.
make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w",
[Prefix, SequencerName, FileNum])),
FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
{File, FullPath}.
%% @doc Calculate a file data file path, by common convention.
-spec make_data_filename(string(), [] | string() | binary()) ->
{binary(), string()}.
make_data_filename(DataDir, "") ->
FullPath = lists:flatten(io_lib:format("~s/data", [DataDir])),
{"", FullPath};
@ -73,17 +98,20 @@ make_data_filename(DataDir, File) ->
FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
{File, FullPath}.
make_data_filename(DataDir, Prefix, SequencerName, FileNum) ->
File = erlang:iolist_to_binary(io_lib:format("~s.~s.~w",
[Prefix, SequencerName, FileNum])),
FullPath = lists:flatten(io_lib:format("~s/data/~s", [DataDir, File])),
{File, FullPath}.
%% @doc Calculate a projection store file path, by common convention.
-spec make_projection_filename(string(), [] | string()) ->
string().
make_projection_filename(DataDir, "") ->
lists:flatten(io_lib:format("~s/projection", [DataDir]));
make_projection_filename(DataDir, File) ->
lists:flatten(io_lib:format("~s/projection/~s", [DataDir, File])).
%% @doc Read the file size of a config file, which is used as the
%% basis for a minimum sequence number.
-spec read_max_filenum(string(), string()) ->
non_neg_integer().
read_max_filenum(DataDir, Prefix) ->
case file:read_file_info(make_config_filename(DataDir, Prefix)) of
{error, enoent} ->
@ -92,6 +120,11 @@ read_max_filenum(DataDir, Prefix) ->
FI#file_info.size
end.
%% @doc Increase the file size of a config file, which is used as the
%% basis for a minimum sequence number.
-spec increment_max_filenum(string(), string()) ->
ok | {error, term()}.
increment_max_filenum(DataDir, Prefix) ->
try
{ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]),
@ -100,9 +133,13 @@ increment_max_filenum(DataDir, Prefix) ->
ok = file:close(FH)
catch
error:{badmatch,_}=Error ->
{error, Error, erlang:get_stacktrace()}
{error, {Error, erlang:get_stacktrace()}}
end.
%% @doc Convert a hexadecimal string to a `binary()'.
-spec hexstr_to_bin(string() | binary()) ->
binary().
hexstr_to_bin(S) when is_list(S) ->
hexstr_to_bin(S, []);
hexstr_to_bin(B) when is_binary(B) ->
@ -114,6 +151,10 @@ hexstr_to_bin([X,Y|T], Acc) ->
{ok, [V], []} = io_lib:fread("~16u", [X,Y]),
hexstr_to_bin(T, [V | Acc]).
%% @doc Convert a `binary()' to a hexadecimal string.
-spec bin_to_hexstr(binary()) ->
string().
bin_to_hexstr(<<>>) ->
[];
bin_to_hexstr(<<X:4, Y:4, Rest/binary>>) ->
@ -124,40 +165,75 @@ hex_digit(X) when X < 10 ->
hex_digit(X) ->
X - 10 + $a.
%% @doc Convert a compatible Erlang data type into a `binary()' equivalent.
-spec make_binary(binary() | iolist()) ->
binary().
make_binary(X) when is_binary(X) ->
X;
make_binary(X) when is_list(X) ->
iolist_to_binary(X).
%% @doc Convert a compatible Erlang data type into a `string()' equivalent.
-spec make_string(binary() | iolist()) ->
string().
make_string(X) when is_list(X) ->
lists:flatten(X);
make_string(X) when is_binary(X) ->
binary_to_list(X).
%% @doc Convert a hexadecimal string to an integer.
-spec hexstr_to_int(string() | binary()) ->
non_neg_integer().
hexstr_to_int(X) ->
B = hexstr_to_bin(X),
B_size = byte_size(B) * 8,
<<I:B_size/big>> = B,
I.
%% @doc Convert an integer into a hexadecimal string whose length is
%% based on `I_size'.
-spec int_to_hexstr(non_neg_integer(), non_neg_integer()) ->
string().
int_to_hexstr(I, I_size) ->
bin_to_hexstr(<<I:I_size/big>>).
%% @doc Convert an integer into a hexadecimal string (in `binary()'
%% form) whose length is based on `I_size'.
-spec int_to_hexbin(non_neg_integer(), non_neg_integer()) ->
binary().
int_to_hexbin(I, I_size) ->
list_to_binary(int_to_hexstr(I, I_size)).
checksum(Bin) when is_binary(Bin) ->
crypto:hash(md5, Bin).
%% @doc Calculate a checksum for a chunk of file data.
-spec checksum_chunk(binary() | iolist()) ->
binary().
checksum_chunk(Chunk) when is_binary(Chunk); is_list(Chunk) ->
crypto:hash(sha, Chunk).
%% @doc Log a verbose message.
-spec verb(string()) -> term().
verb(Fmt) ->
verb(Fmt, []).
%% @doc Log a verbose message.
-spec verb(string(), list()) -> term().
verb(Fmt, Args) ->
case application:get_env(kernel, verbose) of
{ok, true} -> io:format(Fmt, Args);
_ -> ok
end.
%% @doc Log an 'info' level message.
-spec info_msg(string(), list()) -> term().
info_msg(Fmt, Args) ->
case application:get_env(kernel, verbose) of {ok, false} -> ok;
_ -> error_logger:info_msg(Fmt, Args)
@ -165,16 +241,26 @@ info_msg(Fmt, Args) ->
%%%%%%%%%%%%%%%%%
%% @doc Create a TCP connection to a remote Machi server.
-spec connect(inet:ip_address() | inet:hostname(), inet:port_number()) ->
port().
connect(Host, Port) ->
escript_connect(Host, Port).
escript_connect(Host, Port, 4500).
escript_connect(Host, PortStr) when is_list(PortStr) ->
%% @doc Create a TCP connection to a remote Machi server.
-spec connect(inet:ip_address() | inet:hostname(), inet:port_number(),
timeout()) ->
port().
connect(Host, Port, Timeout) ->
escript_connect(Host, Port, Timeout).
escript_connect(Host, PortStr, Timeout) when is_list(PortStr) ->
Port = list_to_integer(PortStr),
escript_connect(Host, Port);
escript_connect(Host, Port) when is_integer(Port) ->
escript_connect(Host, Port, Timeout);
escript_connect(Host, Port, Timeout) when is_integer(Port) ->
{ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary},
{packet, raw}]),
{packet, raw}], Timeout),
Sock.

View file

@ -0,0 +1,447 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_chain_manager1_converge_demo).
-include("machi.hrl").
-include("machi_projection.hrl").
-define(MGR, machi_chain_manager1).
-define(D(X), io:format(user, "~s ~p\n", [??X, X])).
-define(Dw(X), io:format(user, "~s ~w\n", [??X, X])).
-define(FLU_C, machi_flu1_client).
-define(FLU_PC, machi_proxy_flu1_client).
-compile(export_all).
-ifdef(TEST).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
%% -include_lib("eqc/include/eqc_statem.hrl").
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
-endif.
-include_lib("eunit/include/eunit.hrl").
short_doc() ->
"
A visualization of the convergence behavior of the chain self-management
algorithm for Machi.
1. Set up 4 FLUs and chain manager pairs.
2. Create a number of different network partition scenarios, where
(simulated) partitions may be symmetric or asymmetric. Then halt changing
the partitions and keep the simulated network stable and broken.
3. Run a number of iterations of the algorithm in parallel by poking each
of the manager processes on a random'ish basis.
4. Afterward, fetch the chain transition changes made by each FLU and
verify that no transition was unsafe.
During the iteration periods, the following is a cheatsheet for the output.
See the internal source for interpreting the rest of the output.
'Let loose the dogs of war!' Network instability
'SET partitions = ' Network stability (but broken)
'x uses:' The FLU x has made an internal state transition. The rest of
the line is a dump of internal state.
'{t}' This is a tick event which triggers one of the manager processes
to evaluate its environment and perhaps make a state transition.
A long chain of '{t}{t}{t}{t}' means that the chain state has settled
to a stable configuration, which is the goal of the algorithm.
Press control-c to interrupt....".
long_doc() ->
"
'Let loose the dogs of war!'
The simulated network is very unstable for a few seconds.
'x uses'
After a single iteration, server x has determined that the chain
should be defined by the upi, repair, and down list in this record.
If all participants reach the same conclusion at the same epoch
number (and checksum, see next item below), then the chain is
stable, fully configured, and can provide full service.
'epoch,E'
The epoch number for this decision is E. The checksum of the full
record is not shown. For purposes of the protocol, a server will
'wedge' itself and refuse service (until a new config is chosen)
whenever: a). it sees a bigger epoch number mentioned somewhere, or
b). it sees the same epoch number but a different checksum. In case
of b), there was a network partition that has healed, and both sides
had chosen to operate with an identical epoch number but different
chain configs.
'upi', 'repair', and 'down'
Members in the chain that are fully in sync and thus preserving the
Update Propagation Invariant, up but under repair (simulated), and
down, respectively.
'ps,[some list]'
The list of asymmetric network partitions. {a,b} means that a
cannot send to b, but b can send to a.
This partition list is recorded for debugging purposes but is *not*
used by the algorithm. The algorithm only 'feels' its effects via
simulated timeout whenever there's a partition in one of the
messaging directions.
'nodes_up,[list]'
The best guess right now of which ndoes are up, relative to the
author node, specified by '{author,X}'
'SET partitions = [some list]'
All subsequent iterations should have a stable list of partitions,
i.e. the 'ps' list described should be stable.
'{FLAP: x flaps n}!'
Server x has detected that it's flapping/oscillating after iteration
n of a naive/1st draft detection algorithm.
".
%% convergence_demo_test_() ->
%% {timeout, 98*300, fun() -> convergence_demo_testfun() end}.
%% convergence_demo_testfun() ->
%% convergence_demo_testfun(3).
t() ->
t(3).
t(N) ->
convergence_demo_testfun(N).
convergence_demo_testfun(NumFLUs) ->
timer:sleep(100),
%% Faster test startup, commented: io:format(user, short_doc(), []),
%% Faster test startup, commented: timer:sleep(3000),
TcpPort = 62877,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"},
{c,TcpPort+2,"./data.c"}, {d,TcpPort+3,"./data.d"},
{e,TcpPort+4,"./data.e"}, {f,TcpPort+5,"./data.f"}],
FLU_biglist = [X || {X,_,_} <- FluInfo],
All_list = lists:sublist(FLU_biglist, NumFLUs),
io:format(user, "\nSET # of FLus = ~w members ~w).\n",
[NumFLUs, All_list]),
machi_partition_simulator:start_link({111,222,33}, 0, 100),
_ = machi_partition_simulator:get(All_list),
Ps = [#p_srvr{name=Name,address="localhost",port=Port} ||
{Name,Port,_Dir} <- lists:sublist(FluInfo, NumFLUs)],
PsDirs = lists:zip(Ps,
[Dir || {_,_,Dir} <- lists:sublist(FluInfo, NumFLUs)]),
FLU_pids = [machi_flu1_test:setup_test_flu(Name, Port, Dir) ||
{#p_srvr{name=Name,port=Port}, Dir} <- PsDirs],
Namez = [begin
{ok, PPid} = ?FLU_PC:start_link(P),
{Name, PPid}
end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs],
MembersDict = machi_projection:make_members_dict(Ps),
MgrOpts = [private_write_verbose, {active_mode,false}],
MgrNamez =
[begin
{ok, MPid} = ?MGR:start_link(P#p_srvr.name, MembersDict, MgrOpts),
{P#p_srvr.name, MPid}
end || P <- Ps],
try
[{_, Ma}|_] = MgrNamez,
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
[ok = ?FLU_PC:write_projection(FLUPid, public, P1) ||
{_, FLUPid} <- Namez, FLUPid /= Ma],
machi_partition_simulator:reset_thresholds(10, 50),
_ = machi_partition_simulator:get(All_list),
Parent = self(),
DoIt = fun(Iters, S_min, S_max) ->
io:format(user, "\nDoIt: top\n\n", []),
Pids = [spawn(fun() ->
random:seed(now()),
[begin
erlang:yield(),
S_max_rand = random:uniform(
S_max + 1),
io:format(user, "{t}", []),
Elapsed =
?MGR:sleep_ranked_order(
S_min, S_max_rand,
M_name, All_list),
_ = ?MGR:test_react_to_env(MMM),
%% if M_name == d ->
%% [_ = ?MGR:test_react_to_env(MMM) ||
%% _ <- lists:seq(1,3)],
%% superunfair;
%% true ->
%% ok
%% end,
%% Be more unfair by not
%% sleeping here.
%% timer:sleep(S_max - Elapsed),
Elapsed
end || _ <- lists:seq(1, Iters)],
Parent ! done
end) || {M_name, MMM} <- MgrNamez ],
[receive
done ->
ok
after 995000 ->
exit(icky_timeout)
end || _ <- Pids]
end,
_XandYs1 = [[{X,Y}] || X <- All_list, Y <- All_list, X /= Y],
_XandYs2 = [[{X,Y}, {A,B}] || X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
X /= A],
_XandYs3 = [[{X,Y}, {A,B}, {C,D}] || X <- All_list, Y <- All_list, X /= Y,
A <- All_list, B <- All_list, A /= B,
C <- All_list, D <- All_list, C /= D,
X /= A, X /= C, A /= C],
%% AllPartitionCombinations = _XandYs1 ++ _XandYs2,
%% AllPartitionCombinations = _XandYs3,
AllPartitionCombinations = _XandYs1 ++ _XandYs2 ++ _XandYs3,
?D({?LINE, length(AllPartitionCombinations)}),
machi_partition_simulator:reset_thresholds(10, 50),
io:format(user, "\nLet loose the dogs of war!\n", []),
DoIt(30, 0, 0),
[begin
%% io:format(user, "\nSET partitions = ~w.\n", [ [] ]),machi_partition_simulator:no_partitions(),
%% [DoIt(50, 10, 100) || _ <- [1,2,3]],
io:format(user, "\nLet loose the dogs of war!\n", []),
DoIt(30, 0, 0),
io:format(user, "\nSET partitions = ~w.\n", [ [] ]),machi_partition_simulator:no_partitions(),
[DoIt(10, 10, 100) || _ <- [1]],
%% machi_partition_simulator:reset_thresholds(10, 50),
%% io:format(user, "\nLet loose the dogs of war!\n", []),
%% DoIt(30, 0, 0),
machi_partition_simulator:always_these_partitions(Partition),
io:format(user, "\nSET partitions = ~w.\n", [Partition]),
[DoIt(50, 10, 100) || _ <- [1,2,3,4] ],
_PPP =
[begin
{ok, PPPallPubs} = ?FLU_PC:list_all_projections(FLU,public),
[begin
{ok, Pr} = todo_why_does_this_crash_sometimes(
FLUName, FLU, PPPepoch),
{Pr#projection_v1.epoch_number, FLUName, Pr}
end || PPPepoch <- PPPallPubs]
end || {FLUName, FLU} <- Namez],
%% io:format(user, "PPP ~p\n", [lists:sort(lists:append(_PPP))]),
%%%%%%%% {stable,true} = {stable,private_projections_are_stable(Namez, DoIt)},
{hosed_ok,true} = {hosed_ok,all_hosed_lists_are_identical(Namez, Partition)},
io:format(user, "\nSweet, all_hosed are identical-or-islands-inconclusive.\n", []),
timer:sleep(1000),
ok
%% end || Partition <- AllPartitionCombinations
%% end || Partition <- [ [{a,b},{b,d},{c,b}],
%% [{a,b},{b,d},{c,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ]
end || Partition <- [ [{a,b}, {b,c}],
[{a,b}, {c,b}] ]
%% end || Partition <- [ [{a,b}, {b,c}] ] %% hosed-not-equal @ 3 FLUs
%% end || Partition <- [ [{b,d}] ]
%% end || Partition <- [ [{a,b}, {b,a}] ]
%% end || Partition <- [ [{a,b}, {b,a}, {a,c},{c,a}] ]
%% end || Partition <- [ [{a,b}],
%% [{b,a}] ]
%% end || Partition <- [ [{a,b}, {c,b}],
%% [{a,b}, {b,c}] ]
%% end || Partition <- [ [{a,b}, {b,c}, {c,d}],
%% [{a,b}, {b,c},{b,d}, {c,d}],
%% [{b,a}, {b,c}, {c,d}],
%% [{a,b}, {c,b}, {c,d}],
%% [{a,b}, {b,c}, {d,c}] ]
%% end || Partition <- [ [{a,b}, {b,c}, {c,d}, {d,e}],
%% [{b,a}, {b,c}, {c,d}, {d,e}],
%% [{a,b}, {c,b}, {c,d}, {d,e}],
%% [{a,b}, {b,c}, {d,c}, {d,e}],
%% [{a,b}, {b,c}, {c,d}, {e,d}] ]
%% end || Partition <- [ [{c,a}] ]
%% end || Partition <- [ [{c,a}], [{c,b}, {a, b}] ]
%% end || Partition <- [ [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {b,c}],
%% [{a,b},{b,a}, {a,c},{c,a}, {a,d},{d,a}, {c,d}] ]
%% end || Partition <- [ [{a,b}],
%% [{a,b}, {a,b},{b,a},{a,c},{c,a},{a,d},{d,a}],
%% [{a,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ]
],
%% exit(end_experiment),
io:format(user, "\nSET partitions = []\n", []),
io:format(user, "We should see convergence to 1 correct chain.\n", []),
machi_partition_simulator:no_partitions(),
[DoIt(50, 10, 100) || _ <- [1]],
io:format(user, "Sweet, finishing early\n", []), exit(yoyoyo_testing_hack),
%% WARNING: In asymmetric partitions, private_projections_are_stable()
%% will never be true; code beyond this point on the -exp3
%% branch is bit-rotted, sorry!
true = private_projections_are_stable(Namez, DoIt),
io:format(user, "~s\n", [os:cmd("date")]),
%% We are stable now ... analyze it.
%% Create a report where at least one FLU has written a
%% private projection.
Report = machi_chain_manager1_test:unanimous_report(Namez),
%% ?D(Report),
%% Report is ordered by Epoch. For each private projection
%% written during any given epoch, confirm that all chain
%% members appear in only one unique chain, i.e., the sets of
%% unique chains are disjoint.
true = machi_chain_manager1_test:all_reports_are_disjoint(Report),
%% Given the report, we flip it around so that we observe the
%% sets of chain transitions relative to each FLU.
R_Chains = [machi_chain_manager1_test:extract_chains_relative_to_flu(
FLU, Report) || FLU <- All_list],
%% ?D(R_Chains),
R_Projs = [{FLU, [machi_chain_manager1_test:chain_to_projection(
FLU, Epoch, UPI, Repairing, All_list) ||
{Epoch, UPI, Repairing} <- E_Chains]} ||
{FLU, E_Chains} <- R_Chains],
%% For each chain transition experienced by a particular FLU,
%% confirm that each state transition is OK.
try
[{FLU, true} = {FLU, ?MGR:projection_transitions_are_sane(Psx, FLU)} ||
{FLU, Psx} <- R_Projs],
io:format(user, "\nAll sanity checks pass, hooray!\n", [])
catch _Err:_What ->
io:format(user, "Report ~p\n", [Report]),
exit({line, ?LINE, _Err, _What})
end,
%% ?D(R_Projs),
ok
catch
XX:YY ->
io:format(user, "BUMMER ~p ~p @ ~p\n",
[XX, YY, erlang:get_stacktrace()]),
exit({bummer,XX,YY})
after
[ok = ?MGR:stop(MgrPid) || {_, MgrPid} <- MgrNamez],
[ok = ?FLU_PC:quit(PPid) || {_, PPid} <- Namez],
[ok = machi_flu1:stop(FLUPid) || FLUPid <- FLU_pids],
ok = machi_partition_simulator:stop()
end.
todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) ->
try
{ok, _}=Res = ?FLU_PC:read_projection(FLU, public, PPPepoch),
Res
catch _:_ ->
io:format(user, "QQQ Whoa, it crashed this time for ~p at epoch ~p\n",
[FLUName, PPPepoch]),
timer:sleep(1000),
?FLU_PC:read_projection(FLU, public, PPPepoch)
end.
private_projections_are_stable(Namez, PollFunc) ->
Private1 = [?FLU_PC:get_latest_epoch(FLU, private) ||
{_Name, FLU} <- Namez],
PollFunc(5, 1, 10),
Private2 = [?FLU_PC:get_latest_epoch(FLU, private) ||
{_Name, FLU} <- Namez],
true = (Private1 == Private2).
all_hosed_lists_are_identical(Namez, Partition0) ->
Partition = lists:usort(Partition0),
Ps = [element(2,?FLU_PC:read_latest_projection(FLU, private)) ||
{_Name, FLU} <- Namez],
UniqueAllHoseds = lists:usort([machi_chain_manager1:get_all_hosed(P) ||
{ok, P} <- Ps]),
Members = [M || {M, _Pid} <- Namez],
Islands = machi_partition_simulator:partitions2num_islands(
Members, Partition),
%% io:format(user, "all_hosed_lists_are_identical:\n", []),
%% io:format(user, " Uniques = ~p Islands ~p\n Partition ~p\n",
%% [Uniques, Islands, Partition]),
case length(UniqueAllHoseds) of
1 ->
true;
%% TODO: With the addition of the digraph stuff below, the clause
%% below probably isn't necessary anymore, since the
%% digraph calculation should catch complete partition islands?
_ when Islands == 'many' ->
%% There are at least two partitions, so yes, it's quite
%% possible that the all_hosed lists may differ.
%% TODO Fix this up to be smarter about fully-isolated
%% islands of partition.
true;
_ ->
DG = digraph:new(),
Connection = machi_partition_simulator:partition2connection(
Members, Partition),
[digraph:add_vertex(DG, X) || X <- Members],
[digraph:add_edge(DG, X, Y) || {X,Y} <- Connection],
Any =
lists:any(
fun(X) ->
NotX = Members -- [X],
lists:any(
fun(Y) ->
%% There must be a shortest path of length
%% two in both directions, otherwise
%% the read projection call will fail.
%% And it's that failure that we're
%% interested in here.
XtoY = digraph:get_short_path(DG, X, Y),
YtoX = digraph:get_short_path(DG, Y, X),
(XtoY == false orelse
length(XtoY) > 2)
orelse
(YtoX == false orelse
length(YtoX) > 2)
end, NotX)
end, Members),
digraph:delete(DG),
if Any == true ->
%% There's a missing path of length 2 between some
%% two FLUs, so yes, there's going to be
%% non-identical all_hosed lists.
true;
true ->
false % There's no excuse, buddy
end
end.
-endif. % TEST

View file

@ -0,0 +1,379 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_chain_manager1_pulse).
%% The while module is ifdef:ed, rebar should set PULSE
-ifdef(PULSE).
-compile(export_all).
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
-include("machi.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile({parse_transform, pulse_instrument}).
-compile({pulse_replace_module, [{application, pulse_application}]}).
%% The following functions contains side_effects but are run outside
%% PULSE, i.e. PULSE needs to leave them alone
-compile({pulse_skip,[{prop_pulse_test_,0}]}).
-compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}).
%% Used for output within EUnit...
-define(QC_FMT(Fmt, Args),
io:format(user, Fmt, Args)).
%% And to force EUnit to output QuickCheck output...
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)).
-define(MGR, machi_chain_manager1).
-define(MGRTEST, machi_chain_manager1_test).
-record(state, {
step=0,
num_pids,
pids,
dump_state
}).
initial_state() ->
#state{}.
gen_num_pids() ->
choose(2, 5).
gen_seed() ->
noshrink({choose(1, 10000), choose(1, 10000), choose(1, 10000)}).
gen_old_threshold() ->
noshrink(choose(1, 100)).
gen_no_partition_threshold() ->
noshrink(choose(1, 100)).
command(#state{step=0}) ->
{call, ?MODULE, setup, [gen_num_pids(), gen_seed()]};
command(S) ->
frequency([
{ 1, {call, ?MODULE, change_partitions,
[gen_old_threshold(), gen_no_partition_threshold()]}},
{50, {call, ?MODULE, do_ticks,
[choose(5, 100), S#state.pids,
gen_old_threshold(), gen_no_partition_threshold()]}}
]).
precondition(_S, _) ->
true.
next_state(#state{step=Step}=S, Res, Call) ->
next_state2(S#state{step=Step + 1}, Res, Call).
next_state2(S, Res, {call, _, setup, [NumPids, _Seed]}) ->
S#state{num_pids=NumPids, pids=Res};
next_state2(S, Res, {call, _, dump_state, _Args}) ->
S#state{dump_state=Res};
next_state2(S, _Res, {call, _, _Func, _Args}) ->
S.
postcondition(_S, {call, _, _Func, _Args}, _Res) ->
true.
all_list() ->
[a,b,c].
%% [a,b,c,d,e].
setup(_Num, Seed) ->
?QC_FMT("\nsetup,", []),
All_list = all_list(),
_ = machi_partition_simulator:start_link(Seed, 0, 100),
_Partitions = machi_partition_simulator:get(All_list),
FLU_pids = [begin
{ok, FLUPid} = machi_flu0:start_link(Name),
_ = machi_flu0:get_epoch(FLUPid),
FLUPid
end || Name <- All_list],
Namez = lists:zip(All_list, FLU_pids),
Mgr_pids = [begin
{ok, Mgr} = ?MGR:start_link(Name, All_list, FLU_pid),
Mgr
end || {Name, FLU_pid} <- Namez],
timer:sleep(1),
{ok, P1} = ?MGR:test_calc_projection(hd(Mgr_pids), false),
P1Epoch = P1#projection.epoch_number,
[ok = machi_flu0:proj_write(FLU, P1Epoch, public, P1) || FLU <- FLU_pids],
[?MGR:test_react_to_env(Mgr) || Mgr <- Mgr_pids],
Res = {FLU_pids, Mgr_pids},
put(manager_pids_hack, Res),
Res.
change_partitions(OldThreshold, NoPartitionThreshold) ->
machi_partition_simulator:reset_thresholds(OldThreshold,
NoPartitionThreshold).
always_last_partitions() ->
machi_partition_simulator:always_last_partitions().
private_stable_check(FLUs) ->
{_FLU_pids, Mgr_pids} = get(manager_pids_hack),
Res = private_projections_are_stable_check(FLUs, Mgr_pids),
if not Res ->
io:format(user, "BUMMER: private stable check failed!\n", []);
true ->
ok
end,
Res.
do_ticks(Num, PidsMaybe, OldThreshold, NoPartitionThreshold) ->
io:format(user, "~p,~p,~p|", [Num, OldThreshold, NoPartitionThreshold]),
{_FLU_pids, Mgr_pids} = case PidsMaybe of
undefined -> get(manager_pids_hack);
_ -> PidsMaybe
end,
if is_integer(OldThreshold) ->
machi_partition_simulator:reset_thresholds(OldThreshold,
NoPartitionThreshold);
true ->
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number()]),
machi_partition_simulator:no_partitions()
end,
Res = exec_ticks(Num, Mgr_pids),
if not is_integer(OldThreshold) ->
?QC_FMT("{e=~w},", [get_biggest_private_epoch_number()]);
true ->
ok
end,
Res.
get_biggest_private_epoch_number() ->
lists:last(
lists:usort(
lists:flatten(
[machi_flu0:proj_list_all(FLU, private) ||
FLU <- all_list()]))).
dump_state() ->
try
?QC_FMT("dump_state(", []),
{FLU_pids, _Mgr_pids} = get(manager_pids_hack),
Namez = zip(all_list(), FLU_pids),
Report = ?MGRTEST:unanimous_report(Namez),
%% ?QC_FMT("Report ~p\n", [Report]),
Diag1 = [begin
Ps = machi_flu0:proj_get_all(FLU, Type),
[io_lib:format("~p ~p ~p: ~w\n", [FLUName, Type, P#projection.epoch_number, ?MGR:make_projection_summary(P)]) || P <- Ps]
end || {FLUName, FLU} <- Namez,
Type <- [public] ],
UniquePrivateEs =
lists:usort(lists:flatten(
[machi_flu0:proj_list_all(FLU, private) ||
{_FLUName, FLU} <- Namez])),
P_lists0 = [{FLUName, Type, machi_flu0:proj_get_all(FLUPid, Type)} ||
{FLUName, FLUPid} <- Namez, Type <- [public,private]],
P_lists = [{FLUName, Type, P} || {FLUName, Type, Ps} <- P_lists0,
P <- Ps],
AllDict = lists:foldl(fun({FLU, Type, P}, D) ->
K = {FLU, Type, P#projection.epoch_number},
dict:store(K, P, D)
end, dict:new(), lists:flatten(P_lists)),
DumbFinderBackward =
fun(FLUName) ->
fun(E, error_unwritten) ->
case dict:find({FLUName, private, E}, AllDict) of
{ok, T} -> T;
error -> error_unwritten
end;
%% case machi_flu0:proj_read(FLU, E, private) of
%% {ok, T} -> T;
%% Else -> Else
%% end;
(_E, Acc) ->
Acc
end
end,
Diag2 = [[
io_lib:format("~p private: ~w\n",
[FLUName,
?MGR:make_projection_summary(
lists:foldl(DumbFinderBackward(FLUName),
error_unwritten,
lists:seq(Epoch, 0, -1)))])
|| {FLUName, _FLU} <- Namez]
|| Epoch <- UniquePrivateEs],
?QC_FMT(")", []),
{Report, lists:flatten([Diag1, Diag2])}
catch XX:YY ->
?QC_FMT("OUCH: ~p ~p @ ~p\n", [XX, YY, erlang:get_stacktrace()])
end.
prop_pulse() ->
?FORALL({Cmds0, Seed}, {non_empty(commands(?MODULE)), pulse:seed()},
?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 5,
begin
ok = shutdown_hard(),
%% PULSE can be really unfair, of course, including having exec_ticks
%% run where all of FLU a does its ticks then FLU b. Such a situation
%% doesn't always allow unanimous private projection store values:
%% FLU a might need one more tick to write its private projection, but
%% it isn't given a chance at the end of the PULSE run. So we cheat
Stabilize1 = [{set,{var,99999995},
{call, ?MODULE, always_last_partitions, []}}],
Stabilize2 = [{set,{var,99999996},
{call, ?MODULE, private_stable_check, [all_list()]}}],
LastTriggerTicks = {set,{var,99999997},
{call, ?MODULE, do_ticks, [25, undefined, no, no]}},
Cmds1 = lists:duplicate(2, LastTriggerTicks),
%% Cmds1 = lists:duplicate(length(all_list())*2, LastTriggerTicks),
Cmds = Cmds0 ++
Stabilize1 ++
Cmds1 ++
Stabilize2 ++
[{set,{var,99999999}, {call, ?MODULE, dump_state, []}}],
{_H2, S2, Res} = pulse:run(
fun() ->
{_H, _S, _R} = run_commands(?MODULE, Cmds)
end, [{seed, Seed},
{strategy, unfair}]),
ok = shutdown_hard(),
{Report, Diag} = S2#state.dump_state,
%% Report is ordered by Epoch. For each private projection
%% written during any given epoch, confirm that all chain
%% members appear in only one unique chain, i.e., the sets of
%% unique chains are disjoint.
AllDisjointP = ?MGRTEST:all_reports_are_disjoint(Report),
%% Given the report, we flip it around so that we observe the
%% sets of chain transitions relative to each FLU.
R_Chains = [?MGRTEST:extract_chains_relative_to_flu(FLU, Report) ||
FLU <- all_list()],
R_Projs = [{FLU, [?MGRTEST:chain_to_projection(
FLU, Epoch, UPI, Repairing, all_list()) ||
{Epoch, UPI, Repairing} <- E_Chains]} ||
{FLU, E_Chains} <- R_Chains],
%% For each chain transition experienced by a particular FLU,
%% confirm that each state transition is OK.
Sane =
[{FLU,_SaneRes} = {FLU,?MGR:projection_transitions_are_sane_retrospective(
Ps, FLU)} ||
{FLU, Ps} <- R_Projs],
SaneP = lists:all(fun({_FLU, SaneRes}) -> SaneRes == true end, Sane),
%% The final report item should say that all are agreed_membership.
{_LastEpoch, {ok_disjoint, LastRepXs}} = lists:last(Report),
AgreedOrNot = lists:usort([element(1, X) || X <- LastRepXs]),
%% TODO: Check that we've converged to a single chain with no repairs.
SingleChainNoRepair = case LastRepXs of
[{agreed_membership,{_UPI,[]}}] ->
true;
_ ->
LastRepXs
end,
?WHENFAIL(
begin
?QC_FMT("Res = ~p\n", [Res]),
?QC_FMT("Diag = ~s\n", [Diag]),
?QC_FMT("Report = ~p\n", [Report]),
?QC_FMT("Sane = ~p\n", [Sane]),
?QC_FMT("SingleChainNoRepair failure =\n ~p\n", [SingleChainNoRepair])
end,
conjunction([{res, Res == true orelse Res == ok},
{all_disjoint, AllDisjointP},
{sane, SaneP},
{all_agreed_at_end, AgreedOrNot == [agreed_membership]},
{single_chain_no_repair, SingleChainNoRepair}
]))
end)).
prop_pulse_test_() ->
Timeout = case os:getenv("PULSE_TIME") of
false -> 60;
Val -> list_to_integer(Val)
end,
ExtraTO = case os:getenv("PULSE_SHRINK_TIME") of
false -> 0;
Val2 -> list_to_integer(Val2)
end,
{timeout, (Timeout+ExtraTO+300), % 300 = a bit more fudge time
fun() ->
?assert(eqc:quickcheck(eqc:testing_time(Timeout,
?QC_OUT(prop_pulse()))))
end}.
shutdown_hard() ->
(catch machi_partition_simulator:stop()),
[(catch machi_flu0:stop(X)) || X <- all_list()],
timer:sleep(1),
(catch exit(whereis(machi_partition_simulator), kill)),
[(catch exit(whereis(X), kill)) || X <- all_list()],
erlang:yield(),
ok.
exec_ticks(Num, Mgr_pids) ->
Parent = self(),
Pids = [spawn_link(fun() ->
[begin
erlang:yield(),
Max = 10,
Elapsed =
?MGR:sleep_ranked_order(1, Max, M_name, all_list()),
Res = ?MGR:test_react_to_env(MMM),
timer:sleep(erlang:max(0, Max - Elapsed)),
Res=Res %% ?D({self(), Res})
end || _ <- lists:seq(1,Num)],
Parent ! done
end) || {M_name, MMM} <- lists:zip(all_list(), Mgr_pids) ],
[receive
done ->
ok
after 5000 ->
exit(icky_timeout)
end || _ <- Pids],
ok.
private_projections_are_stable_check(All_list, Mgr_pids) ->
%% TODO: extend the check to look not only for latest num, but
%% also check for flapping, and if yes, to see if all_hosed are
%% all exactly equal.
_ = exec_ticks(40, Mgr_pids),
Private1 = [machi_flu0:proj_get_latest_num(FLU, private) ||
FLU <- All_list],
_ = exec_ticks(5, Mgr_pids),
Private2 = [machi_flu0:proj_get_latest_num(FLU, private) ||
FLU <- All_list],
(Private1 == Private2).
-endif. % PULSE

View file

@ -0,0 +1,259 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_chain_manager1_test).
-include("machi.hrl").
-include("machi_projection.hrl").
-define(MGR, machi_chain_manager1).
-define(D(X), io:format(user, "~s ~p\n", [??X, X])).
-define(Dw(X), io:format(user, "~s ~w\n", [??X, X])).
-define(FLU_C, machi_flu1_client).
-define(FLU_PC, machi_proxy_flu1_client).
-export([]).
-ifdef(TEST).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
%% -include_lib("eqc/include/eqc_statem.hrl").
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
-endif.
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
unanimous_report(Namez) ->
UniquePrivateEs =
lists:usort(lists:flatten(
[element(2, ?FLU_PC:list_all_projections(FLU, private)) ||
{_FLUName, FLU} <- Namez])),
[unanimous_report(Epoch, Namez) || Epoch <- UniquePrivateEs].
unanimous_report(Epoch, Namez) ->
Projs = [{FLUName, case ?FLU_PC:read_projection(FLU, private, Epoch) of
{ok, T} -> T;
_Else -> not_in_this_epoch
end} || {FLUName, FLU} <- Namez],
UPI_R_Sums = [{Proj#projection_v1.upi, Proj#projection_v1.repairing,
Proj#projection_v1.epoch_csum} ||
{_FLUname, Proj} <- Projs,
is_record(Proj, projection_v1)],
UniqueUPIs = lists:usort([UPI || {UPI, _Repairing, _CSum} <- UPI_R_Sums]),
Res =
[begin
case lists:usort([CSum || {U, _Repairing, CSum} <- UPI_R_Sums,
U == UPI]) of
[_1CSum] ->
%% Yay, there's only 1 checksum. Let's check
%% that all FLUs are in agreement.
{UPI, Repairing, _CSum} =
lists:keyfind(UPI, 1, UPI_R_Sums),
%% TODO: make certain that this subtlety doesn't get
%% last in later implementations.
%% So, this is a bit of a tricky thing. If we're at
%% upi=[c] and repairing=[a,b], then the transition
%% (eventually!) to upi=[c,a] does not currently depend
%% on b being an active participant in the repair.
%%
%% Yes, b's state is very important for making certain
%% that all repair operations succeed both to a & b.
%% However, in this simulation, we only consider that
%% the head(Repairing) is sane. Therefore, we use only
%% the "HeadOfRepairing" in our considerations here.
HeadOfRepairing = case Repairing of
[H_Rep|_] ->
[H_Rep];
_ ->
[]
end,
Tmp = [{FLU, case proplists:get_value(FLU, Projs) of
P when is_record(P, projection_v1) ->
P#projection_v1.epoch_csum;
Else ->
Else
end} || FLU <- UPI ++ HeadOfRepairing],
case lists:usort([CSum || {_FLU, CSum} <- Tmp]) of
[_] ->
{agreed_membership, {UPI, Repairing}};
Else2 ->
{not_agreed, {UPI, Repairing}, Else2}
end;
_Else ->
{UPI, not_unique, Epoch, _Else}
end
end || UPI <- UniqueUPIs],
AgreedResUPI_Rs = [UPI++Repairing ||
{agreed_membership, {UPI, Repairing}} <- Res],
Tag = case lists:usort(lists:flatten(AgreedResUPI_Rs)) ==
lists:sort(lists:flatten(AgreedResUPI_Rs)) of
true ->
ok_disjoint;
false ->
bummer_NOT_DISJOINT
end,
{Epoch, {Tag, Res}}.
all_reports_are_disjoint(Report) ->
[] == [X || {_Epoch, Tuple}=X <- Report,
element(1, Tuple) /= ok_disjoint].
extract_chains_relative_to_flu(FLU, Report) ->
{FLU, [{Epoch, UPI, Repairing} ||
{Epoch, {ok_disjoint, Es}} <- Report,
{agreed_membership, {UPI, Repairing}} <- Es,
lists:member(FLU, UPI) orelse lists:member(FLU, Repairing)]}.
chain_to_projection(MyName, Epoch, UPI_list, Repairing_list, All_list) ->
exit({todo_broken_fixme,?MODULE,?LINE}),
machi_projection:new(Epoch, MyName, All_list,
All_list -- (UPI_list ++ Repairing_list),
UPI_list, Repairing_list, []).
-ifndef(PULSE).
smoke0_test() ->
{ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50),
Host = "localhost",
TcpPort = 6623,
{ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]),
Pa = #p_srvr{name=a, proto=ipv4, address=Host, port=TcpPort},
Members_Dict = machi_projection:make_members_dict([Pa]),
%% Egadz, more racing on startup, yay. TODO fix.
timer:sleep(1),
{ok, FLUaP} = ?FLU_PC:start_link(Pa),
{ok, M0} = ?MGR:start_link(a, Members_Dict, [{active_mode, false}]),
_SockA = machi_util:connect(Host, TcpPort),
try
pong = ?MGR:ping(M0)
after
ok = ?MGR:stop(M0),
ok = machi_flu1:stop(FLUa),
ok = ?FLU_PC:quit(FLUaP),
ok = machi_partition_simulator:stop()
end.
smoke1_test() ->
machi_partition_simulator:start_link({1,2,3}, 100, 0),
TcpPort = 62777,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}, {c,TcpPort+2,"./data.c"}],
P_s = [#p_srvr{name=Name, address="localhost", port=Port} ||
{Name,Port,_Dir} <- FluInfo],
[machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo],
FLUs = [element(2, machi_flu1:start_link([{Name,Port,Dir}])) ||
{Name,Port,Dir} <- FluInfo],
MembersDict = machi_projection:make_members_dict(P_s),
{ok, M0} = ?MGR:start_link(a, MembersDict, [{active_mode,false}]),
try
{ok, P1} = ?MGR:test_calc_projection(M0, false),
{local_write_result, ok,
{remote_write_results, [{b,ok},{c,ok}]}} =
?MGR:test_write_public_projection(M0, P1),
{unanimous, P1, Extra1} = ?MGR:test_read_latest_public_projection(M0, false),
ok
after
ok = ?MGR:stop(M0),
[ok = machi_flu1:stop(X) || X <- FLUs],
ok = machi_partition_simulator:stop()
end.
nonunanimous_setup_and_fix_test() ->
%% TODO attack list:
%% __ Add start option to chain manager to be "passive" only, i.e.,
%% not immediately go to work on
%% 1. Start FLUs with full complement of FLU+proj+chmgr.
%% 2. Put each of them under a supervisor?
%% - Sup proc could be a created-specifically-for-test thing, perhaps?
%% Rather than relying on a supervisor with reg name + OTP app started
%% plus plus more more yaddayadda?
%% 3. Add projection catalog/orddict of #p_srvr records??
%% 4. Fix this test, etc etc.
machi_partition_simulator:start_link({1,2,3}, 100, 0),
TcpPort = 62877,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}],
P_s = [#p_srvr{name=Name, address="localhost", port=Port} ||
{Name,Port,_Dir} <- FluInfo],
[machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo],
FLUs = [element(2, machi_flu1:start_link([{Name,Port,Dir}])) ||
{Name,Port,Dir} <- FluInfo],
[Proxy_a, Proxy_b] = Proxies =
[element(2,?FLU_PC:start_link(P)) || P <- P_s],
MembersDict = machi_projection:make_members_dict(P_s),
XX = [],
%% XX = [{private_write_verbose,true}],
{ok, Ma} = ?MGR:start_link(a, MembersDict, [{active_mode, false}]++XX),
{ok, Mb} = ?MGR:start_link(b, MembersDict, [{active_mode, false}]++XX),
try
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
P1a = machi_projection:update_checksum(
P1#projection_v1{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}),
P1b = machi_projection:update_checksum(
P1#projection_v1{author_server=b, creation_time=now(),
down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}),
%% Scribble different projections
ok = ?FLU_PC:write_projection(Proxy_a, public, P1a),
ok = ?FLU_PC:write_projection(Proxy_b, public, P1b),
%% ?D(x),
{not_unanimous,_,_}=_XX = ?MGR:test_read_latest_public_projection(Ma, false),
%% ?Dw(_XX),
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(Ma, true),
%% The read repair here doesn't automatically trigger the creation of
%% a new projection (to try to create a unanimous projection). So
%% we expect nothing to change when called again.
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(Ma, true),
{now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Ma),
{no_change, _, EpochNum_a} = ?MGR:test_react_to_env(Ma),
{unanimous,P2,_E2} = ?MGR:test_read_latest_public_projection(Ma, false),
{ok, P2pa} = ?FLU_PC:read_latest_projection(Proxy_a, private),
P2 = P2pa#projection_v1{dbg2=[]},
%% FLUb should have nothing written to private because it hasn't
%% reacted yet.
{error, not_written} = ?FLU_PC:read_latest_projection(Proxy_b, private),
%% Poke FLUb to react ... should be using the same private proj
%% as FLUa.
{now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Mb),
{ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private),
P2 = P2pb#projection_v1{dbg2=[]},
ok
after
ok = ?MGR:stop(Ma),
ok = ?MGR:stop(Mb),
[ok = ?FLU_PC:quit(X) || X <- Proxies],
[ok = machi_flu1:stop(X) || X <- FLUs],
ok = machi_partition_simulator:stop()
end.
-endif. % not PULSE
-endif. % TEST

View file

@ -33,7 +33,12 @@ setup_test_flu(RegName, TcpPort, DataDir) ->
setup_test_flu(RegName, TcpPort, DataDir, []).
setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
clean_up_data_dir(DataDir),
case proplists:get_value(save_data_dir, DbgProps) of
true ->
ok;
_ ->
clean_up_data_dir(DataDir)
end,
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir},
{dbg, DbgProps}]),
@ -125,19 +130,21 @@ flu_projection_smoke_test() ->
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
[begin
{ok, -1} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, {-1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all(Host, TcpPort, T),
{ok, []} = ?FLU_C:get_all(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, []} = ?FLU_C:get_all_projections(Host, TcpPort, T),
P1 = machi_projection:new(1, a, [a], [], [a], [], []),
P_a = #p_srvr{name=a},
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
{ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T),
{error, not_written} = ?FLU_C:read_projection(Host, TcpPort, T, 2)
end || T <- [public, private] ]
after

View file

@ -0,0 +1,240 @@
%% -------------------------------------------------------------------
%%
%% Machi: a small village of replicated files
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_partition_simulator).
-behaviour(gen_server).
-ifdef(TEST).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
-endif.
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-export([start_link/3, stop/0,
get/1, reset_thresholds/2,
no_partitions/0, always_last_partitions/0, always_these_partitions/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([islands2partitions/1,
partition2connection/2,
connection2partition/2,
partitions2num_islands/2,
partition_list_is_symmetric_p/2]).
-define(TAB, ?MODULE).
-record(state, {
seed,
old_partitions,
old_threshold,
no_partition_threshold,
method=oneway_partitions :: 'island' | 'oneway_partitions'
}).
start_link(Seed, OldThreshold, NoPartitionThreshold) ->
gen_server:start_link({local, ?MODULE}, ?MODULE,
{Seed, OldThreshold, NoPartitionThreshold}, []).
stop() ->
gen_server:call(?MODULE, {stop}, infinity).
get(Nodes) ->
gen_server:call(?MODULE, {get, Nodes}, infinity).
reset_thresholds(OldThreshold, NoPartitionThreshold) ->
gen_server:call(?MODULE, {reset_thresholds, OldThreshold, NoPartitionThreshold}, infinity).
no_partitions() ->
reset_thresholds(-999, 999).
always_last_partitions() ->
reset_thresholds(999, 0).
always_these_partitions(Parts) ->
reset_thresholds(999, 0),
gen_server:call(?MODULE, {always_these_partitions, Parts}, infinity).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init({Seed, OldThreshold, NoPartitionThreshold}) ->
{ok, #state{seed=Seed,
old_partitions={[],[[]]},
old_threshold=OldThreshold,
no_partition_threshold=NoPartitionThreshold}}.
handle_call({get, Nodes}, _From, S) ->
{Seed2, Partitions} =
calc_network_partitions(S#state.method,
Nodes,
S#state.seed,
S#state.old_partitions,
S#state.old_threshold,
S#state.no_partition_threshold),
{reply, Partitions, S#state{seed=Seed2,
old_partitions=Partitions}};
handle_call({reset_thresholds, OldThreshold, NoPartitionThreshold}, _From, S) ->
{reply, ok, S#state{old_threshold=OldThreshold,
no_partition_threshold=NoPartitionThreshold}};
handle_call({always_these_partitions, Parts}, _From, S) ->
{reply, ok, S#state{old_partitions={Parts,[na_reset_by_always]}}};
handle_call({stop}, _From, S) ->
{stop, normal, ok, S}.
handle_cast(_Cast, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
calc_network_partitions(Method, Nodes, Seed1, OldPartition,
OldThreshold, NoPartitionThreshold) ->
{Cutoff2, Seed2} = random:uniform_s(100, Seed1),
if Cutoff2 < OldThreshold ->
{Seed2, OldPartition};
true ->
{Cutoff3, Seed3} = random:uniform_s(100, Seed1),
if Cutoff3 < NoPartitionThreshold ->
{Seed3, {[], [Nodes]}};
true ->
make_network_partition_locations(Method, Nodes, Seed3)
end
end.
make_network_partition_locations(island=_Method, Nodes, Seed1) ->
Num = length(Nodes),
{Seed2, WeightsNodes} = lists:foldl(
fun(Node, {Seeda, Acc}) ->
{Cutoff0, Seedb} =
random:uniform_s(100, Seeda),
Cutoff = erlang:max(
2, if Cutoff0 rem 4 == 0 ->
0;
true ->
Cutoff0
end),
{Seedb, [{Cutoff, Node}|Acc]}
end, {Seed1, []}, Nodes),
IslandSep = 100 div Num,
Islands = [
lists:sort([Nd || {Weight, Nd} <- WeightsNodes,
(Max - IslandSep) =< Weight, Weight < Max])
|| Max <- lists:seq(IslandSep + 1, 105, IslandSep)],
{Seed2, {lists:usort(islands2partitions(Islands)), lists:sort(Islands)}};
make_network_partition_locations(oneway_partitions=_Method, Nodes, Seed1) ->
Pairs = make_all_pairs(Nodes),
Num = length(Pairs),
{Seed2, Weights} = lists:foldl(
fun(_, {Seeda, Acc}) ->
{Cutoff, Seedb} = random:uniform_s(100, Seeda),
{Seedb, [Cutoff|Acc]}
end, {Seed1, []}, lists:seq(1, Num)),
{Cutoff3, Seed3} = random:uniform_s(100, Seed2),
{Seed3, {[X || {Weight, X} <- lists:zip(Weights, Pairs),
Weight < Cutoff3], [islands_not_supported]}}.
make_all_pairs(L) ->
lists:flatten(make_all_pairs2(lists:usort(L))).
make_all_pairs2([]) ->
[];
make_all_pairs2([_]) ->
[];
make_all_pairs2([H1|T]) ->
[[{H1, X}, {X, H1}] || X <- T] ++ make_all_pairs(T).
islands2partitions([]) ->
[];
islands2partitions([Island|Rest]) ->
[{X,Y} || X <- Island,
Y <- lists:append(Rest), X /= Y]
++
[{Y,X} || X <- Island,
Y <- lists:append(Rest), X /= Y]
++
islands2partitions(Rest).
partition2connection(Members0, Partition0) ->
p2c_invert(lists:usort(Members0), lists:usort(Partition0)).
connection2partition(Members0, Partition0) ->
p2c_invert(lists:usort(Members0), lists:usort(Partition0)).
p2c_invert(Members, Partition_list_Or_Connection_list) ->
All = [{X,Y} || X <- Members, Y <- Members, X /= Y],
All -- Partition_list_Or_Connection_list.
partitions2num_islands(Members0, Partition0) ->
%% Ignore duplicates in either arg, if any.
Members = lists:usort(Members0),
Partition = lists:usort(Partition0),
Connections = partition2connection(Members, Partition),
Cs = [lists:member({X,Y}, Connections)
orelse
lists:member({Y,X}, Connections) || X <- Members, Y <- Members,
X /= Y],
case lists:usort(Cs) of
[true] -> 1;
[false] -> many;
[false, true] -> many % TODO too lazy to finish
end.
partition_list_is_symmetric_p(Members0, Partition0) ->
%% %% Ignore duplicates in either arg, if any.
Members = lists:usort(Members0),
NumMembers = length(Members),
Partition = lists:usort(Partition0),
NewDict = lists:foldl(
fun({A,B}, Dict) ->
Key = if A > B -> {A,B};
true -> {B,A}
end,
orddict:update_counter(Key, 1, Dict)
end, orddict:new(), Partition),
AllOddP = orddict:fold(
fun(_Key, Count, true) when Count rem 2 == 0 ->
true;
(_, _, _) ->
false
end, true, NewDict),
if not AllOddP ->
false;
true ->
TwosCount = [Key || {Key, Count} <- orddict:to_list(NewDict),
Count == 2],
length(TwosCount) >= (NumMembers - 1)
end.
-endif. % TEST

View file

@ -25,36 +25,45 @@
-include("machi_projection.hrl").
new_test() ->
new_fake(Name) ->
#p_srvr{name=Name}.
%% Bleh, hey QuickCheck ... except that any model probably equals
%% code under test, bleh.
true = try_it(a, [a,b,c], [a,b], [], [c], []),
true = try_it(<<"a">>, [<<"a">>,b,c], [<<"a">>,b], [], [c], []),
Servers = [#p_srvr{name=a}, #p_srvr{name=b}, #p_srvr{name=c}],
Servers_bad1 = [#p_srvr{name= <<"a">>}, #p_srvr{name=b}, #p_srvr{name=c}],
Servers_bad2 = [#p_srvr{name=z}, #p_srvr{name=b}, #p_srvr{name=c}],
new_test() ->
All0 = [new_fake(X) || X <- [a,b,c]],
All_binA = [new_fake(<<"a">>)] ++ [new_fake(X) || X <- [b,c]],
true = try_it(a, All0, [a,b], [], [c], []),
true = try_it(<<"a">>, All_binA, [<<"a">>,b], [], [c], []),
Servers = All0,
Servers_bad1 = [new_fake(X) || X <- [<<"a">>,b,c]],
Servers_bad2 = [new_fake(X) || X <- [z,b,c]],
true = try_it(a, Servers, [a,b], [], [c], []),
false = try_it(a, not_list, [a,b], [], [c], []),
false = try_it(a, [a,b,c], not_list, [], [c], []),
false = try_it(a, [a,b,c], [a,b], not_list, [c], []),
false = try_it(a, [a,b,c], [a,b], [], not_list, []),
false = try_it(a, [a,b,c], [a,b], [], [c], not_list),
false = try_it(a, All0, not_list, [], [c], []),
false = try_it(a, All0, [a,b], not_list, [c], []),
false = try_it(a, All0, [a,b], [], not_list, []),
false = try_it(a, All0, [a,b], [], [c], not_list),
false = try_it(<<"x">>, [a,b,c], [a,b], [], [c], []),
false = try_it(a, [a,b,c], [a,b,c], [], [c], []),
false = try_it(a, [a,b,c], [a,b], [c], [c], []),
false = try_it(a, [a,b,c], [a,b], [], [c,c], []),
false = try_it(<<"x">>, All0, [a,b], [], [c], []),
false = try_it(a, All0, [a,b,c], [], [c], []),
false = try_it(a, All0, [a,b], [c], [c], []),
false = try_it(a, All0, [a,b], [], [c,c], []),
false = try_it(a, Servers_bad1, [a,b], [], [c], []),
false = try_it(a, Servers_bad2, [a,b], [], [c], []),
ok.
compare_test() ->
P0 = machi_projection:new(0, a, [a,b,c], [a,b], [], [c], []),
P1a = machi_projection:new(1, a, [a,b,c], [a,b], [], [c], []),
P1b = machi_projection:new(1, b, [a,b,c], [a,b], [], [c], []),
P2 = machi_projection:new(2, a, [a,b,c], [a,b], [], [c], []),
All0 = [new_fake(X) || X <- [a,b,c]],
P0 = machi_projection:new(0, a, All0, [a,b], [], [c], []),
P1a = machi_projection:new(1, a, All0, [a,b], [], [c], []),
P1b = machi_projection:new(1, b, All0, [a,b], [], [c], []),
P2 = machi_projection:new(2, a, All0, [a,b], [], [c], []),
0 = machi_projection:compare(P0, P0),
-1 = machi_projection:compare(P0, P1a),

View file

@ -0,0 +1,87 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_proxy_flu1_client_test).
-compile(export_all).
-include("machi_projection.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(MUT, machi_proxy_flu1_client).
-ifdef(TEST).
api_smoke_test() ->
RegName = api_smoke_flu,
Host = "localhost",
TcpPort = 57124,
DataDir = "./data.api_smoke_flu",
FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir),
erase(flu_pid),
try
I = #p_srvr{name=RegName, proto=ipv4, address=Host, port=TcpPort},
{ok, Prox1} = ?MUT:start_link(I),
try
FakeEpoch = {-1, <<0:(20*8)/big>>},
[{ok, {_,_,_}} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>,
infinity) || _ <- lists:seq(1,5)],
%% Stop the FLU, what happens?
machi_flu1:stop(FLU1),
{error,_} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>,
infinity),
{error,not_connected} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>,
infinity),
%% Start the FLU again, we should be able to do stuff immediately
FLU1b = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
[save_data_dir]),
put(flu_pid, FLU1b),
MyChunk = <<"my chunk data">>,
{ok, {MyOff,MySize,MyFile}} =
?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk,
infinity),
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
%% Alright, now for the rest of the API, whee
BadFile = <<"no-such-file">>,
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
{ok, [_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public),
{error, not_written} = ?MUT:read_latest_projection(Prox1, public),
{error, not_written} = ?MUT:read_projection(Prox1, public, 44),
P_a = #p_srvr{name=a, address="localhost", port=6622},
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
ok = ?MUT:write_projection(Prox1, public, P1),
{ok, P1} = ?MUT:read_projection(Prox1, public, 1),
{ok, [P1]} = ?MUT:get_all_projections(Prox1, public),
{ok, [1]} = ?MUT:list_all_projections(Prox1, public),
ok
after
_ = (catch ?MUT:quit(Prox1))
end
after
(catch machi_flu1:stop(FLU1)),
(catch machi_flu1:stop(get(flu_pid)))
end.
-endif. % TEST