Merge pull request #22 from basho/ss-flu1-init-sync

Make flu1 initialization synchronous
This commit is contained in:
Scott Lystig Fritchie 2015-10-21 18:36:12 +09:00
commit 177aca0a68

View file

@ -56,13 +56,12 @@
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
-export([start_link/1, stop/1,
update_wedge_state/3, wedge_myself/2]).
-export([make_listener_regname/1, make_projection_server_regname/1]).
%% TODO: remove or replace in OTP way after gen_*'ified
-export([current_state/1, format_state/1]).
-export([main2/4, run_append_server/2, run_listen_server/1,
current_state/1, format_state/1]).
-record(state, {
flu_name :: atom(),
@ -79,9 +78,13 @@
props = [] :: list() % proplist
}).
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
-define(INIT_TIMEOUT, 60*1000).
start_link([{FluName, TcpPort, DataDir}|Rest])
when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
{ok, spawn_link(fun() -> main2(FluName, TcpPort, DataDir, Rest) end)}.
proc_lib:start_link(?MODULE, main2, [FluName, TcpPort, DataDir, Rest],
?INIT_TIMEOUT).
stop(Pid) ->
case erlang:is_process_alive(Pid) of
@ -154,10 +157,7 @@ main2(FluName, TcpPort, DataDir, Props) ->
etstab=ets_table_name(FluName),
epoch_id=EpochId,
props=Props},
AppendPid = start_append_server(S0, self()),
receive
append_server_ack -> ok
end,
{ok, AppendPid} = start_append_server(S0, self()),
if SendAppendPidToProj_p ->
machi_projection_store:set_wedge_notify_pid(ProjectionPid,
AppendPid);
@ -165,7 +165,7 @@ main2(FluName, TcpPort, DataDir, Props) ->
ok
end,
S1 = S0#state{append_pid=AppendPid},
ListenPid = start_listen_server(S1),
{ok, ListenPid} = start_listen_server(S1),
Config_e = machi_util:make_config_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Config_e),
@ -178,6 +178,8 @@ main2(FluName, TcpPort, DataDir, Props) ->
put(flu_append_pid, S1#state.append_pid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenPid),
proc_lib:init_ack({ok, self()}),
receive killme -> ok end,
(catch exit(S1#state.append_pid, kill)),
(catch exit(ProjectionPid, kill)),
@ -185,11 +187,10 @@ main2(FluName, TcpPort, DataDir, Props) ->
ok.
start_listen_server(S) ->
proc_lib:spawn_link(fun() -> run_listen_server(S) end).
proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT).
start_append_server(S, AckPid) ->
FluPid = self(),
proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end).
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
register(make_listener_regname(FluName), self()),
@ -198,6 +199,7 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
{backlog,8192}],
case gen_tcp:listen(TcpPort, SockOpts) of
{ok, LSock} ->
proc_lib:init_ack({ok, self()}),
listen_server_loop(LSock, S);
Else ->
error_logger:warning_msg("~s:run_listen_server: "
@ -206,14 +208,14 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
end.
run_append_server(FluPid, AckPid, #state{flu_name=Name,
wedged=Wedged_p,epoch_id=EpochId}=S) ->
run_append_server(FluPid, #state{flu_name=Name,
wedged=Wedged_p,epoch_id=EpochId}=S) ->
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
register(Name, self()),
TID = ets:new(ets_table_name(Name),
[set, protected, named_table, {read_concurrency, true}]),
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
AckPid ! append_server_ack,
proc_lib:init_ack({ok, self()}),
append_server_loop(FluPid, S#state{etstab=TID}).
listen_server_loop(LSock, S) ->