From f8707c61c036a9854472c6cd4ad383e223564740 Mon Sep 17 00:00:00 2001 From: Mark Allen Date: Tue, 13 Oct 2015 21:09:31 -0500 Subject: [PATCH] Choose new filename when epoch changes The filename manager needs to choose a new file name for a prefix when the epoch number changes. This helps ensure safety of file merges across the cluster. (Prevents conflicts across divergent cluster members.) --- src/machi_flu_filename_mgr.erl | 44 +++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index 06f601f..29780bf 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -51,7 +51,7 @@ -export([ child_spec/2, start_link/2, - find_or_make_filename_from_prefix/2, + find_or_make_filename_from_prefix/3, increment_prefix_sequence/2, list_files_by_prefix/2 ]). @@ -67,10 +67,12 @@ ]). -define(TIMEOUT, 10 * 1000). +-include("machi_projection.hrl"). %% included for pv1_epoch_n type -record(state, {fluname :: atom(), tid :: ets:tid(), - datadir :: file:dir() + datadir :: file:dir(), + epoch :: pv1_epoch_n() }). %% public API @@ -85,15 +87,17 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) -> N = make_filename_mgr_name(FluName), gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []). --spec find_or_make_filename_from_prefix( FluName :: atom(), Prefix :: {prefix, string()} ) -> +-spec find_or_make_filename_from_prefix( FluName :: atom(), + EpochId :: pv1_epoch_n(), + Prefix :: {prefix, string()} ) -> {file, Filename :: string()} | {error, Reason :: term() } | timeout. % @doc Find the latest available or make a filename from a prefix. A prefix % should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged % tuple in the form of `{file, F}' or an `{error, Reason}' -find_or_make_filename_from_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) -> +find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}) when is_atom(FluName) -> N = make_filename_mgr_name(FluName), - gen_server:call(N, {find_filename, Prefix}, ?TIMEOUT); -find_or_make_filename_from_prefix(_FluName, Other) -> + gen_server:call(N, {find_filename, EpochId, Prefix}, ?TIMEOUT); +find_or_make_filename_from_prefix(_FluName, _EpochId, Other) -> lager:error("~p is not a valid prefix.", [Other]), error(badarg). @@ -121,15 +125,31 @@ list_files_by_prefix(_FluName, Other) -> %% gen_server API init([FluName, DataDir]) -> Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]), - {ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid }}. + {ok, #state{ fluname = FluName, epoch = 0, datadir = DataDir, tid = Tid }}. handle_cast(Req, State) -> lager:warning("Got unknown cast ~p", [Req]), {noreply, State}. -handle_call({find_filename, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) -> +%% Important assumption: by the time we reach here the EpochId is kosher. +%% the FLU has already validated that the caller's epoch id and the FLU's epoch id +%% are the same. So we *assume* that remains the case here - that is to say, we +%% are not wedged. +handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir, + epoch = EpochId, + tid = Tid }) -> + %% Our state and the caller's epoch ids are the same. Business as usual. File = handle_find_file(Tid, Prefix, DataDir), {reply, {file, File}, S}; + +handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) -> + %% If the epoch id in our state and the caller's epoch id were the same, it would've + %% matched the above clause. Since we're here, we know that they are different. + %% If epoch ids between our state and the caller's are different, we must increment the + %% sequence number, generate a filename and then cache it. + File = increment_and_cache_filename(Tid, DataDir, Prefix), + {reply, {file, File}, S#state{epoch = EpochId}}; + handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir }) -> ok = machi_util:increment_max_filenum(DataDir, Prefix), {reply, ok, S}; @@ -213,6 +233,14 @@ maybe_cleanup(_Tid, _Key, false) -> maybe_cleanup(Tid, Key, true) -> true = ets:delete(Tid, Key). +increment_and_cache_filename(Tid, DataDir, Prefix) -> + ok = machi_util:increment_max_filenum(DataDir, Prefix), + N = machi_util:read_max_filenum(DataDir, Prefix), + F = generate_filename(DataDir, Prefix, N), + true = ets:insert_new(Tid, {{Prefix, N}, F}), + filename:basename(F). + + -ifdef(TEST).