From 5c2635346f0a4243847c3e0e893be3b6c922f6ad Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sat, 16 May 2015 17:39:58 +0900 Subject: [PATCH] Basic multi-party chain repair for ap_mode finished --- src/machi_chain_manager1.erl | 29 ++++++++++++++++++----------- src/machi_chain_repair.erl | 27 +++++++++++++-------------- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 5041444..a17529a 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -86,9 +86,9 @@ %% Define the period of private projection stability before we'll %% start repair. -ifdef(TEST). --define(REPAIR_START_STABILITY_TIME, 0). +-define(REPAIR_START_STABILITY_TIME, 3). -else. % TEST --define(REPAIR_START_STABILITY_TIME, 3). % TODO suggest 10 or 15? +-define(REPAIR_START_STABILITY_TIME, 10). -endif. % TEST %% API @@ -555,7 +555,9 @@ calc_projection(#ch_mgr{proj=LastProj, runenv=RunEnv} = S, calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, RelativeToServer, AllHosed, Dbg, - #ch_mgr{name=MyName, runenv=RunEnv1}=S) -> + #ch_mgr{name=MyName, + runenv=RunEnv1, + repair_final_status=RepairFS}=S) -> #projection_v1{epoch_number=OldEpochNum, members_dict=MembersDict, upi=OldUPI_list, @@ -596,6 +598,11 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, if Simulator_p andalso SameEpoch_p -> D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], {NewUPI_list ++ [H], T, RunEnv2}; + not Simulator_p + andalso + RepairFS == {repair_final_status, ok} -> + D_foo=[{repair_done, {repair_final_status, ok, (S#ch_mgr.proj)#projection_v1.epoch_number}}], + {NewUPI_list ++ Repairing_list2, [], RunEnv2}; true -> D_foo=[], {NewUPI_list, OldRepairing_list, RunEnv2} @@ -1947,11 +1954,11 @@ do_repair( proj=#projection_v1{upi=UPI, repairing=[_|_]=Repairing, members_dict=MembersDict}}=_S_copy, - Opts, ap_mode=_RepairMode) -> + Opts, ap_mode=RepairMode) -> T1 = os:timestamp(), RepairId = proplists:get_value(repair_id, Opts, id1), - error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ID ~p\n", - [MyName, UPI, Repairing, RepairId]), + error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n", + [MyName, UPI, Repairing, RepairMode, RepairId]), ETS = ets:new(repair_stats, [private, set]), ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes, @@ -1968,12 +1975,12 @@ do_repair( _ -> "FAILURE" end, Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys], - error_logger:info_msg("Repair ~s: tail ~p of ~p finished repair ID ~p: " - "~p\nStats ~w\n", - [Summary, MyName, UPI, RepairId, Res, Stats]), - timer:sleep(12345), + error_logger:info_msg("Repair ~s: tail ~p of ~p finished ~p repair ID ~w: " + "~w\nStats ~w\n", + [Summary, MyName, UPI, RepairMode, RepairId, + Res, Stats]), ets:delete(ETS), - exit({todo, Res}). + exit({repair_final_status, Res}). sanitize_repair_state(#ch_mgr{repair_final_status=Res, proj=#projection_v1{upi=[_|_]}}=S) diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 6abcde2..0c4fa26 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -103,8 +103,7 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID, Verb, OurFLUs, ProxiesDict, ETS), ?VERB(" done\n"), - - todo_yo_not_quite_ok + ok catch What:Why -> Stack = erlang:get_stacktrace(), @@ -233,11 +232,11 @@ make_repair_directives2(C2, ConsistencyMode, RepairMode, make_repair_directives3(C2, ConsistencyMode, RepairMode, File, Verb, Src, FLUs, ProxiesDict, ETS, []). -make_repair_directives3([{?MAX_OFFSET, 0, <<>>, FLU}|_Rest], +make_repair_directives3([{?MAX_OFFSET, 0, <<>>, _FLU}|_Rest], _ConsistencyMode, _RepairMode, - _File, Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) -> + _File, _Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) -> lists:reverse(Acc); -make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0], +make_repair_directives3([{Offset, Size, CSum, _FLU}=A|Rest0], ConsistencyMode, RepairMode, File, Verb, Src, FLUs, ProxiesDict, ETS, Acc) -> {As0, Rest1} = take_same_offset_size(Rest0, Offset, Size), @@ -266,7 +265,7 @@ make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0], %% tuples guarantees that if there's a disagreement about chunk size at %% this offset, we can look ahead exactly one to see if there is sanity %% or not. - [{Offset_next, Size_next, _, _}=A_next|_] = Rest1, + [{Offset_next, _Size_next, _, _}=A_next|_] = Rest1, if Offset + Size =< Offset_next -> ok; true -> @@ -276,12 +275,12 @@ make_repair_directives3([{Offset, Size, CSum, FLU}=A|Rest0], Do = if ConsistencyMode == ap_mode -> Gots = [FLU || {_Off, _Sz, _Cs, FLU} <- As], Missing = FLUs -- Gots, - ThisSrc = case lists:member(Src, Gots) of + _ThisSrc = case lists:member(Src, Gots) of true -> Src; false -> hd(Gots) end, - [ets:update_counter(ETS, {directive_bytes, FLU}, Size) || - FLU <- Missing], + [ets:update_counter(ETS, {directive_bytes, FLU_m}, Size) || + FLU_m <- Missing], if Missing == [] -> noop; true -> @@ -305,8 +304,8 @@ take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) - take_same_offset_size(Rest, _Offset, _Size, Acc) -> {Acc, Rest}. -execute_repair_directives(ap_mode=_ConsistencyMode, Ds, Src, EpochID, Verb, - OurFLUs, ProxiesDict, ETS) -> +execute_repair_directives(ap_mode=_ConsistencyMode, Ds, _Src, EpochID, Verb, + _OurFLUs, ProxiesDict, ETS) -> {_,_,_,_} = lists:foldl(fun execute_repair_directive/2, {ProxiesDict, EpochID, Verb, ETS}, Ds), ok. @@ -316,7 +315,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> {in_bytes, t_in_bytes}, {out_files, t_out_files}, {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}], [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys], - F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc) -> + F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc2) -> SrcP = orddict:fetch(MySrc, ProxiesDict), case ets:lookup_element(ETS, in_chunks, 2) rem 100 of 0 -> ?VERB(".", []); @@ -342,7 +341,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> N = length(MyDsts), ets:update_counter(ETS, out_chunks, N), ets:update_counter(ETS, out_bytes, N*Size), - Acc; + Acc2; CSum_now -> error_logger:error_msg( "TODO: Checksum failure: " @@ -350,7 +349,7 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> "expected ~p got ~p\n", [File, Offset, Size, CSum, CSum_now]), ets:update_counter(ETS, t_bad_chunks, 1), - Acc + Acc2 end end, ok = lists:foldl(F, ok, Cmds),