From d331e09923f58485eeb81755479dc1a461ca1823 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 16 Jul 2015 17:59:02 +0900 Subject: [PATCH] Hrm, fewer deadlocks, but sometimes unreliable shutdown --- rebar.config.script | 5 ++++ src/machi_chain_manager1.erl | 1 + src/machi_cr_client.erl | 3 +++ src/machi_flu1.erl | 36 ++++++++++++++++++++++++++--- src/machi_flu_psup.erl | 8 +++++++ src/machi_flu_sup.erl | 17 +++++++++++++- src/machi_pb_translate.erl | 2 +- src/machi_projection_store.erl | 14 +++++++---- test/machi_chain_manager1_pulse.erl | 18 +++++++++------ 9 files changed, 88 insertions(+), 16 deletions(-) diff --git a/rebar.config.script b/rebar.config.script index 7cd0b95..fbd0e8b 100644 --- a/rebar.config.script +++ b/rebar.config.script @@ -16,6 +16,11 @@ case PulseBuild of {pulse_side_effect, [ {does_not_exist_yet, some_func, '_'} + , {machi_flu1_client, '_', '_'} + , {machi_projection_store, '_', '_'} + , {machi_proxy_flu1_client, '_', '_'} + , {machi_pb_translate, '_', '_'} + , {prim_file, '_', '_'} , {file, '_', '_'} , {filelib, '_', '_'} diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index b449657..6d24a4d 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -2059,6 +2059,7 @@ do_repair( repairing=[_|_]=Repairing, members_dict=MembersDict}}=_S_copy, Opts, ap_mode=RepairMode) -> +?V("RePaiR-~w,", [self()]), T1 = os:timestamp(), RepairId = proplists:get_value(repair_id, Opts, id1), error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n", diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index ce30722..5fedae3 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -108,6 +108,7 @@ -include("machi.hrl"). -include("machi_projection.hrl"). +-include("machi_verbose.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -750,8 +751,10 @@ run_middleworker_job(Fun, ArgList, WTimeout) -> Parent = self(), MiddleWorker = spawn(fun() -> +?V("Goo1-~w,", [self()]), PidsMons = [spawn_monitor(fun() -> +?V("Goo1-~w,", [self()]), Res = (catch Fun(Arg)), exit(Res) end) || Arg <- ArgList], diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 8c2a687..d25924c 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -77,6 +77,8 @@ -include("machi.hrl"). -include("machi_pb.hrl"). -include("machi_projection.hrl"). +-define(V(X,Y), ok). +%% -include("machi_verbose.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -140,6 +142,7 @@ ets_table_name(FluName) when is_atom(FluName) -> %% list_to_atom(binary_to_list(FluName) ++ "_epoch"). main2(FluName, TcpPort, DataDir, Rest) -> + ?V("flu-~w,", [self()]), {Props, DbgProps} = case proplists:get_value(dbg, Rest) of undefined -> {Rest, []}; @@ -220,6 +223,7 @@ start_append_server(S, AckPid) -> %% spawn_link(fun() -> run_projection_server(S) end). run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> + ?V("listen-~w,", [self()]), register(make_listener_regname(FluName), self()), SockOpts = ?PB_PACKET_OPTS ++ [{reuseaddr, true}, {mode, binary}, {active, false}], @@ -235,6 +239,7 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> run_append_server(FluPid, AckPid, #state{flu_name=Name, wedged=Wedged_p,epoch_id=EpochId}=S) -> + ?V("append-~w,", [self()]), %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), TID = ets:new(ets_table_name(Name), @@ -247,7 +252,7 @@ run_append_server(FluPid, AckPid, #state{flu_name=Name, listen_server_loop(LSock, S) -> {ok, Sock} = gen_tcp:accept(LSock), - spawn_link(fun() -> net_server_loop(Sock, S) end), + spawn_link(fun() -> ?V("net_server-~w,", [self()]), net_server_loop(Sock, S) end), listen_server_loop(LSock, S). append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p, @@ -258,7 +263,7 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p, From ! wedged, append_server_loop(FluPid, S); {seq_append, From, Prefix, Chunk, CSum, Extra} -> - spawn(fun() -> append_server_dispatch(From, Prefix, + spawn(fun() -> ?V("appendX-~w,", [self()]), append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, AppendServerPid) end), append_server_loop(FluPid, S); @@ -292,22 +297,36 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p, end. net_server_loop(Sock, S) -> + ?V("~w ~w,", [self(), ?LINE]), case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of {ok, Bin} -> + ?V("~w ~w,", [self(), ?LINE]), {RespBin, S2} = case machi_pb:decode_mpb_ll_request(Bin) of LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> - {R, NewS} = do_pb_ll_request(LL_req, S), + ?V("~w ~w,", [self(), ?LINE]), + ZARF = (catch do_pb_ll_request(LL_req, S)), + %% ?V("~w ~w ~p,", [self(), ?LINE, ZARF]), + {R, NewS} = ZARF, + %% {R, NewS} = do_pb_ll_request(LL_req, S), + ?V("~w ~w,", [self(), ?LINE]), {machi_pb:encode_mpb_ll_response(R), mode(low, NewS)}; _ -> + ?V("~w ~w,", [self(), ?LINE]), HL_req = machi_pb:decode_mpb_request(Bin), + ?V("~w ~w,", [self(), ?LINE]), 1 = HL_req#mpb_request.do_not_alter, + ?V("~w ~w,", [self(), ?LINE]), {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), + ?V("~w ~w,", [self(), ?LINE]), {machi_pb:encode_mpb_response(R), mode(high, NewS)} end, + ?V("~w ~w,", [self(), ?LINE]), ok = gen_tcp:send(Sock, RespBin), + ?V("~w ~w,", [self(), ?LINE]), net_server_loop(Sock, S2); {error, SockError} -> + ?V("~w ~w,", [self(), ?LINE]), Msg = io_lib:format("Socket error ~w", [SockError]), R = #mpb_ll_response{req_id= <<>>, generic=#mpb_errorresp{code=1, msg=Msg}}, @@ -337,10 +356,13 @@ make_high_clnt(S) -> S. do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> + ?V("~w ~w,", [self(), ?LINE]), Result = {high_error, 41, "Low protocol request while in high mode"}, {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; do_pb_ll_request(PB_request, S) -> + ?V("~w ~w,", [self(), ?LINE]), Req = machi_pb_translate:from_pb_request(PB_request), + ?V("~w ~w,", [self(), ?LINE]), {ReqID, Cmd, Result, S2} = case Req of {RqID, {LowCmd, _}=CMD} @@ -348,13 +370,19 @@ do_pb_ll_request(PB_request, S) -> LowCmd == low_wedge_status; LowCmd == low_list_files -> %% Skip wedge check for projection commands! %% Skip wedge check for these unprivileged commands + ?V("~w ~w,", [self(), ?LINE]), {Rs, NewS} = do_pb_ll_request3(CMD, S), + ?V("~w ~w,", [self(), ?LINE]), {RqID, CMD, Rs, NewS}; {RqID, CMD} -> + ?V("~w ~w,", [self(), ?LINE]), EpochID = element(2, CMD), % by common convention + ?V("~w ~w,", [self(), ?LINE]), {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), + ?V("~w ~w,", [self(), ?LINE]), {RqID, CMD, Rs, NewS} end, + ?V("~w ~w,", [self(), ?LINE]), {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. do_pb_ll_request2(EpochID, CMD, S) -> @@ -404,6 +432,7 @@ do_pb_ll_request3({low_delete_migration, _EpochID, File}, S) -> do_pb_ll_request3({low_trunc_hack, _EpochID, File}, S) -> {do_server_trunc_hack(File, S), S}; do_pb_ll_request3({low_proj, PCMD}, S) -> + ?V("~w ~w,", [self(), ?LINE]), {do_server_proj_request(PCMD, S), S}. do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> @@ -717,6 +746,7 @@ write_server_find_pid(Prefix) -> start_seq_append_server(Prefix, DataDir, AppendServerPid) -> proc_lib:spawn_link(fun() -> +?V("appendY-~w,", [self()]), %% The following is only necessary to %% make nice process relationships in %% 'appmon' and related tools. diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index b02f64d..d88a4ea 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -61,6 +61,13 @@ -behaviour(supervisor). +-include("machi_verbose.hrl"). + +-ifdef(PULSE). +-compile({parse_transform, pulse_instrument}). +-include_lib("pulse_otp/include/pulse_otp.hrl"). +-endif. + %% External API -export([make_package_spec/4, start_flu_package/4, stop_flu_package/1]). %% Internal API @@ -92,6 +99,7 @@ start_link(FluName, TcpPort, DataDir, Props) -> [FluName, TcpPort, DataDir, Props]). init([FluName, TcpPort, DataDir, Props0]) -> + erlang:display({flu_psup,self()}), RestartStrategy = one_for_all, MaxRestarts = 1000, MaxSecondsBetweenRestarts = 3600, diff --git a/src/machi_flu_sup.erl b/src/machi_flu_sup.erl index 5082b55..8b236d0 100644 --- a/src/machi_flu_sup.erl +++ b/src/machi_flu_sup.erl @@ -28,6 +28,13 @@ -behaviour(supervisor). +-include("machi_verbose.hrl"). + +-ifdef(PULSE). +-compile({parse_transform, pulse_instrument}). +-include_lib("pulse_otp/include/pulse_otp.hrl"). +-endif. + %% API -export([start_link/0]). @@ -40,15 +47,23 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> + erlang:display({flu_sup,self()}), RestartStrategy = one_for_one, MaxRestarts = 1000, MaxSecondsBetweenRestarts = 3600, SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, - Ps = application:get_env(machi, initial_flus, []), + Ps = get_initial_flus(), FLU_specs = [machi_flu_psup:make_package_spec(FluName, TcpPort, DataDir, Props) || {FluName, TcpPort, DataDir, Props} <- Ps], {ok, {SupFlags, FLU_specs}}. +-ifdef(PULSE). +get_initial_flus() -> + []. +-else. % PULSE +get_initial_flus() -> + application:get_env(machi, initial_flus, []). +-endif. % PULSE diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index 71ac325..29774d1 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -181,7 +181,7 @@ from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID}) -> {ReqID, {high_error, 999966, "Unknown request"}}; from_pb_request(_Else) -> - io:format(user, "\nRRR from_pb_request(~p)\n", [_Else]), timer:sleep(2000), + io:format(user, "\nRRR from_pb_request(~p)\n", [_Else]), %%timer:sleep(2000), {<<>>, {high_error, 999667, "Unknown PB request"}}. from_pb_response(#mpb_ll_response{ diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 6b015a6..800b12f 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -41,11 +41,13 @@ -module(machi_projection_store). -include("machi_projection.hrl"). +-define(V(X,Y), ok). +%% -include("machi_verbose.hrl"). --ifdef(PULSE). --compile({parse_transform, pulse_instrument}). --include_lib("pulse_otp/include/pulse_otp.hrl"). --endif. +%% -ifdef(PULSE). +%% -compile({parse_transform, pulse_instrument}). +%% -include_lib("pulse_otp/include/pulse_otp.hrl"). +%% -endif. %% API -export([ @@ -106,6 +108,7 @@ read_latest_projection(PidSpec, ProjType) -> read_latest_projection(PidSpec, ProjType, Timeout) when ProjType == 'public' orelse ProjType == 'private' -> + ?V("~w ~w ~w,", [self(), ?MODULE, ?LINE]), g_call(PidSpec, {read_latest_projection, ProjType}, Timeout). %% @doc Fetch the projection record type `ProjType' for epoch number `Epoch' . @@ -170,6 +173,7 @@ g_call(PidSpec, Arg, Timeout) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% init([DataDir, NotifyWedgeStateChanges]) -> + ?V("pstore-~w,", [self()]), lclock_init(), PublicDir = machi_util:make_projection_filename(DataDir, "public"), PrivateDir = machi_util:make_projection_filename(DataDir, "private"), @@ -197,7 +201,9 @@ handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) -> {EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epochid; ProjType == private -> S#state.max_private_epochid end, + ?V("~w ~w ~w,", [self(), ?MODULE, ?LINE]), {Reply, NewS} = do_proj_read(ProjType, EpochNum, S), + ?V("~w ~w ~w,", [self(), ?MODULE, ?LINE]), {reply, {Reply, LC2}, NewS}; handle_call({{read, ProjType, Epoch}, LC1}, _From, S) -> LC2 = lclock_update(LC1), diff --git a/test/machi_chain_manager1_pulse.erl b/test/machi_chain_manager1_pulse.erl index 7f60ae3..6bd3b7d 100644 --- a/test/machi_chain_manager1_pulse.erl +++ b/test/machi_chain_manager1_pulse.erl @@ -170,7 +170,7 @@ all_list() -> [P#p_srvr.name || {P, _Dir} <- all_list_extra()]. setup(Num, Seed) -> - ?V("\nsetup(~w", [Num]), + ?V("\nsetup(~w,~w", [self(), Num]), All_list = lists:sublist(all_list(), Num), All_listE = lists:sublist(all_list_extra(), Num), %% shutdown_hard() has taken care of killing all relevant procs. @@ -245,7 +245,7 @@ private_stable_check() -> {_PSimPid, _SupPid, ProxiesDict, All_listE} = get(manager_pids_hack), Res = private_projections_are_stable_check(ProxiesDict, All_listE), if not Res -> - ?V("BUMMER: private stable check failed!\n", []); + ?QC_FMT("BUMMER: private stable check failed!\n", []); true -> ok end, @@ -311,6 +311,7 @@ prop_pulse(Style) when Style == new; Style == regression -> ?FORALL({Cmds0, Seed}, {gen_commands(Style), pulse:seed()}, ?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 11, begin +erlang:display({prop,?MODULE,?LINE,self()}), ok = shutdown_hard(), %% PULSE can be really unfair, of course, including having exec_ticks %% run where all of FLU a does its ticks then FLU b. Such a situation @@ -334,9 +335,13 @@ prop_pulse(Style) when Style == new; Style == regression -> pulse:verbose([format]), {_H2, S2, Res} = pulse:run( fun() -> - {_H, _S, _R} = run_commands(?MODULE, Cmds) + ?V("PROP-~w,", [self()]), + %% {_H, _S, _R} = run_commands(?MODULE, Cmds) +QAQA = run_commands(?MODULE, Cmds) +,?V("pid681=~p", [process_info(list_to_pid("<0.681.0>"))]), QAQA end, [{seed, Seed}, {strategy, unfair}]), + ok = shutdown_hard(), {Report, PrivProjs, Diag} = S2#state.dump_state, %% Report is ordered by Epoch. For each private projection @@ -368,7 +373,6 @@ prop_pulse(Style) when Style == new; Style == regression -> {false, LastRepXs} end, - ok = shutdown_hard(), ?WHENFAIL( begin %% ?QC_FMT("PrivProjs = ~P\n", [PrivProjs, 50]), @@ -456,20 +460,20 @@ shutdown_hard() -> (catch unlink(whereis(machi_partition_simulator))), [begin Pid = whereis(X), - spawn(fun() -> (catch X:stop()) end), + %%%%%%DELME deadlock source? spawn(fun() -> ?QC_FMT("shutdown-~w,", [self()]), (catch X:stop()) end), timer:sleep(50), timer:sleep(10), (catch exit(Pid, shutdown)), timer:sleep(1), (catch exit(Pid, kill)) end || X <- [machi_partition_simulator, machi_flu_sup] ], - timer:sleep(1), + timer:sleep(100), ok. exec_ticks(Num, All_listE) -> Parent = self(), Pids = [spawn_link(fun() -> - %% ?V("tick-~w,", [self()]), + ?V("tick-~w,", [self()]), [begin M_name = P#p_srvr.name, %% Max = 10,