WIP: refactoring machi_cr_client:append_chunk*

This commit is contained in:
Scott Lystig Fritchie 2015-05-18 19:06:06 +09:00
parent a347722a15
commit 185c670b2f
7 changed files with 270 additions and 70 deletions

View file

@ -763,9 +763,9 @@ do_projection_command(Sock, LenHex, S) ->
_ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>]))
end.
handle_projection_command({get_latest_epoch, ProjType},
handle_projection_command({get_latest_epochid, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_latest_epoch(ProjStore, ProjType);
machi_projection_store:get_latest_epochid(ProjStore, ProjType);
handle_projection_command({read_latest_projection, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:read_latest_projection(ProjStore, ProjType);

View file

@ -25,6 +25,8 @@
-include("machi.hrl").
-include("machi_projection.hrl").
-define(HARD_TIMEOUT, 2500).
-export([
%% File API
append_chunk/4, append_chunk/5,
@ -35,7 +37,7 @@
wedge_status/1, wedge_status/2,
%% Projection API
get_latest_epoch/2, get_latest_epoch/3,
get_latest_epochid/2, get_latest_epochid/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
@ -223,22 +225,21 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port_wrap(), projection_type()) ->
-spec get_latest_epochid(port_wrap(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
get_latest_epochid(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
get_latest_epoch2(Sock, ProjType).
get_latest_epochid2(Sock, ProjType).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(inet_host(), inet_port(),
projection_type()) ->
-spec get_latest_epochid(inet_host(), inet_port(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Host, TcpPort, ProjType)
get_latest_epochid(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
get_latest_epoch2(Sock, ProjType)
get_latest_epochid2(Sock, ProjType)
after
disconnect(Sock)
end.
@ -501,15 +502,14 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10],
ok = w_send(Sock, CmdLF),
ok = w_setopts(Sock, [{packet, raw}]),
case w_recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = w_recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = w_getopts(Sock, [packet]),
ok = w_setopts(Sock, [{packet, line}]),
{ok, Else2} = w_recv(Sock, 0),
ok = w_setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
@ -536,6 +536,9 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
@ -561,7 +564,11 @@ list2(Sock, EpochID) ->
catch
throw:Error ->
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -593,7 +600,11 @@ wedge_status2(Sock) ->
catch
throw:Error ->
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -620,16 +631,15 @@ checksum_list2(Sock, EpochID, File) ->
{ok, <<"ERROR WEDGED", _/binary>>} ->
{error, wedged};
{ok, Else} ->
throw({server_protocol_error, Else});
{error, closed} ->
throw({error, closed});
Else ->
throw(Else)
throw({server_protocol_error, Else})
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
@ -724,6 +734,9 @@ delete_migration2(Sock, EpochID, File) ->
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
@ -754,13 +767,16 @@ trunc_hack2(Sock, EpochID, File) ->
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
get_latest_epoch2(Sock, ProjType) ->
ProjCmd = {get_latest_epoch, ProjType},
get_latest_epochid2(Sock, ProjType) ->
ProjCmd = {get_latest_epochid, ProjType},
do_projection_common(Sock, ProjCmd).
read_latest_projection2(Sock, ProjType) ->
@ -804,14 +820,15 @@ do_projection_common(Sock, ProjCmd) ->
binary_to_term(ResBin);
Else ->
{error, Else}
end;
{error, _} = Bad ->
throw(Bad)
end
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{case_clause,_}=Noo ->
put(bad_sock, Sock),
{error, {badmatch, Noo, erlang:get_stacktrace()}};
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
@ -823,7 +840,7 @@ w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})->
try
case proplists:get_value(session_proto, Props, tcp) of
tcp ->
Sock = machi_util:connect(Host, Port),
Sock = machi_util:connect(Host, Port, ?HARD_TIMEOUT),
{w,tcp,Sock};
%% sctp ->
%% %% TODO: not implemented
@ -845,14 +862,11 @@ w_close({w,tcp,Sock}) ->
ok.
w_recv({w,tcp,Sock}, Amt) ->
gen_tcp:recv(Sock, Amt).
gen_tcp:recv(Sock, Amt, ?HARD_TIMEOUT).
w_send({w,tcp,Sock}, IoData) ->
gen_tcp:send(Sock, IoData).
w_getopts({w,tcp,Sock}, Opts) ->
inet:getopts(Sock, Opts).
w_setopts({w,tcp,Sock}, Opts) ->
inet:setopts(Sock, Opts).

View file

@ -43,7 +43,7 @@
%% API
-export([
start_link/3,
get_latest_epoch/2, get_latest_epoch/3,
get_latest_epochid/2, get_latest_epochid/3,
read_latest_projection/2, read_latest_projection/3,
read/3, read/4,
write/3, write/4,
@ -62,8 +62,8 @@
public_dir = "" :: string(),
private_dir = "" :: string(),
wedge_notify_pid :: pid() | atom(),
max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()},
max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}
max_public_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()},
max_private_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}
}).
%% @doc Start a new projection store server.
@ -80,15 +80,15 @@ start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
%% @doc Fetch the latest epoch number + checksum for type `ProjType'.
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
get_latest_epochid(PidSpec, ProjType) ->
get_latest_epochid(PidSpec, ProjType, infinity).
%% @doc Fetch the latest epoch number + checksum for type `ProjType'.
%% projection.
get_latest_epoch(PidSpec, ProjType, Timeout)
get_latest_epochid(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout).
g_call(PidSpec, {get_latest_epochid, ProjType}, Timeout).
%% @doc Fetch the latest projection record for type `ProjType'.
@ -168,25 +168,27 @@ init([DataDir, NotifyWedgeStateChanges]) ->
PrivateDir = machi_util:make_projection_filename(DataDir, "private"),
ok = filelib:ensure_dir(PublicDir ++ "/ignored"),
ok = filelib:ensure_dir(PrivateDir ++ "/ignored"),
MaxPublicEpoch = find_max_epoch(PublicDir),
MaxPrivateEpoch = find_max_epoch(PrivateDir),
MbEpoch = find_max_epochid(PublicDir),
%% MbEpoch = {Mb#projection_v1.epoch_number, Mb#projection_v1.epoch_csum},
MvEpoch = find_max_epochid(PrivateDir),
%% MvEpoch = {Mv#projection_v1.epoch_number, Mv#projection_v1.epoch_csum},
{ok, #state{public_dir=PublicDir,
private_dir=PrivateDir,
wedge_notify_pid=NotifyWedgeStateChanges,
max_public_epoch=MaxPublicEpoch,
max_private_epoch=MaxPrivateEpoch}}.
max_public_epochid=MbEpoch,
max_private_epochid=MvEpoch}}.
handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
handle_call({{get_latest_epochid, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
EpochId = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
EpochId = if ProjType == public -> S#state.max_public_epochid;
ProjType == private -> S#state.max_private_epochid
end,
{reply, {{ok, EpochId}, LC2}, S};
handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
{EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epochid;
ProjType == private -> S#state.max_private_epochid
end,
{Reply, NewS} = do_proj_read(ProjType, EpochNum, S),
{reply, {Reply, LC2}, NewS};
@ -268,7 +270,7 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
EffectiveEpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum},
%%
NewS = if ProjType == public,
Epoch > element(1, S#state.max_public_epoch) ->
Epoch > element(1, S#state.max_public_epochid) ->
if Epoch == EffectiveEpoch ->
%% This is a regular projection, i.e.,
%% does not have an inner proj.
@ -281,13 +283,13 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
%% not bother wedging.
ok
end,
S#state{max_public_epoch=EpochId};
S#state{max_public_epochid=EpochId};
ProjType == private,
Epoch > element(1, S#state.max_private_epoch) ->
Epoch > element(1, S#state.max_private_epochid) ->
update_wedge_state(
S#state.wedge_notify_pid, false,
EffectiveEpochId),
S#state{max_private_epoch=EpochId};
S#state{max_private_epochid=EpochId};
true ->
S
end,
@ -344,14 +346,14 @@ find_all(Dir) ->
Fs = filelib:wildcard("*", Dir),
lists:sort([name2epoch(F) || F <- Fs]).
find_max_epoch(Dir) ->
find_max_epochid(Dir) ->
Fs = lists:sort(filelib:wildcard("*", Dir)),
if Fs == [] ->
?NO_EPOCH;
true ->
EpochNum = name2epoch(lists:last(Fs)),
{{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir),
{EpochNum, Proj}
{EpochNum, Proj#projection_v1.epoch_csum}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -59,7 +59,7 @@
%% %% Projection API
get_epoch_id/1, get_epoch_id/2,
get_latest_epoch/2, get_latest_epoch/3,
get_latest_epochid/2, get_latest_epochid/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
@ -176,13 +176,13 @@ get_epoch_id(PidSpec, Timeout) ->
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
get_latest_epochid(PidSpec, ProjType) ->
get_latest_epochid(PidSpec, ProjType, infinity).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}},
get_latest_epochid(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_latest_epochid, ProjType}},
Timeout).
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
@ -290,6 +290,9 @@ code_change(_OldVsn, S, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_req(Req, S) ->
do_req(Req, 1, S).
do_req(Req, Depth, S) ->
S2 = try_connect(S),
Fun = make_req_fun(Req, S2),
case connected_p(S2) of
@ -299,24 +302,26 @@ do_req(Req, S) ->
{ok, S2};
T when element(1, T) == ok ->
{T, S2};
{error, {badmatch, {badmatch, {error, Why}}, _Stk}}
when Why == closed; Why == timeout ->
%% TODO: Infinite recursion isn't
%% good. Exponential backoff might be good.
timer:sleep(500),
do_req(Req, disconnect(S2));
Else ->
%% {error, {badmatch, {badmatch, {error, Why}=TheErr}, _Stk}}
%% when Why == closed; Why == timeout ->
%% do_req_retry(Req, Depth, TheErr, S2);
TheErr ->
case get(bad_sock) of
Bad when Bad == S2#state.sock ->
{Else, disconnect(S2)};
do_req_retry(Req, Depth, TheErr, S2);
_ ->
{Else, S2}
{TheErr, S2}
end
end;
false ->
{{error, partition}, S2}
end.
do_req_retry(_Req, 2, Err, S) ->
{Err, disconnect(S)};
do_req_retry(Req, Depth, _Err, S) ->
do_req(Req, Depth + 1, try_connect(disconnect(S))).
make_req_fun({append_chunk, EpochID, Prefix, Chunk},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:append_chunk(Sock, EpochID, Prefix, Chunk) end;
@ -350,9 +355,9 @@ make_req_fun({get_epoch_id},
Error
end
end;
make_req_fun({get_latest_epoch, ProjType},
make_req_fun({get_latest_epochid, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:get_latest_epoch(Sock, ProjType) end;
fun() -> Mod:get_latest_epochid(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_latest_projection(Sock, ProjType) end;

View file

@ -378,10 +378,10 @@ todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) ->
end.
private_projections_are_stable(Namez, PollFunc) ->
Private1 = [?FLU_PC:get_latest_epoch(FLU, private) ||
Private1 = [?FLU_PC:get_latest_epochid(FLU, private) ||
{_Name, FLU} <- Namez],
PollFunc(5, 1, 10),
Private2 = [?FLU_PC:get_latest_epoch(FLU, private) ||
Private2 = [?FLU_PC:get_latest_epochid(FLU, private) ||
{_Name, FLU} <- Namez],
true = (Private1 == Private2).

View file

@ -149,7 +149,7 @@ flu_projection_smoke_test() ->
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
[begin
{ok, {0,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, {0,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T),
@ -160,7 +160,7 @@ flu_projection_smoke_test() ->
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
{ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, {1,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T),

View file

@ -74,7 +74,7 @@ api_smoke_test() ->
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
{ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, {false, _}} = ?MUT:wedge_status(Prox1),
{ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public),
{ok, FakeEpoch} = ?MUT:get_latest_epochid(Prox1, public),
{error, not_written} = ?MUT:read_latest_projection(Prox1, public),
{error, not_written} = ?MUT:read_projection(Prox1, public, 44),
P_a = #p_srvr{name=a, address="localhost", port=6622},
@ -83,6 +83,7 @@ api_smoke_test() ->
{ok, P1} = ?MUT:read_projection(Prox1, public, 1),
{ok, [P1]} = ?MUT:get_all_projections(Prox1, public),
{ok, [1]} = ?MUT:list_all_projections(Prox1, public),
ok
after
_ = (catch ?MUT:quit(Prox1))
@ -91,5 +92,183 @@ api_smoke_test() ->
(catch machi_flu1:stop(FLU1)),
(catch machi_flu1:stop(get(flu_pid)))
end.
flu_restart_test() ->
RegName = api_smoke_flu,
Host = "localhost",
TcpPort = 57125,
DataDir = "./data.api_smoke_flu2",
W_props = [{initial_wedged, false}],
erase(flu_pid),
put(flu_pid, []),
FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
W_props),
put(flu_pid, [FLU1|get(flu_pid)]),
try
I = #p_srvr{name=RegName, address=Host, port=TcpPort},
{ok, Prox1} = ?MUT:start_link(I),
try
FakeEpoch = ?DUMMY_PV1_EPOCH,
Data = <<"data!">>,
{ok, {Off1,Size1,File1}} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, Data,
infinity),
P_a = #p_srvr{name=a, address="localhost", port=6622},
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
EpochID = {P1#projection_v1.epoch_number,
P1#projection_v1.epoch_csum},
ok = ?MUT:write_projection(Prox1, public, P1),
ok = ?MUT:write_projection(Prox1, private, P1),
{ok, EpochID} = ?MUT:get_epoch_id(Prox1),
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public),
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private),
ok = machi_flu1:stop(FLU1), timer:sleep(50),
%% Now that the last proxy op was successful and only
%% after did we stop the FLU, let's check that both the
%% 1st & 2nd ops-via-proxy after FLU is restarted are
%% successful. And immediately after stopping the FLU,
%% both 1st & 2nd ops-via-proxy should always fail.
%%
%% Some of the expectations have unbound variables, which
%% makes the code a bit convoluted. (No LFE or
%% Elixir macros here, alas, they'd be useful.)
ExpectedOps =
[
fun(run) -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_epoch_id(Prox1) end,
fun(run) -> {ok, EpochID} =
?MUT:get_latest_epochid(Prox1, public),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_latest_epochid(Prox1, public)
end,
fun(run) -> {ok, EpochID} =
?MUT:get_latest_epochid(Prox1, private),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_latest_epochid(Prox1, private)
end,
fun(run) -> {ok, P1} =
?MUT:read_projection(Prox1, public, 1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_projection(Prox1, public, 1)
end,
fun(run) -> {ok, P1} =
?MUT:read_projection(Prox1, private, 1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_projection(Prox1, private, 1)
end,
fun(run) -> {error, not_written} =
?MUT:read_projection(Prox1, private, 7),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_projection(Prox1, private, 7)
end,
fun(run) -> {error, written} =
?MUT:write_projection(Prox1, public, P1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, public, P1)
end,
fun(run) -> {error, written} =
?MUT:write_projection(Prox1, private, P1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_projection(Prox1, private, P1)
end,
fun(run) -> {ok, [_]} =
?MUT:get_all_projections(Prox1, public),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_all_projections(Prox1, public)
end,
fun(run) -> {ok, [_]} =
?MUT:get_all_projections(Prox1, private),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:get_all_projections(Prox1, private)
end,
fun(run) -> {ok, {_,_,_}} =
?MUT:append_chunk(Prox1, FakeEpoch,
<<"prefix">>, Data, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:append_chunk(Prox1, FakeEpoch,
<<"prefix">>, Data, infinity)
end,
fun(run) -> {ok, {_,_,_}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity)
end,
fun(run) -> {ok, Data} =
?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1)
end,
fun(run) -> {ok, _} =
?MUT:checksum_list(Prox1, FakeEpoch, File1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:checksum_list(Prox1, FakeEpoch, File1)
end,
fun(run) -> {ok, _} =
?MUT:list_files(Prox1, FakeEpoch),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:list_files(Prox1, FakeEpoch)
end,
fun(run) -> {ok, _} =
?MUT:wedge_status(Prox1),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:wedge_status(Prox1)
end,
%% NOTE: When write-once enforcement is enabled, this test
%% will fail: change ok -> {error, written}
fun(run) -> %% {error, written} =
ok =
?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Data, infinity),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1,
Data, infinity)
end
],
[begin
FLU2 = machi_flu1_test:setup_test_flu(
RegName, TcpPort, DataDir,
[save_data_dir|W_props]),
put(flu_pid, [FLU2|get(flu_pid)]),
_ = Fun(line),
ok = Fun(run),
ok = Fun(run),
ok = machi_flu1:stop(FLU2),
{error, partition} = Fun(stop),
{error, partition} = Fun(stop),
ok
end || Fun <- ExpectedOps ],
ok
after
_ = (catch ?MUT:quit(Prox1))
end
after
[catch machi_flu1:stop(Pid) || Pid <- get(flu_pid)]
end.
-endif. % TEST