WIP, tests pass again, includign the newest one

This commit is contained in:
Scott Lystig Fritchie 2015-05-02 00:33:49 +09:00
parent 65993dfcb6
commit 1675020150
5 changed files with 105 additions and 54 deletions

View file

@ -37,6 +37,7 @@ func, and pattern match Erlang style in that func.
*** TODO Fixes the atom table leak *** TODO Fixes the atom table leak
*** TODO Fixes the problem of having active sequencer for the same prefix *** TODO Fixes the problem of having active sequencer for the same prefix
on two FLUS in the same VM on two FLUS in the same VM
** TODO Eliminate the timeout exception for the client: just {error,timeout} ret
** TODO Change all protocol ops to enforce the epoch ID ** TODO Change all protocol ops to enforce the epoch ID
** TODO Add projection wedging logic to each FLU. ** TODO Add projection wedging logic to each FLU.

View file

@ -65,7 +65,8 @@
-define(REACT(T), put(react, [T|get(react)])). -define(REACT(T), put(react, [T|get(react)])).
%% API %% API
-export([start_link/2, start_link/3, stop/1, ping/1]). -export([start_link/2, start_link/3, stop/1, ping/1,
set_chain_members/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -104,6 +105,9 @@ stop(Pid) ->
ping(Pid) -> ping(Pid) ->
gen_server:call(Pid, {ping}, infinity). gen_server:call(Pid, {ping}, infinity).
set_chain_members(Pid, MembersDict) ->
gen_server:call(Pid, {set_chain_members, MembersDict}, infinity).
-ifdef(TEST). -ifdef(TEST).
%% Test/debugging code only. %% Test/debugging code only.
@ -150,6 +154,7 @@ test_react_to_env(Pid) ->
%% local projection store. %% local projection store.
init({MyName, InitMembersDict, MgrOpts}) -> init({MyName, InitMembersDict, MgrOpts}) ->
init_remember_partition_hack(),
ZeroAll_list = [P#p_srvr.name || {_,P} <- orddict:to_list(InitMembersDict)], ZeroAll_list = [P#p_srvr.name || {_,P} <- orddict:to_list(InitMembersDict)],
ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict), ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict),
ok = store_zeroth_projection_maybe(ZeroProj, MgrOpts), ok = store_zeroth_projection_maybe(ZeroProj, MgrOpts),
@ -163,35 +168,30 @@ init({MyName, InitMembersDict, MgrOpts}) ->
{network_islands, Opt(network_islands, [])}, {network_islands, Opt(network_islands, [])},
{flapping_i, Opt(flapping, [])}, {flapping_i, Opt(flapping, [])},
{up_nodes, Opt(up_nodes, not_init_yet)}], {up_nodes, Opt(up_nodes, not_init_yet)}],
ActiveP = Opt(active_mode, true), ActiveP = Opt(active_mode, false),
NoneProj = make_none_projection(MyName, All_list, MembersDict),
Proxies = orddict:fold(
fun(K, P, Acc) ->
{ok, Pid} = ?FLU_PC:start_link(P),
[{K, Pid}|Acc]
end, [], MembersDict),
S = #ch_mgr{name=MyName, S = #ch_mgr{name=MyName,
%% TODO 2015-03-04: revisit, should this constant be bigger? %% TODO 2015-03-04: revisit, should this constant be bigger?
%% Yes, this should be bigger, but it's a hack. There is %% Yes, this should be bigger, but it's a hack. There is
%% no guarantee that all parties will advance to a minimum %% no guarantee that all parties will advance to a minimum
%% flap awareness in the amount of time that this mgr will. %% flap awareness in the amount of time that this mgr will.
flap_limit=length(All_list) + 50, flap_limit=length(All_list) + 50,
proj=NoneProj,
timer='undefined', timer='undefined',
proj_history=queue:new(), proj_history=queue:new(),
runenv=RunEnv, runenv=RunEnv,
opts=MgrOpts, opts=MgrOpts},
members_dict=MembersDict, {_, S2} = do_set_chain_members(MembersDict, S),
proxies_dict=orddict:from_list(Proxies)}, S3 = if ActiveP == false ->
S2 = if ActiveP == false -> S2;
S;
ActiveP == true -> ActiveP == true ->
set_active_timer(S) set_active_timer(S2)
end, end,
{ok, S2}. {ok, S3}.
handle_call({ping}, _From, S) -> handle_call({ping}, _From, S) ->
{reply, pong, S}; {reply, pong, S};
handle_call({set_chain_members, MembersDict}, _From, S) ->
{Reply, S2} = do_set_chain_members(MembersDict, S),
{reply, Reply, S2};
handle_call({stop}, _From, S) -> handle_call({stop}, _From, S) ->
{stop, normal, ok, S}; {stop, normal, ok, S};
handle_call({test_calc_projection, KeepRunenvP}, _From, handle_call({test_calc_projection, KeepRunenvP}, _From,
@ -311,7 +311,7 @@ cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP,
Else when SkipLocalWriteErrorP -> Else when SkipLocalWriteErrorP ->
{XX, SS} = Continue(), {XX, SS} = Continue(),
{{local_write_result, Else, XX}, SS}; {{local_write_result, Else, XX}, SS};
Else when Else == error_written; Else == timeout; Else == t_timeout -> Else ->
{Else, S2} {Else, S2}
end. end.
@ -350,13 +350,16 @@ read_latest_projection_call_only(ProjectionType, AllHosed,
{_UpNodes, Partitions, S2} = calc_up_nodes(S), {_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(Pid) -> DoIt = fun(Pid) ->
case ?FLU_PC:read_latest_projection(Pid, ProjectionType, ?TO) of case (?FLU_PC:read_latest_projection(Pid, ProjectionType, ?TO)) of
{ok, P} -> P; {ok, P} -> P;
Else -> Else Else -> Else
end end
end, end,
%% io:format(user, "All_queried_list ~p\n", [All_queried_list]),
Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) || Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
FLU <- All_queried_list], FLU <- All_queried_list],
%% Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
%% FLU <- All_queried_list],
FLUsRs = lists:zip(All_queried_list, Rs), FLUsRs = lists:zip(All_queried_list, Rs),
{All_queried_list, FLUsRs, S2}. {All_queried_list, FLUsRs, S2}.
@ -555,15 +558,17 @@ calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) ->
{UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}. {UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}.
calc_up_nodes(MyName, AllMembers, RunEnv1) -> calc_up_nodes(MyName, AllMembers, RunEnv1) ->
{Partitions2, Islands2} =
case proplists:get_value(use_partition_simulator, RunEnv1) of case proplists:get_value(use_partition_simulator, RunEnv1) of
true -> true ->
machi_partition_simulator:get(AllMembers); calc_up_nodes_sim(MyName, AllMembers, RunEnv1);
false -> false ->
{[], [AllMembers]} {AllMembers -- get(remember_partition_hack), [], RunEnv1}
end, end.
catch ?REACT({partitions,Partitions2}),
catch ?REACT({islands,Islands2}), calc_up_nodes_sim(MyName, AllMembers, RunEnv1) ->
{Partitions2, Islands2} = machi_partition_simulator:get(AllMembers),
catch ?REACT({calc_up_nodes,?LINE,[{partitions,Partitions2},
{islands,Islands2}]}),
UpNodes = lists:sort( UpNodes = lists:sort(
[Node || Node <- AllMembers, [Node || Node <- AllMembers,
not lists:member({MyName, Node}, Partitions2), not lists:member({MyName, Node}, Partitions2),
@ -633,6 +638,23 @@ rank_projection(#projection_v1{author_server=Author,
( N * length(Repairing_list)) + ( N * length(Repairing_list)) +
(N*N * length(UPI_list)). (N*N * length(UPI_list)).
do_set_chain_members(MembersDict,
#ch_mgr{name=MyName, proxies_dict=OldProxiesDict}=S) ->
catch orddict:fold(
fun(_K, Pid, _Acc) ->
_ = (catch ?FLU_PC:quit(Pid))
end, [], OldProxiesDict),
All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)],
NoneProj = make_none_projection(MyName, All_list, MembersDict),
Proxies = orddict:fold(
fun(K, P, Acc) ->
{ok, Pid} = ?FLU_PC:start_link(P),
[{K, Pid}|Acc]
end, [], MembersDict),
{ok, S#ch_mgr{proj=NoneProj,
members_dict=MembersDict,
proxies_dict=orddict:from_list(Proxies)}}.
do_react_to_env(#ch_mgr{proj=#projection_v1{members_dict=[]}}=S) -> do_react_to_env(#ch_mgr{proj=#projection_v1{members_dict=[]}}=S) ->
{empty_members_dict, S}; {empty_members_dict, S};
do_react_to_env(S) -> do_react_to_env(S) ->
@ -645,15 +667,15 @@ react_to_env_A10(S) ->
react_to_env_A20(Retries, S) -> react_to_env_A20(Retries, S) ->
?REACT(a20), ?REACT(a20),
init_remember_partition_hack(),
{UnanimousTag, P_latest, ReadExtra, S2} = {UnanimousTag, P_latest, ReadExtra, S2} =
do_cl_read_latest_public_projection(true, S), do_cl_read_latest_public_projection(true, S),
%% The UnanimousTag isn't quite sufficient for our needs. We need %% The UnanimousTag isn't quite sufficient for our needs. We need
%% to determine if *all* of the UPI+Repairing FLUs are members of %% to determine if *all* of the UPI+Repairing FLUs are members of
%% the unanimous server replies. %% the unanimous server replies. All Repairing FLUs should be up
io:format(user, "\nReact ~P\n", [lists:reverse(get(react)), 10]), %% now (because if they aren't then they cannot be repairing), so
io:format(user, "\nReadExtra ~p\n", [ReadExtra]), %% all Repairing FLUs have no non-race excuse not to be in UnanimousFLUs.
io:format(user, "\nP_latest ~p\n", [P_latest]),
UnanimousFLUs = lists:sort(proplists:get_value(unanimous_flus, ReadExtra)), UnanimousFLUs = lists:sort(proplists:get_value(unanimous_flus, ReadExtra)),
UPI_Repairing_FLUs = lists:sort(P_latest#projection_v1.upi ++ UPI_Repairing_FLUs = lists:sort(P_latest#projection_v1.upi ++
P_latest#projection_v1.repairing), P_latest#projection_v1.repairing),
@ -687,9 +709,6 @@ react_to_env_A30(Retries, P_latest, LatestUnanimousP, _ReadExtra,
?REACT({a30, ?LINE, [{newprop1, machi_projection:make_summary(P_newprop1)}]}), ?REACT({a30, ?LINE, [{newprop1, machi_projection:make_summary(P_newprop1)}]}),
%% Are we flapping yet? %% Are we flapping yet?
io:format(user, "React 2 ~P\n", [lists:reverse(get(react)), 109999]),
io:format(user, "NewProp1 ~p\n", [P_newprop1]),
io:format(user, "Current ~p\n", [P_current]),
{P_newprop2, S3} = calculate_flaps(P_newprop1, P_current, FlapLimit, S2), {P_newprop2, S3} = calculate_flaps(P_newprop1, P_current, FlapLimit, S2),
%% Move the epoch number up ... originally done in C300. %% Move the epoch number up ... originally done in C300.
@ -1166,7 +1185,10 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo), P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo),
MyNamePid = proxy_pid(MyName, S), MyNamePid = proxy_pid(MyName, S),
ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO), %% This is the local projection store. Use a larger timeout, so
%% that things locally are pretty horrible if we're killed by a
%% timeout exception.
ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true -> true ->
{_,_,C} = os:timestamp(), {_,_,C} = os:timestamp(),
@ -1262,7 +1284,7 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
P#projection_v1.repairing, P#projection_v1.repairing,
P#projection_v1.down} || P <- Ps]), P#projection_v1.down} || P <- Ps]),
QQQ = _QQQ =
{_WhateverUnanimous, BestP, Props, _S} = {_WhateverUnanimous, BestP, Props, _S} =
cl_read_latest_projection(private, S), cl_read_latest_projection(private, S),
NotBestPs = proplists:get_value(not_unanimous_answers, Props, []), NotBestPs = proplists:get_value(not_unanimous_answers, Props, []),
@ -1708,8 +1730,6 @@ merge_flap_counts([FlapCount|Rest], D1) ->
end, D1, D2), end, D1, D2),
merge_flap_counts(Rest, D3). merge_flap_counts(Rest, D3).
%% proxy_pid(Name, #ch_mgr{proxies_dict=[]}) ->
%% throw(empty_proxies_dict);
proxy_pid(Name, #ch_mgr{proxies_dict=ProxiesDict}) -> proxy_pid(Name, #ch_mgr{proxies_dict=ProxiesDict}) ->
orddict:fetch(Name, ProxiesDict). orddict:fetch(Name, ProxiesDict).
@ -1748,16 +1768,23 @@ perhaps_call_t(S, Partitions, FLU, DoIt) ->
perhaps_call(S, Partitions, FLU, DoIt) perhaps_call(S, Partitions, FLU, DoIt)
catch catch
exit:timeout -> exit:timeout ->
t_timeout {error, partition};
exit:{timeout,_} ->
{error, partition}
end. end.
perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
ProxyPid = proxy_pid(FLU, S), ProxyPid = proxy_pid(FLU, S),
RemoteFLU_p = FLU /= MyName, RemoteFLU_p = FLU /= MyName,
try erase(bad_sock),
case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of
false -> false ->
Res = DoIt(ProxyPid), Res = DoIt(ProxyPid),
if Res == {error, partition} ->
remember_partition_hack(FLU);
true ->
ok
end,
case RemoteFLU_p andalso lists:member({FLU, MyName}, Partitions) of case RemoteFLU_p andalso lists:member({FLU, MyName}, Partitions) of
false -> false ->
Res; Res;
@ -1768,10 +1795,12 @@ perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
_ -> _ ->
(catch put(react, [{timeout1,me,MyName,to,FLU,RemoteFLU_p,Partitions}|get(react)])), (catch put(react, [{timeout1,me,MyName,to,FLU,RemoteFLU_p,Partitions}|get(react)])),
exit(timeout) exit(timeout)
end
catch throw:empty_proxies_dict ->
asdflkjweoiasd
end. end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init_remember_partition_hack() ->
put(remember_partition_hack, []).
remember_partition_hack(FLU) ->
put(remember_partition_hack, [FLU|get(remember_partition_hack)]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -250,7 +250,7 @@ do_req(Req, S) ->
end end
end; end;
false -> false ->
{{error, not_connected}, S2} {{error, partition}, S2}
end. end.
make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) -> make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->

View file

@ -24,17 +24,37 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
smoke_test() -> -include("machi_projection.hrl").
[os:cmd("rm -rf " ++ X) || X <- ["./data.a", "./data.b", "/data.c"] ],
%% smoke_test2() will try repeatedly to make a TCP connection to ports
%% on localhost that have no listener.
%% If you use 'sysctl -w net.inet.icmp.icmplim=3' before running this
%% test, you'll get to exercise some timeout handling in
%% machi_chain_manager1:perhaps_call_t().
%% The default for net.inet.icmp.icmplim is 50.
smoke_test_() ->
{timeout, 5*60, fun() -> smoke_test2() end}.
smoke_test2() ->
Ps = [{a,#p_srvr{name=a, address="localhost", port=5555, props="./data.a"}},
{b,#p_srvr{name=b, address="localhost", port=5556, props="./data.b"}},
{c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}}
],
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(), {ok, SupPid} = machi_flu_sup:start_link(),
try try
{ok, _} = machi_flu_psup:start_flu_package(a, 5555, "./data.a", [begin
[{active_mode,false}]), #p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(b, 5556, "./data.b", {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
[{active_mode,false}]), end || {_,P} <- [hd(Ps)]],
{ok, _} = machi_flu_psup:start_flu_package(c, 5557, "./data.c", %% end || {_,P} <- Ps],
[{active_mode,false}]),
[begin
_QQ = machi_chain_manager1:test_react_to_env(a_chmgr),
ok
end || _ <- lists:seq(1,5)],
machi_chain_manager1:set_chain_members(a_chmgr, orddict:from_list(Ps)),
[begin [begin
_QQ = machi_chain_manager1:test_react_to_env(a_chmgr), _QQ = machi_chain_manager1:test_react_to_env(a_chmgr),
ok ok
@ -42,6 +62,7 @@ smoke_test() ->
ok ok
after after
exit(SupPid, normal), exit(SupPid, normal),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
machi_util:wait_for_death(SupPid, 100), machi_util:wait_for_death(SupPid, 100),
ok ok
end. end.

View file

@ -49,7 +49,7 @@ api_smoke_test() ->
{error,_} = ?MUT:append_chunk(Prox1, {error,_} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>, FakeEpoch, <<"prefix">>, <<"data">>,
infinity), infinity),
{error,not_connected} = ?MUT:append_chunk(Prox1, {error,partition} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>, FakeEpoch, <<"prefix">>, <<"data">>,
infinity), infinity),
%% Start the FLU again, we should be able to do stuff immediately %% Start the FLU again, we should be able to do stuff immediately