From 9579b1b8b259806876eb57aa4fb4a4b7e6ce1a07 Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Wed, 18 Nov 2015 15:16:04 +0900 Subject: [PATCH] Introduce ranch and add transport callback --- rebar.config | 1 + src/machi_flu1.erl | 465 +-------------------------- src/machi_flu_psup.erl | 13 +- src/machi_pb_protocol.erl | 645 ++++++++++++++++++++++++++++++++++++++ src/machi_sup.erl | 16 +- 5 files changed, 666 insertions(+), 474 deletions(-) create mode 100644 src/machi_pb_protocol.erl diff --git a/rebar.config b/rebar.config index 7bd81ce..be9b0d3 100644 --- a/rebar.config +++ b/rebar.config @@ -11,6 +11,7 @@ {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}}, {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}}, {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}}, + {ranch, ".*", {git, "git://github.com/ninenines/ranch.git", {branch, "master"}}}, {node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}}, {eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}}, {cluster_info, ".*", {git, "git://github.com/basho/cluster_info", {branch, "develop"}}} diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index bfe599b..c02772d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -60,7 +60,8 @@ update_wedge_state/3, wedge_myself/2]). -export([make_listener_regname/1, make_projection_server_regname/1]). %% TODO: remove or replace in OTP way after gen_*'ified --export([main2/4, run_append_server/2, run_listen_server/1, +-export([main2/4, run_append_server/2, + %% run_listen_server/1, current_state/1, format_state/1]). -record(state, { @@ -188,27 +189,13 @@ main2(FluName, TcpPort, DataDir, Props) -> (catch exit(ListenPid, kill)), ok. -start_listen_server(S) -> - proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT). - start_append_server(S, AckPid) -> proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT). -run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> - register(make_listener_regname(FluName), self()), - SockOpts = ?PB_PACKET_OPTS ++ - [{reuseaddr, true}, {mode, binary}, {active, false}, - {backlog,8192}], - case gen_tcp:listen(TcpPort, SockOpts) of - {ok, LSock} -> - proc_lib:init_ack({ok, self()}), - listen_server_loop(LSock, S); - Else -> - error_logger:warning_msg("~s:run_listen_server: " - "listen to TCP port ~w: ~w\n", - [?MODULE, TcpPort, Else]), - exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) - end. +start_listen_server(_S) -> + %% FIXMEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE + %% proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT). + {ok, dummy}. run_append_server(FluPid, #state{flu_name=Name, wedged=Wedged_p,epoch_id=EpochId}=S) -> @@ -220,11 +207,6 @@ run_append_server(FluPid, #state{flu_name=Name, proc_lib:init_ack({ok, self()}), append_server_loop(FluPid, S#state{etstab=TID}). -listen_server_loop(LSock, S) -> - {ok, Sock} = gen_tcp:accept(LSock), - spawn_link(fun() -> net_server_loop(Sock, S) end), - listen_server_loop(LSock, S). - append_server_loop(FluPid, #state{wedged=Wedged_p, witness=Witness_p, epoch_id=OldEpochId, flu_name=FluName}=S) -> @@ -292,398 +274,6 @@ append_server_loop(FluPid, #state{wedged=Wedged_p, append_server_loop(FluPid, S) end. -net_server_loop(Sock, S) -> - case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of - {ok, Bin} -> - {RespBin, S2} = - case machi_pb:decode_mpb_ll_request(Bin) of - LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> - {R, NewS} = do_pb_ll_request(LL_req, S), - {maybe_encode_response(R), mode(low, NewS)}; - _ -> - HL_req = machi_pb:decode_mpb_request(Bin), - 1 = HL_req#mpb_request.do_not_alter, - {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), - {machi_pb:encode_mpb_response(R), mode(high, NewS)} - end, - if RespBin == async_no_response -> - net_server_loop(Sock, S2); - true -> - case gen_tcp:send(Sock, RespBin) of - ok -> - net_server_loop(Sock, S2); - {error, _} -> - (catch gen_tcp:close(Sock)), - exit(normal) - end - end; - {error, SockError} -> - Msg = io_lib:format("Socket error ~w", [SockError]), - R = #mpb_ll_response{req_id= <<>>, - generic=#mpb_errorresp{code=1, msg=Msg}}, - _Resp = machi_pb:encode_mpb_ll_response(R), - %% TODO: Weird that sometimes neither catch nor try/catch - %% can prevent OTP's SASL from logging an error here. - %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... - %% TODO: is this what causes the intermittent PULSE deadlock errors? - %% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), - (catch gen_tcp:close(Sock)), - exit(normal) - end. - -maybe_encode_response(async_no_response=X) -> - X; -maybe_encode_response(R) -> - machi_pb:encode_mpb_ll_response(R). - -mode(Mode, #state{pb_mode=undefined}=S) -> - S#state{pb_mode=Mode}; -mode(_, S) -> - S. - -make_high_clnt(#state{high_clnt=undefined}=S) -> - {ok, Proj} = machi_projection_store:read_latest_projection( - S#state.proj_store, private), - Ps = [P_srvr || {_, P_srvr} <- orddict:to_list( - Proj#projection_v1.members_dict)], - {ok, Clnt} = machi_cr_client:start_link(Ps), - S#state{high_clnt=Clnt}; -make_high_clnt(S) -> - S. - -do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> - Result = {high_error, 41, "Low protocol request while in high mode"}, - {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; -do_pb_ll_request(PB_request, S) -> - Req = machi_pb_translate:from_pb_request(PB_request), - {ReqID, Cmd, Result, S2} = - case Req of - {RqID, {LowCmd, _}=CMD} - when LowCmd == low_proj; - LowCmd == low_wedge_status; LowCmd == low_list_files -> - %% Skip wedge check for projection commands! - %% Skip wedge check for these unprivileged commands - {Rs, NewS} = do_pb_ll_request3(CMD, S), - {RqID, CMD, Rs, NewS}; - {RqID, CMD} -> - EpochID = element(2, CMD), % by common convention - {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), - {RqID, CMD, Rs, NewS} - end, - {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. - -do_pb_ll_request2(EpochID, CMD, S) -> - {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), - if Wedged_p == true -> - {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; - is_tuple(EpochID) - andalso - EpochID /= CurrentEpochID -> - {Epoch, _} = EpochID, - {CurrentEpoch, _} = CurrentEpochID, - if Epoch < CurrentEpoch -> - ok; - true -> - %% We're at same epoch # but different checksum, or - %% we're at a newer/bigger epoch #. - _ = wedge_myself(S#state.flu_name, CurrentEpochID), - ok - end, - {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; - true -> - do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) - end. - -%% Witness status does not matter below. -do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> - {Msg, S}; -do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) -> - {-6, S}; -do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> - {do_server_wedge_status(S), S}; -do_pb_ll_request3({low_proj, PCMD}, S) -> - {do_server_proj_request(PCMD, S), S}; -%% Witness status *matters* below -do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, - CSum, ChunkExtra}, - #state{witness=false}=S) -> - {do_server_append_chunk(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S), S}; -do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, - CSum}, - #state{witness=false}=S) -> - {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; -do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, - #state{witness=false} = S) -> - {do_server_read_chunk(File, Offset, Size, Opts, S), S}; -do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC}, - #state{witness=false}=S) -> - {do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S}; -do_pb_ll_request3({low_checksum_list, _EpochID, File}, - #state{witness=false}=S) -> - {do_server_checksum_listing(File, S), S}; -do_pb_ll_request3({low_list_files, _EpochID}, - #state{witness=false}=S) -> - {do_server_list_files(S), S}; -do_pb_ll_request3({low_delete_migration, _EpochID, File}, - #state{witness=false}=S) -> - {do_server_delete_migration(File, S), - #state{witness=false}=S}; -do_pb_ll_request3({low_trunc_hack, _EpochID, File}, - #state{witness=false}=S) -> - {do_server_trunc_hack(File, S), S}; -do_pb_ll_request3(_, #state{witness=true}=S) -> - {{error, bad_arg}, S}. % TODO: new status code?? - -do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> - Result = {low_error, 41, "High protocol request while in low mode"}, - {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; -do_pb_hl_request(PB_request, S) -> - {ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request), - {Result, S2} = do_pb_hl_request2(Cmd, S), - {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. - -do_pb_hl_request2({high_echo, Msg}, S) -> - {Msg, S}; -do_pb_hl_request2({high_auth, _User, _Pass}, S) -> - {-77, S}; -do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator, - Prefix, ChunkBin, TaggedCSum, - ChunkExtra}, #state{high_clnt=Clnt}=S) -> - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator, - Prefix, Chunk, - ChunkExtra), - {Res, S}; -do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, - #state{high_clnt=Clnt}=S) -> - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), - {Res, S}; -do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts}, - #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts), - {Res, S}; -do_pb_hl_request2({high_trim_chunk, File, Offset, Size}, - #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size), - {Res, S}; -do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:checksum_list(Clnt, File), - {Res, S}; -do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:list_files(Clnt), - {Res, S}. - -do_server_proj_request({get_latest_epochid, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_latest_epochid(ProjStore, ProjType); -do_server_proj_request({read_latest_projection, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read_latest_projection(ProjStore, ProjType); -do_server_proj_request({read_projection, ProjType, Epoch}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read(ProjStore, ProjType, Epoch); -do_server_proj_request({write_projection, ProjType, Proj}, - #state{flu_name=FluName, proj_store=ProjStore}) -> - if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH -> - %% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]), - Chmgr = machi_flu_psup:make_fitness_regname(FluName), - [Map] = Proj#projection_v1.dbg, - catch machi_fitness:send_fitness_update_spam( - Chmgr, Proj#projection_v1.author_server, Map); - true -> - catch machi_projection_store:write(ProjStore, ProjType, Proj) - end; -do_server_proj_request({get_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_all_projections(ProjStore, ProjType); -do_server_proj_request({list_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType); -do_server_proj_request({kick_projection_reaction}, - #state{flu_name=FluName}) -> - %% Tell my chain manager that it might want to react to - %% this new world. - Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), - spawn(fun() -> - catch machi_chain_manager1:trigger_react_to_env(Chmgr) - end), - async_no_response. - -do_server_append_chunk(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S) -> - case sanitize_prefix(Prefix) of - ok -> - do_server_append_chunk2(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S); - _ -> - {error, bad_arg} - end. - -do_server_append_chunk2(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, Client_CSum, - ChunkExtra, #state{flu_name=FluName, - epoch_id=EpochID}=_S) -> - %% TODO: Do anything with PKey? - try - TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), - R = {seq_append, self(), CoC_Namespace, CoC_Locator, - Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID}, - FluName ! R, - receive - {assignment, Offset, File} -> - Size = iolist_size(Chunk), - {ok, {Offset, Size, File}}; - witness -> - {error, bad_arg}; - wedged -> - {error, wedged} - after 10*1000 -> - {error, partition} - end - catch - throw:{bad_csum, _CS} -> - {error, bad_checksum}; - error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), - {error, bad_arg} - end. - -do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) -> - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}], - machi_file_proxy:write(Pid, Offset, Meta, Chunk); - {error, trimmed} = Error -> - Error - end; - _ -> - {error, bad_arg} - end. - -do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})-> - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - case machi_file_proxy:read(Pid, Offset, Size, Opts) of - %% XXX FIXME - %% For now we are omiting the checksum data because it blows up - %% protobufs. - {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed}; - Other -> Other - end; - {error, trimmed} = Error -> - Error - end; - _ -> - {error, bad_arg} - end. - -do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) -> - lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n", - [File, Offset, Size, TriggerGC]), - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - machi_file_proxy:trim(Pid, Offset, Size, TriggerGC); - {error, trimmed} = Trimmed -> - %% Should be returned back to (maybe) trigger repair - Trimmed - end; - _ -> - {error, bad_arg} - end. - -do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - {ok, List} = machi_file_proxy:checksum_list(Pid), - Bin = erlang:term_to_binary(List), - if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) -> - %% TODO: Fix this limitation by streaming the - %% binary in multiple smaller PB messages. - %% Also, don't read the file all at once. ^_^ - error_logger:error_msg("~s:~w oversize ~s\n", - [?MODULE, ?LINE, DataDir]), - {error, bad_arg}; - true -> - {ok, Bin} - end; - {error, trimmed} -> - {error, trimmed} - end; - _ -> - {error, bad_arg} - end. - -do_server_list_files(#state{data_dir=DataDir}=_S) -> - {_, WildPath} = machi_util:make_data_filename(DataDir, ""), - Files = filelib:wildcard("*", WildPath), - {ok, [begin - {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), - Size = FI#file_info.size, - {Size, File} - end || File <- Files]}. - -do_server_wedge_status(S) -> - {Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2), - CurrentEpochID = if CurrentEpochID0 == undefined -> - ?DUMMY_PV1_EPOCH; - true -> - CurrentEpochID0 - end, - {Wedged_p, CurrentEpochID}. - -do_server_delete_migration(File, #state{data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:delete(Path) of - ok -> - ok; - {error, enoent} -> - {error, no_such_file}; - _ -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - -do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:open(Path, [read, write, binary, raw]) of - {ok, FH} -> - try - {ok, ?MINIMUM_OFFSET} = file:position(FH, - ?MINIMUM_OFFSET), - ok = file:truncate(FH), - ok - after - file:close(FH) - end; - {error, enoent} -> - {error, no_such_file}; - _ -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - append_server_dispatch(From, CoC_Namespace, CoC_Locator, Prefix, Chunk, CSum, Extra, FluName, EpochId) -> Result = case handle_append(CoC_Namespace, CoC_Locator, @@ -717,29 +307,6 @@ handle_append(CoC_Namespace, CoC_Locator, Error end. -sanitize_file_string(Str) -> - case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of - true -> ok; - false -> error - end. - -has_no_prohibited_chars(Str) -> - case re:run(Str, "/") of - nomatch -> - true; - _ -> - true - end. - -sanitize_prefix(Prefix) -> - %% We are using '^' as our component delimiter - case re:run(Prefix, "/|\\^") of - nomatch -> - ok; - _ -> - error - end. - make_listener_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_listener"). @@ -753,26 +320,6 @@ make_listener_regname(BaseName) -> make_projection_server_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_pstore"). -check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) -> - %% TODO: If the client was foolish enough to use - %% this type of non-checksum, then the client gets - %% what it deserves wrt data integrity, alas. In - %% the client-side Chain Replication method, each - %% server will calculated this independently, which - %% isn't exactly what ought to happen for best data - %% integrity checking. In server-side CR, the csum - %% should be calculated by the head and passed down - %% the chain together with the value. - CS = machi_util:checksum_chunk(Chunk), - machi_util:make_tagged_csum(server_sha, CS); -check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> - CS = machi_util:checksum_chunk(Chunk), - if CS == Client_CSum -> - machi_util:make_tagged_csum(server_sha, - Client_CSum); - true -> - throw({bad_csum, CS}) - end. -ifdef(TEST). diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index 9e568cd..b2ccdcc 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -143,16 +143,21 @@ init([FluName, TcpPort, DataDir, Props0]) -> FProxySupSpec = machi_file_proxy_sup:child_spec(FluName), + ListenerRegName = machi_flu1:make_listener_regname(FluName), + NbAcceptors = 100, + ListenerSpec = ranch:child_spec(ListenerRegName, NbAcceptors, + ranch_tcp, [{port, TcpPort}], + machi_pb_protocol, []), FluSpec = {FluName, {machi_flu1, start_link, - [ [{FluName, TcpPort, DataDir}|Props] ]}, + [ [{FluName, TcpPort+1, DataDir}|Props] ]}, permanent, ?SHUTDOWN, worker, []}, {ok, {SupFlags, [ - ProjSpec, FitnessSpec, MgrSpec, - FProxySupSpec, FNameMgrSpec, MetaSupSpec, - FluSpec]}}. + ProjSpec, FitnessSpec, MgrSpec, + FProxySupSpec, FNameMgrSpec, MetaSupSpec, + FluSpec, ListenerSpec]}}. make_flu_regname(FluName) when is_atom(FluName) -> FluName. diff --git a/src/machi_pb_protocol.erl b/src/machi_pb_protocol.erl new file mode 100644 index 0000000..28ce626 --- /dev/null +++ b/src/machi_pb_protocol.erl @@ -0,0 +1,645 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Ranch protocol callback module to handle PB protocol over +%% transport + +%% TODO +%% - Two modes, high and low should be separated at listener level? + +-module(machi_pb_protocol). + +-behaviour(gen_server). +-behaviour(ranch_protocol). +-export([start_link/4]). +-export([init/1]). +-export([handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("machi.hrl"). +-include("machi_pb.hrl"). +-include("machi_projection.hrl"). +-define(V(X,Y), ok). +%% -include("machi_verbose.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. % TEST + +-record(state, {ref, + socket, + transport, + opts, + pb_mode, + data_dir, + witness, + %% - Used in projection related requests in low mode + %% - Used in spawning CR client in high mode + proj_store, + %%%% Low mode only + %% Current best knowledge, used for wedge_self / bad_epoch check + epoch_id, + %% Used in dispatching append_chunk* reqs to the + %% append serializing process + flu_name, + %% Stored in ETS before factorization, can be stored in the recored? + wedged, + %% Used in server_wedge_status to lookup the table + etstab, + %%%% High mode only + high_clnt, + %%%% to be removed + eof + }). + +%% -record(state, { + %% used in append serializer to trigger chain mgr react_to_env +%% flu_name :: atom(), +%% proj_store :: pid(), +%% witness = false :: boolean(), +%% append_pid :: pid(), +%% tcp_port :: non_neg_integer(), +%% data_dir :: string(), +%% wedged = true :: boolean(), +%% etstab :: ets:tid(), +%% epoch_id :: 'undefined' | machi_dt:epoch_id(), +%% pb_mode = undefined :: 'undefined' | 'high' | 'low', +%% high_clnt :: 'undefined' | pid(), +%% trim_table :: ets:tid(), +%% props = [] :: list() % proplist +%% }). + +-spec start_link(ranch:ref(), any(), module(), any()) -> {ok, pid()}. +start_link(Ref, Socket, Transport, Opts) -> + proc_lib:start_link(?MODULE, init, [#state{ref=Ref, socket=Socket, + transport=Transport, + opts=Opts}]). + +init(#state{ref=Ref, socket=Socket, transport=Transport, opts=_Opts}=State) -> + ok = proc_lib:init_ack({ok, self()}), + %% TODO: Perform any required state initialization here. + ok = ranch:accept_ack(Ref), + ok = Transport:setopts(Socket, [{active, once}]), + gen_server:enter_loop(?MODULE, [], State). + +handle_call(Request, _From, S) -> + Reply = {error, {unknown_message, Request}}, + {reply, Reply, S}. + +handle_cast(_Msg, S) -> + io:format(user, "~s:handle_cast: ~p\n", [?MODULE, _Msg]), + {noreply, S}. + +handle_info({tcp, Sock, Data}=_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + transport_received(Sock, Data, S); +handle_info({tcp_closed, Sock}=_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + transport_closed(Sock, S); +handle_info({tcp_error, Sock, Reason}=_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + transport_error(Sock, Reason, S); +handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + {noreply, S}. + +terminate(_Reason, _S) -> + io:format(user, "~s:terminate: ~p\n", [?MODULE, _Reason]), + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%% Internal functions, or copy-n-paste'd thingie + +%%%% Just copied and will be removed %%%% + +%% TODO: sock opts should be migrated to ranch equivalent +%% run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> +%% register(make_listener_regname(FluName), self()), +%% SockOpts = ?PB_PACKET_OPTS ++ +%% [{reuseaddr, true}, {mode, binary}, {active, false}, +%% {backlog,8192}], +%% case gen_tcp:listen(TcpPort, SockOpts) of +%% {ok, LSock} -> +%% proc_lib:init_ack({ok, self()}), +%% listen_server_loop(LSock, S); +%% Else -> +%% error_logger:warning_msg("~s:run_listen_server: " +%% "listen to TCP port ~w: ~w\n", +%% [?MODULE, TcpPort, Else]), +%% exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) +%% end. + +%% listen_server_loop(LSock, S) -> +%% {ok, Sock} = gen_tcp:accept(LSock), +%% spawn_link(fun() -> net_server_loop(Sock, S) end), +%% listen_server_loop(LSock, S). + +%% net_server_loop(Sock, S) -> +%% case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of +%% {ok, Bin} -> +%% {RespBin, S2} = +%% case machi_pb:decode_mpb_ll_request(Bin) of +%% LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> +%% {R, NewS} = do_pb_ll_request(LL_req, S), +%% {maybe_encode_response(R), mode(low, NewS)}; +%% _ -> +%% HL_req = machi_pb:decode_mpb_request(Bin), +%% 1 = HL_req#mpb_request.do_not_alter, +%% {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), +%% {machi_pb:encode_mpb_response(R), mode(high, NewS)} +%% end, +%% if RespBin == async_no_response -> +%% net_server_loop(Sock, S2); +%% true -> +%% case gen_tcp:send(Sock, RespBin) of +%% ok -> +%% net_server_loop(Sock, S2); +%% {error, _} -> +%% (catch gen_tcp:close(Sock)), +%% exit(normal) +%% end +%% end; +%% {error, SockError} -> +%% Msg = io_lib:format("Socket error ~w", [SockError]), +%% R = #mpb_ll_response{req_id= <<>>, +%% generic=#mpb_errorresp{code=1, msg=Msg}}, +%% _Resp = machi_pb:encode_mpb_ll_response(R), +%% %% TODO: Weird that sometimes neither catch nor try/catch +%% %% can prevent OTP's SASL from logging an error here. +%% %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... +%% %% TODO: is this what causes the intermittent PULSE deadlock errors? +%% %% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), +%% (catch gen_tcp:close(Sock)), +%% exit(normal) +%% end. + +%%%% Common transport handling + +transport_received(Sock, Bin, #state{transport=Transport}=S) -> + {RespBin, S2} = + case machi_pb:decode_mpb_ll_request(Bin) of + LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> + {R, NewS} = do_pb_ll_request(LL_req, S), + {maybe_encode_response(R), mode(low, NewS)}; + _ -> + HL_req = machi_pb:decode_mpb_request(Bin), + 1 = HL_req#mpb_request.do_not_alter, + {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), + {machi_pb:encode_mpb_response(R), mode(high, NewS)} + end, + if RespBin == async_no_response -> + {noreply, S2}; + true -> + case Transport:send(Sock, RespBin) of + ok -> + {noreply, S2}; + {error, Reason} -> + transport_error(Sock, Reason, S2) + end + end. + +transport_closed(Sock, S) -> + (catch gen_tcp:close(Sock)), + {stop, normal, S#state{sock=undefined}}. + +transport_error(Sock, Reason, S) -> + Msg = io_lib:format("Socket error ~w", [SockError]), + R = #mpb_ll_response{req_id= <<>>, + generic=#mpb_errorresp{code=1, msg=Msg}}, + _Resp = machi_pb:encode_mpb_ll_response(R), + %% TODO of TODO comments: comments below with four %s are copy-n-paste'd, + %% then it should be considered they are still open and should be addressed. + %%%% TODO: Weird that sometimes neither catch nor try/catch + %%%% can prevent OTP's SASL from logging an error here. + %%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... + %%%% TODO: is this what causes the intermittent PULSE deadlock errors? + %%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), + (catch gen_tcp:close(Sock)), + %% TODO: better to exit with `Reason'? + exit(normal). + +maybe_encode_response(async_no_response=X) -> + X; +maybe_encode_response(R) -> + machi_pb:encode_mpb_ll_response(R). + +%%%% Not categorized / not-yet-well-understood items +%% TODO: may be external API +mode(Mode, #state{pb_mode=undefined}=S) -> + S#state{pb_mode=Mode}; +mode(_, S) -> + S. + + +%%%% Low PB mode %%%% + +do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> + Result = {high_error, 41, "Low protocol request while in high mode"}, + {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; +do_pb_ll_request(PB_request, S) -> + Req = machi_pb_translate:from_pb_request(PB_request), + {ReqID, Cmd, Result, S2} = + case Req of + {RqID, {LowCmd, _}=CMD} + when LowCmd == low_proj; + LowCmd == low_wedge_status; LowCmd == low_list_files -> + %% Skip wedge check for projection commands! + %% Skip wedge check for these unprivileged commands + {Rs, NewS} = do_pb_ll_request3(CMD, S), + {RqID, CMD, Rs, NewS}; + {RqID, CMD} -> + EpochID = element(2, CMD), % by common convention + {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), + {RqID, CMD, Rs, NewS} + end, + {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. + +do_pb_ll_request2(EpochID, CMD, S) -> + {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), + if Wedged_p == true -> + {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; + is_tuple(EpochID) + andalso + EpochID /= CurrentEpochID -> + {Epoch, _} = EpochID, + {CurrentEpoch, _} = CurrentEpochID, + if Epoch < CurrentEpoch -> + ok; + true -> + %% We're at same epoch # but different checksum, or + %% we're at a newer/bigger epoch #. + _ = wedge_myself(S#state.flu_name, CurrentEpochID), + ok + end, + {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; + true -> + do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) + end. + +%% Witness status does not matter below. +do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> + {Msg, S}; +do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) -> + {-6, S}; +do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> + {do_server_wedge_status(S), S}; +do_pb_ll_request3({low_proj, PCMD}, S) -> + {do_server_proj_request(PCMD, S), S}; +%% Witness status *matters* below +do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, + CSum, ChunkExtra}, + #state{witness=false}=S) -> + {do_server_append_chunk(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S), S}; +do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, + CSum}, + #state{witness=false}=S) -> + {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; +do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, + #state{witness=false} = S) -> + {do_server_read_chunk(File, Offset, Size, Opts, S), S}; +do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC}, + #state{witness=false}=S) -> + {do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S}; +do_pb_ll_request3({low_checksum_list, _EpochID, File}, + #state{witness=false}=S) -> + {do_server_checksum_listing(File, S), S}; +do_pb_ll_request3({low_list_files, _EpochID}, + #state{witness=false}=S) -> + {do_server_list_files(S), S}; +do_pb_ll_request3({low_delete_migration, _EpochID, File}, + #state{witness=false}=S) -> + {do_server_delete_migration(File, S), + #state{witness=false}=S}; +do_pb_ll_request3({low_trunc_hack, _EpochID, File}, + #state{witness=false}=S) -> + {do_server_trunc_hack(File, S), S}; +do_pb_ll_request3(_, #state{witness=true}=S) -> + {{error, bad_arg}, S}. % TODO: new status code?? + +do_server_proj_request({get_latest_epochid, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:get_latest_epochid(ProjStore, ProjType); +do_server_proj_request({read_latest_projection, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:read_latest_projection(ProjStore, ProjType); +do_server_proj_request({read_projection, ProjType, Epoch}, + #state{proj_store=ProjStore}) -> + machi_projection_store:read(ProjStore, ProjType, Epoch); +do_server_proj_request({write_projection, ProjType, Proj}, + #state{flu_name=FluName, proj_store=ProjStore}) -> + if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH -> + %% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]), + Chmgr = machi_flu_psup:make_fitness_regname(FluName), + [Map] = Proj#projection_v1.dbg, + catch machi_fitness:send_fitness_update_spam( + Chmgr, Proj#projection_v1.author_server, Map); + true -> + catch machi_projection_store:write(ProjStore, ProjType, Proj) + end; +do_server_proj_request({get_all_projections, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:get_all_projections(ProjStore, ProjType); +do_server_proj_request({list_all_projections, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:list_all_projections(ProjStore, ProjType); +do_server_proj_request({kick_projection_reaction}, + #state{flu_name=FluName}) -> + %% Tell my chain manager that it might want to react to + %% this new world. + Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), + spawn(fun() -> + catch machi_chain_manager1:trigger_react_to_env(Chmgr) + end), + async_no_response. + +do_server_append_chunk(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S) -> + case sanitize_prefix(Prefix) of + ok -> + do_server_append_chunk2(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S); + _ -> + {error, bad_arg} + end. + +do_server_append_chunk2(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, Client_CSum, + ChunkExtra, #state{flu_name=FluName, + epoch_id=EpochID}=_S) -> + %% TODO: Do anything with PKey? + try + TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), + R = {seq_append, self(), CoC_Namespace, CoC_Locator, + Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID}, + FluName ! R, + receive + {assignment, Offset, File} -> + Size = iolist_size(Chunk), + {ok, {Offset, Size, File}}; + witness -> + {error, bad_arg}; + wedged -> + {error, wedged} + after 10*1000 -> + {error, partition} + end + catch + throw:{bad_csum, _CS} -> + {error, bad_checksum}; + error:badarg -> + error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), + {error, bad_arg} + end. + +do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) -> + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}], + machi_file_proxy:write(Pid, Offset, Meta, Chunk); + {error, trimmed} = Error -> + Error + end; + _ -> + {error, bad_arg} + end. + +do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})-> + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + case machi_file_proxy:read(Pid, Offset, Size, Opts) of + %% XXX FIXME + %% For now we are omiting the checksum data because it blows up + %% protobufs. + {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed}; + Other -> Other + end; + {error, trimmed} = Error -> + Error + end; + _ -> + {error, bad_arg} + end. + +do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) -> + lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n", + [File, Offset, Size, TriggerGC]), + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + machi_file_proxy:trim(Pid, Offset, Size, TriggerGC); + {error, trimmed} = Trimmed -> + %% Should be returned back to (maybe) trigger repair + Trimmed + end; + _ -> + {error, bad_arg} + end. + +do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) -> + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + {ok, List} = machi_file_proxy:checksum_list(Pid), + Bin = erlang:term_to_binary(List), + if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) -> + %% TODO: Fix this limitation by streaming the + %% binary in multiple smaller PB messages. + %% Also, don't read the file all at once. ^_^ + error_logger:error_msg("~s:~w oversize ~s\n", + [?MODULE, ?LINE, DataDir]), + {error, bad_arg}; + true -> + {ok, Bin} + end; + {error, trimmed} -> + {error, trimmed} + end; + _ -> + {error, bad_arg} + end. + +do_server_list_files(#state{data_dir=DataDir}=_S) -> + {_, WildPath} = machi_util:make_data_filename(DataDir, ""), + Files = filelib:wildcard("*", WildPath), + {ok, [begin + {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), + Size = FI#file_info.size, + {Size, File} + end || File <- Files]}. + +do_server_wedge_status(S) -> + {Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2), + CurrentEpochID = if CurrentEpochID0 == undefined -> + ?DUMMY_PV1_EPOCH; + true -> + CurrentEpochID0 + end, + {Wedged_p, CurrentEpochID}. + +do_server_delete_migration(File, #state{data_dir=DataDir}=_S) -> + case sanitize_file_string(File) of + ok -> + {_, Path} = machi_util:make_data_filename(DataDir, File), + case file:delete(Path) of + ok -> + ok; + {error, enoent} -> + {error, no_such_file}; + _ -> + {error, bad_arg} + end; + _ -> + {error, bad_arg} + end. + +do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> + case sanitize_file_string(File) of + ok -> + {_, Path} = machi_util:make_data_filename(DataDir, File), + case file:open(Path, [read, write, binary, raw]) of + {ok, FH} -> + try + {ok, ?MINIMUM_OFFSET} = file:position(FH, + ?MINIMUM_OFFSET), + ok = file:truncate(FH), + ok + after + file:close(FH) + end; + {error, enoent} -> + {error, no_such_file}; + _ -> + {error, bad_arg} + end; + _ -> + {error, bad_arg} + end. + +sanitize_file_string(Str) -> + case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of + true -> ok; + false -> error + end. + +has_no_prohibited_chars(Str) -> + case re:run(Str, "/") of + nomatch -> + true; + _ -> + true + end. + +sanitize_prefix(Prefix) -> + %% We are using '^' as our component delimiter + case re:run(Prefix, "/|\\^") of + nomatch -> + ok; + _ -> + error + end. + +check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) -> + %% TODO: If the client was foolish enough to use + %% this type of non-checksum, then the client gets + %% what it deserves wrt data integrity, alas. In + %% the client-side Chain Replication method, each + %% server will calculated this independently, which + %% isn't exactly what ought to happen for best data + %% integrity checking. In server-side CR, the csum + %% should be calculated by the head and passed down + %% the chain together with the value. + CS = machi_util:checksum_chunk(Chunk), + machi_util:make_tagged_csum(server_sha, CS); +check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> + CS = machi_util:checksum_chunk(Chunk), + if CS == Client_CSum -> + machi_util:make_tagged_csum(server_sha, + Client_CSum); + true -> + throw({bad_csum, CS}) + end. + + + + +%%%% High PB mode %%%% + +do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> + Result = {low_error, 41, "High protocol request while in low mode"}, + {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; +do_pb_hl_request(PB_request, S) -> + {ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request), + {Result, S2} = do_pb_hl_request2(Cmd, S), + {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. + +do_pb_hl_request2({high_echo, Msg}, S) -> + {Msg, S}; +do_pb_hl_request2({high_auth, _User, _Pass}, S) -> + {-77, S}; +do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator, + Prefix, ChunkBin, TaggedCSum, + ChunkExtra}, #state{high_clnt=Clnt}=S) -> + Chunk = {TaggedCSum, ChunkBin}, + Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator, + Prefix, Chunk, + ChunkExtra), + {Res, S}; +do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, + #state{high_clnt=Clnt}=S) -> + Chunk = {TaggedCSum, ChunkBin}, + Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), + {Res, S}; +do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts}, + #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts), + {Res, S}; +do_pb_hl_request2({high_trim_chunk, File, Offset, Size}, + #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size), + {Res, S}; +do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:checksum_list(Clnt, File), + {Res, S}; +do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:list_files(Clnt), + {Res, S}. + +make_high_clnt(#state{high_clnt=undefined}=S) -> + {ok, Proj} = machi_projection_store:read_latest_projection( + S#state.proj_store, private), + Ps = [P_srvr || {_, P_srvr} <- orddict:to_list( + Proj#projection_v1.members_dict)], + {ok, Clnt} = machi_cr_client:start_link(Ps), + S#state{high_clnt=Clnt}; +make_high_clnt(S) -> + S. diff --git a/src/machi_sup.erl b/src/machi_sup.erl index d4ab71a..729b6a7 100644 --- a/src/machi_sup.erl +++ b/src/machi_sup.erl @@ -47,8 +47,6 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> - %% {_, Ps} = process_info(self(), links), - %% [unlink(P) || P <- Ps], RestartStrategy = one_for_one, MaxRestarts = 1000, MaxSecondsBetweenRestarts = 3600, @@ -59,12 +57,8 @@ init([]) -> Shutdown = ?SHUTDOWN, Type = supervisor, - ServerSup = - {machi_flu_sup, {machi_flu_sup, start_link, []}, - Restart, Shutdown, Type, []}, - - {ok, {SupFlags, [ServerSup]}}. - - %% AChild = {'AName', {'AModule', start_link, []}, - %% Restart, Shutdown, Type, ['AModule']}, - %% {ok, {SupFlags, [AChild]}}. + FluSup = {machi_flu_sup, {machi_flu_sup, start_link, []}, + Restart, Shutdown, Type, []}, + RanchSup = {ranch_sup, {ranch_sup, start_link, []}, + Restart, Shutdown, supervisor, [ranch_sup]}, + {ok, {SupFlags, [FluSup, RanchSup]}}.