Ss/flu1 factorization2 #55
6 changed files with 335 additions and 266 deletions
|
@ -21,7 +21,9 @@
|
||||||
%% @doc The Machi FLU file server + file location sequencer.
|
%% @doc The Machi FLU file server + file location sequencer.
|
||||||
%%
|
%%
|
||||||
%% This module implements only the Machi FLU file server and its
|
%% This module implements only the Machi FLU file server and its
|
||||||
%% implicit sequencer.
|
%% implicit sequencer together with listener, append server,
|
||||||
|
%% file management and file proxy processes.
|
||||||
|
|
||||||
%% Please see the EDoc "Overview" for details about the FLU as a
|
%% Please see the EDoc "Overview" for details about the FLU as a
|
||||||
%% primitive file server process vs. the larger Machi design of a FLU
|
%% primitive file server process vs. the larger Machi design of a FLU
|
||||||
%% as a sequencer + file server + chain manager group of processes.
|
%% as a sequencer + file server + chain manager group of processes.
|
||||||
|
@ -54,27 +56,16 @@
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-export([timing_demo_test_COMMENTED_/0, sort_2lines/2]). % Just to suppress warning
|
||||||
-endif. % TEST
|
-endif. % TEST
|
||||||
|
|
||||||
-export([start_link/1, stop/1,
|
-export([start_link/1, stop/1,
|
||||||
update_wedge_state/3, wedge_myself/2]).
|
update_wedge_state/3, wedge_myself/2]).
|
||||||
-export([make_projection_server_regname/1]).
|
-export([make_projection_server_regname/1,
|
||||||
|
ets_table_name/1]).
|
||||||
%% TODO: remove or replace in OTP way after gen_*'ified
|
%% TODO: remove or replace in OTP way after gen_*'ified
|
||||||
-export([main2/4, run_append_server/2,
|
-export([main2/4]).
|
||||||
current_state/1, format_state/1]).
|
|
||||||
|
|
||||||
-record(state, {
|
|
||||||
flu_name :: atom(),
|
|
||||||
proj_store :: pid(),
|
|
||||||
witness = false :: boolean(),
|
|
||||||
append_pid :: pid(),
|
|
||||||
wedged = true :: boolean(),
|
|
||||||
etstab :: ets:tid(),
|
|
||||||
epoch_id :: 'undefined' | machi_dt:epoch_id(),
|
|
||||||
props = [] :: list() % proplist
|
|
||||||
}).
|
|
||||||
|
|
||||||
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
|
|
||||||
-define(INIT_TIMEOUT, 60*1000).
|
-define(INIT_TIMEOUT, 60*1000).
|
||||||
|
|
||||||
start_link([{FluName, TcpPort, DataDir}|Rest])
|
start_link([{FluName, TcpPort, DataDir}|Rest])
|
||||||
|
@ -97,32 +88,15 @@ stop(Pid) when is_pid(Pid) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
update_wedge_state(PidSpec, Boolean, EpochId)
|
update_wedge_state(PidSpec, Boolean, EpochId)
|
||||||
when (Boolean == true orelse Boolean == false), is_tuple(EpochId) ->
|
when is_boolean(Boolean), is_tuple(EpochId) ->
|
||||||
PidSpec ! {wedge_state_change, Boolean, EpochId}.
|
gen_server:cast(PidSpec, {wedge_state_change, Boolean, EpochId}).
|
||||||
|
|
||||||
wedge_myself(PidSpec, EpochId)
|
wedge_myself(PidSpec, EpochId)
|
||||||
when is_tuple(EpochId) ->
|
when is_tuple(EpochId) ->
|
||||||
PidSpec ! {wedge_myself, EpochId}.
|
gen_server:cast(PidSpec, {wedge_myself, EpochId}).
|
||||||
|
|
||||||
current_state(PidSpec) ->
|
|
||||||
PidSpec ! {current_state, self()},
|
|
||||||
%% TODO: Not so rubust f(^^;)
|
|
||||||
receive
|
|
||||||
Res -> Res
|
|
||||||
after
|
|
||||||
60*1000 -> {error, timeout}
|
|
||||||
end.
|
|
||||||
|
|
||||||
format_state(State) ->
|
|
||||||
Fields = record_info(fields, state),
|
|
||||||
[_Name | Values] = tuple_to_list(State),
|
|
||||||
lists:zip(Fields, Values).
|
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
ets_table_name(FluName) when is_atom(FluName) ->
|
|
||||||
list_to_atom(atom_to_list(FluName) ++ "_epoch").
|
|
||||||
|
|
||||||
main2(FluName, TcpPort, DataDir, Props) ->
|
main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
{SendAppendPidToProj_p, ProjectionPid} =
|
{SendAppendPidToProj_p, ProjectionPid} =
|
||||||
case proplists:get_value(projection_store_registered_name, Props) of
|
case proplists:get_value(projection_store_registered_name, Props) of
|
||||||
|
@ -150,22 +124,14 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
end,
|
end,
|
||||||
Witness_p = proplists:get_value(witness_mode, Props, false),
|
Witness_p = proplists:get_value(witness_mode, Props, false),
|
||||||
|
|
||||||
S0 = #state{flu_name=FluName,
|
{ok, AppendPid} = start_append_server(FluName, Witness_p, Wedged_p, EpochId),
|
||||||
proj_store=ProjectionPid,
|
|
||||||
wedged=Wedged_p,
|
|
||||||
witness=Witness_p,
|
|
||||||
etstab=ets_table_name(FluName),
|
|
||||||
epoch_id=EpochId,
|
|
||||||
props=Props},
|
|
||||||
{ok, AppendPid} = start_append_server(S0, self()),
|
|
||||||
if SendAppendPidToProj_p ->
|
if SendAppendPidToProj_p ->
|
||||||
machi_projection_store:set_wedge_notify_pid(ProjectionPid,
|
machi_projection_store:set_wedge_notify_pid(ProjectionPid, AppendPid);
|
||||||
AppendPid);
|
|
||||||
true ->
|
true ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
S1 = S0#state{append_pid=AppendPid},
|
{ok, ListenerPid} = start_listen_server(FluName, TcpPort, Witness_p, DataDir,
|
||||||
{ok, ListenerPid} = start_listen_server(TcpPort, DataDir, S1),
|
ets_table_name(FluName), ProjectionPid),
|
||||||
%% io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]),
|
%% io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]),
|
||||||
|
|
||||||
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
||||||
|
@ -176,135 +142,23 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
ok = filelib:ensure_dir(Projection_e),
|
ok = filelib:ensure_dir(Projection_e),
|
||||||
|
|
||||||
put(flu_flu_name, FluName),
|
put(flu_flu_name, FluName),
|
||||||
put(flu_append_pid, S1#state.append_pid),
|
put(flu_append_pid, AppendPid),
|
||||||
put(flu_projection_pid, ProjectionPid),
|
put(flu_projection_pid, ProjectionPid),
|
||||||
put(flu_listen_pid, ListenerPid),
|
put(flu_listen_pid, ListenerPid),
|
||||||
proc_lib:init_ack({ok, self()}),
|
proc_lib:init_ack({ok, self()}),
|
||||||
|
|
||||||
receive killme -> ok end,
|
receive killme -> ok end,
|
||||||
(catch exit(S1#state.append_pid, kill)),
|
(catch exit(AppendPid, kill)),
|
||||||
(catch exit(ProjectionPid, kill)),
|
(catch exit(ProjectionPid, kill)),
|
||||||
(catch exit(ListenerPid, kill)),
|
(catch exit(ListenerPid, kill)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
start_append_server(S, AckPid) ->
|
start_append_server(FluName, Witness_p, Wedged_p, EpochId) ->
|
||||||
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
machi_flu1_subsup:start_append_server(FluName, Witness_p, Wedged_p, EpochId).
|
||||||
|
|
||||||
start_listen_server(TcpPort, DataDir,
|
start_listen_server(FluName, TcpPort, Witness_p, DataDir, EtsTab, ProjectionPid) ->
|
||||||
#state{flu_name=FluName, witness=Witness, etstab=EtsTab,
|
machi_flu1_subsup:start_listener(FluName, TcpPort, Witness_p, DataDir,
|
||||||
proj_store=ProjStore}=_S) ->
|
EtsTab, ProjectionPid).
|
||||||
machi_listener_sup:start_listener(FluName, TcpPort, Witness, DataDir,
|
|
||||||
EtsTab, ProjStore).
|
|
||||||
|
|
||||||
run_append_server(FluPid, #state{flu_name=Name,
|
|
||||||
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
|
||||||
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
|
|
||||||
register(Name, self()),
|
|
||||||
TID = ets:new(ets_table_name(Name),
|
|
||||||
[set, protected, named_table, {read_concurrency, true}]),
|
|
||||||
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
|
|
||||||
proc_lib:init_ack({ok, self()}),
|
|
||||||
append_server_loop(FluPid, S#state{etstab=TID}).
|
|
||||||
|
|
||||||
append_server_loop(FluPid, #state{wedged=Wedged_p,
|
|
||||||
witness=Witness_p,
|
|
||||||
epoch_id=OldEpochId, flu_name=FluName}=S) ->
|
|
||||||
receive
|
|
||||||
{seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
|
|
||||||
when Witness_p ->
|
|
||||||
%% The FLU's machi_flu1_net_server process ought to filter all
|
|
||||||
%% witness states, but we'll keep this clause for extra
|
|
||||||
%% paranoia.
|
|
||||||
From ! witness,
|
|
||||||
append_server_loop(FluPid, S);
|
|
||||||
{seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
|
|
||||||
when Wedged_p ->
|
|
||||||
From ! wedged,
|
|
||||||
append_server_loop(FluPid, S);
|
|
||||||
{seq_append, From, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum, Extra, EpochID} ->
|
|
||||||
%% Old is the one from our state, plain old 'EpochID' comes
|
|
||||||
%% from the client.
|
|
||||||
_ = case OldEpochId == EpochID of
|
|
||||||
true ->
|
|
||||||
spawn(fun() ->
|
|
||||||
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum, Extra,
|
|
||||||
FluName, EpochID)
|
|
||||||
end);
|
|
||||||
false ->
|
|
||||||
From ! {error, bad_epoch}
|
|
||||||
end,
|
|
||||||
append_server_loop(FluPid, S);
|
|
||||||
{wedge_myself, WedgeEpochId} ->
|
|
||||||
if not Wedged_p andalso WedgeEpochId == OldEpochId ->
|
|
||||||
true = ets:insert(S#state.etstab,
|
|
||||||
{epoch, {true, OldEpochId}}),
|
|
||||||
%% Tell my chain manager that it might want to react to
|
|
||||||
%% this new world.
|
|
||||||
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
|
||||||
spawn(fun() ->
|
|
||||||
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
|
||||||
end),
|
|
||||||
append_server_loop(FluPid, S#state{wedged=true});
|
|
||||||
true ->
|
|
||||||
append_server_loop(FluPid, S)
|
|
||||||
end;
|
|
||||||
{wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId} ->
|
|
||||||
OldEpoch = case OldEpochId of {OldE, _} -> OldE;
|
|
||||||
undefined -> -1
|
|
||||||
end,
|
|
||||||
if NewEpoch >= OldEpoch ->
|
|
||||||
true = ets:insert(S#state.etstab,
|
|
||||||
{epoch, {Boolean, NewEpochId}}),
|
|
||||||
append_server_loop(FluPid, S#state{wedged=Boolean,
|
|
||||||
epoch_id=NewEpochId});
|
|
||||||
true ->
|
|
||||||
append_server_loop(FluPid, S)
|
|
||||||
end;
|
|
||||||
{wedge_status, FromPid} ->
|
|
||||||
#state{wedged=Wedged_p, epoch_id=EpochId} = S,
|
|
||||||
FromPid ! {wedge_status_reply, Wedged_p, EpochId},
|
|
||||||
append_server_loop(FluPid, S);
|
|
||||||
{current_state, FromPid} ->
|
|
||||||
FromPid ! S;
|
|
||||||
Else ->
|
|
||||||
io:format(user, "append_server_loop: WHA? ~p\n", [Else]),
|
|
||||||
append_server_loop(FluPid, S)
|
|
||||||
end.
|
|
||||||
|
|
||||||
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
|
|
||||||
Result = case handle_append(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum, Extra, FluName, EpochId) of
|
|
||||||
{ok, File, Offset} ->
|
|
||||||
{assignment, Offset, File};
|
|
||||||
Other ->
|
|
||||||
Other
|
|
||||||
end,
|
|
||||||
From ! Result,
|
|
||||||
exit(normal).
|
|
||||||
|
|
||||||
handle_append(_N, _L, _Prefix, <<>>, _Csum, _Extra, _FluName, _EpochId) ->
|
|
||||||
{error, bad_arg};
|
|
||||||
handle_append(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, Csum, Extra, FluName, EpochId) ->
|
|
||||||
CoC = {coc, CoC_Namespace, CoC_Locator},
|
|
||||||
Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix(
|
|
||||||
FluName, EpochId, {prefix, Prefix}, CoC),
|
|
||||||
case Res of
|
|
||||||
{file, F} ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
{Tag, CS} = machi_util:unmake_tagged_csum(Csum),
|
|
||||||
Meta = [{client_csum_tag, Tag}, {client_csum, CS}],
|
|
||||||
machi_file_proxy:append(Pid, Meta, Extra, Chunk);
|
|
||||||
{error, trimmed} = E ->
|
|
||||||
E
|
|
||||||
end;
|
|
||||||
Error ->
|
|
||||||
Error
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% This is the name of the projection store that is spawned by the
|
%% This is the name of the projection store that is spawned by the
|
||||||
%% *flu*, for use primarily in testing scenarios. In normal use, we
|
%% *flu*, for use primarily in testing scenarios. In normal use, we
|
||||||
|
@ -316,6 +170,8 @@ handle_append(CoC_Namespace, CoC_Locator,
|
||||||
make_projection_server_regname(BaseName) ->
|
make_projection_server_regname(BaseName) ->
|
||||||
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
||||||
|
|
||||||
|
ets_table_name(FluName) when is_atom(FluName) ->
|
||||||
|
list_to_atom(atom_to_list(FluName) ++ "_epoch").
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
@ -357,7 +213,7 @@ timing_demo_test2() ->
|
||||||
lists:foldl(fun(X, _) ->
|
lists:foldl(fun(X, _) ->
|
||||||
B = machi_checksums:encode_csum_file_entry_hex(X, 100, CSum),
|
B = machi_checksums:encode_csum_file_entry_hex(X, 100, CSum),
|
||||||
%% file:write(ZZZ, [B, 10]),
|
%% file:write(ZZZ, [B, 10]),
|
||||||
machi_checksums:decode_csum_file_entry_hex(list_to_binary(B))
|
decode_csum_file_entry_hex(list_to_binary(B))
|
||||||
end, x, Xs)
|
end, x, Xs)
|
||||||
end),
|
end),
|
||||||
io:format(user, "~.3f sec\n", [HexUSec / 1000000]),
|
io:format(user, "~.3f sec\n", [HexUSec / 1000000]),
|
||||||
|
|
188
src/machi_flu1_append_server.erl
Normal file
188
src/machi_flu1_append_server.erl
Normal file
|
@ -0,0 +1,188 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Machi FLU1 append serialization server process
|
||||||
|
|
||||||
|
-module(machi_flu1_append_server).
|
||||||
|
|
||||||
|
-behavior(gen_server).
|
||||||
|
|
||||||
|
-include("machi.hrl").
|
||||||
|
-include("machi_projection.hrl").
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-endif. % TEST
|
||||||
|
|
||||||
|
-export([start_link/4]).
|
||||||
|
|
||||||
|
-export([init/1]).
|
||||||
|
-export([handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-export([current_state/1, format_state/1]).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
flu_name :: atom(),
|
||||||
|
witness = false :: boolean(),
|
||||||
|
wedged = true :: boolean(),
|
||||||
|
etstab :: ets:tid(),
|
||||||
|
epoch_id :: 'undefined' | machi_dt:epoch_id()
|
||||||
|
}).
|
||||||
|
|
||||||
|
-define(INIT_TIMEOUT, 60*1000).
|
||||||
|
-define(CALL_TIMEOUT, 60*1000).
|
||||||
|
|
||||||
|
-spec start_link(pv1_server(), boolean(), boolean(),
|
||||||
|
undefined | machi_dt:epoch_id()) -> {ok, pid()}.
|
||||||
|
start_link(Fluname, Witness_p, Wedged_p, EpochId) ->
|
||||||
|
%% Reminder: Name is the "main" name of the FLU, i.e., no suffix
|
||||||
|
gen_server:start_link({local, Fluname},
|
||||||
|
?MODULE, [Fluname, Witness_p, Wedged_p, EpochId],
|
||||||
|
[{timeout, ?INIT_TIMEOUT}]).
|
||||||
|
|
||||||
|
-spec current_state(atom() | pid()) -> term().
|
||||||
|
current_state(PidSpec) ->
|
||||||
|
gen_server:call(PidSpec, current_state, ?CALL_TIMEOUT).
|
||||||
|
|
||||||
|
format_state(State) ->
|
||||||
|
Fields = record_info(fields, state),
|
||||||
|
[_Name | Values] = tuple_to_list(State),
|
||||||
|
lists:zip(Fields, Values).
|
||||||
|
|
||||||
|
|
||||||
|
init([Fluname, Witness_p, Wedged_p, EpochId]) ->
|
||||||
|
TID = ets:new(machi_flu1:ets_table_name(Fluname),
|
||||||
|
[set, protected, named_table, {read_concurrency, true}]),
|
||||||
|
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
|
||||||
|
{ok, #state{flu_name=Fluname, witness=Witness_p, wedged=Wedged_p,
|
||||||
|
etstab=TID, epoch_id=EpochId}}.
|
||||||
|
|
||||||
|
handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID},
|
||||||
|
_From, #state{witness=true}=S) ->
|
||||||
|
%% The FLU's machi_flu1_net_server process ought to filter all
|
||||||
|
%% witness states, but we'll keep this clause for extra
|
||||||
|
%% paranoia.
|
||||||
|
{reply, witness, S};
|
||||||
|
handle_call({seq_append, _From2, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID},
|
||||||
|
_From, #state{wedged=true}=S) ->
|
||||||
|
{reply, wedged, S};
|
||||||
|
handle_call({seq_append, _From2, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum, Extra, EpochID},
|
||||||
|
From, #state{flu_name=FluName, epoch_id=OldEpochId}=S) ->
|
||||||
|
%% Old is the one from our state, plain old 'EpochID' comes
|
||||||
|
%% from the client.
|
||||||
|
_ = case OldEpochId of
|
||||||
|
EpochID ->
|
||||||
|
spawn(fun() ->
|
||||||
|
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum, Extra,
|
||||||
|
FluName, EpochID)
|
||||||
|
end),
|
||||||
|
{noreply, S};
|
||||||
|
_ ->
|
||||||
|
{reply, {error, bad_epoch}, S}
|
||||||
|
end;
|
||||||
|
%% TODO: Who sends this message?
|
||||||
|
handle_call(wedge_status, _From,
|
||||||
|
#state{wedged=Wedged_p, epoch_id=EpochId} = S) ->
|
||||||
|
{reply, {wedge_status_reply, Wedged_p, EpochId}, S};
|
||||||
|
handle_call(current_state, _From, S) ->
|
||||||
|
{reply, S, S};
|
||||||
|
handle_call(Else, From, S) ->
|
||||||
|
io:format(user, "~s:handle_call: WHA? from=~w ~w\n", [?MODULE, From, Else]),
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
handle_cast({wedge_myself, WedgeEpochId},
|
||||||
|
#state{flu_name=FluName, wedged=Wedged_p, epoch_id=OldEpochId}=S) ->
|
||||||
|
if not Wedged_p andalso WedgeEpochId == OldEpochId ->
|
||||||
|
true = ets:insert(S#state.etstab,
|
||||||
|
{epoch, {true, OldEpochId}}),
|
||||||
|
%% Tell my chain manager that it might want to react to
|
||||||
|
%% this new world.
|
||||||
|
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
||||||
|
spawn(fun() ->
|
||||||
|
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
||||||
|
end),
|
||||||
|
{noreply, S#state{wedged=true}};
|
||||||
|
true ->
|
||||||
|
{noreply, S}
|
||||||
|
end;
|
||||||
|
handle_cast({wedge_state_change, Boolean, {NewEpoch, _}=NewEpochId},
|
||||||
|
#state{epoch_id=OldEpochId}=S) ->
|
||||||
|
OldEpoch = case OldEpochId of {OldE, _} -> OldE;
|
||||||
|
undefined -> -1
|
||||||
|
end,
|
||||||
|
if NewEpoch >= OldEpoch ->
|
||||||
|
true = ets:insert(S#state.etstab,
|
||||||
|
{epoch, {Boolean, NewEpochId}}),
|
||||||
|
{noreply, S#state{wedged=Boolean, epoch_id=NewEpochId}};
|
||||||
|
true ->
|
||||||
|
{noreply, S}
|
||||||
|
end;
|
||||||
|
handle_cast(Else, S) ->
|
||||||
|
io:format(user, "~s:handle_cast: WHA? ~p\n", [?MODULE, Else]),
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
handle_info(Else, S) ->
|
||||||
|
io:format(user, "~s:handle_info: WHA? ~p\n", [?MODULE, Else]),
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
terminate(normal, _S) ->
|
||||||
|
ok;
|
||||||
|
terminate(Reason, _S) ->
|
||||||
|
lager:warning("~s:terminate: ~w", [?MODULE, Reason]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, S, _Extra) ->
|
||||||
|
{ok, S}.
|
||||||
|
|
||||||
|
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
|
||||||
|
Result = case handle_append(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum, Extra, FluName, EpochId) of
|
||||||
|
{ok, File, Offset} ->
|
||||||
|
{assignment, Offset, File};
|
||||||
|
Other ->
|
||||||
|
Other
|
||||||
|
end,
|
||||||
|
_ = gen_server:reply(From, Result),
|
||||||
|
exit(normal).
|
||||||
|
|
||||||
|
handle_append(_N, _L, _Prefix, <<>>, _Csum, _Extra, _FluName, _EpochId) ->
|
||||||
|
{error, bad_arg};
|
||||||
|
handle_append(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, Csum, Extra, FluName, EpochId) ->
|
||||||
|
CoC = {coc, CoC_Namespace, CoC_Locator},
|
||||||
|
Res = machi_flu_filename_mgr:find_or_make_filename_from_prefix(
|
||||||
|
FluName, EpochId, {prefix, Prefix}, CoC),
|
||||||
|
case Res of
|
||||||
|
{file, F} ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
{Tag, CS} = machi_util:unmake_tagged_csum(Csum),
|
||||||
|
Meta = [{client_csum_tag, Tag}, {client_csum, CS}],
|
||||||
|
machi_file_proxy:append(Pid, Meta, Extra, Chunk);
|
||||||
|
{error, trimmed} = E ->
|
||||||
|
E
|
||||||
|
end;
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end.
|
|
@ -355,16 +355,15 @@ do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
||||||
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
||||||
R = {seq_append, self(), CoC_Namespace, CoC_Locator,
|
R = {seq_append, self(), CoC_Namespace, CoC_Locator,
|
||||||
Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID},
|
Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID},
|
||||||
FluName ! R,
|
case gen_server:call(FluName, R, 10*1000) of
|
||||||
receive
|
|
||||||
{assignment, Offset, File} ->
|
{assignment, Offset, File} ->
|
||||||
Size = iolist_size(Chunk),
|
Size = iolist_size(Chunk),
|
||||||
{ok, {Offset, Size, File}};
|
{ok, {Offset, Size, File}};
|
||||||
witness ->
|
witness ->
|
||||||
{error, bad_arg};
|
{error, bad_arg};
|
||||||
wedged ->
|
wedged ->
|
||||||
{error, wedged}
|
{error, wedged};
|
||||||
after 10*1000 ->
|
{error, timeout} ->
|
||||||
{error, partition}
|
{error, partition}
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
|
|
115
src/machi_flu1_subsup.erl
Normal file
115
src/machi_flu1_subsup.erl
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc A supervisor to hold dynamic processes inside single
|
||||||
|
%% FLU service, ranch listener and append server.
|
||||||
|
|
||||||
|
%% TODO: This supervisor is maybe useless. First introduced for
|
||||||
|
%% workaround to start listener dynamically in flu1 initialization
|
||||||
|
%% phase. Because `machi_flu_psup' is being blocked in flu1
|
||||||
|
%% initialization time, adding a child to the supervisor leads to
|
||||||
|
%% deadlock. If initialization can be done only by static arguments,
|
||||||
|
%% then this supervisor should be removed and added as a direct child
|
||||||
|
%% of `machi_flu_psup'.
|
||||||
|
|
||||||
|
-module(machi_flu1_subsup).
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
%% public API
|
||||||
|
-export([start_link/1,
|
||||||
|
start_append_server/4,
|
||||||
|
stop_append_server/1,
|
||||||
|
start_listener/6,
|
||||||
|
stop_listener/1,
|
||||||
|
subsup_name/1,
|
||||||
|
listener_name/1]).
|
||||||
|
|
||||||
|
%% supervisor callback
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
-include("machi_projection.hrl").
|
||||||
|
|
||||||
|
-define(SHUTDOWN, 5000).
|
||||||
|
-define(BACKLOG, 8192).
|
||||||
|
|
||||||
|
-spec start_link(pv1_server()) -> {ok, pid()}.
|
||||||
|
start_link(FluName) ->
|
||||||
|
supervisor:start_link({local, subsup_name(FluName)}, ?MODULE, []).
|
||||||
|
|
||||||
|
-spec start_append_server(pv1_server(), boolean(), boolean(),
|
||||||
|
undefined | machi_dt:epoch_id()) ->
|
||||||
|
{ok, pid()}.
|
||||||
|
start_append_server(FluName, Witness_p, Wedged_p, EpochId) ->
|
||||||
|
supervisor:start_child(subsup_name(FluName),
|
||||||
|
append_server_spec(FluName, Witness_p, Wedged_p, EpochId)).
|
||||||
|
|
||||||
|
-spec stop_append_server(pv1_server()) -> ok.
|
||||||
|
stop_append_server(FluName) ->
|
||||||
|
SubSup = listener_name(FluName),
|
||||||
|
ok = supervisor:terminate_child(SubSup, FluName),
|
||||||
|
ok = supervisor:delete_child(SubSup, FluName).
|
||||||
|
|
||||||
|
-spec start_listener(pv1_server(), inet:port_number(), boolean(),
|
||||||
|
string(), ets:tab(), atom() | pid()) -> {ok, pid()}.
|
||||||
|
start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
||||||
|
supervisor:start_child(subsup_name(FluName),
|
||||||
|
listener_spec(FluName, TcpPort, Witness, DataDir,
|
||||||
|
EpochTab, ProjStore)).
|
||||||
|
|
||||||
|
-spec stop_listener(pv1_server()) -> ok.
|
||||||
|
stop_listener(FluName) ->
|
||||||
|
SupName = subsup_name(FluName),
|
||||||
|
ListenerName = listener_name(FluName),
|
||||||
|
ok = supervisor:terminate_child(SupName, ListenerName),
|
||||||
|
ok = supervisor:delete_child(SupName, ListenerName).
|
||||||
|
|
||||||
|
-spec subsup_name(pv1_server()) -> atom().
|
||||||
|
subsup_name(FluName) when is_atom(FluName) ->
|
||||||
|
list_to_atom(atom_to_list(FluName) ++ "_flu1_subsup").
|
||||||
|
|
||||||
|
-spec listener_name(pv1_server()) -> atom().
|
||||||
|
listener_name(FluName) ->
|
||||||
|
list_to_atom(atom_to_list(FluName) ++ "_listener").
|
||||||
|
|
||||||
|
%% Supervisor callback
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
SupFlags = {one_for_all, 1000, 10},
|
||||||
|
{ok, {SupFlags, []}}.
|
||||||
|
|
||||||
|
%% private
|
||||||
|
|
||||||
|
-spec listener_spec(pv1_server(), inet:port_number(), boolean(),
|
||||||
|
string(), ets:tab(), atom() | pid()) -> supervisor:child_spec().
|
||||||
|
listener_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
||||||
|
ListenerName = listener_name(FluName),
|
||||||
|
NbAcceptors = 100,
|
||||||
|
TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}],
|
||||||
|
NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore],
|
||||||
|
ranch:child_spec(ListenerName, NbAcceptors,
|
||||||
|
ranch_tcp, TcpOpts,
|
||||||
|
machi_flu1_net_server, NetServerOpts).
|
||||||
|
|
||||||
|
-spec append_server_spec(pv1_server(), boolean(), boolean(),
|
||||||
|
undefined | machi_dt:epoch_id()) -> supervisor:child_spec().
|
||||||
|
append_server_spec(FluName, Witness_p, Wedged_p, EpochId) ->
|
||||||
|
{FluName, {machi_flu1_append_server, start_link,
|
||||||
|
[FluName, Witness_p, Wedged_p, EpochId]},
|
||||||
|
permanent, ?SHUTDOWN, worker, [machi_flu1_append_server]}.
|
|
@ -143,8 +143,8 @@ init([FluName, TcpPort, DataDir, Props0]) ->
|
||||||
|
|
||||||
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
||||||
|
|
||||||
ListenerSupSpec = {machi_listener_sup:make_listener_sup_name(FluName),
|
Flu1SubSupSpec = {machi_flu1_subsup:subsup_name(FluName),
|
||||||
{machi_listener_sup, start_link, [FluName]},
|
{machi_flu1_subsup, start_link, [FluName]},
|
||||||
permanent, ?SHUTDOWN, supervisor, []},
|
permanent, ?SHUTDOWN, supervisor, []},
|
||||||
|
|
||||||
FluSpec = {FluName,
|
FluSpec = {FluName,
|
||||||
|
@ -155,7 +155,7 @@ init([FluName, TcpPort, DataDir, Props0]) ->
|
||||||
{ok, {SupFlags, [
|
{ok, {SupFlags, [
|
||||||
ProjSpec, FitnessSpec, MgrSpec,
|
ProjSpec, FitnessSpec, MgrSpec,
|
||||||
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
||||||
ListenerSupSpec, FluSpec]}}.
|
Flu1SubSupSpec, FluSpec]}}.
|
||||||
|
|
||||||
make_flu_regname(FluName) when is_atom(FluName) ->
|
make_flu_regname(FluName) when is_atom(FluName) ->
|
||||||
FluName.
|
FluName.
|
||||||
|
|
|
@ -1,89 +0,0 @@
|
||||||
%% -------------------------------------------------------------------
|
|
||||||
%%
|
|
||||||
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
|
||||||
%%
|
|
||||||
%% This file is provided to you under the Apache License,
|
|
||||||
%% Version 2.0 (the "License"); you may not use this file
|
|
||||||
%% except in compliance with the License. You may obtain
|
|
||||||
%% a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing,
|
|
||||||
%% software distributed under the License is distributed on an
|
|
||||||
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
%% KIND, either express or implied. See the License for the
|
|
||||||
%% specific language governing permissions and limitations
|
|
||||||
%% under the License.
|
|
||||||
%%
|
|
||||||
%% -------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc A supervisor to hold ranch listener for sigle FLU.
|
|
||||||
%% It holds at most one child worker.
|
|
||||||
|
|
||||||
%% TODO: This supervisor is maybe useless. First introduced for
|
|
||||||
%% workaround to start listener dynamically in flu1 initialization
|
|
||||||
%% time. Because psup is being blocked in flu1 initialization time,
|
|
||||||
%% adding a child to psup leads to deadlock. If initialization can be
|
|
||||||
%% done only by static arguments, then this supervisor should be
|
|
||||||
%% removed and added as a direct child of `machi_flu_psup'.
|
|
||||||
|
|
||||||
-module(machi_listener_sup).
|
|
||||||
-behaviour(supervisor).
|
|
||||||
|
|
||||||
%% public API
|
|
||||||
-export([start_link/1,
|
|
||||||
start_listener/6,
|
|
||||||
stop_listener/1,
|
|
||||||
make_listener_sup_name/1,
|
|
||||||
make_listener_name/1]).
|
|
||||||
|
|
||||||
%% supervisor callback
|
|
||||||
-export([init/1]).
|
|
||||||
|
|
||||||
-include("machi_projection.hrl").
|
|
||||||
|
|
||||||
-define(BACKLOG, 8192).
|
|
||||||
|
|
||||||
-spec start_link(pv1_server()) -> {ok, pid()}.
|
|
||||||
start_link(FluName) ->
|
|
||||||
supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []).
|
|
||||||
|
|
||||||
-spec start_listener(pv1_server(), inet:port_number(), boolean(),
|
|
||||||
string(), ets:tab(), atom() | pid()) -> {ok, pid()}.
|
|
||||||
start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
|
||||||
supervisor:start_child(make_listener_sup_name(FluName),
|
|
||||||
child_spec(FluName, TcpPort, Witness, DataDir,
|
|
||||||
EpochTab, ProjStore)).
|
|
||||||
|
|
||||||
-spec stop_listener(pv1_server()) -> ok.
|
|
||||||
stop_listener(FluName) ->
|
|
||||||
SupName = make_listener_sup_name(FluName),
|
|
||||||
ListenerName = make_listener_name(FluName),
|
|
||||||
ok = supervisor:terminate_child(SupName, ListenerName),
|
|
||||||
ok = supervisor:delete_child(SupName, ListenerName).
|
|
||||||
|
|
||||||
-spec make_listener_name(pv1_server()) -> atom().
|
|
||||||
make_listener_sup_name(FluName) when is_atom(FluName) ->
|
|
||||||
list_to_atom(atom_to_list(FluName) ++ "_listener_sup").
|
|
||||||
|
|
||||||
-spec make_listener_sup_name(pv1_server()) -> atom().
|
|
||||||
make_listener_name(FluName) ->
|
|
||||||
list_to_atom(atom_to_list(FluName) ++ "_listener").
|
|
||||||
|
|
||||||
%% Supervisor callback
|
|
||||||
|
|
||||||
init([]) ->
|
|
||||||
SupFlags = {one_for_one, 1000, 10},
|
|
||||||
{ok, {SupFlags, []}}.
|
|
||||||
|
|
||||||
-spec child_spec(pv1_server(), inet:port_number(), boolean(),
|
|
||||||
string(), ets:tab(), atom() | pid()) -> supervisor:child_spec().
|
|
||||||
child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
|
||||||
ListenerName = make_listener_name(FluName),
|
|
||||||
NbAcceptors = 100,
|
|
||||||
TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}],
|
|
||||||
NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore],
|
|
||||||
ranch:child_spec(ListenerName, NbAcceptors,
|
|
||||||
ranch_tcp, TcpOpts,
|
|
||||||
machi_flu1_net_server, NetServerOpts).
|
|
Loading…
Reference in a new issue