Make flu1 initialization synchronous
This commit is contained in:
parent
84f9ccc4f5
commit
478107915b
1 changed files with 17 additions and 15 deletions
|
@ -56,13 +56,12 @@
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-endif. % TEST
|
-endif. % TEST
|
||||||
|
|
||||||
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
|
|
||||||
|
|
||||||
-export([start_link/1, stop/1,
|
-export([start_link/1, stop/1,
|
||||||
update_wedge_state/3, wedge_myself/2]).
|
update_wedge_state/3, wedge_myself/2]).
|
||||||
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
||||||
%% TODO: remove or replace in OTP way after gen_*'ified
|
%% TODO: remove or replace in OTP way after gen_*'ified
|
||||||
-export([current_state/1, format_state/1]).
|
-export([main2/4, run_append_server/2, run_listen_server/1,
|
||||||
|
current_state/1, format_state/1]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
flu_name :: atom(),
|
flu_name :: atom(),
|
||||||
|
@ -79,9 +78,13 @@
|
||||||
props = [] :: list() % proplist
|
props = [] :: list() % proplist
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
|
||||||
|
-define(INIT_TIMEOUT, 60*1000).
|
||||||
|
|
||||||
start_link([{FluName, TcpPort, DataDir}|Rest])
|
start_link([{FluName, TcpPort, DataDir}|Rest])
|
||||||
when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
|
when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
|
||||||
{ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}.
|
proc_lib:start_link(?MODULE, main2, [FluName, TcpPort, DataDir, Rest],
|
||||||
|
?INIT_TIMEOUT).
|
||||||
|
|
||||||
stop(Pid) ->
|
stop(Pid) ->
|
||||||
case erlang:is_process_alive(Pid) of
|
case erlang:is_process_alive(Pid) of
|
||||||
|
@ -154,10 +157,7 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
etstab=ets_table_name(FluName),
|
etstab=ets_table_name(FluName),
|
||||||
epoch_id=EpochId,
|
epoch_id=EpochId,
|
||||||
props=Props},
|
props=Props},
|
||||||
AppendPid = start_append_server(S0, self()),
|
{ok, AppendPid} = start_append_server(S0, self()),
|
||||||
receive
|
|
||||||
append_server_ack -> ok
|
|
||||||
end,
|
|
||||||
if SendAppendPidToProj_p ->
|
if SendAppendPidToProj_p ->
|
||||||
machi_projection_store:set_wedge_notify_pid(ProjectionPid,
|
machi_projection_store:set_wedge_notify_pid(ProjectionPid,
|
||||||
AppendPid);
|
AppendPid);
|
||||||
|
@ -165,7 +165,7 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
S1 = S0#state{append_pid=AppendPid},
|
S1 = S0#state{append_pid=AppendPid},
|
||||||
ListenPid = start_listen_server(S1),
|
{ok, ListenPid} = start_listen_server(S1),
|
||||||
|
|
||||||
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
||||||
ok = filelib:ensure_dir(Config_e),
|
ok = filelib:ensure_dir(Config_e),
|
||||||
|
@ -178,6 +178,8 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
put(flu_append_pid, S1#state.append_pid),
|
put(flu_append_pid, S1#state.append_pid),
|
||||||
put(flu_projection_pid, ProjectionPid),
|
put(flu_projection_pid, ProjectionPid),
|
||||||
put(flu_listen_pid, ListenPid),
|
put(flu_listen_pid, ListenPid),
|
||||||
|
proc_lib:init_ack({ok, self()}),
|
||||||
|
|
||||||
receive killme -> ok end,
|
receive killme -> ok end,
|
||||||
(catch exit(S1#state.append_pid, kill)),
|
(catch exit(S1#state.append_pid, kill)),
|
||||||
(catch exit(ProjectionPid, kill)),
|
(catch exit(ProjectionPid, kill)),
|
||||||
|
@ -185,11 +187,10 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
start_listen_server(S) ->
|
start_listen_server(S) ->
|
||||||
proc_lib:spawn_link(fun() -> run_listen_server(S) end).
|
proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT).
|
||||||
|
|
||||||
start_append_server(S, AckPid) ->
|
start_append_server(S, AckPid) ->
|
||||||
FluPid = self(),
|
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
||||||
proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end).
|
|
||||||
|
|
||||||
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
||||||
register(make_listener_regname(FluName), self()),
|
register(make_listener_regname(FluName), self()),
|
||||||
|
@ -198,6 +199,7 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
||||||
{backlog,8192}],
|
{backlog,8192}],
|
||||||
case gen_tcp:listen(TcpPort, SockOpts) of
|
case gen_tcp:listen(TcpPort, SockOpts) of
|
||||||
{ok, LSock} ->
|
{ok, LSock} ->
|
||||||
|
proc_lib:init_ack({ok, self()}),
|
||||||
listen_server_loop(LSock, S);
|
listen_server_loop(LSock, S);
|
||||||
Else ->
|
Else ->
|
||||||
error_logger:warning_msg("~s:run_listen_server: "
|
error_logger:warning_msg("~s:run_listen_server: "
|
||||||
|
@ -206,14 +208,14 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
||||||
exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
|
exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
run_append_server(FluPid, AckPid, #state{flu_name=Name,
|
run_append_server(FluPid, #state{flu_name=Name,
|
||||||
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
||||||
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
|
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
|
||||||
register(Name, self()),
|
register(Name, self()),
|
||||||
TID = ets:new(ets_table_name(Name),
|
TID = ets:new(ets_table_name(Name),
|
||||||
[set, protected, named_table, {read_concurrency, true}]),
|
[set, protected, named_table, {read_concurrency, true}]),
|
||||||
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
|
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
|
||||||
AckPid ! append_server_ack,
|
proc_lib:init_ack({ok, self()}),
|
||||||
append_server_loop(FluPid, S#state{etstab=TID}).
|
append_server_loop(FluPid, S#state{etstab=TID}).
|
||||||
|
|
||||||
listen_server_loop(LSock, S) ->
|
listen_server_loop(LSock, S) ->
|
||||||
|
|
Loading…
Reference in a new issue