Merge pull request #55 from basho/ss/flu1-factorization2

Ss/flu1 factorization2
This commit is contained in:
Scott Lystig Fritchie 2015-12-18 17:19:17 +09:00
commit 70d42a3fb5
10 changed files with 353 additions and 278 deletions

View file

@ -85,8 +85,8 @@ fitness(FluName) ->
-spec flu1(atom()) -> [{atom(), term()}].
flu1(FluName) ->
State = machi_flu1:current_state(FluName),
machi_flu1:format_state(State).
State = machi_flu1_append_server:current_state(FluName),
machi_flu1_append_server:format_state(State).
%% Internal functions

View file

@ -21,7 +21,9 @@
%% @doc The Machi FLU file server + file location sequencer.
%%
%% This module implements only the Machi FLU file server and its
%% implicit sequencer.
%% implicit sequencer together with listener, append server,
%% file management and file proxy processes.
%% Please see the EDoc "Overview" for details about the FLU as a
%% primitive file server process vs. the larger Machi design of a FLU
%% as a sequencer + file server + chain manager group of processes.
@ -54,27 +56,16 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-export([timing_demo_test_COMMENTED_/0, sort_2lines/2]). % Just to suppress warning
-endif. % TEST
-export([start_link/1, stop/1,
update_wedge_state/3, wedge_myself/2]).
-export([make_projection_server_regname/1]).
-export([make_projection_server_regname/1,
ets_table_name/1]).
%% TODO: remove or replace in OTP way after gen_*'ified
-export([main2/4, run_append_server/2,
current_state/1, format_state/1]).
-export([main2/4]).
-record(state, {
flu_name :: atom(),
proj_store :: pid(),
witness = false :: boolean(),
append_pid :: pid(),
wedged = true :: boolean(),
etstab :: ets:tid(),
epoch_id :: 'undefined' | machi_dt:epoch_id(),
props = [] :: list() % proplist
}).
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
-define(INIT_TIMEOUT, 60*1000).
start_link([{FluName, TcpPort, DataDir}|Rest])
@ -96,33 +87,14 @@ stop(Pid) when is_pid(Pid) ->
error
end.
update_wedge_state(PidSpec, Boolean, EpochId)
when (Boolean == true orelse Boolean == false), is_tuple(EpochId) ->
PidSpec ! {wedge_state_change, Boolean, EpochId}.
update_wedge_state(PidSpec, Boolean, EpochId) ->
machi_flu1_append_server:int_update_wedge_state(PidSpec, Boolean, EpochId).
wedge_myself(PidSpec, EpochId)
when is_tuple(EpochId) ->
PidSpec ! {wedge_myself, EpochId}.
current_state(PidSpec) ->
PidSpec ! {current_state, self()},
%% TODO: Not so rubust f(^^;)
receive
Res -> Res
after
60*1000 -> {error, timeout}
end.
format_state(State) ->
Fields = record_info(fields, state),
[_Name | Values] = tuple_to_list(State),
lists:zip(Fields, Values).
wedge_myself(PidSpec, EpochId) ->
machi_flu1_append_server:int_wedge_myself(PidSpec, EpochId).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
ets_table_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_epoch").
main2(FluName, TcpPort, DataDir, Props) ->
{SendAppendPidToProj_p, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Props) of
@ -149,23 +121,15 @@ main2(FluName, TcpPort, DataDir, Props) ->
{true, undefined}
end,
Witness_p = proplists:get_value(witness_mode, Props, false),
S0 = #state{flu_name=FluName,
proj_store=ProjectionPid,
wedged=Wedged_p,
witness=Witness_p,
etstab=ets_table_name(FluName),
epoch_id=EpochId,
props=Props},
{ok, AppendPid} = start_append_server(S0, self()),
{ok, AppendPid} = start_append_server(FluName, Witness_p, Wedged_p, EpochId),
if SendAppendPidToProj_p ->
machi_projection_store:set_wedge_notify_pid(ProjectionPid,
AppendPid);
machi_projection_store:set_wedge_notify_pid(ProjectionPid, AppendPid);
true ->
ok
end,
S1 = S0#state{append_pid=AppendPid},
{ok, ListenerPid} = start_listen_server(TcpPort, DataDir, S1),
{ok, ListenerPid} = start_listen_server(FluName, TcpPort, Witness_p, DataDir,
ets_table_name(FluName), ProjectionPid),
%% io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]),
Config_e = machi_util:make_config_filename(DataDir, "unused"),
@ -176,135 +140,23 @@ main2(FluName, TcpPort, DataDir, Props) ->
ok = filelib:ensure_dir(Projection_e),
put(flu_flu_name, FluName),
put(flu_append_pid, S1#state.append_pid),
put(flu_append_pid, AppendPid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenerPid),
proc_lib:init_ack({ok, self()}),
receive killme -> ok end,
(catch exit(S1#state.append_pid, kill)),
(catch exit(AppendPid, kill)),
(catch exit(ProjectionPid, kill)),
(catch exit(ListenerPid, kill)),
ok.
start_append_server(S, AckPid) ->
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
start_append_server(FluName, Witness_p, Wedged_p, EpochId) ->
machi_flu1_subsup:start_append_server(FluName, Witness_p, Wedged_p, EpochId).
start_listen_server(TcpPort, DataDir,
#state{flu_name=FluName, witness=Witness, etstab=EtsTab,
proj_store=ProjStore}=_S) ->
machi_listener_sup:start_listener(FluName, TcpPort, Witness, DataDir,
EtsTab, ProjStore).
run_append_server(FluPid, #state{flu_name=Name,
wedged=Wedged_p,epoch_id=EpochId}=S) ->
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
register(Name, self()),
TID = ets:new(ets_table_name(Name),
[set, protected, named_table, {read_concurrency, true}]),
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
proc_lib:init_ack({ok, self()}),
append_server_loop(FluPid, S#state{etstab=TID}).
append_server_loop(FluPid, #state{wedged=Wedged_p,
witness=Witness_p,
epoch_id=OldEpochId, flu_name=FluName}=S) ->
receive
{seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
when Witness_p ->
%% The FLU's machi_flu1_net_server process ought to filter all
%% witness states, but we'll keep this clause for extra
%% paranoia.
From ! witness,
append_server_loop(FluPid, S);
{seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
when Wedged_p ->
From ! wedged,
append_server_loop(FluPid, S);
{seq_append, From, CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra, EpochID} ->
%% Old is the one from our state, plain old 'EpochID' comes
%% from the client.
_ = case OldEpochId == EpochID of
true ->
spawn(fun() ->
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra,
FluName, EpochID)
end);
false ->
From ! {error, bad_epoch}
end,
append_server_loop(FluPid, S);
{wedge_myself, WedgeEpochId} ->
if not Wedged_p andalso WedgeEpochId == OldEpochId ->
true = ets:insert(S#state.etstab,
{epoch, {true, OldEpochId}}),
%% Tell my chain manager that it might want to react to
%% this new world.
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
spawn(fun() ->
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
end),
append_server_loop(FluPid, S#state{wedged=true});
true ->
append_server_loop(FluPid, S)
end;
{wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId} ->
OldEpoch = case OldEpochId of {OldE, _} -> OldE;
undefined -> -1
end,
if NewEpoch >= OldEpoch ->
true = ets:insert(S#state.etstab,
{epoch, {Boolean, NewEpochId}}),
append_server_loop(FluPid, S#state{wedged=Boolean,
epoch_id=NewEpochId});
true ->
append_server_loop(FluPid, S)
end;
{wedge_status, FromPid} ->
#state{wedged=Wedged_p, epoch_id=EpochId} = S,
FromPid ! {wedge_status_reply, Wedged_p, EpochId},
append_server_loop(FluPid, S);
{current_state, FromPid} ->
FromPid ! S;
Else ->
io:format(user, "append_server_loop: WHA? ~p\n", [Else]),
append_server_loop(FluPid, S)
end.
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
Result = case handle_append(CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra, FluName, EpochId) of
{ok, File, Offset} ->
{assignment, Offset, File};
Other ->
Other
end,
From ! Result,
exit(normal).
handle_append(_N, _L, _Prefix, <<>>, _Csum, _Extra, _FluName, _EpochId) ->
{error, bad_arg};
handle_append(CoC_Namespace, CoC_Locator,
Prefix, Chunk, Csum, Extra, FluName, EpochId) ->
CoC = {coc, CoC_Namespace, CoC_Locator},
Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix(
FluName, EpochId, {prefix, Prefix}, CoC),
case Res of
{file, F} ->
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
{ok, Pid} ->
{Tag, CS} = machi_util:unmake_tagged_csum(Csum),
Meta = [{client_csum_tag, Tag}, {client_csum, CS}],
machi_file_proxy:append(Pid, Meta, Extra, Chunk);
{error, trimmed} = E ->
E
end;
Error ->
Error
end.
start_listen_server(FluName, TcpPort, Witness_p, DataDir, EtsTab, ProjectionPid) ->
machi_flu1_subsup:start_listener(FluName, TcpPort, Witness_p, DataDir,
EtsTab, ProjectionPid).
%% This is the name of the projection store that is spawned by the
%% *flu*, for use primarily in testing scenarios. In normal use, we
@ -316,6 +168,8 @@ handle_append(CoC_Namespace, CoC_Locator,
make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
ets_table_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_epoch").
-ifdef(TEST).
@ -357,7 +211,7 @@ timing_demo_test2() ->
lists:foldl(fun(X, _) ->
B = machi_checksums:encode_csum_file_entry_hex(X, 100, CSum),
%% file:write(ZZZ, [B, 10]),
machi_checksums:decode_csum_file_entry_hex(list_to_binary(B))
decode_csum_file_entry_hex(list_to_binary(B))
end, x, Xs)
end),
io:format(user, "~.3f sec\n", [HexUSec / 1000000]),

View file

@ -0,0 +1,195 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Machi FLU1 append serialization server process
-module(machi_flu1_append_server).
-behavior(gen_server).
-include("machi.hrl").
-include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST
-export([start_link/4]).
-export([init/1]).
-export([handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([int_update_wedge_state/3, int_wedge_myself/2]).
-export([current_state/1, format_state/1]).
-record(state, {
flu_name :: atom(),
witness = false :: boolean(),
wedged = true :: boolean(),
etstab :: ets:tid(),
epoch_id :: 'undefined' | machi_dt:epoch_id()
}).
-define(INIT_TIMEOUT, 60*1000).
-define(CALL_TIMEOUT, 60*1000).
-spec start_link(pv1_server(), boolean(), boolean(),
undefined | machi_dt:epoch_id()) -> {ok, pid()}.
start_link(Fluname, Witness_p, Wedged_p, EpochId) ->
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
gen_server:start_link({local, Fluname},
?MODULE, [Fluname, Witness_p, Wedged_p, EpochId],
[{timeout, ?INIT_TIMEOUT}]).
-spec current_state(atom() | pid()) -> term().
current_state(PidSpec) ->
gen_server:call(PidSpec, current_state, ?CALL_TIMEOUT).
format_state(State) ->
Fields = record_info(fields, state),
[_Name | Values] = tuple_to_list(State),
lists:zip(Fields, Values).
int_update_wedge_state(PidSpec, Boolean, EpochId)
when is_boolean(Boolean), is_tuple(EpochId) ->
gen_server:cast(PidSpec, {wedge_state_change, Boolean, EpochId}).
int_wedge_myself(PidSpec, EpochId)
when is_tuple(EpochId) ->
gen_server:cast(PidSpec, {wedge_myself, EpochId}).
init([Fluname, Witness_p, Wedged_p, EpochId]) ->
TID = ets:new(machi_flu1:ets_table_name(Fluname),
[set, protected, named_table, {read_concurrency, true}]),
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
{ok, #state{flu_name=Fluname, witness=Witness_p, wedged=Wedged_p,
etstab=TID, epoch_id=EpochId}}.
handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID},
_From, #state{witness=true}=S) ->
%% The FLU's machi_flu1_net_server process ought to filter all
%% witness states, but we'll keep this clause for extra
%% paranoia.
{reply, witness, S};
handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID},
_From, #state{wedged=true}=S) ->
{reply, wedged, S};
handle_call({seq_append, _From2, CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra, EpochID},
From, #state{flu_name=FluName, epoch_id=OldEpochId}=S) ->
%% Old is the one from our state, plain old 'EpochID' comes
%% from the client.
_ = case OldEpochId of
EpochID ->
spawn(fun() ->
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra,
FluName, EpochID)
end),
{noreply, S};
_ ->
{reply, {error, bad_epoch}, S}
end;
%% TODO: Who sends this message?
handle_call(wedge_status, _From,
#state{wedged=Wedged_p, epoch_id=EpochId} = S) ->
{reply, {wedge_status_reply, Wedged_p, EpochId}, S};
handle_call(current_state, _From, S) ->
{reply, S, S};
handle_call(Else, From, S) ->
io:format(user, "~s:handle_call: WHA? from=~w ~w\n", [?MODULE, From, Else]),
{noreply, S}.
handle_cast({wedge_myself, WedgeEpochId},
#state{flu_name=FluName, wedged=Wedged_p, epoch_id=OldEpochId}=S) ->
if not Wedged_p andalso WedgeEpochId == OldEpochId ->
true = ets:insert(S#state.etstab,
{epoch, {true, OldEpochId}}),
%% Tell my chain manager that it might want to react to
%% this new world.
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
spawn(fun() ->
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
end),
{noreply, S#state{wedged=true}};
true ->
{noreply, S}
end;
handle_cast({wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId},
#state{epoch_id=OldEpochId}=S) ->
OldEpoch = case OldEpochId of {OldE, _} -> OldE;
undefined -> -1
end,
if NewEpoch >= OldEpoch ->
true = ets:insert(S#state.etstab,
{epoch, {Boolean, NewEpochId}}),
{noreply, S#state{wedged=Boolean, epoch_id=NewEpochId}};
true ->
{noreply, S}
end;
handle_cast(Else, S) ->
io:format(user, "~s:handle_cast: WHA? ~p\n", [?MODULE, Else]),
{noreply, S}.
handle_info(Else, S) ->
io:format(user, "~s:handle_info: WHA? ~p\n", [?MODULE, Else]),
{noreply, S}.
terminate(normal, _S) ->
ok;
terminate(Reason, _S) ->
lager:warning("~s:terminate: ~w", [?MODULE, Reason]),
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
Result = case handle_append(CoC_Namespace, CoC_Locator,
Prefix, Chunk, CSum, Extra, FluName, EpochId) of
{ok, File, Offset} ->
{assignment, Offset, File};
Other ->
Other
end,
_ = gen_server:reply(From, Result),
ok.
handle_append(_N, _L, _Prefix, <<>>, _Csum, _Extra, _FluName, _EpochId) ->
{error, bad_arg};
handle_append(CoC_Namespace, CoC_Locator,
Prefix, Chunk, Csum, Extra, FluName, EpochId) ->
CoC = {coc, CoC_Namespace, CoC_Locator},
Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix(
FluName, EpochId, {prefix, Prefix}, CoC),
case Res of
{file, F} ->
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
{ok, Pid} ->
{Tag, CS} = machi_util:unmake_tagged_csum(Csum),
Meta = [{client_csum_tag, Tag}, {client_csum, CS}],
machi_file_proxy:append(Pid, Meta, Extra, Chunk);
{error, trimmed} = E ->
E
end;
Error ->
Error
end.

View file

@ -355,16 +355,15 @@ do_server_append_chunk2(CoC_Namespace, CoC_Locator,
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
R = {seq_append, self(), CoC_Namespace, CoC_Locator,
Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID},
FluName ! R,
receive
case gen_server:call(FluName, R, 10*1000) of
{assignment, Offset, File} ->
Size = iolist_size(Chunk),
{ok, {Offset, Size, File}};
witness ->
{error, bad_arg};
wedged ->
{error, wedged}
after 10*1000 ->
{error, wedged};
{error, timeout} ->
{error, partition}
end
catch

115
src/machi_flu1_subsup.erl Normal file
View file

@ -0,0 +1,115 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc A supervisor to hold dynamic processes inside single
%% FLU service, ranch listener and append server.
%% TODO: This supervisor is maybe useless. First introduced for
%% workaround to start listener dynamically in flu1 initialization
%% phase. Because `machi_flu_psup' is being blocked in flu1
%% initialization time, adding a child to the supervisor leads to
%% deadlock. If initialization can be done only by static arguments,
%% then this supervisor should be removed and added as a direct child
%% of `machi_flu_psup'.
-module(machi_flu1_subsup).
-behaviour(supervisor).
%% public API
-export([start_link/1,
start_append_server/4,
stop_append_server/1,
start_listener/6,
stop_listener/1,
subsup_name/1,
listener_name/1]).
%% supervisor callback
-export([init/1]).
-include("machi_projection.hrl").
-define(SHUTDOWN, 5000).
-define(BACKLOG, 8192).
-spec start_link(pv1_server()) -> {ok, pid()}.
start_link(FluName) ->
supervisor:start_link({local, subsup_name(FluName)}, ?MODULE, []).
-spec start_append_server(pv1_server(), boolean(), boolean(),
undefined | machi_dt:epoch_id()) ->
{ok, pid()}.
start_append_server(FluName, Witness_p, Wedged_p, EpochId) ->
supervisor:start_child(subsup_name(FluName),
append_server_spec(FluName, Witness_p, Wedged_p, EpochId)).
-spec stop_append_server(pv1_server()) -> ok.
stop_append_server(FluName) ->
SubSup = listener_name(FluName),
ok = supervisor:terminate_child(SubSup, FluName),
ok = supervisor:delete_child(SubSup, FluName).
-spec start_listener(pv1_server(), inet:port_number(), boolean(),
string(), ets:tab(), atom() | pid()) -> {ok, pid()}.
start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
supervisor:start_child(subsup_name(FluName),
listener_spec(FluName, TcpPort, Witness, DataDir,
EpochTab, ProjStore)).
-spec stop_listener(pv1_server()) -> ok.
stop_listener(FluName) ->
SupName = subsup_name(FluName),
ListenerName = listener_name(FluName),
ok = supervisor:terminate_child(SupName, ListenerName),
ok = supervisor:delete_child(SupName, ListenerName).
-spec subsup_name(pv1_server()) -> atom().
subsup_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_flu1_subsup").
-spec listener_name(pv1_server()) -> atom().
listener_name(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_listener").
%% Supervisor callback
init([]) ->
SupFlags = {one_for_all, 1000, 10},
{ok, {SupFlags, []}}.
%% private
-spec listener_spec(pv1_server(), inet:port_number(), boolean(),
string(), ets:tab(), atom() | pid()) -> supervisor:child_spec().
listener_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
ListenerName = listener_name(FluName),
NbAcceptors = 10,
TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}],
NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore],
ranch:child_spec(ListenerName, NbAcceptors,
ranch_tcp, TcpOpts,
machi_flu1_net_server, NetServerOpts).
-spec append_server_spec(pv1_server(), boolean(), boolean(),
undefined | machi_dt:epoch_id()) -> supervisor:child_spec().
append_server_spec(FluName, Witness_p, Wedged_p, EpochId) ->
{FluName, {machi_flu1_append_server, start_link,
[FluName, Witness_p, Wedged_p, EpochId]},
permanent, ?SHUTDOWN, worker, [machi_flu1_append_server]}.

View file

@ -67,12 +67,12 @@
]).
-define(TIMEOUT, 10 * 1000).
-include("machi_projection.hrl"). %% included for pv1_epoch_n type
-include("machi_projection.hrl"). %% included for pv1_epoch type
-record(state, {fluname :: atom(),
tid :: ets:tid(),
datadir :: string(),
epoch :: pv1_epoch_n()
epoch :: pv1_epoch()
}).
%% public API
@ -87,8 +87,8 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) ->
N = make_filename_mgr_name(FluName),
gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []).
-spec find_or_make_filename_from_prefix( FluName :: atom(),
EpochId :: pv1_epoch_n(),
-spec find_or_make_filename_from_prefix( FluName :: atom(),
EpochId :: pv1_epoch(),
Prefix :: {prefix, string()},
machi_dt:coc_nl()) ->
{file, Filename :: string()} | {error, Reason :: term() } | timeout.
@ -130,7 +130,7 @@ list_files_by_prefix(_FluName, Other) ->
init([FluName, DataDir]) ->
Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]),
{ok, #state{fluname = FluName,
epoch = 0,
epoch = ?DUMMY_PV1_EPOCH,
datadir = DataDir,
tid = Tid}}.

