From 2d0c03ef35290cc17963848653fd9d3ac8009fb5 Mon Sep 17 00:00:00 2001 From: Mark Allen Date: Mon, 5 Oct 2015 22:18:29 -0500 Subject: [PATCH] Integration with current FLU implementation --- src/machi_file_proxy_sup.erl | 22 ++- src/machi_fitness.erl | 2 +- src/machi_flu1.erl | 274 ++++++++------------------------- src/machi_flu_filename_mgr.erl | 19 ++- src/machi_flu_metadata_mgr.erl | 81 ++++++---- src/machi_flu_psup.erl | 21 ++- 6 files changed, 162 insertions(+), 257 deletions(-) diff --git a/src/machi_file_proxy_sup.erl b/src/machi_file_proxy_sup.erl index bd5cec2..639fa45 100644 --- a/src/machi_file_proxy_sup.erl +++ b/src/machi_file_proxy_sup.erl @@ -24,8 +24,9 @@ %% public API -export([ - start_link/0, - start_proxy/2 + child_spec/1, + start_link/1, + start_proxy/3 ]). %% supervisor callback @@ -33,14 +34,23 @@ init/1 ]). -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). +child_spec(FluName) -> + Name = make_proxy_name(FluName), + {Name, + {?MODULE, start_link, [FluName]}, + permanent, 5000, supervisor, [?MODULE]}. -start_proxy(Filename, DataDir) -> - supervisor:start_child(?MODULE, [Filename, DataDir]). +start_link(FluName) -> + supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []). + +start_proxy(FluName, Filename, DataDir) -> + supervisor:start_child(make_proxy_name(FluName), [Filename, DataDir]). init([]) -> SupFlags = {simple_one_for_one, 1000, 10}, ChildSpec = {unused, {machi_file_proxy, start_link, []}, temporary, 2000, worker, [machi_file_proxy]}, {ok, {SupFlags, [ChildSpec]}}. + +make_proxy_name(FluName) when is_atom(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_file_proxy_sup"). diff --git a/src/machi_fitness.erl b/src/machi_fitness.erl index 67cf21b..9537369 100644 --- a/src/machi_fitness.erl +++ b/src/machi_fitness.erl @@ -185,7 +185,7 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) -> handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit, pending_map=Map}=S) -> %% io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]), - io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]), + %io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]), {noreply, S}; handle_info(_Info, S) -> {noreply, S}. diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index e18d06c..6ee257a 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -216,10 +216,9 @@ listen_server_loop(LSock, S) -> spawn_link(fun() -> net_server_loop(Sock, S) end), listen_server_loop(LSock, S). -append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p, +append_server_loop(FluPid, #state{wedged=Wedged_p, witness=Witness_p, epoch_id=OldEpochId, flu_name=FluName}=S) -> - AppendServerPid = self(), receive {seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID} when Witness_p -> @@ -233,9 +232,16 @@ append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p, From ! wedged, append_server_loop(FluPid, S); {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} -> - spawn(fun() -> append_server_dispatch(From, Prefix, - Chunk, CSum, Extra, EpochID, - DataDir, AppendServerPid) end), + %% Old is the one from our state, plain old 'EpochID' comes + %% from the client. + case OldEpochId == EpochID of + true -> + spawn(fun() -> + append_server_dispatch(From, Prefix, Chunk, CSum, Extra, FluName) + end); + false -> + From ! {error, bad_epoch} + end, append_server_loop(FluPid, S); {wedge_myself, WedgeEpochId} -> if not Wedged_p andalso WedgeEpochId == OldEpochId -> @@ -484,7 +490,7 @@ do_server_proj_request({kick_projection_reaction}, do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, ChunkExtra, S) -> - case sanitize_file_string(Prefix) of + case sanitize_prefix(Prefix) of ok -> do_server_append_chunk2(PKey, Prefix, Chunk, CSum_tag, CSum, ChunkExtra, S); @@ -519,98 +525,30 @@ do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum, {error, bad_arg} end. -do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, - #state{data_dir=DataDir}=S) -> +do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) -> case sanitize_file_string(File) of ok -> - CSumPath = machi_util:make_checksum_filename(DataDir, File), - case file:open(CSumPath, [append, raw, binary]) of - {ok, FHc} -> - Path = DataDir ++ "/data/" ++ - machi_util:make_string(File), - {ok, FHd} = file:open(Path, [read, write, raw, binary]), - try - do_server_write_chunk2( - File, Offset, Chunk, CSum_tag, CSum, DataDir, - FHc, FHd) - after - (catch file:close(FHc)), - (catch file:close(FHd)) - end; - {error, enoent} -> - ok = filelib:ensure_dir(CSumPath), - do_server_write_chunk(File, Offset, Chunk, CSum_tag, - CSum, S) - end; + {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), + Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}], + machi_file_proxy:write(Pid, Offset, Meta, Chunk); _ -> {error, bad_arg} end. -do_server_write_chunk2(_File, Offset, Chunk, CSum_tag, - Client_CSum, _DataDir, FHc, FHd) -> - try - TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), - Size = iolist_size(Chunk), - case file:pwrite(FHd, Offset, Chunk) of - ok -> - CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), - ok = file:write(FHc, CSum_info), - ok; - _Else3 -> - machi_util:verb("Else3 ~p ~p ~p\n", - [Offset, Size, _Else3]), - {error, bad_arg} - end - catch - throw:{bad_csum, _CS} -> - {error, bad_checksum}; - error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), - {error, bad_arg} - end. - -do_server_read_chunk(File, Offset, Size, _Opts, #state{data_dir=DataDir})-> +do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})-> %% TODO: Look inside Opts someday. case sanitize_file_string(File) of ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:open(Path, [read, binary, raw]) of - {ok, FH} -> - try - case file:pread(FH, Offset, Size) of - {ok, Bytes} when byte_size(Bytes) == Size -> - {ok, Bytes}; - {ok, Bytes} -> - machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n", - [Size,size(Bytes),File,Offset]), - io:format(user, "ok read but wanted ~p got ~p: ~p @ offset ~p\n", - [Size,size(Bytes),File,Offset]), - {error, partial_read}; - eof -> - {error, not_written}; %% TODO perhaps_do_net_server_ec_read(Sock, FH); - _Else2 -> - machi_util:verb("Else2 ~p ~p ~P\n", - [Offset, Size, _Else2, 20]), - {error, bad_read} - end - after - file:close(FH) - end; - {error, enoent} -> - {error, not_written}; - {error, _Else} -> - io:format(user, "Unexpected ~p at ~p ~p\n", - [_Else, ?MODULE, ?LINE]), - {error, bad_arg} - end; + {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), + machi_file_proxy:read(Pid, Offset, Size); _ -> {error, bad_arg} end. -do_server_checksum_listing(File, #state{data_dir=DataDir}=_S) -> +do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) -> case sanitize_file_string(File) of ok -> - ok = sync_checksum_file(File), + ok = sync_checksum_file(FluName, File), CSumPath = machi_util:make_checksum_filename(DataDir, File), %% TODO: If this file is legitimately bigger than our %% {packet_size,N} limit, then we'll have a difficult time, eh? @@ -692,12 +630,29 @@ do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> {error, bad_arg} end. -append_server_dispatch(From, Prefix, Chunk, CSum, Extra, EpochID, - DataDir, LinkPid) -> - Pid = write_server_get_pid(Prefix, EpochID, DataDir, LinkPid), - Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID}, +append_server_dispatch(From, Prefix, Chunk, CSum, Extra, FluName) -> + Result = case handle_append(Prefix, Chunk, CSum, Extra, FluName) of + {ok, File, Offset} -> + {assignment, Offset, File}; + Other -> + Other + end, + From ! Result, exit(normal). +handle_append(_Prefix, <<>>, _Csum, _Extra, _FluName) -> + {error, bad_arg}; +handle_append(Prefix, Chunk, Csum, Extra, FluName) -> + case machi_flu_filename_mgr:find_or_make_filename_from_prefix({prefix, Prefix}) of + {file, F} -> + {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}), + {Tag, Csum} = machi_util:unmake_tagged_csum(Csum), + Meta = [{client_csum_tag, Tag}, {client_csum, Csum}], + machi_file_proxy:append(Pid, Meta, Extra, Chunk); + Error -> + Error + end. + sanitize_file_string(Str) -> case re:run(Str, "/") of nomatch -> @@ -706,133 +661,28 @@ sanitize_file_string(Str) -> error end. -sync_checksum_file(File) -> - Prefix = re:replace(File, "\\..*", "", [{return, binary}]), - case write_server_find_pid(Prefix) of +sanitize_prefix(Prefix) -> + %% We are using '^' as our component delimiter + case re:run(Prefix, "^|/") of + nomatch -> + ok; + _ -> + error + end. + +sync_checksum_file(FluName, File) -> + %% We just lookup the pid here - we don't start a proxy server. If + %% there isn't a pid for this file, then we just return ok. The + %% csum file was synced when the proxy was shutdown. + %% + %% If there *is* a pid, we call the sync function to ensure the + %% csum file is sync'd before we return. (Or an error if we get + %% an error). + case machi_flu_metadata_mgr:lookup_proxy_pid(FluName, {file, File}) of undefined -> ok; - Pid -> - Ref = make_ref(), - Pid ! {sync_stuff, self(), Ref}, - receive - {sync_finished, Ref} -> - ok - after 5000 -> - case write_server_find_pid(Prefix) of - undefined -> - ok; - Pid2 when Pid2 /= Pid -> - ok; - _Pid2 -> - error - end - end - end. - -write_server_get_pid(Prefix, EpochID, DataDir, LinkPid) -> - case write_server_find_pid(Prefix) of - undefined -> - start_seq_append_server(Prefix, EpochID, DataDir, LinkPid), - timer:sleep(1), - write_server_get_pid(Prefix, EpochID, DataDir, LinkPid); - Pid -> - Pid - end. - -write_server_find_pid(Prefix) -> - FluName = machi_util:make_regname(Prefix), - whereis(FluName). - -start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) -> - proc_lib:spawn_link(fun() -> - %% The following is only necessary to - %% make nice process relationships in - %% 'appmon' and related tools. - put('$ancestors', [AppendServerPid]), - put('$initial_call', {x,y,3}), - link(AppendServerPid), - run_seq_append_server(Prefix, EpochID, DataDir) - end). - -run_seq_append_server(Prefix, EpochID, DataDir) -> - true = register(machi_util:make_regname(Prefix), self()), - run_seq_append_server2(Prefix, EpochID, DataDir). - -run_seq_append_server2(Prefix, EpochID, DataDir) -> - FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1, - case machi_util:increment_max_filenum(DataDir, Prefix) of - ok -> - machi_util:info_msg("start: ~p server at file ~w\n", - [Prefix, FileNum]), - seq_append_server_loop(DataDir, Prefix, EpochID, FileNum); - Else -> - error_logger:error_msg("start: ~p server at file ~w: ~p\n", - [Prefix, FileNum, Else]), - exit(Else) - - end. - --spec seq_name_hack() -> string(). -seq_name_hack() -> - lists:flatten(io_lib:format("~.36B~.36B", - [element(3,now()), - list_to_integer(os:getpid())])). - -seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) -> - SequencerNameHack = seq_name_hack(), - {File, FullPath} = machi_util:make_data_filename( - DataDir, Prefix, SequencerNameHack, FileNum), - {ok, FHd} = file:open(FullPath, - [read, write, raw, binary]), - CSumPath = machi_util:make_checksum_filename( - DataDir, Prefix, SequencerNameHack, FileNum), - {ok, FHc} = file:open(CSumPath, [append, raw, binary]), - seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum, - ?MINIMUM_OFFSET). - -seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID, - FileNum, Offset) - when Offset > ?MAX_FILE_SIZE -> - ok = file:close(FHd), - ok = file:close(FHc), - machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n", - [Prefix, FileNum, Offset]), - run_seq_append_server2(Prefix, EpochID, DataDir); -seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID, - FileNum, Offset) -> - receive - {seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID} - when R_EpochID == EpochID -> - if Chunk /= <<>> -> - ok = file:pwrite(FHd, Offset, Chunk); - true -> - ok - end, - From ! {assignment, Offset, File}, - Size = iolist_size(Chunk), - CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), - ok = file:write(FHc, CSum_info), - seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, - FileNum, Offset + Size + Extra); - {seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG -> - %% Rare'ish event: send MSG to myself so it doesn't get lost - %% while we recurse around to pick up a new FileNum. - self() ! MSG, - machi_util:info_msg("rollover: ~p server at file ~w offset ~w " - "by new epoch_id ~W\n", - [Prefix, FileNum, Offset, R_EpochID, 8]), - run_seq_append_server2(Prefix, R_EpochID, DataDir); - {sync_stuff, FromPid, Ref} -> - file:sync(FHc), - FromPid ! {sync_finished, Ref}, - seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, - FileNum, Offset) - after 30*1000 -> - ok = file:close(FHd), - ok = file:close(FHc), - machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n", - [Prefix, self(), FileNum, Offset]), - exit(normal) + {ok, Pid} -> + machi_file_proxy:sync(Pid, csum) end. make_listener_regname(BaseName) -> diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index c260421..ef2046e 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -19,7 +19,7 @@ %% ------------------------------------------------------------------- %% %% @doc This process is responsible for managing filenames assigned to -%% prefixes. +%% prefixes. It's started out of `machi_flu_psup'. %% %% Supported operations include finding the "current" filename assigned to %% a prefix. Incrementing the sequence number and returning a new file name @@ -44,7 +44,8 @@ -behavior(gen_server). -export([ - start_link/1, + child_spec/2, + start_link/2, find_or_make_filename_from_prefix/1, increment_prefix_sequence/1, list_files_by_prefix/1 @@ -63,8 +64,15 @@ -define(TIMEOUT, 10 * 1000). %% public API -start_link(DataDir) when is_list(DataDir) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [DataDir], []). + +child_spec(FluName, DataDir) -> + Name = make_filename_mgr_name(FluName), + {Name, + {?MODULE, start_link, [FluName, DataDir]}, + permanent, 5000, worker, [?MODULE]}. + +start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> + gen_server:start_link({local, make_filename_mgr_name(FluName)}, ?MODULE, [DataDir], []). -spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) -> {file, Filename :: string()} | {error, Reason :: term() } | timeout. @@ -165,3 +173,6 @@ find_file(DataDir, Prefix, N) -> list_files(DataDir, Prefix) -> {F, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", "*"), filelib:wildcard(F, filename:dirname(Path)). + +make_filename_mgr_name(FluName) when is_atom(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_filename_mgr"). diff --git a/src/machi_flu_metadata_mgr.erl b/src/machi_flu_metadata_mgr.erl index 0fbafa9..9169208 100644 --- a/src/machi_flu_metadata_mgr.erl +++ b/src/machi_flu_metadata_mgr.erl @@ -39,12 +39,13 @@ -define(HASH(X), erlang:phash2(X)). %% hash algorithm to use -define(TIMEOUT, 10 * 1000). %% 10 second timeout --record(state, {name :: atom(), +-record(state, {fluname :: atom(), datadir :: string(), - tid :: ets:tid() + tid :: ets:tid(), + cnt :: non_neg_integer() }). -%% This record goes in the ets table where prefix is the key +%% This record goes in the ets table where filename is the key -record(md, {filename :: string(), proxy_pid :: undefined|pid(), mref :: undefined|reference() %% monitor ref for file proxy @@ -52,11 +53,13 @@ %% public api -export([ - start_link/2, - lookup_manager_pid/1, - lookup_proxy_pid/1, - start_proxy_pid/1, - stop_proxy_pid/1 + child_spec/4, + start_link/4, + lookup_manager_pid/2, + lookup_proxy_pid/2, + start_proxy_pid/2, + stop_proxy_pid/2, + build_metadata_mgr_name/2 ]). %% gen_server callbacks @@ -70,26 +73,34 @@ ]). %% Public API +build_metadata_mgr_name(FluName, N) when is_atom(FluName) andalso is_integer(N) -> + list_to_atom(atom_to_list(FluName) ++ "_metadata_mgr_" ++ integer_to_list(N)). -start_link(Name, DataDir) when is_atom(Name) andalso is_list(DataDir) -> - gen_server:start_link({local, Name}, ?MODULE, [Name, DataDir], []). +child_spec(FluName, C, DataDir, N) -> + Name = build_metadata_mgr_name(FluName, C), + {Name, + {?MODULE, start_link, [FluName, Name, DataDir, N]}, + permanent, 5000, worker, [?MODULE]}. -lookup_manager_pid({file, Filename}) -> - whereis(get_manager_atom(Filename)). +start_link(FluName, Name, DataDir, Num) when is_atom(Name) andalso is_list(DataDir) -> + gen_server:start_link({local, Name}, ?MODULE, [FluName, Name, DataDir, Num], []). -lookup_proxy_pid({file, Filename}) -> - gen_server:call(get_manager_atom(Filename), {proxy_pid, Filename}, ?TIMEOUT). +lookup_manager_pid(FluName, {file, Filename}) -> + whereis(get_manager_atom(FluName, Filename)). -start_proxy_pid({file, Filename}) -> - gen_server:call(get_manager_atom(Filename), {start_proxy_pid, Filename}, ?TIMEOUT). +lookup_proxy_pid(FluName, {file, Filename}) -> + gen_server:call(get_manager_atom(FluName, Filename), {proxy_pid, Filename}, ?TIMEOUT). -stop_proxy_pid({file, Filename}) -> - gen_server:call(get_manager_atom(Filename), {stop_proxy_pid, Filename}, ?TIMEOUT). +start_proxy_pid(FluName, {file, Filename}) -> + gen_server:call(get_manager_atom(FluName, Filename), {start_proxy_pid, Filename}, ?TIMEOUT). + +stop_proxy_pid(FluName, {file, Filename}) -> + gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT). %% gen_server callbacks -init([Name, DataDir]) -> +init([FluName, Name, DataDir, Num]) -> Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]), - {ok, #state{ name = Name, datadir = DataDir, tid = Tid}}. + {ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num}}. handle_cast(Req, State) -> lager:warning("Got unknown cast ~p", [Req]), @@ -102,12 +113,12 @@ handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> end, {reply, Reply, State}; -handle_call({start_proxy_pid, Filename}, _From, State = #state{ tid = Tid, datadir = D }) -> +handle_call({start_proxy_pid, Filename}, _From, State = #state{ fluname = N, tid = Tid, datadir = D }) -> NewR = case lookup_md(Tid, Filename) of not_found -> - start_file_proxy(D, Filename); + start_file_proxy(N, D, Filename); #md{ proxy_pid = undefined } = R0 -> - start_file_proxy(D, R0); + start_file_proxy(N, D, R0); #md{ proxy_pid = _Pid } = R1 -> R1 end, @@ -180,13 +191,11 @@ compute_hash(Data) -> ?HASH(Data). compute_worker(Hash) -> - Hash rem ?MAX_MGRS. + MgrCount = get_env(metadata_manager_count, ?MAX_MGRS), + Hash rem MgrCount. -build_metadata_mgr_name(N) when is_integer(N) -> - list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)). - -get_manager_atom(Data) -> - build_metadata_mgr_name(compute_worker(compute_hash(Data))). +get_manager_atom(FluName, Data) -> + build_metadata_mgr_name(FluName, compute_worker(compute_hash(Data))). lookup_md(Tid, Data) -> case ets:lookup(Tid, Data) of @@ -194,13 +203,13 @@ lookup_md(Tid, Data) -> [R] -> R end. -start_file_proxy(D, R = #md{filename = F} ) -> - {ok, Pid} = machi_file_proxy_sup:start_proxy(D, F), +start_file_proxy(FluName, D, R = #md{filename = F} ) -> + {ok, Pid} = machi_file_proxy_sup:start_proxy(FluName, D, F), Mref = monitor(process, Pid), R#md{ proxy_pid = Pid, mref = Mref }; -start_file_proxy(D, Filename) -> - start_file_proxy(D, #md{ filename = Filename }). +start_file_proxy(FluName, D, Filename) -> + start_file_proxy(FluName, D, #md{ filename = Filename }). update_ets(Tid, R) -> ets:insert(Tid, R). @@ -215,3 +224,9 @@ purge_ets(Tid, R) -> get_md_record_by_mref(Tid, Mref) -> [R] = ets:match_object(Tid, {md, '_', '_', Mref}), R. + +get_env(Setting, Default) -> + case application:get_env(machi, Setting) of + undefined -> Default; + {ok, V} -> V + end. diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index 491ae27..127629b 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -130,11 +130,24 @@ init([FluName, TcpPort, DataDir, Props0]) -> {machi_chain_manager1, start_link, [FluName, [], Props]}, permanent, ?SHUTDOWN, worker, []}, + + FNameMgrSpec = machi_flu_filename_mgr:child_spec(FluName, DataDir), + + MetaMgrCnt = get_env(metadata_manager_count, 10), + MetaSupSpec = machi_flu_metadata_mgr_sup:child_spec(FluName, DataDir, MetaMgrCnt), + + FProxySupSpec = machi_file_proxy_sup:child_spec(FluName), + FluSpec = {FluName, {machi_flu1, start_link, [ [{FluName, TcpPort, DataDir}|Props] ]}, permanent, ?SHUTDOWN, worker, []}, - {ok, {SupFlags, [ProjSpec, FitnessSpec, MgrSpec, FluSpec]}}. + + + {ok, {SupFlags, [ + ProjSpec, FitnessSpec, MgrSpec, + FProxySupSpec, FNameMgrSpec, MetaSupSpec, + FluSpec]}}. make_flu_regname(FluName) when is_atom(FluName) -> FluName. @@ -150,3 +163,9 @@ make_proj_supname(ProjName) when is_atom(ProjName) -> make_fitness_regname(FluName) when is_atom(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_fitness"). + +get_env(Setting, Default) -> + case application:get_env(machi, Setting) of + undefined -> Default; + {ok, V} -> V + end.