Merge branch 'slf/chain-repair'

This commit is contained in:
Scott Lystig Fritchie 2015-05-08 21:41:25 +09:00
commit 5fe4c7406c
13 changed files with 388 additions and 108 deletions

View file

@ -30,9 +30,9 @@ func, and pattern match Erlang style in that func.
** DONE Move prototype/chain-manager code to "top" of source tree
*** DONE Preserve current test code (leave as-is? tiny changes?)
*** DONE Make chain manager code flexible enough to run "real world" or "sim"
** TODO Implement real data repair, orchestrated by the chain manager
** DONE Add projection wedging logic to each FLU.
** Started.... Implement real data repair, orchestrated by the chain manager
** TODO Change all protocol ops to enforce the epoch ID
** TODO Add projection wedging logic to each FLU.
- Add no-wedging state to make testing easier?

View file

@ -70,7 +70,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([make_chmgr_regname/1, projection_transitions_are_sane/2]).
-export([make_chmgr_regname/1, projection_transitions_are_sane/2,
inner_projection_exists/1, inner_projection_or_self/1]).
-ifdef(TEST).
@ -171,7 +172,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)],
Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end,
RunEnv = [{seed, Opt(seed, now())},
{use_partition_simulator, Opt(use_partition_simulator, true)},
{use_partition_simulator, Opt(use_partition_simulator, false)},
{network_partitions, Opt(network_partitions, [])},
{network_islands, Opt(network_islands, [])},
{flapping_i, Opt(flapping, [])},
@ -523,14 +524,19 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
Down = AllMembers -- Up,
NewUPI_list = [X || X <- OldUPI_list, lists:member(X, Up)],
LastInNewUPI = case NewUPI_list of
[] -> does_not_exist_because_upi_is_empty;
[_|_] -> lists:last(NewUPI_list)
end,
Repairing_list2 = [X || X <- OldRepairing_list, lists:member(X, Up)],
Simulator_p = proplists:get_value(use_partition_simulator, RunEnv2, false),
{NewUPI_list3, Repairing_list3, RunEnv3} =
case {NewUp, Repairing_list2} of
{[], []} ->
D_foo=[],
{NewUPI_list, [], RunEnv2};
{[], [H|T]} when RelativeToServer == hd(NewUPI_list) ->
%% The author is head of the UPI list. Let's see if
{[], [H|T]} when RelativeToServer == LastInNewUPI ->
%% The author is tail of the UPI list. Let's see if
%% *everyone* in the UPI+repairing lists are using our
%% projection. This is to simulate a requirement that
%% a real repair process cannot take place until the chain is
@ -540,12 +546,12 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
SameEpoch_p = check_latest_private_projections_same_epoch(
tl(NewUPI_list) ++ Repairing_list2,
S#ch_mgr.proj, Partitions, S),
if not SameEpoch_p ->
D_foo=[],
{NewUPI_list, OldRepairing_list, RunEnv2};
true ->
if Simulator_p andalso SameEpoch_p ->
D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
{NewUPI_list ++ [H], T, RunEnv2}
{NewUPI_list ++ [H], T, RunEnv2};
true ->
D_foo=[],
{NewUPI_list, OldRepairing_list, RunEnv2}
end;
{_, _} ->
D_foo=[],

View file

@ -66,7 +66,8 @@
-include("machi.hrl").
-include("machi_projection.hrl").
-export([start_link/1, stop/1]).
-export([start_link/1, stop/1,
update_wedge_state/3]).
-export([make_listener_regname/1, make_projection_server_regname/1]).
-record(state, {
@ -75,8 +76,9 @@
append_pid :: pid(),
tcp_port :: non_neg_integer(),
data_dir :: string(),
wedge = true :: 'disabled' | boolean(),
my_epoch_id :: 'undefined',
wedged = true :: boolean(),
etstab :: ets:tid(),
epoch_id :: 'undefined' | pv1_epoch(),
dbg_props = [] :: list(), % proplist
props = [] :: list() % proplist
}).
@ -94,14 +96,36 @@ stop(Pid) ->
error
end.
%% @doc Asynchronously tell the FLU process `PidSpec' about a new wedge state.
%%
%% Sends `{wedge_state_change, Boolean, EpochId}' with the `!' operator and
%% returns that message term.  `Boolean' must be `true' or `false' and
%% `EpochId' must be a tuple; any other argument fails with `function_clause'.
update_wedge_state(PidSpec, Boolean, EpochId)
  when is_boolean(Boolean), is_tuple(EpochId) ->
    Msg = {wedge_state_change, Boolean, EpochId},
    PidSpec ! Msg.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Derive the name of a FLU's ETS table: the FLU's (atom) name with the
%% suffix `_epoch' appended.
ets_table_name(FluName) when is_atom(FluName) ->
    TableName = atom_to_list(FluName) ++ "_epoch",
    list_to_atom(TableName).
%% ets_table_name(FluName) when is_binary(FluName) ->
%% list_to_atom(binary_to_list(FluName) ++ "_epoch").
main2(FluName, TcpPort, DataDir, Rest) ->
{Props, DbgProps} = case proplists:get_value(dbg, Rest) of
undefined ->
{Rest, []};
DPs ->
{lists:keydelete(dbg, 1, Rest), DPs}
end,
S0 = #state{flu_name=FluName,
tcp_port=TcpPort,
data_dir=DataDir,
props=Rest},
AppendPid = start_append_server(S0),
wedged=proplists:get_value(initial_wedged, DbgProps, true),
etstab=ets_table_name(FluName),
epoch_id=undefined,
dbg_props=DbgProps,
props=Props},
AppendPid = start_append_server(S0, self()),
receive
append_server_ack -> ok
end,
{_ProjRegName, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Rest) of
undefined ->
@ -114,15 +138,7 @@ main2(FluName, TcpPort, DataDir, Rest) ->
end,
S1 = S0#state{append_pid=AppendPid,
proj_store=ProjectionPid},
S2 = case proplists:get_value(dbg, Rest) of
undefined ->
S1;
DbgProps ->
S1#state{wedge=disabled,
dbg_props=DbgProps,
props=lists:keydelete(dbg, 1, Rest)}
end,
ListenPid = start_listen_server(S2),
ListenPid = start_listen_server(S1),
Config_e = machi_util:make_config_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Config_e),
@ -144,9 +160,9 @@ main2(FluName, TcpPort, DataDir, Rest) ->
start_listen_server(S) ->
proc_lib:spawn_link(fun() -> run_listen_server(S) end).
start_append_server(S) ->
start_append_server(S, AckPid) ->
FluPid = self(),
proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end).
proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end).
%% start_projection_server(S) ->
%% spawn_link(fun() -> run_projection_server(S) end).
@ -158,25 +174,43 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S).
run_append_server(FluPid, #state{flu_name=Name}=S) ->
run_append_server(FluPid, AckPid, #state{flu_name=Name,dbg_props=DbgProps}=S) ->
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
register(Name, self()),
append_server_loop(FluPid, S).
TID = ets:new(ets_table_name(Name),
[set, protected, named_table, {read_concurrency, true}]),
InitialWedged = proplists:get_value(initial_wedged, DbgProps, true),
ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}),
AckPid ! append_server_ack,
append_server_loop(FluPid, S#state{etstab=TID}).
listen_server_loop(LSock, S) ->
{ok, Sock} = gen_tcp:accept(LSock),
spawn_link(fun() -> net_server_loop(Sock, S) end),
listen_server_loop(LSock, S).
append_server_loop(FluPid, #state{data_dir=DataDir}=S) ->
append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
AppendServerPid = self(),
receive
{seq_append, From, _Prefix, _Chunk, _CSum} when Wedged_p ->
From ! wedged,
append_server_loop(FluPid, S);
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir, AppendServerPid) end),
%% DataDir, FluPid) end),
append_server_loop(FluPid, S);
{wedge_state_change, Boolean} ->
append_server_loop(FluPid, S#state{wedge=Boolean})
{wedge_state_change, Boolean, EpochId} ->
true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}),
append_server_loop(FluPid, S#state{wedged=Boolean,
epoch_id=EpochId});
{wedge_status, FromPid} ->
#state{wedged=Wedged_p, epoch_id=EpochId} = S,
FromPid ! {wedge_status_reply, Wedged_p, EpochId},
append_server_loop(FluPid, S);
Else ->
io:format(user, "append_server_loop: WHA? ~p\n", [Else]),
append_server_loop(FluPid, S)
end.
-define(EpochIDSpace, (4+20)).
@ -199,16 +233,17 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(FluName, Sock, LenHex, Prefix);
<<"R ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:FileLenLF/binary, "\n">> ->
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir,
EpochIDRaw, S);
<<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> ->
do_net_server_listing(Sock, DataDir);
do_net_server_listing(Sock, DataDir, S);
<<"C ",
_EpochIDRaw:(?EpochIDSpace)/binary,
File:CSumFileLenLF/binary, "\n">> ->
do_net_server_checksum_listing(Sock, File, DataDir);
do_net_server_checksum_listing(Sock, File, DataDir, S);
<<"QUIT\n">> ->
catch gen_tcp:close(Sock),
exit(normal);
@ -220,7 +255,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
_EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:WriteFileLenLF/binary, "\n">> ->
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
<<"fixme1">>, false, <<"fixme2">>);
%% For data migration only.
<<"DEL-migration ",
_EpochIDRaw:(?EpochIDSpace)/binary,
@ -233,6 +269,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
do_net_server_truncate_hackityhack(Sock, File, DataDir);
<<"PROJ ", LenHex:8/binary, "\n">> ->
do_projection_command(Sock, LenHex, S);
<<"WEDGE-STATUS\n">> ->
do_wedge_status(FluName, Sock);
_ ->
machi_util:verb("Else Got: ~p\n", [Line]),
gen_tcp:send(Sock, "ERROR SYNTAX\n"),
@ -281,12 +319,37 @@ do_net_server_append2(FluName, Sock, LenHex, Prefix) ->
{assignment, Offset, File} ->
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
ok = gen_tcp:send(Sock, Out)
ok = gen_tcp:send(Sock, Out);
wedged ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>)
after 10*1000 ->
ok = gen_tcp:send(Sock, "TIMEOUT\n")
end.
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
%% Handle the "WEDGE-STATUS" network command.
%%
%% Sends `{wedge_status, self()}' to `FluName' and waits for a
%% `{wedge_status_reply, Bool, EpochId}' message.  The reply written to
%% `Sock' is "OK ", a 2-character flag ("00" = not wedged, "01" = wedged),
%% a space, the hex-encoded epoch number, a space, the hex-encoded epoch
%% checksum, and a newline.  An `undefined' epoch id is reported as epoch 0
%% with an all-zero 20-byte checksum.
%%
%% If no reply message arrives within 30 seconds, "give_it_up\n" is sent
%% to the socket instead.
do_wedge_status(FluName, Sock) ->
    FluName ! {wedge_status, self()},
    Reply =
        receive
            {wedge_status_reply, Bool, EpochId} ->
                BoolHex = if Bool == false -> <<"00">>;
                             Bool == true  -> <<"01">>
                          end,
                {EpochHex, CSumHex} =
                    case EpochId of
                        undefined ->
                            {machi_util:int_to_hexstr(0, 32),
                             machi_util:bin_to_hexstr(<<0:(20*8)/big>>)};
                        {Epoch, EpochCSum} ->
                            {machi_util:int_to_hexstr(Epoch, 32),
                             machi_util:bin_to_hexstr(EpochCSum)}
                    end,
                %% $\s is 32 (space) and $\n is 10 (newline).
                [<<"OK ">>, BoolHex, $\s, EpochHex, $\s, CSumHex, $\n]
        after 30*1000 ->
                <<"give_it_up\n">>
        end,
    ok = gen_tcp:send(Sock, Reply).
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, S) ->
{Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
DoItFun = fun(FH, Offset, Len) ->
case file:pread(FH, Offset, Len) of
{ok, Bytes} when byte_size(Bytes) == Len ->
@ -304,20 +367,26 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[read, binary, raw], DoItFun).
[read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
case sanitize_file_string(FileBin) of
ok ->
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
case {Wedged_p, sanitize_file_string(FileBin)} of
{false, ok} ->
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
DataDir, FileOpts, DoItFun);
_ ->
DataDir, FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
{true, _} ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>);
{_, __} ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
<<Offset:64/big>> = machi_util:hexstr_to_bin(OffsetHex),
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
@ -332,24 +401,29 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
{error, enoent} when OptsHasWrite ->
do_net_server_readwrite_common(
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun);
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
end.
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
{ok, FHc} ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc);
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId);
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir)
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId)
end.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
@ -368,7 +442,8 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[write, read, binary, raw], DoItFun).
[write, read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
perhaps_do_net_server_ec_read(Sock, FH) ->
case file:pread(FH, 0, ?MINIMUM_OFFSET) of
@ -394,7 +469,15 @@ decode_and_reply_net_server_ec_read_version_a(Sock, Rest) ->
<<Body:BodyLen/binary, _/binary>> = Rest2,
ok = gen_tcp:send(Sock, ["ERASURE ", BodyLenHex, " ", Hdr, Body]).
do_net_server_listing(Sock, DataDir) ->
%% Handle a file-listing request.
%%
%% The wedge flag is read from the `epoch' entry of the ETS table
%% `S#state.etstab'.  While the flag is `true' the request is refused with
%% "ERROR WEDGED\n"; otherwise the work is delegated to
%% do_net_server_listing2/2.
do_net_server_listing(Sock, DataDir, S) ->
    {Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
    case Wedged_p of
        true ->
            ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>);
        _ ->
            do_net_server_listing2(Sock, DataDir)
    end.
do_net_server_listing2(Sock, DataDir) ->
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
Files = filelib:wildcard("*", WildPath),
Out = ["OK\n",
@ -409,9 +492,12 @@ do_net_server_listing(Sock, DataDir) ->
],
ok = gen_tcp:send(Sock, Out).
do_net_server_checksum_listing(Sock, File, DataDir) ->
case sanitize_file_string(File) of
ok ->
do_net_server_checksum_listing(Sock, File, DataDir, S) ->
{Wedged_p, _CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
case {Wedged_p, sanitize_file_string(File)} of
{true, _} ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>);
{false, ok} ->
do_net_server_checksum_listing2(Sock, File, DataDir);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)

View file

@ -31,6 +31,7 @@
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,
%% Projection API
get_latest_epoch/2, get_latest_epoch/3,
@ -55,6 +56,7 @@
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type error_general() :: 'bad_arg' | 'wedged'.
-type epoch_csum() :: binary().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
@ -75,7 +77,7 @@
%% with `Prefix'.
-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, term()}.
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk2(Sock, EpochID, Prefix, Chunk).
@ -84,7 +86,7 @@ append_chunk(Sock, EpochID, Prefix, Chunk) ->
-spec append_chunk(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, term()}.
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
Sock = machi_util:connect(Host, TcpPort),
try
@ -96,7 +98,9 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} | {error, term()}.
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
read_chunk(Sock, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
read_chunk2(Sock, EpochID, File, Offset, Size).
@ -105,7 +109,9 @@ read_chunk(Sock, EpochID, File, Offset, Size)
-spec read_chunk(inet_host(), inet_port(), epoch_id(),
file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} | {error, term()}.
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = machi_util:connect(Host, TcpPort),
@ -118,14 +124,17 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} | {error, term()}.
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
checksum_list(Sock, EpochID, File) when is_port(Sock) ->
checksum_list2(Sock, EpochID, File).
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} | {error, term()}.
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file'} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
@ -153,6 +162,26 @@ list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
catch gen_tcp:close(Sock)
end.
%% @doc Fetch the wedge status from the remote FLU, using the
%% already-connected socket `Sock'.

-spec wedge_status(port()) ->
      {ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(Sock) when is_port(Sock) ->
    wedge_status2(Sock).

%% @doc Fetch the wedge status from the remote FLU at `Host' and `TcpPort'.
%% A connection is opened for this single request and closed afterwards,
%% whether or not the request succeeds.

-spec wedge_status(inet_host(), inet_port()) ->
      {ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
    Conn = machi_util:connect(Host, TcpPort),
    try
        wedge_status2(Conn)
    after
        catch gen_tcp:close(Conn)
    end.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port(), projection_type()) ->
@ -302,7 +331,7 @@ quit(Sock) when is_port(Sock) ->
%% `File' at `Offset'.
-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, term()}.
ok | {error, error_general()} | {error, term()}.
write_chunk(Sock, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
write_chunk2(Sock, EpochID, File, Offset, Chunk).
@ -312,7 +341,7 @@ write_chunk(Sock, EpochID, File, Offset, Chunk)
-spec write_chunk(inet_host(), inet_port(),
epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, term()}.
ok | {error, error_general()} | {error, term()}.
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
Sock = machi_util:connect(Host, TcpPort),
@ -326,7 +355,7 @@ write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
%% migrated.
-spec delete_migration(port(), epoch_id(), file_name()) ->
ok | {error, term()}.
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Sock, EpochID, File) when is_port(Sock) ->
delete_migration2(Sock, EpochID, File).
@ -334,7 +363,7 @@ delete_migration(Sock, EpochID, File) when is_port(Sock) ->
%% migrated.
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, term()}.
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
@ -347,7 +376,7 @@ delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%% erasure coded.
-spec trunc_hack(port(), epoch_id(), file_name()) ->
ok | {error, term()}.
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
trunc_hack2(Sock, EpochID, File).
@ -355,7 +384,7 @@ trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
%% erasure coded.
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, term()}.
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
@ -389,6 +418,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
{ok, {Offset, Len, Path}};
<<"ERROR BAD-ARG", _/binary>> ->
{error, bad_arg};
<<"ERROR WEDGED", _/binary>> ->
{error, wedged};
<<"ERROR ", Rest/binary>> ->
{error, Rest}
end
@ -433,6 +464,8 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
{error, bad_arg};
<<"OR PARTIAL-READ\n">> ->
{error, partial_read};
<<"OR WEDGED", _/binary>> ->
{error, wedged};
_ ->
{error, Else2}
end;
@ -455,10 +488,16 @@ list2(Sock, EpochID) ->
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0),
Res = list3(gen_tcp:recv(Sock, 0), Sock),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Res}
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK\n">>} ->
Res = list3(gen_tcp:recv(Sock, 0), Sock),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Res};
{ok, <<"ERROR WEDGED\n">>} ->
{error, wedged};
{ok, <<"ERROR ", Rest/binary>>} ->
{error, Rest}
end
catch
throw:Error ->
Error;
@ -476,6 +515,28 @@ list3({ok, Line}, Sock) ->
list3(Else, _Sock) ->
throw({server_protocol_error, Else}).
%% Send "WEDGE-STATUS\n" over `Sock' and decode the one-line reply
%% "OK <flag> <epoch> <csum>\n", where <flag> is "00" (not wedged) or "01"
%% (wedged), <epoch> is 8 hex characters and <csum> is 40 hex characters.
%%
%% Returns `{ok, {Wedged, {EpochNumber, EpochCSum}}}'.  A failed `=' match
%% on any step (send, setopts, or an unexpected reply line) is caught and
%% returned as `{error, {badmatch, _}}'; a thrown term is returned as-is.
wedge_status2(Sock) ->
    try
        ok = gen_tcp:send(Sock, [<<"WEDGE-STATUS\n">>]),
        %% Read the reply as a single line, then switch back to raw mode.
        ok = inet:setopts(Sock, [{packet, line}]),
        {ok, <<"OK ",
               BooleanHex:2/binary, " ",
               EpochHex:8/binary, " ",
               CSumHex:40/binary, "\n">>} = gen_tcp:recv(Sock, 0),
        ok = inet:setopts(Sock, [{packet, raw}]),
        Boolean = if BooleanHex == <<"00">> -> false;
                     BooleanHex == <<"01">> -> true
                  end,
        EpochNum = machi_util:hexstr_to_int(EpochHex),
        EpochCSum = machi_util:hexstr_to_bin(CSumHex),
        {ok, {Boolean, {EpochNum, EpochCSum}}}
    catch
        throw:Error ->
            Error;
        error:{badmatch,_}=BadMatch ->
            {error, {badmatch, BadMatch}}
    end.
checksum_list2(Sock, EpochID, File) ->
erase(bad_sock),
try
@ -495,6 +556,8 @@ checksum_list2(Sock, EpochID, File) ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, <<"ERROR WEDGED", _/binary>>} ->
{error, wedged};
{ok, Else} ->
throw({server_protocol_error, Else})
end
@ -556,6 +619,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
ok;
<<"ERROR BAD-ARG", _/binary>> ->
{error, bad_arg};
<<"ERROR WEDGED", _/binary>> ->
{error, wedged};
<<"ERROR ", _/binary>>=Else ->
{error, {server_said, Else}}
end
@ -583,6 +648,8 @@ delete_migration2(Sock, EpochID, File) ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, <<"ERROR WEDGED", _/binary>>} ->
{error, wedged};
{ok, Else} ->
throw({server_protocol_error, Else})
end
@ -610,6 +677,8 @@ trunc_hack2(Sock, EpochID, File) ->
{error, no_such_file};
{ok, <<"ERROR BAD-ARG", _/binary>>} ->
{error, bad_arg};
{ok, <<"ERROR WEDGED", _/binary>>} ->
{error, wedged};
{ok, Else} ->
throw({server_protocol_error, Else})
end

View file

@ -66,7 +66,7 @@ init([FluName, TcpPort, DataDir, Props0]) ->
{use_partition_simulator,false}|Props0],
ProjSpec = {ProjRegName,
{machi_projection_store, start_link,
[ProjRegName, DataDir, zarfus_todo]},
[ProjRegName, DataDir, FluName]},
permanent, 5000, worker, []},
MgrSpec = {make_mgr_supname(FluName),
{machi_chain_manager1, start_link,

View file

@ -60,7 +60,6 @@
-record(state, {
public_dir = "" :: string(),
private_dir = "" :: string(),
wedged = true :: boolean(),
wedge_notify_pid :: pid() | atom(),
max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()},
max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}
@ -170,17 +169,16 @@ init([DataDir, NotifyWedgeStateChanges]) ->
{ok, #state{public_dir=PublicDir,
private_dir=PrivateDir,
wedged=true,
wedge_notify_pid=NotifyWedgeStateChanges,
max_public_epoch=MaxPublicEpoch,
max_private_epoch=MaxPrivateEpoch}}.
handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
EpochT = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
EpochId = if ProjType == public -> S#state.max_public_epoch;
ProjType == private -> S#state.max_private_epoch
end,
{reply, {{ok, EpochT}, LC2}, S};
{reply, {{ok, EpochId}, LC2}, S};
handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch;
@ -258,15 +256,29 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
ok = file:write(FH, term_to_binary(Proj)),
ok = file:sync(FH),
ok = file:close(FH),
EpochT = {Epoch, Proj#projection_v1.epoch_csum},
EffectiveProj = machi_chain_manager1:inner_projection_or_self(Proj),
EffectiveEpoch = EffectiveProj#projection_v1.epoch_number,
EpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum},
%%
NewS = if ProjType == public,
Epoch > element(1, S#state.max_public_epoch) ->
%io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_public_epoch=EpochT, wedged=true};
if Epoch == EffectiveEpoch ->
%% This is a regular projection, i.e.,
%% does not have an inner proj.
update_wedge_state(
S#state.wedge_notify_pid, true, EpochId);
Epoch /= EffectiveEpoch ->
%% This projection has an inner proj.
%% The outer proj is flapping, so we do
%% not bother wedging.
ok
end,
S#state{max_public_epoch=EpochId};
ProjType == private,
Epoch > element(1, S#state.max_private_epoch) ->
%io:format(user, "TODO: tell ~p we are unwedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
S#state{max_private_epoch=EpochT, wedged=false};
update_wedge_state(
S#state.wedge_notify_pid, false, EpochId),
S#state{max_private_epoch=EpochId};
true ->
S
end,
@ -275,6 +287,39 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
{{error, Else}, S}
end.
%% Forward a wedge state change to the FLU process `FluSpec' via
%% machi_flu1:update_wedge_state/3.
%%
%% Epoch #0 is a special case: no projection has been written yet.  Given
%% the way that machi_flu_psup starts the processes, the FLU for FluSpec
%% is almost certainly not running yet, so the notification is attempted
%% best-effort and any failure is swallowed by `catch'.
%%
%% For every other epoch there is a race with the startup order used by
%% machi_flu_psup: projection store (this process), projection manager,
%% FLU.  FluSpec is the FLU and is almost certainly a registered name, so
%% wait (up to 10 seconds) for it to exist before sending the message.
%% Racing with supervisor startup/shutdown/restart is ok.
update_wedge_state(FluSpec, Wedged_p, {0,_}=EpochId) ->
    catch machi_flu1:update_wedge_state(FluSpec, Wedged_p, EpochId);
update_wedge_state(FluSpec, Wedged_p, EpochId) ->
    ok = wait_for_liveness(FluSpec, 10*1000),
    machi_flu1:update_wedge_state(FluSpec, Wedged_p, EpochId).
%% Block until `PidSpec' is usable as a message destination, or crash.
%%
%% A pid is accepted immediately, without any liveness check.  Any other
%% term is looked up with whereis/1 and polled (sleeping 1 millisecond
%% between attempts) until a process is registered under that name.
%%
%% `WaitTime' is the maximum time to wait, in milliseconds.  Returns `ok'.
%% If the name is still unregistered once `WaitTime' has elapsed, raises
%% `error({wait_for_liveness_timeout, PidSpec, WaitTime})'.
wait_for_liveness(Pid, _WaitTime) when is_pid(Pid) ->
    ok;
