From 4ae0f94649681a2efe6f87f977240b19506826be Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 12 May 2015 23:37:20 +0900 Subject: [PATCH] WIP: move to stats via ETS, success/failure propagates, yay! --- src/machi_chain_manager1.erl | 49 +++++++++++++++-------- src/machi_chain_repair.erl | 76 +++++++++++++++++++++--------------- 2 files changed, 77 insertions(+), 48 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index ffb398e..a72e6e1 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -1943,6 +1943,39 @@ perhaps_start_repair( perhaps_start_repair(S) -> S. +do_repair( + #ch_mgr{name=MyName, + proj=#projection_v1{upi=UPI, + repairing=[Dst|_]=Repairing, + members_dict=MembersDict}}=_S_copy, + Opts, ap_mode=_RepairMode) -> + T1 = os:timestamp(), + RepairId = proplists:get_value(repair_id, Opts, id1), + error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ID ~p\n", + [MyName, UPI, Repairing, RepairId]), + + ETS = ets:new(repair_stats, [private, set]), + ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes, + t_out_files, t_out_chunks, t_out_bytes, + t_bad_chunks, t_elapsed_seconds], + [ets:insert(ETS, {K, 0}) || K <- ETS_T_Keys], + + Res = machi_chain_repair:repair_ap(MyName, Repairing, UPI, + MembersDict, ETS, Opts), + T2 = os:timestamp(), + Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000, + ets:insert(ETS, {t_elapsed_seconds, Elapsed}), + Summary = case Res of ok -> "success"; + _ -> "FAILURE" + end, + Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys], + error_logger:info_msg("Repair ~s: tail ~p of ~p finished repair ID ~p: " + "~p\nStats ~w\n", + [Summary, MyName, UPI, RepairId, Res, Stats]), + timer:sleep(12345), + ets:delete(ETS), + exit({todo, Res}). + sanitize_repair_state(#ch_mgr{name=MyName, repair_start=Start, repair_final_status=Res, @@ -1957,22 +1990,6 @@ sanitize_repair_state(#ch_mgr{name=MyName, sanitize_repair_state(S) -> S. -do_repair( - #ch_mgr{name=MyName, - proj=#projection_v1{upi=UPI, - repairing=[Dst|_]=Repairing, - members_dict=MembersDict}}=_S_copy, - Opts, ap_mode=_RepairMode) -> - RepairId = proplists:get_value(repair_id, Opts, id1), - error_logger:info_msg("Chain tail ~p of ~p starting repair ~p of ~p\n", - [MyName, UPI, RepairId, Repairing]), - {ok, Info} = machi_chain_repair:repair_ap(MyName, Repairing, UPI, - MembersDict, Opts), - error_logger:info_msg("Chain tail ~p of ~p finished repair ~p: ~p\n", - [MyName, UPI, RepairId, Info]), - timer:sleep(12345), - exit({todo, Info}). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% perhaps_call_t(S, Partitions, FLU, DoIt) -> diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 917bba6..58cab0b 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -46,23 +46,14 @@ -define(VERB(Fmt), if Verb -> io:format(Fmt ); true -> ok end). -define(VERB(Fmt, Args), if Verb -> io:format(Fmt, Args); true -> ok end). --record(stat, { - i_files = 0, - i_chs = 0, - i_bytes = 0, - o_files = 0, - o_chs = 0, - o_bytes = 0} - ). - --export([repair_cp/4, repair_ap/5]). +-export([repair_cp/4, repair_ap/6]). repair_cp(Src, Dst, MembersDict, Opts) -> %% TODO: add missing function: wipe away any trace of chunks %% are present on Dst but missing on Src. exit(todo_cp_mode). -repair_ap(Src, Repairing, UPI, MembersDict, Opts) -> +repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) -> %% Use process dict so that 'after' clause can always quit all %% proxy pids. put(proxies_dict, orddict:new()), @@ -86,14 +77,15 @@ repair_ap(Src, Repairing, UPI, MembersDict, Opts) -> RepairMode = proplists:get_value(repair_mode, Opts, repair), [ok = repair_file(ap_mode, RepairMode, File, Size, MissingList, proplists:get_value(verbose, Opts, true), - Src, ProxiesDict) || + Src, ProxiesDict, ETS) || {File, {Size, MissingList}} <- MissingFileSummary], - {ok, [yo_no_error, todo_stats_here]} + ok catch What:Why -> + Stack = erlang:get_stacktrace(), io:format(user, "What Why ~p ~p @\n\t~p\n", - [What, Why, erlang:get_stacktrace()]), - {error, yo_error} + [What, Why, Stack]), + {error, {What, Why, Stack}} after [(catch machi_proxy_flu1_client:quit(Pid)) || Pid <- orddict:to_list(get(proxies_dict))] @@ -135,12 +127,12 @@ append_file_dict(Proxy, FLU_name, D) -> %% read while the current write is still in progress. repair_file(ap_mode, RepairMode, - File, Size, [], Verb, Src, ProxiesDict) -> + File, Size, [], Verb, Src, ProxiesDict, ETS) -> ?VERB("~p: ~s: present on both: ", [Src, File]), ?VERB("TODO!\n"), ok; %%TODO: repair_both_present(File, Size, RepairMode, V, SrcS, SrcS2, DstS, DstS2); repair_file(ap_mode, RepairMode, - File, Size, MissingList, Verb, Src, ProxiesDict) -> + File, Size, MissingList, Verb, Src, ProxiesDict, ETS) -> case lists:member(Src, MissingList) of true -> ?VERB("~p: ~s -> ~p, skipping: not on source server\n", @@ -155,11 +147,11 @@ repair_file(ap_mode, RepairMode, orddict:filter(fun(K, _V) -> lists:member(K, MissingList) end, ProxiesDict), SrcProxy = orddict:fetch(Src, ProxiesDict), - ok = copy_file(File, SrcProxy, MissingProxiesDict, Verb), + ok = copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS), ?VERB("done\n", []) end. -copy_file(File, SrcProxy, MissingProxiesDict, Verb) -> +copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) -> %% Use the first source socket to enumerate the chunks & checksums. %% Use the second source socket to copy each chunk. N = length(orddict:to_list(MissingProxiesDict)), @@ -167,34 +159,54 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb) -> ?SHORT_TIMEOUT), {ok, CheckSums} = machi_proxy_flu1_client:checksum_list( SrcProxy, EpochID, File, ?LONG_TIMEOUT), + EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks}, + {in_bytes, t_in_bytes}, {out_files, t_out_files}, + {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}], + [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys], CopyChunks = - fun({Offset, Size, CSum}, #stat{i_chs=In_Cs, i_bytes=In_Bs, - o_chs=Out_Cs, o_bytes=Out_Bs}=Acc) -> - if In_Cs rem 100 == 0 -> ?VERB(".", []); - true -> ok end, + fun({Offset, Size, CSum}, {ok, ETS, _, _} = Acc) -> + case ets:lookup_element(ETS, in_chunks, 2) rem 100 of + 0 -> ?VERB(".", []); + _ -> ok + end, + _T1 = os:timestamp(), {ok, Chunk} = machi_proxy_flu1_client:read_chunk( SrcProxy, EpochID, File, Offset, Size), + _T2 = os:timestamp(), case machi_util:checksum_chunk(Chunk) of CSum_now when CSum_now == CSum -> [begin + _T3 = os:timestamp(), ok = machi_proxy_flu1_client:write_chunk( - DstProxy, EpochID, File, Offset, Chunk) + DstProxy, EpochID, File, Offset, Chunk), + _T4 = os:timestamp() end || {_FLU, DstProxy} <- MissingProxiesDict], - Acc#stat{i_chs=In_Cs + 1, i_bytes=In_Bs + Size, - o_chs=Out_Cs + N, o_bytes=Out_Bs+(N*Size)}; + ets:update_counter(ETS, in_chunks, 1), + ets:update_counter(ETS, in_bytes, Size), + ets:update_counter(ETS, out_chunks, N), + ets:update_counter(ETS, out_bytes, N*Size), + Acc; CSum_now -> error_logger:error_msg( "TODO: Checksum failure: " "file ~p offset ~p size ~p: " "expected ~p got ~p\n", [File, Offset, Size, CSum, CSum_now]), - exit({todo_csum_error, - {File, Offset, Size, CSum, CSum_now}}) - end + ets:update_counter(ETS, t_bad_chunks, 1), + Acc + end; + (_, _=Acc) -> % failure: skip rest of file + Acc end, - Stats = lists:foldl(CopyChunks, #stat{i_files=1,o_files=N}, CheckSums), - #stat{i_chs=In_Cs, o_chs=Out_Cs} = Stats, - ?VERB("copied ~w chunks to ~w replicas, ", [In_Cs, Out_Cs]), + ets:update_counter(ETS, t_in_files, 1), + ets:update_counter(ETS, t_out_files, N), + _CopyRes = lists:foldl(CopyChunks, {ok, ETS, x, x}, CheckSums), + ?VERB("copied ~w chunks to ~w chunks, ", + [ets:lookup_element(ETS, K, 2) || K <- [in_chunks, out_chunks] ]), + + %% Copy this file's stats to the total counts. + [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) || + {L_K, T_K} <- EtsKeys], ok. %% copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) ->