From 1e0bb4c4049cb5925feabae1f160c7a47286ce40 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 29 Mar 2016 18:39:52 +0900 Subject: [PATCH] Fix file rollover problems --- src/machi_file_proxy.erl | 87 ++++++++++++++++++++++++---------- src/machi_flu_filename_mgr.erl | 5 +- src/machi_flu_metadata_mgr.erl | 65 +++++++++++++++---------- 3 files changed, 106 insertions(+), 51 deletions(-) diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl index 4e61988..909426c 100644 --- a/src/machi_file_proxy.erl +++ b/src/machi_file_proxy.erl @@ -71,7 +71,7 @@ code_change/3 ]). --define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds +-define(TICK, 5*1000). -define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown -define(TIMEOUT, 10*1000). -define(TOO_MANY_ERRORS_RATIO, 50). @@ -91,6 +91,7 @@ csum_table :: machi_csum_table:table(), eof_position = 0 :: non_neg_integer(), max_file_size = ?DEFAULT_MAX_FILE_SIZE :: pos_integer(), + rollover = false :: boolean(), tref :: reference(), %% timer ref ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations ops = 0 :: non_neg_integer(), %% sum of all ops @@ -449,11 +450,23 @@ handle_cast(Cast, State) -> {noreply, State}. % @private -handle_info(tick, State = #state{eof_position = Eof, +handle_info(tick, State = #state{fluname = FluName, + filename = F, + eof_position = Eof, max_file_size = MaxFileSize}) when Eof >= MaxFileSize -> - lager:notice("Eof position ~p >= max file size ~p. Shutting down.", - [Eof, MaxFileSize]), - {stop, file_rollover, State}; + %% Older code halted here with {stop, file_rollover, State}. + %% However, there may be other requests in our mailbox already + %% and/or not yet delivered but in a race with the + %% machi_flu_metadata_mgr. So we close our eleveldb instance (to + %% avoid double-open attempt by a new file proxy proc), tell + %% machi_flu_metadata_mgr that we request a rollover, then stop. + %% terminate() will take care of forwarding messages that are + %% caught in the race. + lager:notice("Eof ~s position ~p >= max file size ~p. Shutting down.", + [F, Eof, MaxFileSize]), + State2 = close_files(State), + machi_flu_metadata_mgr:stop_proxy_pid_rollover(FluName, {file, F}), + {stop, normal, State2#state{rollover = true}}; %% XXX Is this a good idea? Need to think this through a bit. handle_info(tick, State = #state{wedged = true}) -> @@ -526,30 +539,23 @@ handle_info(Req, State) -> {noreply, State}. % @private -terminate(Reason, #state{filename = F, - data_filehandle = FHd, - csum_table = T, - reads = {RT, RE}, - writes = {WT, WE}, - appends = {AT, AE} - }) -> +terminate(Reason, State = #state{fluname = FluName, + filename = F, + rollover = Rollover_p, + reads = {RT, RE}, + writes = {WT, WE}, + appends = {AT, AE} + }) -> lager:info("Shutting down proxy for file ~p because ~p", [F, Reason]), lager:info(" Op Tot/Error", []), lager:info(" Reads: ~p/~p", [RT, RE]), lager:info(" Writes: ~p/~p", [WT, WE]), lager:info("Appends: ~p/~p", [AT, AE]), - case FHd of - undefined -> - noop; %% file deleted - _ -> - ok = file:sync(FHd), - ok = file:close(FHd) - end, - case T of - undefined -> - noop; %% file deleted - _ -> - ok = machi_csum_table:close(T) + close_files(State), + if Rollover_p -> + forward_late_messages(FluName, F, 500); + true -> + ok end, ok. @@ -867,3 +873,36 @@ maybe_gc(Reply, S = #state{fluname=FluName, false -> {reply, Reply, S} end. + +close_files(State = #state{data_filehandle = FHd, + csum_table = T}) -> + case FHd of + undefined -> + noop; %% file deleted + _ -> + ok = file:sync(FHd), + ok = file:close(FHd) + end, + case T of + undefined -> + noop; %% file deleted + _ -> + ok = machi_csum_table:close(T) + end, + State#state{data_filehandle = undefined, csum_table = undefined}. + +forward_late_messages(FluName, F, Timeout) -> + receive + M -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, F}) of + {ok, Pid} -> + Pid ! M; + {error, trimmed} -> + lager:error("TODO: FLU ~p file ~p reports trimmed status " + "when forwarding ~P\n", + [FluName, F, M, 20]) + end, + forward_late_messages(FluName, F, Timeout) + after Timeout -> + ok + end. diff --git a/src/machi_flu_filename_mgr.erl b/src/machi_flu_filename_mgr.erl index b25d146..908f5c6 100644 --- a/src/machi_flu_filename_mgr.erl +++ b/src/machi_flu_filename_mgr.erl @@ -157,8 +157,9 @@ handle_call({find_filename, _FluName, EpochId, NSInfo, Prefix}, _From, S = #stat File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix), {reply, {file, File}, S#state{epoch = EpochId}}; -handle_call({increment_sequence, #ns_info{name=NS, locator=NSLocator}, Prefix}, _From, S = #state{ datadir = DataDir }) -> - ok = machi_util:increment_max_filenum(DataDir, NS, NSLocator, Prefix), +handle_call({increment_sequence, #ns_info{name=NS, locator=NSLocator}, Prefix}, _From, S = #state{ datadir = DataDir, tid=Tid }) -> + NSInfo = #ns_info{name=NS, locator=NSLocator}, + _File = increment_and_cache_filename(Tid, DataDir, NSInfo, Prefix), {reply, ok, S}; handle_call({list_files, Prefix}, From, S = #state{ datadir = DataDir }) -> spawn(fun() -> diff --git a/src/machi_flu_metadata_mgr.erl b/src/machi_flu_metadata_mgr.erl index b9c26c9..6b80c98 100644 --- a/src/machi_flu_metadata_mgr.erl +++ b/src/machi_flu_metadata_mgr.erl @@ -63,6 +63,7 @@ lookup_proxy_pid/2, start_proxy_pid/2, stop_proxy_pid/2, + stop_proxy_pid_rollover/2, build_metadata_mgr_name/2, trim_file/2 ]). @@ -100,7 +101,10 @@ start_proxy_pid(FluName, {file, Filename}) -> gen_server:call(get_manager_atom(FluName, Filename), {start_proxy_pid, Filename}, ?TIMEOUT). stop_proxy_pid(FluName, {file, Filename}) -> - gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, Filename}, ?TIMEOUT). + gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, false, Filename}, ?TIMEOUT). + +stop_proxy_pid_rollover(FluName, {file, Filename}) -> + gen_server:call(get_manager_atom(FluName, Filename), {stop_proxy_pid, true, Filename}, ?TIMEOUT). trim_file(FluName, {file, Filename}) -> gen_server:call(get_manager_atom(FluName, Filename), {trim_file, Filename}, ?TIMEOUT). @@ -151,7 +155,7 @@ handle_call({start_proxy_pid, Filename}, _From, {reply, {error, trimmed}, State} end; -handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> +handle_call({stop_proxy_pid, Rollover_p, Filename}, _From, State = #state{ tid = Tid }) -> case lookup_md(Tid, Filename) of not_found -> ok; @@ -159,8 +163,13 @@ handle_call({stop_proxy_pid, Filename}, _From, State = #state{ tid = Tid }) -> ok; #md{ proxy_pid = Pid, mref = M } = R -> demonitor(M, [flush]), - machi_file_proxy:stop(Pid), - update_ets(Tid, R#md{ proxy_pid = undefined, mref = undefined }) + if Rollover_p -> + do_rollover(Filename, State); + true -> + machi_file_proxy:stop(Pid), + update_ets(Tid, R#md{ proxy_pid = undefined, + mref = undefined }) + end end, {reply, ok, State}; @@ -182,27 +191,6 @@ handle_info({'DOWN', Mref, process, Pid, normal}, State = #state{ tid = Tid }) - clear_ets(Tid, Mref), {noreply, State}; -handle_info({'DOWN', Mref, process, Pid, file_rollover}, State = #state{ fluname = FluName, - tid = Tid }) -> - lager:info("file proxy ~p shutdown because of file rollover", [Pid]), - R = get_md_record_by_mref(Tid, Mref), - {Prefix, NS, NSLocator, _, _} = - machi_util:parse_filename(R#md.filename), - - %% We only increment the counter here. The filename will be generated on the - %% next append request to that prefix and since the filename will have a new - %% sequence number it probably will be associated with a different metadata - %% manager. That's why we don't want to generate a new file name immediately - %% and use it to start a new file proxy. - NSInfo = #ns_info{name=NS, locator=NSLocator}, - ok = machi_flu_filename_mgr:increment_prefix_sequence(FluName, NSInfo, {prefix, Prefix}), - - %% purge our ets table of this entry completely since it is likely the - %% new filename (whenever it comes) will be in a different manager than - %% us. - purge_ets(Tid, R), - {noreply, State}; - handle_info({'DOWN', Mref, process, Pid, wedged}, State = #state{ tid = Tid }) -> lager:error("file proxy ~p shutdown because it's wedged", [Pid]), clear_ets(Tid, Mref), @@ -275,8 +263,35 @@ get_md_record_by_mref(Tid, Mref) -> [R] = ets:match_object(Tid, {md, '_', '_', Mref}), R. +get_md_record_by_filename(Tid, Filename) -> + [R] = ets:lookup(Tid, Filename), + R. + get_env(Setting, Default) -> case application:get_env(machi, Setting) of undefined -> Default; {ok, V} -> V end. + +do_rollover(Filename, _State = #state{ fluname = FluName, + tid = Tid }) -> + R = get_md_record_by_filename(Tid, Filename), + lager:info("file ~p proxy ~p shutdown because of file rollover", + [Filename, R#md.proxy_pid]), + {Prefix, NS, NSLocator, _, _} = + machi_util:parse_filename(R#md.filename), + + %% We only increment the counter here. The filename will be generated on the + %% next append request to that prefix and since the filename will have a new + %% sequence number it probably will be associated with a different metadata + %% manager. That's why we don't want to generate a new file name immediately + %% and use it to start a new file proxy. + NSInfo = #ns_info{name=NS, locator=NSLocator}, +lager:warning("INCR: ~p ~p\n", [FluName, Prefix]), + ok = machi_flu_filename_mgr:increment_prefix_sequence(FluName, NSInfo, {prefix, Prefix}), + + %% purge our ets table of this entry completely since it is likely the + %% new filename (whenever it comes) will be in a different manager than + %% us. + purge_ets(Tid, R), + ok.