Chain manager projection store flowchart implemented & passes smoke test!

This commit is contained in:
Scott Lystig Fritchie 2014-10-31 19:59:54 +09:00
parent 8faa1404c6
commit 83e4937658

View file

@ -40,7 +40,8 @@
-export([test_calc_projection/2,
test_calc_proposed_projection/1,
test_write_proposed_projection/1,
test_read_latest_public_projection/2]).
test_read_latest_public_projection/2,
test_react_to_env/1]).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
@ -90,6 +91,10 @@ test_calc_proposed_projection(Pid) ->
test_read_latest_public_projection(Pid, ReadRepairP) ->
gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP}, infinity).
test_react_to_env(Pid) ->
gen_server:call(Pid, {test_react_to_env}, infinity).
-endif. % TEST
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -137,6 +142,9 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) ->
do_cl_read_latest_public_projection(ReadRepairP, S),
Res = {Perhaps, Val, ExtraInfo},
{reply, Res, S2};
handle_call({test_react_to_env}, _From, S) ->
{TODOtodo, S2} = do_react_to_env(S),
{reply, TODOtodo, S2};
handle_call(_Call, _From, S) ->
{reply, whaaaaaaaaaa, S}.
@ -144,7 +152,7 @@ handle_cast(_Cast, #ch_mgr{init_finished=false} = S) ->
{noreply, S};
handle_cast({test_calc_proposed_projection}, S) ->
{Proj, S2} = calc_projection(S, [{author_proc, cast}]),
?D({make_projection_summary(Proj)}),
%% ?Dw({?LINE,make_projection_summary(Proj)}),
{noreply, S2#ch_mgr{proj_proposed=Proj}};
handle_cast(_Cast, S) ->
?D({cast_whaaaaaaaaaaa, _Cast}),
@ -236,8 +244,8 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
{{remote_write_results, Rs}, S}.
do_cl_read_latest_public_projection(ReadRepairP,
#ch_mgr{proj=Proj1, myflu=MyFLU} = S) ->
Epoch1 = Proj1#projection.epoch_number,
#ch_mgr{proj=Proj1, myflu=_MyFLU} = S) ->
_Epoch1 = Proj1#projection.epoch_number,
case cl_read_latest_public_projection(S) of
{needs_repair, FLUsRs, Extra, S3} ->
if not ReadRepairP ->
@ -246,19 +254,8 @@ do_cl_read_latest_public_projection(ReadRepairP,
{_Status, S4} = do_read_repair(FLUsRs, Extra, S3),
do_cl_read_latest_public_projection(ReadRepairP, S4)
end;
{unanimous, Proj2, Extra, S3} when Proj2 == Proj1 ->
{unanimous, Proj2, Extra, S3};
{unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, _S3}
when Epoch2 < Epoch1 orelse
(Epoch2 == Epoch1 andalso Proj2 /= Proj1) ->
exit({invariant_error, mine, Proj1, cl_unanimous, Proj2, extra, Extra});
{unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, S3}
when Epoch2 > Epoch1 ->
io:format(user, "\nHEY! This probably needs an update!\n\n", []),
Proj2b = update_projection_dbg2(
Proj2, [{hooray, {date(), time()}}|Extra]),
ok = machi_flu0:proj_write(MyFLU, Epoch2, private, Proj2b),
{unanimous, Proj2, Extra, S3#ch_mgr{proj=Proj2b}}
{UnanimousTag, Proj2, Extra, S3} ->
{UnanimousTag, Proj2, Extra, S3}
end.
cl_read_latest_public_projection(#ch_mgr{proj=CurrentProj}=S) ->
@ -273,15 +270,17 @@ cl_read_latest_public_projection(#ch_mgr{proj=CurrentProj}=S) ->
Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) ||
FLU <- All_list],
FLUsRs = lists:zip(All_list, Rs),
UnwrittenP = ([x || error_unwritten <- Rs] == []),
UnwrittenRs = [x || error_unwritten <- Rs],
Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection)],
if UnwrittenP ->
if length(UnwrittenRs) == length(Rs) ->
{error_unwritten, FLUsRs, [todo_fix_caller_perhaps], S2};
UnwrittenRs /= [] ->
{needs_repair, FLUsRs, [flarfus], S2};
true ->
[{_Rank, BestProj}|Rest] = rank_and_sort_projections(
Ps, CurrentProj),
UnanimousTag = if Rest == [] -> unanimous;
true -> not_unanimous
[{_Rank, BestProj}|_] = rank_and_sort_projections(Ps, CurrentProj),
NotBestPs = [Proj || Proj <- Ps, Proj /= BestProj],
UnanimousTag = if NotBestPs == [] -> unanimous;
true -> not_unanimous
end,
Extra = [{all_members_replied, length(Rs) == length(All_list)}],
Best_FLUs = [FLU || {FLU, Projx} <- FLUsRs, Projx == BestProj],
@ -470,6 +469,8 @@ rank_and_sort_projections(Ps, CurrentProj) ->
rank_projections(MaxPs, CurrentProj)).
%% Caller must ensure all Projs are of the same epoch number.
%% If the caller gives us projections with different epochs, we assume
%% that the caller is doing an OK thing.
rank_projections(Projs, CurrentProj) ->
#projection{all_members=All_list} = CurrentProj,
@ -486,6 +487,271 @@ rank_projection(#projection{author_server=Author,
(1*N + length(Repairing_list)) +
(2*N + length(UPI_list)).
do_react_to_env(S) ->
react_to_env_A10(S).
react_to_env_A10(S) ->
react_to_env_A20(0, S).
react_to_env_A20(Retries, S) ->
%% io:format(user, "current: ~w\n", [make_projection_summary(S#ch_mgr.proj)]),
{P_newprop, S2} = calc_projection(S, [{author_proc, react}]),
%% io:format(user, "proposed: ~w\n", [make_projection_summary(Proposed)]),
react_to_env_A30(Retries, P_newprop, S2).
react_to_env_A30(Retries, P_newprop, S) ->
{UnanimousTag, P_latest, _Extra, S2} =
do_cl_read_latest_public_projection(true, S),
LatestUnanimousP = if UnanimousTag == unanimous -> true;
UnanimousTag == not_unanimous -> false
end,
react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP, S2).
react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
#ch_mgr{proj=P_current}=S) ->
[{Rank_newprop, _}] = rank_projections([P_newprop], P_current),
[{Rank_latest, _}] = rank_projections([P_latest], P_current),
if
P_latest#projection.epoch_number > P_current#projection.epoch_number
orelse
not LatestUnanimousP ->
%% 1st clause: someone else has written a newer projection
%% 2nd clause: a network partition has healed, revealing a
%% differing opinion.
react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
Rank_newprop, Rank_latest, S);
P_latest#projection.epoch_number < P_current#projection.epoch_number
orelse
P_latest /= P_current ->
%% Both of these cases are rare. Elsewhere, the code
%% assumes that the local FLU's projection store is always
%% available, so reads & writes to it aren't going to fail
%% willy-nilly. If that assumption is true, then we can
%% reason as follows:
%%
%% a. If we can always read from the local FLU projection
%% store, then the 1st clause isn't possible because
%% P_latest's epoch # must be at least as large as
%% P_current's epoch #
%%
%% b. If P_latest /= P_current, then there can't be a
%% unanimous reply for P_latest, so the earlier 'if'
%% clause would be triggered and so we could never reach
%% this clause.
%%
%% I'm keeping this 'if' clause just in case the local FLU
%% projection store assumption changes.
react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
Rank_newprop, Rank_latest, S);
Rank_newprop > Rank_latest ->
react_to_env_C300(P_newprop, S);
true ->
{{no_change, P_latest#projection.epoch_number}, S}
end.
react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
Rank_newprop, Rank_latest, #ch_mgr{name=MyName}=S) ->
if
LatestUnanimousP ->
react_to_env_C100(P_newprop, P_latest, S);
Retries > 2 ->
%% The author of P_latest is too slow or crashed.
%% Let's try to write P_newprop and see what happens!
react_to_env_C300(P_newprop, S);
Rank_latest >= Rank_newprop
andalso
P_latest#projection.author_server /= MyName ->
%% Give the author of P_latest an opportunite to write a
%% new projection in a new epoch to resolve this mixed
%% opinion.
react_to_env_C200(Retries, P_latest, S);
true ->
%% P_newprop is best, so let's write it.
react_to_env_C300(P_newprop, S)
end.
react_to_env_C100(P_newprop, P_latest, #ch_mgr{proj=P_current}=S) ->
case projection_transition_is_sane(P_current, P_latest) of
true ->
react_to_env_C110(P_latest, S);
false ->
%% P_latest is known to be crap.
%% By process of elimination, P_newprop is best,
%% so let's write it.
react_to_env_C300(P_newprop, S)
end.
react_to_env_C110(P_latest, #ch_mgr{myflu=MyFLU} = S) ->
%% TOOD: Should we carry along any extra info that that would be useful
%% in the dbg2 list?
Extra_todo = [],
P_latest2 = update_projection_dbg2(
P_latest,
[{hooray, {v2, date(), time()}}|Extra_todo]),
Epoch = P_latest2#projection.epoch_number,
ok = machi_flu0:proj_write(MyFLU, Epoch, private, P_latest2),
react_to_env_C120(P_latest, S).
react_to_env_C120(P_latest, S) ->
{{now_using, P_latest#projection.epoch_number},
S#ch_mgr{proj=P_latest, proj_proposed=none}}.
react_to_env_C200(Retries, P_latest, S) ->
try
yo:tell_author_yo(P_latest#projection.author_server)
catch Type:Err ->
io:format(user, "TODO: tell_author_yo is broken: ~p ~p\n",
[Type, Err])
end,
react_to_env_C210(Retries, S).
react_to_env_C210(Retries, S) ->
%% TODO: implement the ranked sleep thingie?
timer:sleep(10),
react_to_env_A20(Retries + 1, S).
react_to_env_C300(#projection{epoch_number=Epoch}=P_newprop, S) ->
P_newprop2 = P_newprop#projection{epoch_number=Epoch + 1},
react_to_env_C310(update_projection_checksum(P_newprop2), S).
react_to_env_C310(P_newprop, S) ->
Epoch = P_newprop#projection.epoch_number,
{_Res, S2} = cl_write_public_proj(Epoch, P_newprop, S),
?D({c310, _Res}),
react_to_env_A10(S2).
projection_transition_is_sane(
#projection{epoch_number=Epoch1,
epoch_csum=CSum1,
creation_time=CreationTime1,
author_server=AuthorServer1,
all_members=All_list1,
down=Down_list1,
upi=UPI_list1,
repairing=Repairing_list1,
dbg=Dbg1} = P1,
#projection{epoch_number=Epoch2,
epoch_csum=CSum2,
creation_time=CreationTime2,
author_server=AuthorServer2,
all_members=All_list2,
down=Down_list2,
upi=UPI_list2,
repairing=Repairing_list2,
dbg=Dbg2} = P2) ->
try
true = is_integer(Epoch1) andalso is_integer(Epoch2),
true = is_binary(CSum1) andalso is_binary(CSum2),
{_,_,_} = CreationTime1,
{_,_,_} = CreationTime2,
true = is_atom(AuthorServer1) andalso is_atom(AuthorServer2), % todo will probably change
true = is_list(All_list1) andalso is_list(All_list2),
true = is_list(Down_list1) andalso is_list(Down_list2),
true = is_list(UPI_list1) andalso is_list(UPI_list2),
true = is_list(Repairing_list1) andalso is_list(Repairing_list2),
true = is_list(Dbg1) andalso is_list(Dbg2),
true = Epoch2 > Epoch1,
All_list1 = All_list2, % todo will probably change
%% No duplicates
true = lists:sort(Down_list2) == lists:usort(Down_list2),
true = lists:sort(UPI_list2) == lists:usort(UPI_list2),
true = lists:sort(Repairing_list2) == lists:usort(Repairing_list2),
%% Disjoint-ness
true = lists:sort(All_list2) == lists:sort(Down_list2 ++ UPI_list2 ++
Repairing_list2),
[] = [X || X <- Down_list2, not lists:member(X, All_list2)],
[] = [X || X <- UPI_list2, not lists:member(X, All_list2)],
[] = [X || X <- Repairing_list2, not lists:member(X, All_list2)],
DownS2 = sets:from_list(Down_list2),
UPIS2 = sets:from_list(UPI_list2),
RepairingS2 = sets:from_list(Repairing_list2),
true = sets:is_disjoint(DownS2, UPIS2),
true = sets:is_disjoint(DownS2, RepairingS2),
true = sets:is_disjoint(UPIS2, RepairingS2),
%% The author must not be down.
false = lists:member(AuthorServer1, Down_list1),
false = lists:member(AuthorServer2, Down_list2),
%% The author must be in either the UPI or repairing list.
true = lists:member(AuthorServer1, UPI_list1 ++ Repairing_list1),
true = lists:member(AuthorServer2, UPI_list2 ++ Repairing_list2),
%% Additions to the UPI chain may only be at the tail
UPI_common_prefix = find_common_prefix(UPI_list1, UPI_list2),
if UPI_common_prefix == [] ->
if UPI_list1 == [] orelse UPI_list2 == [] ->
%% If the common prefix is empty, then one of the
%% inputs must be empty.
true;
true ->
%% Otherwise, we have a case of UPI changing from
%% one of these two situations:
%%
%% UPI_list1 -> UPI_list2
%% [d,c,b,a] -> [c,a]
%% [d,c,b,a] -> [c,a,repair_finished_added_to_tail].
NotUPI2 = (Down_list2 ++ Repairing_list2),
true = lists:prefix(UPI_list1 -- NotUPI2,
UPI_list2)
end;
true ->
true
end,
true = lists:prefix(UPI_common_prefix, UPI_list1),
true = lists:prefix(UPI_common_prefix, UPI_list2),
UPI_1_suffix = UPI_list1 -- UPI_common_prefix,
UPI_2_suffix = UPI_list2 -- UPI_common_prefix,
%% Where did elements in UPI_2_suffix come from?
%% Only two sources are permitted.
[true = lists:member(X, Repairing_list1) % X added after repair done
orelse
lists:member(X, UPI_list1) % X in UPI_list1 after common pref
|| X <- UPI_2_suffix],
%% The UPI_2_suffix must exactly be equal to: ordered items from
%% UPI_list1 concat'ed with ordered items from Repairing_list1.
%% Both temp vars below preserve relative order!
UPI_2_suffix_from_UPI1 = [X || X <- UPI_1_suffix,
lists:member(X, UPI_list2)],
UPI_2_suffix_from_Repairing1 = [X || X <- UPI_2_suffix,
lists:member(X, Repairing_list1)],
%% true?
UPI_2_suffix = UPI_2_suffix_from_UPI1 ++ UPI_2_suffix_from_Repairing1,
true
catch
_Type:_Err ->
S1 = make_projection_summary(P1),
S2 = make_projection_summary(P2),
Trace = erlang:get_stacktrace(),
{err, from, S1, to, S2, stack, Trace}
end.
find_common_prefix([], _) ->
[];
find_common_prefix(_, []) ->
[];
find_common_prefix([H|L1], [H|L2]) ->
[H|find_common_prefix(L1, L2)];
find_common_prefix(_, _) ->
[].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
make_network_partition_locations(Nodes, Seed1) ->
@ -566,13 +832,15 @@ smoke1_test() ->
%% {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am),
{ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 0, 100, I_am),
try
%% ?D(x),
{ok, _P1} = test_calc_projection(M0, false),
_ = test_calc_proposed_projection(M0),
{remote_write_results,
[{b,ok},{c,ok}]} = test_write_proposed_projection(M0),
XX = test_read_latest_public_projection(M0, false),
?D(XX),
{unanimous, P1, Extra1} = test_read_latest_public_projection(M0, false),
%% ?Dw(make_projection_summary(P1)),
%% ?D(Extra1),
ok
after
@ -582,11 +850,12 @@ smoke1_test() ->
ok = machi_flu0:stop(FLUc)
end.
nonunanimous_read_setup_test() ->
nonunanimous_setup_and_fix_test() ->
{ok, FLUa} = machi_flu0:start_link(a),
{ok, FLUb} = machi_flu0:start_link(b),
I_represent = I_am = a,
{ok, Ma} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 0, 100, I_am),
{ok, Mb} = ?MGR:start_link(b, [a,b], {1,2,3}, 0, 100, b),
try
{ok, P1} = test_calc_projection(Ma, false),
@ -608,12 +877,25 @@ nonunanimous_read_setup_test() ->
%% we expect nothing to change when called again.
{not_unanimous,_,_}=_YY = test_read_latest_public_projection(Ma, true),
ok
{now_using, _} = test_react_to_env(Ma),
{unanimous,P2,E2} = test_read_latest_public_projection(Ma, false),
{ok, P2pa} = machi_flu0:proj_read_latest(FLUa, private),
P2 = P2pa#projection{dbg2=[]},
%% XX2 = test_read_public_projection(Ma, true),
%% ?D(XX2)
%% FLUb should still be using proj #0 for its private use
{ok, P0pb} = machi_flu0:proj_read_latest(FLUb, private),
0 = P0pb#projection.epoch_number,
%% Poke FLUb to react ... should be using the same private proj
%% as FLUa.
{now_using, _} = test_react_to_env(Mb),
{ok, P2pb} = machi_flu0:proj_read_latest(FLUb, private),
P2 = P2pb#projection{dbg2=[]},
ok
after
ok = ?MGR:stop(Ma),
ok = ?MGR:stop(Mb),
ok = machi_flu0:stop(FLUa),
ok = machi_flu0:stop(FLUb)
end.