diff --git a/src/machi.proto b/src/machi.proto index 80a39e7..a3afde8 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -525,6 +525,11 @@ message Mpb_LL_ListAllProjectionsResp { repeated uint32 epochs = 2; } +// Low level API: kick_projection_reaction request, NO RESPONSE! + +message Mpb_LL_KickProjectionReactionReq { +} + ////////////////////////////////////////// // // Low API request & response wrapper @@ -552,6 +557,7 @@ message Mpb_LL_Request { optional Mpb_LL_WriteProjectionReq proj_wp = 15; optional Mpb_LL_GetAllProjectionsReq proj_ga = 16; optional Mpb_LL_ListAllProjectionsReq proj_la = 17; + optional Mpb_LL_KickProjectionReactionReq proj_kp = 18; optional Mpb_LL_AppendChunkReq append_chunk = 30; optional Mpb_LL_WriteChunkReq write_chunk = 31; @@ -585,6 +591,7 @@ message Mpb_LL_Response { optional Mpb_LL_WriteProjectionResp proj_wp = 15; optional Mpb_LL_GetAllProjectionsResp proj_ga = 16; optional Mpb_LL_ListAllProjectionsResp proj_la = 17; + // No response to Mpb_LL_KickProjectionReactionReq = 18; optional Mpb_LL_AppendChunkResp append_chunk = 30; optional Mpb_LL_WriteChunkResp write_chunk = 31; diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index eb163ce..4cd07c1 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -284,7 +284,8 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) -> do_cl_read_latest_public_projection(ReadRepairP, S), Res = {Perhaps, Val, ExtraInfo}, {reply, Res, S2}; -handle_call({test_react_to_env}, _From, S) -> +handle_call({test_react_to_env}=Call, _From, S) -> + gobble_calls(Call), {TODOtodo, S2} = do_react_to_env(S), {reply, TODOtodo, S2}; handle_call(_Call, _From, S) -> @@ -304,9 +305,7 @@ handle_info(tick_check_environment, S) -> N when is_integer(N), N > 0 -> %% We are flapping. Set ignore_timer=true and schedule a %% reminder to stop ignoring. This slows down the rate of - %% flapping. 
If/when the yo:tell_author_yo() function in - %% state C200 is ever implemented, then it should be - %% implemented via the test_react_to_env style. + %% flapping. erlang:send_after(N*1000, self(), stop_ignoring_timer), {noreply, S3#ch_mgr{ignore_timer=true}}; _ -> @@ -1349,13 +1348,10 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo), MyNamePid = proxy_pid(MyName, S), + Goo = P_latest2#projection_v1.epoch_number, %% This is the local projection store. Use a larger timeout, so %% that things locally are pretty horrible if we're killed by a %% timeout exception. - %% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), - Goo = P_latest2#projection_v1.epoch_number, - %% io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), - {ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo}, case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of true -> @@ -1412,13 +1408,11 @@ react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H} = S) -> react_to_env_C200(Retries, P_latest, S) -> ?REACT(c200), try - %% TODO: This code works "well enough" without actually - %% telling anybody anything. Do we want to rip this out? - %% Actually implement it? None of the above? - yo:tell_author_yo(P_latest#projection_v1.author_server) + AuthorProxyPid = proxy_pid(P_latest#projection_v1.author_server, S), + ?FLU_PC:kick_projection_reaction(AuthorProxyPid, []) catch _Type:_Err -> - %% io:format(user, "TODO: tell_author_yo is broken: ~p ~p\n", - %% [_Type, _Err]), + io:format(user, "TODO: tell_author_yo error is probably ignorable: ~p ~p\n", + [_Type, _Err]), ok end, react_to_env_C210(Retries, S). @@ -2009,6 +2003,15 @@ make_chmgr_regname(A) when is_atom(A) -> make_chmgr_regname(B) when is_binary(B) -> list_to_atom(binary_to_list(B) ++ "_chmgr"). 
+gobble_calls(_StaticCall) -> + receive + {'$gen_call',From,{test_react_to_env}} -> + gen_server:reply(From, todo_overload), + gobble_calls(_StaticCall) + after 1 -> % after 0 angers pulse. + ok + end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% perhaps_start_repair( diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 998735c..573b617 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -261,7 +261,7 @@ append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p, %% this new world. Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), spawn(fun() -> - catch machi_chain_manager:test_react_to_env(Chmgr) + catch machi_chain_manager1:test_react_to_env(Chmgr) end), append_server_loop(FluPid, S#state{wedged=true}); true -> @@ -295,14 +295,18 @@ net_server_loop(Sock, S) -> case machi_pb:decode_mpb_ll_request(Bin) of LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> {R, NewS} = do_pb_ll_request(LL_req, S), - {machi_pb:encode_mpb_ll_response(R), mode(low, NewS)}; + {maybe_encode_response(R), mode(low, NewS)}; _ -> HL_req = machi_pb:decode_mpb_request(Bin), 1 = HL_req#mpb_request.do_not_alter, {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), {machi_pb:encode_mpb_response(R), mode(high, NewS)} end, - ok = gen_tcp:send(Sock, RespBin), + if RespBin == async_no_response -> + ok; + true -> + ok = gen_tcp:send(Sock, RespBin) + end, net_server_loop(Sock, S2); {error, SockError} -> Msg = io_lib:format("Socket error ~w", [SockError]), @@ -317,6 +321,11 @@ net_server_loop(Sock, S) -> exit(normal) end. +maybe_encode_response(async_no_response=X) -> + X; +maybe_encode_response(R) -> + machi_pb:encode_mpb_ll_response(R). 
+ mode(Mode, #state{pb_mode=undefined}=S) -> S#state{pb_mode=Mode}; mode(_, S) -> @@ -452,7 +461,16 @@ do_server_proj_request({get_all_projections, ProjType}, machi_projection_store:get_all_projections(ProjStore, ProjType); do_server_proj_request({list_all_projections, ProjType}, #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType). + machi_projection_store:list_all_projections(ProjStore, ProjType); +do_server_proj_request({kick_projection_reaction}, + #state{flu_name=FluName}) -> + %% Tell my chain manager that it might want to react to + %% this new world. + Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), + spawn(fun() -> + catch machi_chain_manager1:test_react_to_env(Chmgr) + end), + async_no_response. do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, ChunkExtra, S) -> diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 9d3d4a7..d7a1e7e 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -63,6 +63,7 @@ write_projection/3, write_projection/4, get_all_projections/2, get_all_projections/3, list_all_projections/2, list_all_projections/3, + kick_projection_reaction/2, kick_projection_reaction/3, %% Common API echo/2, echo/3, @@ -374,6 +375,27 @@ list_all_projections(Host, TcpPort, ProjType) disconnect(Sock) end. +%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +-spec kick_projection_reaction(port_wrap(), list()) -> + ok. +kick_projection_reaction(Sock, Options) -> + kick_projection_reaction2(Sock, Options). + +%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +-spec kick_projection_reaction(machi_dt:inet_host(), machi_dt:inet_port(), list()) -> + ok. +kick_projection_reaction(Host, TcpPort, Options) -> + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), + try + kick_projection_reaction2(Sock, Options) + after + disconnect(Sock) + end. 
+ %% @doc Echo -- test protocol round-trip. -spec echo(port_wrap(), string()) -> @@ -604,19 +626,32 @@ list_all_projections2(Sock, ProjType) -> ReqID, {low_proj, {list_all_projections, ProjType}}), do_pb_request_common(Sock, ReqID, Req). +kick_projection_reaction2(Sock, _Options) -> + ReqID = <<42>>, + Req = machi_pb_translate:to_pb_request( + ReqID, {low_proj, {kick_projection_reaction}}), + do_pb_request_common(Sock, ReqID, Req, false). + do_pb_request_common(Sock, ReqID, Req) -> + do_pb_request_common(Sock, ReqID, Req, true). + +do_pb_request_common(Sock, ReqID, Req, GetReply_p) -> erase(bad_sock), try ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)), ok = w_send(Sock, ReqBin), - case w_recv(Sock, 0) of - {ok, RespBin} -> - Resp = machi_pb:decode_mpb_ll_response(RespBin), - {ReqID2, Reply} = machi_pb_translate:from_pb_response(Resp), - true = (ReqID == ReqID2 orelse ReqID2 == <<>>), - Reply; - {error, _}=Err -> - throw(Err) + if GetReply_p -> + case w_recv(Sock, 0) of + {ok, RespBin} -> + Resp = machi_pb:decode_mpb_ll_response(RespBin), + {ReqID2, Reply} = machi_pb_translate:from_pb_response(Resp), + true = (ReqID == ReqID2 orelse ReqID2 == <<>>), + Reply; + {error, _}=Err -> + throw(Err) + end; + not GetReply_p -> + ok end catch throw:Error -> diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index 8e4ae21..ea98289 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -135,6 +135,10 @@ from_pb_request(#mpb_ll_request{ req_id=ReqID, proj_la=#mpb_ll_listallprojectionsreq{type=ProjType}}) -> {ReqID, {low_proj, {list_all_projections, conv_to_type(ProjType)}}}; +from_pb_request(#mpb_ll_request{ + req_id=ReqID, + proj_kp=#mpb_ll_kickprojectionreactionreq{}}) -> + {ReqID, {low_proj, {kick_projection_reaction}}}; %%qqq from_pb_request(#mpb_request{req_id=ReqID, echo=#mpb_echoreq{message=Msg}}) -> @@ -313,6 +317,7 @@ from_pb_response(#mpb_ll_response{ _ -> {ReqID< 
machi_pb_high_client:convert_general_status_code(Status)} end. +%% No response for proj_kp/kick_projection_reaction %% TODO: move the #mbp_* record making code from %% machi_pb_high_client:do_send_sync() clauses into to_pb_request(). @@ -403,9 +408,14 @@ to_pb_request(ReqID, {low_proj, {get_all_projections, ProjType}}) -> proj_ga=#mpb_ll_getallprojectionsreq{type=conv_from_type(ProjType)}}; to_pb_request(ReqID, {low_proj, {list_all_projections, ProjType}}) -> #mpb_ll_request{req_id=ReqID, do_not_alter=2, - proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}. + proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}; +to_pb_request(ReqID, {low_proj, {kick_projection_reaction}}) -> + #mpb_ll_request{req_id=ReqID, do_not_alter=2, + proj_kp=#mpb_ll_kickprojectionreactionreq{}}. %%qqq +to_pb_response(_ReqID, _, async_no_response=X) -> + X; to_pb_response(ReqID, _, {low_error, ErrCode, ErrMsg}) -> make_ll_error_resp(ReqID, ErrCode, ErrMsg); to_pb_response(ReqID, {low_echo, _BogusEpochID, _Msg}, Resp) -> @@ -567,6 +577,7 @@ to_pb_response(ReqID, {low_proj, {list_all_projections, _ProjType}}, Resp)-> proj_la=#mpb_ll_listallprojectionsresp{ status=Status}} end; +%% No response for {kick_projection_reaction}! %%qqq to_pb_response(ReqID, _, {high_error, ErrCode, ErrMsg}) -> make_error_resp(ReqID, ErrCode, ErrMsg); diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 3d05f05..6657623 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -68,6 +68,7 @@ write_projection/3, write_projection/4, get_all_projections/2, get_all_projections/3, list_all_projections/2, list_all_projections/3, + kick_projection_reaction/2, kick_projection_reaction/3, %% Common API quit/1, @@ -243,6 +244,19 @@ list_all_projections(PidSpec, ProjType, Timeout) -> gen_server:call(PidSpec, {req, {list_all_projections, ProjType}}, Timeout). 
+%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +kick_projection_reaction(PidSpec, Options) -> + kick_projection_reaction(PidSpec, Options, infinity). + +%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +kick_projection_reaction(PidSpec, Options, Timeout) -> + gen_server:call(PidSpec, {req, {kick_projection_reaction, Options}}, + Timeout). + %% @doc Quit & close the connection to remote FLU and stop our %% proxy process. @@ -373,7 +387,10 @@ make_req_fun({get_all_projections, ProjType}, fun() -> Mod:get_all_projections(Sock, ProjType) end; make_req_fun({list_all_projections, ProjType}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> - fun() -> Mod:list_all_projections(Sock, ProjType) end. + fun() -> Mod:list_all_projections(Sock, ProjType) end; +make_req_fun({kick_projection_reaction, Options}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:kick_projection_reaction(Sock, Options) end. connected_p(#state{sock=SockMaybe, i=#p_srvr{proto_mod=Mod}=_I}=_S) -> diff --git a/test/machi_chain_manager1_pulse.erl b/test/machi_chain_manager1_pulse.erl index a985b85..7607e47 100644 --- a/test/machi_chain_manager1_pulse.erl +++ b/test/machi_chain_manager1_pulse.erl @@ -82,7 +82,7 @@ command(S) -> { 1, {call, ?MODULE, change_partitions, [gen_old_threshold(), gen_no_partition_threshold()]}}, {50, {call, ?MODULE, do_ticks, - [choose(5, 200), S#state.pids, + [choose(5, 100), S#state.pids, gen_old_threshold(), gen_no_partition_threshold()]}} ]). 
@@ -280,7 +280,7 @@ dump_state() -> prop_pulse() -> ?FORALL({Cmds0, Seed}, {non_empty(commands(?MODULE)), pulse:seed()}, - ?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 5, + ?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 6, begin ok = shutdown_hard(), %% PULSE can be really unfair, of course, including having exec_ticks @@ -421,10 +421,15 @@ private_projections_are_stable_check(ProxiesDict, All_listE) -> %% also check for flapping, and if yes, to see if all_hosed are %% all exactly equal. - _ = exec_ticks(40, All_listE), + %% gobble_calls() workaround: many small exec_ticks() calls + + %% sleep after each. + [begin + _ = exec_ticks(10, All_listE), + timer:sleep(10) + end|| _ <- lists:seq(1, 40)], Private1 = [?FLU_PC:get_latest_epochid(Proxy, private) || {_FLU, Proxy} <- orddict:to_list(ProxiesDict)], - _ = exec_ticks(5, All_listE), + _ = exec_ticks(3*20, All_listE), Private2 = [?FLU_PC:get_latest_epochid(Proxy, private) || {_FLU, Proxy} <- orddict:to_list(ProxiesDict)], diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index c4d52d9..444ab37 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -238,16 +238,17 @@ nonunanimous_setup_and_fix_test() -> {ok, P2pa} = ?FLU_PC:read_latest_projection(Proxy_a, private), P2 = P2pa#projection_v1{dbg2=[]}, - %% FLUb should have nothing written to private because it hasn't - %% reacted yet. - {error, not_written} = ?FLU_PC:read_latest_projection(Proxy_b, private), + %% %% FLUb should have nothing written to private because it hasn't + %% %% reacted yet. + %% {error, not_written} = ?FLU_PC:read_latest_projection(Proxy_b, private), - %% Poke FLUb to react ... should be using the same private proj - %% as FLUa. - {now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Mb), + %% %% Poke FLUb to react ... should be using the same private proj + %% %% as FLUa. 
+ %% {now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Mb), {ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private), P2 = P2pb#projection_v1{dbg2=[]}, +timer:sleep(3000), ok after ok = ?MGR:stop(Ma), diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 976b6ab..8550384 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -75,6 +75,11 @@ api_smoke_test() -> <<"foo-file">>, 99832, MyChunk_badcs), + %% Put kick_projection_reaction() in the middle of the test so + %% that any problems with its async nature will (hopefully) + %% cause problems later in the test. + ok = ?MUT:kick_projection_reaction(Prox1, []), + %% Alright, now for the rest of the API, whee BadFile = <<"no-such-file">>, {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),