From a9c753ad645d9b83988c6ab8557cb12b333b8d9a Mon Sep 17 00:00:00 2001
From: Scott Lystig Fritchie
Date: Fri, 15 May 2015 17:15:02 +0900
Subject: [PATCH] WIP: more generic all-way file chunk merge func

---
 src/machi_chain_manager1.erl |   4 +-
 src/machi_chain_repair.erl   | 159 +++++++++++++++++++++++++++++------
 2 files changed, 137 insertions(+), 26 deletions(-)

diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl
index 9ca5431..5041444 100644
--- a/src/machi_chain_manager1.erl
+++ b/src/machi_chain_manager1.erl
@@ -1959,8 +1959,8 @@ do_repair(
                   t_bad_chunks, t_elapsed_seconds],
     [ets:insert(ETS, {K, 0}) || K <- ETS_T_Keys],
-    Res = machi_chain_repair:repair_ap(MyName, Repairing, UPI,
-                                       MembersDict, ETS, Opts),
+    Res = machi_chain_repair:repair(ap_mode, MyName, Repairing, UPI,
+                                    MembersDict, ETS, Opts),
     T2 = os:timestamp(),
     Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000,
     ets:insert(ETS, {t_elapsed_seconds, Elapsed}),
diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl
index bc923b4..546e5af 100644
--- a/src/machi_chain_repair.erl
+++ b/src/machi_chain_repair.erl
@@ -41,6 +41,7 @@
 -define(SHORT_TIMEOUT, 5*1000).
 -define(LONG_TIMEOUT, 60*1000).
+-define(MAX_OFFSET, 999*1024*1024*1024*1024*1024*1024*1024).
 
 %% These macros assume there's a bound variable called Verb.
 -define(VERB(Fmt), if Verb -> io:format(Fmt ); true -> ok end).
@@ -50,19 +51,19 @@
 -compile(export_all).
 -endif. % TEST
 
--export([repair_cp/4, repair_ap/6]).
+-export([repair/7]).
 
 repair_cp(_Src, _Dst, _MembersDict, _Opts) ->
     %% TODO: add missing function: wipe away any trace of chunks
     %% are present on Dst but missing on Src.
     exit(todo_cp_mode).
 
-repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
+repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
     %% Use process dict so that 'after' clause can always quit all
     %% proxy pids.
     put(proxies_dict, orddict:new()),
     Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end,
-    OurFLUs = lists:usort([Src] ++ Repairing ++ UPI),
+    OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption!
     Res = try
               [begin
                    {ok, Proxy} = machi_proxy_flu1_client:start_link(P),
                    Add(FLU, Proxy)
                end ||
@@ -79,11 +80,16 @@ repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
 
         %% Repair files from perspective of Src, i.e. tail(UPI).
         RepairMode = proplists:get_value(repair_mode, Opts, repair),
-        [ok = repair_file(ap_mode, RepairMode, File, Size, MissingList,
-                          proplists:get_value(verbose, Opts, true),
-                          Src, ProxiesDict, ETS) ||
-            {File, {Size, MissingList}} <- MissingFileSummary],
-        ok
+        Ds =
+            [{File, make_repair_directives(
+                      ConsistencyMode, RepairMode, File, Size,
+                      proplists:get_value(verbose, Opts, true),
+                      Src, OurFLUs, ProxiesDict, ETS)} ||
+                {File, {Size, _MissingList}} <- MissingFileSummary],
+        %% TODO: for CP mode, any file missing from the Src FLU
+        %% must be deleted on all repairing FLUs
+        io:format(user, "Ds ~P\n", [Ds, 20]),
+        todo_yo_not_quite_ok
     catch
         What:Why ->
             Stack = erlang:get_stacktrace(),
@@ -96,6 +102,10 @@ repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
         end,
     Res.
 
+%% Create a list of servers where the file is completely missing.
+%% In the "demo day" implementation and in an early integration WIP,
+%% this was a useful thing. TODO: Can this be removed?
+
 make_missing_file_summary(Dict, AllFLUs) ->
     %% FileFilterFun = fun(_) -> true end,
     FoldRes = lists:sort(dict:to_list(Dict)),
@@ -117,6 +127,14 @@ append_file_dict(Proxy, FLU_name, D) ->
                         dict:append(File, {FLU_name, Size}, Dict)
                 end, D, Res).
 
+%% Wow, it's so easy to bikeshed this into a 1-year programming exercise.
+%%
+%% TODO: There are a lot of areas for exploiting parallelism here.
+%% I've set the bikeshed aside for now, but "make repair faster" has a
+%% lot of room for exploiting concurrency, overlapping reads & writes,
+%% etc etc. There are also lots of different trade-offs to make with
+%% regard to RAM use vs. disk use.
+%%
 %% TODO: There's no reason why repair can't be done 1).in parallel
 %% across multiple repairees, and/or 2). with multiple byte ranges in
 %% the same file, and/or 3). with bigger chunks.
@@ -131,13 +149,9 @@ append_file_dict(Proxy, FLU_name, D) ->
 %% As an additional optimization, add a bit of #2 to start the next
 %% read while the current write is still in progress.
 
-repair_file(ap_mode, _RepairMode,
-            File, _Size, [], Verb, Src, _ProxiesDict, _ETS) ->
-    ?VERB("~p: ~s: present on both: ", [Src, File]),
-    ?VERB("TODO!\n"), ok;
-    %%TODO: repair_both_present(File, Size, RepairMode, V, SrcS, SrcS2, DstS, DstS2);
-repair_file(ap_mode, RepairMode,
-            File, Size, MissingList, Verb, Src, ProxiesDict, ETS) ->
+repair_file(cp_mode=_ConsistencyMode, RepairMode,
+            File, Size, MissingList, Verb, Src, _FLUs, ProxiesDict, ETS) ->
+    %% TODO bit rot, fix for CP mode, yo
     case lists:member(Src, MissingList) of
         true ->
             ?VERB("~p: ~s -> ~p, skipping: not on source server\n",
@@ -156,6 +170,7 @@ repair_file(ap_mode, RepairMode,
             ?VERB("done\n", [])
     end.
 
+
 copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
     %% Use the first source socket to enumerate the chunks & checksums.
     %% Use the second source socket to copy each chunk.
@@ -216,26 +231,122 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
             {L_K, T_K} <- EtsKeys],
     ok.
 
-repair_both_present(_File, _Size, _RepairMode, Verb, _SrcS, _SrcS2, _DstS, _DstS2) ->
-    ?VERB("repair_both_present TODO\n"),
-    ok.
-    %% io:format("repair_both_present: ~p ~p mode ~p\n", [File, Size, RepairMode]).
 make_repair_compare_fun(SrcFLU) ->
-    fun({{Offset_X, _Sz_a, FLU_a, _Cs_a}, _N_a},
-        {{Offset_X, _Sz_b, FLU_b, _CS_b}, _N_b}) ->
+    fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a},
+        {{Offset_X, _Sz_b, _CS_b, FLU_b}, _N_b}) ->
             %% The repair source FLU always sorts less/earlier than anything else.
             if FLU_a == SrcFLU ->
                     true;
               FLU_b == SrcFLU ->
                     false;
               true ->
+                    %% Implicitly, smallest offset first.
+                    %% Secondarily (and implicitly), sort smallest chunk size first.
                     FLU_a < FLU_b
             end;
       (T_a, T_b) ->
+            %% See the implicit-ordering comments above.
            T_a =< T_b
    end.
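(Editorial aside, not part of the patch: make_repair_compare_fun/1 now orders the new {Offset, Size, CSum, FLU} 4-tuples, with the repair source FLU winning any tie at the same offset. A minimal sketch of that ordering, in the same wrapped-tuple style as repair_compare_fun_test() further below; the test name and values here are hypothetical.)

-ifdef(TEST).
source_wins_ties_test() ->
    F = make_repair_compare_fun(a),
    %% Same offset 0: the repair source 'a' sorts ahead of 'c'; otherwise
    %% the smaller offset sorts first.
    In  = [{{0,5,<<"cs">>,c},x}, {{0,5,<<"cs">>,a},x}, {{5,5,<<"cs">>,b},x}],
    Out = [{{0,5,<<"cs">>,a},x}, {{0,5,<<"cs">>,c},x}, {{5,5,<<"cs">>,b},x}],
    Out = lists:sort(F, In).
-endif. % TEST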
+make_repair_directives(ConsistencyMode, RepairMode,
+                       File, Size, Verb, Src, FLUs0, ProxiesDict, ETS) ->
+    true = (Size < ?MAX_OFFSET),
+    FLUs = lists:usort(FLUs0),
+    SrcProxy = orddict:fetch(Src, ProxiesDict),
+    {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy,
+                                                         ?SHORT_TIMEOUT),
+    C0 = [begin
+              %% erlang:garbage_collect(),
+              Proxy = orddict:fetch(FLU, ProxiesDict),
+              OffSzCs = case machi_proxy_flu1_client:checksum_list(
+                               Proxy, EpochID, File, ?LONG_TIMEOUT) of
+                            {ok, X} -> X;
+                            {error, no_such_file} -> []
+                        end,
+              [{?MAX_OFFSET, 0, <<>>, FLU}] % our end-of-file marker
+              ++
+              [{Off, Sz, Cs, FLU} || {Off, Sz, Cs} <- OffSzCs]
+          end || FLU <- FLUs],
+    C1 = lists:append(C0),
+    %% erlang:garbage_collect(),
+    C2 = lists:sort(make_repair_compare_fun(Src), C1),
+    %% erlang:garbage_collect(),
+    ?VERB("Directives: "),
+    Ds = make_repair_directives2(C2, ConsistencyMode, RepairMode,
+                                 File, Verb, Src, FLUs, ProxiesDict, ETS),
+    ?VERB(" done\n"),
+    Ds.
+
+make_repair_directives2(C2, ConsistencyMode, RepairMode,
+                        File, Verb, Src, FLUs, ProxiesDict, ETS) ->
+    ?VERB("."),
+    make_repair_directives3(C2, ConsistencyMode, RepairMode,
+                            File, Verb, Src, FLUs, ProxiesDict, ETS, []).
+
+make_repair_directives3([{?MAX_OFFSET, 0, <<>>, FLU}|_Rest],
+                        _ConsistencyMode, _RepairMode,
+                        _File, Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) ->
+    lists:reverse(Acc);
+make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0],
+                        ConsistencyMode, RepairMode,
+                        File, Verb, Src, FLUs, ProxiesDict, ETS, Acc) ->
+    {As0, Rest1} = take_same_offset_size(Rest0, Offset, Size),
+    As = [A|As0],
+    %% Sanity checking time
+    case lists:all(fun({_, _, Cs, _}) when Cs == CSum -> true;
+                      (_)                             -> false
+                   end, As) of
+        true ->
+            ok;
+        false ->
+            %% TODO: Pathology: someone has the wrong checksum.
+            %% 1. Fetch Src's chunk.  If checksum is valid, use this chunk
+            %%    to repair any invalid value.
+            %% 2. If Src's chunk is invalid, then check for other copies
+            %%    in the UPI.  If there is a valid chunk there, use it to
+            %%    repair any invalid value.
+            %% 3a. If there is no valid UPI chunk, then delete this
+            %%     byte range from all FLUs
+            %% 3b. Log big warning about data loss.
+            %% 4. Log any other checksum discrepancies as they are found.
+            exit({todo_repair_sanity_check, ?LINE, File, Offset, As})
+    end,
+    %% List construction guarantees us that there's at least one ?MAX_OFFSET
+    %% item remaining.  Sort order + our "taking" of all exact Offset+Size
+    %% tuples guarantees that if there's a disagreement about chunk size at
+    %% this offset, we can look ahead exactly one to see if there is sanity
+    %% or not.
+    [{Offset_next, Size_next, _, _}=A_next|_] = Rest1,
+    if Offset + Size =< Offset_next ->
+            ok;
+       true ->
+            exit({todo_repair_sanity_check, ?LINE, File, Offset, Size,
+                  next_is, A_next})
+    end,
+    Do = if ConsistencyMode == ap_mode ->
+                 Gots = [FLU || {_Off, _Sz, _Cs, FLU} <- As],
+                 Missing = FLUs -- Gots,
+                 ThisSrc = case lists:member(Src, Gots) of
+                               true  -> Src;
+                               false -> hd(Gots)
+                           end,
+                 {copy, A, Missing};
+            ConsistencyMode == cp_mode ->
+                 exit({todo_cp_mode, ?MODULE, ?LINE})
+         end,
+    make_repair_directives3(Rest1,
+                            ConsistencyMode, RepairMode,
+                            File, Verb, Src, FLUs, ProxiesDict, ETS, [Do|Acc]).
+
+take_same_offset_size(L, Offset, Size) ->
+    take_same_offset_size(L, Offset, Size, []).
+
+take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) ->
+    take_same_offset_size(Rest, Offset, Size, [A|Acc]);
+take_same_offset_size(Rest, _Offset, _Size, Acc) ->
+    {Acc, Rest}.
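(Editorial aside, not part of the patch: take_same_offset_size/3 peels every tuple that agrees exactly on Offset and Size off the front of the sorted stream, accumulating them in reverse order, and returns the untouched tail. A small sketch with hypothetical values; the test name is invented here.)

-ifdef(TEST).
take_same_offset_size_test() ->
    %% {Offset, Size, CSum, FLU} tuples: both Offset=0/Size=10 entries are
    %% taken (in reverse accumulation order); the later offset is left alone.
    In = [{0,10,<<"c1">>,a}, {0,10,<<"c1">>,b}, {10,4,<<"c2">>,a}],
    {[{0,10,<<"c1">>,b}, {0,10,<<"c1">>,a}], [{10,4,<<"c2">>,a}]} =
        take_same_offset_size(In, 0, 10).
-endif. % TEST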
+
 %% repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) ->
 %%     Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))),
 %%     Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))),
@@ -420,14 +531,14 @@ mbytes(Size) ->
 
 repair_compare_fun_test() ->
     F = make_repair_compare_fun(b),
-    List = [{{1,10,b,x},y},{{50,10,a,x},y},{{50,10,b,x},y},{{50,10,c,x},y},{{90,10,d,x},y}],
+    List = [{{1,10,x,b},y},{{50,10,x,a},y},{{50,10,x,b},y},{{50,10,x,c},y},{{90,10,x,d},y}],
     Input = lists:reverse(lists:sort(List)),
     %% Although the merge func should never have two of the same FLU
     %% represented, it doesn't matter for the purposes of this test.
     %% 1. Smaller offset (element #1) wins, else...
     %% 2. The FLU (element #2) that's the repair source always wins, else...
     %% 3. The FLU with smallest name wins.
-    Expect = [{{1,10,b,x},y},{{50,10,b,x},y},{{50,10,a,x},y},{{50,10,c,x},y},{{90,10,d,x},y}],
+    Expect = [{{1,10,x,b},y},{{50,10,x,b},y},{{50,10,x,a},y},{{50,10,x,c},y},{{90,10,x,d},y}],
     Expect = lists:sort(F, Input).
 
 -endif. % TEST
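Editorial aside, not part of the patch: the WIP stops at todo_yo_not_quite_ok and only prints the directive list Ds. Below is a rough, hypothetical sketch of how the {copy, Chunk, MissingFLUs} directives returned by make_repair_directives/9 might eventually be applied. The helper names (execute_repair_directives/3, execute_directive/4) are invented here, and the sketch assumes the proxy client's read_chunk and write_chunk calls follow the same (Proxy, EpochID, File, ..., Timeout) convention that get_epoch_id/2 and checksum_list/4 use in the patch.

execute_repair_directives(Ds, ProxiesDict, EpochID) ->
    %% Ds is the [{File, DirectiveList}] structure built in repair/7.
    [ok = execute_directive(File, D, ProxiesDict, EpochID) ||
        {File, FileDs} <- Ds, D <- FileDs],
    ok.

execute_directive(File, {copy, {Offset, Size, _CSum, SrcFLU}, MissingFLUs},
                  ProxiesDict, EpochID) ->
    %% Read the chunk once from the FLU that the sort order chose as source...
    SrcProxy = orddict:fetch(SrcFLU, ProxiesDict),
    {ok, Chunk} = machi_proxy_flu1_client:read_chunk(SrcProxy, EpochID, File,
                                                     Offset, Size,
                                                     ?SHORT_TIMEOUT),
    %% ...then write it to every FLU where this byte range is missing.
    [ok = machi_proxy_flu1_client:write_chunk(orddict:fetch(FLU, ProxiesDict),
                                              EpochID, File, Offset, Chunk,
                                              ?SHORT_TIMEOUT) ||
        FLU <- MissingFLUs],
    ok.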