From d627f238bfb428816a8688a77c5a763c34fb001d Mon Sep 17 00:00:00 2001 From: Mark Allen Date: Tue, 6 Oct 2015 22:44:31 -0500 Subject: [PATCH] Cache generated names until disk files are written --- src/machi_flu_filename_mgr.erl | 122 ++++++++++++++++++++++----------- 1 file changed, 82 insertions(+), 40 deletions(-) diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index ef2046e..ef454ef 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -43,12 +43,17 @@ -module(machi_flu_filename_mgr). -behavior(gen_server). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-compile(export_all). +-endif. + -export([ child_spec/2, start_link/2, - find_or_make_filename_from_prefix/1, - increment_prefix_sequence/1, - list_files_by_prefix/1 + find_or_make_filename_from_prefix/2, + increment_prefix_sequence/2, + list_files_by_prefix/2 ]). %% gen_server callbacks @@ -63,6 +68,11 @@ -define(TIMEOUT, 10 * 1000). +-record(state, {fluname :: atom(), + tid :: ets:tid(), + datadir :: file:dir() + }). + %% public API child_spec(FluName, DataDir) -> @@ -72,75 +82,63 @@ child_spec(FluName, DataDir) -> permanent, 5000, worker, [?MODULE]}. start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> - gen_server:start_link({local, make_filename_mgr_name(FluName)}, ?MODULE, [DataDir], []). + N = make_filename_mgr_name(FluName), + gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []). --spec find_or_make_filename_from_prefix( Prefix :: {prefix, string()} ) -> +-spec find_or_make_filename_from_prefix( FluName :: atom(), Prefix :: {prefix, string()} ) -> {file, Filename :: string()} | {error, Reason :: term() } | timeout. % @doc Find the latest available or make a filename from a prefix. A prefix % should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged % tuple in the form of `{file, F}' or an `{error, Reason}' -find_or_make_filename_from_prefix({prefix, Prefix}) -> - gen_server:call(?MODULE, {find_filename, Prefix}, ?TIMEOUT); -find_or_make_filename_from_prefix(Other) -> +find_or_make_filename_from_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) -> + N = make_filename_mgr_name(FluName), + gen_server:call(N, {find_filename, Prefix}, ?TIMEOUT); +find_or_make_filename_from_prefix(_FluName, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). --spec increment_prefix_sequence( Prefix :: {prefix, string()} ) -> +-spec increment_prefix_sequence( FluName :: atom(), Prefix :: {prefix, string()} ) -> ok | {error, Reason :: term() } | timeout. % @doc Increment the sequence counter for a given prefix. Prefix should % be in the form of `{prefix, P}'. -increment_prefix_sequence({prefix, Prefix}) -> - gen_server:call(?MODULE, {increment_sequence, Prefix}, ?TIMEOUT); -increment_prefix_sequence(Other) -> +increment_prefix_sequence(FluName, {prefix, Prefix}) when is_atom(FluName) -> + gen_server:call(make_filename_mgr_name(FluName), {increment_sequence, Prefix}, ?TIMEOUT); +increment_prefix_sequence(_FluName, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). --spec list_files_by_prefix( Prefix :: {prefix, string()} ) -> +-spec list_files_by_prefix( FluName :: atom(), Prefix :: {prefix, string()} ) -> [ file:name() ] | timeout | {error, Reason :: term() }. % @doc Given a prefix in the form of `{prefix, P}' return % all the data files associated with that prefix. Returns % a list. -list_files_by_prefix({prefix, Prefix}) -> - gen_server:call(?MODULE, {list_files, Prefix}, ?TIMEOUT); -list_files_by_prefix(Other) -> +list_files_by_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) -> + gen_server:call(make_filename_mgr_name(FluName), {list_files, Prefix}, ?TIMEOUT); +list_files_by_prefix(_FluName, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). %% gen_server API -init([DataDir]) -> - {ok, DataDir}. +init([FluName, DataDir]) -> + Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]), + {ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid }}. handle_cast(Req, State) -> lager:warning("Got unknown cast ~p", [Req]), {noreply, State}. -handle_call({find_filename, Prefix}, _From, DataDir) -> - N = machi_util:read_max_filenum(DataDir, Prefix), - File = case find_file(DataDir, Prefix, N) of - [] -> - {F, _} = machi_util:make_data_filename( - DataDir, - Prefix, - generate_uuid_v4_str(), - N), - F; - [H] -> H; - [Fn | _ ] = L -> - lager:warning( - "Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p", - [Prefix, N, L]), - Fn - end, - {reply, {file, File}, DataDir}; -handle_call({increment_sequence, Prefix}, _From, DataDir) -> +handle_call({find_filename, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) -> + File = handle_find_file(Tid, Prefix, DataDir), + {reply, {file, File}, S}; +handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir }) -> ok = machi_util:increment_max_filenum(DataDir, Prefix), - {reply, ok, DataDir}; -handle_call({list_files, Prefix}, From, DataDir) -> + {reply, ok, S}; +handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) -> spawn(fun() -> L = list_files(DataDir, Prefix), gen_server:reply(From, L) end), - {noreply, DataDir}; + {noreply, S}; handle_call(Req, From, State) -> lager:warning("Got unknown call ~p from ~p", [Req, From]), @@ -176,3 +174,47 @@ list_files(DataDir, Prefix) -> make_filename_mgr_name(FluName) when is_atom(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_filename_mgr"). + +handle_find_file(Tid, Prefix, DataDir) -> + N = machi_util:read_max_filenum(DataDir, Prefix), + {File, Cleanup} = case find_file(DataDir, Prefix, N) of + [] -> + {find_or_make_filename(Tid, DataDir, Prefix, N), false}; + [H] -> {H, true}; + [Fn | _ ] = L -> + lager:warning( + "Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p", + [Prefix, N, L]), + {Fn, true} + end, + maybe_cleanup(Tid, {Prefix, N}, Cleanup), + filename:basename(File). + +find_or_make_filename(Tid, DataDir, Prefix, N) -> + case ets:lookup(Tid, {Prefix, N}) of + [] -> + F = generate_filename(DataDir, Prefix, N), + true = ets:insert_new(Tid, {{Prefix, N}, F}), + F; + [{_Key, File}] -> + File + end. + +generate_filename(DataDir, Prefix, N) -> + {F, _} = machi_util:make_data_filename( + DataDir, + Prefix, + generate_uuid_v4_str(), + N), + binary_to_list(F). + +maybe_cleanup(_Tid, _Key, false) -> + ok; +maybe_cleanup(Tid, Key, true) -> + ok = ets:delete(Tid, Key). + + +-ifdef(TEST). + +-endif. +