Choose new filename when epoch changes
The filename manager needs to choose a new file name for a prefix when the epoch number changes. This helps ensure safety of file merges across the cluster. (Prevents conflicts across divergent cluster members.)
This commit is contained in:
parent
161e6cd9f9
commit
f8707c61c0
1 changed files with 36 additions and 8 deletions
|
@ -51,7 +51,7 @@
|
||||||
-export([
|
-export([
|
||||||
child_spec/2,
|
child_spec/2,
|
||||||
start_link/2,
|
start_link/2,
|
||||||
find_or_make_filename_from_prefix/2,
|
find_or_make_filename_from_prefix/3,
|
||||||
increment_prefix_sequence/2,
|
increment_prefix_sequence/2,
|
||||||
list_files_by_prefix/2
|
list_files_by_prefix/2
|
||||||
]).
|
]).
|
||||||
|
@ -67,10 +67,12 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(TIMEOUT, 10 * 1000).
|
-define(TIMEOUT, 10 * 1000).
|
||||||
|
-include("machi_projection.hrl"). %% included for pv1_epoch_n type
|
||||||
|
|
||||||
-record(state, {fluname :: atom(),
|
-record(state, {fluname :: atom(),
|
||||||
tid :: ets:tid(),
|
tid :: ets:tid(),
|
||||||
datadir :: file:dir()
|
datadir :: file:dir(),
|
||||||
|
epoch :: pv1_epoch_n()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% public API
|
%% public API
|
||||||
|
@ -85,15 +87,17 @@ start_link(FluName, DataDir) when is_atom(FluName) andalso is_list(DataDir) ->
|
||||||
N = make_filename_mgr_name(FluName),
|
N = make_filename_mgr_name(FluName),
|
||||||
gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []).
|
gen_server:start_link({local, N}, ?MODULE, [FluName, DataDir], []).
|
||||||
|
|
||||||
-spec find_or_make_filename_from_prefix( FluName :: atom(), Prefix :: {prefix, string()} ) ->
|
-spec find_or_make_filename_from_prefix( FluName :: atom(),
|
||||||
|
EpochId :: pv1_epoch_n(),
|
||||||
|
Prefix :: {prefix, string()} ) ->
|
||||||
{file, Filename :: string()} | {error, Reason :: term() } | timeout.
|
{file, Filename :: string()} | {error, Reason :: term() } | timeout.
|
||||||
% @doc Find the latest available or make a filename from a prefix. A prefix
|
% @doc Find the latest available or make a filename from a prefix. A prefix
|
||||||
% should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged
|
% should be in the form of a tagged tuple `{prefix, P}'. Returns a tagged
|
||||||
% tuple in the form of `{file, F}' or an `{error, Reason}'
|
% tuple in the form of `{file, F}' or an `{error, Reason}'
|
||||||
find_or_make_filename_from_prefix(FluName, {prefix, Prefix}) when is_atom(FluName) ->
|
find_or_make_filename_from_prefix(FluName, EpochId, {prefix, Prefix}) when is_atom(FluName) ->
|
||||||
N = make_filename_mgr_name(FluName),
|
N = make_filename_mgr_name(FluName),
|
||||||
gen_server:call(N, {find_filename, Prefix}, ?TIMEOUT);
|
gen_server:call(N, {find_filename, EpochId, Prefix}, ?TIMEOUT);
|
||||||
find_or_make_filename_from_prefix(_FluName, Other) ->
|
find_or_make_filename_from_prefix(_FluName, _EpochId, Other) ->
|
||||||
lager:error("~p is not a valid prefix.", [Other]),
|
lager:error("~p is not a valid prefix.", [Other]),
|
||||||
error(badarg).
|
error(badarg).
|
||||||
|
|
||||||
|
@ -121,15 +125,31 @@ list_files_by_prefix(_FluName, Other) ->
|
||||||
%% gen_server API
|
%% gen_server API
|
||||||
init([FluName, DataDir]) ->
|
init([FluName, DataDir]) ->
|
||||||
Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]),
|
Tid = ets:new(make_filename_mgr_name(FluName), [named_table, {read_concurrency, true}]),
|
||||||
{ok, #state{ fluname = FluName, datadir = DataDir, tid = Tid }}.
|
{ok, #state{ fluname = FluName, epoch = 0, datadir = DataDir, tid = Tid }}.
|
||||||
|
|
||||||
handle_cast(Req, State) ->
|
handle_cast(Req, State) ->
|
||||||
lager:warning("Got unknown cast ~p", [Req]),
|
lager:warning("Got unknown cast ~p", [Req]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_call({find_filename, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) ->
|
%% Important assumption: by the time we reach here the EpochId is kosher.
|
||||||
|
%% the FLU has already validated that the caller's epoch id and the FLU's epoch id
|
||||||
|
%% are the same. So we *assume* that remains the case here - that is to say, we
|
||||||
|
%% are not wedged.
|
||||||
|
handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir,
|
||||||
|
epoch = EpochId,
|
||||||
|
tid = Tid }) ->
|
||||||
|
%% Our state and the caller's epoch ids are the same. Business as usual.
|
||||||
File = handle_find_file(Tid, Prefix, DataDir),
|
File = handle_find_file(Tid, Prefix, DataDir),
|
||||||
{reply, {file, File}, S};
|
{reply, {file, File}, S};
|
||||||
|
|
||||||
|
handle_call({find_filename, EpochId, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) ->
|
||||||
|
%% If the epoch id in our state and the caller's epoch id were the same, it would've
|
||||||
|
%% matched the above clause. Since we're here, we know that they are different.
|
||||||
|
%% If epoch ids between our state and the caller's are different, we must increment the
|
||||||
|
%% sequence number, generate a filename and then cache it.
|
||||||
|
File = increment_and_cache_filename(Tid, DataDir, Prefix),
|
||||||
|
{reply, {file, File}, S#state{epoch = EpochId}};
|
||||||
|
|
||||||
handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir }) ->
|
handle_call({increment_sequence, Prefix}, _From, S = #state{ datadir = DataDir }) ->
|
||||||
ok = machi_util:increment_max_filenum(DataDir, Prefix),
|
ok = machi_util:increment_max_filenum(DataDir, Prefix),
|
||||||
{reply, ok, S};
|
{reply, ok, S};
|
||||||
|
@ -213,6 +233,14 @@ maybe_cleanup(_Tid, _Key, false) ->
|
||||||
maybe_cleanup(Tid, Key, true) ->
|
maybe_cleanup(Tid, Key, true) ->
|
||||||
true = ets:delete(Tid, Key).
|
true = ets:delete(Tid, Key).
|
||||||
|
|
||||||
|
increment_and_cache_filename(Tid, DataDir, Prefix) ->
|
||||||
|
ok = machi_util:increment_max_filenum(DataDir, Prefix),
|
||||||
|
N = machi_util:read_max_filenum(DataDir, Prefix),
|
||||||
|
F = generate_filename(DataDir, Prefix, N),
|
||||||
|
true = ets:insert_new(Tid, {{Prefix, N}, F}),
|
||||||
|
filename:basename(F).
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue