diff --git a/include/machi_chain_manager.hrl b/include/machi_chain_manager.hrl index 9382fa6..1fb4b5e 100644 --- a/include/machi_chain_manager.hrl +++ b/include/machi_chain_manager.hrl @@ -30,6 +30,7 @@ proj :: projection(), %% timer :: 'undefined' | timer:tref(), + ignore_timer :: boolean(), proj_history :: queue:queue(), flaps=0 :: integer(), flap_start = ?NOT_FLAPPING diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 5cf9699..ef02f97 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -180,7 +180,7 @@ init({MyName, InitMembersDict, MgrOpts}) -> proj_history=queue:new(), runenv=RunEnv, opts=MgrOpts}, - {_, S2} = do_set_chain_members(MembersDict, S), + {_, S2} = do_set_chain_members_dict(MembersDict, S), S3 = if ActiveP == false -> S2; ActiveP == true -> @@ -190,9 +190,33 @@ init({MyName, InitMembersDict, MgrOpts}) -> handle_call({ping}, _From, S) -> {reply, pong, S}; -handle_call({set_chain_members, MembersDict}, _From, S) -> - {Reply, S2} = do_set_chain_members(MembersDict, S), - {reply, Reply, S2}; +handle_call({set_chain_members, MembersDict}, _From, + #ch_mgr{name=MyName, + proj=#projection_v1{all_members=OldAll_list, + epoch_number=OldEpoch, + upi=OldUPI}=OldProj}=S) -> + {Reply, S2} = do_set_chain_members_dict(MembersDict, S), + %% TODO: should there be any additional sanity checks? Right now, + %% if someone does something bad, then do_react_to_env() will + %% crash, which will crash us, and we'll restart in a sane & old + %% config. + All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], + MissingInNew = OldAll_list -- All_list, + NewUPI = OldUPI -- MissingInNew, + NewDown = All_list -- NewUPI, + NewEpoch = OldEpoch + 1111, + NewProj = machi_projection:update_checksum( + OldProj#projection_v1{author_server=MyName, + creation_time=now(), + epoch_number=NewEpoch, + all_members=All_list, + upi=NewUPI, + repairing=[], + down=NewDown, + members_dict=MembersDict}), + S3 = S2#ch_mgr{proj=NewProj}, + {_QQ, S4} = do_react_to_env(S3), + {reply, Reply, S4}; handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) -> case {Boolean, TRef} of {true, undefined} -> @@ -231,15 +255,24 @@ handle_cast(_Cast, S) -> ?D({cast_whaaaaaaaaaaa, _Cast}), {noreply, S}. -handle_info(yo_yo_tick, S) -> +handle_info(tick_check_environment, #ch_mgr{ignore_timer=true}=S) -> + {noreply, S}; +handle_info(tick_check_environment, S) -> {{_Delta, Props, _Epoch}, S2} = do_react_to_env(S), case proplists:get_value(throttle_seconds, Props) of N when is_integer(N), N > 0 -> - timer:sleep(N * 1000); + %% We are flapping. Set ignore_timer=true and schedule a + %% reminder to stop ignoring. This slows down the rate of + %% flapping. If/when the yo:tell_author_yo() function in + %% state C200 is ever implemented, then it should be + %% implemented via the test_react_to_env style. + erlang:send_after(N*1000, self(), stop_ignoring_timer), + {noreply, S#ch_mgr{ignore_timer=true}}; _ -> - ok - end, - {noreply, S2}; + {noreply, S2} + end; +handle_info(stop_ignoring_timer, S) -> + {noreply, S#ch_mgr{ignore_timer=false}}; handle_info(Msg, S) -> case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]); _ -> ok @@ -286,8 +319,7 @@ store_zeroth_projection_maybe(ZeroProj, MgrOpts) -> set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), -io:format(user, "USec ~p for ~p (FLU_list ~p)\n", [USec, MyName, FLU_list]), - {ok, TRef} = timer:send_interval(USec, yo_yo_tick), + {ok, TRef} = timer:send_interval(USec, tick_check_environment), S#ch_mgr{timer=TRef}. do_cl_write_public_proj(Proj, S) -> @@ -487,7 +519,7 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, {NewUPI_list3, Repairing_list3, RunEnv3} = case {NewUp, Repairing_list2} of {[], []} -> -D_foo=[], + D_foo=[], {NewUPI_list, [], RunEnv2}; {[], [H|T]} when RelativeToServer == hd(NewUPI_list) -> %% The author is head of the UPI list. Let's see if @@ -501,14 +533,14 @@ D_foo=[], tl(NewUPI_list) ++ Repairing_list2, S#ch_mgr.proj, Partitions, S), if not SameEpoch_p -> -D_foo=[], + D_foo=[], {NewUPI_list, OldRepairing_list, RunEnv2}; true -> -D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], + D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], {NewUPI_list ++ [H], T, RunEnv2} end; {_, _} -> -D_foo=[], + D_foo=[], {NewUPI_list, OldRepairing_list, RunEnv2} end, Repairing_list4 = case NewUp of @@ -587,7 +619,8 @@ calc_up_nodes_sim(MyName, AllMembers, RunEnv1) -> {UpNodes, Partitions2, RunEnv2}. replace(PropList, Items) -> - proplists:compact(Items ++ PropList). + Tmp = Items ++ PropList, + [{K, proplists:get_value(K, Tmp)} || K <- proplists:get_keys(Tmp)]. rank_and_sort_projections([], CurrentProj) -> rank_projections([CurrentProj], CurrentProj); @@ -645,13 +678,11 @@ rank_projection(#projection_v1{author_server=Author, ( N * length(Repairing_list)) + (N*N * length(UPI_list)). -do_set_chain_members(MembersDict, - #ch_mgr{name=MyName, proxies_dict=OldProxiesDict}=S) -> +do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)-> catch orddict:fold( fun(_K, Pid, _Acc) -> _ = (catch ?FLU_PC:quit(Pid)) end, [], OldProxiesDict), - All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], Proxies = orddict:fold( fun(K, P, Acc) -> {ok, Pid} = ?FLU_PC:start_link(P), @@ -1196,7 +1227,7 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> %% timeout exception. %% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), Goo = P_latest2#projection_v1.epoch_number, - io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), + %% io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), {ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo}, case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of