Initialize FLU package with ranch listener
This commit is contained in:
parent
9579b1b8b2
commit
7614910f36
4 changed files with 193 additions and 181 deletions
|
@ -58,10 +58,9 @@
|
||||||
|
|
||||||
-export([start_link/1, stop/1,
|
-export([start_link/1, stop/1,
|
||||||
update_wedge_state/3, wedge_myself/2]).
|
update_wedge_state/3, wedge_myself/2]).
|
||||||
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
-export([make_projection_server_regname/1]).
|
||||||
%% TODO: remove or replace in OTP way after gen_*'ified
|
%% TODO: remove or replace in OTP way after gen_*'ified
|
||||||
-export([main2/4, run_append_server/2,
|
-export([main2/4, run_append_server/2,
|
||||||
%% run_listen_server/1,
|
|
||||||
current_state/1, format_state/1]).
|
current_state/1, format_state/1]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
|
@ -69,14 +68,9 @@
|
||||||
proj_store :: pid(),
|
proj_store :: pid(),
|
||||||
witness = false :: boolean(),
|
witness = false :: boolean(),
|
||||||
append_pid :: pid(),
|
append_pid :: pid(),
|
||||||
tcp_port :: non_neg_integer(),
|
|
||||||
data_dir :: string(),
|
|
||||||
wedged = true :: boolean(),
|
wedged = true :: boolean(),
|
||||||
etstab :: ets:tid(),
|
etstab :: ets:tid(),
|
||||||
epoch_id :: 'undefined' | machi_dt:epoch_id(),
|
epoch_id :: 'undefined' | machi_dt:epoch_id(),
|
||||||
pb_mode = undefined :: 'undefined' | 'high' | 'low',
|
|
||||||
high_clnt :: 'undefined' | pid(),
|
|
||||||
trim_table :: ets:tid(),
|
|
||||||
props = [] :: list() % proplist
|
props = [] :: list() % proplist
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -153,8 +147,6 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
|
|
||||||
S0 = #state{flu_name=FluName,
|
S0 = #state{flu_name=FluName,
|
||||||
proj_store=ProjectionPid,
|
proj_store=ProjectionPid,
|
||||||
tcp_port=TcpPort,
|
|
||||||
data_dir=DataDir,
|
|
||||||
wedged=Wedged_p,
|
wedged=Wedged_p,
|
||||||
witness=Witness_p,
|
witness=Witness_p,
|
||||||
etstab=ets_table_name(FluName),
|
etstab=ets_table_name(FluName),
|
||||||
|
@ -168,7 +160,8 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
S1 = S0#state{append_pid=AppendPid},
|
S1 = S0#state{append_pid=AppendPid},
|
||||||
{ok, ListenPid} = start_listen_server(S1),
|
{ok, ListenerPid} = start_listen_server(TcpPort, DataDir, S1),
|
||||||
|
io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]),
|
||||||
|
|
||||||
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
||||||
ok = filelib:ensure_dir(Config_e),
|
ok = filelib:ensure_dir(Config_e),
|
||||||
|
@ -180,22 +173,23 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
put(flu_flu_name, FluName),
|
put(flu_flu_name, FluName),
|
||||||
put(flu_append_pid, S1#state.append_pid),
|
put(flu_append_pid, S1#state.append_pid),
|
||||||
put(flu_projection_pid, ProjectionPid),
|
put(flu_projection_pid, ProjectionPid),
|
||||||
put(flu_listen_pid, ListenPid),
|
put(flu_listen_pid, ListenerPid),
|
||||||
proc_lib:init_ack({ok, self()}),
|
proc_lib:init_ack({ok, self()}),
|
||||||
|
|
||||||
receive killme -> ok end,
|
receive killme -> ok end,
|
||||||
(catch exit(S1#state.append_pid, kill)),
|
(catch exit(S1#state.append_pid, kill)),
|
||||||
(catch exit(ProjectionPid, kill)),
|
(catch exit(ProjectionPid, kill)),
|
||||||
(catch exit(ListenPid, kill)),
|
(catch exit(ListenerPid, kill)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
start_append_server(S, AckPid) ->
|
start_append_server(S, AckPid) ->
|
||||||
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
||||||
|
|
||||||
start_listen_server(_S) ->
|
start_listen_server(TcpPort, DataDir,
|
||||||
%% FIXMEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
|
#state{flu_name=FluName, witness=Witness, etstab=EtsTab,
|
||||||
%% proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT).
|
proj_store=ProjStore}=_S) ->
|
||||||
{ok, dummy}.
|
machi_listener_sup:start_listener(FluName, TcpPort, Witness, DataDir,
|
||||||
|
EtsTab, ProjStore).
|
||||||
|
|
||||||
run_append_server(FluPid, #state{flu_name=Name,
|
run_append_server(FluPid, #state{flu_name=Name,
|
||||||
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
||||||
|
@ -307,9 +301,6 @@ handle_append(CoC_Namespace, CoC_Locator,
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
make_listener_regname(BaseName) ->
|
|
||||||
list_to_atom(atom_to_list(BaseName) ++ "_listener").
|
|
||||||
|
|
||||||
%% This is the name of the projection store that is spawned by the
|
%% This is the name of the projection store that is spawned by the
|
||||||
%% *flu*, for use primarily in testing scenarios. In normal use, we
|
%% *flu*, for use primarily in testing scenarios. In normal use, we
|
||||||
%% ought to be using the OTP style of managing processes, via
|
%% ought to be using the OTP style of managing processes, via
|
||||||
|
|
|
@ -143,21 +143,19 @@ init([FluName, TcpPort, DataDir, Props0]) ->
|
||||||
|
|
||||||
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
||||||
|
|
||||||
ListenerRegName = machi_flu1:make_listener_regname(FluName),
|
ListenerSupSpec = {machi_listener_sup:make_listener_sup_name(FluName),
|
||||||
NbAcceptors = 100,
|
{machi_listener_sup, start_link, [FluName]},
|
||||||
ListenerSpec = ranch:child_spec(ListenerRegName, NbAcceptors,
|
permanent, ?SHUTDOWN, supervisor, []},
|
||||||
ranch_tcp, [{port, TcpPort}],
|
|
||||||
machi_pb_protocol, []),
|
|
||||||
FluSpec = {FluName,
|
FluSpec = {FluName,
|
||||||
{machi_flu1, start_link,
|
{machi_flu1, start_link,
|
||||||
[ [{FluName, TcpPort+1, DataDir}|Props] ]},
|
[ [{FluName, TcpPort, DataDir}|Props] ]},
|
||||||
permanent, ?SHUTDOWN, worker, []},
|
permanent, ?SHUTDOWN, worker, []},
|
||||||
|
|
||||||
|
|
||||||
{ok, {SupFlags, [
|
{ok, {SupFlags, [
|
||||||
ProjSpec, FitnessSpec, MgrSpec,
|
ProjSpec, FitnessSpec, MgrSpec,
|
||||||
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
||||||
FluSpec, ListenerSpec]}}.
|
ListenerSupSpec, FluSpec]}}.
|
||||||
|
|
||||||
make_flu_regname(FluName) when is_atom(FluName) ->
|
make_flu_regname(FluName) when is_atom(FluName) ->
|
||||||
FluName.
|
FluName.
|
||||||
|
|
78
src/machi_listener_sup.erl
Normal file
78
src/machi_listener_sup.erl
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc This is the supervisor to hold ranch listener for sigle FLU,
|
||||||
|
%% holds at most one child worker.
|
||||||
|
|
||||||
|
%% TODO: This supervisor is maybe useless. First introduced by workaround
|
||||||
|
%% to start listener dynamically in flu1 initialization time.
|
||||||
|
%% Because psup is blocked in flu1 initialization time, adding a child
|
||||||
|
%% to psup leads to deadlock.
|
||||||
|
%% By the result of refactoring process, if initialization can be done
|
||||||
|
%% only by static arguments, then this supervisor should be removed
|
||||||
|
%% and add listener as a direct child of psup.
|
||||||
|
|
||||||
|
-module(machi_listener_sup).
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
%% public API
|
||||||
|
-export([start_link/1,
|
||||||
|
start_listener/6,
|
||||||
|
stop_listener/1,
|
||||||
|
make_listener_sup_name/1,
|
||||||
|
make_listener_name/1]).
|
||||||
|
|
||||||
|
%% supervisor callback
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
-define(BACKLOG, 8192).
|
||||||
|
|
||||||
|
start_link(FluName) ->
|
||||||
|
supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []).
|
||||||
|
|
||||||
|
start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
||||||
|
supervisor:start_child(make_listener_sup_name(FluName),
|
||||||
|
child_spec(FluName, TcpPort, Witness, DataDir,
|
||||||
|
EpochTab, ProjStore)).
|
||||||
|
|
||||||
|
stop_listener(FluName) ->
|
||||||
|
SupName = make_listener_sup_name(FluName),
|
||||||
|
ListenerName = make_listener_name(FluName),
|
||||||
|
ok = supervisor:terminate_child(SupName, ListenerName),
|
||||||
|
ok = supervisor:delete_child(SupName, ListenerName).
|
||||||
|
|
||||||
|
make_listener_sup_name(FluName) when is_atom(FluName) ->
|
||||||
|
list_to_atom(atom_to_list(FluName) ++ "_listener_sup").
|
||||||
|
|
||||||
|
make_listener_name(FluName) ->
|
||||||
|
list_to_atom(atom_to_list(FluName) ++ "_listener").
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
SupFlags = {one_for_one, 1000, 10},
|
||||||
|
{ok, {SupFlags, []}}.
|
||||||
|
|
||||||
|
child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
||||||
|
ListenerName = make_listener_name(FluName),
|
||||||
|
NbAcceptors = 100,
|
||||||
|
TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}],
|
||||||
|
ProtoOpts = [FluName, Witness, DataDir, EpochTab, ProjStore],
|
||||||
|
ranch:child_spec(ListenerName, NbAcceptors,
|
||||||
|
ranch_tcp, TcpOpts,
|
||||||
|
machi_pb_protocol, ProtoOpts).
|
|
@ -19,7 +19,7 @@
|
||||||
%% -------------------------------------------------------------------
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Ranch protocol callback module to handle PB protocol over
|
%% @doc Ranch protocol callback module to handle PB protocol over
|
||||||
%% transport
|
%% transport, including both high and low modes.
|
||||||
|
|
||||||
%% TODO
|
%% TODO
|
||||||
%% - Two modes, high and low should be separated at listener level?
|
%% - Two modes, high and low should be separated at listener level?
|
||||||
|
@ -28,172 +28,114 @@
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
-behaviour(ranch_protocol).
|
-behaviour(ranch_protocol).
|
||||||
|
|
||||||
-export([start_link/4]).
|
-export([start_link/4]).
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
-export([handle_call/3, handle_cast/2, handle_info/2,
|
-export([handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
-include("machi.hrl").
|
-include("machi.hrl").
|
||||||
-include("machi_pb.hrl").
|
-include("machi_pb.hrl").
|
||||||
-include("machi_projection.hrl").
|
-include("machi_projection.hrl").
|
||||||
-define(V(X,Y), ok).
|
|
||||||
%% -include("machi_verbose.hrl").
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-endif. % TEST
|
-endif. % TEST
|
||||||
|
|
||||||
-record(state, {ref,
|
-record(state, {
|
||||||
socket,
|
%% Transport related items passed from Ranch
|
||||||
transport,
|
ref :: ranch:ref(),
|
||||||
opts,
|
socket :: socket(),
|
||||||
pb_mode,
|
transport :: module(),
|
||||||
data_dir,
|
|
||||||
witness,
|
|
||||||
%% - Used in projection related requests in low mode
|
|
||||||
%% - Used in spawning CR client in high mode
|
|
||||||
proj_store,
|
|
||||||
%%%% Low mode only
|
|
||||||
%% Current best knowledge, used for wedge_self / bad_epoch check
|
|
||||||
epoch_id,
|
|
||||||
%% Used in dispatching append_chunk* reqs to the
|
|
||||||
%% append serializing process
|
|
||||||
flu_name,
|
|
||||||
%% Stored in ETS before factorization, can be stored in the recored?
|
|
||||||
wedged,
|
|
||||||
%% Used in server_wedge_status to lookup the table
|
|
||||||
etstab,
|
|
||||||
%%%% High mode only
|
|
||||||
high_clnt,
|
|
||||||
%%%% to be removed
|
|
||||||
eof
|
|
||||||
}).
|
|
||||||
|
|
||||||
%% -record(state, {
|
%% Machi application related items below
|
||||||
%% used in append serializer to trigger chain mgr react_to_env
|
data_dir :: string(),
|
||||||
%% flu_name :: atom(),
|
witness :: boolean(),
|
||||||
%% proj_store :: pid(),
|
pb_mode :: undefined | high | low,
|
||||||
%% witness = false :: boolean(),
|
%% - Used in projection related requests in low mode
|
||||||
%% append_pid :: pid(),
|
%% - Used in spawning CR client in high mode
|
||||||
%% tcp_port :: non_neg_integer(),
|
proj_store :: pid(),
|
||||||
%% data_dir :: string(),
|
|
||||||
%% wedged = true :: boolean(),
|
|
||||||
%% etstab :: ets:tid(),
|
|
||||||
%% epoch_id :: 'undefined' | machi_dt:epoch_id(),
|
|
||||||
%% pb_mode = undefined :: 'undefined' | 'high' | 'low',
|
|
||||||
%% high_clnt :: 'undefined' | pid(),
|
|
||||||
%% trim_table :: ets:tid(),
|
|
||||||
%% props = [] :: list() % proplist
|
|
||||||
%% }).
|
|
||||||
|
|
||||||
-spec start_link(ranch:ref(), any(), module(), any()) -> {ok, pid()}.
|
%% Low mode only
|
||||||
start_link(Ref, Socket, Transport, Opts) ->
|
%% Current best knowledge, used for wedge_self / bad_epoch check
|
||||||
proc_lib:start_link(?MODULE, init, [#state{ref=Ref, socket=Socket,
|
epoch_id :: undefined | machi_dt:epoch_id(),
|
||||||
transport=Transport,
|
%% Used in dispatching append_chunk* reqs to the
|
||||||
opts=Opts}]).
|
%% append serializing process
|
||||||
|
flu_name :: atom(),
|
||||||
|
%% Used in server_wedge_status to lookup the table
|
||||||
|
epoch_tab :: ets:tid(),
|
||||||
|
|
||||||
init(#state{ref=Ref, socket=Socket, transport=Transport, opts=_Opts}=State) ->
|
%% High mode only
|
||||||
ok = proc_lib:init_ack({ok, self()}),
|
high_clnt :: pid(),
|
||||||
%% TODO: Perform any required state initialization here.
|
|
||||||
ok = ranch:accept_ack(Ref),
|
%% anything you want
|
||||||
ok = Transport:setopts(Socket, [{active, once}]),
|
props = [] :: list() % proplist
|
||||||
gen_server:enter_loop(?MODULE, [], State).
|
}).
|
||||||
|
|
||||||
|
-type socket() :: any().
|
||||||
|
-type state() :: #state{}.
|
||||||
|
|
||||||
|
-spec start_link(ranch:ref(), socket(), module(), [term()]) -> {ok, pid()}.
|
||||||
|
start_link(Ref, Socket, Transport, [FluName, Witness, DataDir, EpochTab, ProjStore]) ->
|
||||||
|
proc_lib:start_link(?MODULE, init, [#state{ref=Ref,
|
||||||
|
socket=Socket,
|
||||||
|
transport=Transport,
|
||||||
|
flu_name=FluName,
|
||||||
|
witness=Witness,
|
||||||
|
data_dir=DataDir,
|
||||||
|
epoch_tab=EpochTab,
|
||||||
|
proj_store=ProjStore}]).
|
||||||
|
|
||||||
|
-spec init(state()) -> no_return().
|
||||||
|
init(#state{ref=Ref, socket=Socket, transport=Transport}=State) ->
|
||||||
|
ok = proc_lib:init_ack({ok, self()}),
|
||||||
|
ok = ranch:accept_ack(Ref),
|
||||||
|
{_Wedged_p, CurrentEpochID} = lookup_epoch(State),
|
||||||
|
ok = Transport:setopts(Socket, [{active, once}|?PB_PACKET_OPTS]),
|
||||||
|
gen_server:enter_loop(?MODULE, [], State#state{epoch_id=CurrentEpochID}).
|
||||||
|
|
||||||
handle_call(Request, _From, S) ->
|
handle_call(Request, _From, S) ->
|
||||||
|
lager:warning("~s:handle_call UNKNOWN message: ~w", [?MODULE, Request]),
|
||||||
Reply = {error, {unknown_message, Request}},
|
Reply = {error, {unknown_message, Request}},
|
||||||
{reply, Reply, S}.
|
{reply, Reply, S}.
|
||||||
|
|
||||||
handle_cast(_Msg, S) ->
|
handle_cast(_Msg, S) ->
|
||||||
io:format(user, "~s:handle_cast: ~p\n", [?MODULE, _Msg]),
|
lager:warning("~s:handle_cast UNKNOWN message: ~w", [?MODULE, _Msg]),
|
||||||
{noreply, S}.
|
{noreply, S}.
|
||||||
|
|
||||||
handle_info({tcp, Sock, Data}=_Info, S) ->
|
%% TODO: Other transport support needed?? TLS/SSL, SCTP
|
||||||
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
handle_info({tcp, Sock, Data}=_Info, #state{socket=Sock}=S) ->
|
||||||
|
lager:debug("~s:handle_info: ~w", [?MODULE, _Info]),
|
||||||
transport_received(Sock, Data, S);
|
transport_received(Sock, Data, S);
|
||||||
handle_info({tcp_closed, Sock}=_Info, S) ->
|
handle_info({tcp_closed, Sock}=_Info, #state{socket=Sock}=S) ->
|
||||||
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
lager:debug("~s:handle_info: ~w", [?MODULE, _Info]),
|
||||||
transport_closed(Sock, S);
|
transport_closed(Sock, S);
|
||||||
handle_info({tcp_error, Sock, Reason}=_Info, S) ->
|
handle_info({tcp_error, Sock, Reason}=_Info, #state{socket=Sock}=S) ->
|
||||||
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
lager:debug("~s:handle_info: ~w", [?MODULE, _Info]),
|
||||||
transport_error(Sock, Reason, S);
|
transport_error(Sock, Reason, S);
|
||||||
handle_info(_Info, S) ->
|
handle_info(_Info, S) ->
|
||||||
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
lager:warning("~s:handle_info UNKNOWN message: ~w", [?MODULE, _Info]),
|
||||||
{noreply, S}.
|
{noreply, S}.
|
||||||
|
|
||||||
terminate(_Reason, _S) ->
|
terminate(_Reason, #state{socket=undefined}=_S) ->
|
||||||
io:format(user, "~s:terminate: ~p\n", [?MODULE, _Reason]),
|
lager:debug("~s:terminate: ~w", [?MODULE, _Reason]),
|
||||||
|
ok;
|
||||||
|
terminate(_Reason, #state{socket=Socket}=_S) ->
|
||||||
|
lager:debug("~s:terminate: ~w", [?MODULE, _Reason]),
|
||||||
|
(catch gen_tcp:close(Socket)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
code_change(_OldVsn, S, _Extra) ->
|
code_change(_OldVsn, S, _Extra) ->
|
||||||
{ok, S}.
|
{ok, S}.
|
||||||
|
|
||||||
%% Internal functions, or copy-n-paste'd thingie
|
%% -- private
|
||||||
|
|
||||||
%%%% Just copied and will be removed %%%%
|
|
||||||
|
|
||||||
%% TODO: sock opts should be migrated to ranch equivalent
|
|
||||||
%% run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
|
||||||
%% register(make_listener_regname(FluName), self()),
|
|
||||||
%% SockOpts = ?PB_PACKET_OPTS ++
|
|
||||||
%% [{reuseaddr, true}, {mode, binary}, {active, false},
|
|
||||||
%% {backlog,8192}],
|
|
||||||
%% case gen_tcp:listen(TcpPort, SockOpts) of
|
|
||||||
%% {ok, LSock} ->
|
|
||||||
%% proc_lib:init_ack({ok, self()}),
|
|
||||||
%% listen_server_loop(LSock, S);
|
|
||||||
%% Else ->
|
|
||||||
%% error_logger:warning_msg("~s:run_listen_server: "
|
|
||||||
%% "listen to TCP port ~w: ~w\n",
|
|
||||||
%% [?MODULE, TcpPort, Else]),
|
|
||||||
%% exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
|
|
||||||
%% end.
|
|
||||||
|
|
||||||
%% listen_server_loop(LSock, S) ->
|
|
||||||
%% {ok, Sock} = gen_tcp:accept(LSock),
|
|
||||||
%% spawn_link(fun() -> net_server_loop(Sock, S) end),
|
|
||||||
%% listen_server_loop(LSock, S).
|
|
||||||
|
|
||||||
%% net_server_loop(Sock, S) ->
|
|
||||||
%% case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
|
|
||||||
%% {ok, Bin} ->
|
|
||||||
%% {RespBin, S2} =
|
|
||||||
%% case machi_pb:decode_mpb_ll_request(Bin) of
|
|
||||||
%% LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
|
||||||
%% {R, NewS} = do_pb_ll_request(LL_req, S),
|
|
||||||
%% {maybe_encode_response(R), mode(low, NewS)};
|
|
||||||
%% _ ->
|
|
||||||
%% HL_req = machi_pb:decode_mpb_request(Bin),
|
|
||||||
%% 1 = HL_req#mpb_request.do_not_alter,
|
|
||||||
%% {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
|
||||||
%% {machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
|
||||||
%% end,
|
|
||||||
%% if RespBin == async_no_response ->
|
|
||||||
%% net_server_loop(Sock, S2);
|
|
||||||
%% true ->
|
|
||||||
%% case gen_tcp:send(Sock, RespBin) of
|
|
||||||
%% ok ->
|
|
||||||
%% net_server_loop(Sock, S2);
|
|
||||||
%% {error, _} ->
|
|
||||||
%% (catch gen_tcp:close(Sock)),
|
|
||||||
%% exit(normal)
|
|
||||||
%% end
|
|
||||||
%% end;
|
|
||||||
%% {error, SockError} ->
|
|
||||||
%% Msg = io_lib:format("Socket error ~w", [SockError]),
|
|
||||||
%% R = #mpb_ll_response{req_id= <<>>,
|
|
||||||
%% generic=#mpb_errorresp{code=1, msg=Msg}},
|
|
||||||
%% _Resp = machi_pb:encode_mpb_ll_response(R),
|
|
||||||
%% %% TODO: Weird that sometimes neither catch nor try/catch
|
|
||||||
%% %% can prevent OTP's SASL from logging an error here.
|
|
||||||
%% %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
|
||||||
%% %% TODO: is this what causes the intermittent PULSE deadlock errors?
|
|
||||||
%% %% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
|
||||||
%% (catch gen_tcp:close(Sock)),
|
|
||||||
%% exit(normal)
|
|
||||||
%% end.
|
|
||||||
|
|
||||||
%%%% Common transport handling
|
%%%% Common transport handling
|
||||||
|
|
||||||
|
-spec transport_received(socket(), machi_dt:chunk(), state()) ->
|
||||||
|
{noreply, state()}.
|
||||||
transport_received(Sock, Bin, #state{transport=Transport}=S) ->
|
transport_received(Sock, Bin, #state{transport=Transport}=S) ->
|
||||||
{RespBin, S2} =
|
{RespBin, S2} =
|
||||||
case machi_pb:decode_mpb_ll_request(Bin) of
|
case machi_pb:decode_mpb_ll_request(Bin) of
|
||||||
|
@ -207,33 +149,36 @@ transport_received(Sock, Bin, #state{transport=Transport}=S) ->
|
||||||
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
||||||
end,
|
end,
|
||||||
if RespBin == async_no_response ->
|
if RespBin == async_no_response ->
|
||||||
|
Transport:setopts(Sock, [{active, once}]),
|
||||||
{noreply, S2};
|
{noreply, S2};
|
||||||
true ->
|
true ->
|
||||||
case Transport:send(Sock, RespBin) of
|
case Transport:send(Sock, RespBin) of
|
||||||
ok ->
|
ok ->
|
||||||
|
Transport:setopts(Sock, [{active, once}]),
|
||||||
{noreply, S2};
|
{noreply, S2};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
transport_error(Sock, Reason, S2)
|
transport_error(Sock, Reason, S2)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
transport_closed(Sock, S) ->
|
-spec transport_closed(socket(), state()) -> {stop, term(), state()}.
|
||||||
(catch gen_tcp:close(Sock)),
|
transport_closed(_Socket, S) ->
|
||||||
{stop, normal, S#state{sock=undefined}}.
|
{stop, normal, S}.
|
||||||
|
|
||||||
transport_error(Sock, Reason, S) ->
|
-spec transport_error(socket(), term(), state()) -> no_return().
|
||||||
Msg = io_lib:format("Socket error ~w", [SockError]),
|
transport_error(Sock, Reason, #state{transport=Transport}=_S) ->
|
||||||
|
Msg = io_lib:format("Socket error ~w", [Reason]),
|
||||||
R = #mpb_ll_response{req_id= <<>>,
|
R = #mpb_ll_response{req_id= <<>>,
|
||||||
generic=#mpb_errorresp{code=1, msg=Msg}},
|
generic=#mpb_errorresp{code=1, msg=Msg}},
|
||||||
_Resp = machi_pb:encode_mpb_ll_response(R),
|
_Resp = machi_pb:encode_mpb_ll_response(R),
|
||||||
%% TODO of TODO comments: comments below with four %s are copy-n-paste'd,
|
%% TODO for TODO comments: comments below with four %s are copy-n-paste'd,
|
||||||
%% then it should be considered they are still open and should be addressed.
|
%% then it should be considered they are still open and should be addressed.
|
||||||
%%%% TODO: Weird that sometimes neither catch nor try/catch
|
%%%% TODO: Weird that sometimes neither catch nor try/catch
|
||||||
%%%% can prevent OTP's SASL from logging an error here.
|
%%%% can prevent OTP's SASL from logging an error here.
|
||||||
%%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
%%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
||||||
%%%% TODO: is this what causes the intermittent PULSE deadlock errors?
|
%%%% TODO: is this what causes the intermittent PULSE deadlock errors?
|
||||||
%%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
%%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
||||||
(catch gen_tcp:close(Sock)),
|
(catch Transport:close(Sock)),
|
||||||
%% TODO: better to exit with `Reason'?
|
%% TODO: better to exit with `Reason'?
|
||||||
exit(normal).
|
exit(normal).
|
||||||
|
|
||||||
|
@ -242,14 +187,11 @@ maybe_encode_response(async_no_response=X) ->
|
||||||
maybe_encode_response(R) ->
|
maybe_encode_response(R) ->
|
||||||
machi_pb:encode_mpb_ll_response(R).
|
machi_pb:encode_mpb_ll_response(R).
|
||||||
|
|
||||||
%%%% Not categorized / not-yet-well-understood items
|
|
||||||
%% TODO: may be external API
|
|
||||||
mode(Mode, #state{pb_mode=undefined}=S) ->
|
mode(Mode, #state{pb_mode=undefined}=S) ->
|
||||||
S#state{pb_mode=Mode};
|
S#state{pb_mode=Mode};
|
||||||
mode(_, S) ->
|
mode(_, S) ->
|
||||||
S.
|
S.
|
||||||
|
|
||||||
|
|
||||||
%%%% Low PB mode %%%%
|
%%%% Low PB mode %%%%
|
||||||
|
|
||||||
do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
|
do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
|
||||||
|
@ -257,24 +199,27 @@ do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
|
||||||
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
||||||
do_pb_ll_request(PB_request, S) ->
|
do_pb_ll_request(PB_request, S) ->
|
||||||
Req = machi_pb_translate:from_pb_request(PB_request),
|
Req = machi_pb_translate:from_pb_request(PB_request),
|
||||||
|
%% io:format(user, "[~w] do_pb_ll_request Req: ~w~n", [S#state.flu_name, Req]),
|
||||||
{ReqID, Cmd, Result, S2} =
|
{ReqID, Cmd, Result, S2} =
|
||||||
case Req of
|
case Req of
|
||||||
{RqID, {LowCmd, _}=CMD}
|
{RqID, {LowCmd, _}=Cmd0}
|
||||||
when LowCmd == low_proj;
|
when LowCmd =:= low_proj;
|
||||||
LowCmd == low_wedge_status; LowCmd == low_list_files ->
|
LowCmd =:= low_wedge_status;
|
||||||
|
LowCmd =:= low_list_files ->
|
||||||
%% Skip wedge check for projection commands!
|
%% Skip wedge check for projection commands!
|
||||||
%% Skip wedge check for these unprivileged commands
|
%% Skip wedge check for these unprivileged commands
|
||||||
{Rs, NewS} = do_pb_ll_request3(CMD, S),
|
{Rs, NewS} = do_pb_ll_request3(Cmd0, S),
|
||||||
{RqID, CMD, Rs, NewS};
|
{RqID, Cmd0, Rs, NewS};
|
||||||
{RqID, CMD} ->
|
{RqID, Cmd0} ->
|
||||||
EpochID = element(2, CMD), % by common convention
|
EpochID = element(2, Cmd0), % by common convention
|
||||||
{Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S),
|
{Rs, NewS} = do_pb_ll_request2(EpochID, Cmd0, S),
|
||||||
{RqID, CMD, Rs, NewS}
|
{RqID, Cmd0, Rs, NewS}
|
||||||
end,
|
end,
|
||||||
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
||||||
|
|
||||||
do_pb_ll_request2(EpochID, CMD, S) ->
|
do_pb_ll_request2(EpochID, CMD, S) ->
|
||||||
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
|
{Wedged_p, CurrentEpochID} = lookup_epoch(S),
|
||||||
|
%% io:format(user, "{Wedged_p, CurrentEpochID}: ~w~n", [{Wedged_p, CurrentEpochID}]),
|
||||||
if Wedged_p == true ->
|
if Wedged_p == true ->
|
||||||
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
|
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
|
||||||
is_tuple(EpochID)
|
is_tuple(EpochID)
|
||||||
|
@ -287,7 +232,7 @@ do_pb_ll_request2(EpochID, CMD, S) ->
|
||||||
true ->
|
true ->
|
||||||
%% We're at same epoch # but different checksum, or
|
%% We're at same epoch # but different checksum, or
|
||||||
%% we're at a newer/bigger epoch #.
|
%% we're at a newer/bigger epoch #.
|
||||||
_ = wedge_myself(S#state.flu_name, CurrentEpochID),
|
_ = machi_flu1:wedge_myself(S#state.flu_name, CurrentEpochID),
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
|
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
|
||||||
|
@ -295,6 +240,9 @@ do_pb_ll_request2(EpochID, CMD, S) ->
|
||||||
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
|
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
lookup_epoch(#state{epoch_tab=T}) ->
|
||||||
|
ets:lookup_element(T, epoch, 2).
|
||||||
|
|
||||||
%% Witness status does not matter below.
|
%% Witness status does not matter below.
|
||||||
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
||||||
{Msg, S};
|
{Msg, S};
|
||||||
|
@ -498,7 +446,7 @@ do_server_list_files(#state{data_dir=DataDir}=_S) ->
|
||||||
end || File <- Files]}.
|
end || File <- Files]}.
|
||||||
|
|
||||||
do_server_wedge_status(S) ->
|
do_server_wedge_status(S) ->
|
||||||
{Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2),
|
{Wedged_p, CurrentEpochID0} = lookup_epoch(S),
|
||||||
CurrentEpochID = if CurrentEpochID0 == undefined ->
|
CurrentEpochID = if CurrentEpochID0 == undefined ->
|
||||||
?DUMMY_PV1_EPOCH;
|
?DUMMY_PV1_EPOCH;
|
||||||
true ->
|
true ->
|
||||||
|
@ -589,9 +537,6 @@ check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
|
||||||
throw({bad_csum, CS})
|
throw({bad_csum, CS})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
%%%% High PB mode %%%%
|
%%%% High PB mode %%%%
|
||||||
|
|
||||||
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
|
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
|
||||||
|
|
Loading…
Reference in a new issue