WIP: flu1 refactoring including some cosmetics

This commit is contained in:
Shunichi Shinohara 2015-12-22 17:51:56 +09:00
parent 546901ef49
commit 9de618cfe1
11 changed files with 81 additions and 81 deletions

View file

@ -324,7 +324,7 @@ handle_call({set_chain_members, SetChainName, SetOldEpoch, CMode,
{NUPI, All_list -- NUPI}
end,
NewEpoch = OldEpoch + ?SET_CHAIN_MEMBERS_EPOCH_SKIP,
ok = set_consistency_mode(machi_flu_psup:make_proj_supname(MyName), CMode),
ok = set_consistency_mode(machi_flu_psup:make_proj_regname(MyName), CMode),
NewProj = machi_projection:update_checksum(
OldProj#projection_v1{author_server=MyName,
chain_name=SetChainName,
@ -3031,7 +3031,7 @@ get_unfit_list(FitnessServer) ->
get_projection_store_pid_or_regname(#ch_mgr{name=MyName, opts=MgrOpts}) ->
case get_projection_store_regname(MgrOpts) of
undefined ->
machi_flu_psup:make_proj_supname(MyName);
machi_flu_psup:make_proj_regname(MyName);
PStr ->
PStr
end.

View file

@ -75,7 +75,7 @@ private_projection(FluName) ->
-spec chain_manager(atom()) -> term().
chain_manager(FluName) ->
Mgr = machi_flu_psup:make_mgr_supname(FluName),
Mgr = machi_flu_psup:make_mgr_regname(FluName),
sys:get_status(Mgr).
-spec fitness(atom()) -> term().
@ -91,7 +91,7 @@ flu1(FluName) ->
%% Internal functions
projection(FluName, Kind) ->
ProjStore = machi_flu1:make_projection_server_regname(FluName),
ProjStore = machi_flu_psup:make_proj_regname(FluName),
{ok, Projection} = machi_projection_store:read_latest_projection(
whereis(ProjStore), Kind),
Fields = record_info(fields, projection_v1),

View file

@ -46,13 +46,10 @@
-module(machi_flu1).
-include_lib("kernel/include/file.hrl").
-behavior(gen_server).
-include("machi.hrl").
-include("machi_pb.hrl").
-include("machi_projection.hrl").
-define(V(X,Y), ok).
%% -include("machi_verbose.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@ -60,32 +57,29 @@
-endif. % TEST
-export([start_link/1, stop/1,
update_wedge_state/3, wedge_myself/2]).
-export([make_projection_server_regname/1,
ets_table_name/1]).
%% TODO: remove or replace in OTP way after gen_*'ified
-export([main2/4]).
update_wedge_state/3, wedge_myself/2,
epoch_table_name/1]).
-export([init/1]).
-export([handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(INIT_TIMEOUT, 60*1000).
-record(state, {
flu_name :: atom(),
append_pid :: pid(),
projection_pid :: atom() | pid(),
listener_pid :: pid()
}).
start_link([{FluName, TcpPort, DataDir}|Rest])
when is_atom(FluName), is_integer(TcpPort), is_list(DataDir) ->
proc_lib:start_link(?MODULE, main2, [FluName, TcpPort, DataDir, Rest],
?INIT_TIMEOUT).
gen_server:start_link(?MODULE, [FluName, TcpPort, DataDir, Rest],
[{timeout, ?INIT_TIMEOUT}]).
stop(RegName) when is_atom(RegName) ->
case whereis(RegName) of
undefined -> ok;
Pid -> stop(Pid)
end;
stop(Pid) when is_pid(Pid) ->
case erlang:is_process_alive(Pid) of
true ->
Pid ! killme,
ok;
false ->
error
end.
stop(Pid) ->
gen_server:call(Pid, stop).
update_wedge_state(PidSpec, Boolean, EpochId) ->
machi_flu1_append_server:int_update_wedge_state(PidSpec, Boolean, EpochId).
@ -95,11 +89,19 @@ wedge_myself(PidSpec, EpochId) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
main2(FluName, TcpPort, DataDir, Props) ->
init([FluName, TcpPort, DataDir, Props]) ->
Config_e = machi_util:make_config_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Config_e),
{_, Data_e} = machi_util:make_data_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Data_e),
Projection_e = machi_util:make_projection_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Projection_e),
put(flu_flu_name, FluName),
{SendAppendPidToProj_p, ProjectionPid} =
case proplists:get_value(projection_store_registered_name, Props) of
undefined ->
RN = make_projection_server_regname(FluName),
RN = machi_flu_psup:make_proj_regname(FluName),
{ok, PP} =
machi_projection_store:start_link(RN, DataDir, undefined),
{true, PP};
@ -109,6 +111,8 @@ main2(FluName, TcpPort, DataDir, Props) ->
InitialWedged_p = proplists:get_value(initial_wedged, Props),
ProjRes = machi_projection_store:read_latest_projection(ProjectionPid,
private),
put(flu_projection_pid, ProjectionPid),
{Wedged_p, EpochId} =
if InitialWedged_p == undefined,
is_tuple(ProjRes), element(1, ProjRes) == ok ->
@ -128,29 +132,39 @@ main2(FluName, TcpPort, DataDir, Props) ->
true ->
ok
end,
{ok, ListenerPid} = start_listen_server(FluName, TcpPort, Witness_p, DataDir,
ets_table_name(FluName), ProjectionPid),
%% io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]),
Config_e = machi_util:make_config_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Config_e),
{_, Data_e} = machi_util:make_data_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Data_e),
Projection_e = machi_util:make_projection_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Projection_e),
put(flu_flu_name, FluName),
put(flu_append_pid, AppendPid),
put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenerPid),
proc_lib:init_ack({ok, self()}),
receive killme -> ok end,
{ok, ListenerPid} = start_listen_server(FluName, TcpPort, Witness_p, DataDir,
epoch_table_name(FluName), ProjectionPid),
put(flu_listener_pid, ListenerPid),
{ok, #state{flu_name=FluName, append_pid=AppendPid,
projection_pid=ProjectionPid, listener_pid=ListenerPid}}.
handle_call(stop, _From, S) ->
{stop, normal, ok, S};
handle_call(Else, From, S) ->
lager:info("~s:handle_call: WHA? from=~w ~w", [?MODULE, From, Else]),
{noreply, S}.
handle_cast(Else, S) ->
lager:info("~s:handle_cast: WHA? ~p", [?MODULE, Else]),
{noreply, S}.
handle_info(Else, S) ->
lager:info("~s:handle_info: WHA? ~p", [?MODULE, Else]),
{noreply, S}.
terminate(_Reason, #state{append_pid=AppendPid, projection_pid=ProjectionPid,
listener_pid=ListenerPid}) ->
(catch exit(AppendPid, kill)),
(catch exit(ProjectionPid, kill)),
(catch exit(ListenerPid, kill)),
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
start_append_server(FluName, Witness_p, Wedged_p, EpochId) ->
machi_flu1_subsup:start_append_server(FluName, Witness_p, Wedged_p, EpochId).
@ -158,17 +172,7 @@ start_listen_server(FluName, TcpPort, Witness_p, DataDir, EtsTab, ProjectionPid)
machi_flu1_subsup:start_listener(FluName, TcpPort, Witness_p, DataDir,
EtsTab, ProjectionPid).
%% This is the name of the projection store that is spawned by the
%% *flu*, for use primarily in testing scenarios. In normal use, we
%% ought to be using the OTP style of managing processes, via
%% supervisors, namely via machi_flu_psup.erl, which uses a
%% *different* naming convention for the projection store name that it
%% registers.
make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
ets_table_name(FluName) when is_atom(FluName) ->
epoch_table_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_epoch").
-ifdef(TEST).

