Add & remove, mostly working (2 eunit tests broken)

This commit is contained in:
Scott Lystig Fritchie 2015-12-07 21:52:27 +09:00
parent 5aeaf872d9
commit 38e63e8181
4 changed files with 170 additions and 53 deletions

View file

@ -290,7 +290,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
last_down=[no_such_server_initial_value_only],
fitness_svr=machi_flu_psup:make_fitness_regname(MyName)
}, Proj),
{_, S2} = do_set_chain_members_dict(MembersDict, S),
S2 = do_set_chain_members_dict(MembersDict, S),
S3 = if ActiveP == false ->
S2;
ActiveP == true ->
@ -300,13 +300,17 @@ init({MyName, InitMembersDict, MgrOpts}) ->
handle_call({ping}, _From, S) ->
{reply, pong, S};
handle_call({set_chain_members, ChainName, OldEpoch, CMode,
handle_call({set_chain_members, SetChainName, SetOldEpoch, CMode,
MembersDict, Witness_list}, _From,
#ch_mgr{name=MyName,
proj=#projection_v1{all_members=OldAll_list,
epoch_number=OldEpoch,
chain_name=ChainName,
upi=OldUPI}=OldProj}=S) ->
{Reply, S2} = do_set_chain_members_dict(MembersDict, S),
true = (OldEpoch == 0) % in this case we want unconditional set of ch name
orelse
(SetOldEpoch == OldEpoch andalso SetChainName == ChainName),
S2 = do_set_chain_members_dict(MembersDict, S),
%% TODO: should there be any additional sanity checks? Right now,
%% if someone does something bad, then do_react_to_env() will
%% crash, which will crash us, and we'll restart in a sane & old
@ -335,7 +339,11 @@ handle_call({set_chain_members, ChainName, OldEpoch, CMode,
members_dict=MembersDict}),
S3 = set_proj(S2#ch_mgr{proj_history=queue:new(),
consistency_mode=CMode}, NewProj),
{_QQ, S4} = do_react_to_env(S3),
{Res, S4} = do_react_to_env(S3),
Reply = case Res of
{_,_,_} -> ok;
_ -> Res
end,
{reply, Reply, S4};
handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) ->
case {Boolean, TRef} of
@ -367,8 +375,8 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) ->
{reply, Res, S2};
handle_call({trigger_react_to_env}=Call, _From, S) ->
gobble_calls(Call),
{TODOtodo, S2} = do_react_to_env(S),
{reply, TODOtodo, S2};
{Res, S2} = do_react_to_env(S),
{reply, Res, S2};
handle_call(_Call, _From, S) ->
io:format(user, "\nBad call to ~p: ~p\n", [S#ch_mgr.name, _Call]),
{reply, whaaaaaaaaaa, S}.
@ -545,6 +553,7 @@ cl_write_public_proj2(FLUs, Partitions, Epoch, Proj, IgnoreWrittenErrorP, S) ->
end
end, {true, []}, FLUs),
%% io:format(user, "\nWrite public ~w by ~w: ~w\n", [Epoch, S#ch_mgr.name, Rs]),
%% io:format(user, "mgr ~w epoch ~w Rs ~p\n", [S#ch_mgr.name, Epoch, Rs]),
{{remote_write_results, Rs}, S}.
do_cl_read_latest_public_projection(ReadRepairP,
@ -566,12 +575,41 @@ do_cl_read_latest_public_projection(ReadRepairP,
read_latest_projection_call_only(ProjectionType, AllHosed,
#ch_mgr{proj=CurrentProj}=S) ->
#projection_v1{all_members=All_list} = CurrentProj,
All_queried_list = All_list -- AllHosed,
All_queried_list = lists:sort(All_list -- AllHosed),
read_latest_projection_call_only1(ProjectionType, AllHosed,
All_queried_list, S).
{Rs, S2} = read_latest_projection_call_only2(ProjectionType,
All_queried_list, S),
FLUsRs = lists:zip(All_queried_list, Rs),
{All_queried_list, FLUsRs, S2}.
read_latest_projection_call_only1(ProjectionType, AllHosed,
All_queried_list, S) ->
{Rs_tmp, S2} = read_latest_projection_call_only2(ProjectionType,
All_queried_list, S),
New_all_maybe =
lists:usort(
lists:flatten(
[A_l || #projection_v1{all_members=A_l} <- Rs_tmp])) -- AllHosed,
case New_all_maybe -- All_queried_list of
[] ->
FLUsRs = lists:zip(All_queried_list, Rs_tmp),
{All_queried_list, FLUsRs, S2};
[AnotherFLU|_] ->
%% Stop AnotherFLU proxy, in unexpected case where it's open
try
Proxy = proxy_pid(AnotherFLU, S2),
?FLU_PC:stop_proxies([Proxy])
catch _:_ -> ok
end,
MD = orddict:from_list(
lists:usort(
lists:flatten(
[orddict:to_list(D) || #projection_v1{members_dict=D} <- Rs_tmp]))),
Another_P_srvr = orddict:fetch(AnotherFLU, MD),
{ok, Proxy2} = ?FLU_PC:start_link(Another_P_srvr),
S3 = S2#ch_mgr{proxies_dict=orddict:store(AnotherFLU, Proxy2,
S2#ch_mgr.proxies_dict)},
read_latest_projection_call_only1(
ProjectionType, AllHosed,
lists:usort([AnotherFLU|All_queried_list]), S3)
end.
read_latest_projection_call_only2(ProjectionType, All_queried_list, S) ->
{_UpNodes, Partitions, S2} = calc_up_nodes(S),
@ -621,7 +659,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType,
{bad_answer_flus, BadAnswerFLUs},
{bad_answers, BadAnswers},
{not_unanimous_answers, []}],
{not_unanimous, NoneProj, Extra2, S};
{not_unanimous, NoneProj2, Extra2, S};
ProjectionType == public, UnwrittenRs /= [] ->
{needs_repair, FLUsRs, [flarfus], S};
true ->
@ -1042,28 +1080,30 @@ rank_projection(#projection_v1{author_server=_Author,
do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)->
_ = ?FLU_PC:stop_proxies(OldProxiesDict),
ProxiesDict = ?FLU_PC:start_proxies(MembersDict),
{ok, S#ch_mgr{members_dict=MembersDict,
proxies_dict=ProxiesDict}}.
S#ch_mgr{members_dict=MembersDict,
proxies_dict=ProxiesDict}.
do_react_to_env(#ch_mgr{name=MyName,
proj=#projection_v1{epoch_number=Epoch,
members_dict=[]=OldDict}=OldProj,
opts=Opts}=S) ->
put(ttt, [?LINE]),
%% Read from our local *public* projection store. If some other
%% chain member has written something there, and if we are a
%% member of that chain, then we'll adopt that projection and then
%% start actively humming in that chain.
{NewMembersDict, NewProj} =
{NewMD, NewProj} =
get_my_public_proj_boot_info(Opts, OldDict, OldProj),
case orddict:is_key(MyName, NewMembersDict) of
case orddict:is_key(MyName, NewMD) of
false ->
{{empty_members_dict, [], Epoch}, S};
{{empty_members_dict1, [], Epoch}, S};
true ->
{_, S2} = do_set_chain_members_dict(NewMembersDict, S),
CMode = NewProj#projection_v1.mode,
{{empty_members_dict, [], Epoch},
set_proj(S2#ch_mgr{members_dict=NewMembersDict,
consistency_mode=CMode}, NewProj)}
S2 = do_set_chain_members_dict(NewMD, S),
{Reply, S3} = react_to_env_C110(NewProj,
S2#ch_mgr{members_dict=NewMD,
consistency_mode=CMode}),
{Reply, S3}
end;
do_react_to_env(S) ->
put(ttt, [?LINE]),
@ -1165,7 +1205,7 @@ react_to_env_A10(S) ->
?REACT(a10),
react_to_env_A20(0, poll_private_proj_is_upi_unanimous(S)).
react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) ->
react_to_env_A20(Retries, #ch_mgr{name=MyName, proj=P_current}=S) ->
?REACT(a20),
init_remember_down_list(),
{UnanimousTag, P_latest, ReadExtra, S2} =
@ -1193,17 +1233,34 @@ react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) ->
false when P_latest#projection_v1.epoch_number /= LastComplaint,
P_latest#projection_v1.all_members /= [] ->
put(rogue_server_epoch, P_latest#projection_v1.epoch_number),
error_logger:info_msg("Chain manager ~p found latest public "
"projection ~p has author ~p has a "
"members list ~p that does not include me.\n",
error_logger:info_msg("Chain manager ~w found latest public "
"projection ~w with author ~w has a "
"members list ~w that does not include me. "
"We assume this is a result of administrator "
"action and will thus wedge ourselves until "
"we are re-added to the chain or shutdown.\n",
[S#ch_mgr.name,
P_latest#projection_v1.epoch_number,
P_latest#projection_v1.author_server,
P_latest#projection_v1.all_members]);
P_latest#projection_v1.all_members]),
EpochID = machi_projection:make_epoch_id(P_current),
ProjStore = get_projection_store_pid_or_regname(S),
{ok, NotifyPid} = machi_projection_store:get_wedge_notify_pid(ProjStore),
_QQ = machi_flu1:update_wedge_state(NotifyPid, true, EpochID),
#projection_v1{epoch_number=Epoch,
chain_name=ChainName,
all_members=All_list,
witnesses=Witness_list,
members_dict=MembersDict} = P_current,
P_none0 = make_none_projection(Epoch,
MyName, All_list, Witness_list, MembersDict),
P_none = P_none0#projection_v1{chain_name=ChainName},
{{x,y,z,42,77}, S2#ch_mgr{proj=P_none}};
_ ->
ok
end,
react_to_env_A21(Retries, UnanimousTag, P_latest, ReadExtra, S2)
end.
react_to_env_A21(Retries, UnanimousTag, P_latest, ReadExtra, S) ->
%% The UnanimousTag isn't quite sufficient for our needs. We need
%% to determine if *all* of the UPI+Repairing FLUs are members of
%% the unanimous server replies. All Repairing FLUs should be up
@ -1248,7 +1305,7 @@ react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) ->
true ->
exit({badbad, UnanimousTag})
end,
react_to_env_A29(Retries, P_latest, LatestUnanimousP, ReadExtra, S2).
react_to_env_A29(Retries, P_latest, LatestUnanimousP, ReadExtra, S).
react_to_env_A29(Retries, P_latest, LatestUnanimousP, _ReadExtra,
#ch_mgr{consistency_mode=CMode,
@ -1282,7 +1339,6 @@ react_to_env_A29(Retries, P_latest, LatestUnanimousP, _ReadExtra,
?REACT({a29, ?LINE,
[{zerf_backstop, true},
{zerf_in, machi_projection:make_summary(Zerf)}]}),
%% io:format(user, "zerf_in: A29: ~p: ~w\n\t~p\n", [MyName, machi_projection:make_summary(Zerf), get(yyy_hack)]),
#projection_v1{dbg=ZerfDbg} = Zerf,
Backstop = if Zerf#projection_v1.upi == [] ->
[];
@ -1406,13 +1462,20 @@ react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
%% we have a disagreement.
not ordsets:is_disjoint(P_latest_s, Down_s)
end,
AmExcludedFromLatestAll_p =
(not lists:member(MyName, P_latest#projection_v1.all_members)),
?REACT({a40, ?LINE,
[{latest_author, P_latest#projection_v1.author_server},
{am_excluded_from_latest_all_p, AmExcludedFromLatestAll_p},
{author_is_down_p, LatestAuthorDownP},
{rank_latest, Rank_latest},
{rank_newprop, Rank_newprop}]}),
if
AmExcludedFromLatestAll_p ->
?REACT({a40, ?LINE, []}),
react_to_env_A50(P_latest, [], S);
AmHosedP ->
ExpectedUPI = if CMode == cp_mode -> [];
CMode == ap_mode -> [MyName]
@ -1578,12 +1641,10 @@ react_to_env_A40(Retries, P_newprop, P_latest, LatestUnanimousP,
end,
if GoTo50_p ->
?REACT({a40, ?LINE, []}),
%% io:format(user, "CONFIRM debug question line ~w\n", [?LINE]),
FinalProps = [{throttle_seconds, 0}],
react_to_env_A50(P_latest, FinalProps, S);
true ->
?REACT({a40, ?LINE, []}),
io:format(user, "CONFIRM debug question line ~w\n", [?LINE]),
react_to_env_C300(P_newprop, P_latest, S)
end
end.
@ -1593,7 +1654,6 @@ react_to_env_A50(P_latest, FinalProps, #ch_mgr{proj=P_current}=S) ->
?REACT({a50, ?LINE, [{current_epoch, P_current#projection_v1.epoch_number},
{latest_epoch, P_latest#projection_v1.epoch_number},
{final_props, FinalProps}]}),
%% if S#ch_mgr.name == c -> io:format(user, "A50: ~w: ~p\n", [S#ch_mgr.name, get(react)]); true -> ok end,
V = case file:read_file("/tmp/moomoo."++atom_to_list(S#ch_mgr.name)) of {ok,_} -> true; _ -> false end,
if V -> io:format(user, "A50: ~w: ~p\n", [S#ch_mgr.name, get(react)]); true -> ok end,
{{no_change, FinalProps, P_current#projection_v1.epoch_number}, S}.
@ -2282,7 +2342,7 @@ projection_transition_is_sane_except_si_epoch(
true = lists:sort(Repairing_list2) == lists:usort(Repairing_list2),
%% Disjoint-ness
All_list1 = All_list2, % todo will probably change
%% %% %% %% %% %% %% %% All_list1 = All_list2, % todo will probably change
%% true = lists:sort(All_list2) == lists:sort(Down_list2 ++ UPI_list2 ++
%% Repairing_list2),
[] = [X || X <- Witness_list2, not lists:member(X, All_list2)],
@ -2387,8 +2447,7 @@ poll_private_proj_is_upi_unanimous_sleep(Count, #ch_mgr{runenv=RunEnv}=S) ->
S2
end.
poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current,
opts=MgrOpts} = S) ->
poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current} = S) ->
UPI = P_current#projection_v1.upi,
EpochID = machi_projection:make_epoch_id(P_current),
{Rs, S2} = read_latest_projection_call_only2(private, UPI, S),
@ -2421,12 +2480,7 @@ poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current,
Annotation = make_annotation(EpochID, Now),
NewDbg2 = [Annotation|P_currentFull#projection_v1.dbg2],
NewProj = P_currentFull#projection_v1{dbg2=NewDbg2},
ProjStore = case get_projection_store_regname(MgrOpts) of
undefined ->
machi_flu_psup:make_proj_supname(MyName);
PStr ->
PStr
end,
ProjStore = get_projection_store_pid_or_regname(S),
#projection_v1{epoch_number=_EpochRep,
epoch_csum= <<_CSumRep:4/binary,_/binary>>,
upi=_UPIRep,
@ -2446,8 +2500,6 @@ poll_private_proj_is_upi_unanimous3(#ch_mgr{name=MyName, proj=P_current,
S2
end;
_Else ->
%% io:format(user, "poll by ~w: want ~W got ~W\n",
%% [MyName, EpochID, 6, _Else, 8]),
S2
end.
@ -2837,7 +2889,6 @@ make_zerf2(OldEpochNum, Up, MajoritySize, MyName, AllMembers, OldWitness_list,
Proj2 = Proj#projection_v1{dbg2=[{make_zerf,Epoch},
{yyy_hack, get(yyy_hack)},
{up,Up},{maj,MajoritySize}]},
%% io:format(user, "ZERF ~w\n",[machi_projection:make_summary(Proj2)]),
Proj2
catch
throw:{zerf,no_common} ->
@ -2976,3 +3027,10 @@ get_unfit_list(FitnessServer) ->
[]
end.
get_projection_store_pid_or_regname(#ch_mgr{name=MyName, opts=MgrOpts}) ->
case get_projection_store_regname(MgrOpts) of
undefined ->
machi_flu_psup:make_proj_supname(MyName);
PStr ->
PStr
end.

View file

@ -174,6 +174,7 @@ make_summary(#projection_v1{epoch_number=EpochNum,
repairing=Repairing_list,
dbg=Dbg, dbg2=Dbg2}) ->
[{epoch,EpochNum}, {csum,_CSum4},
{all, _All_list},
{author,Author}, {mode,CMode},{witnesses, Witness_list},
{upi,UPI_list},{repair,Repairing_list},{down,Down_list}] ++
[{d,Dbg}, {d2,Dbg2}].

View file

@ -335,7 +335,8 @@ smoke1_test2() ->
nonunanimous_setup_and_fix_test() ->
machi_partition_simulator:start_link({1,2,3}, 100, 0),
TcpPort = 62877,
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}],
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"},
{c,TcpPort+2,"./data.c"}],
P_s = [#p_srvr{name=Name, address="localhost", port=Port} ||
{Name,Port,_Dir} <- FluInfo],
@ -345,10 +346,11 @@ nonunanimous_setup_and_fix_test() ->
%% {ok, Mb} = ?MGR:start_link(b, MembersDict, [{active_mode, false}]++XX),
[{ok,_}=machi_flu_psup:start_flu_package(Name, Port, Dir, Opts) ||
{Name,Port,Dir} <- FluInfo],
[Proxy_a, Proxy_b] = Proxies =
Proxies = [Proxy_a, Proxy_b, Proxy_c] =
[element(2,?FLU_PC:start_link(P)) || P <- P_s],
MembersDict = machi_projection:make_members_dict(P_s),
[Ma,Mb] = [a_chmgr, b_chmgr],
%% MembersDict = machi_projection:make_members_dict(P_s),
MembersDict = machi_projection:make_members_dict(lists:sublist(P_s, 2)),
Mgrs = [Ma,Mb,Mc] = [a_chmgr, b_chmgr, c_chmgr],
ok = machi_chain_manager1:set_chain_members(Ma, MembersDict),
ok = machi_chain_manager1:set_chain_members(Mb, MembersDict),
try
@ -387,10 +389,63 @@ nonunanimous_setup_and_fix_test() ->
{ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private),
P2 = P2pb#projection_v1{dbg2=[]},
%% Pspam = machi_projection:update_checksum(
%% P1b#projection_v1{epoch_number=?SPAM_PROJ_EPOCH,
%% dbg=[hello_spam]}),
%% ok = ?FLU_PC:write_projection(Proxy_b, public, Pspam),
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format(user, "\n+ Add a 3rd member to the chain.\n", []),
io:format(user, "\nNOTE: One INFO REPORT will follow.\n", []),
timer:sleep(50),
MembersDict3 = machi_projection:make_members_dict(P_s),
ok = machi_chain_manager1:set_chain_members(
Ma, ch_not_def_yet, EpochNum_a, ap_mode, MembersDict3, []),
[begin
{_rr1, _rr2, TheEpoch} = ?MGR:trigger_react_to_env(Mgr),
ok
end || _ <- lists:seq(1, 7),
{Mgr,Proxy} <- [{Ma, Proxy_a}, {Mb, Proxy_b}, {Mc, Proxy_c}]],
{_, _, TheEpoch_3} = ?MGR:trigger_react_to_env(Ma),
{_, _, TheEpoch_3} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_3} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[a,b], repairing=[c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- Proxies],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format(user, "\n+ Remove 'a' from the chain.\n", []),
io:format(user, "\nNOTE: One INFO REPORT will follow.\n", []),
timer:sleep(50),
MembersDict4 = machi_projection:make_members_dict(tl(P_s)),
ok = machi_chain_manager1:set_chain_members(
Mb, ch_not_def_yet, TheEpoch_3, ap_mode, MembersDict4, []),
[begin
_ = ?MGR:trigger_react_to_env(Mgr),
ok
end || _ <- lists:seq(1, 7),
{Mgr,Proxy} <- [{Ma, Proxy_a}, {Mb, Proxy_b}, {Mc, Proxy_c}]],
%% {_, _, TheEpoch_4} = ?MGR:trigger_react_to_env(Ma),
{_, _, TheEpoch_4} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_4} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[b], repairing=[c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- tl(Proxies)],
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
io:format(user, "\n+ Add a to the chain again.\n", []),
MembersDict5 = machi_projection:make_members_dict(P_s),
ok = machi_chain_manager1:set_chain_members(
Mb, ch_not_def_yet, TheEpoch_4, ap_mode, MembersDict5, []),
[begin
{_rr1, _rr2, TheEpoch} = ?MGR:trigger_react_to_env(Mgr),
ok
end || _ <- lists:seq(1, 7),
{Mgr,Proxy} <- [{Ma, Proxy_a}, {Mb, Proxy_b}, {Mc, Proxy_c}]],
{_, _, TheEpoch_5} = ?MGR:trigger_react_to_env(Ma),
{_, _, TheEpoch_5} = ?MGR:trigger_react_to_env(Mb),
{_, _, TheEpoch_5} = ?MGR:trigger_react_to_env(Mc),
[{ok, #projection_v1{upi=[b], repairing=[a,c]}} =
?FLU_PC:read_latest_projection(Pxy, private) || Pxy <- Proxies],
ok
after

View file

@ -64,11 +64,13 @@ smoke_test2() ->
PK = <<>>,
Prefix = <<"prefix">>,
Chunk1 = <<"Hello, chunk!">>,
io:format(user, "~s LINE ~p\n", [?MODULE, ?LINE]),
{ok, {Off1, Size1, File1}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk1, none, 0),
true = is_binary(File1),
Chunk2 = "It's another chunk",
CSum2 = {client_sha, machi_util:checksum_chunk(Chunk2)},
io:format(user, "~s LINE ~p\n", [?MODULE, ?LINE]),
{ok, {Off2, Size2, File2}} =
?C:append_chunk(Clnt, PK, Prefix, Chunk2, CSum2, 1024),
Chunk3 = ["This is a ", <<"test,">>, 32, [["Hello, world!"]]],
@ -113,6 +115,7 @@ smoke_test2() ->
LargeBytes = binary:copy(<<"x">>, 1024*1024),
LBCsum = {client_sha, machi_util:checksum_chunk(LargeBytes)},
io:format(user, "~s LINE ~p\n", [?MODULE, ?LINE]),
{ok, {Offx, Sizex, Filex}} =
?C:append_chunk(Clnt, PK, Prefix, LargeBytes, LBCsum, 0),
ok = ?C:trim_chunk(Clnt, Filex, Offx, Sizex),