Merge branch 'master' of github.com:basho/machi

This commit is contained in:
Scott Lystig Fritchie 2015-04-20 21:26:34 +09:00
commit b238eb4673
5 changed files with 240 additions and 159 deletions

View file

@ -7,7 +7,7 @@
Done via compare() func. Done via compare() func.
** DONE Change all protocol ops to add epoch ID ** DONE Change all protocol ops to add epoch ID
** TODO Add projection store to each FLU. ** DONE Add projection store to each FLU.
*** DONE What should the API look like? (borrow from chain mgr PoC?) *** DONE What should the API look like? (borrow from chain mgr PoC?)
@ -23,8 +23,16 @@ method as append/write where there's a variable size blob. But we'll
format that blob as a term_to_binary(). Then dispatch to a single format that blob as a term_to_binary(). Then dispatch to a single
func, and pattern match Erlang style in that func. func, and pattern match Erlang style in that func.
*** TODO Do it. *** DONE Do it.
** TODO Fix all known bugs with Chain Manager
*** DONE Fix known bugs
*** DONE Clean up crufty TODO comments and other obvious cruft
*** TODO Re-add verification step of stable epochs, including inner projections!
*** TODO Attempt to remove cruft items in flapping_i?
** TODO Finish OTP'izing the Chain Manager with FLU & proj store processes
** TODO Change all protocol ops to enforce the epoch ID ** TODO Change all protocol ops to enforce the epoch ID
** TODO Add projection wedging logic to each FLU. ** TODO Add projection wedging logic to each FLU.
@ -35,3 +43,4 @@ func, and pattern match Erlang style in that func.
*** TODO Make chain manager code flexible enough to run "real world" or "sim" *** TODO Make chain manager code flexible enough to run "real world" or "sim"
** TODO Replace registered name use from FLU write/append dispatcher ** TODO Replace registered name use from FLU write/append dispatcher
** TODO Move the FLU server to gen_server behavior? ** TODO Move the FLU server to gen_server behavior?
** TODO Implement real data repair, orchestrated by the chain manager

View file

@ -44,11 +44,11 @@
epoch_number :: pv1_epoch_n(), epoch_number :: pv1_epoch_n(),
epoch_csum :: pv1_csum(), epoch_csum :: pv1_csum(),
author_server :: pv1_server(), author_server :: pv1_server(),
creation_time :: pv1_timestamp(),
all_members :: [pv1_server()], all_members :: [pv1_server()],
down :: [pv1_server()], creation_time :: pv1_timestamp(),
upi :: [pv1_server()], upi :: [pv1_server()],
repairing :: [pv1_server()], repairing :: [pv1_server()],
down :: [pv1_server()],
dbg :: list(), %proplist(), is checksummed dbg :: list(), %proplist(), is checksummed
dbg2 :: list(), %proplist(), is not checksummed dbg2 :: list(), %proplist(), is not checksummed
members_dict :: p_srvr_dict() members_dict :: p_srvr_dict()

View file

@ -53,11 +53,12 @@ This is a very early experiment to try to create a distributed "rough
consensus" algorithm that is sufficient & safe for managing the order consensus" algorithm that is sufficient & safe for managing the order
of a Chain Replication chain, its members, and its chain order. of a Chain Replication chain, its members, and its chain order.
* Code status: **active**!
Unlike the other code projects in this repository's `prototype` Unlike the other code projects in this repository's `prototype`
directory, the chain management code is still under active directory, the chain management code is still under active
development. It is quite likely (as of early March 2015) that this development. However, the chain manager code here in the `prototype`
code will be robust enough to move to the "real" Machi code base soon. subdirectory will remain frozen in time.
Efforts in April 2015 have moved the chain manager code to the "top level"
of the repository. All new work is being merged weekly into the `master`
branch, see `src/machi_chain_manager1.erl` and related source at the top of
the repo.

View file

