Basic multi-party chain repair for ap_mode finished
This commit is contained in:
parent
d2f1549aa3
commit
5c2635346f
2 changed files with 31 additions and 25 deletions
|
@ -86,9 +86,9 @@
|
||||||
%% Define the period of private projection stability before we'll
|
%% Define the period of private projection stability before we'll
|
||||||
%% start repair.
|
%% start repair.
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-define(REPAIR_START_STABILITY_TIME, 0).
|
-define(REPAIR_START_STABILITY_TIME, 3).
|
||||||
-else. % TEST
|
-else. % TEST
|
||||||
-define(REPAIR_START_STABILITY_TIME, 3). % TODO suggest 10 or 15?
|
-define(REPAIR_START_STABILITY_TIME, 10).
|
||||||
-endif. % TEST
|
-endif. % TEST
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
|
@ -555,7 +555,9 @@ calc_projection(#ch_mgr{proj=LastProj, runenv=RunEnv} = S,
|
||||||
|
|
||||||
calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
|
calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
|
||||||
RelativeToServer, AllHosed, Dbg,
|
RelativeToServer, AllHosed, Dbg,
|
||||||
#ch_mgr{name=MyName, runenv=RunEnv1}=S) ->
|
#ch_mgr{name=MyName,
|
||||||
|
runenv=RunEnv1,
|
||||||
|
repair_final_status=RepairFS}=S) ->
|
||||||
#projection_v1{epoch_number=OldEpochNum,
|
#projection_v1{epoch_number=OldEpochNum,
|
||||||
members_dict=MembersDict,
|
members_dict=MembersDict,
|
||||||
upi=OldUPI_list,
|
upi=OldUPI_list,
|
||||||
|
@ -596,6 +598,11 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
|
||||||
if Simulator_p andalso SameEpoch_p ->
|
if Simulator_p andalso SameEpoch_p ->
|
||||||
D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
|
D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
|
||||||
{NewUPI_list ++ [H], T, RunEnv2};
|
{NewUPI_list ++ [H], T, RunEnv2};
|
||||||
|
not Simulator_p
|
||||||
|
andalso
|
||||||
|
RepairFS == {repair_final_status, ok} ->
|
||||||
|
D_foo=[{repair_done, {repair_final_status, ok, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
|
||||||
|
{NewUPI_list ++ Repairing_list2, [], RunEnv2};
|
||||||
true ->
|
true ->
|
||||||
D_foo=[],
|
D_foo=[],
|
||||||
{NewUPI_list, OldRepairing_list, RunEnv2}
|
{NewUPI_list, OldRepairing_list, RunEnv2}
|
||||||
|
@ -1947,11 +1954,11 @@ do_repair(
|
||||||
proj=#projection_v1{upi=UPI,
|
proj=#projection_v1{upi=UPI,
|
||||||
repairing=[_|_]=Repairing,
|
repairing=[_|_]=Repairing,
|
||||||
members_dict=MembersDict}}=_S_copy,
|
members_dict=MembersDict}}=_S_copy,
|
||||||
Opts, ap_mode=_RepairMode) ->
|
Opts, ap_mode=RepairMode) ->
|
||||||
T1 = os:timestamp(),
|
T1 = os:timestamp(),
|
||||||
RepairId = proplists:get_value(repair_id, Opts, id1),
|
RepairId = proplists:get_value(repair_id, Opts, id1),
|
||||||
error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ID ~p\n",
|
error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n",
|
||||||
[MyName, UPI, Repairing, RepairId]),
|
[MyName, UPI, Repairing, RepairMode, RepairId]),
|
||||||
|
|
||||||
ETS = ets:new(repair_stats, [private, set]),
|
ETS = ets:new(repair_stats, [private, set]),
|
||||||
ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes,
|
ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes,
|
||||||
|
@ -1968,12 +1975,12 @@ do_repair(
|
||||||
_ -> "FAILURE"
|
_ -> "FAILURE"
|
||||||
end,
|
end,
|
||||||
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
|
Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys],
|
||||||
error_logger:info_msg("Repair ~s: tail ~p of ~p finished repair ID ~p: "
|
error_logger:info_msg("Repair ~s: tail ~p of ~p finished ~p repair ID ~w: "
|
||||||
"~p\nStats ~w\n",
|
"~w\nStats ~w\n",
|
||||||
[Summary, MyName, UPI, RepairId, Res, Stats]),
|
[Summary, MyName, UPI, RepairMode, RepairId,
|
||||||
timer:sleep(12345),
|
Res, Stats]),
|
||||||
ets:delete(ETS),
|
ets:delete(ETS),
|
||||||
exit({todo, Res}).
|
exit({repair_final_status, Res}).
|
||||||
|
|
||||||
sanitize_repair_state(#ch_mgr{repair_final_status=Res,
|
sanitize_repair_state(#ch_mgr{repair_final_status=Res,
|
||||||
proj=#projection_v1{upi=[_|_]}}=S)
|
proj=#projection_v1{upi=[_|_]}}=S)
|
||||||
|
|
|
@ -103,8 +103,7 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) ->
|
||||||
ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID,
|
ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID,
|
||||||
Verb, OurFLUs, ProxiesDict, ETS),
|
Verb, OurFLUs, ProxiesDict, ETS),
|
||||||
?VERB(" done\n"),
|
?VERB(" done\n"),
|
||||||
|
ok
|
||||||
todo_yo_not_quite_ok
|
|
||||||
catch
|
catch
|
||||||
What:Why ->
|
What:Why ->
|
||||||
Stack = erlang:get_stacktrace(),
|
Stack = erlang:get_stacktrace(),
|
||||||
|
@ -233,11 +232,11 @@ make_repair_directives2(C2, ConsistencyMode, RepairMode,
|
||||||
make_repair_directives3(C2, ConsistencyMode, RepairMode,
|
make_repair_directives3(C2, ConsistencyMode, RepairMode,
|
||||||
File, Verb, Src, FLUs, ProxiesDict, ETS, []).
|
File, Verb, Src, FLUs, ProxiesDict, ETS, []).
|
||||||
|
|
||||||
make_repair_directives3([{?MAX_OFFSET, 0, <<>>, FLU}|_Rest],
|
make_repair_directives3([{?MAX_OFFSET, 0, <<>>, _FLU}|_Rest],
|
||||||
_ConsistencyMode, _RepairMode,
|
_ConsistencyMode, _RepairMode,
|
||||||
_File, Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) ->
|
_File, _Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) ->
|
||||||
lists:reverse(Acc);
|
lists:reverse(Acc);
|
||||||
make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0],
|
make_repair_directives3([{Offset, Size, CSum, _FLU}=A|Rest0],
|
||||||
ConsistencyMode, RepairMode,
|
ConsistencyMode, RepairMode,
|
||||||
File, Verb, Src, FLUs, ProxiesDict, ETS, Acc) ->
|
File, Verb, Src, FLUs, ProxiesDict, ETS, Acc) ->
|
||||||
{As0, Rest1} = take_same_offset_size(Rest0, Offset, Size),
|
{As0, Rest1} = take_same_offset_size(Rest0, Offset, Size),
|
||||||
|
@ -266,7 +265,7 @@ make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0],
|
||||||
%% tuples guarantees that if there's a disagreement about chunk size at
|
%% tuples guarantees that if there's a disagreement about chunk size at
|
||||||
%% this offset, we can look ahead exactly one to see if there is sanity
|
%% this offset, we can look ahead exactly one to see if there is sanity
|
||||||
%% or not.
|
%% or not.
|
||||||
[{Offset_next, Size_next, _, _}=A_next|_] = Rest1,
|
[{Offset_next, _Size_next, _, _}=A_next|_] = Rest1,
|
||||||
if Offset + Size =< Offset_next ->
|
if Offset + Size =< Offset_next ->
|
||||||
ok;
|
ok;
|
||||||
true ->
|
true ->
|
||||||
|
@ -276,12 +275,12 @@ make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0],
|
||||||
Do = if ConsistencyMode == ap_mode ->
|
Do = if ConsistencyMode == ap_mode ->
|
||||||
Gots = [FLU || {_Off, _Sz, _Cs, FLU} <- As],
|
Gots = [FLU || {_Off, _Sz, _Cs, FLU} <- As],
|
||||||
Missing = FLUs -- Gots,
|
Missing = FLUs -- Gots,
|
||||||
ThisSrc = case lists:member(Src, Gots) of
|
_ThisSrc = case lists:member(Src, Gots) of
|
||||||
true -> Src;
|
true -> Src;
|
||||||
false -> hd(Gots)
|
false -> hd(Gots)
|
||||||
end,
|
end,
|
||||||
[ets:update_counter(ETS, {directive_bytes, FLU}, Size) ||
|
[ets:update_counter(ETS, {directive_bytes, FLU_m}, Size) ||
|
||||||
FLU <- Missing],
|
FLU_m <- Missing],
|
||||||
if Missing == [] ->
|
if Missing == [] ->
|
||||||
noop;
|
noop;
|
||||||
true ->
|
true ->
|
||||||
|
@ -305,8 +304,8 @@ take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) -
|
||||||
take_same_offset_size(Rest, _Offset, _Size, Acc) ->
|
take_same_offset_size(Rest, _Offset, _Size, Acc) ->
|
||||||
{Acc, Rest}.
|
{Acc, Rest}.
|
||||||
|
|
||||||
execute_repair_directives(ap_mode=_ConsistencyMode, Ds, Src, EpochID, Verb,
|
execute_repair_directives(ap_mode=_ConsistencyMode, Ds, _Src, EpochID, Verb,
|
||||||
OurFLUs, ProxiesDict, ETS) ->
|
_OurFLUs, ProxiesDict, ETS) ->
|
||||||
{_,_,_,_} = lists:foldl(fun execute_repair_directive/2,
|
{_,_,_,_} = lists:foldl(fun execute_repair_directive/2,
|
||||||
{ProxiesDict, EpochID, Verb, ETS}, Ds),
|
{ProxiesDict, EpochID, Verb, ETS}, Ds),
|
||||||
ok.
|
ok.
|
||||||
|
@ -316,7 +315,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
|
||||||
{in_bytes, t_in_bytes}, {out_files, t_out_files},
|
{in_bytes, t_in_bytes}, {out_files, t_out_files},
|
||||||
{out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
|
{out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}],
|
||||||
[ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
|
[ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys],
|
||||||
F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc) ->
|
F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc2) ->
|
||||||
SrcP = orddict:fetch(MySrc, ProxiesDict),
|
SrcP = orddict:fetch(MySrc, ProxiesDict),
|
||||||
case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
|
case ets:lookup_element(ETS, in_chunks, 2) rem 100 of
|
||||||
0 -> ?VERB(".", []);
|
0 -> ?VERB(".", []);
|
||||||
|
@ -342,7 +341,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
|
||||||
N = length(MyDsts),
|
N = length(MyDsts),
|
||||||
ets:update_counter(ETS, out_chunks, N),
|
ets:update_counter(ETS, out_chunks, N),
|
||||||
ets:update_counter(ETS, out_bytes, N*Size),
|
ets:update_counter(ETS, out_bytes, N*Size),
|
||||||
Acc;
|
Acc2;
|
||||||
CSum_now ->
|
CSum_now ->
|
||||||
error_logger:error_msg(
|
error_logger:error_msg(
|
||||||
"TODO: Checksum failure: "
|
"TODO: Checksum failure: "
|
||||||
|
@ -350,7 +349,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
|
||||||
"expected ~p got ~p\n",
|
"expected ~p got ~p\n",
|
||||||
[File, Offset, Size, CSum, CSum_now]),
|
[File, Offset, Size, CSum, CSum_now]),
|
||||||
ets:update_counter(ETS, t_bad_chunks, 1),
|
ets:update_counter(ETS, t_bad_chunks, 1),
|
||||||
Acc
|
Acc2
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
ok = lists:foldl(F, ok, Cmds),
|
ok = lists:foldl(F, ok, Cmds),
|
||||||
|
|
Loading…
Reference in a new issue