WIP: additional tests for wedge state

This commit is contained in:
Scott Lystig Fritchie 2015-05-08 19:50:47 +09:00
parent 316126fa59
commit 6f7818fca7
2 changed files with 64 additions and 22 deletions

View file

@ -77,6 +77,7 @@
tcp_port :: non_neg_integer(),
data_dir :: string(),
wedged = true :: boolean(),
etstab :: ets:tid(),
epoch_id :: 'undefined' | pv1_epoch(),
dbg_props = [] :: list(), % proplist
props = [] :: list() % proplist
@ -101,14 +102,23 @@ update_wedge_state(PidSpec, Boolean, EpochId)
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
ets_table_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_epoch");
ets_table_name(FluName) when is_binary(FluName) ->
list_to_atom(binary_to_list(FluName) ++ "_epoch").
main2(FluName, TcpPort, DataDir, Rest) ->
S0 = #state{flu_name=FluName,
tcp_port=TcpPort,
data_dir=DataDir,
wedged=true,
etstab=ets_table_name(FluName),
epoch_id=undefined,
props=Rest},
AppendPid = start_append_server(S0),
AppendPid = start_append_server(S0, self()),
receive
append_server_ack -> ok
end,
{_ProjRegName, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Rest) of
undefined ->
@ -150,9 +160,9 @@ main2(FluName, TcpPort, DataDir, Rest) ->
start_listen_server(S) ->
proc_lib:spawn_link(fun() -> run_listen_server(S) end).
start_append_server(S) ->
start_append_server(S, AckPid) ->
FluPid = self(),
proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end).
proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end).
%% start_projection_server(S) ->
%% spawn_link(fun() -> run_projection_server(S) end).
@ -164,10 +174,14 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S).
run_append_server(FluPid, #state{flu_name=Name}=S) ->
run_append_server(FluPid, AckPid, #state{flu_name=Name}=S) ->
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
register(Name, self()),
append_server_loop(FluPid, S).
TID = ets:new(ets_table_name(Name),
[set, protected, named_table, {read_concurrency, true}]),
ets:insert(TID, {epoch, {true, {-1, <<>>}}}),
AckPid ! append_server_ack,
append_server_loop(FluPid, S#state{etstab=TID}).
listen_server_loop(LSock, S) ->
{ok, Sock} = gen_tcp:accept(LSock),
@ -186,6 +200,7 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
%% DataDir, FluPid) end),
append_server_loop(FluPid, S);
{wedge_state_change, Boolean, EpochId} ->
true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}),
append_server_loop(FluPid, S#state{wedged=Boolean,
epoch_id=EpochId});
{wedge_status, FromPid} ->
@ -217,10 +232,14 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(FluName, Sock, LenHex, Prefix);
<<"R ",
_EpochIDRaw:(?EpochIDSpace)/binary,
EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:FileLenLF/binary, "\n">> ->
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
{Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2),
io:format(user, "TT wedged ~p, CurrentEpochId ~p\n", [Wedged_p, CurrentEpochId]),
timer:sleep(50),
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId);
<<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> ->
do_net_server_listing(Sock, DataDir);
<<"C ",
@ -238,7 +257,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
_EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:WriteFileLenLF/binary, "\n">> ->
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
<<"fixme1">>, false, <<"fixme2">>);
%% For data migration only.
<<"DEL-migration ",
_EpochIDRaw:(?EpochIDSpace)/binary,
@ -329,7 +349,8 @@ do_wedge_status(FluName, Sock) ->
end,
ok = gen_tcp:send(Sock, Reply).
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
DoItFun = fun(FH, Offset, Len) ->
case file:pread(FH, Offset, Len) of
{ok, Bytes} when byte_size(Bytes) == Len ->
@ -347,20 +368,26 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[read, binary, raw], DoItFun).
[read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
case sanitize_file_string(FileBin) of
ok ->
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
case {Wedged_p, sanitize_file_string(FileBin)} of
{false, ok} ->
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin,
DataDir, FileOpts, DoItFun);
_ ->
DataDir, FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
{true, _} ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>);
{_, __} ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>)
end.
do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun) ->
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
<<Offset:64/big>> = machi_util:hexstr_to_bin(OffsetHex),
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
{_, Path} = machi_util:make_data_filename(DataDir, FileBin),
@ -375,24 +402,29 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
{error, enoent} when OptsHasWrite ->
do_net_server_readwrite_common(
Sock, OffsetHex, LenHex, FileBin, DataDir,
FileOpts, DoItFun);
FileOpts, DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId);
_Else ->
%%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]),
ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>)
end.
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) ->
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
{ok, FHc} ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc);
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId);
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir)
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochIDRaw, Wedged_p, CurrentEpochId)
end.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochIDRaw, Wedged_p, CurrentEpochId) ->
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
@ -411,7 +443,8 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) ->
end
end,
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
[write, read, binary, raw], DoItFun).
[write, read, binary, raw], DoItFun,
EpochIDRaw, Wedged_p, CurrentEpochId).
perhaps_do_net_server_ec_read(Sock, FH) ->
case file:pread(FH, 0, ?MINIMUM_OFFSET) of

View file

@ -133,6 +133,15 @@ partial_stop_restart2() ->
hd(PStores), private),
%% Confirm that 'a' is wedged
{error, wedged} = Append(hd(Ps)),
{_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
{error, wedged} = machi_flu1_client:read_chunk(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
<<>>, 99999999, 1),
{error, wedged} = machi_flu1_client:checksum_list(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
{error, wedged} = machi_flu1_client:list_files(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH),
%% Iterate through humming consensus once
{now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env(
hd(ChMgrs)),