@ -139,10 +139,7 @@ init({MyName, MembersDict, MgrOpts}) ->
{flapping_i, Opt(flapping, [])}, {flapping_i, Opt(flapping, [])},
{up_nodes, Opt(up_nodes, not_init_yet)}], {up_nodes, Opt(up_nodes, not_init_yet)}],
ActiveP = Opt(active_mode, true), ActiveP = Opt(active_mode, true),
Down_list = All_list -- [MyName], NoneProj = make_none_projection(MyName, All_list, MembersDict),
UPI_list = [MyName],
NoneProj = machi_projection:new(MyName, MembersDict,
Down_list, UPI_list, [], []),
Proxies = orddict:fold( Proxies = orddict:fold(
fun(K, P, Acc) -> fun(K, P, Acc) ->
{ok, Pid} = ?FLU_PC:start_link(P), {ok, Pid} = ?FLU_PC:start_link(P),
@ -220,10 +217,15 @@ code_change(_OldVsn, S, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
make_none_projection(MyName, All_list, MembersDict) ->
Down_list = All_list,
UPI_list = [],
machi_projection:new(MyName, MembersDict, UPI_list, Down_list, [], []).
set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) ->
FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)],
USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list),
{ok, TRef} = timer:send_interval(USec, yo_yo_yo), {ok, TRef} = timer:send_interval(USec, yo_yo_yo_todo),
S#ch_mgr{timer=TRef}. S#ch_mgr{timer=TRef}.
do_cl_write_public_proj(Proj, S) -> do_cl_write_public_proj(Proj, S) ->
@ -516,6 +518,31 @@ rank_and_sort_projections(Ps, CurrentProj) ->
%% Caller must ensure all Projs are of the same epoch number. %% Caller must ensure all Projs are of the same epoch number.
%% If the caller gives us projections with different epochs, we assume %% If the caller gives us projections with different epochs, we assume
%% that the caller is doing an OK thing. %% that the caller is doing an OK thing.
%%
%% TODO: This implementation currently gives higher rank to the last
%% member of All_list, which is typically/always/TODO-CLARIFY
%% sorted. That's fine, but there's a source of unnecessary
%% churn: during repair, we assume that the head of the chain is
%% the coordinator of the repair. So any time that the head
%% makes a repair-related transition, that projection may get
%% quickly replaced by an identical projection that merely has
%% higher rank because it's authored by a higher-ranked member.
%% Worst case, for chain len=4:
%% E+0: author=a, upi=[a], repairing=[b,c,d]
%% E+1: author=b, upi=[a], repairing=[b,c,d] (**)
%% E+2: author=c, upi=[a], repairing=[b,c,d] (**)
%% E+3: author=d, upi=[a], repairing=[b,c,d] (**)
%% E+4: author=a, upi=[a,b], repairing=[c,d]
%% E+5: author=b, upi=[a,b], repairing=[c,d] (**)
%% E+6: author=c, upi=[a,b], repairing=[c,d] (**)
%% E+7: author=d, upi=[a,b], repairing=[c,d] (**)
%% E+... 6 more (**) epochs when c & d finish their respective repairs.
%% Ideally, the "(**)" epochs are avoidable churn.
%% Perhaps this means that we should change the responsibility
%% for repair management to the highest ranking member of the
%% UPI_list?
%% TODO Hrrrmmmmm ... what about the TODO comment in A40's A40a clause?
%% That could perhaps resolve this same problem in a better way?
rank_projections(Projs, CurrentProj) -> rank_projections(Projs, CurrentProj) ->
#projection_v1{all_members=All_list} = CurrentProj, #projection_v1{all_members=All_list} = CurrentProj,
@ -530,8 +557,7 @@ rank_projection(#projection_v1{author_server=Author,
upi=UPI_list, upi=UPI_list,
repairing=Repairing_list}, MemberRank, N) -> repairing=Repairing_list}, MemberRank, N) ->
AuthorRank = orddict:fetch(Author, MemberRank), AuthorRank = orddict:fetch(Author, MemberRank),
%% (AuthorRank-AuthorRank) + % feels unstable???? AuthorRank +
AuthorRank + % feels stable
( N * length(Repairing_list)) + ( N * length(Repairing_list)) +
(N*N * length(UPI_list)). (N*N * length(UPI_list)).
@ -580,6 +606,7 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
flap_limit=FlapLimit} = S) -> flap_limit=FlapLimit} = S) ->
?REACT(a30), ?REACT(a30),
{P_newprop1, S2} = calc_projection(S, MyName), {P_newprop1, S2} = calc_projection(S, MyName),
?REACT({a30, ?LINE, [{current, machi_projection:make_summary(S#ch_mgr.proj)}]}),
?REACT({a30, ?LINE, [{newprop1, machi_projection:make_summary(P_newprop1)}]}), ?REACT({a30, ?LINE, [{newprop1, machi_projection:make_summary(P_newprop1)}]}),
%% Are we flapping yet? %% Are we flapping yet?
@ -587,7 +614,8 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
%% Move the epoch number up ... originally done in C300. %% Move the epoch number up ... originally done in C300.
#projection_v1{epoch_number=Epoch_newprop2}=P_newprop2, #projection_v1{epoch_number=Epoch_newprop2}=P_newprop2,
#projection_v1{epoch_number=Epoch_latest}=P_latest, #projection_v1{epoch_number=Epoch_latest,
author_server=Author_latest}=P_latest,
NewEpoch = erlang:max(Epoch_newprop2, Epoch_latest) + 1, NewEpoch = erlang:max(Epoch_newprop2, Epoch_latest) + 1,
P_newprop3 = P_newprop2#projection_v1{epoch_number=NewEpoch}, P_newprop3 = P_newprop2#projection_v1{epoch_number=NewEpoch},
?REACT({a30, ?LINE, [{newprop3, machi_projection:make_summary(P_newprop3)}]}), ?REACT({a30, ?LINE, [{newprop3, machi_projection:make_summary(P_newprop3)}]}),
@ -595,7 +623,7 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
{P_newprop10, S10} = {P_newprop10, S10} =
case get_flap_count(P_newprop3) of case get_flap_count(P_newprop3) of
{_, P_newprop3_flap_count} when P_newprop3_flap_count >= FlapLimit -> {_, P_newprop3_flap_count} when P_newprop3_flap_count >= FlapLimit ->
AllHosed = get_all_hosed(S3), AllHosed = get_all_hosed(P_newprop3),
{P_i, S_i} = calc_projection(S3, MyName, AllHosed), {P_i, S_i} = calc_projection(S3, MyName, AllHosed),
P_inner = case lists:member(MyName, AllHosed) of P_inner = case lists:member(MyName, AllHosed) of
false -> false ->
@ -632,14 +660,17 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
andalso andalso
P_oldinner#projection_v1.down == P_oldinner#projection_v1.down ==
P_inner#projection_v1.down -> P_inner#projection_v1.down ->
%% HRM, distrust?... P_oldinner#projection_v1.epoch_number;
%% P_oldinner#projection_v1.epoch_number;
P_oldinner#projection_v1.epoch_number + 1;
true -> true ->
P_oldinner#projection_v1.epoch_number + 1 P_oldinner#projection_v1.epoch_number + 1
end end
end, end,
%% TODO: When we implement the real chain repair function, we
%% need to keep in mind that an inner projection with
%% up nodes > 1, repair is required there! In the
%% current simulator, repair is not simulated and
%% finished (and then growing the UPI list). Fix.
P_inner2 = P_inner#projection_v1{epoch_number=FinalInnerEpoch}, P_inner2 = P_inner#projection_v1{epoch_number=FinalInnerEpoch},
InnerInfo = [{inner_summary, machi_projection:make_summary(P_inner2)}, InnerInfo = [{inner_summary, machi_projection:make_summary(P_inner2)},
{inner_projection, P_inner2}], {inner_projection, P_inner2}],
@ -650,8 +681,106 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
{P_newprop3, S3} {P_newprop3, S3}
end, end,
react_to_env_A40(Retries, P_newprop10, P_latest, %% Here's a more common reason for moving from inner projection to
LatestUnanimousP, S10). %% a normal projection: the old proj has an inner but the newprop
%% does not.
MoveFromInnerToNorm_p =
case {inner_projection_exists(P_current),
inner_projection_exists(P_newprop10)} of
{true, false} -> true;
{_, _} -> false
end,
%% If P_current says that we believe that we're currently flapping,
%% and if P_newprop10 says that we're no longer flapping, then we
%% really ought to stop flapping, right.
%%
%% Not quite so simple....
%%
%% AAAAH, right. The case I'm dealing with right now is an asymmetric
%% partition in a 4 member chain that affects all_hosed=[a,b,c] but
%% member D is *NOT* noticing anything different in the current scheme:
%% {inner_projection_exists(current), inner_projection_exists(new)}
%% is {true, true}.
%% Yes, that hypothesis is confirmed by time-honored io:format() tracing.
%%
%% So, we need something to kick a silly member like 'd' out of its
%% rut of am-still-flapping. So, let's try this:
%% If we see a P_latest from author != MyName, and if P_latest's
%% author's flap count is now 0 (latest!), but that same member's
%% flap count in P_current is non-zero, then we assume that author
%% has moved out of flapping state and that therefore we ought to do
%% the same.
%% Remember! P_current is this manager's private in-use projection.
%% It is always less than or equal to P_latest's epoch!
Current_flap_counts = get_all_flap_counts(P_current),
Latest_authors_flap_count_current = proplists:get_value(
Author_latest, Current_flap_counts),
Latest_flap_counts = get_all_flap_counts(P_latest),
Latest_authors_flap_count_latest = proplists:get_value(
Author_latest, Latest_flap_counts),
Kicker_p = case {Latest_authors_flap_count_current,
Latest_authors_flap_count_latest} of
{NotUndef, undefined} when NotUndef /= undefined ->
true;
{_, _} ->
false
end,
if MoveFromInnerToNorm_p orelse Kicker_p ->
ClauseInfo = [{inner_kicker, Kicker_p},
{move_from_inner, MoveFromInnerToNorm_p}],
?REACT({a30, ?LINE, ClauseInfo}),
%% %% 2015-04-14: YEAH, this appears to work!
%% %% 1. Create a "safe" projection that is upi=[],repairing=[]
%% %% 2. Declare it to be best & latest by pure fiat.
%% %% (The C100 transition will double-check that it's safe.)
%% %% 3. Jump to C100. Then, for the next iteration,
%% %% our P_current state to a smallest-possible-score
%% %% state ... and let the chain reassemble itself from
%% %% length zero.
%% #projection_v1{epoch_number=Epoch_newprop10, all_members=All_list,
%% members_dict=MembersDict} = P_newprop10,
%% P_noneprop0 = make_none_projection(MyName, All_list, MembersDict),
%% P_noneprop1 = P_noneprop0#projection_v1{epoch_number=Epoch_newprop10},
%% %% Just to be clear, we clobber any flapping info by setting dbg.
%% P_noneprop = P_noneprop1#projection_v1{dbg=ClauseInfo},
%% react_to_env_C100(P_noneprop, P_latest, S);
%% 2015-04-14: Let's experiment with using the current inner
%% projection (or, if there really is no inner, just P_current).
%% This is safe because it's already P_current and by assumption,
%% anything that made it through the logical maze to get here
%% is safe. So re-using it with a higher epoch number doesn't
%% make any significant change.
%%
%% Yeah, it appears to work, also, nice! This can help save some
%% repair operations (compared to the other safe thing to do
%% here, which uses make_none_projection() to build & repair the
%% entire chain from scratch). Note that this isn't a guarantee
%% that repair steps will be minimized: for a 4-member cluster
%% that has an asymmetric partition which organizes 3 clusters of
%% inner-upi=[a], inner-upi=[b], and inner-upi[c,d], there is no
%% guarantee (yet?) that the [c,d] chain will be the UPI basis
%% for repairs when the partition is healed: the quickest author
%% after the healing will make that choice for everyone.
%% TODO: Perhaps that quickest author should consult all of the
%% other private stores, check their inner, and if there is a
%% higher rank there, then goto C200 for a wait-and-see cycle?
P_inner2A = inner_projection_or_self(P_current),
P_inner2B =
P_inner2A#projection_v1{epoch_number=
P_newprop10#projection_v1.epoch_number,
dbg=ClauseInfo},
react_to_env_C100(P_inner2B, P_latest, S);
true ->
?REACT({a30, ?LINE, []}),
react_to_env_A40(Retries, P_newprop10, P_latest,
LatestUnanimousP, S10)
end.
react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP, react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
#ch_mgr{name=MyName, proj=P_current}=S) -> #ch_mgr{name=MyName, proj=P_current}=S) ->
@ -710,7 +839,7 @@ react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
%% A40a (see flowchart) %% A40a (see flowchart)
Rank_newprop > Rank_latest -> Rank_newprop > Rank_latest ->
?REACT({b10, ?LINE, ?REACT({a40, ?LINE,
[{rank_latest, Rank_latest}, [{rank_latest, Rank_latest},
{rank_newprop, Rank_newprop}, {rank_newprop, Rank_newprop},
{latest_author, P_latest#projection_v1.author_server}]}), {latest_author, P_latest#projection_v1.author_server}]}),
@ -774,11 +903,6 @@ react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
react_to_env_A50(P_latest, FinalProps, S) -> react_to_env_A50(P_latest, FinalProps, S) ->
?REACT(a50), ?REACT(a50),
_HH = get(react),
% io:format(user, "HEE50s ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse([X || X <- _HH, is_atom(X)])]),
%% io:format(user, "HEE50 ~w ~w ~p\n", [S#ch_mgr.name, self(), lists:reverse(_HH)]),
?REACT({a50, ?LINE, [{latest_epoch, P_latest#projection_v1.epoch_number}, ?REACT({a50, ?LINE, [{latest_epoch, P_latest#projection_v1.epoch_number},
{final_props, FinalProps}]}), {final_props, FinalProps}]}),
{{no_change, FinalProps, P_latest#projection_v1.epoch_number}, S}. {{no_change, FinalProps, P_latest#projection_v1.epoch_number}, S}.
@ -789,14 +913,44 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
?REACT(b10), ?REACT(b10),
{_P_newprop_flap_time, P_newprop_flap_count} = get_flap_count(P_newprop), {_P_newprop_flap_time, P_newprop_flap_count} = get_flap_count(P_newprop),
_LatestAllFlapCounts = get_all_flap_counts_counts(P_latest), UnanimousLatestInnerNotRelevant_p =
%% Transitive flap counts are buggy: the current method to observe case inner_projection_exists(P_latest) of
%% them is **buggy**. true when P_latest#projection_v1.author_server /= MyName ->
%% P_latest_trans_flap_count = my_find_minmost(LatestAllFlapCounts), #projection_v1{down=Down_inner} = inner_projection_or_self(
P_latest),
case lists:member(MyName, Down_inner) of
true ->
%% Some foreign author's inner projection thinks that
%% I'm down. Silly! We ought to ignore this one.
?REACT({b10, ?LINE, [{down_inner, Down_inner}]}),
true;
false ->
?REACT({b10, ?LINE, [{down_inner, Down_inner}]}),
false
end;
_Else_u ->
false
end,
if if
LatestUnanimousP
andalso
UnanimousLatestInnerNotRelevant_p ->
?REACT({b10, ?LINE, []}),
put(b10_hack, false),
%% Do not go to C100, because we want to ignore this latest
%% proposal. Write ours instead via C300.
react_to_env_C300(P_newprop, P_latest, S);
LatestUnanimousP -> LatestUnanimousP ->
?REACT({b10, ?LINE, [{latest_unanimous_p, LatestUnanimousP}]}), ?REACT({b10, ?LINE,
[{latest_unanimous_p, LatestUnanimousP},
{latest_epoch,P_latest#projection_v1.epoch_number},
{latest_author,P_latest#projection_v1.author_server},
{newprop_epoch,P_newprop#projection_v1.epoch_number},
{newprop_author,P_newprop#projection_v1.author_server}
]}),
put(b10_hack, false), put(b10_hack, false),
react_to_env_C100(P_newprop, P_latest, S); react_to_env_C100(P_newprop, P_latest, S);
@ -805,25 +959,11 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
%% I am flapping ... what else do I do? %% I am flapping ... what else do I do?
?REACT({b10, ?LINE, [i_am_flapping, ?REACT({b10, ?LINE, [i_am_flapping,
{newprop_flap_count, P_newprop_flap_count}, {newprop_flap_count, P_newprop_flap_count},
%% {latest_trans_flap_count, P_latest_trans_flap_count},
{flap_limit, FlapLimit}]}), {flap_limit, FlapLimit}]}),
_B10Hack = get(b10_hack), _B10Hack = get(b10_hack),
%% if _B10Hack == false andalso P_newprop_flap_count - FlapLimit - 3 =< 0 -> io:format(user, "{FLAP: ~w flaps ~w}!\n", [S#ch_mgr.name, P_newprop_flap_count]), put(b10_hack, true); true -> ok end,
io:format(user, "{FLAP: ~w flaps ~w}!\n", [S#ch_mgr.name, P_newprop_flap_count]), io:format(user, "{FLAP: ~w flaps ~w}!\n", [S#ch_mgr.name, P_newprop_flap_count]),
%io:format(user, "FLAP: ~w flapz ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse([X || X <- get(react), is_atom(X)])]),
if if
%% So, if we noticed a flap count by some FLU X with a
%% count below FlapLimit, then X crashes so that X's
%% flap count remains below FlapLimit, then we could get
%% stuck forever? Hrm, except that 'crashes' ought to be
%% detected by our own failure detector and get us out of
%% this current flapping situation, right? TODO
%%
%% 2015-04-10: TODO Flap count detection, as it has
%% been attempted before now, is buggy.
%%
%% MEANWHILE, we have learned some things about this %% MEANWHILE, we have learned some things about this
%% algorithm in the past few months. With the introduction %% algorithm in the past few months. With the introduction
%% of the "inner projection" concept, we know that the inner %% of the "inner projection" concept, we know that the inner
@ -839,7 +979,7 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
%% doesn't give an accurate sense of global flapping state. %% doesn't give an accurate sense of global flapping state.
%% FlapLimit is enough to be able to tell us to slow down. %% FlapLimit is enough to be able to tell us to slow down.
true -> %% P_latest_trans_flap_count >= FlapLimit -> true ->
%% We already know that I'm flapping. We need to %% We already know that I'm flapping. We need to
%% signal to the rest of the world that I'm writing %% signal to the rest of the world that I'm writing
%% and flapping and churning, so we cannot always %% and flapping and churning, so we cannot always
@ -857,7 +997,6 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP,
end, end,
FinalProps = [{my_flap_limit, FlapLimit}, FinalProps = [{my_flap_limit, FlapLimit},
{throttle_seconds, ThrottleTime}], {throttle_seconds, ThrottleTime}],
io:format(user, "<--x=~w-.--~w-~w-~w->", [X, MyName, P_newprop_flap_count,FlapLimit]),
react_to_env_A50(P_latest, FinalProps, S2); react_to_env_A50(P_latest, FinalProps, S2);
true -> true ->
%% It is our moral imperative to write so that %% It is our moral imperative to write so that
@ -865,7 +1004,6 @@ io:format(user, "<--x=~w-.--~w-~w-~w->", [X, MyName, P_newprop_flap_count,FlapLi
%% everyone notices then eventually falls into %% everyone notices then eventually falls into
%% consensus. %% consensus.
?REACT({b10, ?LINE, [flap_continue]}), ?REACT({b10, ?LINE, [flap_continue]}),
io:format(user, "<--x=~w-oooo-~w-~w-~w->", [X, MyName, P_newprop_flap_count,FlapLimit]),
react_to_env_C300(P_newprop, P_latest, S2) react_to_env_C300(P_newprop, P_latest, S2)
end end
end; end;
@ -887,13 +1025,16 @@ io:format(user, "<--x=~w-oooo-~w-~w-~w->", [X, MyName, P_newprop_flap_count,Flap
{latest_author, P_latest#projection_v1.author_server}]}), {latest_author, P_latest#projection_v1.author_server}]}),
put(b10_hack, false), put(b10_hack, false),
%% Give the author of P_latest an opportunite to write a %% TODO: Is a UnanimousLatestInnerNotRelevant_p test needed in this clause???
%% Give the author of P_latest an opportunity to write a
%% new projection in a new epoch to resolve this mixed %% new projection in a new epoch to resolve this mixed
%% opinion. %% opinion.
react_to_env_C200(Retries, P_latest, S); react_to_env_C200(Retries, P_latest, S);
true -> true ->
?REACT({b10, ?LINE}), ?REACT({b10, ?LINE}),
?REACT({b10, ?LINE, [{retries,Retries},{rank_latest, Rank_latest}, {rank_newprop, Rank_newprop}, {latest_author, P_latest#projection_v1.author_server}]}), % TODO debug delete me!
put(b10_hack, false), put(b10_hack, false),
%% P_newprop is best, so let's write it. %% P_newprop is best, so let's write it.
@ -904,21 +1045,6 @@ react_to_env_C100(P_newprop, P_latest,
#ch_mgr{name=MyName, proj=P_current}=S) -> #ch_mgr{name=MyName, proj=P_current}=S) ->
?REACT(c100), ?REACT(c100),
%% TODO 2015-04-10
%% OK, well, we need to be checking sanity on inner projections here,
%% but how to do it is still a bit of a mystery.
%%
%% If the *Y bindings are identical to incoming args, then we aren't
%% checking at all. That's bad, but we don't go into Infinite Loops of
%% ReallyReallyBad.
P_newpropY = P_newprop,
P_latestY = P_latest,
P_currentY = P_current,
%% P_newpropY = inner_projection_or_self(P_newprop),
%% P_latestY = inner_projection_or_self(P_latest),
%% P_currentY = inner_projection_or_self(P_current),
I_am_UPI_in_newprop_p = lists:member(MyName, P_newprop#projection_v1.upi), I_am_UPI_in_newprop_p = lists:member(MyName, P_newprop#projection_v1.upi),
I_am_Repairing_in_latest_p = lists:member(MyName, I_am_Repairing_in_latest_p = lists:member(MyName,
P_latest#projection_v1.repairing), P_latest#projection_v1.repairing),
@ -931,13 +1057,6 @@ react_to_env_C100(P_newprop, P_latest,
Current_sane_p = projection_transition_is_sane(P_current, P_latest, Current_sane_p = projection_transition_is_sane(P_current, P_latest,
MyName), MyName),
Inner_sane_p =
if P_currentY == P_current, P_latestY == P_latest ->
true;
true ->
projection_transition_is_sane(P_currentY, P_latestY, MyName)
end,
case {ShortCircuit_p, Current_sane_p} of case {ShortCircuit_p, Current_sane_p} of
_ when P_current#projection_v1.epoch_number == 0 -> _ when P_current#projection_v1.epoch_number == 0 ->
%% Epoch == 0 is reserved for first-time, just booting conditions. %% Epoch == 0 is reserved for first-time, just booting conditions.
@ -949,43 +1068,24 @@ react_to_env_C100(P_newprop, P_latest,
%% am/should be repairing. We ignore our proposal and try %% am/should be repairing. We ignore our proposal and try
%% to go with the latest. %% to go with the latest.
?REACT({c100, ?LINE, [repairing_short_circuit]}), ?REACT({c100, ?LINE, [repairing_short_circuit]}),
if Inner_sane_p == false -> io:format(user, "QQQ line ~p false\n", [?LINE]), timer:sleep(500); true -> ok end,
react_to_env_C110(P_latest, S); react_to_env_C110(P_latest, S);
{_, true} when Inner_sane_p -> {_, true} ->
?REACT({c100, ?LINE, [sane]}), ?REACT({c100, ?LINE, [sane]}),
if Inner_sane_p == false -> io:format(user, "QQQ line ~p false\n", [?LINE]), timer:sleep(500); true -> ok end,
react_to_env_C110(P_latest, S); react_to_env_C110(P_latest, S);
{_, _AnyOtherReturnValue} -> {_, _AnyOtherReturnValue} ->
%% P_latest is not sane or else P_latestY is not sane. %% P_latest is not sane.
%% By process of elimination, P_newprop is best, %% By process of elimination, P_newprop is best,
%% so let's write it. %% so let's write it.
io:format(user, "\nUrp: ~p ~p ~p ~p\n", [MyName, ShortCircuit_p, _AnyOtherReturnValue, Inner_sane_p]),
io:format(user, "c100 P_newprop : ~w\n", [machi_projection:make_summary(P_newprop)]),
io:format(user, "c100 P_newpropY: ~w\n", [machi_projection:make_summary(P_newpropY)]),
io:format(user, "c100 P_latest : ~w\n", [machi_projection:make_summary(P_latest)]),
io:format(user, "c100 P_latestY: ~w\n", [machi_projection:make_summary(P_latestY)]),
?REACT({c100, ?LINE, [not_sane]}), ?REACT({c100, ?LINE, [not_sane]}),
react_to_env_C300(P_newprop, P_latest, S) react_to_env_C300(P_newprop, P_latest, S)
end. end.
react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
?REACT(c110), ?REACT(c110),
%% TOOD: Should we carry along any extra info that that would be useful
%% in the dbg2 list?
Extra_todo = [], Extra_todo = [],
RunEnv = S#ch_mgr.runenv, P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo),
Islands = proplists:get_value(network_islands, RunEnv),
P_latest2 = machi_projection:update_dbg2(
P_latest,
[%% {network_islands, Islands},
%% {hooray, {v2, date(), time()}}
Islands--Islands
|Extra_todo]),
MyNamePid = proxy_pid(MyName, S), MyNamePid = proxy_pid(MyName, S),
%% TODO: We need to fetch the inner projection, if it exists, and
%% write it to the private store. Put the outer projection
%% into dbg2 for forensics and perhaps re-start use?
ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO), ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO),
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true -> true ->
@ -994,16 +1094,15 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
{HH,MM,SS} = time(), {HH,MM,SS} = time(),
case inner_projection_exists(P_latest2) of case inner_projection_exists(P_latest2) of
false -> false ->
ok; io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses plain: ~w\n",
[HH,MM,SS,MSec, S#ch_mgr.name,
machi_projection:make_summary(P_latest2)]);
true -> true ->
P_inner = inner_projection_or_self(P_latest2), P_inner = inner_projection_or_self(P_latest2),
io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses INNER: ~w\n", io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses inner: ~w\n",
[HH,MM,SS,MSec, S#ch_mgr.name, [HH,MM,SS,MSec, S#ch_mgr.name,
machi_projection:make_summary(P_inner)]) machi_projection:make_summary(P_inner)])
end, end;
io:format(user, "\n~2..0w:~2..0w:~2..0w.~3..0w ~p uses: ~w\n",
[HH,MM,SS,MSec, S#ch_mgr.name,
machi_projection:make_summary(P_latest2)]);
_ -> _ ->
ok ok
end, end,
@ -1023,10 +1122,8 @@ react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H} = S) ->
_ -> _ ->
H2 H2
end, end,
%% HH = [if is_atom(X) -> X; is_tuple(X) -> {element(1,X), element(2,X)} end || X <- get(react), is_atom(X) orelse size(X) == 3],
HH = get(react), %% io:format(user, "HEE120 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(HH)]),
io:format(user, "HEE120s ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse([X || X <- HH, is_atom(X)])]),
%% io:format(user, "HEE120 ~w ~w ~p\n", [S#ch_mgr.name, self(), lists:reverse(HH)]),
?REACT({c120, [{latest, machi_projection:make_summary(P_latest)}]}), ?REACT({c120, [{latest, machi_projection:make_summary(P_latest)}]}),
{{now_using, FinalProps, P_latest#projection_v1.epoch_number}, {{now_using, FinalProps, P_latest#projection_v1.epoch_number},
@ -1075,7 +1172,7 @@ react_to_env_C310(P_newprop, S) ->
{write_result, WriteRes}]}), {write_result, WriteRes}]}),
react_to_env_A10(S2). react_to_env_A10(S2).
calculate_flaps(P_newprop, _P_current, FlapLimit, calculate_flaps(P_newprop, _P_current, _FlapLimit,
#ch_mgr{name=MyName, proj_history=H, flap_start=FlapStart, #ch_mgr{name=MyName, proj_history=H, flap_start=FlapStart,
flaps=Flaps, runenv=RunEnv0} = S) -> flaps=Flaps, runenv=RunEnv0} = S) ->
RunEnv1 = replace(RunEnv0, [{flapping_i, []}]), RunEnv1 = replace(RunEnv0, [{flapping_i, []}]),
@ -1175,34 +1272,26 @@ calculate_flaps(P_newprop, _P_current, FlapLimit,
AllHosed = [] AllHosed = []
end, end,
%% If there's at least one count in AllFlapCounts that isn't my
%% flap count, and if it's over the flap limit, then consider them
%% settled.
AllFlapCountsSettled = lists:keydelete(MyName, 1, AllFlapCounts) /= []
andalso
my_find_minmost(AllFlapCounts) >= FlapLimit,
FlappingI = {flapping_i, [{flap_count, {NewFlapStart, NewFlaps}}, FlappingI = {flapping_i, [{flap_count, {NewFlapStart, NewFlaps}},
{all_hosed, AllHosed}, {all_hosed, AllHosed},
{all_flap_counts, lists:sort(AllFlapCounts)}, {all_flap_counts, lists:sort(AllFlapCounts)},
{all_flap_counts_settled, AllFlapCountsSettled}, {bad,BadFLUs}]},
{bad,BadFLUs},
{da_downu, DownUnion}, % debugging aid
{da_hosedtu, HosedTransUnion}, % debugging aid
{da_downreports, [{P#projection_v1.epoch_number, P#projection_v1.author_server, P#projection_v1.down} || P <- [BestP|NotBestPs]]} % debugging aid
]},
Dbg2 = [FlappingI|P_newprop#projection_v1.dbg], Dbg2 = [FlappingI|P_newprop#projection_v1.dbg],
%% SLF TODO: 2015-03-04: I'm growing increasingly suspicious of %% TODO: 2015-03-04: I'm growing increasingly suspicious of
%% the 'runenv' variable that's threaded through all this code. %% the 'runenv' variable that's threaded through all this code.
%% It isn't doing what I'd originally intended. And I think that %% It isn't doing what I'd originally intended. Fix it.
%% the flapping information that we've just constructed here is
%% going to get lost, and that's a shame. Fix it.
RunEnv2 = replace(RunEnv1, [FlappingI]), RunEnv2 = replace(RunEnv1, [FlappingI]),
%% NOTE: If we'd increment of flaps here, that doesn't mean that %% NOTE: Just because we increment flaps here, there's no correlation
%% someone's public proj store has been updated. For example, %% to successful public proj store writes! For example,
%% if we loop through states C2xx a few times, we would incr %% if we loop through states C2xx a few times, we would incr
%% flaps each time ... but the C2xx path doesn't write a new %% flaps each time ... but the C2xx path doesn't write a new
%% proposal to everyone's public proj stores, and there's no %% proposal to everyone's public proj stores. Similarly,
%% guarantee that anyone else as written a new public proj either. %% if we go through to C300, we will *try* to write to all public
%% stores, but the C3xx path doesn't care if all of those write
%% attempts *fail*. Our flap count is a rough heuristic only, and
%% a large local flaps count gives no concrete guarantee that any
%% communication has been successful with any other part of the
%% cluster.
{machi_projection:update_checksum(P_newprop#projection_v1{dbg=Dbg2}), {machi_projection:update_checksum(P_newprop#projection_v1{dbg=Dbg2}),
S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv2}}. S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv2}}.
@ -1271,7 +1360,7 @@ projection_transition_is_sane(
true = is_binary(CSum1) andalso is_binary(CSum2), true = is_binary(CSum1) andalso is_binary(CSum2),
{_,_,_} = CreationTime1, {_,_,_} = CreationTime1,
{_,_,_} = CreationTime2, {_,_,_} = CreationTime2,
true = is_atom(AuthorServer1) andalso is_atom(AuthorServer2), % todo will probably change true = is_atom(AuthorServer1) andalso is_atom(AuthorServer2), % todo type may change?
true = is_list(All_list1) andalso is_list(All_list2), true = is_list(All_list1) andalso is_list(All_list2),
true = is_list(Down_list1) andalso is_list(Down_list2), true = is_list(Down_list1) andalso is_list(Down_list2),
true = is_list(UPI_list1) andalso is_list(UPI_list2), true = is_list(UPI_list1) andalso is_list(UPI_list2),
@ -1299,13 +1388,6 @@ projection_transition_is_sane(
true = sets:is_disjoint(DownS2, RepairingS2), true = sets:is_disjoint(DownS2, RepairingS2),
true = sets:is_disjoint(UPIS2, RepairingS2), true = sets:is_disjoint(UPIS2, RepairingS2),
%% The author must not be down.
false = lists:member(AuthorServer1, Down_list1),
false = lists:member(AuthorServer2, Down_list2),
%% The author must be in either the UPI or repairing list.
true = lists:member(AuthorServer1, UPI_list1 ++ Repairing_list1),
true = lists:member(AuthorServer2, UPI_list2 ++ Repairing_list2),
%% Additions to the UPI chain may only be at the tail %% Additions to the UPI chain may only be at the tail
UPI_common_prefix = find_common_prefix(UPI_list1, UPI_list2), UPI_common_prefix = find_common_prefix(UPI_list1, UPI_list2),
if UPI_common_prefix == [] -> if UPI_common_prefix == [] ->
@ -1427,10 +1509,7 @@ projection_transition_is_sane(
%% then adopts that projection (and unwedges %% then adopts that projection (and unwedges
%% itself, etc etc). %% itself, etc etc).
%% io:format(user, "QQQ: RetrospectiveP ~p\n", [RetrospectiveP]), exit({todo, revisit, ?MODULE, ?LINE}),
%% io:format(user, "QQQ: UPI_2_suffix ~p\n", [UPI_2_suffix]),
%% io:format(user, "QQQ: UPI_2_suffix_from_UPI1 ~p\n", [UPI_2_suffix_from_UPI1]),
%% io:format(user, "QQQ: UPI_2_suffix_from_Repairing1 ~p\n", [UPI_2_suffix_from_Repairing1]),
io:format(user, "|~p,~p TODO revisit|", io:format(user, "|~p,~p TODO revisit|",
[?MODULE, ?LINE]), [?MODULE, ?LINE]),
ok; ok;
@ -1440,9 +1519,11 @@ projection_transition_is_sane(
%% normal has a UPI that has nothing to do with %% normal has a UPI that has nothing to do with
%% RelativeToServer a.k.a. me. %% RelativeToServer a.k.a. me.
%% from: %% from:
%% {epoch,847},{author,c},{upi,[c]},{repair,[]},{down,[a,b,d]}, %% {epoch,847},{author,c},{upi,[c]},{repair,[]},
%% {down,[a,b,d]}
%% to: %% to:
%% {epoch,848},{author,a},{upi,[a]},{repair,[]},{down,[b,c,d]}, %% {epoch,848},{author,a},{upi,[a]},{repair,[]},
%% {down,[b,c,d]}
if UPI_2_suffix == [AuthorServer2] -> if UPI_2_suffix == [AuthorServer2] ->
true; true;
not RetrospectiveP -> not RetrospectiveP ->
@ -1457,12 +1538,6 @@ projection_transition_is_sane(
S1 = machi_projection:make_summary(P1), S1 = machi_projection:make_summary(P1),
S2 = machi_projection:make_summary(P2), S2 = machi_projection:make_summary(P2),
Trace = erlang:get_stacktrace(), Trace = erlang:get_stacktrace(),
%% %% TODO: this history goop is useful sometimes for debugging but
%% %% not for any "real" use. Get rid of it, for the long term.
%% H = (catch [{FLUName, Type, P#projection_v1.epoch_number, machi_projection:make_summary(P)} ||
%% FLUName <- P1#projection_v1.all_members,
%% Type <- [public,private],
%% P <- ?FLU_PC:proj_get_all(orddict:fetch(FLUName, What?), Type)]),
{err, _Type, _Err, from, S1, to, S2, relative_to, RelativeToServer, {err, _Type, _Err, from, S1, to, S2, relative_to, RelativeToServer,
history, (catch lists:sort([no_history])), history, (catch lists:sort([no_history])),
stack, Trace} stack, Trace}
@ -1501,7 +1576,7 @@ get_raw_flapping_i(#projection_v1{dbg=Dbg}) ->
proplists:get_value(flapping_i, Dbg, []). proplists:get_value(flapping_i, Dbg, []).
get_flap_count(P) -> get_flap_count(P) ->
proplists:get_value(flap_count, get_raw_flapping_i(P), 0). proplists:get_value(flap_count, get_raw_flapping_i(P), {0,0}).
get_all_flap_counts(P) -> get_all_flap_counts(P) ->
proplists:get_value(all_flap_counts, get_raw_flapping_i(P), []). proplists:get_value(all_flap_counts, get_raw_flapping_i(P), []).
@ -1515,11 +1590,7 @@ get_all_flap_counts_counts(P) ->
end. end.
get_all_hosed(P) when is_record(P, projection_v1)-> get_all_hosed(P) when is_record(P, projection_v1)->
proplists:get_value(all_hosed, get_raw_flapping_i(P), []); proplists:get_value(all_hosed, get_raw_flapping_i(P), []).
get_all_hosed(S) when is_record(S, ch_mgr) ->
proplists:get_value(all_hosed,
proplists:get_value(flapping_i, S#ch_mgr.runenv, []),
[]).
merge_flap_counts(FlapCounts) -> merge_flap_counts(FlapCounts) ->
merge_flap_counts(FlapCounts, orddict:new()). merge_flap_counts(FlapCounts, orddict:new()).
@ -1529,7 +1600,7 @@ merge_flap_counts([], D) ->
merge_flap_counts([FlapCount|Rest], D1) -> merge_flap_counts([FlapCount|Rest], D1) ->
%% We know that FlapCount is list({Actor, {{_epk,FlapStartTime},NumFlaps}}). %% We know that FlapCount is list({Actor, {{_epk,FlapStartTime},NumFlaps}}).
D2 = orddict:from_list(FlapCount), D2 = orddict:from_list(FlapCount),
D2 = orddict:from_list(FlapCount), D2 = orddict:from_list(FlapCount),
%% If the FlapStartTimes are identical, then pick the bigger flap count. %% If the FlapStartTimes are identical, then pick the bigger flap count.
%% If the FlapStartTimes differ, then pick the larger start time tuple. %% If the FlapStartTimes differ, then pick the larger start time tuple.
D3 = orddict:merge(fun(_Key, {{_,T1}, NF1}= V1, {{_,T2}, NF2}=V2) D3 = orddict:merge(fun(_Key, {{_,T1}, NF1}= V1, {{_,T2}, NF2}=V2)

View file

@ -215,7 +215,7 @@ convergence_demo_testfun(NumFLUs) ->
[receive [receive
done -> done ->
ok ok
after 995000 -> after 120*1000 ->
exit(icky_timeout) exit(icky_timeout)
end || _ <- Pids] end || _ <- Pids]
end, end,
@ -273,14 +273,14 @@ convergence_demo_testfun(NumFLUs) ->
%% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}], %% %% [{a,b},{b,d},{c,b}, {b,a},{a,b},{b,c},{c,b},{b,d},{d,b}],
%% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}], %% [{a,b},{b,d},{c,b}, {c,a},{a,c},{c,b},{b,c},{c,d},{d,c}],
%% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ] %% [{a,b},{b,d},{c,b}, {d,a},{a,d},{d,b},{b,d},{d,c},{c,d}] ]
end || Partition <- [ [{a,b}, {b,c}], %% end || Partition <- [ [{a,b}, {b,c}],
[{a,b}, {c,b}] ] %% [{a,b}, {c,b}] ]
%% end || Partition <- [ [{a,b}, {b,c}] ] %% hosed-not-equal @ 3 FLUs %% end || Partition <- [ [{a,b}, {b,c}] ] %% hosed-not-equal @ 3 FLUs
%% end || Partition <- [ [{b,d}] ] %% end || Partition <- [ [{b,d}] ]
%% end || Partition <- [ [{a,b}, {b,a}] ] %% end || Partition <- [ [{a,b}, {b,a}] ]
%% end || Partition <- [ [{a,b}, {b,a}, {a,c},{c,a}] ] %% end || Partition <- [ [{a,b}, {b,a}, {a,c},{c,a}] ]
%% end || Partition <- [ [{a,b}], end || Partition <- [ [{a,b}],
%% [{b,a}] ] [{b,a}] ]
%% end || Partition <- [ [{a,b}, {c,b}], %% end || Partition <- [ [{a,b}, {c,b}],
%% [{a,b}, {b,c}] ] %% [{a,b}, {b,c}] ]
%% end || Partition <- [ [{a,b}, {b,c}, {c,d}], %% end || Partition <- [ [{a,b}, {b,c}, {c,d}],