Integration with current FLU implementation

This commit is contained in:
Mark Allen 2015-10-05 22:18:29 -05:00
parent 36c11e7d08
commit 2d0c03ef35
6 changed files with 162 additions and 257 deletions

View file

@ -24,8 +24,9 @@
%% public API %% public API
-export([ -export([
start_link/0, child_spec/1,
start_proxy/2 start_link/1,
start_proxy/3
]). ]).
%% supervisor callback %% supervisor callback
@ -33,14 +34,23 @@
init/1 init/1
]). ]).
start_link() -> child_spec(FluName) ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). Name = make_proxy_name(FluName),
{Name,
{?MODULE, start_link, [FluName]},
permanent, 5000, supervisor, [?MODULE]}.
start_proxy(Filename, DataDir) -> start_link(FluName) ->
supervisor:start_child(?MODULE, [Filename, DataDir]). supervisor:start_link({local, make_proxy_name(FluName)}, ?MODULE, []).
start_proxy(FluName, Filename, DataDir) ->
supervisor:start_child(make_proxy_name(FluName), [Filename, DataDir]).
init([]) -> init([]) ->
SupFlags = {simple_one_for_one, 1000, 10}, SupFlags = {simple_one_for_one, 1000, 10},
ChildSpec = {unused, {machi_file_proxy, start_link, []}, ChildSpec = {unused, {machi_file_proxy, start_link, []},
temporary, 2000, worker, [machi_file_proxy]}, temporary, 2000, worker, [machi_file_proxy]},
{ok, {SupFlags, [ChildSpec]}}. {ok, {SupFlags, [ChildSpec]}}.
make_proxy_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_file_proxy_sup").

View file

@ -185,7 +185,7 @@ handle_info({adjust_down_list, FLU}, #state{active_unfit=ActiveUnfit}=S) ->
handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit, handle_info(dump, #state{my_flu_name=MyFluName,active_unfit=ActiveUnfit,
pending_map=Map}=S) -> pending_map=Map}=S) ->
%% io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]), %% io:format(user, "DUMP: ~w/~w: ~p ~W\n", [MyFluName, self(), ActiveUnfit, map_value(Map), 13]),
io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]), %io:format(user, "DUMP ~w: ~w, ", [MyFluName, ActiveUnfit]),
{noreply, S}; {noreply, S};
handle_info(_Info, S) -> handle_info(_Info, S) ->
{noreply, S}. {noreply, S}.

View file

