diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index ca091c9..ffb398e 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -1926,8 +1926,8 @@ perhaps_start_repair( upi=[_|_]=UPI, repairing=[_|_]}}=S) -> RepairId = {MyName, os:timestamp()}, - RepairOpts = [{mode, repair}, verbose, {repair_id, RepairId}], - %% RepairOpts = [{mode, check}, verbose], + RepairOpts = [{repair_mode, repair}, verbose, {repair_id, RepairId}], + %% RepairOpts = [{repair_mode, check}, verbose], RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end, LastUPI = lists:last(UPI), case timer:now_diff(os:timestamp(), Start) div 1000000 of diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 51f1ea8..917bba6 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -43,9 +43,18 @@ -define(LONG_TIMEOUT, 60*1000). %% These macros assume there's a bound variable called Verb. --define(VERB(Fmt), if Verb -> io:format(Fmt, []); true -> ok end). +-define(VERB(Fmt), if Verb -> io:format(Fmt ); true -> ok end). -define(VERB(Fmt, Args), if Verb -> io:format(Fmt, Args); true -> ok end). +-record(stat, { + i_files = 0, + i_chs = 0, + i_bytes = 0, + o_files = 0, + o_chs = 0, + o_bytes = 0} + ). + -export([repair_cp/4, repair_ap/5]). repair_cp(Src, Dst, MembersDict, Opts) -> @@ -74,9 +83,10 @@ repair_ap(Src, Repairing, UPI, MembersDict, Opts) -> io:format(user, "MissingFileSummary ~p\n", [MissingFileSummary]), %% Repair files from perspective of Src, i.e. tail(UPI). - [ok = repair(ap_mode, File, Size, MissingList, - proplists:get_value(verbose, Opts, true), - Src, ProxiesDict) || + RepairMode = proplists:get_value(repair_mode, Opts, repair), + [ok = repair_file(ap_mode, RepairMode, File, Size, MissingList, + proplists:get_value(verbose, Opts, true), + Src, ProxiesDict) || {File, {Size, MissingList}} <- MissingFileSummary], {ok, [yo_no_error, todo_stats_here]} catch @@ -110,16 +120,32 @@ append_file_dict(Proxy, FLU_name, D) -> dict:append(File, {FLU_name, Size}, Dict) end, D, Res). -repair(Mode, File, Size, [], Verb, Src, ProxiesDict) -> +%% TODO: There's no reason why repair can't be done 1).in parallel +%% across multiple repairees, and/or 2). with multiple byte ranges in +%% the same file, and/or 3). with bigger chunks. +%% +%% 1. Optimization +%% 2. Optimization +%% 3. Optimization, but it would be the easiest to implement, e.g. use +%% constant-sized 4MB chunks. Unfortuntely, it would also destroy +%% the ability to verify here that the chunk checksums are correct +%% *and* also propagate the correct checksum metadata to the +%% destination FLU. +%% As an additional optimization, add a bit of #2 to start the next +%% read while the current write is still in progress. + +repair_file(ap_mode, RepairMode, + File, Size, [], Verb, Src, ProxiesDict) -> ?VERB("~p: ~s: present on both: ", [Src, File]), ?VERB("TODO!\n"), ok; - %%TODO: repair_both_present(File, Size, Mode, V, SrcS, SrcS2, DstS, DstS2); -repair(Mode, File, Size, MissingList, Verb, Src, ProxiesDict) -> + %%TODO: repair_both_present(File, Size, RepairMode, V, SrcS, SrcS2, DstS, DstS2); +repair_file(ap_mode, RepairMode, + File, Size, MissingList, Verb, Src, ProxiesDict) -> case lists:member(Src, MissingList) of true -> ?VERB("~p: ~s -> ~p, skipping: not on source server\n", [Src, File, MissingList]); - false when Mode == check -> + false when RepairMode == check -> ?VERB("~p: ~s -> ~p, copy ~s MB (skipped)\n", [Src, File, MissingList, mbytes(Size)]); false -> @@ -136,12 +162,14 @@ repair(Mode, File, Size, MissingList, Verb, Src, ProxiesDict) -> copy_file(File, SrcProxy, MissingProxiesDict, Verb) -> %% Use the first source socket to enumerate the chunks & checksums. %% Use the second source socket to copy each chunk. + N = length(orddict:to_list(MissingProxiesDict)), {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy, ?SHORT_TIMEOUT), {ok, CheckSums} = machi_proxy_flu1_client:checksum_list( SrcProxy, EpochID, File, ?LONG_TIMEOUT), CopyChunks = - fun({Offset, Size, CSum}, {In_Cs, In_Bs, Out_Cs, Out_Bs}) -> + fun({Offset, Size, CSum}, #stat{i_chs=In_Cs, i_bytes=In_Bs, + o_chs=Out_Cs, o_bytes=Out_Bs}=Acc) -> if In_Cs rem 100 == 0 -> ?VERB(".", []); true -> ok end, {ok, Chunk} = machi_proxy_flu1_client:read_chunk( @@ -152,8 +180,8 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb) -> ok = machi_proxy_flu1_client:write_chunk( DstProxy, EpochID, File, Offset, Chunk) end || {_FLU, DstProxy} <- MissingProxiesDict], - N = length(MissingProxiesDict), - {In_Cs + 1, In_Bs + Size, Out_Cs + N, Out_Bs+(N*Size)}; + Acc#stat{i_chs=In_Cs + 1, i_bytes=In_Bs + Size, + o_chs=Out_Cs + N, o_bytes=Out_Bs+(N*Size)}; CSum_now -> error_logger:error_msg( "TODO: Checksum failure: " @@ -164,8 +192,8 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb) -> {File, Offset, Size, CSum, CSum_now}}) end end, - {In_Cs, _In_Bs, Out_Cs, _Out_Bs} = lists:foldl(CopyChunks, - {0,0,0,0}, CheckSums), + Stats = lists:foldl(CopyChunks, #stat{i_files=1,o_files=N}, CheckSums), + #stat{i_chs=In_Cs, o_chs=Out_Cs} = Stats, ?VERB("copied ~w chunks to ~w replicas, ", [In_Cs, Out_Cs]), ok. @@ -191,12 +219,12 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb) -> %% ok %% end. -repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) -> +repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) -> verb("repair_both_present TODO\n"), ok. - %% io:format("repair_both_present: ~p ~p mode ~p\n", [File, Size, Mode]). + %% io:format("repair_both_present: ~p ~p mode ~p\n", [File, Size, RepairMode]). -%% repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) -> +%% repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) -> %% Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))), %% Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))), %% J_Both = lists:flatten(io_lib:format("/tmp/join.3-both.~w.~w.~w", tuple_to_list(now()))), @@ -232,7 +260,7 @@ repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) -> %% verb("identical (secondary sort)\n", []); %% {error, enoent} -> %% io:format("differences found:"), -%% repair_both(File, Size, V, Mode, +%% repair_both(File, Size, V, RepairMode, %% J_Both, J_SrcOnly, J_DstOnly, %% SrcS, DstS) %% end @@ -244,10 +272,10 @@ repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) -> %% S_Identical]] %% end. -%% repair_both(File, _Size, V, Mode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) -> -%% AccFun = if Mode == check -> +%% repair_both(File, _Size, V, RepairMode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) -> +%% AccFun = if RepairMode == check -> %% fun(_X, List) -> List end; -%% Mode == repair -> +%% RepairMode == repair -> %% fun( X, List) -> [X|List] end %% end, %% BothFun = fun(<<_OffsetSrcHex:16/binary, " ", @@ -278,7 +306,7 @@ repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) -> %% file_folder(BothFun, {0,0,0,0,[]}, J_Both), %% {DiffB_src, DiffC_src, Ds_src} = file_folder(OnlyFun, {0,0,[]}, J_SrcOnly), %% {DiffB_dst, DiffC_dst, Ds_dst} = file_folder(OnlyFun, {0,0,[]}, J_DstOnly), -%% if Mode == check orelse V == true -> +%% if RepairMode == check orelse V == true -> %% io:format("\n\t"), %% io:format("BothR ~p, ", [{SameBx, SameCx, DiffBy, DiffCy}]), %% io:format("SrcR ~p, ", [{DiffB_src, DiffC_src}]), @@ -286,7 +314,7 @@ repair_both_present(File, Size, Mode, V, SrcS, _SrcS2, DstS, _DstS2) -> %% io:format("\n"); %% true -> ok %% end, -%% if Mode == repair -> +%% if RepairMode == repair -> %% ok = repair_both_both(File, V, BothDiffs, SrcS, DstS), %% ok = repair_copy_chunks(File, V, Ds_src, DiffB_src, DiffC_src, %% SrcS, DstS),