View file

@ -76,7 +76,7 @@ int_wedge_myself(PidSpec, EpochId)
gen_server:cast(PidSpec, {wedge_myself, EpochId}).
init([Fluname, Witness_p, Wedged_p, EpochId]) ->
TID = ets:new(machi_flu1:ets_table_name(Fluname),
TID = ets:new(machi_flu1:epoch_table_name(Fluname),
[set, protected, named_table, {read_concurrency, true}]),
ets:insert(TID, {epoch, {Wedged_p, EpochId}}),
{ok, #state{flu_name=Fluname, witness=Witness_p, wedged=Wedged_p,

View file

@ -77,8 +77,8 @@
start_flu_package/1, start_flu_package/4, stop_flu_package/1]).
%% Internal API
-export([start_link/4,
make_flu_regname/1, make_p_regname/1, make_mgr_supname/1,
make_proj_supname/1, make_fitness_regname/1]).
make_flu_regname/1, make_p_regname/1, make_mgr_regname/1,
make_proj_regname/1, make_fitness_regname/1]).
%% Supervisor callbacks
-export([init/1]).
@ -121,7 +121,7 @@ init([FluName, TcpPort, DataDir, Props0]) ->
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
ProjRegName = make_proj_supname(FluName),
ProjRegName = make_proj_regname(FluName),
Props = Props0 ++ [{projection_store_registered_name, ProjRegName},
{use_partition_simulator,false}],
ProjSpec = {ProjRegName,
@ -133,7 +133,7 @@ init([FluName, TcpPort, DataDir, Props0]) ->
{machi_fitness, start_link,
[ [{FluName}|Props] ]},
permanent, ?SHUTDOWN, worker, []},
MgrSpec = {make_mgr_supname(FluName),
MgrSpec = {make_mgr_regname(FluName),
{machi_chain_manager1, start_link,
[FluName, [], Props]},
permanent, ?SHUTDOWN, worker, []},
@ -165,10 +165,10 @@ make_flu_regname(FluName) when is_atom(FluName) ->
make_p_regname(FluName) when is_atom(FluName) ->
list_to_atom("flusup_" ++ atom_to_list(FluName)).
make_mgr_supname(MgrName) when is_atom(MgrName) ->
make_mgr_regname(MgrName) when is_atom(MgrName) ->
machi_chain_manager1:make_chmgr_regname(MgrName).
make_proj_supname(ProjName) when is_atom(ProjName) ->
make_proj_regname(ProjName) when is_atom(ProjName) ->
list_to_atom(atom_to_list(ProjName) ++ "_pstore").
make_fitness_regname(FluName) when is_atom(FluName) ->

View file

@ -211,7 +211,7 @@ get_local_running_flus() ->
get_latest_public_epochs(FLUs) ->
[begin
PS = machi_flu1:make_projection_server_regname(FLU),
PS = machi_flu_psup:make_proj_regname(FLU),
{ok, {Epoch, _CSum}} = machi_projection_store:get_latest_epochid(
PS, public),
{FLU, Epoch}
@ -297,7 +297,7 @@ bootstrap_chain2(#chain_def_v1{name=NewChainName, mode=NewCMode,
NewWitnesses_list = [Name || #p_srvr{name=Name} <- Witnesses],
Mgr = machi_chain_manager1:make_chmgr_regname(FLU),
PStore = machi_flu1:make_projection_server_regname(FLU),
PStore = machi_flu_psup:make_proj_regname(FLU),
{ok, #projection_v1{epoch_number=OldEpoch, chain_name=OldChainName,
mode=OldCMode,
all_members=OldAll_list, witnesses=OldWitnesses}} =

View file

@ -207,7 +207,7 @@ all_list_extra(Num) ->
[begin
FLUNameStr = [$a + I - 1],
FLUName = list_to_atom(FLUNameStr),
MgrName = machi_flu_psup:make_mgr_supname(FLUName),
MgrName = machi_flu_psup:make_mgr_regname(FLUName),
{#p_srvr{name=FLUName, address="localhost", port=PortBase+I,
props=[{chmgr, MgrName}]},
DirBase ++ "/data.eqc." ++ FLUNameStr}
@ -316,7 +316,7 @@ initial_state() ->
initial_state(Num, Verbose) ->
AllListE = all_list_extra(Num),
FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE],
MgrNames = [{Name, machi_flu_psup:make_mgr_supname(Name)} || Name <- FLUNames],
MgrNames = [{Name, machi_flu_psup:make_mgr_regname(Name)} || Name <- FLUNames],
#state{num=Num, verbose=Verbose,
flu_names=FLUNames, mgr_names=MgrNames,
cr_count=cr_count(Num)}.
@ -325,7 +325,7 @@ setup_target(Num, Seed, Verbose) ->
%% ?V("setup_target(Num=~w, Seed=~w~nn", [Num, Seed]),
AllListE = all_list_extra(Num),
FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE],
MgrNames = [{Name, machi_flu_psup:make_mgr_supname(Name)} || Name <- FLUNames],
MgrNames = [{Name, machi_flu_psup:make_mgr_regname(Name)} || Name <- FLUNames],
Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- AllListE]),
setup_chain(Seed, AllListE, FLUNames, MgrNames, Dict),

