diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 6539c51..88a3b6d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -56,13 +56,12 @@ -include_lib("eunit/include/eunit.hrl"). -endif. % TEST --define(SERVER_CMD_READ_TIMEOUT, 600*1000). - -export([start_link/1, stop/1, update_wedge_state/3, wedge_myself/2]). -export([make_listener_regname/1, make_projection_server_regname/1]). %% TODO: remove or replace in OTP way after gen_*'ified --export([current_state/1, format_state/1]). +-export([main2/4, run_append_server/2, run_listen_server/1, + current_state/1, format_state/1]). -record(state, { flu_name :: atom(), @@ -79,9 +78,13 @@ props = [] :: list() % proplist }). +-define(SERVER_CMD_READ_TIMEOUT, 600*1000). +-define(INIT_TIMEOUT, 60*1000). + start_link([{FluName, TcpPort, DataDir}|Rest]) when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) -> - {ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}. + proc_lib:start_link(?MODULE, main2, [FluName, TcpPort, DataDir, Rest], + ?INIT_TIMEOUT). stop(Pid) -> case erlang:is_process_alive(Pid) of @@ -154,10 +157,7 @@ main2(FluName, TcpPort, DataDir, Props) -> etstab=ets_table_name(FluName), epoch_id=EpochId, props=Props}, - AppendPid = start_append_server(S0, self()), - receive - append_server_ack -> ok - end, + {ok, AppendPid} = start_append_server(S0, self()), if SendAppendPidToProj_p -> machi_projection_store:set_wedge_notify_pid(ProjectionPid, AppendPid); @@ -165,7 +165,7 @@ main2(FluName, TcpPort, DataDir, Props) -> ok end, S1 = S0#state{append_pid=AppendPid}, - ListenPid = start_listen_server(S1), + {ok, ListenPid} = start_listen_server(S1), Config_e = machi_util:make_config_filename(DataDir, "unused"), ok = filelib:ensure_dir(Config_e), @@ -178,6 +178,8 @@ main2(FluName, TcpPort, DataDir, Props) -> put(flu_append_pid, S1#state.append_pid), put(flu_projection_pid, ProjectionPid), put(flu_listen_pid, ListenPid), + proc_lib:init_ack({ok, self()}), + receive killme -> ok end, (catch exit(S1#state.append_pid, kill)), (catch exit(ProjectionPid, kill)), @@ -185,11 +187,10 @@ main2(FluName, TcpPort, DataDir, Props) -> ok. start_listen_server(S) -> - proc_lib:spawn_link(fun() -> run_listen_server(S) end). + proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT). start_append_server(S, AckPid) -> - FluPid = self(), - proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end). + proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT). run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> register(make_listener_regname(FluName), self()), @@ -198,6 +199,7 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> {backlog,8192}], case gen_tcp:listen(TcpPort, SockOpts) of {ok, LSock} -> + proc_lib:init_ack({ok, self()}), listen_server_loop(LSock, S); Else -> error_logger:warning_msg("~s:run_listen_server: " @@ -206,14 +208,14 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) end. -run_append_server(FluPid, AckPid, #state{flu_name=Name, - wedged=Wedged_p,epoch_id=EpochId}=S) -> +run_append_server(FluPid, #state{flu_name=Name, + wedged=Wedged_p,epoch_id=EpochId}=S) -> %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), TID = ets:new(ets_table_name(Name), [set, protected, named_table, {read_concurrency, true}]), ets:insert(TID, {epoch, {Wedged_p, EpochId}}), - AckPid ! append_server_ack, + proc_lib:init_ack({ok, self()}), append_server_loop(FluPid, S#state{etstab=TID}). listen_server_loop(LSock, S) ->