diff --git a/src/machi_cinfo.erl b/src/machi_cinfo.erl index 32cfa69..5968449 100644 --- a/src/machi_cinfo.erl +++ b/src/machi_cinfo.erl @@ -85,8 +85,8 @@ fitness(FluName) -> -spec flu1(atom()) -> [{atom(), term()}]. flu1(FluName) -> - State = machi_flu1:current_state(FluName), - machi_flu1:format_state(State). + State = machi_flu1_append_server:current_state(FluName), + machi_flu1_append_server:format_state(State). %% Internal functions diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index e620308..b75d955 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -21,7 +21,9 @@ %% @doc The Machi FLU file server + file location sequencer. %% %% This module implements only the Machi FLU file server and its -%% implicit sequencer. +%% implicit sequencer together with listener, append server, +%% file management and file proxy processes. + %% Please see the EDoc "Overview" for details about the FLU as a %% primitive file server process vs. the larger Machi design of a FLU %% as a sequencer + file server + chain manager group of processes. @@ -54,27 +56,16 @@ -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-export([timing_demo_test_COMMENTED_/0, sort_2lines/2]). % Just to suppress warning -endif. % TEST -export([start_link/1, stop/1, update_wedge_state/3, wedge_myself/2]). --export([make_projection_server_regname/1]). +-export([make_projection_server_regname/1, + ets_table_name/1]). %% TODO: remove or replace in OTP way after gen_*'ified --export([main2/4, run_append_server/2, - current_state/1, format_state/1]). +-export([main2/4]). --record(state, { - flu_name :: atom(), - proj_store :: pid(), - witness = false :: boolean(), - append_pid :: pid(), - wedged = true :: boolean(), - etstab :: ets:tid(), - epoch_id :: 'undefined' | machi_dt:epoch_id(), - props = [] :: list() % proplist - }). - --define(SERVER_CMD_READ_TIMEOUT, 600*1000). -define(INIT_TIMEOUT, 60*1000). start_link([{FluName, TcpPort, DataDir}|Rest]) @@ -96,33 +87,14 @@ stop(Pid) when is_pid(Pid) -> error end. -update_wedge_state(PidSpec, Boolean, EpochId) - when (Boolean == true orelse Boolean == false), is_tuple(EpochId) -> - PidSpec ! {wedge_state_change, Boolean, EpochId}. +update_wedge_state(PidSpec, Boolean, EpochId) -> + machi_flu1_append_server:int_update_wedge_state(PidSpec, Boolean, EpochId). -wedge_myself(PidSpec, EpochId) - when is_tuple(EpochId) -> - PidSpec ! {wedge_myself, EpochId}. - -current_state(PidSpec) -> - PidSpec ! {current_state, self()}, - %% TODO: Not so rubust f(^^;) - receive - Res -> Res - after - 60*1000 -> {error, timeout} - end. - -format_state(State) -> - Fields = record_info(fields, state), - [_Name | Values] = tuple_to_list(State), - lists:zip(Fields, Values). +wedge_myself(PidSpec, EpochId) -> + machi_flu1_append_server:int_wedge_myself(PidSpec, EpochId). %%%%%%%%%%%%%%%%%%%%%%%%%%%% -ets_table_name(FluName) when is_atom(FluName) -> - list_to_atom(atom_to_list(FluName) ++ "_epoch"). - main2(FluName, TcpPort, DataDir, Props) -> {SendAppendPidToProj_p, ProjectionPid} = case proplists:get_value(projection_store_registered_name, Props) of @@ -149,23 +121,15 @@ main2(FluName, TcpPort, DataDir, Props) -> {true, undefined} end, Witness_p = proplists:get_value(witness_mode, Props, false), - - S0 = #state{flu_name=FluName, - proj_store=ProjectionPid, - wedged=Wedged_p, - witness=Witness_p, - etstab=ets_table_name(FluName), - epoch_id=EpochId, - props=Props}, - {ok, AppendPid} = start_append_server(S0, self()), + + {ok, AppendPid} = start_append_server(FluName, Witness_p, Wedged_p, EpochId), if SendAppendPidToProj_p -> - machi_projection_store:set_wedge_notify_pid(ProjectionPid, - AppendPid); + machi_projection_store:set_wedge_notify_pid(ProjectionPid, AppendPid); true -> ok end, - S1 = S0#state{append_pid=AppendPid}, - {ok, ListenerPid} = start_listen_server(TcpPort, DataDir, S1), + {ok, ListenerPid} = start_listen_server(FluName, TcpPort, Witness_p, DataDir, + ets_table_name(FluName), ProjectionPid), %% io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]), Config_e = machi_util:make_config_filename(DataDir, "unused"), @@ -176,135 +140,23 @@ main2(FluName, TcpPort, DataDir, Props) -> ok = filelib:ensure_dir(Projection_e), put(flu_flu_name, FluName), - put(flu_append_pid, S1#state.append_pid), + put(flu_append_pid, AppendPid), put(flu_projection_pid, ProjectionPid), put(flu_listen_pid, ListenerPid), proc_lib:init_ack({ok, self()}), receive killme -> ok end, - (catch exit(S1#state.append_pid, kill)), + (catch exit(AppendPid, kill)), (catch exit(ProjectionPid, kill)), (catch exit(ListenerPid, kill)), ok. -start_append_server(S, AckPid) -> - proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT). +start_append_server(FluName, Witness_p, Wedged_p, EpochId) -> + machi_flu1_subsup:start_append_server(FluName, Witness_p, Wedged_p, EpochId). -start_listen_server(TcpPort, DataDir, - #state{flu_name=FluName, witness=Witness, etstab=EtsTab, - proj_store=ProjStore}=_S) -> - machi_listener_sup:start_listener(FluName, TcpPort, Witness, DataDir, - EtsTab, ProjStore). - -run_append_server(FluPid, #state{flu_name=Name, - wedged=Wedged_p,epoch_id=EpochId}=S) -> - %% Reminder: Name is the "main" name of the FLU, i.e., no suffix - register(Name, self()), - TID = ets:new(ets_table_name(Name), - [set, protected, named_table, {read_concurrency, true}]), - ets:insert(TID, {epoch, {Wedged_p, EpochId}}), - proc_lib:init_ack({ok, self()}), - append_server_loop(FluPid, S#state{etstab=TID}). - -append_server_loop(FluPid, #state{wedged=Wedged_p, - witness=Witness_p, - epoch_id=OldEpochId, flu_name=FluName}=S) -> - receive - {seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID} - when Witness_p -> - %% The FLU's machi_flu1_net_server process ought to filter all - %% witness states, but we'll keep this clause for extra - %% paranoia. - From ! witness, - append_server_loop(FluPid, S); - {seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID} - when Wedged_p -> - From ! wedged, - append_server_loop(FluPid, S); - {seq_append, From, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, EpochID} -> - %% Old is the one from our state, plain old 'EpochID' comes - %% from the client. - _ = case OldEpochId == EpochID of - true -> - spawn(fun() -> - append_server_dispatch(From, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, - FluName, EpochID) - end); - false -> - From ! {error, bad_epoch} - end, - append_server_loop(FluPid, S); - {wedge_myself, WedgeEpochId} -> - if not Wedged_p andalso WedgeEpochId == OldEpochId -> - true = ets:insert(S#state.etstab, - {epoch, {true, OldEpochId}}), - %% Tell my chain manager that it might want to react to - %% this new world. - Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), - spawn(fun() -> - catch machi_chain_manager1:trigger_react_to_env(Chmgr) - end), - append_server_loop(FluPid, S#state{wedged=true}); - true -> - append_server_loop(FluPid, S) - end; - {wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId} -> - OldEpoch = case OldEpochId of {OldE, _} -> OldE; - undefined -> -1 - end, - if NewEpoch >= OldEpoch -> - true = ets:insert(S#state.etstab, - {epoch, {Boolean, NewEpochId}}), - append_server_loop(FluPid, S#state{wedged=Boolean, - epoch_id=NewEpochId}); - true -> - append_server_loop(FluPid, S) - end; - {wedge_status, FromPid} -> - #state{wedged=Wedged_p, epoch_id=EpochId} = S, - FromPid ! {wedge_status_reply, Wedged_p, EpochId}, - append_server_loop(FluPid, S); - {current_state, FromPid} -> - FromPid ! S; - Else -> - io:format(user, "append_server_loop: WHA? ~p\n", [Else]), - append_server_loop(FluPid, S) - end. - -append_server_dispatch(From, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, FluName, EpochId) -> - Result = case handle_append(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum, Extra, FluName, EpochId) of - {ok, File, Offset} -> - {assignment, Offset, File}; - Other -> - Other - end, - From ! Result, - exit(normal). - -handle_append(_N, _L, _Prefix, <<>>, _Csum, _Extra, _FluName, _EpochId) -> - {error, bad_arg}; -handle_append(CoC_Namespace, CoC_Locator, - Prefix, Chunk, Csum, Extra, FluName, EpochId) -> - CoC = {coc, CoC_Namespace, CoC_Locator}, - Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix( - FluName, EpochId, {prefix, Prefix}, CoC), - case Res of - {file, F} -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of - {ok, Pid} -> - {Tag, CS} = machi_util:unmake_tagged_csum(Csum), - Meta = [{client_csum_tag, Tag}, {client_csum, CS}], - machi_file_proxy:append(Pid, Meta, Extra, Chunk); - {error, trimmed} = E -> - E - end; - Error -> - Error - end. +start_listen_server(FluName, TcpPort, Witness_p, DataDir, EtsTab, ProjectionPid) -> + machi_flu1_subsup:start_listener(FluName, TcpPort, Witness_p, DataDir, + EtsTab, ProjectionPid). %% This is the name of the projection store that is spawned by the %% *flu*, for use primarily in testing scenarios. In normal use, we @@ -316,6 +168,8 @@ handle_append(CoC_Namespace, CoC_Locator, make_projection_server_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_pstore"). +ets_table_name(FluName) when is_atom(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_epoch"). -ifdef(TEST). @@ -357,7 +211,7 @@ timing_demo_test2() -> lists:foldl(fun(X, _) -> B = machi_checksums:encode_csum_file_entry_hex(X, 100, CSum), %% file:write(ZZZ, [B, 10]), - machi_checksums:decode_csum_file_entry_hex(list_to_binary(B)) + decode_csum_file_entry_hex(list_to_binary(B)) end, x, Xs) end), io:format(user, "~.3f sec\n", [HexUSec / 1000000]), diff --git a/src/machi_flu1_append_server.erl b/src/machi_flu1_append_server.erl new file mode 100644 index 0000000..a7b029c --- /dev/null +++ b/src/machi_flu1_append_server.erl @@ -0,0 +1,195 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Machi FLU1 append serialization server process + +-module(machi_flu1_append_server). + +-behavior(gen_server). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. % TEST + +-export([start_link/4]). + +-export([init/1]). +-export([handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-export([int_update_wedge_state/3, int_wedge_myself/2]). +-export([current_state/1, format_state/1]). + +-record(state, { + flu_name :: atom(), + witness = false :: boolean(), + wedged = true :: boolean(), + etstab :: ets:tid(), + epoch_id :: 'undefined' | machi_dt:epoch_id() + }). + +-define(INIT_TIMEOUT, 60*1000). +-define(CALL_TIMEOUT, 60*1000). + +-spec start_link(pv1_server(), boolean(), boolean(), + undefined | machi_dt:epoch_id()) -> {ok, pid()}. +start_link(Fluname, Witness_p, Wedged_p, EpochId) -> + %% Reminder: Name is the "main" name of the FLU, i.e., no suffix + gen_server:start_link({local, Fluname}, + ?MODULE, [Fluname, Witness_p, Wedged_p, EpochId], + [{timeout, ?INIT_TIMEOUT}]). + +-spec current_state(atom() | pid()) -> term(). +current_state(PidSpec) -> + gen_server:call(PidSpec, current_state, ?CALL_TIMEOUT). + +format_state(State) -> + Fields = record_info(fields, state), + [_Name | Values] = tuple_to_list(State), + lists:zip(Fields, Values). + +int_update_wedge_state(PidSpec, Boolean, EpochId) + when is_boolean(Boolean), is_tuple(EpochId) -> + gen_server:cast(PidSpec, {wedge_state_change, Boolean, EpochId}). + +int_wedge_myself(PidSpec, EpochId) + when is_tuple(EpochId) -> + gen_server:cast(PidSpec, {wedge_myself, EpochId}). + +init([Fluname, Witness_p, Wedged_p, EpochId]) -> + TID = ets:new(machi_flu1:ets_table_name(Fluname), + [set, protected, named_table, {read_concurrency, true}]), + ets:insert(TID, {epoch, {Wedged_p, EpochId}}), + {ok, #state{flu_name=Fluname, witness=Witness_p, wedged=Wedged_p, + etstab=TID, epoch_id=EpochId}}. + +handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}, + _From, #state{witness=true}=S) -> + %% The FLU's machi_flu1_net_server process ought to filter all + %% witness states, but we'll keep this clause for extra + %% paranoia. + {reply, witness, S}; +handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}, + _From, #state{wedged=true}=S) -> + {reply, wedged, S}; +handle_call({seq_append, _From2, CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum, Extra, EpochID}, + From, #state{flu_name=FluName, epoch_id=OldEpochId}=S) -> + %% Old is the one from our state, plain old 'EpochID' comes + %% from the client. + _ = case OldEpochId of + EpochID -> + spawn(fun() -> + append_server_dispatch(From, CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum, Extra, + FluName, EpochID) + end), + {noreply, S}; + _ -> + {reply, {error, bad_epoch}, S} + end; +%% TODO: Who sends this message? +handle_call(wedge_status, _From, + #state{wedged=Wedged_p, epoch_id=EpochId} = S) -> + {reply, {wedge_status_reply, Wedged_p, EpochId}, S}; +handle_call(current_state, _From, S) -> + {reply, S, S}; +handle_call(Else, From, S) -> + io:format(user, "~s:handle_call: WHA? from=~w ~w\n", [?MODULE, From, Else]), + {noreply, S}. + +handle_cast({wedge_myself, WedgeEpochId}, + #state{flu_name=FluName, wedged=Wedged_p, epoch_id=OldEpochId}=S) -> + if not Wedged_p andalso WedgeEpochId == OldEpochId -> + true = ets:insert(S#state.etstab, + {epoch, {true, OldEpochId}}), + %% Tell my chain manager that it might want to react to + %% this new world. + Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), + spawn(fun() -> + catch machi_chain_manager1:trigger_react_to_env(Chmgr) + end), + {noreply, S#state{wedged=true}}; + true -> + {noreply, S} + end; +handle_cast({wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId}, + #state{epoch_id=OldEpochId}=S) -> + OldEpoch = case OldEpochId of {OldE, _} -> OldE; + undefined -> -1 + end, + if NewEpoch >= OldEpoch -> + true = ets:insert(S#state.etstab, + {epoch, {Boolean, NewEpochId}}), + {noreply, S#state{wedged=Boolean, epoch_id=NewEpochId}}; + true -> + {noreply, S} + end; +handle_cast(Else, S) -> + io:format(user, "~s:handle_cast: WHA? ~p\n", [?MODULE, Else]), + {noreply, S}. + +handle_info(Else, S) -> + io:format(user, "~s:handle_info: WHA? ~p\n", [?MODULE, Else]), + {noreply, S}. + +terminate(normal, _S) -> + ok; +terminate(Reason, _S) -> + lager:warning("~s:terminate: ~w", [?MODULE, Reason]), + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +append_server_dispatch(From, CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum, Extra, FluName, EpochId) -> + Result = case handle_append(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum, Extra, FluName, EpochId) of + {ok, File, Offset} -> + {assignment, Offset, File}; + Other -> + Other + end, + _ = gen_server:reply(From, Result), + ok. + +handle_append(_N, _L, _Prefix, <<>>, _Csum, _Extra, _FluName, _EpochId) -> + {error, bad_arg}; +handle_append(CoC_Namespace, CoC_Locator, + Prefix, Chunk, Csum, Extra, FluName, EpochId) -> + CoC = {coc, CoC_Namespace, CoC_Locator}, + Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix( + FluName, EpochId, {prefix, Prefix}, CoC), + case Res of + {file, F} -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of + {ok, Pid} -> + {Tag, CS} = machi_util:unmake_tagged_csum(Csum), + Meta = [{client_csum_tag, Tag}, {client_csum, CS}], + machi_file_proxy:append(Pid, Meta, Extra, Chunk); + {error, trimmed} = E -> + E + end; + Error -> + Error + end. diff --git a/src/machi_flu1_net_server.erl b/src/machi_flu1_net_server.erl index 93e3675..6610230 100644 --- a/src/machi_flu1_net_server.erl +++ b/src/machi_flu1_net_server.erl @@ -355,16 +355,15 @@ do_server_append_chunk2(CoC_Namespace, CoC_Locator, TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), R = {seq_append, self(), CoC_Namespace, CoC_Locator, Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID}, - FluName ! R, - receive + case gen_server:call(FluName, R, 10*1000) of {assignment, Offset, File} -> Size = iolist_size(Chunk), {ok, {Offset, Size, File}}; witness -> {error, bad_arg}; wedged -> - {error, wedged} - after 10*1000 -> + {error, wedged}; + {error, timeout} -> {error, partition} end catch diff --git a/src/machi_flu1_subsup.erl b/src/machi_flu1_subsup.erl new file mode 100644 index 0000000..21fd6f5 --- /dev/null +++ b/src/machi_flu1_subsup.erl @@ -0,0 +1,115 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc A supervisor to hold dynamic processes inside single +%% FLU service, ranch listener and append server. + +%% TODO: This supervisor is maybe useless. First introduced for +%% workaround to start listener dynamically in flu1 initialization +%% phase. Because `machi_flu_psup' is being blocked in flu1 +%% initialization time, adding a child to the supervisor leads to +%% deadlock. If initialization can be done only by static arguments, +%% then this supervisor should be removed and added as a direct child +%% of `machi_flu_psup'. + +-module(machi_flu1_subsup). +-behaviour(supervisor). + +%% public API +-export([start_link/1, + start_append_server/4, + stop_append_server/1, + start_listener/6, + stop_listener/1, + subsup_name/1, + listener_name/1]). + +%% supervisor callback +-export([init/1]). + +-include("machi_projection.hrl"). + +-define(SHUTDOWN, 5000). +-define(BACKLOG, 8192). + +-spec start_link(pv1_server()) -> {ok, pid()}. +start_link(FluName) -> + supervisor:start_link({local, subsup_name(FluName)}, ?MODULE, []). + +-spec start_append_server(pv1_server(), boolean(), boolean(), + undefined | machi_dt:epoch_id()) -> + {ok, pid()}. +start_append_server(FluName, Witness_p, Wedged_p, EpochId) -> + supervisor:start_child(subsup_name(FluName), + append_server_spec(FluName, Witness_p, Wedged_p, EpochId)). + +-spec stop_append_server(pv1_server()) -> ok. +stop_append_server(FluName) -> + SubSup = listener_name(FluName), + ok = supervisor:terminate_child(SubSup, FluName), + ok = supervisor:delete_child(SubSup, FluName). + +-spec start_listener(pv1_server(), inet:port_number(), boolean(), + string(), ets:tab(), atom() | pid()) -> {ok, pid()}. +start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> + supervisor:start_child(subsup_name(FluName), + listener_spec(FluName, TcpPort, Witness, DataDir, + EpochTab, ProjStore)). + +-spec stop_listener(pv1_server()) -> ok. +stop_listener(FluName) -> + SupName = subsup_name(FluName), + ListenerName = listener_name(FluName), + ok = supervisor:terminate_child(SupName, ListenerName), + ok = supervisor:delete_child(SupName, ListenerName). + +-spec subsup_name(pv1_server()) -> atom(). +subsup_name(FluName) when is_atom(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_flu1_subsup"). + +-spec listener_name(pv1_server()) -> atom(). +listener_name(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_listener"). + +%% Supervisor callback + +init([]) -> + SupFlags = {one_for_all, 1000, 10}, + {ok, {SupFlags, []}}. + +%% private + +-spec listener_spec(pv1_server(), inet:port_number(), boolean(), + string(), ets:tab(), atom() | pid()) -> supervisor:child_spec(). +listener_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> + ListenerName = listener_name(FluName), + NbAcceptors = 10, + TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}], + NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore], + ranch:child_spec(ListenerName, NbAcceptors, + ranch_tcp, TcpOpts, + machi_flu1_net_server, NetServerOpts). + +-spec append_server_spec(pv1_server(), boolean(), boolean(), + undefined | machi_dt:epoch_id()) -> supervisor:child_spec(). +append_server_spec(FluName, Witness_p, Wedged_p, EpochId) -> + {FluName, {machi_flu1_append_server, start_link, + [FluName, Witness_p, Wedged_p, EpochId]}, + permanent, ?SHUTDOWN, worker, [machi_flu1_append_server]}. diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index 45e580e..293fdc3 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -67,12 +67,12 @@ ]). -define(TIMEOUT, 10 * 1000). --include("machi_projection.hrl"). %% included for pv1_epoch_n type +-include("machi_projection.hrl"). %% included for pv1_epoch type -record(state, {fluname :: atom(), tid :: ets:tid(), datadir :: string(), - epoch :: pv1_epoch_n() + epoch :: pv1_epoch() }). %% public API @@ -87,8 +87,8 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> N = make_filename_mgr_name(FluName), gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []). --spec find_or_make_filename_from_prefix( FluName :: atom(), - EpochId :: pv1_epoch_n(), +-spec find_or_make_filename_from_prefix( FluName :: atom(), + EpochId :: pv1_epoch(), Prefix :: {prefix, string()}, machi_dt:coc_nl()) -> {file, Filename :: string()} | {error, Reason :: term() } | timeout. @@ -130,7 +130,7 @@ list_files_by_prefix(_FluName, Other) -> init([FluName, DataDir]) -> Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]), {ok, #state{fluname = FluName, - epoch = 0, + epoch = ?DUMMY_PV1_EPOCH, datadir = DataDir, tid = Tid}}. diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index 0539450..a357f8f 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -145,9 +145,9 @@ init([FluName, TcpPort, DataDir, Props0]) -> FProxySupSpec = machi_file_proxy_sup:child_spec(FluName), - ListenerSupSpec = {machi_listener_sup:make_listener_sup_name(FluName), - {machi_listener_sup, start_link, [FluName]}, - permanent, ?SHUTDOWN, supervisor, []}, + Flu1SubSupSpec = {machi_flu1_subsup:subsup_name(FluName), + {machi_flu1_subsup, start_link, [FluName]}, + permanent, ?SHUTDOWN, supervisor, []}, FluSpec = {FluName, {machi_flu1, start_link, @@ -157,7 +157,7 @@ init([FluName, TcpPort, DataDir, Props0]) -> {ok, {SupFlags, [ ProjSpec, FitnessSpec, MgrSpec, FProxySupSpec, FNameMgrSpec, MetaSupSpec, - ListenerSupSpec, FluSpec]}}. + Flu1SubSupSpec, FluSpec]}}. make_flu_regname(FluName) when is_atom(FluName) -> FluName. diff --git a/src/machi_listener_sup.erl b/src/machi_listener_sup.erl deleted file mode 100644 index f2362ad..0000000 --- a/src/machi_listener_sup.erl +++ /dev/null @@ -1,89 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - -%% @doc A supervisor to hold ranch listener for sigle FLU. -%% It holds at most one child worker. - -%% TODO: This supervisor is maybe useless. First introduced for -%% workaround to start listener dynamically in flu1 initialization -%% time. Because psup is being blocked in flu1 initialization time, -%% adding a child to psup leads to deadlock. If initialization can be -%% done only by static arguments, then this supervisor should be -%% removed and added as a direct child of `machi_flu_psup'. - --module(machi_listener_sup). --behaviour(supervisor). - -%% public API --export([start_link/1, - start_listener/6, - stop_listener/1, - make_listener_sup_name/1, - make_listener_name/1]). - -%% supervisor callback --export([init/1]). - --include("machi_projection.hrl"). - --define(BACKLOG, 8192). - --spec start_link(pv1_server()) -> {ok, pid()}. -start_link(FluName) -> - supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []). - --spec start_listener(pv1_server(), inet:port_number(), boolean(), - string(), ets:tab(), atom() | pid()) -> {ok, pid()}. -start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> - supervisor:start_child(make_listener_sup_name(FluName), - child_spec(FluName, TcpPort, Witness, DataDir, - EpochTab, ProjStore)). - --spec stop_listener(pv1_server()) -> ok. -stop_listener(FluName) -> - SupName = make_listener_sup_name(FluName), - ListenerName = make_listener_name(FluName), - ok = supervisor:terminate_child(SupName, ListenerName), - ok = supervisor:delete_child(SupName, ListenerName). - --spec make_listener_name(pv1_server()) -> atom(). -make_listener_sup_name(FluName) when is_atom(FluName) -> - list_to_atom(atom_to_list(FluName) ++ "_listener_sup"). - --spec make_listener_sup_name(pv1_server()) -> atom(). -make_listener_name(FluName) -> - list_to_atom(atom_to_list(FluName) ++ "_listener"). - -%% Supervisor callback - -init([]) -> - SupFlags = {one_for_one, 1000, 10}, - {ok, {SupFlags, []}}. - --spec child_spec(pv1_server(), inet:port_number(), boolean(), - string(), ets:tab(), atom() | pid()) -> supervisor:child_spec(). -child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> - ListenerName = make_listener_name(FluName), - NbAcceptors = 100, - TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}], - NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore], - ranch:child_spec(ListenerName, NbAcceptors, - ranch_tcp, TcpOpts, - machi_flu1_net_server, NetServerOpts). diff --git a/test/machi_cinfo_test.erl b/test/machi_cinfo_test.erl index 9699df3..5e21083 100644 --- a/test/machi_cinfo_test.erl +++ b/test/machi_cinfo_test.erl @@ -38,6 +38,7 @@ smoke_test_() -> fun() -> machi_cinfo:private_projection(a) end, fun() -> machi_cinfo:fitness(a) end, fun() -> machi_cinfo:chain_manager(a) end, + fun() -> machi_cinfo:flu1(a) end, fun() -> machi_cinfo:dump() end ]}. diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 439b1a7..b8556b7 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -32,7 +32,7 @@ api_smoke_test() -> RegName = api_smoke_flu, - TcpPort = 57124, + TcpPort = 17124, DataDir = "./data.api_smoke_flu", W_props = [{active_mode, false},{initial_wedged, false}], Prefix = <<"prefix">>, @@ -108,7 +108,7 @@ flu_restart_test_() -> flu_restart_test2() -> RegName = a, - TcpPort = 57125, + TcpPort = 17125, DataDir = "./data.api_smoke_flu2", W_props = [{initial_wedged, false}, {active_mode, false}],