wait_for_liveness(PidSpec, WaitTime) ->
    wait_for_liveness(PidSpec, os:timestamp(), WaitTime).

%% Polling loop for wait_for_liveness/2.  `StartTime' is the os:timestamp()
%% value taken when the wait began.
wait_for_liveness(PidSpec, StartTime, WaitTime) ->
    case whereis(PidSpec) of
        undefined ->
            %% timer:now_diff/2 yields microseconds; compare in milliseconds.
            ElapsedMSec = timer:now_diff(os:timestamp(), StartTime) div 1000,
            if ElapsedMSec < WaitTime ->
                    timer:sleep(1),
                    wait_for_liveness(PidSpec, StartTime, WaitTime);
               true ->
                    %% Fail with a descriptive reason rather than an
                    %% anonymous case_clause crash.
                    error({wait_for_liveness_timeout, PidSpec, WaitTime})
            end;
        _SomePid ->
            ok
    end.
pick_path(public, S) ->
S#state.public_dir;
pick_path(private, S) ->

View file

@ -54,6 +54,7 @@
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,
%% %% Projection API
get_latest_epoch/2, get_latest_epoch/3,
@ -131,6 +132,17 @@ list_files(PidSpec, EpochID, Timeout) ->
gen_server:call(PidSpec, {req, {list_files, EpochID}},
Timeout).
%% @doc Fetch the wedge status from the remote FLU, waiting indefinitely
%% for the proxy's reply.

wedge_status(PidSpec) ->
    wedge_status(PidSpec, infinity).

%% @doc Fetch the wedge status from the remote FLU.  `Timeout' is passed
%% through as the gen_server:call/3 timeout.

