WIP: more generic all-way file chunk merge func

This commit is contained in:
Scott Lystig Fritchie 2015-05-15 17:15:02 +09:00
parent 358764d403
commit a9c753ad64
2 changed files with 137 additions and 26 deletions

View file

@ -1959,8 +1959,8 @@ do_repair(
t_bad_chunks, t_elapsed_seconds],
[ets:insert(ETS, {K, 0}) || K <- ETS_T_Keys],
Res = machi_chain_repair:repair_ap(MyName, Repairing, UPI,
MembersDict, ETS, Opts),
Res = machi_chain_repair:repair(ap_mode, MyName, Repairing, UPI,
MembersDict, ETS, Opts),
T2 = os:timestamp(),
Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000,
ets:insert(ETS, {t_elapsed_seconds, Elapsed}),

View file

@ -41,6 +41,7 @@
-define(SHORT_TIMEOUT, 5*1000).
-define(LONG_TIMEOUT, 60*1000).
-define(MAX_OFFSET, 999*1024*1024*1024*1024*1024*1024*1024).
%% These macros assume there's a bound variable called Verb.
-define(VERB(Fmt), if Verb -> io:format(Fmt ); true -> ok end).
@ -50,19 +51,19 @@
-compile(export_all).
-endif. % TEST
-export([repair_cp/4, repair_ap/6]).
-export([repair/7]).
repair_cp(_Src, _Dst, _MembersDict, _Opts) ->
%% TODO: add missing function: wipe away any trace of chunks
%% are present on Dst but missing on Src.
exit(todo_cp_mode).
repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
%% Use process dict so that 'after' clause can always quit all
%% proxy pids.
put(proxies_dict, orddict:new()),
Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end,
OurFLUs = lists:usort([Src] ++ Repairing ++ UPI),
OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption!
Res = try
[begin
{ok, Proxy} = machi_proxy_flu1_client:start_link(P),
@ -79,11 +80,16 @@ repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
%% Repair files from perspective of Src, i.e. tail(UPI).
RepairMode = proplists:get_value(repair_mode, Opts, repair),
[ok = repair_file(ap_mode, RepairMode, File, Size, MissingList,
proplists:get_value(verbose, Opts, true),
Src, ProxiesDict, ETS) ||
{File, {Size, MissingList}} <- MissingFileSummary],
ok
Ds =
[{File, make_repair_directives(
ConsistencyMode, RepairMode, File, Size,
proplists:get_value(verbose, Opts, true),
Src, OurFLUs, ProxiesDict, ETS)} ||
{File, {Size, _MissingList}} <- MissingFileSummary],
%% TODO: for CP mode, any file missing from the Src FLU
%% must be deleted on all repairing FLUs
io:format(user, "Ds ~P\n", [Ds, 20]),
todo_yo_not_quite_ok
catch
What:Why ->
Stack = erlang:get_stacktrace(),
@ -96,6 +102,10 @@ repair_ap(Src, Repairing, UPI, MembersDict, ETS, Opts) ->
end,
Res.
%% Create a list of servers where the file is completely missing.
%% In the "demo day" implementation and in an early integration WIP,
%% this was a useful thing. TODO: Can this be removed?
make_missing_file_summary(Dict, AllFLUs) ->
%% FileFilterFun = fun(_) -> true end,
FoldRes = lists:sort(dict:to_list(Dict)),
@ -117,6 +127,14 @@ append_file_dict(Proxy, FLU_name, D) ->
dict:append(File, {FLU_name, Size}, Dict)
end, D, Res).
%% Wow, it's so easy to bikeshed this into a 1 year programming exercise.
%%
%% TODO: There are a lot of areas for exploiting parallelism here.
%% I've set the bikeshed aside for now, but "make repair faster" has a
%% lot of room for exploiting concurrency, overlapping reads & writes,
%% etc etc. There are also lots of different trade-offs to make with
%% regard to RAM use vs. disk use.
%%
%% TODO: There's no reason why repair can't be done 1).in parallel
%% across multiple repairees, and/or 2). with multiple byte ranges in
%% the same file, and/or 3). with bigger chunks.
@ -131,13 +149,9 @@ append_file_dict(Proxy, FLU_name, D) ->
%% As an additional optimization, add a bit of #2 to start the next
%% read while the current write is still in progress.
repair_file(ap_mode, _RepairMode,
File, _Size, [], Verb, Src, _ProxiesDict, _ETS) ->
?VERB("~p: ~s: present on both: ", [Src, File]),
?VERB("TODO!\n"), ok;
%%TODO: repair_both_present(File, Size, RepairMode, V, SrcS, SrcS2, DstS, DstS2);
repair_file(ap_mode, RepairMode,
File, Size, MissingList, Verb, Src, ProxiesDict, ETS) ->
repair_file(cp_mode=_ConsistencyMode, RepairMode,
File, Size, MissingList, Verb, Src, _FLUs, ProxiesDict, ETS) ->
%% TODO bit rot, fix for CP mode, yo
case lists:member(Src, MissingList) of
true ->
?VERB("~p: ~s -> ~p, skipping: not on source server\n",
@ -156,6 +170,7 @@ repair_file(ap_mode, RepairMode,
?VERB("done\n", [])
end.
copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
%% Use the first source socket to enumerate the chunks & checksums.
%% Use the second source socket to copy each chunk.
@ -216,26 +231,122 @@ copy_file(File, SrcProxy, MissingProxiesDict, Verb, ETS) ->
{L_K, T_K} <- EtsKeys],
ok.
repair_both_present(_File, _Size, _RepairMode, Verb, _SrcS, _SrcS2, _DstS, _DstS2) ->
?VERB("repair_both_present TODO\n"),
ok.
%% io:format("repair_both_present: ~p ~p mode ~p\n", [File, Size, RepairMode]).
make_repair_compare_fun(SrcFLU) ->
fun({{Offset_X, _Sz_a, FLU_a, _Cs_a}, _N_a},
{{Offset_X, _Sz_b, FLU_b, _CS_b}, _N_b}) ->
fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a},
{{Offset_X, _Sz_b, _CS_b, FLU_b}, _N_b}) ->
%% The repair source FLU always sorts less/earlier than anything else.
if FLU_a == SrcFLU ->
true;
FLU_b == SrcFLU ->
false;
true ->
%% Implicitly, smallest offset first.
%% Secondarily (and implicitly), sort smallest chunk size first
FLU_a < FLU_b
end;
(T_a, T_b) ->
%% See implicitly comments above
T_a =< T_b
end.
make_repair_directives(ConsistencyMode, RepairMode,
File, Size, Verb, Src, FLUs0, ProxiesDict, ETS) ->
true = (Size < ?MAX_OFFSET),
FLUs = lists:usort(FLUs0),
SrcProxy = orddict:fetch(Src, ProxiesDict),
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(SrcProxy,
?SHORT_TIMEOUT),
C0 = [begin
%% erlang:garbage_collect(),
Proxy = orddict:fetch(FLU, ProxiesDict),
OffSzCs = case machi_proxy_flu1_client:checksum_list(
Proxy, EpochID, File, ?LONG_TIMEOUT) of
{ok, X} -> X;
{error, no_such_file} -> []
end,
[{?MAX_OFFSET, 0, <<>>, FLU}] % our end-of-file marker
++
[{Off, Sz, Cs, FLU} || {Off, Sz, Cs} <- OffSzCs]
end || FLU <- FLUs],
C1 = lists:append(C0),
%% erlang:garbage_collect(),
C2 = lists:sort(make_repair_compare_fun(Src), C1),
%% erlang:garbage_collect(),
?VERB("Directives: "),
Ds = make_repair_directives2(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS),
?VERB(" done\n"),
Ds.
make_repair_directives2(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS) ->
?VERB("."),
make_repair_directives3(C2, ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, []).
make_repair_directives3([{?MAX_OFFSET, 0, <<>>, FLU}|_Rest],
_ConsistencyMode, _RepairMode,
_File, Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) ->
lists:reverse(Acc);
make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0],
ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, Acc) ->
{As0, Rest1} = take_same_offset_size(Rest0, Offset, Size),
As = [A|As0],
%% Sanity checking time
case lists:all(fun({_, _, Cs, _}) when Cs == CSum -> true;
(_) -> false
end, As) of
true ->
ok;
false ->
%% TODO: Pathology: someone has the wrong checksum.
%% 1. Fetch Src's chunk. If checksum is valid, use this chunk
%% to repair any invalid value.
%% 2. If Src's chunk is invalid, then check for other copies
%% in the UPI. If there is a valid chunk there, use it to
%% repair any invalid value.
%% 3a. If there is no valid UPI chunk, then delete this
%% byte range from all FLUs
%% 3b. Log big warning about data loss.
%% 4. Log any other checksum discrepencies as they are found.
exit({todo_repair_sanity_check, ?LINE, File, Offset, As})
end,
%% List construction guarantees us that there's at least one ?MAX_OFFSET
%% item remains. Sort order + our "taking" of all exact Offset+Size
%% tuples guarantees that if there's a disagreement about chunk size at
%% this offset, we can look ahead exactly one to see if there is sanity
%% or not.
[{Offset_next, Size_next, _, _}=A_next|_] = Rest1,
if Offset + Size =< Offset_next ->
ok;
true ->
exit({todo_repair_sanity_check, ?LINE, File, Offset, Size,
next_is, A_next})
end,
Do = if ConsistencyMode == ap_mode ->
Gots = [FLU || {_Off, _Sz, _Cs, FLU} <- As],
Missing = FLUs -- Gots,
ThisSrc = case lists:member(Src, Gots) of
true -> Src;
false -> hd(Gots)
end,
{copy, A, Missing};
ConsistencyMode == cp_mode ->
exit({todo_cp_mode, ?MODULE, ?LINE})
end,
make_repair_directives3(Rest1,
ConsistencyMode, RepairMode,
File, Verb, Src, FLUs, ProxiesDict, ETS, [Do|Acc]).
take_same_offset_size(L, Offset, Size) ->
take_same_offset_size(L, Offset, Size, []).
take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) ->
take_same_offset_size(Rest, Offset, Size, [A|Acc]);
take_same_offset_size(Rest, _Offset, _Size, Acc) ->
{Acc, Rest}.
%% repair_both_present(File, Size, RepairMode, V, SrcS, _SrcS2, DstS, _DstS2) ->
%% Tmp1 = lists:flatten(io_lib:format("/tmp/sort.1.~w.~w.~w", tuple_to_list(now()))),
%% Tmp2 = lists:flatten(io_lib:format("/tmp/sort.2.~w.~w.~w", tuple_to_list(now()))),
@ -420,14 +531,14 @@ mbytes(Size) ->
repair_compare_fun_test() ->
F = make_repair_compare_fun(b),
List = [{{1,10,b,x},y},{{50,10,a,x},y},{{50,10,b,x},y},{{50,10,c,x},y},{{90,10,d,x},y}],
List = [{{1,10,x,b},y},{{50,10,x,a},y},{{50,10,x,b},y},{{50,10,x,c},y},{{90,10,x,d},y}],
Input = lists:reverse(lists:sort(List)),
%% Although the merge func should never have two of the same FLU
%% represented, it doesn't matter for the purposes of this test.
%% 1. Smaller offset (element #1) wins, else...
%% 2. The FLU (element #2) that's the repair source always wins, else...
%% 3. The FLU with smallest name wins.
Expect = [{{1,10,b,x},y},{{50,10,b,x},y},{{50,10,a,x},y},{{50,10,c,x},y},{{90,10,d,x},y}],
Expect = [{{1,10,x,b},y},{{50,10,x,b},y},{{50,10,x,a},y},{{50,10,x,c},y},{{90,10,x,d},y}],
Expect = lists:sort(F, Input).
-endif. % TEST