Merge branch 'slf/client-side-chain-replication'

This commit is contained in:
Scott Lystig Fritchie 2015-05-20 11:06:05 +09:00
commit 19510831a4
14 changed files with 1158 additions and 103 deletions

View file

@ -37,14 +37,17 @@ func, and pattern match Erlang style in that func.
- Add no-wedging state to make testing easier?
** TODO Adapt the projection-aware, CR-implementing client from demo-day
** DONE Adapt the projection-aware, CR-implementing client from demo-day
** DONE Add major comment sections to the CR-impl client
** TODO Simple basho_bench driver, put some unscientific chalk on the benchtop
** TODO Create parallel PULSE test for basic API plus chain manager repair
** TODO Add client-side vs. server-side checksum type, expand client API?
** TODO Add gproc and get rid of registered name rendezvous
*** TODO Fixes the atom table leak
*** TODO Fixes the problem of having active sequencer for the same prefix
on two FLUS in the same VM
** TODO Fix all known bugs with Chain Manager (list below)
** TODO Fix all known bugs/cruft with Chain Manager (list below)
*** DONE Fix known bugs
*** DONE Clean up crufty TODO comments and other obvious cruft
*** TODO Re-add verification step of stable epochs, including inner projections!

View file

@ -21,6 +21,7 @@
-ifndef(MACHI_PROJECTION_HRL).
-define(MACHI_PROJECTION_HRL, true).
-type pv1_consistency_mode() :: 'ap_mode' | 'cp_mode'.
-type pv1_csum() :: binary().
-type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}.
-type pv1_epoch_n() :: non_neg_integer().
@ -46,6 +47,7 @@
author_server :: pv1_server(),
all_members :: [pv1_server()],
creation_time :: pv1_timestamp(),
mode = ap_mode :: pv1_consistency_mode(),
upi :: [pv1_server()],
repairing :: [pv1_server()],
down :: [pv1_server()],

View file

