From 04bc28b9daf18bc6790b28454110a49c8b384cee Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 16 May 2015 16:55:48 +0900 Subject: [PATCH] WIP: more generic all-way file chunk merge func, part 2 --- src/machi_chain_repair.erl | 354 ++++++++++--------------------------- 1 file changed, 94 insertions(+), 260 deletions(-) diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 546e5af..24e5005 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -73,22 +73,34 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> D = dict:new(), D2 = lists:foldl(fun({FLU, Proxy}, Dict) -> - append_file_dict(Proxy, FLU, Dict) + get_file_lists(Proxy, FLU, Dict) end, D, ProxiesDict), MissingFileSummary = make_missing_file_summary(D2, OurFLUs), - io:format(user, "MissingFileSummary ~p\n", [MissingFileSummary]), + ?VERB("MissingFileSummary ~p\n", [MissingFileSummary]), + [ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs], %% Repair files from perspective of Src, i.e. tail(UPI). RepairMode = proplists:get_value(repair_mode, Opts, repair), + Verb = proplists:get_value(verbose, Opts, true), + SrcProxy = orddict:fetch(Src, ProxiesDict), + {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id( + SrcProxy, ?SHORT_TIMEOUT), Ds = [{File, make_repair_directives( - ConsistencyMode, RepairMode, File, Size, - proplists:get_value(verbose, Opts, true), + ConsistencyMode, RepairMode, File, Size, EpochID, + Verb, Src, OurFLUs, ProxiesDict, ETS)} || {File, {Size, _MissingList}} <- MissingFileSummary], %% TODO: for CP mode, any file missing from the Src FLU %% must be deleted on all repairing FLUs - io:format(user, "Ds ~P\n", [Ds, 20]), + [begin + [{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}), + io:format(user, "Directive bytes for ~p: ~p\n", [FLU, Bytes]) + end || FLU <- OurFLUs], + + ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID, + Verb, OurFLUs, ProxiesDict, ETS), + todo_yo_not_quite_ok catch What:Why -> @@ -120,7 +132,7 @@ make_missing_file_summary(Dict, AllFLUs) -> ], MissingFileSummary. -append_file_dict(Proxy, FLU_name, D) -> +get_file_lists(Proxy, FLU_name, D) -> {ok, Res} = machi_proxy_flu1_client:list_files(Proxy, ?DUMMY_PV1_EPOCH, ?SHORT_TIMEOUT), lists:foldl(fun({Size, File}, Dict) -> @@ -148,88 +160,18 @@ append_file_dict(Proxy, FLU_name, D) -> %% destination FLU. %% As an additional optimization, add a bit of #2 to start the next %% read while the current write is still in progress. - -repair_file(cp_mode=_ConsistencyMode, RepairMode, - File, Size, MissingList, Verb, Src, _FLUs, ProxiesDict, ETS) -> - %% TODO bit rot, fix for CP mode, yo - case lists:member(Src, MissingList) of - true -> - ?VERB("~p: ~s -> ~p, skipping: not on source server\n", - [Src, File, MissingList]); - false when RepairMode == check -> - ?VERB("~p: ~s -> ~p, copy ~s MB (skipped)\n", - [Src, File, MissingList, mbytes(Size)]); - false -> - ?VERB("~p: ~s -> ~p, copy ~s MB, ", - [Src, File, MissingList, mbytes(Size)]), - MissingProxiesDict = - orddict:filter(fun(K, _V) -> lists:member(K, MissingList) end, - ProxiesDict), - SrcProxy = orddict:fetch(Src, ProxiesDict), - ok = copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS), - ?VERB("done\n", []) - end. - - -copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) -> - %% Use the first source socket to enumerate the chunks & checksums. - %% Use the second source socket to copy each chunk. - N = length(orddict:to_list(MissingProxiesDict)), - {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy, - ?SHORT_TIMEOUT), - {ok, CheckSums} = machi_proxy_flu1_client:checksum_list( - SrcProxy, EpochID, File, ?LONG_TIMEOUT), - EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks}, - {in_bytes, t_in_bytes}, {out_files, t_out_files}, - {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}], - [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys], - CopyChunks = - fun({Offset, Size, CSum}, {ok, ETab, _, _} = Acc) -> - case ets:lookup_element(ETab, in_chunks, 2) rem 100 of - 0 -> ?VERB(".", []); - _ -> ok - end, - _T1 = os:timestamp(), - {ok, Chunk} = machi_proxy_flu1_client:read_chunk( - SrcProxy, EpochID, File, Offset, Size, - ?SHORT_TIMEOUT), - _T2 = os:timestamp(), - case machi_util:checksum_chunk(Chunk) of - CSum_now when CSum_now == CSum -> - [begin - _T3 = os:timestamp(), - ok = machi_proxy_flu1_client:write_chunk( - DstProxy, EpochID, File, Offset, Chunk, - ?SHORT_TIMEOUT), - _T4 = os:timestamp() - end || {_FLU, DstProxy} <- MissingProxiesDict], - ets:update_counter(ETab, in_chunks, 1), - ets:update_counter(ETab, in_bytes, Size), - ets:update_counter(ETab, out_chunks, N), - ets:update_counter(ETab, out_bytes, N*Size), - Acc; - CSum_now -> - error_logger:error_msg( - "TODO: Checksum failure: " - "file ~p offset ~p size ~p: " - "expected ~p got ~p\n", - [File, Offset, Size, CSum, CSum_now]), - ets:update_counter(ETab, t_bad_chunks, 1), - Acc - end; - (_, _=Acc) -> % failure: skip rest of file - Acc - end, - ets:update_counter(ETS, t_in_files, 1), - ets:update_counter(ETS, t_out_files, N), - _CopyRes = lists:foldl(CopyChunks, {ok, ETS, x, x}, CheckSums), - ?VERB("copied ~w chunks to ~w chunks, ", - [ets:lookup_element(ETS, K, 2) || K <- [in_chunks, out_chunks] ]), - - %% Copy this file's stats to the total counts. - [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) || - {L_K, T_K} <- EtsKeys], - ok. +%% +%% Most/all of this could be executed in parallel on each FLU relative to +%% its own files. Then, in another TODO option, perhaps build a Merkle tree +%% or other summary of the local files & send that data structure to the +%% repair coordinator. +%% +%% Also, as another TODO note, repair_both_present() in the +%% prototype/demo-day code uses an optimization of calculating the MD5 +%% checksum of the chunk checksum data as it arrives, and if the two MD5s +%% match, then we consider the two files in sync. If there isn't a match, +%% then we sort the lines and try another MD5, and if they match, then we're +%% in sync. In theory, that's lower overhead than the procedure used here. make_repair_compare_fun(SrcFLU) -> fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a}, @@ -249,13 +191,10 @@ make_repair_compare_fun(SrcFLU) -> T_a =< T_b end. -make_repair_directives(ConsistencyMode, RepairMode, - File, Size, Verb, Src, FLUs0, ProxiesDict, ETS) -> +make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID, + Verb, Src, FLUs0, ProxiesDict, ETS) -> true = (Size < ?MAX_OFFSET), FLUs = lists:usort(FLUs0), - SrcProxy = orddict:fetch(Src, ProxiesDict), - {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy, - ?SHORT_TIMEOUT), C0 = [begin %% erlang:garbage_collect(), Proxy = orddict:fetch(FLU, ProxiesDict), @@ -272,10 +211,8 @@ make_repair_directives(ConsistencyMode, RepairMode, %% erlang:garbage_collect(), C2 = lists:sort(make_repair_compare_fun(Src), C1), %% erlang:garbage_collect(), - ?VERB("Directives: "), Ds = make_repair_directives2(C2, ConsistencyMode, RepairMode, File, Verb, Src, FLUs, ProxiesDict, ETS), - ?VERB(" done\n"), Ds. make_repair_directives2(C2, ConsistencyMode, RepairMode, @@ -331,13 +268,22 @@ make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0], true -> Src; false -> hd(Gots) end, - {copy, A, Missing}; + [ets:update_counter(ETS, {directive_bytes, FLU}, Size) || + FLU <- Missing], + if Missing == [] -> + noop; + true -> + {copy, A, Missing} + end; ConsistencyMode == cp_mode -> exit({todo_cp_mode, ?MODULE, ?LINE}) end, + Acc2 = if Do == noop -> Acc; + true -> [Do|Acc] + end, make_repair_directives3(Rest1, ConsistencyMode, RepairMode, - File, Verb, Src, FLUs, ProxiesDict, ETS, [Do|Acc]). + File, Verb, Src, FLUs, ProxiesDict, ETS, Acc2). take_same_offset_size(L, Offset, Size) -> take_same_offset_size(L, Offset, Size, []). @@ -347,170 +293,58 @@ take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) - take_same_offset_size(Rest, _Offset, _Size, Acc) -> {Acc, Rest}. -%% repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) -> -%% Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))), -%% Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))), -%% J_Both = lists:flatten(io_lib:format("/tmp/join.3-both.~w.~w.~w", tuple_to_list(now()))), -%% J_SrcOnly = lists:flatten(io_lib:format("/tmp/join.4-src-only.~w.~w.~w", tuple_to_list(now()))), -%% J_DstOnly = lists:flatten(io_lib:format("/tmp/join.5-dst-only.~w.~w.~w", tuple_to_list(now()))), -%% S_Identical = lists:flatten(io_lib:format("/tmp/join.6-sort-identical.~w.~w.~w", tuple_to_list(now()))), -%% {ok, FH1} = file:open(Tmp1, [write, raw, binary]), -%% {ok, FH2} = file:open(Tmp2, [write, raw, binary]), -%% try -%% K = md5_ctx, -%% MD5_it = fun(Bin) -> -%% {FH, MD5ctx1} = get(K), -%% file:write(FH, Bin), -%% MD5ctx2 = crypto:hash_update(MD5ctx1, Bin), -%% put(K, {FH, MD5ctx2}) -%% end, -%% put(K, {FH1, crypto:hash_init(md5)}), -%% ok = escript_checksum_list(SrcS, File, fast, MD5_it), -%% {_, MD5_1} = get(K), -%% SrcMD5 = crypto:hash_final(MD5_1), -%% put(K, {FH2, crypto:hash_init(md5)}), -%% ok = escript_checksum_list(DstS, File, fast, MD5_it), -%% {_, MD5_2} = get(K), -%% DstMD5 = crypto:hash_final(MD5_2), -%% if SrcMD5 == DstMD5 -> -%% verb("identical\n", []); -%% true -> -%% ok = file:close(FH1), -%% ok = file:close(FH2), -%% _Q1 = os:cmd("./REPAIR-SORT-JOIN.sh " ++ Tmp1 ++ " " ++ Tmp2 ++ " " ++ J_Both ++ " " ++ J_SrcOnly ++ " " ++ J_DstOnly ++ " " ++ S_Identical), -%% case file:read_file_info(S_Identical) of -%% {ok, _} -> -%% verb("identical (secondary sort)\n", []); -%% {error, enoent} -> -%% io:format("differences found:"), -%% repair_both(File, Size, V, RepairMode, -%% J_Both, J_SrcOnly, J_DstOnly, -%% SrcS, DstS) -%% end -%% end -%% after -%% catch file:close(FH1), -%% catch file:close(FH2), -%% [(catch file:delete(FF)) || FF <- [Tmp1,Tmp2,J_Both,J_SrcOnly,J_DstOnly, -%% S_Identical]] -%% end. +execute_repair_directives(ap_mode=_ConsistencyMode, Ds, Src, EpochID, Verb, + OurFLUs, ProxiesDict, ETS) -> + {_,_,_,_} = lists:foldl(fun execute_repair_directive/2, + {ProxiesDict, EpochID, Verb, ETS}, Ds), + ok. -%% repair_both(File, _Size, V, RepairMode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) -> -%% AccFun = if RepairMode == check -> -%% fun(_X, List) -> List end; -%% RepairMode == repair -> -%% fun( X, List) -> [X|List] end -%% end, -%% BothFun = fun(<<_OffsetSrcHex:16/binary, " ", -%% LenSrcHex:8/binary, " ", CSumSrcHex:32/binary, " ", -%% LenDstHex:8/binary, " ", CSumDstHex:32/binary, "\n">> =Line, -%% {SameB, SameC, DiffB, DiffC, Ds}) -> -%% <> = hexstr_to_bin(LenSrcHex), -%% if LenSrcHex == LenDstHex, -%% CSumSrcHex == CSumDstHex -> -%% {SameB + Len, SameC + 1, DiffB, DiffC, Ds}; -%% true -> -%% %% D = {OffsetSrcHex, LenSrcHex, ........ -%% {SameB, SameC, DiffB + Len, DiffC + 1, -%% AccFun(Line, Ds)} -%% end; -%% (_Else, Acc) -> -%% Acc -%% end, -%% OnlyFun = fun(<<_OffsetSrcHex:16/binary, " ", LenSrcHex:8/binary, " ", -%% _CSumHex:32/binary, "\n">> = Line, -%% {DiffB, DiffC, Ds}) -> -%% <> = hexstr_to_bin(LenSrcHex), -%% {DiffB + Len, DiffC + 1, AccFun(Line, Ds)}; -%% (_Else, Acc) -> -%% Acc -%% end, -%% {SameBx, SameCx, DiffBy, DiffCy, BothDiffs} = -%% file_folder(BothFun, {0,0,0,0,[]}, J_Both), -%% {DiffB_src, DiffC_src, Ds_src} = file_folder(OnlyFun, {0,0,[]}, J_SrcOnly), -%% {DiffB_dst, DiffC_dst, Ds_dst} = file_folder(OnlyFun, {0,0,[]}, J_DstOnly), -%% if RepairMode == check orelse V == true -> -%% io:format("\n\t"), -%% io:format("BothR ~p, ", [{SameBx, SameCx, DiffBy, DiffCy}]), -%% io:format("SrcR ~p, ", [{DiffB_src, DiffC_src}]), -%% io:format("DstR ~p", [{DiffB_dst, DiffC_dst}]), -%% io:format("\n"); -%% true -> ok -%% end, -%% if RepairMode == repair -> -%% ok = repair_both_both(File, V, BothDiffs, SrcS, DstS), -%% ok = repair_copy_chunks(File, V, Ds_src, DiffB_src, DiffC_src, -%% SrcS, DstS), -%% ok = repair_copy_chunks(File, V, Ds_dst, DiffB_dst, DiffC_dst, -%% DstS, SrcS); -%% true -> -%% ok -%% end. - -%% repair_both_both(_File, _V, [_|_], _SrcS, _DstS) -> -%% %% TODO: fetch both, check checksums, hopefully only exactly one -%% %% is correct, then use that one to repair the other. And if the -%% %% sizes are different, hrm, there may be an extra corner case(s) -%% %% hiding there. -%% io:format("WHOA! We have differing checksums or sizes here, TODO not implemented, but there's trouble in the little village on the river....\n"), -%% timer:sleep(3*1000), -%% ok; -%% repair_both_both(_File, _V, [], _SrcS, _DstS) -> -%% ok. - -%% repair_copy_chunks(_File, _V, [], _DiffBytes, _DiffCount, _SrcS, _DstS) -> -%% ok; -%% repair_copy_chunks(File, V, ToBeCopied, DiffBytes, DiffCount, SrcS, DstS) -> -%% verb("\n", []), -%% verb("Starting copy of ~p chunks/~s MBytes to \n ~s: ", -%% [DiffCount, mbytes(DiffBytes), File]), -%% InnerCopyFun = copy_file_proc_checksum_fun(File, SrcS, DstS, V), -%% FoldFun = fun(Line, ok) -> -%% ok = InnerCopyFun(Line) % Strong sanity check -%% end, -%% ok = lists:foldl(FoldFun, ok, ToBeCopied), -%% verb(" done\n", []), -%% ok. - -%% copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) -> -%% fun(<>) -> -%% <> = hexstr_to_bin(LenHex), -%% DownloadChunkBin = <>, -%% [Chunk] = escript_download_chunks(SrcS, {{{DownloadChunkBin}}}, -%% fun(_) -> ok end), -%% CSum = hexstr_to_bin(CSumHex), -%% CSum2 = checksum(Chunk), -%% if Len == byte_size(Chunk), CSum == CSum2 -> -%% {_,_,_} = upload_chunk_write(DstS, OffsetHex, File, Chunk), -%% ok; -%% true -> -%% io:format("ERROR: ~s ~s ~s csum/size error\n", -%% [File, OffsetHex, LenHex]), -%% error -%% end; -%% (_Else) -> -%% ok -%% end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -file_folder(Fun, Acc, Path) -> - {ok, FH} = file:open(Path, [read, raw, binary]), - try - file_folder2(Fun, Acc, FH) - after - file:close(FH) - end. - -file_folder2(Fun, Acc, FH) -> - file_folder2(file:read_line(FH), Fun, Acc, FH). - -file_folder2({ok, Line}, Fun, Acc, FH) -> - Acc2 = Fun(Line, Acc), - file_folder2(Fun, Acc2, FH); -file_folder2(eof, _Fun, Acc, _FH) -> +execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> + EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks}, + {in_bytes, t_in_bytes}, {out_files, t_out_files}, + {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}], + [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys], + F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc) -> + SrcP = orddict:fetch(MySrc, ProxiesDict), + case ets:lookup_element(ETS, in_chunks, 2) rem 100 of + 0 -> ?VERB(".", []); + _ -> ok + end, + _T1 = os:timestamp(), + {ok, Chunk} = machi_proxy_flu1_client:read_chunk( + SrcP, EpochID, File, Offset, Size, + ?SHORT_TIMEOUT), + _T2 = os:timestamp(), + case machi_util:checksum_chunk(Chunk) of + CSum_now when CSum_now == CSum -> + [begin + DstP = orddict:fetch(DstFLU, ProxiesDict), + _T3 = os:timestamp(), + ok = machi_proxy_flu1_client:write_chunk( + DstP, EpochID, File, Offset, Chunk, + ?SHORT_TIMEOUT), + _T4 = os:timestamp() + end || DstFLU <- MyDsts], + ets:update_counter(ETS, in_chunks, 1), + ets:update_counter(ETS, in_bytes, Size), + N = length(MyDsts), + ets:update_counter(ETS, out_chunks, N), + ets:update_counter(ETS, out_bytes, N*Size), + Acc; + CSum_now -> + error_logger:error_msg( + "TODO: Checksum failure: " + "file ~p offset ~p size ~p: " + "expected ~p got ~p\n", + [File, Offset, Size, CSum, CSum_now]), + ets:update_counter(ETS, t_bad_chunks, 1), + Acc + end + end, + ok = lists:foldl(F, ok, Cmds), + %% Copy this file's stats to the total counts. + [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) || + {L_K, T_K} <- EtsKeys], Acc. verb(Fmt) ->