WIP: more generic all-way file chunk merge func, part 3

This commit is contained in:
Scott Lystig Fritchie 2015-05-16 17:11:54 +09:00
parent 04bc28b9da
commit d2f1549aa3

View file

@ -64,6 +64,8 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
put(proxies_dict, orddict:new()), put(proxies_dict, orddict:new()),
Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end, Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end,
OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption! OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption!
RepairMode = proplists:get_value(repair_mode, Opts, repair),
Verb = proplists:get_value(verbose, Opts, true),
Res = try Res = try
[begin [begin
{ok, Proxy} = machi_proxy_flu1_client:start_link(P), {ok, Proxy} = machi_proxy_flu1_client:start_link(P),
@ -80,26 +82,27 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
[ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs], [ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs],
%% Repair files from perspective of Src, i.e. tail(UPI). %% Repair files from perspective of Src, i.e. tail(UPI).
RepairMode = proplists:get_value(repair_mode, Opts, repair),
Verb = proplists:get_value(verbose, Opts, true),
SrcProxy = orddict:fetch(Src, ProxiesDict), SrcProxy = orddict:fetch(Src, ProxiesDict),
{ok, EpochID} = machi_proxy_flu1_client:get_epoch_id( {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id(
SrcProxy, ?SHORT_TIMEOUT), SrcProxy, ?SHORT_TIMEOUT),
?VERB("Make repair directives: "),
Ds = Ds =
[{File, make_repair_directives( [{File, make_repair_directives(
ConsistencyMode, RepairMode, File, Size, EpochID, ConsistencyMode, RepairMode, File, Size, EpochID,
Verb, Verb,
Src, OurFLUs, ProxiesDict, ETS)} || Src, OurFLUs, ProxiesDict, ETS)} ||
{File, {Size, _MissingList}} <- MissingFileSummary], {File, {Size, _MissingList}} <- MissingFileSummary],
%% TODO: for CP mode, any file missing from the Src FLU ?VERB(" done\n"),
%% must be deleted on all repairing FLUs
[begin [begin
[{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}), [{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}),
io:format(user, "Directive bytes for ~p: ~p\n", [FLU, Bytes]) ?VERB("Out-of-sync data for FLU ~p: ~s MBytes\n",
[FLU, mbytes(Bytes)])
end || FLU <- OurFLUs], end || FLU <- OurFLUs],
?VERB("Make repair directives: "),
ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID, ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID,
Verb, OurFLUs, ProxiesDict, ETS), Verb, OurFLUs, ProxiesDict, ETS),
?VERB(" done\n"),
todo_yo_not_quite_ok todo_yo_not_quite_ok
catch catch
@ -172,6 +175,15 @@ get_file_lists(Proxy, FLU_name, D) ->
%% match, then we consider the two files in sync. If there isn't a match, %% match, then we consider the two files in sync. If there isn't a match,
%% then we sort the lines and try another MD5, and if they match, then we're %% then we sort the lines and try another MD5, and if they match, then we're
%% in sync. In theory, that's lower overhead than the procedure used here. %% in sync. In theory, that's lower overhead than the procedure used here.
%%
%% NOTE that one reason I chose the "directives list" method is to have an
%% option, later, of choosing to repair a subset of repairee FLUs if there
%% is a big discrepency between out of sync files: e.g., if FLU x has N
%% bytes out of sync but FLU y has 50N bytes out of sync, then it's likely
%% better to repair x only so that x can return to the UPI list quickly.
%% Also, in the event that all repairees are roughly comparably out of sync,
%% then the repair network traffic can be minimized by reading each chunk
%% only once.
make_repair_compare_fun(SrcFLU) -> make_repair_compare_fun(SrcFLU) ->
fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a}, fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a},