@ -466,11 +466,12 @@ cl_read_latest_projection(ProjectionType, AllHosed, S) ->
{All_queried_list, FLUsRs, S2} =
read_latest_projection_call_only(ProjectionType, AllHosed, S),
rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, S2).
rank_and_sort_projections_with_extra(All_queried_list, FLUsRs,
ProjectionType, S2).
rank_and_sort_projections_with_extra(All_queried_list, FLUsRs,
rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType,
#ch_mgr{name=MyName,proj=CurrentProj}=S) ->
UnwrittenRs = [x || {_, error_unwritten} <- FLUsRs],
UnwrittenRs = [x || {_, {error, not_written}} <- FLUsRs],
Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)],
BadAnswerFLUs = [FLU || {FLU, Answer} <- FLUsRs,
not is_record(Answer, projection_v1)],
@ -489,7 +490,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs,
{trans_all_hosed, []},
{trans_all_flap_counts, []}],
{not_unanimous, NoneProj, Extra2, S};
UnwrittenRs /= [] ->
ProjectionType == public, UnwrittenRs /= [] ->
{needs_repair, FLUsRs, [flarfus], S};
true ->
[{_Rank, BestProj}|_] = rank_and_sort_projections(Ps, CurrentProj),
@ -516,7 +517,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs,
end.
do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) ->
Unwrittens = [x || {_FLU, error_unwritten} <- FLUsRs],
Unwrittens = [x || {_FLU, {error, not_written}} <- FLUsRs],
Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)],
if Unwrittens == [] orelse Ps == [] ->
{nothing_to_do, S};
@ -762,17 +763,10 @@ rank_projection(#projection_v1{author_server=Author,
(N*N * length(UPI_list)).
do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)->
catch orddict:fold(
fun(_K, Pid, _Acc) ->
_ = (catch ?FLU_PC:quit(Pid))
end, [], OldProxiesDict),
Proxies = orddict:fold(
fun(K, P, Acc) ->
{ok, Pid} = ?FLU_PC:start_link(P),
[{K, Pid}|Acc]
end, [], MembersDict),
_ = ?FLU_PC:stop_proxies(OldProxiesDict),
ProxiesDict = ?FLU_PC:start_proxies(MembersDict),
{ok, S#ch_mgr{members_dict=MembersDict,
proxies_dict=orddict:from_list(Proxies)}}.
proxies_dict=ProxiesDict}}.
do_react_to_env(#ch_mgr{name=MyName,
proj=#projection_v1{epoch_number=Epoch,
@ -1936,9 +1930,12 @@ perhaps_start_repair(
%% RepairOpts = [{repair_mode, check}, verbose],
RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end,
LastUPI = lists:last(UPI),
IgnoreStabilityTime_p = proplists:get_value(ignore_stability_time,
S#ch_mgr.opts, false),
case timer:now_diff(os:timestamp(), Start) div 1000000 of
N when MyName == LastUPI,
N >= ?REPAIR_START_STABILITY_TIME ->
N when MyName == LastUPI andalso
(IgnoreStabilityTime_p orelse
N >= ?REPAIR_START_STABILITY_TIME) ->
{WorkerPid, _Ref} = spawn_monitor(RepairFun),
S#ch_mgr{repair_worker=WorkerPid,
repair_start=os:timestamp(),

View file

@ -107,8 +107,6 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
catch
What:Why ->
Stack = erlang:get_stacktrace(),
io:format(user, "What Why ~p ~p @\n\t~p\n",
[What, Why, Stack]),
{error, {What, Why, Stack}}
after
[(catch machi_proxy_flu1_client:quit(Pid)) ||

674
src/machi_cr_client.erl Normal file
View file

@ -0,0 +1,674 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Erlang API for the Machi client-implemented Chain Replication
%% (CORFU-style) protocol.
%%
%% The major operation processing is implemented in a state machine-like
%% manner. Before attempting an operation `X', there's an initial
%% operation `pre-X' that takes care of updating the epoch id,
%% restarting client protocol proxies, and if there's any server
%% instability (e.g. some server is wedged), then insert some sleep
%% time. When the chain appears to have stabilized, then we try the `X'
%% operation again.
%%
%% Function name for the `pre-X' stuff is usually `X()', and the
%% function name for the `X' stuff is usually `X2()'. (I.e., the `X'
%% stuff follows after `pre-X' and therefore has a `2' suffix on the
%% function name.)
%%
%% In the case of read repair, there are two stages: find the value to
%% perform the repair, then perform the repair writes. In the case of
%% the repair writes, the `pre-X' function is named `read_repair3()',
%% and the `X' function is named `read_repair4()'.
%%
%% TODO: It would be nifty to lift the very-nearly-but-not-quite-boilerplate
%% of the `pre-X' functions into a single common function ... but I'm not
%% sure yet on how to do it without making the code uglier.
-module(machi_cr_client).
-behaviour(gen_server).
-include("machi.hrl").
-include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST.
-export([start_link/1]).
%% FLU1 API
-export([
%% File API
append_chunk/3, append_chunk/4,
append_chunk_extra/4, append_chunk_extra/5,
read_chunk/4, read_chunk/5,
checksum_list/2, checksum_list/3,
list_files/1, list_files/2,
%% Common API
quit/1
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(FLU_PC, machi_proxy_flu1_client).
-define(TIMEOUT, 2*1000).
-define(MAX_RUNTIME, 8*1000).
-record(state, {
members_dict :: p_srvr_dict(),
proxies_dict :: orddict:orddict(),
epoch_id,
proj,
bad_proj
}).
%% @doc Start a local, long-lived process that will be our steady
%% &amp; reliable communication proxy with the fickle &amp; flaky
%% remote Machi server.
start_link(P_srvr_list) ->
gen_server:start_link(?MODULE, [P_srvr_list], []).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk(PidSpec, Prefix, Chunk) ->
append_chunk(PidSpec, Prefix, Chunk, infinity).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk(PidSpec, Prefix, Chunk, Timeout) ->
append_chunk_extra(PidSpec, Prefix, Chunk, 0, Timeout).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, infinity).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk_extra, Prefix,
Chunk, ChunkExtra}},
Timeout).
%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size) ->
read_chunk(PidSpec, File, Offset, Size, infinity).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}},
Timeout).
%% @doc Fetch the list of chunk checksums for `File'.
checksum_list(PidSpec, File) ->
checksum_list(PidSpec, File, infinity).
%% @doc Fetch the list of chunk checksums for `File'.
checksum_list(PidSpec, File, Timeout) ->
gen_server:call(PidSpec, {req, {checksum_list, File}},
Timeout).
%% @doc Fetch the list of all files on the remote FLU.
list_files(PidSpec) ->
list_files(PidSpec, infinity).
%% @doc Fetch the list of all files on the remote FLU.
list_files(PidSpec, Timeout) ->
gen_server:call(PidSpec, {req, {list_files}},
Timeout).
%% @doc Quit &amp; close the connection to remote FLU and stop our
%% proxy process.
quit(PidSpec) ->
gen_server:call(PidSpec, quit, infinity).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([P_srvr_list]) ->
MembersDict = orddict:from_list([{P#p_srvr.name, P} || P <- P_srvr_list]),
ProxiesDict = ?FLU_PC:start_proxies(MembersDict),
{ok, #state{members_dict=MembersDict, proxies_dict=ProxiesDict}}.
handle_call({req, Req}, From, S) ->
handle_call2(Req, From, update_proj(S));
handle_call(quit, _From, S) ->
{stop, normal, ok, S};
handle_call(_Request, _From, S) ->
Reply = whaaaaaaaaaaaaaaaaaaaa,
{reply, Reply, S}.
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
{noreply, S}.
terminate(_Reason, #state{proxies_dict=ProxiesDict}=_S) ->
_ = ?FLU_PC:stop_proxies(ProxiesDict),
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S);
handle_call2({read_chunk, File, Offset, Size}, _From, S) ->
do_read_chunk(File, Offset, Size, 0, os:timestamp(), S);
handle_call2({checksum_list, File}, _From, S) ->
do_checksum_list(File, 0, os:timestamp(), S);
handle_call2({list_files}, _From, S) ->
do_list_files(0, os:timestamp(), S).
do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) ->
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S);
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "head sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
%% This is suboptimal for performance: there are some paths
%% through this point where our current projection is good
%% enough. But we're going to try to keep the code as simple
%% as we can for now.
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1,
STime, S2);
_ ->
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1,
STime, S2)
end
end.
do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
[HeadFLU|RestFLUs] = mutation_flus(P),
Proxy = orddict:fetch(HeadFLU, PD),
case ?FLU_PC:append_chunk_extra(Proxy,
EpochID, Prefix, Chunk, ChunkExtra,
?TIMEOUT) of
{ok, {Offset, _Size, File}=_X} ->
%% io:format(user, "append ~w,", [HeadFLU]),
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
[HeadFLU], 0, STime, S);
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
{error, written} ->
%% Implicit sequencing + this error = we don't know where this
%% written block is. But we lost a race. Repeat, with a new
%% sequencer assignment.
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,
Prefix,iolist_size(Chunk)})
end.
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
Ws, Depth, STime, S)
when RestFLUs == [] orelse Depth == 0 ->
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
Ws, Depth + 1, STime, S);
do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
Ws, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "midtail sleep2,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
undefined ->
{reply, {error, partition}, S};
P2 ->
RestFLUs2 = mutation_flus(P2),
case RestFLUs2 -- Ws of
RestFLUs2 ->
%% None of the writes that we have done so far
%% are to FLUs that are in the RestFLUs2 list.
%% We are pessimistic here and assume that
%% those FLUs are permanently dead. Start
%% over with a new sequencer assignment, at
%% the 2nd have of the impl (we have already
%% slept & refreshed the projection).
do_append_head2(Prefix, Chunk, ChunkExtra, Depth,
STime, S2);
RestFLUs3 ->
do_append_midtail2(RestFLUs3, Prefix, File, Offset,
Chunk, ChunkExtra,
Ws, Depth + 1, STime, S2)
end
end
end.
do_append_midtail2([], _Prefix, File, Offset, Chunk,
_ChunkExtra, _Ws, _Depth, _STime, S) ->
%% io:format(user, "ok!\n", []),
{reply, {ok, {Offset, iolist_size(Chunk), File}}, S};
do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
ChunkExtra, Ws, Depth, STime,
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
ok ->
%% io:format(user, "write ~w,", [FLU]),
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk,
ChunkExtra, [FLU|Ws], Depth, STime, S);
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_append_midtail(FLUs, Prefix, File, Offset, Chunk,
ChunkExtra, Ws, Depth, STime, S);
{error, written} ->
%% We know what the chunk ought to be, so jump to the
%% middle of read-repair.
Resume = {append, Offset, iolist_size(Chunk), File},
read_repair3(FLUs, Resume, Chunk, [], File, Offset,
iolist_size(Chunk), Depth, STime, S);
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
end.
do_read_chunk(File, Offset, Size, 0=Depth, STime,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
do_read_chunk2(File, Offset, Size, Depth + 1, STime, S);
do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_read_chunk(File, Offset, Size, Depth + 1, STime, S2);
_ ->
do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2)
end
end.
do_read_chunk2(File, Offset, Size, Depth, STime,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
UPI = readonly_flus(P),
Tail = lists:last(UPI),
ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size ->
{reply, {ok, Chunk}, S};
{ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
got, byte_size(BadChunk)});
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_read_chunk(File, Offset, Size, Depth, STime, S);
{error, not_written} ->
read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
%% {reply, {error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
%% Read repair: depends on the consistency mode that we're in:
%%
%% CP mode: If the head is written, then use it to repair UPI++Repairing.
%% If head is not_written, then do nothing.
%% AP mode: If any FLU in UPI++Repairing is written, then use it to repair
%% UPI+repairing.
%% If all FLUs in UPI++Repairing are not_written, then do nothing.
read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth,
STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1,
STime, S);
read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth,
STime, #state{proj=P}=S) ->
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
read_repair(ConsistencyMode, ReturnMode, File, Offset,
Size, Depth + 1, STime, S2);
_ ->
read_repair2(ConsistencyMode, ReturnMode, File, Offset,
Size, Depth + 1, STime, S2)
end
end.
read_repair2(cp_mode=ConsistencyMode,
ReturnMode, File, Offset, Size, Depth, STime,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
Tail = lists:last(readonly_flus(P)),
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, Chunk} when byte_size(Chunk) == Size ->
ToRepair = mutation_flus(P) -- [Tail],
read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
Size, Depth, STime, S);
{ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset,
Size, got, byte_size(BadChunk)});
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair(ConsistencyMode, ReturnMode, File, Offset,
Size, Depth, STime, S);
{error, not_written} ->
{reply, {error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end;
read_repair2(ap_mode=ConsistencyMode,
ReturnMode, File, Offset, Size, Depth, STime,
#state{proj=P}=S) ->
Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, Chunk, GotItFrom} when byte_size(Chunk) == Size ->
ToRepair = mutation_flus(P) -- [GotItFrom],
read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File,
Offset, Size, Depth, STime, S);
{ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
Offset, Size, got, byte_size(BadChunk)});
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair(ConsistencyMode, ReturnMode, File,
Offset, Size, Depth, STime, S);
{error, not_written} ->
{reply, {error, not_written}, S};
{error, written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
read_repair3([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S) ->
read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, 0=Depth, STime, S) ->
read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth + 1, STime, S);
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read_repair3 sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2);
P2 ->
ToRepair2 = mutation_flus(P2) -- Repaired,
read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2)
end
end.
read_repair4([], ReturnMode, Chunk, _Repaired, File, Offset,
_IgnoreSize, _Depth, _STime, S) ->
%% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc?
case ReturnMode of
read ->
{reply, {ok, Chunk}, S};
{append, Offset, Size, File} ->
{reply, {ok, {Offset, Size, File}}, S}
end;
read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(First, PD),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
ok ->
read_repair4(Rest, ReturnMode, Chunk, [First|Repaired], File,
Offset, Size, Depth, STime, S);
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S);
{error, written} ->
%% TODO: To be very paranoid, read the chunk here to verify
%% that it is exactly our Chunk.
read_repair4(Rest, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S);
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
do_checksum_list(File, 0=Depth, STime, S) ->
do_checksum_list2(File, Depth + 1, STime, S);
do_checksum_list(File, Depth, STime, #state{proj=P}=S) ->
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
%% This is suboptimal for performance: there are some paths
%% through this point where our current projection is good
%% enough. But we're going to try to keep the code as simple
%% as we can for now.
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_checksum_list(File, Depth + 1, STime, S2);
_ ->
do_checksum_list2(File, Depth + 1, STime, S2)
end
end.
do_checksum_list2(File, Depth, STime,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of
{ok, _}=OK ->
{reply, OK, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_checksum_list(File, Depth, STime, S);
{error, _}=Error ->
{reply, Error, S}
end.
do_list_files(0=Depth, STime, S) ->
do_list_files2(Depth + 1, STime, S);
do_list_files(Depth, STime, #state{proj=P}=S) ->
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > ?MAX_RUNTIME ->
{reply, {error, partition}, S};
true ->
%% This is suboptimal for performance: there are some paths
%% through this point where our current projection is good
%% enough. But we're going to try to keep the code as simple
%% as we can for now.
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_list_files(Depth + 1, STime, S2);
_ ->
do_list_files2(Depth + 1, STime, S2)
end
end.
do_list_files2(Depth, STime,
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of
{ok, _}=OK ->
{reply, OK, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_list_files(Depth, STime, S);
{error, _}=Error ->
{reply, Error, S}
end.
update_proj(#state{proj=undefined}=S) ->
update_proj2(1, S);
update_proj(S) ->
S.
update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
Timeout = 2*1000,
WTimeout = 2*Timeout,
Proxies = orddict:to_list(ProxiesDict),
Work = fun({_K, Proxy}) ->
?FLU_PC:read_latest_projection(Proxy, private, Timeout)
end,
Rs = run_middleworker_job(Work, Proxies, WTimeout),
%% TODO: There's a possible bug here when running multiple independent
%% Machi clusters/chains. If our chain used to be [a,b], but our
%% sysadmin has changed our cluster to be [a] and a completely seprate
%% cluster with [b], and if b is reusing the address & port number,
%% then it is possible that choose_best_projs() can incorrectly choose
%% b's projection.
case choose_best_proj(Rs) of
P when P >= BadProj ->
#projection_v1{epoch_number=Epoch, epoch_csum=CSum,
members_dict=NewMembersDict} = P,
EpochID = {Epoch, CSum},
?FLU_PC:stop_proxies(ProxiesDict),
NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict),
S#state{bad_proj=undefined, proj=P, epoch_id=EpochID,
members_dict=NewMembersDict, proxies_dict=NewProxiesDict};
_ ->
sleep_a_while(Count),
update_proj2(Count + 1, S)
end.
run_middleworker_job(Fun, ArgList, WTimeout) ->
Parent = self(),
MiddleWorker =
spawn(fun() ->
PidsMons =
[spawn_monitor(fun() ->
Res = (catch Fun(Arg)),
exit(Res)
end) || Arg <- ArgList],
Rs = gather_worker_statuses(PidsMons, WTimeout),
Parent ! {res, self(), Rs},
exit(normal)
end),
receive
{res, MiddleWorker, Results} ->
Results
after WTimeout+100 ->
[]
end.
gather_worker_statuses([], _Timeout) ->
[];
gather_worker_statuses([{Pid,Ref}|Rest], Timeout) ->
receive
{'DOWN', R, process, P, Status} when R == Ref, P == Pid ->
[Status|gather_worker_statuses(Rest, Timeout)]
after Timeout ->
gather_worker_statuses(Rest, 0)
end.
choose_best_proj(Rs) ->
WorstEpoch = #projection_v1{epoch_number=-1,epoch_csum= <<>>},
lists:foldl(fun({ok, NewEpoch}, BestEpoch)
when NewEpoch > BestEpoch ->
NewEpoch;
(_, BestEpoch) ->
BestEpoch
end, WorstEpoch, Rs).
try_to_find_chunk(Eligible, File, Offset, Size,
#state{epoch_id=EpochID, proxies_dict=PD}) ->
Timeout = 2*1000,
Work = fun(FLU) ->
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:read_chunk(Proxy, EpochID,
File, Offset, Size) of
{ok, Chunk} when byte_size(Chunk) == Size ->
{FLU, {ok, Chunk}};
Else ->
{FLU, Else}
end
end,
Rs = run_middleworker_job(Work, Eligible, Timeout),
case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of
[{FoundFLU, {ok, Chunk}}|_] ->
{ok, Chunk, FoundFLU};
[] ->
RetryErrs = [partition, bad_epoch, wedged],
case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of
[SomeErr|_] ->
{error, SomeErr};
[] ->
%% TODO does this really work 100% of the time?
{error, not_written}
end
end.
mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) ->
UPI ++ Repairing;
mutation_flus(#state{proj=P}) ->
mutation_flus(P).
readonly_flus(#projection_v1{upi=UPI}) ->
UPI;
readonly_flus(#state{proj=P}) ->
readonly_flus(P).
sleep_a_while(0) ->
ok;
sleep_a_while(1) ->
ok;
sleep_a_while(Depth) ->
timer:sleep(30 + trunc(math:pow(1.9, Depth))).

View file

@ -66,6 +66,8 @@
-include("machi.hrl").
-include("machi_projection.hrl").
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
-export([start_link/1, stop/1,
update_wedge_state/3]).
-export([make_listener_regname/1, make_projection_server_regname/1]).
@ -191,8 +193,15 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
register(make_listener_regname(FluName), self()),
SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}],
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S).
case gen_tcp:listen(TcpPort, SockOpts) of
{ok, LSock} ->
listen_server_loop(LSock, S);
Else ->
error_logger:warning_msg("~s:run_listen_server: "
"listen to TCP port ~w: ~w\n",
[?MODULE, TcpPort, Else]),
exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
end.
run_append_server(FluPid, AckPid, #state{flu_name=Name,
wedged=Wedged_p,epoch_id=EpochId}=S) ->
@ -244,7 +253,9 @@ decode_epoch_id(EpochIDHex) ->
net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0, 600*1000) of
%% TODO: Add testing control knob to adjust this timeout and/or inject
%% timeout condition.
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1,
@ -428,6 +439,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
OptsHasWrite = lists:member(write, FileOpts),
OptsHasRead = lists:member(read, FileOpts),
case file:open(Path, FileOpts) of
{ok, FH} ->
try
@ -440,8 +452,9 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun,
EpochID, Wedged_p, CurrentEpochId);
{error, enoent} when OptsHasRead ->
ok = gen_tcp:send(Sock, <<"ERROR NO-SUCH-FILE\n">>);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
end.
@ -759,9 +772,9 @@ do_projection_command(Sock, LenHex, S) ->
_ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>]))
end.
handle_projection_command({get_latest_epoch, ProjType},
handle_projection_command({get_latest_epochid, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_latest_epoch(ProjStore, ProjType);
machi_projection_store:get_latest_epochid(ProjStore, ProjType);
handle_projection_command({read_latest_projection, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:read_latest_projection(ProjStore, ProjType);

View file

@ -25,6 +25,8 @@
-include("machi.hrl").
-include("machi_projection.hrl").
-define(HARD_TIMEOUT, 2500).
-export([
%% File API
append_chunk/4, append_chunk/5,
@ -35,7 +37,7 @@
wedge_status/1, wedge_status/2,
%% Projection API
get_latest_epoch/2, get_latest_epoch/3,
get_latest_epochid/2, get_latest_epochid/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
@ -138,7 +140,7 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, error_general() | 'not_written' | 'partial_read'} |
{error, term()}.
read_chunk(Sock, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
@ -149,7 +151,7 @@ read_chunk(Sock, EpochID, File, Offset, Size)
-spec read_chunk(inet_host(), inet_port(), epoch_id(),
file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, error_general() | 'not_written' | 'partial_read'} |
{error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
@ -223,22 +225,21 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port_wrap(), projection_type()) ->
-spec get_latest_epochid(port_wrap(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
get_latest_epochid(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
get_latest_epoch2(Sock, ProjType).
get_latest_epochid2(Sock, ProjType).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(inet_host(), inet_port(),
projection_type()) ->
-spec get_latest_epochid(inet_host(), inet_port(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Host, TcpPort, ProjType)
get_latest_epochid(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
get_latest_epoch2(Sock, ProjType)
get_latest_epochid2(Sock, ProjType)
after
disconnect(Sock)
end.
@ -290,7 +291,7 @@ read_projection(Host, TcpPort, ProjType, Epoch)
%% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(port_wrap(), projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
'ok' | {error, 'written'} | {error, term()}.
write_projection(Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
@ -300,7 +301,7 @@ write_projection(Sock, ProjType, Proj)
-spec write_projection(inet_host(), inet_port(),
projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
'ok' | {error, 'written'} | {error, term()}.
write_projection(Host, TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
@ -501,24 +502,25 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10],
ok = w_send(Sock, CmdLF),
ok = w_setopts(Sock, [{packet, raw}]),
case w_recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = w_recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = w_getopts(Sock, [packet]),
ok = w_setopts(Sock, [{packet, line}]),
{ok, Else2} = w_recv(Sock, 0),
ok = w_setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
<<"ERR">> ->
case Else2 of
<<"OR BAD-IO\n">> ->
{error, no_such_file};
<<"OR NO-SUCH-FILE\n">> ->
{error, not_written};
<<"OR NOT-ERASURE\n">> ->
{error, no_such_file};
%% {error, no_such_file};
%% Ignore the fact that the file doesn't exist.
{error, not_written};
<<"OR BAD-ARG\n">> ->
{error, bad_arg};
<<"OR PARTIAL-READ\n">> ->
@ -536,6 +538,9 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
@ -561,7 +566,11 @@ list2(Sock, EpochID) ->
catch
throw:Error ->
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -593,7 +602,11 @@ wedge_status2(Sock) ->
catch
throw:Error ->
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -620,16 +633,15 @@ checksum_list2(Sock, EpochID, File) ->
{ok, <<"ERROR WEDGED", _/binary>>} ->
{error, wedged};
{ok, Else} ->
throw({server_protocol_error, Else});
{error, closed} ->
throw({error, closed});
Else ->
throw(Else)
throw({server_protocol_error, Else})
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
@ -724,6 +736,9 @@ delete_migration2(Sock, EpochID, File) ->
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
@ -754,13 +769,16 @@ trunc_hack2(Sock, EpochID, File) ->
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
get_latest_epoch2(Sock, ProjType) ->
ProjCmd = {get_latest_epoch, ProjType},
get_latest_epochid2(Sock, ProjType) ->
ProjCmd = {get_latest_epochid, ProjType},
do_projection_common(Sock, ProjCmd).
read_latest_projection2(Sock, ProjType) ->
@ -804,14 +822,15 @@ do_projection_common(Sock, ProjCmd) ->
binary_to_term(ResBin);
Else ->
{error, Else}
end;
{error, _} = Bad ->
throw(Bad)
end
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
@ -823,7 +842,7 @@ w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})->
try
case proplists:get_value(session_proto, Props, tcp) of
tcp ->
Sock = machi_util:connect(Host, Port),
Sock = machi_util:connect(Host, Port, ?HARD_TIMEOUT),
{w,tcp,Sock};
%% sctp ->
%% %% TODO: not implemented
@ -845,14 +864,11 @@ w_close({w,tcp,Sock}) ->
ok.
w_recv({w,tcp,Sock}, Amt) ->
gen_tcp:recv(Sock, Amt).
gen_tcp:recv(Sock, Amt, ?HARD_TIMEOUT).
w_send({w,tcp,Sock}, IoData) ->
gen_tcp:send(Sock, IoData).
w_getopts({w,tcp,Sock}, Opts) ->
inet:getopts(Sock, Opts).
w_setopts({w,tcp,Sock}, Opts) ->
inet:setopts(Sock, Opts).

View file

@ -43,7 +43,7 @@
%% API
-export([
start_link/3,
get_latest_epoch/2, get_latest_epoch/3,
get_latest_epochid/2, get_latest_epochid/3,
read_latest_projection/2, read_latest_projection/3,
read/3, read/4,
write/3, write/4,
@ -62,8 +62,8 @@
public_dir = "" :: string(),
private_dir = "" :: string(),
wedge_notify_pid :: pid() | atom(),
max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()},
max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}
max_public_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()},
max_private_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}
}).
%% @doc Start a new projection store server.
@ -80,15 +80,15 @@ start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
%% @doc Fetch the latest epoch number + checksum for type `ProjType'.
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
get_latest_epochid(PidSpec, ProjType) ->
get_latest_epochid(PidSpec, ProjType, infinity).
%% @doc Fetch the latest epoch number + checksum for type `ProjType'.
%% projection.
get_latest_epoch(PidSpec, ProjType, Timeout)
get_latest_epochid(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout).
g_call(PidSpec, {get_latest_epochid, ProjType}, Timeout).
%% @doc Fetch the latest projection record for type `ProjType'.
@ -168,25 +168,27 @@ init([DataDir, NotifyWedgeStateChanges]) ->
PrivateDir = machi_util:make_projection_filename(DataDir, "private"),
ok = filelib:ensure_dir(PublicDir ++ "/ignored"),
ok = filelib:ensure_dir(PrivateDir ++ "/ignored"),
MaxPublicEpoch = find_max_epoch(PublicDir),
MaxPrivateEpoch = find_max_epoch(PrivateDir),
MbEpoch = find_max_epochid(PublicDir),
%% MbEpoch = {Mb#projection_v1.epoch_number, Mb#projection_v1.epoch_csum},
MvEpoch = find_max_epochid(PrivateDir),
%% MvEpoch = {Mv#projection_v1.epoch_number, Mv#projection_v1.epoch_csum},
{ok, #state{public_dir=PublicDir,
private_dir=PrivateDir,
wedge_notify_pid=NotifyWedgeStateChanges,
max_public_epoch=MaxPublicEpoch,
max_private_epoch=MaxPrivateEpoch}}.
max_public_epochid=MbEpoch,
max_private_epochid=MvEpoch}}.
handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
handle_call({{get_latest_epochid, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
EpochId = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
EpochId = if ProjType == public -> S#state.max_public_epochid;
ProjType == private -> S#state.max_private_epochid
end,
{reply, {{ok, EpochId}, LC2}, S};
handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
{EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epochid;
ProjType == private -> S#state.max_private_epochid
end,
{Reply, NewS} = do_proj_read(ProjType, EpochNum, S),
{reply, {Reply, LC2}, NewS};
@ -268,7 +270,7 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
EffectiveEpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum},
%%
NewS = if ProjType == public,
Epoch > element(1, S#state.max_public_epoch) ->
Epoch > element(1, S#state.max_public_epochid) ->
if Epoch == EffectiveEpoch ->
%% This is a regular projection, i.e.,
%% does not have an inner proj.
@ -281,13 +283,13 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
%% not bother wedging.
ok
end,
S#state{max_public_epoch=EpochId};
S#state{max_public_epochid=EpochId};
ProjType == private,
Epoch > element(1, S#state.max_private_epoch) ->
Epoch > element(1, S#state.max_private_epochid) ->
update_wedge_state(
S#state.wedge_notify_pid, false,
EffectiveEpochId),
S#state{max_private_epoch=EpochId};
S#state{max_private_epochid=EpochId};
true ->
S
end,
@ -344,14 +346,14 @@ find_all(Dir) ->
Fs = filelib:wildcard("*", Dir),
lists:sort([name2epoch(F) || F <- Fs]).
find_max_epoch(Dir) ->
find_max_epochid(Dir) ->
Fs = lists:sort(filelib:wildcard("*", Dir)),
if Fs == [] ->
?NO_EPOCH;
true ->
EpochNum = name2epoch(lists:last(Fs)),
{{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir),
{EpochNum, Proj}
{EpochNum, Proj#projection_v1.epoch_csum}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -59,7 +59,7 @@
%% %% Projection API
get_epoch_id/1, get_epoch_id/2,
get_latest_epoch/2, get_latest_epoch/3,
get_latest_epochid/2, get_latest_epochid/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
@ -70,7 +70,10 @@
quit/1,
%% Internal API
write_chunk/5, write_chunk/6
write_chunk/5, write_chunk/6,
%% Helpers
stop_proxies/1, start_proxies/1
]).
%% gen_server callbacks
@ -173,13 +176,13 @@ get_epoch_id(PidSpec, Timeout) ->
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
get_latest_epochid(PidSpec, ProjType) ->
get_latest_epochid(PidSpec, ProjType, infinity).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}},
get_latest_epochid(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_latest_epochid, ProjType}},
Timeout).
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
@ -276,6 +279,7 @@ handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
{noreply, S}.
terminate(_Reason, _S) ->
@ -287,25 +291,38 @@ code_change(_OldVsn, S, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_req(Req, S) ->
do_req(Req, 1, S).
do_req(Req, Depth, S) ->
S2 = try_connect(S),
Fun = make_req_fun(Req, S2),
case connected_p(S2) of
true ->
case Fun() of
ok ->
{ok, S2};
T when element(1, T) == ok ->
{T, S2};
Else ->
%% {error, {badmatch, {badmatch, {error, Why}=TheErr}, _Stk}}
%% when Why == closed; Why == timeout ->
%% do_req_retry(Req, Depth, TheErr, S2);
TheErr ->
case get(bad_sock) of
Bad when Bad == S2#state.sock ->
{Else, disconnect(S2)};
do_req_retry(Req, Depth, TheErr, S2);
_ ->
{Else, S2}
{TheErr, S2}
end
end;
false ->
{{error, partition}, S2}
end.
do_req_retry(_Req, 2, Err, S) ->
{Err, disconnect(S)};
do_req_retry(Req, Depth, _Err, S) ->
do_req(Req, Depth + 1, try_connect(disconnect(S))).
make_req_fun({append_chunk, EpochID, Prefix, Chunk},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:append_chunk(Sock, EpochID, Prefix, Chunk) end;
@ -339,9 +356,9 @@ make_req_fun({get_epoch_id},
Error
end
end;
make_req_fun({get_latest_epoch, ProjType},
make_req_fun({get_latest_epochid, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:get_latest_epoch(Sock, ProjType) end;
fun() -> Mod:get_latest_epochid(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_latest_projection(Sock, ProjType) end;
@ -377,3 +394,22 @@ disconnect(#state{sock=Sock,
i=#p_srvr{proto_mod=Mod}=_I}=S) ->
Mod:disconnect(Sock),
S#state{sock=undefined}.
stop_proxies(undefined) ->
[];
stop_proxies(ProxiesDict) ->
orddict:fold(
fun(_K, Pid, _Acc) ->
_ = (catch machi_proxy_flu1_client:quit(Pid))
end, [], ProxiesDict).
start_proxies(MembersDict) ->
Proxies = orddict:fold(
fun(K, P, Acc) ->
{ok, Pid} = machi_proxy_flu1_client:start_link(P),
[{K, Pid}|Acc]
end, [], orddict:to_list(MembersDict)),
orddict:from_list(Proxies).

View file

@ -274,4 +274,3 @@ escript_connect(Host, Port, Timeout) when is_integer(Port) ->
{ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary},
{packet, raw}], Timeout),
Sock.

View file

@ -378,10 +378,10 @@ todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) ->
end.
private_projections_are_stable(Namez, PollFunc) ->
Private1 = [?FLU_PC:get_latest_epoch(FLU, private) ||
Private1 = [?FLU_PC:get_latest_epochid(FLU, private) ||
{_Name, FLU} <- Namez],
PollFunc(5, 1, 10),
Private2 = [?FLU_PC:get_latest_epoch(FLU, private) ||
Private2 = [?FLU_PC:get_latest_epochid(FLU, private) ||
{_Name, FLU} <- Namez],
true = (Private1 == Private2).

View file

@ -0,0 +1,136 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_cr_client_test).
-include("machi.hrl").
-include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
smoke_test() ->
os:cmd("rm -rf ./data.a ./data.b ./data.c"),
{ok, SupPid} = machi_flu_sup:start_link(),
error_logger:tty(false),
try
Prefix = <<"pre">>,
Chunk1 = <<"yochunk">>,
Host = "localhost",
PortBase = 4444,
Os = [{ignore_stability_time, true}, {active_mode, false}],
{ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", Os),
{ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", Os),
{ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", Os),
D = orddict:from_list(
[{a,{p_srvr,a,machi_flu1_client,"localhost",PortBase+0,[]}},
{b,{p_srvr,b,machi_flu1_client,"localhost",PortBase+1,[]}},
{c,{p_srvr,c,machi_flu1_client,"localhost",PortBase+2,[]}}]),
%% Force the chain to repair & fully assemble as quickly as possible.
%% 1. Use set_chain_members() on all 3
%% 2. Force a to run repair in a tight loop
%% 3. Stop as soon as we see UPI=[a,b,c] and also author=c.
%% Otherwise, we can have a race with later, minor
%% projection changes which will change our understanding of
%% the epoch id. (C is the author with highest weight.)
%% 4. Wait until all others are using epoch id from #3.
%%
%% Damn, this is a pain to make 100% deterministic, bleh.
ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
ok = machi_chain_manager1:set_chain_members(b_chmgr, D),
ok = machi_chain_manager1:set_chain_members(c_chmgr, D),
TickAll = fun() -> [begin
Pid ! tick_check_environment,
timer:sleep(50)
end || Pid <- [a_chmgr,b_chmgr,c_chmgr] ]
end,
_ = lists:foldl(
fun(_, [{c,[a,b,c]}]=Acc) -> Acc;
(_, _Acc) ->
TickAll(), % has some sleep time inside
Xs = [begin
{ok, Prj} = machi_projection_store:read_latest_projection(PStore, private),
{Prj#projection_v1.author_server,
Prj#projection_v1.upi}
end || PStore <- [a_pstore,b_pstore,c_pstore] ],
lists:usort(Xs)
end, undefined, lists:seq(1,10000)),
%% Everyone is settled on the same damn epoch id.
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0,
private),
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+1,
private),
{ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+2,
private),
%% Whew ... ok, now start some damn tests.
{ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]),
machi_cr_client:append_chunk(C1, Prefix, Chunk1),
%% {machi_flu_psup:stop_flu_package(c), timer:sleep(50)},
{ok, {Off1,Size1,File1}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
private),
%% Verify that the client's CR wrote to all of them.
[{ok, Chunk1} = machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, File1, Off1, Size1) ||
X <- [0,1,2] ],
%% Test read repair: Manually write to head, then verify that
%% read-repair fixes all.
FooOff1 = Off1 + (1024*1024),
[{error, not_written} = machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff1, Size1) || X <- [0,1,2] ],
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
File1, FooOff1, Chunk1),
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
[{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff1, Size1)} || X <- [0,1,2] ],
%% Test read repair: Manually write to middle, then same checking.
FooOff2 = Off1 + (2*1024*1024),
Chunk2 = <<"Middle repair chunk">>,
Size2 = size(Chunk2),
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
File1, FooOff2, Chunk2),
{ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
[{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff2, Size2)} || X <- [0,1,2] ],
%% Misc API smoke
%% Checksum lists are 3-tuples
{ok, [{_,_,_}|_]} = machi_cr_client:checksum_list(C1, File1),
{error, no_such_file} = machi_cr_client:checksum_list(C1, <<"!!!!">>),
%% Exactly one file right now
{ok, [_]} = machi_cr_client:list_files(C1),
ok
after
error_logger:tty(true),
catch application:stop(machi),
exit(SupPid, normal)
end.
-endif. % TEST.

View file

@ -80,9 +80,9 @@ flu_smoke_test() ->
BadPrefix, Chunk1),
{ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
Len1 = size(Chunk1),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
{error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
File1, Off1*983, Len1),
File1, Off1*983829323, Len1),
{error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
File1, Off1, Len1*984),
@ -114,7 +114,7 @@ flu_smoke_test() ->
BadFile, Off2, Chunk2),
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2),
{error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort,
{error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
"no!!", Off2, Len2),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
@ -149,7 +149,7 @@ flu_projection_smoke_test() ->
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
[begin
{ok, {0,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, {0,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T),
@ -160,7 +160,7 @@ flu_projection_smoke_test() ->
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
{ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, {1,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T),

View file

@ -74,7 +74,7 @@ api_smoke_test() ->
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
{ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, {false, _}} = ?MUT:wedge_status(Prox1),
{ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public),
{ok, FakeEpoch} = ?MUT:get_latest_epochid(Prox1, public),
{error, not_written} = ?MUT:read_latest_projection(Prox1, public),
{error, not_written} = ?MUT:read_projection(Prox1, public, 44),
P_a = #p_srvr{name=a, address="localhost", port=6622},
@ -83,6 +83,7 @@ api_smoke_test() ->
{ok, P1} = ?MUT:read_projection(Prox1, public, 1),
{ok, [P1]} = ?MUT:get_all_projections(Prox1, public),
{ok, [1]} = ?MUT:list_all_projections(Prox1, public),
ok
after
_ = (catch ?MUT:quit(Prox1))
@ -92,4 +93,182 @@ api_smoke_test() ->
(catch machi_flu1:stop(get(flu_pid)))
end.
flu_restart_test() ->
RegName = api_smoke_flu,
Host = "localhost",
TcpPort = 57125,
DataDir = "./data.api_smoke_flu2",
W_props = [{initial_wedged, false}],
erase(flu_pid),
put(flu_pid, []),
FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
W_props),
put(flu_pid, [FLU1|get(flu_pid)]),
try
I = #p_srvr{name=RegName, address=Host, port=TcpPort},
{ok, Prox1} = ?MUT:start_link(I),
try
FakeEpoch = ?DUMMY_PV1_EPOCH,
Data = <<"data!">>,
{ok, {Off1,Size1,File1}} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, Data,
infinity),
P_a = #p_srvr{name=a, address="localhost", port=6622},
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
EpochID = {P1#projection_v1.epoch_number,
P1#projection_v1.epoch_csum},
ok = ?MUT:write_projection(Prox1, public, P1),
ok = ?MUT:write_projection(Prox1, private, P1),
{ok, EpochID} = ?MUT:get_epoch_id(Prox1),
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public),
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private),
ok = machi_flu1:stop(FLU1), timer:sleep(50),
%% Now that the last proxy op was successful and only
%% after did we stop the FLU, let's check that both the
%% 1st & 2nd ops-via-proxy after FLU is restarted are
%% successful. And immediately after stopping the FLU,
%% both 1st & 2nd ops-via-proxy should always fail.
%%
%% Some of the expectations have unbound variables, which
%% makes the code a bit convoluted. (No LFE or
%% Elixir macros here, alas, they'd be useful.)
ExpectedOps =
[
fun(run) -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_epoch_id(Prox1) end,
fun(run) -> {ok, EpochID} =
?MUT:get_latest_epochid(Prox1, public),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_latest_epochid(Prox1, public)
end,
fun(run) -> {ok, EpochID} =
?MUT:get_latest_epochid(Prox1, private),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_latest_epochid(Prox1, private)
end,
fun(run) -> {ok, P1} =
?MUT:read_projection(Prox1, public, 1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_projection(Prox1, public, 1)
end,
fun(run) -> {ok, P1} =
?MUT:read_projection(Prox1, private, 1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_projection(Prox1, private, 1)
end,
fun(run) -> {error, not_written} =
?MUT:read_projection(Prox1, private, 7),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_projection(Prox1, private, 7)
end,
fun(run) -> {error, written} =
?MUT:write_projection(Prox1, public, P1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, public, P1)
end,
fun(run) -> {error, written} =
?MUT:write_projection(Prox1, private, P1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, private, P1)
end,
fun(run) -> {ok, [_]} =
?MUT:get_all_projections(Prox1, public),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_all_projections(Prox1, public)
end,
fun(run) -> {ok, [_]} =
?MUT:get_all_projections(Prox1, private),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_all_projections(Prox1, private)
end,
fun(run) -> {ok, {_,_,_}} =
?MUT:append_chunk(Prox1, FakeEpoch,
<<"prefix">>, Data, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:append_chunk(Prox1, FakeEpoch,
<<"prefix">>, Data, infinity)
end,
fun(run) -> {ok, {_,_,_}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity)
end,
fun(run) -> {ok, Data} =
?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1)
end,
fun(run) -> {ok, _} =
?MUT:checksum_list(Prox1, FakeEpoch, File1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:checksum_list(Prox1, FakeEpoch, File1)
end,
fun(run) -> {ok, _} =
?MUT:list_files(Prox1, FakeEpoch),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:list_files(Prox1, FakeEpoch)
end,
fun(run) -> {ok, _} =
?MUT:wedge_status(Prox1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:wedge_status(Prox1)
end,
%% NOTE: When write-once enforcement is enabled, this test
%% will fail: change ok -> {error, written}
fun(run) -> %% {error, written} =
ok =
?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Data, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Data, infinity)
end
],
[begin
FLU2 = machi_flu1_test:setup_test_flu(
RegName, TcpPort, DataDir,
[save_data_dir|W_props]),
put(flu_pid, [FLU2|get(flu_pid)]),
_ = Fun(line),
ok = Fun(run),
ok = Fun(run),
ok = machi_flu1:stop(FLU2),
{error, partition} = Fun(stop),
{error, partition} = Fun(stop),
ok
end || Fun <- ExpectedOps ],
ok
after
_ = (catch ?MUT:quit(Prox1))
end
after
[catch machi_flu1:stop(Pid) || Pid <- get(flu_pid)]
end.
-endif. % TEST