Add witness_mode to FLU

This commit is contained in:
Scott Lystig Fritchie 2015-07-21 17:29:33 +09:00
parent 6ed5767e06
commit 432190435e
3 changed files with 124 additions and 76 deletions

View file

@ -30,21 +30,8 @@
%%
%% === Protocol origins ===
%%
%% The protocol implemented here is an artisanal, hand-crafted, silly
%% thing that was very quick to put together for a "demo day" proof of
%% concept. It will almost certainly be replaced with something else,
%% both in terms of wire format and better code separation of
%% serialization/deserialization vs. network transport management,
%% etc.
%%
%% For the moment, this module implements a rudimentary TCP-based
%% protocol as the sole supported access method to the server,
%% sequencer, and projection store. Conceptually, those three
%% services are independent and ought to have their own protocols. As
%% a practical matter, there is no need for wire protocol
%% compatibility. Furthermore, from the perspective of failure
%% detection, it is very convenient that all three FLU-related
%% services are accessed using the same single TCP port.
%% Today's FLU fully supports a protocol that is based on Protocol
%% Buffers. Please see the `src/machi.proto' file for details.
%%
%% === TODO items ===
%%
@ -89,6 +76,7 @@
-record(state, {
flu_name :: atom(),
proj_store :: pid(),
witness = false :: boolean(),
append_pid :: pid(),
tcp_port :: non_neg_integer(),
data_dir :: string(),
@ -97,7 +85,6 @@
epoch_id :: 'undefined' | machi_dt:epoch_id(),
pb_mode = undefined :: 'undefined' | 'high' | 'low',
high_clnt :: 'undefined' | pid(),
dbg_props = [] :: list(), % proplist
props = [] :: list() % proplist
}).
@ -131,18 +118,10 @@ wedge_myself(PidSpec, EpochId)
ets_table_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_epoch").
%% ets_table_name(FluName) when is_binary(FluName) ->
%% list_to_atom(binary_to_list(FluName) ++ "_epoch").
main2(FluName, TcpPort, DataDir, Rest) ->
{Props, DbgProps} = case proplists:get_value(dbg, Rest) of
undefined ->
{Rest, []};
DPs ->
{lists:keydelete(dbg, 1, Rest), DPs}
end,
main2(FluName, TcpPort, DataDir, Props) ->
{SendAppendPidToProj_p, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Rest) of
case proplists:get_value(projection_store_registered_name, Props) of
undefined ->
RN = make_projection_server_regname(FluName),
{ok, PP} =
@ -151,7 +130,7 @@ main2(FluName, TcpPort, DataDir, Rest) ->
RN ->
{false, whereis(RN)}
end,
InitialWedged_p = proplists:get_value(initial_wedged, DbgProps),
InitialWedged_p = proplists:get_value(initial_wedged, Props),
ProjRes = machi_projection_store:read_latest_projection(ProjectionPid,
private),
{Wedged_p, EpochId} =
@ -165,14 +144,15 @@ main2(FluName, TcpPort, DataDir, Rest) ->
true ->
{true, undefined}
end,
Witness_p = proplists:get_value(witness_mode, Props, false),
S0 = #state{flu_name=FluName,
proj_store=ProjectionPid,
tcp_port=TcpPort,
data_dir=DataDir,
wedged=Wedged_p,
witness=Witness_p,
etstab=ets_table_name(FluName),
epoch_id=EpochId,
dbg_props=DbgProps,
props=Props},
AppendPid = start_append_server(S0, self()),
receive
@ -195,11 +175,11 @@ main2(FluName, TcpPort, DataDir, Rest) ->
ok = filelib:ensure_dir(Projection_e),
put(flu_flu_name, FluName),
put(flu_append_pid, AppendPid),
put(flu_append_pid, S1#state.append_pid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenPid),
receive killme -> ok end,
(catch exit(AppendPid, kill)),
(catch exit(S1#state.append_pid, kill)),
(catch exit(ProjectionPid, kill)),
(catch exit(ListenPid, kill)),
ok.
@ -231,8 +211,6 @@ run_append_server(FluPid, AckPid, #state{flu_name=Name,
register(Name, self()),
TID = ets:new(ets_table_name(Name),
[set, protected, named_table, {read_concurrency, true}]),
%% InitialWedged = proplists:get_value(initial_wedged, DbgProps, true),
%% ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}),
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
AckPid ! append_server_ack,
append_server_loop(FluPid, S#state{etstab=TID}).
@ -243,9 +221,17 @@ listen_server_loop(LSock, S) ->
listen_server_loop(LSock, S).
append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p,
witness=Witness_p,
epoch_id=OldEpochId, flu_name=FluName}=S) ->
AppendServerPid = self(),
receive
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
when Witness_p ->
%% The FLU's net_server_loop() process ought to filter all
%% witness states, but we'll keep this clause for extra
%% paranoia.
From ! witness,
append_server_loop(FluPid, S);
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
when Wedged_p ->
From ! wedged,
@ -255,7 +241,9 @@ append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p,
Chunk, CSum, Extra, EpochID,
DataDir, AppendServerPid) end),
append_server_loop(FluPid, S);
QQ =
{wedge_myself, WedgeEpochId} ->
io:format(user, "QQ ~p\n", [QQ]),
if not Wedged_p andalso WedgeEpochId == OldEpochId ->
true = ets:insert(S#state.etstab,
{epoch, {true, OldEpochId}}),
@ -305,11 +293,16 @@ net_server_loop(Sock, S) ->
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
end,
if RespBin == async_no_response ->
ok;
net_server_loop(Sock, S2);
true ->
ok = gen_tcp:send(Sock, RespBin)
end,
net_server_loop(Sock, S2);
case gen_tcp:send(Sock, RespBin) of
ok ->
net_server_loop(Sock, S2);
{error, _} ->
(catch gen_tcp:close(Sock)),
exit(normal)
end
end;
{error, SockError} ->
Msg = io_lib:format("Socket error ~w", [SockError]),
R = #mpb_ll_response{req_id= <<>>,
@ -387,31 +380,43 @@ do_pb_ll_request2(EpochID, CMD, S) ->
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
end.
%% Witness status does not matter below.
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
{Msg, S};
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
{-6, S};
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
{do_server_wedge_status(S), S};
do_pb_ll_request3({low_proj, PCMD}, S) ->
{do_server_proj_request(PCMD, S), S};
%% Witness status *matters* below
do_pb_ll_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag,
CSum, ChunkExtra}, S) ->
CSum, ChunkExtra},
#state{witness=false}=S) ->
{do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S), S};
do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
CSum}, S) ->
CSum},
#state{witness=false}=S) ->
{do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts},
#state{witness=false}=S) ->
{do_server_read_chunk(File, Offset, Size, Opts, S), S};
do_pb_ll_request3({low_checksum_list, _EpochID, File}, S) ->
do_pb_ll_request3({low_checksum_list, _EpochID, File},
#state{witness=false}=S) ->
{do_server_checksum_listing(File, S), S};
do_pb_ll_request3({low_list_files, _EpochID}, S) ->
do_pb_ll_request3({low_list_files, _EpochID},
#state{witness=false}=S) ->
{do_server_list_files(S), S};
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
{do_server_wedge_status(S), S};
do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) ->
{do_server_delete_migration(File, S), S};
do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) ->
do_pb_ll_request3({low_delete_migration, _EpochID, File},
#state{witness=false}=S) ->
{do_server_delete_migration(File, S),
#state{witness=false}=S};
do_pb_ll_request3({low_trunc_hack, _EpochID, File},
#state{witness=false}=S) ->
{do_server_trunc_hack(File, S), S};
do_pb_ll_request3({low_proj, PCMD}, S) ->
{do_server_proj_request(PCMD, S), S}.
do_pb_ll_request3(_, #state{witness=true}=S) ->
{{error, bad_arg}, S}. % TODO: new status code??
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
Result = {low_error, 41, "High protocol request while in low mode"},
@ -458,7 +463,7 @@ do_server_proj_request({read_projection, ProjType, Epoch},
machi_projection_store:read(ProjStore, ProjType, Epoch);
do_server_proj_request({write_projection, ProjType, Proj},
#state{proj_store=ProjStore}) ->
machi_projection_store:write(ProjStore, ProjType, Proj);
catch machi_projection_store:write(ProjStore, ProjType, Proj);
do_server_proj_request({get_all_projections, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_all_projections(ProjStore, ProjType);
@ -497,6 +502,8 @@ do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
{assignment, Offset, File} ->
Size = iolist_size(Chunk),
{ok, {Offset, Size, File}};
witness ->
{error, bad_arg};
wedged ->
{error, wedged}
after 10*1000 ->

View file

@ -42,16 +42,15 @@ clean_up_data_dir(DataDir) ->
setup_test_flu(RegName, TcpPort, DataDir) ->
setup_test_flu(RegName, TcpPort, DataDir, []).
setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
case proplists:get_value(save_data_dir, DbgProps) of
setup_test_flu(RegName, TcpPort, DataDir, Props) ->
case proplists:get_value(save_data_dir, Props) of
true ->
ok;
_ ->
clean_up_data_dir(DataDir)
end,
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir},
{dbg, DbgProps}]),
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir}|Props]),
%% TODO the process structuring/racy-ness of the various processes
%% of the FLU needs to be deterministic to remove this sleep race
%% "prevention".
@ -169,28 +168,33 @@ flu_projection_smoke_test() ->
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
[begin
{ok, {0,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, []} = ?FLU_C:get_all_projections(Host, TcpPort, T),
P_a = #p_srvr{name=a, address="localhost", port=4321},
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
{ok, {1,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T),
{error, not_written} = ?FLU_C:read_projection(Host, TcpPort, T, 2)
end || T <- [public, private] ]
[ok = flu_projection_common(Host, TcpPort, T) ||
T <- [public, private] ]
%% , {ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort),
%% io:format(user, "EpochID1 ~p\n", [EpochID1])
after
ok = ?FLU:stop(FLU1)
end.
flu_projection_common(Host, TcpPort, T) ->
{ok, {0,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, []} = ?FLU_C:get_all_projections(Host, TcpPort, T),
P_a = #p_srvr{name=a, address="localhost", port=4321},
P1 = machi_projection:new(1, a, [P_a], [], [a], [], []),
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1),
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
{ok, {1,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T),
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T),
{error, not_written} = ?FLU_C:read_projection(Host, TcpPort, T, 2),
ok.
bad_checksum_test() ->
Host = "localhost",
TcpPort = 32960,
@ -205,10 +209,47 @@ bad_checksum_test() ->
{error, bad_checksum} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1_badcs),
{error, bad_checksum} = ?FLU_C:write_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
<<"foo-file">>, 99832,
Chunk1_badcs),
ok
after
ok = ?FLU:stop(FLU1)
end.
witness_test() ->
Host = "localhost",
TcpPort = 32961,
DataDir = "./data",
Opts = [{initial_wedged, false}, {witness_mode, true}],
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir, Opts),
try
Prefix = <<"some prefix">>,
Chunk1 = <<"yo yo yo">>,
%% All of the projection commands are ok.
[ok = flu_projection_common(Host, TcpPort, T) ||
T <- [public, private] ],
%% Projection has moved beyond initial 0, so get the current EpochID
{ok, EpochID1} = ?FLU_C:get_latest_epochid(Host, TcpPort, private),
%% Witness-protected ops all fail
{error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, EpochID1,
Prefix, Chunk1),
File = <<"foofile">>,
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1,
File, 9999, 9999),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, EpochID1,
File),
{error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1),
{ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort),
{ok, _} = ?FLU_C:get_latest_epochid(Host, TcpPort, public),
{ok, _} = ?FLU_C:read_latest_projection(Host, TcpPort, public),
{error, not_written} = ?FLU_C:read_projection(Host, TcpPort,
public, 99999),
%% write_projection already tested by flu_projection_common
{ok, _} = ?FLU_C:get_all_projections(Host, TcpPort, public),
{ok, _} = ?FLU_C:list_all_projections(Host, TcpPort, public),
ok
after
ok = ?FLU:stop(FLU1)

View file

@ -75,7 +75,7 @@ partial_stop_restart2() ->
Dict = orddict:from_list(Ps),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(),
DbgProps = [{dbg, [{initial_wedged, true}]}],
DbgProps = [{initial_wedged, true}],
Start = fun({_,P}) ->
#p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(