From 6f7818fca757a00875a53d1f3e869e22a319919e Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Fri, 8 May 2015 19:50:47 +0900 Subject: [PATCH] WIP: additional tests for wedge state --- src/machi_flu1.erl | 77 +++++++++++++++++++++++++----------- test/machi_flu_psup_test.erl | 9 +++++ 2 files changed, 64 insertions(+), 22 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 56bdd60..f6ebb06 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -77,6 +77,7 @@ tcp_port :: non_neg_integer(), data_dir :: string(), wedged = true :: boolean(), + etstab :: ets:tid(), epoch_id :: 'undefined' | pv1_epoch(), dbg_props = [] :: list(), % proplist props = [] :: list() % proplist @@ -101,14 +102,23 @@ update_wedge_state(PidSpec, Boolean, EpochId) %%%%%%%%%%%%%%%%%%%%%%%%%%%% +ets_table_name(FluName) when is_atom(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_epoch"); +ets_table_name(FluName) when is_binary(FluName) -> + list_to_atom(binary_to_list(FluName) ++ "_epoch"). + main2(FluName, TcpPort, DataDir, Rest) -> S0 = #state{flu_name=FluName, tcp_port=TcpPort, data_dir=DataDir, wedged=true, + etstab=ets_table_name(FluName), epoch_id=undefined, props=Rest}, - AppendPid = start_append_server(S0), + AppendPid = start_append_server(S0, self()), + receive + append_server_ack -> ok + end, {_ProjRegName, ProjectionPid} = case proplists:get_value(projection_store_registered_name, Rest) of undefined -> @@ -150,9 +160,9 @@ main2(FluName, TcpPort, DataDir, Rest) -> start_listen_server(S) -> proc_lib:spawn_link(fun() -> run_listen_server(S) end). -start_append_server(S) -> +start_append_server(S, AckPid) -> FluPid = self(), - proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end). + proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end). %% start_projection_server(S) -> %% spawn_link(fun() -> run_projection_server(S) end). @@ -164,10 +174,14 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), listen_server_loop(LSock, S). -run_append_server(FluPid, #state{flu_name=Name}=S) -> +run_append_server(FluPid, AckPid, #state{flu_name=Name}=S) -> %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), - append_server_loop(FluPid, S). + TID = ets:new(ets_table_name(Name), + [set, protected, named_table, {read_concurrency, true}]), + ets:insert(TID, {epoch, {true, {-1, <<>>}}}), + AckPid ! append_server_ack, + append_server_loop(FluPid, S#state{etstab=TID}). listen_server_loop(LSock, S) -> {ok, Sock} = gen_tcp:accept(LSock), @@ -186,6 +200,7 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> %% DataDir, FluPid) end), append_server_loop(FluPid, S); {wedge_state_change, Boolean, EpochId} -> + true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}), append_server_loop(FluPid, S#state{wedged=Boolean, epoch_id=EpochId}); {wedge_status, FromPid} -> @@ -217,10 +232,14 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> Prefix:PrefixLenLF/binary, "\n">> -> do_net_server_append(FluName, Sock, LenHex, Prefix); <<"R ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDRaw:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:FileLenLF/binary, "\n">> -> - do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir); + {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), + io:format(user, "TT wedged ~p, CurrentEpochId ~p\n", [Wedged_p, CurrentEpochId]), + timer:sleep(50), + do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId); <<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> -> do_net_server_listing(Sock, DataDir); <<"C ", @@ -238,7 +257,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> _EpochIDRaw:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> - do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir); + do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, + <<"fixme1">>, false, <<"fixme2">>); %% For data migration only. <<"DEL-migration ", _EpochIDRaw:(?EpochIDSpace)/binary, @@ -329,7 +349,8 @@ do_wedge_status(FluName, Sock) -> end, ok = gen_tcp:send(Sock, Reply). -do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) -> +do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId) -> DoItFun = fun(FH, Offset, Len) -> case file:pread(FH, Offset, Len) of {ok, Bytes} when byte_size(Bytes) == Len -> @@ -347,20 +368,26 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir) -> end end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, - [read, binary, raw], DoItFun). + [read, binary, raw], DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId). do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun) -> - case sanitize_file_string(FileBin) of - ok -> + FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId) -> + case {Wedged_p, sanitize_file_string(FileBin)} of + {false, ok} -> do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, - DataDir, FileOpts, DoItFun); - _ -> + DataDir, FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId); + {true, _} -> + ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); + {_, __} -> ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG\n">>) end. do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun) -> + FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId) -> <> = machi_util:hexstr_to_bin(OffsetHex), <> = machi_util:hexstr_to_bin(LenHex), {_, Path} = machi_util:make_data_filename(DataDir, FileBin), @@ -375,24 +402,29 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, {error, enoent} when OptsHasWrite -> do_net_server_readwrite_common( Sock, OffsetHex, LenHex, FileBin, DataDir, - FileOpts, DoItFun); + FileOpts, DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId); _Else -> %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>) end. -do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) -> +do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId) -> CSumPath = machi_util:make_checksum_filename(DataDir, FileBin), case file:open(CSumPath, [append, raw, binary, delayed_write]) of {ok, FHc} -> - do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc); + do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, + EpochIDRaw, Wedged_p, CurrentEpochId); {error, enoent} -> ok = filelib:ensure_dir(CSumPath), - do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir) + do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, + EpochIDRaw, Wedged_p, CurrentEpochId) end. -do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) -> +do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, + EpochIDRaw, Wedged_p, CurrentEpochId) -> DoItFun = fun(FHd, Offset, Len) -> ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len), @@ -411,7 +443,8 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc) -> end end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, - [write, read, binary, raw], DoItFun). + [write, read, binary, raw], DoItFun, + EpochIDRaw, Wedged_p, CurrentEpochId). perhaps_do_net_server_ec_read(Sock, FH) -> case file:pread(FH, 0, ?MINIMUM_OFFSET) of diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 2c04ef5..94ecf69 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -133,6 +133,15 @@ partial_stop_restart2() -> hd(PStores), private), %% Confirm that 'a' is wedged {error, wedged} = Append(hd(Ps)), + {_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps), + {error, wedged} = machi_flu1_client:read_chunk( + Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, + <<>>, 99999999, 1), + {error, wedged} = machi_flu1_client:checksum_list( + Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>), + {error, wedged} = machi_flu1_client:list_files( + Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH), + %% Iterate through humming consensus once {now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env( hd(ChMgrs)),