View file

@ -194,7 +194,7 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
{_, ap_mode} -> ap_mode
end,
MgrNamez = [begin
MgrName = machi_flu_psup:make_mgr_supname(Name),
MgrName = machi_flu_psup:make_mgr_regname(Name),
ok = ?MGR:set_chain_members(MgrName, ch_demo, 0, CMode,
MembersDict,Witnesses),
{Name, MgrName}

View file

@ -70,8 +70,8 @@ partial_stop_restart2() ->
{b,#p_srvr{name=b, address="localhost", port=5561, props="./data.b"}},
{c,#p_srvr{name=c, address="localhost", port=5562, props="./data.c"}}
],
ChMgrs = [machi_flu_psup:make_mgr_supname(P#p_srvr.name) || {_,P} <-Ps],
PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps],
ChMgrs = [machi_flu_psup:make_mgr_regname(P#p_srvr.name) || {_,P} <-Ps],
PStores = [machi_flu_psup:make_proj_regname(P#p_srvr.name) || {_,P} <-Ps],
Dict = orddict:from_list(Ps),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_sup:start_link(),

View file

@ -79,13 +79,10 @@ smoke_test2() ->
Pa = #p_srvr{name=a,address="localhost",port=PortBase+0},
Pb = #p_srvr{name=b,address="localhost",port=PortBase+1},
Pc = #p_srvr{name=c,address="localhost",port=PortBase+2},
%% Pstore_a = machi_flu1:make_projection_server_regname(a),
%% Pstore_b = machi_flu1:make_projection_server_regname(b),
%% Pstore_c = machi_flu1:make_projection_server_regname(c),
Pstores = [Pstore_a, Pstore_b, Pstore_c] =
[machi_flu1:make_projection_server_regname(a),
machi_flu1:make_projection_server_regname(b),
machi_flu1:make_projection_server_regname(c)],
[machi_flu_psup:make_proj_regname(a),
machi_flu_psup:make_proj_regname(b),
machi_flu_psup:make_proj_regname(c)],
ChMgrs = [ChMgr_a, ChMgr_b, ChMgr_c] =
[machi_chain_manager1:make_chmgr_regname(a),
machi_chain_manager1:make_chmgr_regname(b),

View file

@ -40,7 +40,7 @@ start_flu_package(FluName, TcpPort, DataDir) ->
-spec start_flu_package(atom(), inet:port_number(), string(), list()) ->
{Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}.
start_flu_package(FluName, TcpPort, DataDir, Props) ->
MgrName = machi_flu_psup:make_mgr_supname(FluName),
MgrName = machi_flu_psup:make_mgr_regname(FluName),
FluInfo = [{#p_srvr{name=FluName, address="localhost", port=TcpPort,
props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]},
DataDir, MgrName}],
@ -71,7 +71,7 @@ flu_info(FluCount, BaseTcpPort, DirPrefix, Props) ->
[begin
FLUNameStr = [$a + I - 1],
FLUName = list_to_atom(FLUNameStr),
MgrName = machi_flu_psup:make_mgr_supname(FLUName),
MgrName = machi_flu_psup:make_mgr_regname(FLUName),
DataDir = DirPrefix ++ "/data.eqc." ++ FLUNameStr,
{#p_srvr{name=FLUName, address="localhost", port=BaseTcpPort + I,
props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]},
@ -91,7 +91,6 @@ clean_up(FluInfo) ->
case proplists:get_value(no_cleanup, Props) of
true -> ok;
_ ->
_ = machi_flu1:stop(FLUName),
clean_up_dir(Dir)
end
end || {#p_srvr{name=FLUName, props=Props}, Dir, _} <- FluInfo],