WIP: chain mgmt prototype scaffolding 8: basic read repair done

This commit is contained in:
Scott Lystig Fritchie 2014-10-29 22:34:00 +09:00
parent 932b6afb76
commit 67b661494e
2 changed files with 121 additions and 92 deletions

View file

@ -37,14 +37,14 @@
epoch_csum :: m_csum(), epoch_csum :: m_csum(),
prev_epoch_num :: m_epoch_n(), prev_epoch_num :: m_epoch_n(),
prev_epoch_csum :: m_csum(), prev_epoch_csum :: m_csum(),
creation_time :: timestamp(),
author_server :: m_server(),
all_members :: [m_server()], all_members :: [m_server()],
down :: [m_server()], down :: [m_server()],
creation_time :: timestamp(),
author_server :: m_server(),
upi :: [m_server()], upi :: [m_server()],
repairing :: [m_server()], repairing :: [m_server()],
dbg :: list(), %proplist() checksummed dbg :: list(), %proplist(), is checksummed
dbg2 :: list() %proplist() not checksummed dbg2 :: list() %proplist(), is not checksummed
}). }).
-record(ch_mgr, { -record(ch_mgr, {

View file

@ -39,7 +39,7 @@
-export([test_calc_projection/2, -export([test_calc_projection/2,
test_calc_proposed_projection/1, test_calc_proposed_projection/1,
test_write_proposed_projection/1, test_write_proposed_projection/1,
test_read_public_projection/2]). test_read_latest_public_projection/2]).
-ifdef(EQC). -ifdef(EQC).
-include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc.hrl").
@ -87,8 +87,8 @@ test_calc_projection(Pid, KeepRunenvP) ->
test_calc_proposed_projection(Pid) -> test_calc_proposed_projection(Pid) ->
gen_server:cast(Pid, {test_calc_proposed_projection}). gen_server:cast(Pid, {test_calc_proposed_projection}).
test_read_public_projection(Pid, ReadRepairP) -> test_read_latest_public_projection(Pid, ReadRepairP) ->
gen_server:call(Pid, {test_read_public_projection, ReadRepairP}, infinity). gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP}, infinity).
-endif. % TEST -endif. % TEST
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -131,9 +131,9 @@ handle_call({test_calc_projection, KeepRunenvP}, _From, S) ->
{reply, {ok, P}, if KeepRunenvP -> S2; {reply, {ok, P}, if KeepRunenvP -> S2;
true -> S true -> S
end}; end};
handle_call({test_read_public_projection, ReadRepairP}, _From, S) -> handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) ->
{Res, ExtraInfo, _DiscardS2} = do_cl_read_public_projection(ReadRepairP, S), {Res, ExtraInfo, S2} = do_cl_read_latest_public_projection(ReadRepairP, S),
{reply, {Res, ExtraInfo}, S}; {reply, {Res, ExtraInfo}, S2};
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, whaaaaaaaaaa, S}. {reply, whaaaaaaaaaa, S}.
@ -195,19 +195,31 @@ do_cl_write_proposed_proj(#ch_mgr{proj_proposed=Proj} = S) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
cl_write_public_proj(Epoch, Proj, S) -> cl_write_public_proj(Epoch, Proj, S) ->
cl_write_public_proj(Epoch, Proj, false, S).
cl_write_public_proj_skip_local_error(Epoch, Proj, S) ->
cl_write_public_proj(Epoch, Proj, true, S).
cl_write_public_proj(Epoch, Proj, SkipLocalWriteErrorP, S) ->
%% Write to local public projection store first, and if it succeeds, %% Write to local public projection store first, and if it succeeds,
%% then write to all remote public projection stores. %% then write to all remote public projection stores.
cl_write_public_proj_local(Epoch, Proj, S). cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, S).
cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) -> cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP,
#ch_mgr{myflu=MyFLU}=S) ->
{_UpNodes, Partitions, S2} = calc_up_nodes(S), {_UpNodes, Partitions, S2} = calc_up_nodes(S),
Res0 = perhaps_call_t( Res0 = perhaps_call_t(
S, Partitions, MyFLU, S, Partitions, MyFLU,
fun() -> machi_flu0:proj_write(MyFLU, Epoch, public, Proj) end), fun() -> machi_flu0:proj_write(MyFLU, Epoch, public, Proj) end),
Continue = fun() ->
FLUs = Proj#projection.all_members -- [MyFLU],
cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S)
end,
case Res0 of case Res0 of
ok -> ok ->
FLUs = Proj#projection.all_members -- [MyFLU], Continue();
cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S); _Else when SkipLocalWriteErrorP ->
Continue();
Else when Else == error_written; Else == timeout; Else == t_timeout -> Else when Else == error_written; Else == timeout; Else == t_timeout ->
{Else, S2} {Else, S2}
end. end.
@ -220,10 +232,10 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
FLU <- FLUs], FLU <- FLUs],
{{remote_write_results, Rs}, S}. {{remote_write_results, Rs}, S}.
do_cl_read_public_projection(ReadRepairP, do_cl_read_latest_public_projection(ReadRepairP,
#ch_mgr{proj=Proj1, myflu=MyFLU} = S) -> #ch_mgr{proj=Proj1, myflu=MyFLU} = S) ->
Epoch1 = Proj1#projection.epoch_number, Epoch1 = Proj1#projection.epoch_number,
case cl_read_public_projection(S) of case cl_read_latest_public_projection(S) of
{unanimous, Proj2, Extra, S3} when Proj2 == Proj1 -> {unanimous, Proj2, Extra, S3} when Proj2 == Proj1 ->
{Proj2, Extra, S3}; {Proj2, Extra, S3};
{unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, _S3} {unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, _S3}
@ -236,25 +248,16 @@ do_cl_read_public_projection(ReadRepairP,
Proj2, [{hooray, {date(), time()}}|Extra]), Proj2, [{hooray, {date(), time()}}|Extra]),
ok = machi_flu0:proj_write(MyFLU, Epoch2, private, Proj2b), ok = machi_flu0:proj_write(MyFLU, Epoch2, private, Proj2b),
{Proj2, Extra, S3#ch_mgr{proj=Proj2b}}; {Proj2, Extra, S3#ch_mgr{proj=Proj2b}};
{needs_work, _FLUsRs, _Extra, _S3}=Else3 -> {needs_work, FLUsRs, Extra, S3} ->
if not ReadRepairP -> if not ReadRepairP ->
Else3; {not_unanimous, [{results, FLUsRs}|Extra], S3};
true -> true ->
%% 1. Do the results contain a projection? {_Status, S4} = do_read_repair(FLUsRs, Extra, S3),
%% perhaps figure that in cl_read_public_projection()? {not_unanimous, Extra, S4}
%% 2. Were there any error_unwritten?
%% 3. Repair the unwritten FLUs.
%% 4. Nothing to do with timeouts, right? They're
%% hopeless for the moment, need to wait.
%% For read-repair, just choose the best and then brute-
%% force write, don't care about write status, then
%% repeat do_cl_read_public_projection() ??
Else3
end end
end. end.
cl_read_public_projection(#ch_mgr{proj=Proj}=S) -> cl_read_latest_public_projection(#ch_mgr{proj=Proj}=S) ->
#projection{all_members=All_list} = Proj, #projection{all_members=All_list} = Proj,
{_UpNodes, Partitions, S2} = calc_up_nodes(S), {_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(X) -> DoIt = fun(X) ->
@ -274,6 +277,36 @@ cl_read_public_projection(#ch_mgr{proj=Proj}=S) ->
{needs_work, FLUsRs, [flarfus], S2} % todo? {needs_work, FLUsRs, [flarfus], S2} % todo?
end. end.
% {_Status, S4} = do_read_repair(FLUsRs, Extra, S3),
%% 1. Do the results contain a projection?
%% perhaps figure that in cl_read_latest_public_projection()?
%% 2. Were there any error_unwritten?
%% 3. Repair the unwritten FLUs.
%% 4. Nothing to do with timeouts, right? They're
%% hopeless for the moment, need to wait.
%%
%% For read-repair, just choose the best and then brute-
%% force write, don't care about write status, then
%% repeat do_cl_read_latest_public_projection() ??
do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) ->
Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection)],
if Ps == [] ->
{nothing_to_do, S};
true ->
[{_Rank, BestProj}|_] = rank_and_sort_projections(Ps, CurrentProj),
Epoch = BestProj#projection.epoch_number,
%% We're doing repair, so use the flavor that will skip an
%% error on the local FLU, which if it happens is almost
%% certainly error_written ... that or any other error on
%% the local projection store should not interfere with
%% attempting to write the other projection stores.
{_DontCare, _S2}=Res = cl_write_public_proj_skip_local_error(
Epoch, BestProj, S),
Res
end.
make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) ->
make_projection(0, -1, <<>>, make_projection(0, -1, <<>>,
MyName, All_list, [], UPI_list, Repairing_list, Ps). MyName, All_list, [], UPI_list, Repairing_list, Ps).
@ -301,9 +334,12 @@ make_projection(EpochNum, PrevEpochNum, PrevEpochCSum,
dbg=Dbg, dbg=Dbg,
dbg2=[] % always checksums as [] dbg2=[] % always checksums as []
}, },
P2 = update_projection_checksum(P),
P2#projection{dbg2=Dbg2}.
update_projection_checksum(P) ->
CSum = crypto:hash(sha, term_to_binary(P)), CSum = crypto:hash(sha, term_to_binary(P)),
P#projection{epoch_csum=CSum, P#projection{epoch_csum=CSum}.
dbg2=Dbg2}.
update_projection_dbg2(P, Dbg2) when is_list(Dbg2) -> update_projection_dbg2(P, Dbg2) when is_list(Dbg2) ->
P#projection{dbg2=Dbg2}. P#projection{dbg2=Dbg2}.
@ -417,37 +453,22 @@ roll_dice(N, RunEnv) ->
{Val, Seed2} = random:uniform_s(N, Seed1), {Val, Seed2} = random:uniform_s(N, Seed1),
{Val, replace(RunEnv, [{seed, Seed2}])}. {Val, replace(RunEnv, [{seed, Seed2}])}.
rank_and_repair_instructions([_|_] = FLUsRs, Proj) -> rank_and_sort_projections(Ps, CurrentProj) ->
#projection{all_members=All_list} = Proj, Epoch = lists:max([Proj#projection.epoch_number || Proj <- Ps]),
MaxPs = [Proj || Proj <- Ps,
Proj#projection.epoch_number == Epoch],
%% Sort with highest rank first (custom sort)
lists:sort(fun({RankA,_}, {RankB,_}) -> RankA > RankB end,
rank_projections(MaxPs, CurrentProj)).
%% Caller must ensure all Projs are of the same epoch number.
rank_projections(Projs, CurrentProj) ->
#projection{all_members=All_list} = CurrentProj,
MemberRank = orddict:from_list( MemberRank = orddict:from_list(
lists:zip(All_list, lists:seq(1, length(All_list)))), lists:zip(All_list, lists:seq(1, length(All_list)))),
N = length(All_list), N = length(All_list),
TopRank = lists:max([-1] ++ [{rank_projection(Proj, MemberRank, N), Proj} || Proj <- Projs].
[rank_projection(ProjX, MemberRank, N) ||
{_FLU, ProjX} <- FLUsRs,
is_record(ProjX, projection)]),
rank_and_repair(FLUsRs, Proj, TopRank, undefined, [], MemberRank, N).
rank_and_repair([], _Proj, TopRank, WinningProj,
RepairInstr, _MemberRank, _N) ->
{TopRank, WinningProj, RepairInstr};
rank_and_repair([{FLU, Proj}|T], OldProj, TopRank, WinningProj,
RepairInstr, MemberRank, N)
when is_record(Proj, projection) ->
case rank_projection(Proj, MemberRank, N) of
Rank when Rank == TopRank ->
rank_and_repair(T, OldProj, TopRank, Proj,
RepairInstr, MemberRank, N);
_Rank ->
rank_and_repair(T, OldProj, TopRank, WinningProj,
[FLU|RepairInstr], MemberRank, N)
end;
rank_and_repair([{FLU, _SomeError}|T], OldProj, TopRank, WinningProj,
RepairInstr, MemberRank, N) ->
?D({?LINE, _SomeError}),
rank_and_repair(T, OldProj, TopRank, WinningProj,
[FLU|RepairInstr], MemberRank, N).
rank_projection(#projection{author_server=Author, rank_projection(#projection{author_server=Author,
upi=UPI_list, upi=UPI_list,
@ -534,17 +555,12 @@ smoke1_test() ->
%% {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), %% {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am),
{ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 0, 100, I_am), {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 0, 100, I_am),
try try
{ok, P1} = test_calc_projection(M0, false), {ok, _P1} = test_calc_projection(M0, false),
%% ?D(make_projection_summary(P1)),
%% {ok, _Pa} = test_calc_projection(M0, true),
%% %% ?D(make_projection_summary(_Pa)),
%% {ok, _Pb} = test_calc_projection(M0, true),
%% %% ?D(make_projection_summary(_Pb)),
_ = test_calc_proposed_projection(M0), _ = test_calc_proposed_projection(M0),
{remote_write_results, [{b,ok},{c,ok}]} = {remote_write_results,
test_write_proposed_projection(M0), [{b,ok},{c,ok}]} = test_write_proposed_projection(M0),
XX = test_read_public_projection(M0, false), XX = test_read_latest_public_projection(M0, false),
?D(XX), ?D(XX),
ok ok
@ -555,27 +571,40 @@ smoke1_test() ->
ok = machi_flu0:stop(FLUc) ok = machi_flu0:stop(FLUc)
end. end.
%% nonunanimous_read_setup_test() -> nonunanimous_read_setup_test() ->
%% {ok, FLUa} = machi_flu0:start_link(a), {ok, FLUa} = machi_flu0:start_link(a),
%% {ok, FLUb} = machi_flu0:start_link(b), {ok, FLUb} = machi_flu0:start_link(b),
%% I_represent = I_am = a, I_represent = I_am = a,
%% {ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am), {ok, Ma} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 0, 100, I_am),
%% try try
%% ok = test_write_proposed_projection(M0), {ok, P1} = test_calc_projection(Ma, false),
%% {ok, P1} = test_calc_projection(M0),
%% P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}, P1a = update_projection_checksum(
%% P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}, P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}),
%% P1Epoch = P1#projection.epoch_number, P1b = update_projection_checksum(
%% ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a), P1#projection{author_server=b, creation_time=now(),
%% ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b), down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}),
%% XX = test_read_public_projection(M0, false), P1Epoch = P1#projection.epoch_number,
%% ?D(XX), ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a),
%% XX2 = test_read_public_projection(M0, true), ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b),
%% ?D(XX2)
%% after ?D(x),
%% ok = ?MGR:stop(M0), {not_unanimous, _}=_XX = test_read_latest_public_projection(Ma, false),
%% ok = machi_flu0:stop(FLUa), ?Dw(_XX),
%% ok = machi_flu0:stop(FLUb) {not_unanimous, _}=_YY = test_read_latest_public_projection(Ma, true),
%% end. %% The read repair here doesn't automatically trigger the creation of
%% a new projection (to try to create a unanimous projection). So
%% we expect nothing to change when called again.
{not_unanimous, _}=_YY = test_read_latest_public_projection(Ma, true),
ok
%% XX2 = test_read_public_projection(Ma, true),
%% ?D(XX2)
after
ok = ?MGR:stop(Ma),
ok = machi_flu0:stop(FLUa),
ok = machi_flu0:stop(FLUb)
end.
-endif. -endif.