Finish chain manager restart & membership changing

This commit is contained in:
Scott Lystig Fritchie 2015-05-07 17:52:16 +09:00
parent aeb2e4ef9e
commit 517941aaaa
2 changed files with 52 additions and 20 deletions

View file

@ -30,6 +30,7 @@
proj :: projection(), proj :: projection(),
%% %%
timer :: 'undefined' | timer:tref(), timer :: 'undefined' | timer:tref(),
ignore_timer :: boolean(),
proj_history :: queue:queue(), proj_history :: queue:queue(),
flaps=0 :: integer(), flaps=0 :: integer(),
flap_start = ?NOT_FLAPPING flap_start = ?NOT_FLAPPING

View file

@ -180,7 +180,7 @@ init({MyName, InitMembersDict, MgrOpts}) ->
proj_history=queue:new(), proj_history=queue:new(),
runenv=RunEnv, runenv=RunEnv,
opts=MgrOpts}, opts=MgrOpts},
{_, S2} = do_set_chain_members(MembersDict, S), {_, S2} = do_set_chain_members_dict(MembersDict, S),
S3 = if ActiveP == false -> S3 = if ActiveP == false ->
S2; S2;
ActiveP == true -> ActiveP == true ->
@ -190,9 +190,33 @@ init({MyName, InitMembersDict, MgrOpts}) ->
handle_call({ping}, _From, S) -> handle_call({ping}, _From, S) ->
{reply, pong, S}; {reply, pong, S};
handle_call({set_chain_members, MembersDict}, _From, S) -> handle_call({set_chain_members, MembersDict}, _From,
{Reply, S2} = do_set_chain_members(MembersDict, S), #ch_mgr{name=MyName,
{reply, Reply, S2}; proj=#projection_v1{all_members=OldAll_list,
epoch_number=OldEpoch,
upi=OldUPI}=OldProj}=S) ->
{Reply, S2} = do_set_chain_members_dict(MembersDict, S),
%% TODO: should there be any additional sanity checks? Right now,
%% if someone does something bad, then do_react_to_env() will
%% crash, which will crash us, and we'll restart in a sane & old
%% config.
All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)],
MissingInNew = OldAll_list -- All_list,
NewUPI = OldUPI -- MissingInNew,
NewDown = All_list -- NewUPI,
NewEpoch = OldEpoch + 1111,
NewProj = machi_projection:update_checksum(
OldProj#projection_v1{author_server=MyName,
creation_time=now(),
epoch_number=NewEpoch,
all_members=All_list,
upi=NewUPI,
repairing=[],
down=NewDown,
members_dict=MembersDict}),
S3 = S2#ch_mgr{proj=NewProj},
{_QQ, S4} = do_react_to_env(S3),
{reply, Reply, S4};
handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) -> handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) ->
case {Boolean, TRef} of case {Boolean, TRef} of
{true, undefined} -> {true, undefined} ->
@ -231,15 +255,24 @@ handle_cast(_Cast, S) ->
?D({cast_whaaaaaaaaaaa, _Cast}), ?D({cast_whaaaaaaaaaaa, _Cast}),
{noreply, S}. {noreply, S}.
handle_info(yo_yo_tick, S) -> handle_info(tick_check_environment, #ch_mgr{ignore_timer=true}=S) ->
{noreply, S};
handle_info(tick_check_environment, S) ->
{{_Delta, Props, _Epoch}, S2} = do_react_to_env(S), {{_Delta, Props, _Epoch}, S2} = do_react_to_env(S),
case proplists:get_value(throttle_seconds, Props) of case proplists:get_value(throttle_seconds, Props) of
N when is_integer(N), N > 0 -> N when is_integer(N), N > 0 ->
timer:sleep(N * 1000); %% We are flapping. Set ignore_timer=true and schedule a
%% reminder to stop ignoring. This slows down the rate of
%% flapping. If/when the yo:tell_author_yo() function in
%% state C200 is ever implemented, then it should be
%% implemented via the test_react_to_env style.
erlang:send_after(N*1000, self(), stop_ignoring_timer),
{noreply, S#ch_mgr{ignore_timer=true}};
_ -> _ ->
ok {noreply, S2}
end, end;
{noreply, S2}; handle_info(stop_ignoring_timer, S) ->
{noreply, S#ch_mgr{ignore_timer=false}};
handle_info(Msg, S) -> handle_info(Msg, S) ->
case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]); case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]);
_ -> ok _ -> ok
@ -286,8 +319,7 @@ store_zeroth_projection_maybe(ZeroProj, MgrOpts) ->
set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) ->
FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)],
USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list),
io:format(user, "USec ~p for ~p (FLU_list ~p)\n", [USec, MyName, FLU_list]), {ok, TRef} = timer:send_interval(USec, tick_check_environment),
{ok, TRef} = timer:send_interval(USec, yo_yo_tick),
S#ch_mgr{timer=TRef}. S#ch_mgr{timer=TRef}.
do_cl_write_public_proj(Proj, S) -> do_cl_write_public_proj(Proj, S) ->
@ -487,7 +519,7 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj,
{NewUPI_list3, Repairing_list3, RunEnv3} = {NewUPI_list3, Repairing_list3, RunEnv3} =
case {NewUp, Repairing_list2} of case {NewUp, Repairing_list2} of
{[], []} -> {[], []} ->
D_foo=[], D_foo=[],
{NewUPI_list, [], RunEnv2}; {NewUPI_list, [], RunEnv2};
{[], [H|T]} when RelativeToServer == hd(NewUPI_list) -> {[], [H|T]} when RelativeToServer == hd(NewUPI_list) ->
%% The author is head of the UPI list. Let's see if %% The author is head of the UPI list. Let's see if
@ -501,14 +533,14 @@ D_foo=[],
tl(NewUPI_list) ++ Repairing_list2, tl(NewUPI_list) ++ Repairing_list2,
S#ch_mgr.proj, Partitions, S), S#ch_mgr.proj, Partitions, S),
if not SameEpoch_p -> if not SameEpoch_p ->
D_foo=[], D_foo=[],
{NewUPI_list, OldRepairing_list, RunEnv2}; {NewUPI_list, OldRepairing_list, RunEnv2};
true -> true ->
D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}],
{NewUPI_list ++ [H], T, RunEnv2} {NewUPI_list ++ [H], T, RunEnv2}
end; end;
{_, _} -> {_, _} ->
D_foo=[], D_foo=[],
{NewUPI_list, OldRepairing_list, RunEnv2} {NewUPI_list, OldRepairing_list, RunEnv2}
end, end,
Repairing_list4 = case NewUp of Repairing_list4 = case NewUp of
@ -587,7 +619,8 @@ calc_up_nodes_sim(MyName, AllMembers, RunEnv1) ->
{UpNodes, Partitions2, RunEnv2}. {UpNodes, Partitions2, RunEnv2}.
replace(PropList, Items) -> replace(PropList, Items) ->
proplists:compact(Items ++ PropList). Tmp = Items ++ PropList,
[{K, proplists:get_value(K, Tmp)} || K <- proplists:get_keys(Tmp)].
rank_and_sort_projections([], CurrentProj) -> rank_and_sort_projections([], CurrentProj) ->
rank_projections([CurrentProj], CurrentProj); rank_projections([CurrentProj], CurrentProj);
@ -645,13 +678,11 @@ rank_projection(#projection_v1{author_server=Author,
( N * length(Repairing_list)) + ( N * length(Repairing_list)) +
(N*N * length(UPI_list)). (N*N * length(UPI_list)).
do_set_chain_members(MembersDict, do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)->
#ch_mgr{name=MyName, proxies_dict=OldProxiesDict}=S) ->
catch orddict:fold( catch orddict:fold(
fun(_K, Pid, _Acc) -> fun(_K, Pid, _Acc) ->
_ = (catch ?FLU_PC:quit(Pid)) _ = (catch ?FLU_PC:quit(Pid))
end, [], OldProxiesDict), end, [], OldProxiesDict),
All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)],
Proxies = orddict:fold( Proxies = orddict:fold(
fun(K, P, Acc) -> fun(K, P, Acc) ->
{ok, Pid} = ?FLU_PC:start_link(P), {ok, Pid} = ?FLU_PC:start_link(P),
@ -1196,7 +1227,7 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
%% timeout exception. %% timeout exception.
%% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), %% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),
Goo = P_latest2#projection_v1.epoch_number, Goo = P_latest2#projection_v1.epoch_number,
io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), %% io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]),
{ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo}, {ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo},
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of