WIP: more generic all-way file chunk merge func, part 2

This commit is contained in:
Scott Lystig Fritchie 2015-05-16 16:55:48 +09:00
parent a9c753ad64
commit 04bc28b9da

View file

@ -73,22 +73,34 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
D = dict:new(), D = dict:new(),
D2 = lists:foldl(fun({FLU, Proxy}, Dict) -> D2 = lists:foldl(fun({FLU, Proxy}, Dict) ->
append_file_dict(Proxy, FLU, Dict) get_file_lists(Proxy, FLU, Dict)
end, D, ProxiesDict), end, D, ProxiesDict),
MissingFileSummary = make_missing_file_summary(D2, OurFLUs), MissingFileSummary = make_missing_file_summary(D2, OurFLUs),
io:format(user, "MissingFileSummary ~p\n", [MissingFileSummary]), ?VERB("MissingFileSummary ~p\n", [MissingFileSummary]),
[ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs],
%% Repair files from perspective of Src, i.e. tail(UPI). %% Repair files from perspective of Src, i.e. tail(UPI).
RepairMode = proplists:get_value(repair_mode, Opts, repair), RepairMode = proplists:get_value(repair_mode, Opts, repair),
Verb = proplists:get_value(verbose, Opts, true),
SrcProxy = orddict:fetch(Src, ProxiesDict),
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(
SrcProxy, ?SHORT_TIMEOUT),
Ds = Ds =
[{File, make_repair_directives( [{File, make_repair_directives(
ConsistencyMode, RepairMode, File, Size, ConsistencyMode, RepairMode, File, Size, EpochID,
proplists:get_value(verbose, Opts, true), Verb,
Src, OurFLUs, ProxiesDict, ETS)} || Src, OurFLUs, ProxiesDict, ETS)} ||
{File, {Size, _MissingList}} <- MissingFileSummary], {File, {Size, _MissingList}} <- MissingFileSummary],
%% TODO: for CP mode, any file missing from the Src FLU %% TODO: for CP mode, any file missing from the Src FLU
%% must be deleted on all repairing FLUs %% must be deleted on all repairing FLUs
io:format(user, "Ds ~P\n", [Ds, 20]), [begin
[{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}),
io:format(user, "Directive bytes for ~p: ~p\n", [FLU, Bytes])
end || FLU <- OurFLUs],
ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID,
Verb, OurFLUs, ProxiesDict, ETS),
todo_yo_not_quite_ok todo_yo_not_quite_ok
catch catch
What:Why -> What:Why ->
@ -120,7 +132,7 @@ make_missing_file_summary(Dict, AllFLUs) ->
], ],
MissingFileSummary. MissingFileSummary.
append_file_dict(Proxy, FLU_name, D) -> get_file_lists(Proxy, FLU_name, D) ->
{ok, Res} = machi_proxy_flu1_client:list_files(Proxy, ?DUMMY_PV1_EPOCH, {ok, Res} = machi_proxy_flu1_client:list_files(Proxy, ?DUMMY_PV1_EPOCH,
?SHORT_TIMEOUT), ?SHORT_TIMEOUT),
lists:foldl(fun({Size, File}, Dict) -> lists:foldl(fun({Size, File}, Dict) ->
@ -148,88 +160,18 @@ append_file_dict(Proxy, FLU_name, D) ->
%% destination FLU. %% destination FLU.
%% As an additional optimization, add a bit of #2 to start the next %% As an additional optimization, add a bit of #2 to start the next
%% read while the current write is still in progress. %% read while the current write is still in progress.
%%
repair_file(cp_mode=_ConsistencyMode, RepairMode, %% Most/all of this could be executed in parallel on each FLU relative to
File, Size, MissingList, Verb, Src, _FLUs, ProxiesDict, ETS) -> %% its own files. Then, in another TODO option, perhaps build a Merkle tree
%% TODO bit rot, fix for CP mode, yo %% or other summary of the local files & send that data structure to the
case lists:member(Src, MissingList) of %% repair coordinator.
true -> %%
?VERB("~p: ~s -> ~p, skipping: not on source server\n", %% Also, as another TODO note, repair_both_present() in the
[Src, File, MissingList]); %% prototype/demo-day code uses an optimization of calculating the MD5
false when RepairMode == check -> %% checksum of the chunk checksum data as it arrives, and if the two MD5s
?VERB("~p: ~s -> ~p, copy ~s MB (skipped)\n", %% match, then we consider the two files in sync. If there isn't a match,
[Src, File, MissingList, mbytes(Size)]); %% then we sort the lines and try another MD5, and if they match, then we're
false -> %% in sync. In theory, that's lower overhead than the procedure used here.
?VERB("~p: ~s -> ~p, copy ~s MB, ",
[Src, File, MissingList, mbytes(Size)]),
MissingProxiesDict =
orddict:filter(fun(K, _V) -> lists:member(K, MissingList) end,
ProxiesDict),
SrcProxy = orddict:fetch(Src, ProxiesDict),
ok = copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS),
?VERB("done\n", [])
end.
copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
%% Use the first source socket to enumerate the chunks & checksums.
%% Use the second source socket to copy each chunk.
N = length(orddict:to_list(MissingProxiesDict)),
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy,
?SHORT_TIMEOUT),
{ok, CheckSums} = machi_proxy_flu1_client:checksum_list(
SrcProxy, EpochID, File, ?LONG_TIMEOUT),
EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks},
{in_bytes, t_in_bytes}, {out_files, t_out_files},
{out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
[ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
CopyChunks =
fun({Offset, Size, CSum}, {ok, ETab, _, _} = Acc) ->
case ets:lookup_element(ETab, in_chunks, 2) rem 100 of
0 -> ?VERB(".", []);
_ -> ok
end,
_T1 = os:timestamp(),
{ok, Chunk} = machi_proxy_flu1_client:read_chunk(
SrcProxy, EpochID, File, Offset, Size,
?SHORT_TIMEOUT),
_T2 = os:timestamp(),
case machi_util:checksum_chunk(Chunk) of
CSum_now when CSum_now == CSum ->
[begin
_T3 = os:timestamp(),
ok = machi_proxy_flu1_client:write_chunk(
DstProxy, EpochID, File, Offset, Chunk,
?SHORT_TIMEOUT),
_T4 = os:timestamp()
end || {_FLU, DstProxy} <- MissingProxiesDict],
ets:update_counter(ETab, in_chunks, 1),
ets:update_counter(ETab, in_bytes, Size),
ets:update_counter(ETab, out_chunks, N),
ets:update_counter(ETab, out_bytes, N*Size),
Acc;
CSum_now ->
error_logger:error_msg(
"TODO: Checksum failure: "
"file ~p offset ~p size ~p: "
"expected ~p got ~p\n",
[File, Offset, Size, CSum, CSum_now]),
ets:update_counter(ETab, t_bad_chunks, 1),
Acc
end;
(_, _=Acc) -> % failure: skip rest of file
Acc
end,
ets:update_counter(ETS, t_in_files, 1),
ets:update_counter(ETS, t_out_files, N),
_CopyRes = lists:foldl(CopyChunks, {ok, ETS, x, x}, CheckSums),
?VERB("copied ~w chunks to ~w chunks, ",
[ets:lookup_element(ETS, K, 2) || K <- [in_chunks, out_chunks] ]),
%% Copy this file's stats to the total counts.
[ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) ||
{L_K, T_K} <- EtsKeys],
ok.
make_repair_compare_fun(SrcFLU) -> make_repair_compare_fun(SrcFLU) ->
fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a}, fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a},
@ -249,13 +191,10 @@ make_repair_compare_fun(SrcFLU) ->
T_a =< T_b T_a =< T_b
end. end.
make_repair_directives(ConsistencyMode, RepairMode, make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID,
File, Size, Verb, Src, FLUs0, ProxiesDict, ETS) -> Verb, Src, FLUs0, ProxiesDict, ETS) ->
true = (Size < ?MAX_OFFSET), true = (Size < ?MAX_OFFSET),
FLUs = lists:usort(FLUs0), FLUs = lists:usort(FLUs0),
SrcProxy = orddict:fetch(Src, ProxiesDict),
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy,
?SHORT_TIMEOUT),
C0 = [begin C0 = [begin
%% erlang:garbage_collect(), %% erlang:garbage_collect(),
Proxy = orddict:fetch(FLU, ProxiesDict), Proxy = orddict:fetch(FLU, ProxiesDict),
@ -272,10 +211,8 @@ make_repair_directives(ConsistencyMode, RepairMode,
%% erlang:garbage_collect(), %% erlang:garbage_collect(),
C2 = lists:sort(make_repair_compare_fun(Src), C1), C2 = lists:sort(make_repair_compare_fun(Src), C1),
%% erlang:garbage_collect(), %% erlang:garbage_collect(),
?VERB("Directives: "),
Ds = make_repair_directives2(C2, ConsistencyMode, RepairMode, Ds = make_repair_directives2(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS), File, Verb, Src, FLUs, ProxiesDict, ETS),
?VERB(" done\n"),
Ds. Ds.
make_repair_directives2(C2, ConsistencyMode, RepairMode, make_repair_directives2(C2, ConsistencyMode, RepairMode,
@ -331,13 +268,22 @@ make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0],
true -> Src; true -> Src;
false -> hd(Gots) false -> hd(Gots)
end, end,
{copy, A, Missing}; [ets:update_counter(ETS, {directive_bytes, FLU}, Size) ||
FLU <- Missing],
if Missing == [] ->
noop;
true ->
{copy, A, Missing}
end;
ConsistencyMode == cp_mode -> ConsistencyMode == cp_mode ->
exit({todo_cp_mode, ?MODULE, ?LINE}) exit({todo_cp_mode, ?MODULE, ?LINE})
end, end,
Acc2 = if Do == noop -> Acc;
true -> [Do|Acc]
end,
make_repair_directives3(Rest1, make_repair_directives3(Rest1,
ConsistencyMode, RepairMode, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, [Do|Acc]). File, Verb, Src, FLUs, ProxiesDict, ETS, Acc2).
take_same_offset_size(L, Offset, Size) -> take_same_offset_size(L, Offset, Size) ->
take_same_offset_size(L, Offset, Size, []). take_same_offset_size(L, Offset, Size, []).
@ -347,170 +293,58 @@ take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) -
take_same_offset_size(Rest, _Offset, _Size, Acc) -> take_same_offset_size(Rest, _Offset, _Size, Acc) ->
{Acc, Rest}. {Acc, Rest}.
%% repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) -> execute_repair_directives(ap_mode=_ConsistencyMode, Ds, Src, EpochID, Verb,
%% Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))), OurFLUs, ProxiesDict, ETS) ->
%% Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))), {_,_,_,_} = lists:foldl(fun execute_repair_directive/2,
%% J_Both = lists:flatten(io_lib:format("/tmp/join.3-both.~w.~w.~w", tuple_to_list(now()))), {ProxiesDict, EpochID, Verb, ETS}, Ds),
%% J_SrcOnly = lists:flatten(io_lib:format("/tmp/join.4-src-only.~w.~w.~w", tuple_to_list(now()))), ok.
%% J_DstOnly = lists:flatten(io_lib:format("/tmp/join.5-dst-only.~w.~w.~w", tuple_to_list(now()))),
%% S_Identical = lists:flatten(io_lib:format("/tmp/join.6-sort-identical.~w.~w.~w", tuple_to_list(now()))),
%% {ok, FH1} = file:open(Tmp1, [write, raw, binary]),
%% {ok, FH2} = file:open(Tmp2, [write, raw, binary]),
%% try
%% K = md5_ctx,
%% MD5_it = fun(Bin) ->
%% {FH, MD5ctx1} = get(K),
%% file:write(FH, Bin),
%% MD5ctx2 = crypto:hash_update(MD5ctx1, Bin),
%% put(K, {FH, MD5ctx2})
%% end,
%% put(K, {FH1, crypto:hash_init(md5)}),
%% ok = escript_checksum_list(SrcS, File, fast, MD5_it),
%% {_, MD5_1} = get(K),
%% SrcMD5 = crypto:hash_final(MD5_1),
%% put(K, {FH2, crypto:hash_init(md5)}),
%% ok = escript_checksum_list(DstS, File, fast, MD5_it),
%% {_, MD5_2} = get(K),
%% DstMD5 = crypto:hash_final(MD5_2),
%% if SrcMD5 == DstMD5 ->
%% verb("identical\n", []);
%% true ->
%% ok = file:close(FH1),
%% ok = file:close(FH2),
%% _Q1 = os:cmd("./REPAIR-SORT-JOIN.sh " ++ Tmp1 ++ " " ++ Tmp2 ++ " " ++ J_Both ++ " " ++ J_SrcOnly ++ " " ++ J_DstOnly ++ " " ++ S_Identical),
%% case file:read_file_info(S_Identical) of
%% {ok, _} ->
%% verb("identical (secondary sort)\n", []);
%% {error, enoent} ->
%% io:format("differences found:"),
%% repair_both(File, Size, V, RepairMode,
%% J_Both, J_SrcOnly, J_DstOnly,
%% SrcS, DstS)
%% end
%% end
%% after
%% catch file:close(FH1),
%% catch file:close(FH2),
%% [(catch file:delete(FF)) || FF <- [Tmp1,Tmp2,J_Both,J_SrcOnly,J_DstOnly,
%% S_Identical]]
%% end.
%% repair_both(File, _Size, V, RepairMode, J_Both, J_SrcOnly, J_DstOnly, SrcS, DstS) -> execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
%% AccFun = if RepairMode == check -> EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks},
%% fun(_X, List) -> List end; {in_bytes, t_in_bytes}, {out_files, t_out_files},
%% RepairMode == repair -> {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
%% fun( X, List) -> [X|List] end [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
%% end, F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc) ->
%% BothFun = fun(<<_OffsetSrcHex:16/binary, " ", SrcP = orddict:fetch(MySrc, ProxiesDict),
%% LenSrcHex:8/binary, " ", CSumSrcHex:32/binary, " ", case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
%% LenDstHex:8/binary, " ", CSumDstHex:32/binary, "\n">> =Line, 0 -> ?VERB(".", []);
%% {SameB, SameC, DiffB, DiffC, Ds}) -> _ -> ok
%% <<Len:32/big>> = hexstr_to_bin(LenSrcHex), end,
%% if LenSrcHex == LenDstHex, _T1 = os:timestamp(),
%% CSumSrcHex == CSumDstHex -> {ok, Chunk} = machi_proxy_flu1_client:read_chunk(
%% {SameB + Len, SameC + 1, DiffB, DiffC, Ds}; SrcP, EpochID, File, Offset, Size,
%% true -> ?SHORT_TIMEOUT),
%% %% D = {OffsetSrcHex, LenSrcHex, ........ _T2 = os:timestamp(),
%% {SameB, SameC, DiffB + Len, DiffC + 1, case machi_util:checksum_chunk(Chunk) of
%% AccFun(Line, Ds)} CSum_now when CSum_now == CSum ->
%% end; [begin
%% (_Else, Acc) -> DstP = orddict:fetch(DstFLU, ProxiesDict),
%% Acc _T3 = os:timestamp(),
%% end, ok = machi_proxy_flu1_client:write_chunk(
%% OnlyFun = fun(<<_OffsetSrcHex:16/binary, " ", LenSrcHex:8/binary, " ", DstP, EpochID, File, Offset, Chunk,
%% _CSumHex:32/binary, "\n">> = Line, ?SHORT_TIMEOUT),
%% {DiffB, DiffC, Ds}) -> _T4 = os:timestamp()
%% <<Len:32/big>> = hexstr_to_bin(LenSrcHex), end || DstFLU <- MyDsts],
%% {DiffB + Len, DiffC + 1, AccFun(Line, Ds)}; ets:update_counter(ETS, in_chunks, 1),
%% (_Else, Acc) -> ets:update_counter(ETS, in_bytes, Size),
%% Acc N = length(MyDsts),
%% end, ets:update_counter(ETS, out_chunks, N),
%% {SameBx, SameCx, DiffBy, DiffCy, BothDiffs} = ets:update_counter(ETS, out_bytes, N*Size),
%% file_folder(BothFun, {0,0,0,0,[]}, J_Both), Acc;
%% {DiffB_src, DiffC_src, Ds_src} = file_folder(OnlyFun, {0,0,[]}, J_SrcOnly), CSum_now ->
%% {DiffB_dst, DiffC_dst, Ds_dst} = file_folder(OnlyFun, {0,0,[]}, J_DstOnly), error_logger:error_msg(
%% if RepairMode == check orelse V == true -> "TODO: Checksum failure: "
%% io:format("\n\t"), "file ~p offset ~p size ~p: "
%% io:format("BothR ~p, ", [{SameBx, SameCx, DiffBy, DiffCy}]), "expected ~p got ~p\n",
%% io:format("SrcR ~p, ", [{DiffB_src, DiffC_src}]), [File, Offset, Size, CSum, CSum_now]),
%% io:format("DstR ~p", [{DiffB_dst, DiffC_dst}]), ets:update_counter(ETS, t_bad_chunks, 1),
%% io:format("\n"); Acc
%% true -> ok end
%% end, end,
%% if RepairMode == repair -> ok = lists:foldl(F, ok, Cmds),
%% ok = repair_both_both(File, V, BothDiffs, SrcS, DstS), %% Copy this file's stats to the total counts.
%% ok = repair_copy_chunks(File, V, Ds_src, DiffB_src, DiffC_src, [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) ||
%% SrcS, DstS), {L_K, T_K} <- EtsKeys],
%% ok = repair_copy_chunks(File, V, Ds_dst, DiffB_dst, DiffC_dst,
%% DstS, SrcS);
%% true ->
%% ok
%% end.
%% repair_both_both(_File, _V, [_|_], _SrcS, _DstS) ->
%% %% TODO: fetch both, check checksums, hopefully only exactly one
%% %% is correct, then use that one to repair the other. And if the
%% %% sizes are different, hrm, there may be an extra corner case(s)
%% %% hiding there.
%% io:format("WHOA! We have differing checksums or sizes here, TODO not implemented, but there's trouble in the little village on the river....\n"),
%% timer:sleep(3*1000),
%% ok;
%% repair_both_both(_File, _V, [], _SrcS, _DstS) ->
%% ok.
%% repair_copy_chunks(_File, _V, [], _DiffBytes, _DiffCount, _SrcS, _DstS) ->
%% ok;
%% repair_copy_chunks(File, V, ToBeCopied, DiffBytes, DiffCount, SrcS, DstS) ->
%% verb("\n", []),
%% verb("Starting copy of ~p chunks/~s MBytes to \n ~s: ",
%% [DiffCount, mbytes(DiffBytes), File]),
%% InnerCopyFun = copy_file_proc_checksum_fun(File, SrcS, DstS, V),
%% FoldFun = fun(Line, ok) ->
%% ok = InnerCopyFun(Line) % Strong sanity check
%% end,
%% ok = lists:foldl(FoldFun, ok, ToBeCopied),
%% verb(" done\n", []),
%% ok.
%% copy_file_proc_checksum_fun(File, SrcS, DstS, _Verbose) ->
%% fun(<<OffsetHex:16/binary, " ", LenHex:8/binary, " ",
%% CSumHex:32/binary, "\n">>) ->
%% <<Len:32/big>> = hexstr_to_bin(LenHex),
%% DownloadChunkBin = <<OffsetHex/binary, " ", LenHex/binary, " ",
%% File/binary, "\n">>,
%% [Chunk] = escript_download_chunks(SrcS, {{{DownloadChunkBin}}},
%% fun(_) -> ok end),
%% CSum = hexstr_to_bin(CSumHex),
%% CSum2 = checksum(Chunk),
%% if Len == byte_size(Chunk), CSum == CSum2 ->
%% {_,_,_} = upload_chunk_write(DstS, OffsetHex, File, Chunk),
%% ok;
%% true ->
%% io:format("ERROR: ~s ~s ~s csum/size error\n",
%% [File, OffsetHex, LenHex]),
%% error
%% end;
%% (_Else) ->
%% ok
%% end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
file_folder(Fun, Acc, Path) ->
{ok, FH} = file:open(Path, [read, raw, binary]),
try
file_folder2(Fun, Acc, FH)
after
file:close(FH)
end.
file_folder2(Fun, Acc, FH) ->
file_folder2(file:read_line(FH), Fun, Acc, FH).
file_folder2({ok, Line}, Fun, Acc, FH) ->
Acc2 = Fun(Line, Acc),
file_folder2(Fun, Acc2, FH);
file_folder2(eof, _Fun, Acc, _FH) ->
Acc. Acc.
verb(Fmt) -> verb(Fmt) ->