Cache generated names until disk files are written

This commit is contained in:
Mark Allen 2015-10-06 22:44:31 -05:00
parent f83b0973f2
commit d627f238bf

View file

@ -43,12 +43,17 @@
-module(machi_flu_filename_mgr). -module(machi_flu_filename_mgr).
-behavior(gen_server). -behavior(gen_server).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif.
-export([ -export([
child_spec/2, child_spec/2,
start_link/2, start_link/2,
find_or_make_filename_from_prefix/1, find_or_make_filename_from_prefix/2,
increment_prefix_sequence/1, increment_prefix_sequence/2,
list_files_by_prefix/1 list_files_by_prefix/2
]). ]).
%% gen_server callbacks %% gen_server callbacks
@ -63,6 +68,11 @@
-define(TIMEOUT, 10 * 1000). -define(TIMEOUT, 10 * 1000).
-record(state, {fluname :: atom(),
tid :: ets:tid(),
datadir :: file:dir()
}).
%% public API %% public API
child_spec(FluName, DataDir) -> child_spec(FluName, DataDir) ->
@ -72,75 +82,63 @@ child_spec(FluName, DataDir) ->
permanent, 5000, worker, [?MODULE]}. permanent, 5000, worker, [?MODULE]}.
start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) ->
gen_server:start_link({local, make_filename_mgr_name(FluName)}, ?MODULE, [DataDir], []). N = make_filename_mgr_name(FluName),
gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []).
-spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) -> -spec find_or_make_filename_from_prefix( FluName :: atom(), Prefix :: {prefix, string()} ) ->
{file, Filename :: string()} | {error, Reason :: term() } | timeout. {file, Filename :: string()} | {error, Reason :: term() } | timeout.
% @doc Find the latest available or make a filename from a prefix. A prefix % @doc Find the latest available or make a filename from a prefix. A prefix
% should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged % should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged
% tuple in the form of `{file, F}' or an `{error, Reason}' % tuple in the form of `{file, F}' or an `{error, Reason}'
find_or_make_filename_from_prefix({prefix, Prefix}) -> find_or_make_filename_from_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) ->
gen_server:call(?MODULE, {find_filename, Prefix}, ?TIMEOUT); N = make_filename_mgr_name(FluName),
find_or_make_filename_from_prefix(Other) -> gen_server:call(N, {find_filename, Prefix}, ?TIMEOUT);
find_or_make_filename_from_prefix(_FluName, Other) ->
lager:error("~p is not a valid prefix.", [Other]), lager:error("~p is not a valid prefix.", [Other]),
error(badarg). error(badarg).
-spec increment_prefix_sequence( Prefix :: {prefix, string()} ) -> -spec increment_prefix_sequence( FluName :: atom(), Prefix :: {prefix, string()} ) ->
ok | {error, Reason :: term() } | timeout. ok | {error, Reason :: term() } | timeout.
% @doc Increment the sequence counter for a given prefix. Prefix should % @doc Increment the sequence counter for a given prefix. Prefix should
% be in the form of `{prefix, P}'. % be in the form of `{prefix, P}'.
increment_prefix_sequence({prefix, Prefix}) -> increment_prefix_sequence(FluName, {prefix, Prefix}) when is_atom(FluName) ->
gen_server:call(?MODULE, {increment_sequence, Prefix}, ?TIMEOUT); gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, Prefix}, ?TIMEOUT);
increment_prefix_sequence(Other) -> increment_prefix_sequence(_FluName, Other) ->
lager:error("~p is not a valid prefix.", [Other]), lager:error("~p is not a valid prefix.", [Other]),
error(badarg). error(badarg).
-spec list_files_by_prefix( Prefix :: {prefix, string()} ) -> -spec list_files_by_prefix( FluName :: atom(), Prefix :: {prefix, string()} ) ->
[ file:name() ] | timeout | {error, Reason :: term() }. [ file:name() ] | timeout | {error, Reason :: term() }.
% @doc Given a prefix in the form of `{prefix, P}' return % @doc Given a prefix in the form of `{prefix, P}' return
% all the data files associated with that prefix. Returns % all the data files associated with that prefix. Returns
% a list. % a list.
list_files_by_prefix({prefix, Prefix}) -> list_files_by_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) ->
gen_server:call(?MODULE, {list_files, Prefix}, ?TIMEOUT); gen_server:call(make_filename_mgr_name(FluName), {list_files, Prefix}, ?TIMEOUT);
list_files_by_prefix(Other) -> list_files_by_prefix(_FluName, Other) ->
lager:error("~p is not a valid prefix.", [Other]), lager:error("~p is not a valid prefix.", [Other]),
error(badarg). error(badarg).
%% gen_server API %% gen_server API
init([DataDir]) -> init([FluName, DataDir]) ->
{ok, DataDir}. Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]),
{ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid }}.
handle_cast(Req, State) -> handle_cast(Req, State) ->
lager:warning("Got unknown cast ~p", [Req]), lager:warning("Got unknown cast ~p", [Req]),
{noreply, State}. {noreply, State}.
handle_call({find_filename, Prefix}, _From, DataDir) -> handle_call({find_filename, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) ->
N = machi_util:read_max_filenum(DataDir, Prefix), File = handle_find_file(Tid, Prefix, DataDir),
File = case find_file(DataDir, Prefix, N) of {reply, {file, File}, S};
[] -> handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir }) ->
{F, _} = machi_util:make_data_filename(
DataDir,
Prefix,
generate_uuid_v4_str(),
N),
F;
[H] -> H;
[Fn | _ ] = L ->
lager:warning(
"Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
[Prefix, N, L]),
Fn
end,
{reply, {file, File}, DataDir};
handle_call({increment_sequence, Prefix}, _From, DataDir) ->
ok = machi_util:increment_max_filenum(DataDir, Prefix), ok = machi_util:increment_max_filenum(DataDir, Prefix),
{reply, ok, DataDir}; {reply, ok, S};
handle_call({list_files, Prefix}, From, DataDir) -> handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) ->
spawn(fun() -> spawn(fun() ->
L = list_files(DataDir, Prefix), L = list_files(DataDir, Prefix),
gen_server:reply(From, L) gen_server:reply(From, L)
end), end),
{noreply, DataDir}; {noreply, S};
handle_call(Req, From, State) -> handle_call(Req, From, State) ->
lager:warning("Got unknown call ~p from ~p", [Req, From]), lager:warning("Got unknown call ~p from ~p", [Req, From]),
@ -176,3 +174,47 @@ list_files(DataDir, Prefix) ->
make_filename_mgr_name(FluName) when is_atom(FluName) -> make_filename_mgr_name(FluName) when is_atom(FluName) ->
list_to_atom(atom_to_list(FluName) ++ "_filename_mgr"). list_to_atom(atom_to_list(FluName) ++ "_filename_mgr").
handle_find_file(Tid, Prefix, DataDir) ->
N = machi_util:read_max_filenum(DataDir, Prefix),
{File, Cleanup} = case find_file(DataDir, Prefix, N) of
[] ->
{find_or_make_filename(Tid, DataDir, Prefix, N), false};
[H] -> {H, true};
[Fn | _ ] = L ->
lager:warning(
"Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
[Prefix, N, L]),
{Fn, true}
end,
maybe_cleanup(Tid, {Prefix, N}, Cleanup),
filename:basename(File).
find_or_make_filename(Tid, DataDir, Prefix, N) ->
case ets:lookup(Tid, {Prefix, N}) of
[] ->
F = generate_filename(DataDir, Prefix, N),
true = ets:insert_new(Tid, {{Prefix, N}, F}),
F;
[{_Key, File}] ->
File
end.
generate_filename(DataDir, Prefix, N) ->
{F, _} = machi_util:make_data_filename(
DataDir,
Prefix,
generate_uuid_v4_str(),
N),
binary_to_list(F).
maybe_cleanup(_Tid, _Key, false) ->
ok;
maybe_cleanup(Tid, Key, true) ->
ok = ets:delete(Tid, Key).
-ifdef(TEST).
-endif.