@ -216,10 +216,9 @@ listen_server_loop(LSock, S) ->
spawn_link(fun() -> net_server_loop(Sock, S) end), spawn_link(fun() -> net_server_loop(Sock, S) end),
listen_server_loop(LSock, S). listen_server_loop(LSock, S).
append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p, append_server_loop(FluPid, #state{wedged=Wedged_p,
witness=Witness_p, witness=Witness_p,
epoch_id=OldEpochId, flu_name=FluName}=S) -> epoch_id=OldEpochId, flu_name=FluName}=S) ->
AppendServerPid = self(),
receive receive
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID} {seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
when Witness_p -> when Witness_p ->
@ -233,9 +232,16 @@ append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p,
From ! wedged, From ! wedged,
append_server_loop(FluPid, S); append_server_loop(FluPid, S);
{seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} -> {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} ->
spawn(fun() -> append_server_dispatch(From, Prefix, %% Old is the one from our state, plain old 'EpochID' comes
Chunk, CSum, Extra, EpochID, %% from the client.
DataDir, AppendServerPid) end), case OldEpochId == EpochID of
true ->
spawn(fun() ->
append_server_dispatch(From, Prefix, Chunk, CSum, Extra, FluName)
end);
false ->
From ! {error, bad_epoch}
end,
append_server_loop(FluPid, S); append_server_loop(FluPid, S);
{wedge_myself, WedgeEpochId} -> {wedge_myself, WedgeEpochId} ->
if not Wedged_p andalso WedgeEpochId == OldEpochId -> if not Wedged_p andalso WedgeEpochId == OldEpochId ->
@ -484,7 +490,7 @@ do_server_proj_request({kick_projection_reaction},
do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S) -> ChunkExtra, S) ->
case sanitize_file_string(Prefix) of case sanitize_prefix(Prefix) of
ok -> ok ->
do_server_append_chunk2(PKey, Prefix, Chunk, CSum_tag, CSum, do_server_append_chunk2(PKey, Prefix, Chunk, CSum_tag, CSum,
ChunkExtra, S); ChunkExtra, S);
@ -519,98 +525,30 @@ do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
{error, bad_arg} {error, bad_arg}
end. end.
do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) ->
#state{data_dir=DataDir}=S) ->
case sanitize_file_string(File) of case sanitize_file_string(File) of
ok -> ok ->
CSumPath = machi_util:make_checksum_filename(DataDir, File), {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case file:open(CSumPath, [append, raw, binary]) of Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}],
{ok, FHc} -> machi_file_proxy:write(Pid, Offset, Meta, Chunk);
Path = DataDir ++ "/data/" ++
machi_util:make_string(File),
{ok, FHd} = file:open(Path, [read, write, raw, binary]),
try
do_server_write_chunk2(
File, Offset, Chunk, CSum_tag, CSum, DataDir,
FHc, FHd)
after
(catch file:close(FHc)),
(catch file:close(FHd))
end;
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_server_write_chunk(File, Offset, Chunk, CSum_tag,
CSum, S)
end;
_ -> _ ->
{error, bad_arg} {error, bad_arg}
end. end.
do_server_write_chunk2(_File, Offset, Chunk, CSum_tag, do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
Client_CSum, _DataDir, FHc, FHd) ->
try
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
Size = iolist_size(Chunk),
case file:pwrite(FHd, Offset, Chunk) of
ok ->
CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
ok = file:write(FHc, CSum_info),
ok;
_Else3 ->
machi_util:verb("Else3 ~p ~p ~p\n",
[Offset, Size, _Else3]),
{error, bad_arg}
end
catch
throw:{bad_csum, _CS} ->
{error, bad_checksum};
error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]),
{error, bad_arg}
end.
do_server_read_chunk(File, Offset, Size, _Opts, #state{data_dir=DataDir})->
%% TODO: Look inside Opts someday. %% TODO: Look inside Opts someday.
case sanitize_file_string(File) of case sanitize_file_string(File) of
ok -> ok ->
{_, Path} = machi_util:make_data_filename(DataDir, File), {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case file:open(Path, [read, binary, raw]) of machi_file_proxy:read(Pid, Offset, Size);
{ok, FH} ->
try
case file:pread(FH, Offset, Size) of
{ok, Bytes} when byte_size(Bytes) == Size ->
{ok, Bytes};
{ok, Bytes} ->
machi_util:verb("ok read but wanted ~p got ~p: ~p @ offset ~p\n",
[Size,size(Bytes),File,Offset]),
io:format(user, "ok read but wanted ~p got ~p: ~p @ offset ~p\n",
[Size,size(Bytes),File,Offset]),
{error, partial_read};
eof ->
{error, not_written}; %% TODO perhaps_do_net_server_ec_read(Sock, FH);
_Else2 ->
machi_util:verb("Else2 ~p ~p ~P\n",
[Offset, Size, _Else2, 20]),
{error, bad_read}
end
after
file:close(FH)
end;
{error, enoent} ->
{error, not_written};
{error, _Else} ->
io:format(user, "Unexpected ~p at ~p ~p\n",
[_Else, ?MODULE, ?LINE]),
{error, bad_arg}
end;
_ -> _ ->
{error, bad_arg} {error, bad_arg}
end. end.
do_server_checksum_listing(File, #state{data_dir=DataDir}=_S) -> do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) ->
case sanitize_file_string(File) of case sanitize_file_string(File) of
ok -> ok ->
ok = sync_checksum_file(File), ok = sync_checksum_file(FluName, File),
CSumPath = machi_util:make_checksum_filename(DataDir, File), CSumPath = machi_util:make_checksum_filename(DataDir, File),
%% TODO: If this file is legitimately bigger than our %% TODO: If this file is legitimately bigger than our
%% {packet_size,N} limit, then we'll have a difficult time, eh? %% {packet_size,N} limit, then we'll have a difficult time, eh?
@ -692,12 +630,29 @@ do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
{error, bad_arg} {error, bad_arg}
end. end.
append_server_dispatch(From, Prefix, Chunk, CSum, Extra, EpochID, append_server_dispatch(From, Prefix, Chunk, CSum, Extra, FluName) ->
DataDir, LinkPid) -> Result = case handle_append(Prefix, Chunk, CSum, Extra, FluName) of
Pid = write_server_get_pid(Prefix, EpochID, DataDir, LinkPid), {ok, File, Offset} ->
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID}, {assignment, Offset, File};
Other ->
Other
end,
From ! Result,
exit(normal). exit(normal).
handle_append(_Prefix, <<>>, _Csum, _Extra, _FluName) ->
{error, bad_arg};
handle_append(Prefix, Chunk, Csum, Extra, FluName) ->
case machi_flu_filename_mgr:find_or_make_filename_from_prefix({prefix, Prefix}) of
{file, F} ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}),
{Tag, Csum} = machi_util:unmake_tagged_csum(Csum),
Meta = [{client_csum_tag, Tag}, {client_csum, Csum}],
machi_file_proxy:append(Pid, Meta, Extra, Chunk);
Error ->
Error
end.
sanitize_file_string(Str) -> sanitize_file_string(Str) ->
case re:run(Str, "/") of case re:run(Str, "/") of
nomatch -> nomatch ->
@ -706,133 +661,28 @@ sanitize_file_string(Str) ->
error error
end. end.
sync_checksum_file(File) -> sanitize_prefix(Prefix) ->
Prefix = re:replace(File, "\\..*", "", [{return, binary}]), %% We are using '^' as our component delimiter
case write_server_find_pid(Prefix) of case re:run(Prefix, "^|/") of
undefined -> nomatch ->
ok; ok;
Pid -> _ ->
Ref = make_ref(),
Pid ! {sync_stuff, self(), Ref},
receive
{sync_finished, Ref} ->
ok
after 5000 ->
case write_server_find_pid(Prefix) of
undefined ->
ok;
Pid2 when Pid2 /= Pid ->
ok;
_Pid2 ->
error error
end
end
end. end.
write_server_get_pid(Prefix, EpochID, DataDir, LinkPid) -> sync_checksum_file(FluName, File) ->
case write_server_find_pid(Prefix) of %% We just lookup the pid here - we don't start a proxy server. If
%% there isn't a pid for this file, then we just return ok. The
%% csum file was synced when the proxy was shutdown.
%%
%% If there *is* a pid, we call the sync function to ensure the
%% csum file is sync'd before we return. (Or an error if we get
%% an error).
case machi_flu_metadata_mgr:lookup_proxy_pid(FluName, {file, File}) of
undefined -> undefined ->
start_seq_append_server(Prefix, EpochID, DataDir, LinkPid), ok;
timer:sleep(1), {ok, Pid} ->
write_server_get_pid(Prefix, EpochID, DataDir, LinkPid); machi_file_proxy:sync(Pid, csum)
Pid ->
Pid
end.
write_server_find_pid(Prefix) ->
FluName = machi_util:make_regname(Prefix),
whereis(FluName).
start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) ->
proc_lib:spawn_link(fun() ->
%% The following is only necessary to
%% make nice process relationships in
%% 'appmon' and related tools.
put('$ancestors', [AppendServerPid]),
put('$initial_call', {x,y,3}),
link(AppendServerPid),
run_seq_append_server(Prefix, EpochID, DataDir)
end).
run_seq_append_server(Prefix, EpochID, DataDir) ->
true = register(machi_util:make_regname(Prefix), self()),
run_seq_append_server2(Prefix, EpochID, DataDir).
run_seq_append_server2(Prefix, EpochID, DataDir) ->
FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
case machi_util:increment_max_filenum(DataDir, Prefix) of
ok ->
machi_util:info_msg("start: ~p server at file ~w\n",
[Prefix, FileNum]),
seq_append_server_loop(DataDir, Prefix, EpochID, FileNum);
Else ->
error_logger:error_msg("start: ~p server at file ~w: ~p\n",
[Prefix, FileNum, Else]),
exit(Else)
end.
-spec seq_name_hack() -> string().
seq_name_hack() ->
lists:flatten(io_lib:format("~.36B~.36B",
[element(3,now()),
list_to_integer(os:getpid())])).
seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) ->
SequencerNameHack = seq_name_hack(),
{File, FullPath} = machi_util:make_data_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHd} = file:open(FullPath,
[read, write, raw, binary]),
CSumPath = machi_util:make_checksum_filename(
DataDir, Prefix, SequencerNameHack, FileNum),
{ok, FHc} = file:open(CSumPath, [append, raw, binary]),
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum,
?MINIMUM_OFFSET).
seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID,
FileNum, Offset)
when Offset > ?MAX_FILE_SIZE ->
ok = file:close(FHd),
ok = file:close(FHc),
machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
[Prefix, FileNum, Offset]),
run_seq_append_server2(Prefix, EpochID, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID,
FileNum, Offset) ->
receive
{seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID}
when R_EpochID == EpochID ->
if Chunk /= <<>> ->
ok = file:pwrite(FHd, Offset, Chunk);
true ->
ok
end,
From ! {assignment, Offset, File},
Size = iolist_size(Chunk),
CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID,
FileNum, Offset + Size + Extra);
{seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG ->
%% Rare'ish event: send MSG to myself so it doesn't get lost
%% while we recurse around to pick up a new FileNum.
self() ! MSG,
machi_util:info_msg("rollover: ~p server at file ~w offset ~w "
"by new epoch_id ~W\n",
[Prefix, FileNum, Offset, R_EpochID, 8]),
run_seq_append_server2(Prefix, R_EpochID, DataDir);
{sync_stuff, FromPid, Ref} ->
file:sync(FHc),
FromPid ! {sync_finished, Ref},
seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID,
FileNum, Offset)
after 30*1000 ->
ok = file:close(FHd),
ok = file:close(FHc),
machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n",
[Prefix, self(), FileNum, Offset]),
exit(normal)
end. end.
make_listener_regname(BaseName) -> make_listener_regname(BaseName) ->

