WIP: move to stats via ETS, success/failure propagates, yay!

This commit is contained in:
Scott Lystig Fritchie 2015-05-12 23:37:20 +09:00
parent cad84442bb
commit 4ae0f94649
2 changed files with 77 additions and 48 deletions

View file

@ -1943,6 +1943,39 @@ perhaps_start_repair(
perhaps_start_repair(S) ->
S.
do_repair(
#ch_mgr{name=MyName,
proj=#projection_v1{upi=UPI,
repairing=[Dst|_]=Repairing,
members_dict=MembersDict}}=_S_copy,
Opts, ap_mode=_RepairMode) ->
T1 = os:timestamp(),
RepairId = proplists:get_value(repair_id, Opts, id1),
error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ID ~p\n",
[MyName, UPI, Repairing, RepairId]),
ETS = ets:new(repair_stats, [private, set]),
ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes,
t_out_files, t_out_chunks, t_out_bytes,
t_bad_chunks, t_elapsed_seconds],
[ets:insert(ETS, {K, 0}) || K <- ETS_T_Keys],
Res = machi_chain_repair:repair_ap(MyName, Repairing, UPI,
MembersDict, ETS, Opts),
T2 = os:timestamp(),
Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000,
ets:insert(ETS, {t_elapsed_seconds, Elapsed}),
Summary = case Res of ok -> "success";
_ -> "FAILURE"
end,
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
error_logger:info_msg("Repair ~s: tail ~p of ~p finished repair ID ~p: "
"~p\nStats ~w\n",
[Summary, MyName, UPI, RepairId, Res, Stats]),
timer:sleep(12345),
ets:delete(ETS),
exit({todo, Res}).
sanitize_repair_state(#ch_mgr{name=MyName,
repair_start=Start,
repair_final_status=Res,
@ -1957,22 +1990,6 @@ sanitize_repair_state(#ch_mgr{name=MyName,
sanitize_repair_state(S) ->
S.
do_repair(
#ch_mgr{name=MyName,
proj=#projection_v1{upi=UPI,
repairing=[Dst|_]=Repairing,
members_dict=MembersDict}}=_S_copy,
Opts, ap_mode=_RepairMode) ->
RepairId = proplists:get_value(repair_id, Opts, id1),
error_logger:info_msg("Chain tail ~p of ~p starting repair ~p of ~p\n",
[MyName, UPI, RepairId, Repairing]),
{ok, Info} = machi_chain_repair:repair_ap(MyName, Repairing, UPI,
MembersDict, Opts),
error_logger:info_msg("Chain tail ~p of ~p finished repair ~p: ~p\n",
[MyName, UPI, RepairId, Info]),
timer:sleep(12345),
exit({todo, Info}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
perhaps_call_t(S, Partitions, FLU, DoIt) ->

View file

@ -46,23 +46,14 @@
-define(VERB(Fmt), if Verb -> io:format(Fmt ); true -> ok end).
-define(VERB(Fmt, Args), if Verb -> io:format(Fmt, Args); true -> ok end).
-record(stat, {
i_files = 0,
i_chs = 0,
i_bytes = 0,
o_files = 0,
o_chs = 0,
o_bytes = 0}
).
-export([repair_cp/4, repair_ap/5]).
-export([repair_cp/4, repair_ap/6]).
repair_cp(Src, Dst, MembersDict, Opts) ->
%% TODO: add missing function: wipe away any trace of chunks
%% are present on Dst but missing on Src.
exit(todo_cp_mode).
repair_ap(Src, Repairing, UPI, MembersDict, Opts) ->
repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
%% Use process dict so that 'after' clause can always quit all
%% proxy pids.
put(proxies_dict, orddict:new()),
@ -86,14 +77,15 @@ repair_ap(Src, Repairing, UPI, MembersDict, Opts) ->
RepairMode = proplists:get_value(repair_mode, Opts, repair),
[ok = repair_file(ap_mode, RepairMode, File, Size, MissingList,
proplists:get_value(verbose, Opts, true),
Src, ProxiesDict) ||
Src, ProxiesDict, ETS) ||
{File, {Size, MissingList}} <- MissingFileSummary],
{ok, [yo_no_error, todo_stats_here]}
ok
catch
What:Why ->
Stack = erlang:get_stacktrace(),
io:format(user, "What Why ~p ~p @\n\t~p\n",
[What, Why, erlang:get_stacktrace()]),
{error, yo_error}
[What, Why, Stack]),
{error, {What, Why, Stack}}
after
[(catch machi_proxy_flu1_client:quit(Pid)) ||
Pid <- orddict:to_list(get(proxies_dict))]
@ -135,12 +127,12 @@ append_file_dict(Proxy, FLU_name, D) ->
%% read while the current write is still in progress.
repair_file(ap_mode, RepairMode,
File, Size, [], Verb, Src, ProxiesDict) ->
File, Size, [], Verb, Src, ProxiesDict, ETS) ->
?VERB("~p: ~s: present on both: ", [Src, File]),
?VERB("TODO!\n"), ok;
%%TODO: repair_both_present(File, Size, RepairMode, V, SrcS, SrcS2, DstS, DstS2);
repair_file(ap_mode, RepairMode,
File, Size, MissingList, Verb, Src, ProxiesDict) ->
File, Size, MissingList, Verb, Src, ProxiesDict, ETS) ->
case lists:member(Src, MissingList) of
true ->
?VERB("~p: ~s -> ~p, skipping: not on source server\n",
@ -155,11 +147,11 @@ repair_file(ap_mode, RepairMode,
orddict:filter(fun(K, _V) -> lists:member(K, MissingList) end,
ProxiesDict),
SrcProxy = orddict:fetch(Src, ProxiesDict),
ok = copy_file(File, SrcProxy, MissingProxiesDict, Verb),
ok = copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS),
?VERB("done\n", [])
end.
copy_file(File, SrcProxy, MissingProxiesDict, Verb) ->
copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
%% Use the first source socket to enumerate the chunks & checksums.
%% Use the second source socket to copy each chunk.
N = length(orddict:to_list(MissingProxiesDict)),
@ -167,34 +159,54 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb) ->
?SHORT_TIMEOUT),
{ok, CheckSums} = machi_proxy_flu1_client:checksum_list(
SrcProxy, EpochID, File, ?LONG_TIMEOUT),
EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks},
{in_bytes, t_in_bytes}, {out_files, t_out_files},
{out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
[ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
CopyChunks =
fun({Offset, Size, CSum}, #stat{i_chs=In_Cs, i_bytes=In_Bs,
o_chs=Out_Cs, o_bytes=Out_Bs}=Acc) ->
if In_Cs rem 100 == 0 -> ?VERB(".", []);
true -> ok end,
fun({Offset, Size, CSum}, {ok, ETS, _, _} = Acc) ->
case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
0 -> ?VERB(".", []);
_ -> ok
end,
_T1 = os:timestamp(),
{ok, Chunk} = machi_proxy_flu1_client:read_chunk(
SrcProxy, EpochID, File, Offset, Size),
_T2 = os:timestamp(),
case machi_util:checksum_chunk(Chunk) of
CSum_now when CSum_now == CSum ->
[begin
_T3 = os:timestamp(),
ok = machi_proxy_flu1_client:write_chunk(
DstProxy, EpochID, File, Offset, Chunk)
DstProxy, EpochID, File, Offset, Chunk),
_T4 = os:timestamp()
end || {_FLU, DstProxy} <- MissingProxiesDict],
Acc#stat{i_chs=In_Cs + 1, i_bytes=In_Bs + Size,
o_chs=Out_Cs + N, o_bytes=Out_Bs+(N*Size)};
ets:update_counter(ETS, in_chunks, 1),
ets:update_counter(ETS, in_bytes, Size),
ets:update_counter(ETS, out_chunks, N),
ets:update_counter(ETS, out_bytes, N*Size),
Acc;
CSum_now ->
error_logger:error_msg(
"TODO: Checksum failure: "
"file ~p offset ~p size ~p: "
"expected ~p got ~p\n",
[File, Offset, Size, CSum, CSum_now]),
exit({todo_csum_error,
{File, Offset, Size, CSum, CSum_now}})
end
ets:update_counter(ETS, t_bad_chunks, 1),
Acc
end;
(_, _=Acc) -> % failure: skip rest of file
Acc
end,
Stats = lists:foldl(CopyChunks, #stat{i_files=1,o_files=N}, CheckSums),
#stat{i_chs=In_Cs, o_chs=Out_Cs} = Stats,
?VERB("copied ~w chunks to ~w replicas, ", [In_Cs, Out_Cs]),
ets:update_counter(ETS, t_in_files, 1),
ets:update_counter(ETS, t_out_files, N),
_CopyRes = lists:foldl(CopyChunks, {ok, ETS, x, x}, CheckSums),
?VERB("copied ~w chunks to ~w chunks, ",
[ets:lookup_element(ETS, K, 2) || K <- [in_chunks, out_chunks] ]),
%% Copy this file's stats to the total counts.
[ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) ||
{L_K, T_K} <- EtsKeys],
ok.
%% copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) ->