Fix machi_flu_filename_mgr to avoid double-write errors during network partitions
This commit is contained in:
parent
546901ef49
commit
ecfad4726b
1 changed files with 10 additions and 35 deletions
|
@ -100,7 +100,7 @@ find_or_make_filename_from_prefix(FluName, EpochId,
|
||||||
{coc, _CoC_Ns, _CoC_Loc}=CoC_NL)
|
{coc, _CoC_Ns, _CoC_Loc}=CoC_NL)
|
||||||
when is_atom(FluName) ->
|
when is_atom(FluName) ->
|
||||||
N = make_filename_mgr_name(FluName),
|
N = make_filename_mgr_name(FluName),
|
||||||
gen_server:call(N, {find_filename, EpochId, CoC_NL, Prefix}, ?TIMEOUT);
|
gen_server:call(N, {find_filename, FluName, EpochId, CoC_NL, Prefix}, ?TIMEOUT);
|
||||||
find_or_make_filename_from_prefix(_FluName, _EpochId, Other, Other2) ->
|
find_or_make_filename_from_prefix(_FluName, _EpochId, Other, Other2) ->
|
||||||
lager:error("~p is not a valid prefix/CoC ~p", [Other, Other2]),
|
lager:error("~p is not a valid prefix/CoC ~p", [Other, Other2]),
|
||||||
error(badarg).
|
error(badarg).
|
||||||
|
@ -142,14 +142,14 @@ handle_cast(Req, State) ->
|
||||||
%% the FLU has already validated that the caller's epoch id and the FLU's epoch id
|
%% the FLU has already validated that the caller's epoch id and the FLU's epoch id
|
||||||
%% are the same. So we *assume* that remains the case here - that is to say, we
|
%% are the same. So we *assume* that remains the case here - that is to say, we
|
||||||
%% are not wedged.
|
%% are not wedged.
|
||||||
handle_call({find_filename, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir,
|
handle_call({find_filename, FluName, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir,
|
||||||
epoch = EpochId,
|
epoch = EpochId,
|
||||||
tid = Tid }) ->
|
tid = Tid }) ->
|
||||||
%% Our state and the caller's epoch ids are the same. Business as usual.
|
%% Our state and the caller's epoch ids are the same. Business as usual.
|
||||||
File = handle_find_file(Tid, CoC_NL, Prefix, DataDir),
|
File = handle_find_file(FluName, Tid, CoC_NL, Prefix, DataDir),
|
||||||
{reply, {file, File}, S};
|
{reply, {file, File}, S};
|
||||||
|
|
||||||
handle_call({find_filename, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) ->
|
handle_call({find_filename, _FluName, EpochId, CoC_NL, Prefix}, _From, S = #state{ datadir = DataDir, tid = Tid }) ->
|
||||||
%% If the epoch id in our state and the caller's epoch id were the same, it would've
|
%% If the epoch id in our state and the caller's epoch id were the same, it would've
|
||||||
%% matched the above clause. Since we're here, we know that they are different.
|
%% matched the above clause. Since we're here, we know that they are different.
|
||||||
%% If epoch ids between our state and the caller's are different, we must increment the
|
%% If epoch ids between our state and the caller's are different, we must increment the
|
||||||
|
@ -191,12 +191,6 @@ generate_uuid_v4_str() ->
|
||||||
io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
|
io_lib:format("~8.16.0b-~4.16.0b-4~3.16.0b-~4.16.0b-~12.16.0b",
|
||||||
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]).
|
[A, B, C band 16#0fff, D band 16#3fff bor 16#8000, E]).
|
||||||
|
|
||||||
find_file(DataDir, {coc,CoC_Namespace,CoC_Locator}=_CoC_NL, Prefix, N) ->
|
|
||||||
{_Filename, Path} = machi_util:make_data_filename(DataDir,
|
|
||||||
CoC_Namespace,CoC_Locator,
|
|
||||||
Prefix, "*", N),
|
|
||||||
filelib:wildcard(Path).
|
|
||||||
|
|
||||||
list_files(DataDir, Prefix) ->
|
list_files(DataDir, Prefix) ->
|
||||||
{F_bin, Path} = machi_util:make_data_filename(DataDir, "*^" ++ Prefix ++ "^*"),
|
{F_bin, Path} = machi_util:make_data_filename(DataDir, "*^" ++ Prefix ++ "^*"),
|
||||||
filelib:wildcard(binary_to_list(F_bin), filename:dirname(Path)).
|
filelib:wildcard(binary_to_list(F_bin), filename:dirname(Path)).
|
||||||
|
@ -204,26 +198,12 @@ list_files(DataDir, Prefix) ->
|
||||||
make_filename_mgr_name(FluName) when is_atom(FluName) ->
|
make_filename_mgr_name(FluName) when is_atom(FluName) ->
|
||||||
list_to_atom(atom_to_list(FluName) ++ "_filename_mgr").
|
list_to_atom(atom_to_list(FluName) ++ "_filename_mgr").
|
||||||
|
|
||||||
handle_find_file(Tid, {coc,CoC_Namespace,CoC_Locator}=CoC_NL, Prefix, DataDir) ->
|
handle_find_file(_FluName, Tid, {coc,CoC_Namespace,CoC_Locator}, Prefix, DataDir) ->
|
||||||
|
case ets:lookup(Tid, {CoC_Namespace, CoC_Locator, Prefix}) of
|
||||||
|
[] ->
|
||||||
N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix),
|
N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix),
|
||||||
{File, Cleanup} = case find_file(DataDir, CoC_NL, Prefix, N) of
|
|
||||||
[] ->
|
|
||||||
{find_or_make_filename(Tid, DataDir, CoC_Namespace, CoC_Locator, Prefix, N), false};
|
|
||||||
[H] -> {H, true};
|
|
||||||
[Fn | _ ] = L ->
|
|
||||||
lager:debug(
|
|
||||||
"Searching for a matching file to prefix ~p and sequence number ~p gave multiples: ~p",
|
|
||||||
[Prefix, N, L]),
|
|
||||||
{Fn, true}
|
|
||||||
end,
|
|
||||||
maybe_cleanup(Tid, {CoC_Namespace, CoC_Locator, Prefix, N}, Cleanup),
|
|
||||||
filename:basename(File).
|
|
||||||
|
|
||||||
find_or_make_filename(Tid, DataDir, CoC_Namespace, CoC_Locator, Prefix, N) ->
|
|
||||||
case ets:lookup(Tid, {CoC_Namespace, CoC_Locator, Prefix, N}) of
|
|
||||||
[] ->
|
|
||||||
F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N),
|
F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N),
|
||||||
true = ets:insert_new(Tid, {{CoC_Namespace, CoC_Locator, Prefix, N}, F}),
|
true = ets:insert(Tid, {{CoC_Namespace, CoC_Locator, Prefix}, F}),
|
||||||
F;
|
F;
|
||||||
[{_Key, File}] ->
|
[{_Key, File}] ->
|
||||||
File
|
File
|
||||||
|
@ -237,17 +217,12 @@ generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N) ->
|
||||||
N),
|
N),
|
||||||
binary_to_list(F).
|
binary_to_list(F).
|
||||||
|
|
||||||
maybe_cleanup(_Tid, _Key, false) ->
|
|
||||||
ok;
|
|
||||||
maybe_cleanup(Tid, Key, true) ->
|
|
||||||
true = ets:delete(Tid, Key).
|
|
||||||
|
|
||||||
increment_and_cache_filename(Tid, DataDir, {coc,CoC_Namespace,CoC_Locator}, Prefix) ->
|
increment_and_cache_filename(Tid, DataDir, {coc,CoC_Namespace,CoC_Locator}, Prefix) ->
|
||||||
ok = machi_util:increment_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix),
|
ok = machi_util:increment_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix),
|
||||||
N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix),
|
N = machi_util:read_max_filenum(DataDir, CoC_Namespace, CoC_Locator, Prefix),
|
||||||
F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N),
|
F = generate_filename(DataDir, CoC_Namespace, CoC_Locator, Prefix, N),
|
||||||
true = ets:insert_new(Tid, {{CoC_Namespace, CoC_Locator, Prefix, N}, F}),
|
true = ets:insert(Tid, {{CoC_Namespace, CoC_Locator, Prefix}, F}),
|
||||||
filename:basename(F).
|
F.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue