Fix file rollover problems

This commit is contained in:
Scott Lystig Fritchie 2016-03-29 18:39:52 +09:00
parent 549963545f
commit 1e0bb4c404
3 changed files with 106 additions and 51 deletions

View file

@ -71,7 +71,7 @@
code_change/3
]).
-define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds
-define(TICK, 5*1000).
-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
-define(TIMEOUT, 10*1000).
-define(TOO_MANY_ERRORS_RATIO, 50).
@ -91,6 +91,7 @@
csum_table :: machi_csum_table:table(),
eof_position = 0 :: non_neg_integer(),
max_file_size = ?DEFAULT_MAX_FILE_SIZE :: pos_integer(),
rollover = false :: boolean(),
tref :: reference(), %% timer ref
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
ops = 0 :: non_neg_integer(), %% sum of all ops
@ -449,11 +450,23 @@ handle_cast(Cast, State) ->
{noreply, State}.
% @private
handle_info(tick, State = #state{eof_position = Eof,
handle_info(tick, State = #state{fluname = FluName,
filename = F,
eof_position = Eof,
max_file_size = MaxFileSize}) when Eof >= MaxFileSize ->
lager:notice("Eof position ~p >= max file size ~p. Shutting down.",
[Eof, MaxFileSize]),
{stop, file_rollover, State};
%% Older code halted here with {stop, file_rollover, State}.
%% However, there may be other requests in our mailbox already
%% and/or not yet delivered but in a race with the
%% machi_flu_metadata_mgr. So we close our eleveldb instance (to
%% avoid double-open attempt by a new file proxy proc), tell
%% machi_flu_metadata_mgr that we request a rollover, then stop.
%% terminate() will take care of forwarding messages that are
%% caught in the race.
lager:notice("Eof ~s position ~p >= max file size ~p. Shutting down.",
[F, Eof, MaxFileSize]),
State2 = close_files(State),
machi_flu_metadata_mgr:stop_proxy_pid_rollover(FluName, {file, F}),
{stop, normal, State2#state{rollover = true}};
%% XXX Is this a good idea? Need to think this through a bit.
handle_info(tick, State = #state{wedged = true}) ->
@ -526,30 +539,23 @@ handle_info(Req, State) ->
{noreply, State}.
% @private
terminate(Reason, #state{filename = F,
data_filehandle = FHd,
csum_table = T,
reads = {RT, RE},
writes = {WT, WE},
appends = {AT, AE}
}) ->
terminate(Reason, State = #state{fluname = FluName,
filename = F,
rollover = Rollover_p,
reads = {RT, RE},
writes = {WT, WE},
appends = {AT, AE}
}) ->
lager:info("Shutting down proxy for file ~p because ~p", [F, Reason]),
lager:info(" Op Tot/Error", []),
lager:info(" Reads: ~p/~p", [RT, RE]),
lager:info(" Writes: ~p/~p", [WT, WE]),
lager:info("Appends: ~p/~p", [AT, AE]),
case FHd of
undefined ->
noop; %% file deleted
_ ->
ok = file:sync(FHd),
ok = file:close(FHd)
end,
case T of
undefined ->
noop; %% file deleted
_ ->
ok = machi_csum_table:close(T)
close_files(State),
if Rollover_p ->
forward_late_messages(FluName, F, 500);
true ->
ok
end,
ok.
@ -867,3 +873,36 @@ maybe_gc(Reply, S = #state{fluname=FluName,
false ->
{reply, Reply, S}
end.
close_files(State = #state{data_filehandle = FHd,
csum_table = T}) ->
case FHd of
undefined ->
noop; %% file deleted
_ ->
ok = file:sync(FHd),
ok = file:close(FHd)
end,
case T of
undefined ->
noop; %% file deleted
_ ->
ok = machi_csum_table:close(T)
end,
State#state{data_filehandle = undefined, csum_table = undefined}.
forward_late_messages(FluName, F, Timeout) ->
receive
M ->
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of
{ok, Pid} ->
Pid ! M;
{error, trimmed} ->
lager:error("TODO: FLU ~p file ~p reports trimmed status "
"when forwarding ~P\n",
[FluName, F, M, 20])
end,
forward_late_messages(FluName, F, Timeout)
after Timeout ->
ok
end.

View file

@ -157,8 +157,9 @@ handle_call({find_filename, _FluName, EpochId, NSInfo, Prefix}, _From, S = #stat
File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix),
{reply, {file, File}, S#state{epoch = EpochId}};
handle_call({increment_sequence, #ns_info{name=NS, locator=NSLocator}, Prefix}, _From, S = #state{ datadir = DataDir }) ->
ok = machi_util:increment_max_filenum(DataDir, NS, NSLocator, Prefix),
handle_call({increment_sequence, #ns_info{name=NS, locator=NSLocator}, Prefix}, _From, S = #state{ datadir = DataDir, tid=Tid }) ->
NSInfo = #ns_info{name=NS, locator=NSLocator},
_File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix),
{reply, ok, S};
handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) ->
spawn(fun() ->

View file

@ -63,6 +63,7 @@
lookup_proxy_pid/2,
start_proxy_pid/2,
stop_proxy_pid/2,
stop_proxy_pid_rollover/2,
build_metadata_mgr_name/2,
trim_file/2
]).
@ -100,7 +101,10 @@ start_proxy_pid(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(FluName, Filename), {start_proxy_pid, Filename}, ?TIMEOUT).
stop_proxy_pid(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT).
gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, false, Filename}, ?TIMEOUT).
stop_proxy_pid_rollover(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, true, Filename}, ?TIMEOUT).
trim_file(FluName, {file, Filename}) ->
gen_server:call(get_manager_atom(FluName, Filename), {trim_file, Filename}, ?TIMEOUT).
@ -151,7 +155,7 @@ handle_call({start_proxy_pid, Filename}, _From,
{reply, {error, trimmed}, State}
end;
handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
handle_call({stop_proxy_pid, Rollover_p, Filename}, _From, State = #state{ tid = Tid }) ->
case lookup_md(Tid, Filename) of
not_found ->
ok;
@ -159,8 +163,13 @@ handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) ->
ok;
#md{ proxy_pid = Pid, mref = M } = R ->
demonitor(M, [flush]),
machi_file_proxy:stop(Pid),
update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined })
if Rollover_p ->
do_rollover(Filename, State);
true ->
machi_file_proxy:stop(Pid),
update_ets(Tid, R#md{ proxy_pid = undefined,
mref = undefined })
end
end,
{reply, ok, State};
@ -182,27 +191,6 @@ handle_info({'DOWN', Mref, process, Pid, normal}, State = #state{ tid = Tid }) -
clear_ets(Tid, Mref),
{noreply, State};
handle_info({'DOWN', Mref, process, Pid, file_rollover}, State = #state{ fluname = FluName,
tid = Tid }) ->
lager:info("file proxy ~p shutdown because of file rollover", [Pid]),
R = get_md_record_by_mref(Tid, Mref),
{Prefix, NS, NSLocator, _, _} =
machi_util:parse_filename(R#md.filename),
%% We only increment the counter here. The filename will be generated on the
%% next append request to that prefix and since the filename will have a new
%% sequence number it probably will be associated with a different metadata
%% manager. That's why we don't want to generate a new file name immediately
%% and use it to start a new file proxy.
NSInfo = #ns_info{name=NS, locator=NSLocator},
ok = machi_flu_filename_mgr:increment_prefix_sequence(FluName, NSInfo, {prefix, Prefix}),
%% purge our ets table of this entry completely since it is likely the
%% new filename (whenever it comes) will be in a different manager than
%% us.
purge_ets(Tid, R),
{noreply, State};
handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) ->
lager:error("file proxy ~p shutdown because it's wedged", [Pid]),
clear_ets(Tid, Mref),
@ -275,8 +263,35 @@ get_md_record_by_mref(Tid, Mref) ->
[R] = ets:match_object(Tid, {md, '_', '_', Mref}),
R.
get_md_record_by_filename(Tid, Filename) ->
[R] = ets:lookup(Tid, Filename),
R.
get_env(Setting, Default) ->
case application:get_env(machi, Setting) of
undefined -> Default;
{ok, V} -> V
end.
do_rollover(Filename, _State = #state{ fluname = FluName,
tid = Tid }) ->
R = get_md_record_by_filename(Tid, Filename),
lager:info("file ~p proxy ~p shutdown because of file rollover",
[Filename, R#md.proxy_pid]),
{Prefix, NS, NSLocator, _, _} =
machi_util:parse_filename(R#md.filename),
%% We only increment the counter here. The filename will be generated on the
%% next append request to that prefix and since the filename will have a new
%% sequence number it probably will be associated with a different metadata
%% manager. That's why we don't want to generate a new file name immediately
%% and use it to start a new file proxy.
NSInfo = #ns_info{name=NS, locator=NSLocator},
lager:warning("INCR: ~p ~p\n", [FluName, Prefix]),
ok = machi_flu_filename_mgr:increment_prefix_sequence(FluName, NSInfo, {prefix, Prefix}),
%% purge our ets table of this entry completely since it is likely the
%% new filename (whenever it comes) will be in a different manager than
%% us.
purge_ets(Tid, R),
ok.