wedge_status(PidSpec, Timeout) ->
    Request = {req, {wedge_status}},
    gen_server:call(PidSpec, Request, Timeout).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType) ->
@ -261,6 +273,8 @@ make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) ->
fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end;
make_req_fun({list_files, EpochID}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_files(Sock, EpochID) end;
make_req_fun({wedge_status}, #state{sock=Sock}) ->
fun() -> ?FLU_C:wedge_status(Sock) end;
make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) ->

View file

@ -35,7 +35,9 @@ verify_file_checksums_test() ->
Host = "localhost",
TcpPort = 32958,
DataDir = "./data",
FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir),
W_props = [{initial_wedged, false}],
FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir,
W_props),
Sock1 = machi_util:connect(Host, TcpPort),
try
Prefix = <<"verify_prefix">>,

View file

@ -167,7 +167,8 @@ convergence_demo_testfun(NumFLUs) ->
{Name, PPid}
end || {#p_srvr{name=Name}=P, _Dir} <- PsDirs],
MembersDict = machi_projection:make_members_dict(Ps),
MgrOpts = [private_write_verbose, {active_mode,false}],
MgrOpts = [private_write_verbose, {active_mode,false},
{use_partition_simulator, true}],
MgrNamez =
[begin
{ok, MPid} = ?MGR:start_link(P#p_srvr.name, MembersDict, MgrOpts),

View file

@ -170,6 +170,11 @@ smoke1_test() ->
{ok, M0} = ?MGR:start_link(a, MembersDict, [{active_mode,false}]),
try
{ok, P1} = ?MGR:test_calc_projection(M0, false),
% DERP! Check for race with manager's proxy vs. proj listener
case ?MGR:test_read_latest_public_projection(M0, false) of
{error, partition} -> timer:sleep(500);
_ -> ok
end,
{local_write_result, ok,
{remote_write_results, [{b,ok},{c,ok}]}} =
?MGR:test_write_public_projection(M0, P1),

View file

@ -55,7 +55,8 @@ flu_smoke_test() ->
Prefix = <<"prefix!">>,
BadPrefix = BadFile = "no/good",
FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir),
W_props = [{initial_wedged, false}],
FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir, W_props),
try
{error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH,
@ -64,6 +65,7 @@ flu_smoke_test() ->
?DUMMY_PV1_EPOCH, BadFile),
{ok, []} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH),
{ok, {false, _}} = ?FLU_C:wedge_status(Host, TcpPort),
Chunk1 = <<"yo!">>,
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,

View file

@ -61,47 +61,94 @@ smoke_test2() ->
ok
end.
smoke2_test_() ->
{timeout, 5*60, fun() -> smoke2_test2() end}.
partial_stop_restart_test_() ->
{timeout, 5*60, fun() -> partial_stop_restart2() end}.
smoke2_test2() ->
partial_stop_restart2() ->
Ps = [{a,#p_srvr{name=a, address="localhost", port=5555, props="./data.a"}},
{b,#p_srvr{name=b, address="localhost", port=5556, props="./data.b"}},
{c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}}
],
ChMgrs = [machi_flu_psup:make_mgr_supname(P#p_srvr.name) || {_,P} <-Ps],
PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps],
Dict = orddict:from_list(Ps),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(),
Start = fun({_,P}) ->
#p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(
Name, Port, Dir, [{active_mode,false}])
end,
WedgeStatus = fun({_,#p_srvr{address=Addr, port=TcpPort}}) ->
machi_flu1_client:wedge_status(Addr, TcpPort)
end,
Append = fun({_,#p_srvr{address=Addr, port=TcpPort}}) ->
machi_flu1_client:append_chunk(Addr, TcpPort,
?DUMMY_PV1_EPOCH,
<<"prefix">>, <<"data">>)
end,
try
[begin
#p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir,
[{active_mode,false}])
end || {_,P} <- Ps],
[Start(P) || P <- Ps],
[{ok, {true, _}} = WedgeStatus(P) || P <- Ps], % all are wedged
[{error,wedged} = Append(P) || P <- Ps], % all are wedged
ChMgrs = [machi_flu_psup:make_mgr_supname(P#p_srvr.name) || {_,P} <-Ps],
PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps],
Dict = orddict:from_list(Ps),
[machi_chain_manager1:set_chain_members(ChMgr, Dict) ||
ChMgr <- ChMgrs ],
[{ok, {false, _}} = WedgeStatus(P) || P <- Ps], % *not* wedged
[{ok,_} = Append(P) || P <- Ps], % *not* wedged
{now_using,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
{_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
[begin
_QQa = machi_chain_manager1:test_react_to_env(ChMgr)
end || _ <- lists:seq(1,25), ChMgr <- ChMgrs],
%% All chain maanagers & projection stores should be using the
%% All chain managers & projection stores should be using the
%% same projection which is max projection in each store.
{no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env(
hd(ChMgrs)),
[{no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env(
{no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env(
hd(ChMgrs)),
[{no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env(
ChMgr )|| ChMgr <- ChMgrs],
{ok, Proj_z} = machi_projection_store:read_latest_projection(
{ok, Proj_m} = machi_projection_store:read_latest_projection(
hd(PStores), public),
[begin
{ok, Proj_z} = machi_projection_store:read_latest_projection(
{ok, Proj_m} = machi_projection_store:read_latest_projection(
PStore, ProjType)
end || ProjType <- [public, private], PStore <- PStores ],
Epoch_z = Proj_z#projection_v1.epoch_number,
Epoch_m = Proj_m#projection_v1.epoch_number,
%% Confirm that all FLUs are *not* wedged, with correct proj & epoch
Proj_mCSum = Proj_m#projection_v1.epoch_csum,
[{ok, {false, {Epoch_m, Proj_mCSum}}} = WedgeStatus(P) || % *not* wedged
P <- Ps],
[{ok,_} = Append(P) || P <- Ps], % *not* wedged
%% Stop all but 'a'.
[ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)],
%% Stop and restart a.
{FluName_a, _} = hd(Ps),
ok = machi_flu_psup:stop_flu_package(FluName_a),
{ok, _} = Start(hd(Ps)),
%% Remember: 'a' is not in active mode.
{ok, Proj_m} = machi_projection_store:read_latest_projection(
hd(PStores), private),
%% Confirm that 'a' is wedged
{error, wedged} = Append(hd(Ps)),
{_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
{error, wedged} = machi_flu1_client:read_chunk(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
<<>>, 99999999, 1),
{error, wedged} = machi_flu1_client:checksum_list(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
{error, wedged} = machi_flu1_client:list_files(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH),
%% Iterate through humming consensus once
{now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env(
hd(ChMgrs)),
true = (Epoch_n > Epoch_m),
%% Confirm that 'a' is *not* wedged
{ok, _} = Append(hd(Ps)),
ok
after
exit(SupPid, normal),

View file

@ -33,7 +33,9 @@ api_smoke_test() ->
Host = "localhost",
TcpPort = 57124,
DataDir = "./data.api_smoke_flu",
FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir),
W_props = [{initial_wedged, false}],
FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
W_props),
erase(flu_pid),
try
@ -54,18 +56,19 @@ api_smoke_test() ->
infinity),
%% Start the FLU again, we should be able to do stuff immediately
FLU1b = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
[save_data_dir]),
[save_data_dir|W_props]),
put(flu_pid, FLU1b),
MyChunk = <<"my chunk data">>,
{ok, {MyOff,MySize,MyFile}} =
?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk,
infinity),
infinity),
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
%% Alright, now for the rest of the API, whee
BadFile = <<"no-such-file">>,
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
{ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, {false, _}} = ?MUT:wedge_status(Prox1),
{ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public),
{error, not_written} = ?MUT:read_latest_projection(Prox1, public),
{error, not_written} = ?MUT:read_projection(Prox1, public, 44),