WIP: additional tests for wedge state
This commit is contained in:
parent
376c4a9ae1
commit
316126fa59
3 changed files with 50 additions and 20 deletions
|
@ -77,7 +77,7 @@
|
|||
tcp_port :: non_neg_integer(),
|
||||
data_dir :: string(),
|
||||
wedged = true :: boolean(),
|
||||
epoch_id :: 'undefined' | machi_projection:pv_epoch(),
|
||||
epoch_id :: 'undefined' | pv1_epoch(),
|
||||
dbg_props = [] :: list(), % proplist
|
||||
props = [] :: list() % proplist
|
||||
}).
|
||||
|
@ -174,9 +174,12 @@ listen_server_loop(LSock, S) ->
|
|||
spawn_link(fun() -> net_server_loop(Sock, S) end),
|
||||
listen_server_loop(LSock, S).
|
||||
|
||||
append_server_loop(FluPid, #state{data_dir=DataDir}=S) ->
|
||||
append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
|
||||
AppendServerPid = self(),
|
||||
receive
|
||||
{seq_append, From, _Prefix, _Chunk, _CSum} when Wedged_p ->
|
||||
From ! wedged,
|
||||
append_server_loop(FluPid, S);
|
||||
{seq_append, From, Prefix, Chunk, CSum} ->
|
||||
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
|
||||
DataDir, AppendServerPid) end),
|
||||
|
@ -298,7 +301,9 @@ do_net_server_append2(FluName, Sock, LenHex, Prefix) ->
|
|||
{assignment, Offset, File} ->
|
||||
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
|
||||
Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
|
||||
ok = gen_tcp:send(Sock, Out)
|
||||
ok = gen_tcp:send(Sock, Out);
|
||||
wedged ->
|
||||
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>)
|
||||
after 10*1000 ->
|
||||
ok = gen_tcp:send(Sock, "TIMEOUT\n")
|
||||
end.
|
||||
|
|
|
@ -56,6 +56,7 @@
|
|||
-type chunk_s() :: binary(). % server always uses binary()
|
||||
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
|
||||
-type chunk_size() :: non_neg_integer().
|
||||
-type error_general() :: 'bad_arg' | 'wedged'.
|
||||
-type epoch_csum() :: binary().
|
||||
-type epoch_num() :: -1 | non_neg_integer().
|
||||
-type epoch_id() :: {epoch_num(), epoch_csum()}.
|
||||
|
@ -76,7 +77,7 @@
|
|||
%% with `Prefix'.
|
||||
|
||||
-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
|
||||
{ok, chunk_pos()} | {error, term()}.
|
||||
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
|
||||
append_chunk(Sock, EpochID, Prefix, Chunk) ->
|
||||
append_chunk2(Sock, EpochID, Prefix, Chunk).
|
||||
|
||||
|
@ -85,7 +86,7 @@ append_chunk(Sock, EpochID, Prefix, Chunk) ->
|
|||
|
||||
-spec append_chunk(inet_host(), inet_port(),
|
||||
epoch_id(), file_prefix(), chunk()) ->
|
||||
{ok, chunk_pos()} | {error, term()}.
|
||||
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
|
||||
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -97,7 +98,9 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
|
|||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
-spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
|
||||
{ok, chunk_s()} | {error, term()}.
|
||||
{ok, chunk_s()} |
|
||||
{error, error_general() | 'no_such_file' | 'partial_read'} |
|
||||
{error, term()}.
|
||||
read_chunk(Sock, EpochID, File, Offset, Size)
|
||||
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
|
||||
read_chunk2(Sock, EpochID, File, Offset, Size).
|
||||
|
@ -106,7 +109,9 @@ read_chunk(Sock, EpochID, File, Offset, Size)
|
|||
|
||||
-spec read_chunk(inet_host(), inet_port(), epoch_id(),
|
||||
file_name(), file_offset(), chunk_size()) ->
|
||||
{ok, chunk_s()} | {error, term()}.
|
||||
{ok, chunk_s()} |
|
||||
{error, error_general() | 'no_such_file' | 'partial_read'} |
|
||||
{error, term()}.
|
||||
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
|
||||
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
|
@ -119,14 +124,17 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
|
|||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
||||
-spec checksum_list(port(), epoch_id(), file_name()) ->
|
||||
{ok, [chunk_csum()]} | {error, term()}.
|
||||
{ok, [chunk_csum()]} |
|
||||
{error, error_general() | 'no_such_file' | 'partial_read'} |
|
||||
{error, term()}.
|
||||
checksum_list(Sock, EpochID, File) when is_port(Sock) ->
|
||||
checksum_list2(Sock, EpochID, File).
|
||||
|
||||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
||||
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
|
||||
{ok, [chunk_csum()]} | {error, term()}.
|
||||
{ok, [chunk_csum()]} |
|
||||
{error, error_general() | 'no_such_file'} | {error, term()}.
|
||||
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -157,7 +165,7 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
|
|||
%% @doc Fetch the wedge status from the remote FLU.
|
||||
|
||||
-spec wedge_status(port()) ->
|
||||
{ok, {boolean(), machi_projection:pv1_epoch()}} | {error, term()}.
|
||||
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
|
||||
|
||||
wedge_status(Sock) when is_port(Sock) ->
|
||||
wedge_status2(Sock).
|
||||
|
@ -165,7 +173,7 @@ wedge_status(Sock) when is_port(Sock) ->
|
|||
%% @doc Fetch the wedge status from the remote FLU.
|
||||
|
||||
-spec wedge_status(inet_host(), inet_port()) ->
|
||||
{ok, {boolean(), machi_projection:pv1_epoch()}} | {error, term()}.
|
||||
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
|
||||
wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -323,7 +331,7 @@ quit(Sock) when is_port(Sock) ->
|
|||
%% `File' at `Offset'.
|
||||
|
||||
-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) ->
|
||||
ok | {error, term()}.
|
||||
ok | {error, error_general()} | {error, term()}.
|
||||
write_chunk(Sock, EpochID, File, Offset, Chunk)
|
||||
when Offset >= ?MINIMUM_OFFSET ->
|
||||
write_chunk2(Sock, EpochID, File, Offset, Chunk).
|
||||
|
@ -333,7 +341,7 @@ write_chunk(Sock, EpochID, File, Offset, Chunk)
|
|||
|
||||
-spec write_chunk(inet_host(), inet_port(),
|
||||
epoch_id(), file_name(), file_offset(), chunk()) ->
|
||||
ok | {error, term()}.
|
||||
ok | {error, error_general()} | {error, term()}.
|
||||
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
|
||||
when Offset >= ?MINIMUM_OFFSET ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
|
@ -347,7 +355,7 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
|
|||
%% migrated.
|
||||
|
||||
-spec delete_migration(port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
|
||||
delete_migration(Sock, EpochID, File) when is_port(Sock) ->
|
||||
delete_migration2(Sock, EpochID, File).
|
||||
|
||||
|
@ -355,7 +363,7 @@ delete_migration(Sock, EpochID, File) when is_port(Sock) ->
|
|||
%% migrated.
|
||||
|
||||
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
|
||||
delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -368,7 +376,7 @@ delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
|||
%% erasure coded.
|
||||
|
||||
-spec trunc_hack(port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
|
||||
trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
|
||||
trunc_hack2(Sock, EpochID, File).
|
||||
|
||||
|
@ -376,7 +384,7 @@ trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
|
|||
%% erasure coded.
|
||||
|
||||
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
|
||||
trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -410,6 +418,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
|
|||
{ok, {Offset, Len, Path}};
|
||||
<<"ERROR BAD-ARG", _/binary>> ->
|
||||
{error, bad_arg};
|
||||
<<"ERROR WEDGED", _/binary>> ->
|
||||
{error, wedged};
|
||||
<<"ERROR ", Rest/binary>> ->
|
||||
{error, Rest}
|
||||
end
|
||||
|
@ -454,6 +464,8 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
|
|||
{error, bad_arg};
|
||||
<<"OR PARTIAL-READ\n">> ->
|
||||
{error, partial_read};
|
||||
<<"OR WEDGED", _/binary>> ->
|
||||
{error, wedged};
|
||||
_ ->
|
||||
{error, Else2}
|
||||
end;
|
||||
|
@ -538,6 +550,8 @@ checksum_list2(Sock, EpochID, File) ->
|
|||
{error, no_such_file};
|
||||
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
|
||||
{error, bad_arg};
|
||||
{ok, <<"ERROR WEDGED", _/binary>>} ->
|
||||
{error, wedged};
|
||||
{ok, Else} ->
|
||||
throw({server_protocol_error, Else})
|
||||
end
|
||||
|
@ -599,6 +613,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
|
|||
ok;
|
||||
<<"ERROR BAD-ARG", _/binary>> ->
|
||||
{error, bad_arg};
|
||||
<<"ERROR WEDGED", _/binary>> ->
|
||||
{error, wedged};
|
||||
<<"ERROR ", _/binary>>=Else ->
|
||||
{error, {server_said, Else}}
|
||||
end
|
||||
|
@ -626,6 +642,8 @@ delete_migration2(Sock, EpochID, File) ->
|
|||
{error, no_such_file};
|
||||
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
|
||||
{error, bad_arg};
|
||||
{ok, <<"ERROR WEDGED", _/binary>>} ->
|
||||
{error, wedged};
|
||||
{ok, Else} ->
|
||||
throw({server_protocol_error, Else})
|
||||
end
|
||||
|
@ -653,6 +671,8 @@ trunc_hack2(Sock, EpochID, File) ->
|
|||
{error, no_such_file};
|
||||
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
|
||||
{error, bad_arg};
|
||||
{ok, <<"ERROR WEDGED", _/binary>>} ->
|
||||
{error, wedged};
|
||||
{ok, Else} ->
|
||||
throw({server_protocol_error, Else})
|
||||
end
|
||||
|
|
|
@ -90,11 +90,12 @@ partial_stop_restart2() ->
|
|||
try
|
||||
[Start(P) || P <- Ps],
|
||||
[{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged
|
||||
[bummer = Append(P) || P <- Ps], % all are wedged
|
||||
[{error,wedged} = Append(P) || P <- Ps], % all are wedged
|
||||
|
||||
[machi_chain_manager1:set_chain_members(ChMgr, Dict) ||
|
||||
ChMgr <- ChMgrs ],
|
||||
[{ok, {false, _}} = WedgeStatus(P) || P <- Ps], % *not* wedged
|
||||
[{ok,_} = Append(P) || P <- Ps], % *not* wedged
|
||||
|
||||
{_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
|
||||
[begin
|
||||
|
@ -118,6 +119,7 @@ partial_stop_restart2() ->
|
|||
Proj_mCSum = Proj_m#projection_v1.epoch_csum,
|
||||
[{ok, {false, {Epoch_m, Proj_mCSum}}} = WedgeStatus(P) || % *not* wedged
|
||||
P <- Ps],
|
||||
[{ok,_} = Append(P) || P <- Ps], % *not* wedged
|
||||
|
||||
%% Stop all but 'a'.
|
||||
[ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)],
|
||||
|
@ -129,11 +131,14 @@ partial_stop_restart2() ->
|
|||
%% Remember: 'a' is not in active mode.
|
||||
{ok, Proj_m} = machi_projection_store:read_latest_projection(
|
||||
hd(PStores), private),
|
||||
%% TODO: confirm that 'a' is wedged
|
||||
%% Confirm that 'a' is wedged
|
||||
{error, wedged} = Append(hd(Ps)),
|
||||
%% Iterate through humming consensus once
|
||||
{now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env(
|
||||
hd(ChMgrs)),
|
||||
true = (Epoch_n > Epoch_m),
|
||||
%% TODO: confirm that 'b' is wedged
|
||||
%% Confirm that 'a' is *not* wedged
|
||||
{ok, _} = Append(hd(Ps)),
|
||||
|
||||
ok
|
||||
after
|
||||
|
|
Loading…
Reference in a new issue