View file

@ -19,7 +19,7 @@
%% ------------------------------------------------------------------- %% -------------------------------------------------------------------
%% %%
%% @doc This process is responsible for managing filenames assigned to %% @doc This process is responsible for managing filenames assigned to
%% prefixes. %% prefixes. It's started out of `machi_flu_psup'.
%% %%
%% Supported operations include finding the "current" filename assigned to %% Supported operations include finding the "current" filename assigned to
%% a prefix. Incrementing the sequence number and returning a new file name %% a prefix. Incrementing the sequence number and returning a new file name
@ -44,7 +44,8 @@
-behavior(gen_server). -behavior(gen_server).
-export([ -export([
start_link/1, child_spec/2,
start_link/2,
find_or_make_filename_from_prefix/1, find_or_make_filename_from_prefix/1,
increment_prefix_sequence/1, increment_prefix_sequence/1,
list_files_by_prefix/1 list_files_by_prefix/1
@ -63,8 +64,15 @@
-define(TIMEOUT, 10 * 1000). -define(TIMEOUT, 10 * 1000).
%% public API %% public API
start_link(DataDir) when is_list(DataDir) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [DataDir], []). child_spec(FluName, DataDir) ->
Name = make_filename_mgr_name(FluName),
{Name,
{?MODULE, start_link, [FluName, DataDir]},
permanent, 5000, worker, [?MODULE]}.
start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) ->
gen_server:start_link({local, make_filename_mgr_name(FluName)}, ?MODULE, [DataDir], []).
-spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) -> -spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) ->
{file, Filename :: string()} | {error, Reason :: term() } | timeout. {file, Filename :: string()} | {error, Reason :: term() } | timeout.
@ -165,3 +173,6 @@ find_file(DataDir, Prefix, N) ->
list_files(DataDir, Prefix) -> list_files(DataDir, Prefix) ->
{F, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", "*"), {F, Path} = machi_util:make_data_filename(DataDir, Prefix, "*", "*"),
filelib:wildcard(F, filename:dirname(Path)). filelib:wildcard(F, filename:dirname(Path)).
make_filename_mgr_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_filename_mgr").

View file

@ -39,12 +39,13 @@
-define(HASH(X), erlang:phash2(X)). %% hash algorithm to use -define(HASH(X), erlang:phash2(X)). %% hash algorithm to use
-define(TIMEOUT, 10 * 1000). %% 10 second timeout -define(TIMEOUT, 10 * 1000). %% 10 second timeout
-record(state, {name :: atom(), -record(state, {fluname :: atom(),
datadir :: string(), datadir :: string(),
tid :: ets:tid() tid :: ets:tid(),
cnt :: non_neg_integer()
}). }).
%% This record goes in the ets table where prefix is the key %% This record goes in the ets table where filename is the key
-record(md, {filename :: string(), -record(md, {filename :: string(),
proxy_pid :: undefined|pid(), proxy_pid :: undefined|pid(),
mref :: undefined|reference() %% monitor ref for file proxy mref :: undefined|reference() %% monitor ref for file proxy
@ -52,11 +53,13 @@
%% public api %% public api
-export([ -export([
start_link/2, child_spec/4,
lookup_manager_pid/1, start_link/4,
lookup_proxy_pid/1, lookup_manager_pid/2,
start_proxy_pid/1, lookup_proxy_pid/2,
stop_proxy_pid/1 start_proxy_pid/2,
stop_proxy_pid/2,
build_metadata_mgr_name/2
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -70,26 +73,34 @@
]). ]).
%% Public API %% Public API
build_metadata_mgr_name(FluName, N) when is_atom(FluName) andalso is_integer(N) ->
list_to_atom(atom_to_list(FluName) ++ "_metadata_mgr_" ++ integer_to_list(N)).
start_link(Name, DataDir) when is_atom(Name) andalso is_list(DataDir) -> child_spec(FluName, C, DataDir, N) ->
gen_server:start_link({local, Name}, ?MODULE, [Name, DataDir], []). Name = build_metadata_mgr_name(FluName, C),
{Name,
{?MODULE, start_link, [FluName, Name, DataDir, N]},
permanent, 5000, worker, [?MODULE]}.
lookup_manager_pid({file, Filename}) -> start_link(FluName, Name, DataDir, Num) when is_atom(Name) andalso is_list(DataDir) ->
whereis(get_manager_atom(Filename)). gen_server:start_link({local, Name}, ?MODULE, [FluName, Name, DataDir, Num], []).
lookup_proxy_pid({file, Filename}) -> lookup_manager_pid(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(Filename), {proxy_pid, Filename}, ?TIMEOUT). whereis(get_manager_atom(FluName, Filename)).
start_proxy_pid({file, Filename}) -> lookup_proxy_pid(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(Filename), {start_proxy_pid, Filename}, ?TIMEOUT). gen_server:call(get_manager_atom(FluName, Filename), {proxy_pid, Filename}, ?TIMEOUT).
stop_proxy_pid({file, Filename}) -> start_proxy_pid(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(Filename), {stop_proxy_pid, Filename}, ?TIMEOUT). gen_server:call(get_manager_atom(FluName, Filename), {start_proxy_pid, Filename}, ?TIMEOUT).
stop_proxy_pid(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT).
%% gen_server callbacks %% gen_server callbacks
init([Name, DataDir]) -> init([FluName, Name, DataDir, Num]) ->
Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]), Tid = ets:new(Name, [{keypos, 2}, {read_concurrency, true}, {write_concurrency, true}]),
{ok, #state{ name = Name, datadir = DataDir, tid = Tid}}. {ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid, cnt = Num}}.
handle_cast(Req, State) -> handle_cast(Req, State) ->
lager:warning("Got unknown cast ~p", [Req]), lager:warning("Got unknown cast ~p", [Req]),
@ -102,12 +113,12 @@ handle_call({proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
end, end,
{reply, Reply, State}; {reply, Reply, State};
handle_call({start_proxy_pid, Filename}, _From, State = #state{ tid = Tid, datadir = D }) -> handle_call({start_proxy_pid, Filename}, _From, State = #state{ fluname = N, tid = Tid, datadir = D }) ->
NewR = case lookup_md(Tid, Filename) of NewR = case lookup_md(Tid, Filename) of
not_found -> not_found ->
start_file_proxy(D, Filename); start_file_proxy(N, D, Filename);
#md{ proxy_pid = undefined } = R0 -> #md{ proxy_pid = undefined } = R0 ->
start_file_proxy(D, R0); start_file_proxy(N, D, R0);
#md{ proxy_pid = _Pid } = R1 -> #md{ proxy_pid = _Pid } = R1 ->
R1 R1
end, end,
@ -180,13 +191,11 @@ compute_hash(Data) ->
?HASH(Data). ?HASH(Data).
compute_worker(Hash) -> compute_worker(Hash) ->
Hash rem ?MAX_MGRS. MgrCount = get_env(metadata_manager_count, ?MAX_MGRS),
Hash rem MgrCount.
build_metadata_mgr_name(N) when is_integer(N) -> get_manager_atom(FluName, Data) ->
list_to_atom("machi_flu_metadata_mgr_" ++ integer_to_list(N)). build_metadata_mgr_name(FluName, compute_worker(compute_hash(Data))).
get_manager_atom(Data) ->
build_metadata_mgr_name(compute_worker(compute_hash(Data))).
lookup_md(Tid, Data) -> lookup_md(Tid, Data) ->
case ets:lookup(Tid, Data) of case ets:lookup(Tid, Data) of
@ -194,13 +203,13 @@ lookup_md(Tid, Data) ->
[R] -> R [R] -> R
end. end.
start_file_proxy(D, R = #md{filename = F} ) -> start_file_proxy(FluName, D, R = #md{filename = F} ) ->
{ok, Pid} = machi_file_proxy_sup:start_proxy(D, F), {ok, Pid} = machi_file_proxy_sup:start_proxy(FluName, D, F),
Mref = monitor(process, Pid), Mref = monitor(process, Pid),
R#md{ proxy_pid = Pid, mref = Mref }; R#md{ proxy_pid = Pid, mref = Mref };
start_file_proxy(D, Filename) -> start_file_proxy(FluName, D, Filename) ->
start_file_proxy(D, #md{ filename = Filename }). start_file_proxy(FluName, D, #md{ filename = Filename }).
update_ets(Tid, R) -> update_ets(Tid, R) ->
ets:insert(Tid, R). ets:insert(Tid, R).
@ -215,3 +224,9 @@ purge_ets(Tid, R) ->
get_md_record_by_mref(Tid, Mref) -> get_md_record_by_mref(Tid, Mref) ->
[R] = ets:match_object(Tid, {md, '_', '_', Mref}), [R] = ets:match_object(Tid, {md, '_', '_', Mref}),
R. R.
get_env(Setting, Default) ->
case application:get_env(machi, Setting) of
undefined -> Default;
{ok, V} -> V
end.

View file

@ -130,11 +130,24 @@ init([FluName, TcpPort, DataDir, Props0]) ->
{machi_chain_manager1, start_link, {machi_chain_manager1, start_link,
[FluName, [], Props]}, [FluName, [], Props]},
permanent, ?SHUTDOWN, worker, []}, permanent, ?SHUTDOWN, worker, []},
FNameMgrSpec = machi_flu_filename_mgr:child_spec(FluName, DataDir),
MetaMgrCnt = get_env(metadata_manager_count, 10),
MetaSupSpec = machi_flu_metadata_mgr_sup:child_spec(FluName, DataDir, MetaMgrCnt),
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
FluSpec = {FluName, FluSpec = {FluName,
{machi_flu1, start_link, {machi_flu1, start_link,
[ [{FluName, TcpPort, DataDir}|Props] ]}, [ [{FluName, TcpPort, DataDir}|Props] ]},
permanent, ?SHUTDOWN, worker, []}, permanent, ?SHUTDOWN, worker, []},
{ok, {SupFlags, [ProjSpec, FitnessSpec, MgrSpec, FluSpec]}}.
{ok, {SupFlags, [
ProjSpec, FitnessSpec, MgrSpec,
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
FluSpec]}}.
make_flu_regname(FluName) when is_atom(FluName) -> make_flu_regname(FluName) when is_atom(FluName) ->
FluName. FluName.
@ -150,3 +163,9 @@ make_proj_supname(ProjName) when is_atom(ProjName) ->
make_fitness_regname(FluName) when is_atom(FluName) -> make_fitness_regname(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_fitness"). list_to_atom(atom_to_list(FluName) ++ "_fitness").
get_env(Setting, Default) ->
case application:get_env(machi, Setting) of
undefined -> Default;
{ok, V} -> V
end.