Introduce ranch and add transport callback
This commit is contained in:
parent
a8785e44b1
commit
9579b1b8b2
5 changed files with 666 additions and 474 deletions
|
@ -11,6 +11,7 @@
|
||||||
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}},
|
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}},
|
||||||
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
|
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
|
||||||
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
|
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
|
||||||
|
{ranch, ".*", {git, "git://github.com/ninenines/ranch.git", {branch, "master"}}},
|
||||||
{node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
|
{node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
|
||||||
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}},
|
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}},
|
||||||
{cluster_info, ".*", {git, "git://github.com/basho/cluster_info", {branch, "develop"}}}
|
{cluster_info, ".*", {git, "git://github.com/basho/cluster_info", {branch, "develop"}}}
|
||||||
|
|
|
@ -60,7 +60,8 @@
|
||||||
update_wedge_state/3, wedge_myself/2]).
|
update_wedge_state/3, wedge_myself/2]).
|
||||||
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
||||||
%% TODO: remove or replace in OTP way after gen_*'ified
|
%% TODO: remove or replace in OTP way after gen_*'ified
|
||||||
-export([main2/4, run_append_server/2, run_listen_server/1,
|
-export([main2/4, run_append_server/2,
|
||||||
|
%% run_listen_server/1,
|
||||||
current_state/1, format_state/1]).
|
current_state/1, format_state/1]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
|
@ -188,27 +189,13 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
(catch exit(ListenPid, kill)),
|
(catch exit(ListenPid, kill)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
start_listen_server(S) ->
|
|
||||||
proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT).
|
|
||||||
|
|
||||||
start_append_server(S, AckPid) ->
|
start_append_server(S, AckPid) ->
|
||||||
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
||||||
|
|
||||||
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
start_listen_server(_S) ->
|
||||||
register(make_listener_regname(FluName), self()),
|
%% FIXMEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
|
||||||
SockOpts = ?PB_PACKET_OPTS ++
|
%% proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT).
|
||||||
[{reuseaddr, true}, {mode, binary}, {active, false},
|
{ok, dummy}.
|
||||||
{backlog,8192}],
|
|
||||||
case gen_tcp:listen(TcpPort, SockOpts) of
|
|
||||||
{ok, LSock} ->
|
|
||||||
proc_lib:init_ack({ok, self()}),
|
|
||||||
listen_server_loop(LSock, S);
|
|
||||||
Else ->
|
|
||||||
error_logger:warning_msg("~s:run_listen_server: "
|
|
||||||
"listen to TCP port ~w: ~w\n",
|
|
||||||
[?MODULE, TcpPort, Else]),
|
|
||||||
exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
|
|
||||||
end.
|
|
||||||
|
|
||||||
run_append_server(FluPid, #state{flu_name=Name,
|
run_append_server(FluPid, #state{flu_name=Name,
|
||||||
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
||||||
|
@ -220,11 +207,6 @@ run_append_server(FluPid, #state{flu_name=Name,
|
||||||
proc_lib:init_ack({ok, self()}),
|
proc_lib:init_ack({ok, self()}),
|
||||||
append_server_loop(FluPid, S#state{etstab=TID}).
|
append_server_loop(FluPid, S#state{etstab=TID}).
|
||||||
|
|
||||||
listen_server_loop(LSock, S) ->
|
|
||||||
{ok, Sock} = gen_tcp:accept(LSock),
|
|
||||||
spawn_link(fun() -> net_server_loop(Sock, S) end),
|
|
||||||
listen_server_loop(LSock, S).
|
|
||||||
|
|
||||||
append_server_loop(FluPid, #state{wedged=Wedged_p,
|
append_server_loop(FluPid, #state{wedged=Wedged_p,
|
||||||
witness=Witness_p,
|
witness=Witness_p,
|
||||||
epoch_id=OldEpochId, flu_name=FluName}=S) ->
|
epoch_id=OldEpochId, flu_name=FluName}=S) ->
|
||||||
|
@ -292,398 +274,6 @@ append_server_loop(FluPid, #state{wedged=Wedged_p,
|
||||||
append_server_loop(FluPid, S)
|
append_server_loop(FluPid, S)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
net_server_loop(Sock, S) ->
|
|
||||||
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
|
|
||||||
{ok, Bin} ->
|
|
||||||
{RespBin, S2} =
|
|
||||||
case machi_pb:decode_mpb_ll_request(Bin) of
|
|
||||||
LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
|
||||||
{R, NewS} = do_pb_ll_request(LL_req, S),
|
|
||||||
{maybe_encode_response(R), mode(low, NewS)};
|
|
||||||
_ ->
|
|
||||||
HL_req = machi_pb:decode_mpb_request(Bin),
|
|
||||||
1 = HL_req#mpb_request.do_not_alter,
|
|
||||||
{R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
|
||||||
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
|
||||||
end,
|
|
||||||
if RespBin == async_no_response ->
|
|
||||||
net_server_loop(Sock, S2);
|
|
||||||
true ->
|
|
||||||
case gen_tcp:send(Sock, RespBin) of
|
|
||||||
ok ->
|
|
||||||
net_server_loop(Sock, S2);
|
|
||||||
{error, _} ->
|
|
||||||
(catch gen_tcp:close(Sock)),
|
|
||||||
exit(normal)
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
{error, SockError} ->
|
|
||||||
Msg = io_lib:format("Socket error ~w", [SockError]),
|
|
||||||
R = #mpb_ll_response{req_id= <<>>,
|
|
||||||
generic=#mpb_errorresp{code=1, msg=Msg}},
|
|
||||||
_Resp = machi_pb:encode_mpb_ll_response(R),
|
|
||||||
%% TODO: Weird that sometimes neither catch nor try/catch
|
|
||||||
%% can prevent OTP's SASL from logging an error here.
|
|
||||||
%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
|
||||||
%% TODO: is this what causes the intermittent PULSE deadlock errors?
|
|
||||||
%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
|
||||||
(catch gen_tcp:close(Sock)),
|
|
||||||
exit(normal)
|
|
||||||
end.
|
|
||||||
|
|
||||||
maybe_encode_response(async_no_response=X) ->
|
|
||||||
X;
|
|
||||||
maybe_encode_response(R) ->
|
|
||||||
machi_pb:encode_mpb_ll_response(R).
|
|
||||||
|
|
||||||
mode(Mode, #state{pb_mode=undefined}=S) ->
|
|
||||||
S#state{pb_mode=Mode};
|
|
||||||
mode(_, S) ->
|
|
||||||
S.
|
|
||||||
|
|
||||||
make_high_clnt(#state{high_clnt=undefined}=S) ->
|
|
||||||
{ok, Proj} = machi_projection_store:read_latest_projection(
|
|
||||||
S#state.proj_store, private),
|
|
||||||
Ps = [P_srvr || {_, P_srvr} <- orddict:to_list(
|
|
||||||
Proj#projection_v1.members_dict)],
|
|
||||||
{ok, Clnt} = machi_cr_client:start_link(Ps),
|
|
||||||
S#state{high_clnt=Clnt};
|
|
||||||
make_high_clnt(S) ->
|
|
||||||
S.
|
|
||||||
|
|
||||||
do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
|
|
||||||
Result = {high_error, 41, "Low protocol request while in high mode"},
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
|
||||||
do_pb_ll_request(PB_request, S) ->
|
|
||||||
Req = machi_pb_translate:from_pb_request(PB_request),
|
|
||||||
{ReqID, Cmd, Result, S2} =
|
|
||||||
case Req of
|
|
||||||
{RqID, {LowCmd, _}=CMD}
|
|
||||||
when LowCmd == low_proj;
|
|
||||||
LowCmd == low_wedge_status; LowCmd == low_list_files ->
|
|
||||||
%% Skip wedge check for projection commands!
|
|
||||||
%% Skip wedge check for these unprivileged commands
|
|
||||||
{Rs, NewS} = do_pb_ll_request3(CMD, S),
|
|
||||||
{RqID, CMD, Rs, NewS};
|
|
||||||
{RqID, CMD} ->
|
|
||||||
EpochID = element(2, CMD), % by common convention
|
|
||||||
{Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S),
|
|
||||||
{RqID, CMD, Rs, NewS}
|
|
||||||
end,
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
|
||||||
|
|
||||||
do_pb_ll_request2(EpochID, CMD, S) ->
|
|
||||||
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
|
|
||||||
if Wedged_p == true ->
|
|
||||||
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
|
|
||||||
is_tuple(EpochID)
|
|
||||||
andalso
|
|
||||||
EpochID /= CurrentEpochID ->
|
|
||||||
{Epoch, _} = EpochID,
|
|
||||||
{CurrentEpoch, _} = CurrentEpochID,
|
|
||||||
if Epoch < CurrentEpoch ->
|
|
||||||
ok;
|
|
||||||
true ->
|
|
||||||
%% We're at same epoch # but different checksum, or
|
|
||||||
%% we're at a newer/bigger epoch #.
|
|
||||||
_ = wedge_myself(S#state.flu_name, CurrentEpochID),
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
|
|
||||||
true ->
|
|
||||||
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Witness status does not matter below.
|
|
||||||
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
|
||||||
{Msg, S};
|
|
||||||
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
|
|
||||||
{-6, S};
|
|
||||||
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
|
|
||||||
{do_server_wedge_status(S), S};
|
|
||||||
do_pb_ll_request3({low_proj, PCMD}, S) ->
|
|
||||||
{do_server_proj_request(PCMD, S), S};
|
|
||||||
%% Witness status *matters* below
|
|
||||||
do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag,
|
|
||||||
CSum, ChunkExtra},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, CSum,
|
|
||||||
ChunkExtra, S), S};
|
|
||||||
do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
|
|
||||||
CSum},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
|
|
||||||
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts},
|
|
||||||
#state{witness=false} = S) ->
|
|
||||||
{do_server_read_chunk(File, Offset, Size, Opts, S), S};
|
|
||||||
do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S};
|
|
||||||
do_pb_ll_request3({low_checksum_list, _EpochID, File},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_checksum_listing(File, S), S};
|
|
||||||
do_pb_ll_request3({low_list_files, _EpochID},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_list_files(S), S};
|
|
||||||
do_pb_ll_request3({low_delete_migration, _EpochID, File},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_delete_migration(File, S),
|
|
||||||
#state{witness=false}=S};
|
|
||||||
do_pb_ll_request3({low_trunc_hack, _EpochID, File},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_trunc_hack(File, S), S};
|
|
||||||
do_pb_ll_request3(_, #state{witness=true}=S) ->
|
|
||||||
{{error, bad_arg}, S}. % TODO: new status code??
|
|
||||||
|
|
||||||
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
|
|
||||||
Result = {low_error, 41, "High protocol request while in low mode"},
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
|
||||||
do_pb_hl_request(PB_request, S) ->
|
|
||||||
{ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request),
|
|
||||||
{Result, S2} = do_pb_hl_request2(Cmd, S),
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
|
||||||
|
|
||||||
do_pb_hl_request2({high_echo, Msg}, S) ->
|
|
||||||
{Msg, S};
|
|
||||||
do_pb_hl_request2({high_auth, _User, _Pass}, S) ->
|
|
||||||
{-77, S};
|
|
||||||
do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, ChunkBin, TaggedCSum,
|
|
||||||
ChunkExtra}, #state{high_clnt=Clnt}=S) ->
|
|
||||||
Chunk = {TaggedCSum, ChunkBin},
|
|
||||||
Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk,
|
|
||||||
ChunkExtra),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
|
|
||||||
#state{high_clnt=Clnt}=S) ->
|
|
||||||
Chunk = {TaggedCSum, ChunkBin},
|
|
||||||
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
|
|
||||||
#state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
|
|
||||||
#state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:checksum_list(Clnt, File),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:list_files(Clnt),
|
|
||||||
{Res, S}.
|
|
||||||
|
|
||||||
do_server_proj_request({get_latest_epochid, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:get_latest_epochid(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({read_latest_projection, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:read_latest_projection(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({read_projection, ProjType, Epoch},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:read(ProjStore, ProjType, Epoch);
|
|
||||||
do_server_proj_request({write_projection, ProjType, Proj},
|
|
||||||
#state{flu_name=FluName, proj_store=ProjStore}) ->
|
|
||||||
if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH ->
|
|
||||||
%% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]),
|
|
||||||
Chmgr = machi_flu_psup:make_fitness_regname(FluName),
|
|
||||||
[Map] = Proj#projection_v1.dbg,
|
|
||||||
catch machi_fitness:send_fitness_update_spam(
|
|
||||||
Chmgr, Proj#projection_v1.author_server, Map);
|
|
||||||
true ->
|
|
||||||
catch machi_projection_store:write(ProjStore, ProjType, Proj)
|
|
||||||
end;
|
|
||||||
do_server_proj_request({get_all_projections, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:get_all_projections(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({list_all_projections, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:list_all_projections(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({kick_projection_reaction},
|
|
||||||
#state{flu_name=FluName}) ->
|
|
||||||
%% Tell my chain manager that it might want to react to
|
|
||||||
%% this new world.
|
|
||||||
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
|
||||||
spawn(fun() ->
|
|
||||||
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
|
||||||
end),
|
|
||||||
async_no_response.
|
|
||||||
|
|
||||||
do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, CSum,
|
|
||||||
ChunkExtra, S) ->
|
|
||||||
case sanitize_prefix(Prefix) of
|
|
||||||
ok ->
|
|
||||||
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, CSum,
|
|
||||||
ChunkExtra, S);
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, Client_CSum,
|
|
||||||
ChunkExtra, #state{flu_name=FluName,
|
|
||||||
epoch_id=EpochID}=_S) ->
|
|
||||||
%% TODO: Do anything with PKey?
|
|
||||||
try
|
|
||||||
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
|
||||||
R = {seq_append, self(), CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID},
|
|
||||||
FluName ! R,
|
|
||||||
receive
|
|
||||||
{assignment, Offset, File} ->
|
|
||||||
Size = iolist_size(Chunk),
|
|
||||||
{ok, {Offset, Size, File}};
|
|
||||||
witness ->
|
|
||||||
{error, bad_arg};
|
|
||||||
wedged ->
|
|
||||||
{error, wedged}
|
|
||||||
after 10*1000 ->
|
|
||||||
{error, partition}
|
|
||||||
end
|
|
||||||
catch
|
|
||||||
throw:{bad_csum, _CS} ->
|
|
||||||
{error, bad_checksum};
|
|
||||||
error:badarg ->
|
|
||||||
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]),
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}],
|
|
||||||
machi_file_proxy:write(Pid, Offset, Meta, Chunk);
|
|
||||||
{error, trimmed} = Error ->
|
|
||||||
Error
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
|
|
||||||
%% XXX FIXME
|
|
||||||
%% For now we are omiting the checksum data because it blows up
|
|
||||||
%% protobufs.
|
|
||||||
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
|
|
||||||
Other -> Other
|
|
||||||
end;
|
|
||||||
{error, trimmed} = Error ->
|
|
||||||
Error
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) ->
|
|
||||||
lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n",
|
|
||||||
[File, Offset, Size, TriggerGC]),
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
machi_file_proxy:trim(Pid, Offset, Size, TriggerGC);
|
|
||||||
{error, trimmed} = Trimmed ->
|
|
||||||
%% Should be returned back to (maybe) trigger repair
|
|
||||||
Trimmed
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
{ok, List} = machi_file_proxy:checksum_list(Pid),
|
|
||||||
Bin = erlang:term_to_binary(List),
|
|
||||||
if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) ->
|
|
||||||
%% TODO: Fix this limitation by streaming the
|
|
||||||
%% binary in multiple smaller PB messages.
|
|
||||||
%% Also, don't read the file all at once. ^_^
|
|
||||||
error_logger:error_msg("~s:~w oversize ~s\n",
|
|
||||||
[?MODULE, ?LINE, DataDir]),
|
|
||||||
{error, bad_arg};
|
|
||||||
true ->
|
|
||||||
{ok, Bin}
|
|
||||||
end;
|
|
||||||
{error, trimmed} ->
|
|
||||||
{error, trimmed}
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_list_files(#state{data_dir=DataDir}=_S) ->
|
|
||||||
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
|
|
||||||
Files = filelib:wildcard("*", WildPath),
|
|
||||||
{ok, [begin
|
|
||||||
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
|
|
||||||
Size = FI#file_info.size,
|
|
||||||
{Size, File}
|
|
||||||
end || File <- Files]}.
|
|
||||||
|
|
||||||
do_server_wedge_status(S) ->
|
|
||||||
{Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2),
|
|
||||||
CurrentEpochID = if CurrentEpochID0 == undefined ->
|
|
||||||
?DUMMY_PV1_EPOCH;
|
|
||||||
true ->
|
|
||||||
CurrentEpochID0
|
|
||||||
end,
|
|
||||||
{Wedged_p, CurrentEpochID}.
|
|
||||||
|
|
||||||
do_server_delete_migration(File, #state{data_dir=DataDir}=_S) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
|
||||||
case file:delete(Path) of
|
|
||||||
ok ->
|
|
||||||
ok;
|
|
||||||
{error, enoent} ->
|
|
||||||
{error, no_such_file};
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
|
||||||
case file:open(Path, [read, write, binary, raw]) of
|
|
||||||
{ok, FH} ->
|
|
||||||
try
|
|
||||||
{ok, ?MINIMUM_OFFSET} = file:position(FH,
|
|
||||||
?MINIMUM_OFFSET),
|
|
||||||
ok = file:truncate(FH),
|
|
||||||
ok
|
|
||||||
after
|
|
||||||
file:close(FH)
|
|
||||||
end;
|
|
||||||
{error, enoent} ->
|
|
||||||
{error, no_such_file};
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
||||||
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
|
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
|
||||||
Result = case handle_append(CoC_Namespace, CoC_Locator,
|
Result = case handle_append(CoC_Namespace, CoC_Locator,
|
||||||
|
@ -717,29 +307,6 @@ handle_append(CoC_Namespace, CoC_Locator,
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
sanitize_file_string(Str) ->
|
|
||||||
case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of
|
|
||||||
true -> ok;
|
|
||||||
false -> error
|
|
||||||
end.
|
|
||||||
|
|
||||||
has_no_prohibited_chars(Str) ->
|
|
||||||
case re:run(Str, "/") of
|
|
||||||
nomatch ->
|
|
||||||
true;
|
|
||||||
_ ->
|
|
||||||
true
|
|
||||||
end.
|
|
||||||
|
|
||||||
sanitize_prefix(Prefix) ->
|
|
||||||
%% We are using '^' as our component delimiter
|
|
||||||
case re:run(Prefix, "/|\\^") of
|
|
||||||
nomatch ->
|
|
||||||
ok;
|
|
||||||
_ ->
|
|
||||||
error
|
|
||||||
end.
|
|
||||||
|
|
||||||
make_listener_regname(BaseName) ->
|
make_listener_regname(BaseName) ->
|
||||||
list_to_atom(atom_to_list(BaseName) ++ "_listener").
|
list_to_atom(atom_to_list(BaseName) ++ "_listener").
|
||||||
|
|
||||||
|
@ -753,26 +320,6 @@ make_listener_regname(BaseName) ->
|
||||||
make_projection_server_regname(BaseName) ->
|
make_projection_server_regname(BaseName) ->
|
||||||
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
||||||
|
|
||||||
check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
|
|
||||||
%% TODO: If the client was foolish enough to use
|
|
||||||
%% this type of non-checksum, then the client gets
|
|
||||||
%% what it deserves wrt data integrity, alas. In
|
|
||||||
%% the client-side Chain Replication method, each
|
|
||||||
%% server will calculated this independently, which
|
|
||||||
%% isn't exactly what ought to happen for best data
|
|
||||||
%% integrity checking. In server-side CR, the csum
|
|
||||||
%% should be calculated by the head and passed down
|
|
||||||
%% the chain together with the value.
|
|
||||||
CS = machi_util:checksum_chunk(Chunk),
|
|
||||||
machi_util:make_tagged_csum(server_sha, CS);
|
|
||||||
check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
|
|
||||||
CS = machi_util:checksum_chunk(Chunk),
|
|
||||||
if CS == Client_CSum ->
|
|
||||||
machi_util:make_tagged_csum(server_sha,
|
|
||||||
Client_CSum);
|
|
||||||
true ->
|
|
||||||
throw({bad_csum, CS})
|
|
||||||
end.
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
|
|
@ -143,16 +143,21 @@ init([FluName, TcpPort, DataDir, Props0]) ->
|
||||||
|
|
||||||
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
||||||
|
|
||||||
|
ListenerRegName = machi_flu1:make_listener_regname(FluName),
|
||||||
|
NbAcceptors = 100,
|
||||||
|
ListenerSpec = ranch:child_spec(ListenerRegName, NbAcceptors,
|
||||||
|
ranch_tcp, [{port, TcpPort}],
|
||||||
|
machi_pb_protocol, []),
|
||||||
FluSpec = {FluName,
|
FluSpec = {FluName,
|
||||||
{machi_flu1, start_link,
|
{machi_flu1, start_link,
|
||||||
[ [{FluName, TcpPort, DataDir}|Props] ]},
|
[ [{FluName, TcpPort+1, DataDir}|Props] ]},
|
||||||
permanent, ?SHUTDOWN, worker, []},
|
permanent, ?SHUTDOWN, worker, []},
|
||||||
|
|
||||||
|
|
||||||
{ok, {SupFlags, [
|
{ok, {SupFlags, [
|
||||||
ProjSpec, FitnessSpec, MgrSpec,
|
ProjSpec, FitnessSpec, MgrSpec,
|
||||||
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
||||||
FluSpec]}}.
|
FluSpec, ListenerSpec]}}.
|
||||||
|
|
||||||
make_flu_regname(FluName) when is_atom(FluName) ->
|
make_flu_regname(FluName) when is_atom(FluName) ->
|
||||||
FluName.
|
FluName.
|
||||||
|
|
645
src/machi_pb_protocol.erl
Normal file
645
src/machi_pb_protocol.erl
Normal file
|
@ -0,0 +1,645 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Ranch protocol callback module to handle PB protocol over
|
||||||
|
%% transport
|
||||||
|
|
||||||
|
%% TODO
|
||||||
|
%% - Two modes, high and low should be separated at listener level?
|
||||||
|
|
||||||
|
-module(machi_pb_protocol).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
-behaviour(ranch_protocol).
|
||||||
|
-export([start_link/4]).
|
||||||
|
-export([init/1]).
|
||||||
|
-export([handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-include("machi.hrl").
|
||||||
|
-include("machi_pb.hrl").
|
||||||
|
-include("machi_projection.hrl").
|
||||||
|
-define(V(X,Y), ok).
|
||||||
|
%% -include("machi_verbose.hrl").
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-endif. % TEST
|
||||||
|
|
||||||
|
-record(state, {ref,
|
||||||
|
socket,
|
||||||
|
transport,
|
||||||
|
opts,
|
||||||
|
pb_mode,
|
||||||
|
data_dir,
|
||||||
|
witness,
|
||||||
|
%% - Used in projection related requests in low mode
|
||||||
|
%% - Used in spawning CR client in high mode
|
||||||
|
proj_store,
|
||||||
|
%%%% Low mode only
|
||||||
|
%% Current best knowledge, used for wedge_self / bad_epoch check
|
||||||
|
epoch_id,
|
||||||
|
%% Used in dispatching append_chunk* reqs to the
|
||||||
|
%% append serializing process
|
||||||
|
flu_name,
|
||||||
|
%% Stored in ETS before factorization, can be stored in the recored?
|
||||||
|
wedged,
|
||||||
|
%% Used in server_wedge_status to lookup the table
|
||||||
|
etstab,
|
||||||
|
%%%% High mode only
|
||||||
|
high_clnt,
|
||||||
|
%%%% to be removed
|
||||||
|
eof
|
||||||
|
}).
|
||||||
|
|
||||||
|
%% -record(state, {
|
||||||
|
%% used in append serializer to trigger chain mgr react_to_env
|
||||||
|
%% flu_name :: atom(),
|
||||||
|
%% proj_store :: pid(),
|
||||||
|
%% witness = false :: boolean(),
|
||||||
|
%% append_pid :: pid(),
|
||||||
|
%% tcp_port :: non_neg_integer(),
|
||||||
|
%% data_dir :: string(),
|
||||||
|
%% wedged = true :: boolean(),
|
||||||
|
%% etstab :: ets:tid(),
|
||||||
|
%% epoch_id :: 'undefined' | machi_dt:epoch_id(),
|
||||||
|
%% pb_mode = undefined :: 'undefined' | 'high' | 'low',
|
||||||
|
%% high_clnt :: 'undefined' | pid(),
|
||||||
|
%% trim_table :: ets:tid(),
|
||||||
|
%% props = [] :: list() % proplist
|
||||||
|
%% }).
|
||||||
|
|
||||||
|
-spec start_link(ranch:ref(), any(), module(), any()) -> {ok, pid()}.
|
||||||
|
start_link(Ref, Socket, Transport, Opts) ->
|
||||||
|
proc_lib:start_link(?MODULE, init, [#state{ref=Ref, socket=Socket,
|
||||||
|
transport=Transport,
|
||||||
|
opts=Opts}]).
|
||||||
|
|
||||||
|
init(#state{ref=Ref, socket=Socket, transport=Transport, opts=_Opts}=State) ->
|
||||||
|
ok = proc_lib:init_ack({ok, self()}),
|
||||||
|
%% TODO: Perform any required state initialization here.
|
||||||
|
ok = ranch:accept_ack(Ref),
|
||||||
|
ok = Transport:setopts(Socket, [{active, once}]),
|
||||||
|
gen_server:enter_loop(?MODULE, [], State).
|
||||||
|
|
||||||
|
handle_call(Request, _From, S) ->
|
||||||
|
Reply = {error, {unknown_message, Request}},
|
||||||
|
{reply, Reply, S}.
|
||||||
|
|
||||||
|
handle_cast(_Msg, S) ->
|
||||||
|
io:format(user, "~s:handle_cast: ~p\n", [?MODULE, _Msg]),
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
handle_info({tcp, Sock, Data}=_Info, S) ->
|
||||||
|
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
||||||
|
transport_received(Sock, Data, S);
|
||||||
|
handle_info({tcp_closed, Sock}=_Info, S) ->
|
||||||
|
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
||||||
|
transport_closed(Sock, S);
|
||||||
|
handle_info({tcp_error, Sock, Reason}=_Info, S) ->
|
||||||
|
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
||||||
|
transport_error(Sock, Reason, S);
|
||||||
|
handle_info(_Info, S) ->
|
||||||
|
io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]),
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
terminate(_Reason, _S) ->
|
||||||
|
io:format(user, "~s:terminate: ~p\n", [?MODULE, _Reason]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, S, _Extra) ->
|
||||||
|
{ok, S}.
|
||||||
|
|
||||||
|
%% Internal functions, or copy-n-paste'd thingie
|
||||||
|
|
||||||
|
%%%% Just copied and will be removed %%%%
|
||||||
|
|
||||||
|
%% TODO: sock opts should be migrated to ranch equivalent
|
||||||
|
%% run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
||||||
|
%% register(make_listener_regname(FluName), self()),
|
||||||
|
%% SockOpts = ?PB_PACKET_OPTS ++
|
||||||
|
%% [{reuseaddr, true}, {mode, binary}, {active, false},
|
||||||
|
%% {backlog,8192}],
|
||||||
|
%% case gen_tcp:listen(TcpPort, SockOpts) of
|
||||||
|
%% {ok, LSock} ->
|
||||||
|
%% proc_lib:init_ack({ok, self()}),
|
||||||
|
%% listen_server_loop(LSock, S);
|
||||||
|
%% Else ->
|
||||||
|
%% error_logger:warning_msg("~s:run_listen_server: "
|
||||||
|
%% "listen to TCP port ~w: ~w\n",
|
||||||
|
%% [?MODULE, TcpPort, Else]),
|
||||||
|
%% exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
|
||||||
|
%% end.
|
||||||
|
|
||||||
|
%% listen_server_loop(LSock, S) ->
|
||||||
|
%% {ok, Sock} = gen_tcp:accept(LSock),
|
||||||
|
%% spawn_link(fun() -> net_server_loop(Sock, S) end),
|
||||||
|
%% listen_server_loop(LSock, S).
|
||||||
|
|
||||||
|
%% net_server_loop(Sock, S) ->
|
||||||
|
%% case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
|
||||||
|
%% {ok, Bin} ->
|
||||||
|
%% {RespBin, S2} =
|
||||||
|
%% case machi_pb:decode_mpb_ll_request(Bin) of
|
||||||
|
%% LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
||||||
|
%% {R, NewS} = do_pb_ll_request(LL_req, S),
|
||||||
|
%% {maybe_encode_response(R), mode(low, NewS)};
|
||||||
|
%% _ ->
|
||||||
|
%% HL_req = machi_pb:decode_mpb_request(Bin),
|
||||||
|
%% 1 = HL_req#mpb_request.do_not_alter,
|
||||||
|
%% {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
||||||
|
%% {machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
||||||
|
%% end,
|
||||||
|
%% if RespBin == async_no_response ->
|
||||||
|
%% net_server_loop(Sock, S2);
|
||||||
|
%% true ->
|
||||||
|
%% case gen_tcp:send(Sock, RespBin) of
|
||||||
|
%% ok ->
|
||||||
|
%% net_server_loop(Sock, S2);
|
||||||
|
%% {error, _} ->
|
||||||
|
%% (catch gen_tcp:close(Sock)),
|
||||||
|
%% exit(normal)
|
||||||
|
%% end
|
||||||
|
%% end;
|
||||||
|
%% {error, SockError} ->
|
||||||
|
%% Msg = io_lib:format("Socket error ~w", [SockError]),
|
||||||
|
%% R = #mpb_ll_response{req_id= <<>>,
|
||||||
|
%% generic=#mpb_errorresp{code=1, msg=Msg}},
|
||||||
|
%% _Resp = machi_pb:encode_mpb_ll_response(R),
|
||||||
|
%% %% TODO: Weird that sometimes neither catch nor try/catch
|
||||||
|
%% %% can prevent OTP's SASL from logging an error here.
|
||||||
|
%% %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
||||||
|
%% %% TODO: is this what causes the intermittent PULSE deadlock errors?
|
||||||
|
%% %% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
||||||
|
%% (catch gen_tcp:close(Sock)),
|
||||||
|
%% exit(normal)
|
||||||
|
%% end.
|
||||||
|
|
||||||
|
%%%% Common transport handling
|
||||||
|
|
||||||
|
transport_received(Sock, Bin, #state{transport=Transport}=S) ->
|
||||||
|
{RespBin, S2} =
|
||||||
|
case machi_pb:decode_mpb_ll_request(Bin) of
|
||||||
|
LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
||||||
|
{R, NewS} = do_pb_ll_request(LL_req, S),
|
||||||
|
{maybe_encode_response(R), mode(low, NewS)};
|
||||||
|
_ ->
|
||||||
|
HL_req = machi_pb:decode_mpb_request(Bin),
|
||||||
|
1 = HL_req#mpb_request.do_not_alter,
|
||||||
|
{R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
||||||
|
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
||||||
|
end,
|
||||||
|
if RespBin == async_no_response ->
|
||||||
|
{noreply, S2};
|
||||||
|
true ->
|
||||||
|
case Transport:send(Sock, RespBin) of
|
||||||
|
ok ->
|
||||||
|
{noreply, S2};
|
||||||
|
{error, Reason} ->
|
||||||
|
transport_error(Sock, Reason, S2)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
transport_closed(Sock, S) ->
|
||||||
|
(catch gen_tcp:close(Sock)),
|
||||||
|
{stop, normal, S#state{sock=undefined}}.
|
||||||
|
|
||||||
|
transport_error(Sock, Reason, S) ->
|
||||||
|
Msg = io_lib:format("Socket error ~w", [SockError]),
|
||||||
|
R = #mpb_ll_response{req_id= <<>>,
|
||||||
|
generic=#mpb_errorresp{code=1, msg=Msg}},
|
||||||
|
_Resp = machi_pb:encode_mpb_ll_response(R),
|
||||||
|
%% TODO of TODO comments: comments below with four %s are copy-n-paste'd,
|
||||||
|
%% then it should be considered they are still open and should be addressed.
|
||||||
|
%%%% TODO: Weird that sometimes neither catch nor try/catch
|
||||||
|
%%%% can prevent OTP's SASL from logging an error here.
|
||||||
|
%%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
||||||
|
%%%% TODO: is this what causes the intermittent PULSE deadlock errors?
|
||||||
|
%%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
||||||
|
(catch gen_tcp:close(Sock)),
|
||||||
|
%% TODO: better to exit with `Reason'?
|
||||||
|
exit(normal).
|
||||||
|
|
||||||
|
maybe_encode_response(async_no_response=X) ->
|
||||||
|
X;
|
||||||
|
maybe_encode_response(R) ->
|
||||||
|
machi_pb:encode_mpb_ll_response(R).
|
||||||
|
|
||||||
|
%%%% Not categorized / not-yet-well-understood items
|
||||||
|
%% TODO: may be external API
|
||||||
|
mode(Mode, #state{pb_mode=undefined}=S) ->
|
||||||
|
S#state{pb_mode=Mode};
|
||||||
|
mode(_, S) ->
|
||||||
|
S.
|
||||||
|
|
||||||
|
|
||||||
|
%%%% Low PB mode %%%%
|
||||||
|
|
||||||
|
do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
|
||||||
|
Result = {high_error, 41, "Low protocol request while in high mode"},
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
||||||
|
do_pb_ll_request(PB_request, S) ->
|
||||||
|
Req = machi_pb_translate:from_pb_request(PB_request),
|
||||||
|
{ReqID, Cmd, Result, S2} =
|
||||||
|
case Req of
|
||||||
|
{RqID, {LowCmd, _}=CMD}
|
||||||
|
when LowCmd == low_proj;
|
||||||
|
LowCmd == low_wedge_status; LowCmd == low_list_files ->
|
||||||
|
%% Skip wedge check for projection commands!
|
||||||
|
%% Skip wedge check for these unprivileged commands
|
||||||
|
{Rs, NewS} = do_pb_ll_request3(CMD, S),
|
||||||
|
{RqID, CMD, Rs, NewS};
|
||||||
|
{RqID, CMD} ->
|
||||||
|
EpochID = element(2, CMD), % by common convention
|
||||||
|
{Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S),
|
||||||
|
{RqID, CMD, Rs, NewS}
|
||||||
|
end,
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
||||||
|
|
||||||
|
do_pb_ll_request2(EpochID, CMD, S) ->
|
||||||
|
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
|
||||||
|
if Wedged_p == true ->
|
||||||
|
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
|
||||||
|
is_tuple(EpochID)
|
||||||
|
andalso
|
||||||
|
EpochID /= CurrentEpochID ->
|
||||||
|
{Epoch, _} = EpochID,
|
||||||
|
{CurrentEpoch, _} = CurrentEpochID,
|
||||||
|
if Epoch < CurrentEpoch ->
|
||||||
|
ok;
|
||||||
|
true ->
|
||||||
|
%% We're at same epoch # but different checksum, or
|
||||||
|
%% we're at a newer/bigger epoch #.
|
||||||
|
_ = wedge_myself(S#state.flu_name, CurrentEpochID),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
|
||||||
|
true ->
|
||||||
|
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Witness status does not matter below.
|
||||||
|
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
||||||
|
{Msg, S};
|
||||||
|
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
|
||||||
|
{-6, S};
|
||||||
|
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
|
||||||
|
{do_server_wedge_status(S), S};
|
||||||
|
do_pb_ll_request3({low_proj, PCMD}, S) ->
|
||||||
|
{do_server_proj_request(PCMD, S), S};
|
||||||
|
%% Witness status *matters* below
|
||||||
|
do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag,
|
||||||
|
CSum, ChunkExtra},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, CSum,
|
||||||
|
ChunkExtra, S), S};
|
||||||
|
do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
|
||||||
|
CSum},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
|
||||||
|
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts},
|
||||||
|
#state{witness=false} = S) ->
|
||||||
|
{do_server_read_chunk(File, Offset, Size, Opts, S), S};
|
||||||
|
do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S};
|
||||||
|
do_pb_ll_request3({low_checksum_list, _EpochID, File},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_checksum_listing(File, S), S};
|
||||||
|
do_pb_ll_request3({low_list_files, _EpochID},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_list_files(S), S};
|
||||||
|
do_pb_ll_request3({low_delete_migration, _EpochID, File},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_delete_migration(File, S),
|
||||||
|
#state{witness=false}=S};
|
||||||
|
do_pb_ll_request3({low_trunc_hack, _EpochID, File},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_trunc_hack(File, S), S};
|
||||||
|
do_pb_ll_request3(_, #state{witness=true}=S) ->
|
||||||
|
{{error, bad_arg}, S}. % TODO: new status code??
|
||||||
|
|
||||||
|
do_server_proj_request({get_latest_epochid, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:get_latest_epochid(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({read_latest_projection, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:read_latest_projection(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({read_projection, ProjType, Epoch},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:read(ProjStore, ProjType, Epoch);
|
||||||
|
do_server_proj_request({write_projection, ProjType, Proj},
|
||||||
|
#state{flu_name=FluName, proj_store=ProjStore}) ->
|
||||||
|
if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH ->
|
||||||
|
%% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]),
|
||||||
|
Chmgr = machi_flu_psup:make_fitness_regname(FluName),
|
||||||
|
[Map] = Proj#projection_v1.dbg,
|
||||||
|
catch machi_fitness:send_fitness_update_spam(
|
||||||
|
Chmgr, Proj#projection_v1.author_server, Map);
|
||||||
|
true ->
|
||||||
|
catch machi_projection_store:write(ProjStore, ProjType, Proj)
|
||||||
|
end;
|
||||||
|
do_server_proj_request({get_all_projections, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:get_all_projections(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({list_all_projections, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:list_all_projections(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({kick_projection_reaction},
|
||||||
|
#state{flu_name=FluName}) ->
|
||||||
|
%% Tell my chain manager that it might want to react to
|
||||||
|
%% this new world.
|
||||||
|
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
||||||
|
spawn(fun() ->
|
||||||
|
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
||||||
|
end),
|
||||||
|
async_no_response.
|
||||||
|
|
||||||
|
do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, CSum,
|
||||||
|
ChunkExtra, S) ->
|
||||||
|
case sanitize_prefix(Prefix) of
|
||||||
|
ok ->
|
||||||
|
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, CSum,
|
||||||
|
ChunkExtra, S);
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, Client_CSum,
|
||||||
|
ChunkExtra, #state{flu_name=FluName,
|
||||||
|
epoch_id=EpochID}=_S) ->
|
||||||
|
%% TODO: Do anything with PKey?
|
||||||
|
try
|
||||||
|
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
||||||
|
R = {seq_append, self(), CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID},
|
||||||
|
FluName ! R,
|
||||||
|
receive
|
||||||
|
{assignment, Offset, File} ->
|
||||||
|
Size = iolist_size(Chunk),
|
||||||
|
{ok, {Offset, Size, File}};
|
||||||
|
witness ->
|
||||||
|
{error, bad_arg};
|
||||||
|
wedged ->
|
||||||
|
{error, wedged}
|
||||||
|
after 10*1000 ->
|
||||||
|
{error, partition}
|
||||||
|
end
|
||||||
|
catch
|
||||||
|
throw:{bad_csum, _CS} ->
|
||||||
|
{error, bad_checksum};
|
||||||
|
error:badarg ->
|
||||||
|
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]),
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}],
|
||||||
|
machi_file_proxy:write(Pid, Offset, Meta, Chunk);
|
||||||
|
{error, trimmed} = Error ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
|
||||||
|
%% XXX FIXME
|
||||||
|
%% For now we are omiting the checksum data because it blows up
|
||||||
|
%% protobufs.
|
||||||
|
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
|
||||||
|
Other -> Other
|
||||||
|
end;
|
||||||
|
{error, trimmed} = Error ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) ->
|
||||||
|
lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n",
|
||||||
|
[File, Offset, Size, TriggerGC]),
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
machi_file_proxy:trim(Pid, Offset, Size, TriggerGC);
|
||||||
|
{error, trimmed} = Trimmed ->
|
||||||
|
%% Should be returned back to (maybe) trigger repair
|
||||||
|
Trimmed
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
{ok, List} = machi_file_proxy:checksum_list(Pid),
|
||||||
|
Bin = erlang:term_to_binary(List),
|
||||||
|
if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) ->
|
||||||
|
%% TODO: Fix this limitation by streaming the
|
||||||
|
%% binary in multiple smaller PB messages.
|
||||||
|
%% Also, don't read the file all at once. ^_^
|
||||||
|
error_logger:error_msg("~s:~w oversize ~s\n",
|
||||||
|
[?MODULE, ?LINE, DataDir]),
|
||||||
|
{error, bad_arg};
|
||||||
|
true ->
|
||||||
|
{ok, Bin}
|
||||||
|
end;
|
||||||
|
{error, trimmed} ->
|
||||||
|
{error, trimmed}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_list_files(#state{data_dir=DataDir}=_S) ->
|
||||||
|
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
|
||||||
|
Files = filelib:wildcard("*", WildPath),
|
||||||
|
{ok, [begin
|
||||||
|
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
|
||||||
|
Size = FI#file_info.size,
|
||||||
|
{Size, File}
|
||||||
|
end || File <- Files]}.
|
||||||
|
|
||||||
|
do_server_wedge_status(S) ->
|
||||||
|
{Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2),
|
||||||
|
CurrentEpochID = if CurrentEpochID0 == undefined ->
|
||||||
|
?DUMMY_PV1_EPOCH;
|
||||||
|
true ->
|
||||||
|
CurrentEpochID0
|
||||||
|
end,
|
||||||
|
{Wedged_p, CurrentEpochID}.
|
||||||
|
|
||||||
|
do_server_delete_migration(File, #state{data_dir=DataDir}=_S) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
||||||
|
case file:delete(Path) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
{error, enoent} ->
|
||||||
|
{error, no_such_file};
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
||||||
|
case file:open(Path, [read, write, binary, raw]) of
|
||||||
|
{ok, FH} ->
|
||||||
|
try
|
||||||
|
{ok, ?MINIMUM_OFFSET} = file:position(FH,
|
||||||
|
?MINIMUM_OFFSET),
|
||||||
|
ok = file:truncate(FH),
|
||||||
|
ok
|
||||||
|
after
|
||||||
|
file:close(FH)
|
||||||
|
end;
|
||||||
|
{error, enoent} ->
|
||||||
|
{error, no_such_file};
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
sanitize_file_string(Str) ->
|
||||||
|
case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of
|
||||||
|
true -> ok;
|
||||||
|
false -> error
|
||||||
|
end.
|
||||||
|
|
||||||
|
has_no_prohibited_chars(Str) ->
|
||||||
|
case re:run(Str, "/") of
|
||||||
|
nomatch ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
true
|
||||||
|
end.
|
||||||
|
|
||||||
|
sanitize_prefix(Prefix) ->
|
||||||
|
%% We are using '^' as our component delimiter
|
||||||
|
case re:run(Prefix, "/|\\^") of
|
||||||
|
nomatch ->
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
|
||||||
|
%% TODO: If the client was foolish enough to use
|
||||||
|
%% this type of non-checksum, then the client gets
|
||||||
|
%% what it deserves wrt data integrity, alas. In
|
||||||
|
%% the client-side Chain Replication method, each
|
||||||
|
%% server will calculated this independently, which
|
||||||
|
%% isn't exactly what ought to happen for best data
|
||||||
|
%% integrity checking. In server-side CR, the csum
|
||||||
|
%% should be calculated by the head and passed down
|
||||||
|
%% the chain together with the value.
|
||||||
|
CS = machi_util:checksum_chunk(Chunk),
|
||||||
|
machi_util:make_tagged_csum(server_sha, CS);
|
||||||
|
check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
|
||||||
|
CS = machi_util:checksum_chunk(Chunk),
|
||||||
|
if CS == Client_CSum ->
|
||||||
|
machi_util:make_tagged_csum(server_sha,
|
||||||
|
Client_CSum);
|
||||||
|
true ->
|
||||||
|
throw({bad_csum, CS})
|
||||||
|
end.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
%%%% High PB mode %%%%
|
||||||
|
|
||||||
|
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
|
||||||
|
Result = {low_error, 41, "High protocol request while in low mode"},
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
||||||
|
do_pb_hl_request(PB_request, S) ->
|
||||||
|
{ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request),
|
||||||
|
{Result, S2} = do_pb_hl_request2(Cmd, S),
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
||||||
|
|
||||||
|
do_pb_hl_request2({high_echo, Msg}, S) ->
|
||||||
|
{Msg, S};
|
||||||
|
do_pb_hl_request2({high_auth, _User, _Pass}, S) ->
|
||||||
|
{-77, S};
|
||||||
|
do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, ChunkBin, TaggedCSum,
|
||||||
|
ChunkExtra}, #state{high_clnt=Clnt}=S) ->
|
||||||
|
Chunk = {TaggedCSum, ChunkBin},
|
||||||
|
Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk,
|
||||||
|
ChunkExtra),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
|
||||||
|
#state{high_clnt=Clnt}=S) ->
|
||||||
|
Chunk = {TaggedCSum, ChunkBin},
|
||||||
|
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
|
||||||
|
#state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
|
||||||
|
#state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:checksum_list(Clnt, File),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:list_files(Clnt),
|
||||||
|
{Res, S}.
|
||||||
|
|
||||||
|
make_high_clnt(#state{high_clnt=undefined}=S) ->
|
||||||
|
{ok, Proj} = machi_projection_store:read_latest_projection(
|
||||||
|
S#state.proj_store, private),
|
||||||
|
Ps = [P_srvr || {_, P_srvr} <- orddict:to_list(
|
||||||
|
Proj#projection_v1.members_dict)],
|
||||||
|
{ok, Clnt} = machi_cr_client:start_link(Ps),
|
||||||
|
S#state{high_clnt=Clnt};
|
||||||
|
make_high_clnt(S) ->
|
||||||
|
S.
|
|
@ -47,8 +47,6 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% {_, Ps} = process_info(self(), links),
|
|
||||||
%% [unlink(P) || P <- Ps],
|
|
||||||
RestartStrategy = one_for_one,
|
RestartStrategy = one_for_one,
|
||||||
MaxRestarts = 1000,
|
MaxRestarts = 1000,
|
||||||
MaxSecondsBetweenRestarts = 3600,
|
MaxSecondsBetweenRestarts = 3600,
|
||||||
|
@ -59,12 +57,8 @@ init([]) ->
|
||||||
Shutdown = ?SHUTDOWN,
|
Shutdown = ?SHUTDOWN,
|
||||||
Type = supervisor,
|
Type = supervisor,
|
||||||
|
|
||||||
ServerSup =
|
FluSup = {machi_flu_sup, {machi_flu_sup, start_link, []},
|
||||||
{machi_flu_sup, {machi_flu_sup, start_link, []},
|
|
||||||
Restart, Shutdown, Type, []},
|
Restart, Shutdown, Type, []},
|
||||||
|
RanchSup = {ranch_sup, {ranch_sup, start_link, []},
|
||||||
{ok, {SupFlags, [ServerSup]}}.
|
Restart, Shutdown, supervisor, [ranch_sup]},
|
||||||
|
{ok, {SupFlags, [FluSup, RanchSup]}}.
|
||||||
%% AChild = {'AName', {'AModule', start_link, []},
|
|
||||||
%% Restart, Shutdown, Type, ['AModule']},
|
|
||||||
%% {ok, {SupFlags, [AChild]}}.
|
|
||||||
|
|
Loading…
Reference in a new issue