View file

@ -145,9 +145,9 @@ init([FluName, TcpPort, DataDir, Props0]) ->
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
ListenerSupSpec = {machi_listener_sup:make_listener_sup_name(FluName),
{machi_listener_sup, start_link, [FluName]},
permanent, ?SHUTDOWN, supervisor, []},
Flu1SubSupSpec = {machi_flu1_subsup:subsup_name(FluName),
{machi_flu1_subsup, start_link, [FluName]},
permanent, ?SHUTDOWN, supervisor, []},
FluSpec = {FluName,
{machi_flu1, start_link,
@ -157,7 +157,7 @@ init([FluName, TcpPort, DataDir, Props0]) ->
{ok, {SupFlags, [
ProjSpec, FitnessSpec, MgrSpec,
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
ListenerSupSpec, FluSpec]}}.
Flu1SubSupSpec, FluSpec]}}.
make_flu_regname(FluName) when is_atom(FluName) ->
FluName.

View file

@ -1,89 +0,0 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc A supervisor to hold ranch listener for sigle FLU.
%% It holds at most one child worker.
%% TODO: This supervisor is maybe useless. First introduced for
%% workaround to start listener dynamically in flu1 initialization
%% time. Because psup is being blocked in flu1 initialization time,
%% adding a child to psup leads to deadlock. If initialization can be
%% done only by static arguments, then this supervisor should be
%% removed and added as a direct child of `machi_flu_psup'.
-module(machi_listener_sup).
-behaviour(supervisor).
%% public API
-export([start_link/1,
start_listener/6,
stop_listener/1,
make_listener_sup_name/1,
make_listener_name/1]).
%% supervisor callback
-export([init/1]).
-include("machi_projection.hrl").
-define(BACKLOG, 8192).
-spec start_link(pv1_server()) -> {ok, pid()}.
start_link(FluName) ->
supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []).
-spec start_listener(pv1_server(), inet:port_number(), boolean(),
string(), ets:tab(), atom() | pid()) -> {ok, pid()}.
start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
supervisor:start_child(make_listener_sup_name(FluName),
child_spec(FluName, TcpPort, Witness, DataDir,
EpochTab, ProjStore)).
-spec stop_listener(pv1_server()) -> ok.
stop_listener(FluName) ->
SupName = make_listener_sup_name(FluName),
ListenerName = make_listener_name(FluName),
ok = supervisor:terminate_child(SupName, ListenerName),
ok = supervisor:delete_child(SupName, ListenerName).
-spec make_listener_name(pv1_server()) -> atom().
make_listener_sup_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_listener_sup").
-spec make_listener_sup_name(pv1_server()) -> atom().
make_listener_name(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_listener").
%% Supervisor callback
init([]) ->
SupFlags = {one_for_one, 1000, 10},
{ok, {SupFlags, []}}.
-spec child_spec(pv1_server(), inet:port_number(), boolean(),
string(), ets:tab(), atom() | pid()) -> supervisor:child_spec().
child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
ListenerName = make_listener_name(FluName),
NbAcceptors = 100,
TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}],
NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore],
ranch:child_spec(ListenerName, NbAcceptors,
ranch_tcp, TcpOpts,
machi_flu1_net_server, NetServerOpts).

View file

@ -38,6 +38,7 @@ smoke_test_() ->
fun() -> machi_cinfo:private_projection(a) end,
fun() -> machi_cinfo:fitness(a) end,
fun() -> machi_cinfo:chain_manager(a) end,
fun() -> machi_cinfo:flu1(a) end,
fun() -> machi_cinfo:dump() end
]}.

View file

@ -32,7 +32,7 @@
api_smoke_test() ->
RegName = api_smoke_flu,
TcpPort = 57124,
TcpPort = 17124,
DataDir = "./data.api_smoke_flu",
W_props = [{active_mode, false},{initial_wedged, false}],
Prefix = <<"prefix">>,
@ -108,7 +108,7 @@ flu_restart_test_() ->
flu_restart_test2() ->
RegName = a,
TcpPort = 57125,
TcpPort = 17125,
DataDir = "./data.api_smoke_flu2",
W_props = [{initial_wedged, false}, {active_mode, false}],