diff --git a/src/machi.proto b/src/machi.proto index 80a39e7..a3afde8 100644 --- a/src/machi.proto +++ b/src/machi.proto @@ -525,6 +525,11 @@ message Mpb_LL_ListAllProjectionsResp { repeated uint32 epochs = 2; } +// Low level API: kick_projection_reaction request, NO RESPONSE! + +message Mpb_LL_KickProjectionReactionReq { +} + ////////////////////////////////////////// // // Low API request & response wrapper @@ -552,6 +557,7 @@ message Mpb_LL_Request { optional Mpb_LL_WriteProjectionReq proj_wp = 15; optional Mpb_LL_GetAllProjectionsReq proj_ga = 16; optional Mpb_LL_ListAllProjectionsReq proj_la = 17; + optional Mpb_LL_KickProjectionReactionReq proj_kp = 18; optional Mpb_LL_AppendChunkReq append_chunk = 30; optional Mpb_LL_WriteChunkReq write_chunk = 31; @@ -585,6 +591,7 @@ message Mpb_LL_Response { optional Mpb_LL_WriteProjectionResp proj_wp = 15; optional Mpb_LL_GetAllProjectionsResp proj_ga = 16; optional Mpb_LL_ListAllProjectionsResp proj_la = 17; + // No reponse to Mpb_LL_KickProjectionReactionReq = 18; optional Mpb_LL_AppendChunkResp append_chunk = 30; optional Mpb_LL_WriteChunkResp write_chunk = 31; diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index eb163ce..274d02a 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -93,7 +93,8 @@ %% API -export([start_link/2, start_link/3, stop/1, ping/1, - set_chain_members/2, set_active/2]). + set_chain_members/2, set_active/2, + trigger_react_to_env/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -105,7 +106,6 @@ -export([test_calc_projection/2, test_write_public_projection/2, test_read_latest_public_projection/2, - test_react_to_env/1, get_all_hosed/1]). -ifdef(EQC). @@ -146,6 +146,9 @@ set_chain_members(Pid, MembersDict) -> set_active(Pid, Boolean) when Boolean == true; Boolean == false -> gen_server:call(Pid, {set_active, Boolean}, infinity). +trigger_react_to_env(Pid) -> + gen_server:call(Pid, {trigger_react_to_env}, infinity). + -ifdef(TEST). %% Test/debugging code only. @@ -163,9 +166,6 @@ test_read_latest_public_projection(Pid, ReadRepairP) -> gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP}, infinity). -test_react_to_env(Pid) -> - gen_server:call(Pid, {test_react_to_env}, infinity). - -endif. % TEST %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -284,7 +284,8 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) -> do_cl_read_latest_public_projection(ReadRepairP, S), Res = {Perhaps, Val, ExtraInfo}, {reply, Res, S2}; -handle_call({test_react_to_env}, _From, S) -> +handle_call({trigger_react_to_env}=Call, _From, S) -> + gobble_calls(Call), {TODOtodo, S2} = do_react_to_env(S), {reply, TODOtodo, S2}; handle_call(_Call, _From, S) -> @@ -304,9 +305,7 @@ handle_info(tick_check_environment, S) -> N when is_integer(N), N > 0 -> %% We are flapping. Set ignore_timer=true and schedule a %% reminder to stop ignoring. This slows down the rate of - %% flapping. If/when the yo:tell_author_yo() function in - %% state C200 is ever implemented, then it should be - %% implemented via the test_react_to_env style. + %% flapping. erlang:send_after(N*1000, self(), stop_ignoring_timer), {noreply, S3#ch_mgr{ignore_timer=true}}; _ -> @@ -1349,13 +1348,10 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) -> P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo), MyNamePid = proxy_pid(MyName, S), + Goo = P_latest2#projection_v1.epoch_number, %% This is the local projection store. Use a larger timeout, so %% that things locally are pretty horrible if we're killed by a %% timeout exception. - %% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30), - Goo = P_latest2#projection_v1.epoch_number, - %% io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]), - {ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo}, case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of true -> @@ -1412,13 +1408,11 @@ react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H} = S) -> react_to_env_C200(Retries, P_latest, S) -> ?REACT(c200), try - %% TODO: This code works "well enough" without actually - %% telling anybody anything. Do we want to rip this out? - %% Actually implement it? None of the above? - yo:tell_author_yo(P_latest#projection_v1.author_server) + AuthorProxyPid = proxy_pid(P_latest#projection_v1.author_server, S), + ?FLU_PC:kick_projection_reaction(AuthorProxyPid, []) catch _Type:_Err -> - %% io:format(user, "TODO: tell_author_yo is broken: ~p ~p\n", - %% [_Type, _Err]), + io:format(user, "TODO: tell_author_yo error is probably ignorable: ~p ~p\n", + [_Type, _Err]), ok end, react_to_env_C210(Retries, S). @@ -2009,6 +2003,15 @@ make_chmgr_regname(A) when is_atom(A) -> make_chmgr_regname(B) when is_binary(B) -> list_to_atom(binary_to_list(B) ++ "_chmgr"). +gobble_calls(StaticCall) -> + receive + {'$gen_call',From,{trigger_react_to_env}} -> + gen_server:reply(From, todo_overload), + gobble_calls(StaticCall) + after 1 -> % after 0 angers pulse. + ok + end. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% perhaps_start_repair( diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index ccf0f3a..5628282 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -62,13 +62,6 @@ %% the FLU keep track of the epoch number of the last file write (and %% perhaps last metadata write), as an optimization for inter-FLU data %% replication/chain repair. -%% -%% TODO Section 4.2 ("The Sequencer") says that the sequencer must -%% change its file assignments to new & unique names whenever we move -%% to wedge state. This is not yet implemented. In the current -%% Erlang process scheme (which will probably be changing soon), a -%% simple implementation would stop all existing processes that are -%% running run_seq_append_server(). -module(machi_flu1). @@ -216,9 +209,6 @@ start_append_server(S, AckPid) -> FluPid = self(), proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end). -%% start_projection_server(S) -> -%% spawn_link(fun() -> run_projection_server(S) end). - run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> register(make_listener_regname(FluName), self()), SockOpts = ?PB_PACKET_OPTS ++ @@ -250,22 +240,29 @@ listen_server_loop(LSock, S) -> spawn_link(fun() -> net_server_loop(Sock, S) end), listen_server_loop(LSock, S). -append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p, - epoch_id=OldEpochId}=S) -> +append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p, + epoch_id=OldEpochId, flu_name=FluName}=S) -> AppendServerPid = self(), receive - {seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p -> + {seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID} + when Wedged_p -> From ! wedged, append_server_loop(FluPid, S); - {seq_append, From, Prefix, Chunk, CSum, Extra} -> + {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} -> spawn(fun() -> append_server_dispatch(From, Prefix, - Chunk, CSum, Extra, + Chunk, CSum, Extra, EpochID, DataDir, AppendServerPid) end), append_server_loop(FluPid, S); {wedge_myself, WedgeEpochId} -> - if WedgeEpochId == OldEpochId -> + if not Wedged_p andalso WedgeEpochId == OldEpochId -> true = ets:insert(S#state.etstab, {epoch, {true, OldEpochId}}), + %% Tell my chain manager that it might want to react to + %% this new world. + Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), + spawn(fun() -> + catch machi_chain_manager1:trigger_react_to_env(Chmgr) + end), append_server_loop(FluPid, S#state{wedged=true}); true -> append_server_loop(FluPid, S) @@ -298,14 +295,18 @@ net_server_loop(Sock, S) -> case machi_pb:decode_mpb_ll_request(Bin) of LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> {R, NewS} = do_pb_ll_request(LL_req, S), - {machi_pb:encode_mpb_ll_response(R), mode(low, NewS)}; + {maybe_encode_response(R), mode(low, NewS)}; _ -> HL_req = machi_pb:decode_mpb_request(Bin), 1 = HL_req#mpb_request.do_not_alter, {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), {machi_pb:encode_mpb_response(R), mode(high, NewS)} end, - ok = gen_tcp:send(Sock, RespBin), + if RespBin == async_no_response -> + ok; + true -> + ok = gen_tcp:send(Sock, RespBin) + end, net_server_loop(Sock, S2); {error, SockError} -> Msg = io_lib:format("Socket error ~w", [SockError]), @@ -320,6 +321,11 @@ net_server_loop(Sock, S) -> exit(normal) end. +maybe_encode_response(async_no_response=X) -> + X; +maybe_encode_response(R) -> + machi_pb:encode_mpb_ll_response(R). + mode(Mode, #state{pb_mode=undefined}=S) -> S#state{pb_mode=Mode}; mode(_, S) -> @@ -359,7 +365,7 @@ do_pb_ll_request(PB_request, S) -> do_pb_ll_request2(EpochID, CMD, S) -> {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), if Wedged_p == true -> - {{error, wedged}, S}; + {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; is_tuple(EpochID) andalso EpochID /= CurrentEpochID -> @@ -370,13 +376,12 @@ do_pb_ll_request2(EpochID, CMD, S) -> true -> %% We're at same epoch # but different checksum, or %% we're at a newer/bigger epoch #. - io:format(user, "\n\nTODO/monitor: wedging myself!\n\n",[]), wedge_myself(S#state.flu_name, CurrentEpochID), ok end, - {{error, bad_epoch}, S}; + {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; true -> - do_pb_ll_request3(CMD, S) + do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) end. do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> @@ -456,7 +461,16 @@ do_server_proj_request({get_all_projections, ProjType}, machi_projection_store:get_all_projections(ProjStore, ProjType); do_server_proj_request({list_all_projections, ProjType}, #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType). + machi_projection_store:list_all_projections(ProjStore, ProjType); +do_server_proj_request({kick_projection_reaction}, + #state{flu_name=FluName}) -> + %% Tell my chain manager that it might want to react to + %% this new world. + Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), + spawn(fun() -> + catch machi_chain_manager1:trigger_react_to_env(Chmgr) + end), + async_no_response. do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, ChunkExtra, S) -> @@ -469,11 +483,13 @@ do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum, end. do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum, - ChunkExtra, #state{flu_name=FluName}=_S) -> + ChunkExtra, #state{flu_name=FluName, + epoch_id=EpochID}=_S) -> %% TODO: Do anything with PKey? try TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), - FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum, ChunkExtra}, + FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum, + ChunkExtra, EpochID}, receive {assignment, Offset, File} -> Size = iolist_size(Chunk), @@ -664,9 +680,10 @@ do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> {error, bad_arg} end. -append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) -> - Pid = write_server_get_pid(Prefix, DataDir, LinkPid), - Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra}, +append_server_dispatch(From, Prefix, Chunk, CSum, Extra, EpochID, + DataDir, LinkPid) -> + Pid = write_server_get_pid(Prefix, EpochID, DataDir, LinkPid), + Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID}, exit(normal). sanitize_file_string(Str) -> @@ -700,12 +717,12 @@ sync_checksum_file(File) -> end end. -write_server_get_pid(Prefix, DataDir, LinkPid) -> +write_server_get_pid(Prefix, EpochID, DataDir, LinkPid) -> case write_server_find_pid(Prefix) of undefined -> - start_seq_append_server(Prefix, DataDir, LinkPid), + start_seq_append_server(Prefix, EpochID, DataDir, LinkPid), timer:sleep(1), - write_server_get_pid(Prefix, DataDir, LinkPid); + write_server_get_pid(Prefix, EpochID, DataDir, LinkPid); Pid -> Pid end. @@ -714,7 +731,7 @@ write_server_find_pid(Prefix) -> FluName = machi_util:make_regname(Prefix), whereis(FluName). -start_seq_append_server(Prefix, DataDir, AppendServerPid) -> +start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) -> proc_lib:spawn_link(fun() -> %% The following is only necessary to %% make nice process relationships in @@ -722,21 +739,20 @@ start_seq_append_server(Prefix, DataDir, AppendServerPid) -> put('$ancestors', [AppendServerPid]), put('$initial_call', {x,y,3}), link(AppendServerPid), - run_seq_append_server(Prefix, DataDir) + run_seq_append_server(Prefix, EpochID, DataDir) end). -run_seq_append_server(Prefix, DataDir) -> +run_seq_append_server(Prefix, EpochID, DataDir) -> true = register(machi_util:make_regname(Prefix), self()), - run_seq_append_server2(Prefix, DataDir). + run_seq_append_server2(Prefix, EpochID, DataDir). -run_seq_append_server2(Prefix, DataDir) -> +run_seq_append_server2(Prefix, EpochID, DataDir) -> FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1, case machi_util:increment_max_filenum(DataDir, Prefix) of ok -> - machi_util:increment_max_filenum(DataDir, Prefix), machi_util:info_msg("start: ~p server at file ~w\n", [Prefix, FileNum]), - seq_append_server_loop(DataDir, Prefix, FileNum); + seq_append_server_loop(DataDir, Prefix, EpochID, FileNum); Else -> error_logger:error_msg("start: ~p server at file ~w: ~p\n", [Prefix, FileNum, Else]), @@ -750,7 +766,7 @@ seq_name_hack() -> [element(3,now()), list_to_integer(os:getpid())])). -seq_append_server_loop(DataDir, Prefix, FileNum) -> +seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) -> SequencerNameHack = seq_name_hack(), {File, FullPath} = machi_util:make_data_filename( DataDir, Prefix, SequencerNameHack, FileNum), @@ -759,19 +775,22 @@ seq_append_server_loop(DataDir, Prefix, FileNum) -> CSumPath = machi_util:make_checksum_filename( DataDir, Prefix, SequencerNameHack, FileNum), {ok, FHc} = file:open(CSumPath, [append, raw, binary]), - seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum, + seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum, ?MINIMUM_OFFSET). -seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset) +seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID, + FileNum, Offset) when Offset > ?MAX_FILE_SIZE -> ok = file:close(FHd), ok = file:close(FHc), machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n", [Prefix, FileNum, Offset]), - run_seq_append_server2(Prefix, DataDir); -seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> + run_seq_append_server2(Prefix, EpochID, DataDir); +seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID, + FileNum, Offset) -> receive - {seq_append, From, Prefix, Chunk, TaggedCSum, Extra} -> + {seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID} + when R_EpochID == EpochID -> if Chunk /= <<>> -> ok = file:pwrite(FHd, Offset, Chunk); true -> @@ -781,12 +800,20 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> Size = iolist_size(Chunk), CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum), ok = file:write(FHc, CSum_info), - seq_append_server_loop(DataDir, Prefix, File, FH_, + seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, FileNum, Offset + Size + Extra); + {seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG -> + %% Rare'ish event: send MSG to myself so it doesn't get lost + %% while we recurse around to pick up a new FileNum. + self() ! MSG, + machi_util:info_msg("rollover: ~p server at file ~w offset ~w " + "by new epoch_id ~W\n", + [Prefix, FileNum, Offset, R_EpochID, 8]), + run_seq_append_server2(Prefix, R_EpochID, DataDir); {sync_stuff, FromPid, Ref} -> file:sync(FHc), FromPid ! {sync_finished, Ref}, - seq_append_server_loop(DataDir, Prefix, File, FH_, + seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID, FileNum, Offset) after 30*1000 -> ok = file:close(FHd), @@ -843,7 +870,7 @@ http_hack_server_put(Sock, G, FluName, MyURI) -> throw({bad_csum, XX}) end end, - FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0} + FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0, todo_epoch_id_bitrot} catch throw:{bad_csum, _CS} -> Out = "HTTP/1.0 412 Precondition failed\r\n" diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 9d3d4a7..d7a1e7e 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -63,6 +63,7 @@ write_projection/3, write_projection/4, get_all_projections/2, get_all_projections/3, list_all_projections/2, list_all_projections/3, + kick_projection_reaction/2, kick_projection_reaction/3, %% Common API echo/2, echo/3, @@ -374,6 +375,27 @@ list_all_projections(Host, TcpPort, ProjType) disconnect(Sock) end. +%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +-spec kick_projection_reaction(port_wrap(), list()) -> + ok. +kick_projection_reaction(Sock, Options) -> + kick_projection_reaction2(Sock, Options). + +%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +-spec kick_projection_reaction(machi_dt:inet_host(), machi_dt:inet_port(), list()) -> + ok. +kick_projection_reaction(Host, TcpPort, Options) -> + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), + try + kick_projection_reaction2(Sock, Options) + after + disconnect(Sock) + end. + %% @doc Echo -- test protocol round-trip. -spec echo(port_wrap(), string()) -> @@ -604,19 +626,32 @@ list_all_projections2(Sock, ProjType) -> ReqID, {low_proj, {list_all_projections, ProjType}}), do_pb_request_common(Sock, ReqID, Req). +kick_projection_reaction2(Sock, _Options) -> + ReqID = <<42>>, + Req = machi_pb_translate:to_pb_request( + ReqID, {low_proj, {kick_projection_reaction}}), + do_pb_request_common(Sock, ReqID, Req, false). + do_pb_request_common(Sock, ReqID, Req) -> + do_pb_request_common(Sock, ReqID, Req, true). + +do_pb_request_common(Sock, ReqID, Req, GetReply_p) -> erase(bad_sock), try ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)), ok = w_send(Sock, ReqBin), - case w_recv(Sock, 0) of - {ok, RespBin} -> - Resp = machi_pb:decode_mpb_ll_response(RespBin), - {ReqID2, Reply} = machi_pb_translate:from_pb_response(Resp), - true = (ReqID == ReqID2 orelse ReqID2 == <<>>), - Reply; - {error, _}=Err -> - throw(Err) + if GetReply_p -> + case w_recv(Sock, 0) of + {ok, RespBin} -> + Resp = machi_pb:decode_mpb_ll_response(RespBin), + {ReqID2, Reply} = machi_pb_translate:from_pb_response(Resp), + true = (ReqID == ReqID2 orelse ReqID2 == <<>>), + Reply; + {error, _}=Err -> + throw(Err) + end; + not GetReply_p -> + ok end catch throw:Error -> diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index 8e4ae21..ea98289 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -135,6 +135,10 @@ from_pb_request(#mpb_ll_request{ req_id=ReqID, proj_la=#mpb_ll_listallprojectionsreq{type=ProjType}}) -> {ReqID, {low_proj, {list_all_projections, conv_to_type(ProjType)}}}; +from_pb_request(#mpb_ll_request{ + req_id=ReqID, + proj_kp=#mpb_ll_kickprojectionreactionreq{}}) -> + {ReqID, {low_proj, {kick_projection_reaction}}}; %%qqq from_pb_request(#mpb_request{req_id=ReqID, echo=#mpb_echoreq{message=Msg}}) -> @@ -313,6 +317,7 @@ from_pb_response(#mpb_ll_response{ _ -> {ReqID< machi_pb_high_client:convert_general_status_code(Status)} end. +%% No response for proj_kp/kick_projection_reaction %% TODO: move the #mbp_* record making code from %% machi_pb_high_client:do_send_sync() clauses into to_pb_request(). @@ -403,9 +408,14 @@ to_pb_request(ReqID, {low_proj, {get_all_projections, ProjType}}) -> proj_ga=#mpb_ll_getallprojectionsreq{type=conv_from_type(ProjType)}}; to_pb_request(ReqID, {low_proj, {list_all_projections, ProjType}}) -> #mpb_ll_request{req_id=ReqID, do_not_alter=2, - proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}. + proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}; +to_pb_request(ReqID, {low_proj, {kick_projection_reaction}}) -> + #mpb_ll_request{req_id=ReqID, do_not_alter=2, + proj_kp=#mpb_ll_kickprojectionreactionreq{}}. %%qqq +to_pb_response(_ReqID, _, async_no_response=X) -> + X; to_pb_response(ReqID, _, {low_error, ErrCode, ErrMsg}) -> make_ll_error_resp(ReqID, ErrCode, ErrMsg); to_pb_response(ReqID, {low_echo, _BogusEpochID, _Msg}, Resp) -> @@ -567,6 +577,7 @@ to_pb_response(ReqID, {low_proj, {list_all_projections, _ProjType}}, Resp)-> proj_la=#mpb_ll_listallprojectionsresp{ status=Status}} end; +%% No response for {kick_projection_reaction}! %%qqq to_pb_response(ReqID, _, {high_error, ErrCode, ErrMsg}) -> make_error_resp(ReqID, ErrCode, ErrMsg); diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 3d05f05..6657623 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -68,6 +68,7 @@ write_projection/3, write_projection/4, get_all_projections/2, get_all_projections/3, list_all_projections/2, list_all_projections/3, + kick_projection_reaction/2, kick_projection_reaction/3, %% Common API quit/1, @@ -243,6 +244,19 @@ list_all_projections(PidSpec, ProjType, Timeout) -> gen_server:call(PidSpec, {req, {list_all_projections, ProjType}}, Timeout). +%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +kick_projection_reaction(PidSpec, Options) -> + kick_projection_reaction(PidSpec, Options, infinity). + +%% @doc Kick (politely) the remote chain manager to react to a +%% projection change. + +kick_projection_reaction(PidSpec, Options, Timeout) -> + gen_server:call(PidSpec, {req, {kick_projection_reaction, Options}}, + Timeout). + %% @doc Quit & close the connection to remote FLU and stop our %% proxy process. @@ -373,7 +387,10 @@ make_req_fun({get_all_projections, ProjType}, fun() -> Mod:get_all_projections(Sock, ProjType) end; make_req_fun({list_all_projections, ProjType}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> - fun() -> Mod:list_all_projections(Sock, ProjType) end. + fun() -> Mod:list_all_projections(Sock, ProjType) end; +make_req_fun({kick_projection_reaction, Options}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:kick_projection_reaction(Sock, Options) end. connected_p(#state{sock=SockMaybe, i=#p_srvr{proto_mod=Mod}=_I}=_S) -> diff --git a/src/machi_util.erl b/src/machi_util.erl index 6ebe1bc..9557d53 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -132,7 +132,7 @@ increment_max_filenum(DataDir, Prefix) -> try {ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]), ok = file:write(FH, "x"), - %% ok = file:sync(FH), + ok = file:sync(FH), ok = file:close(FH) catch error:{badmatch,_}=Error -> diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index a54c6cf..b91b333 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -2,7 +2,7 @@ %% %% Machi: a small village of replicated files %% -%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file @@ -207,7 +207,7 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) -> ?MGR:sleep_ranked_order( S_min, S_max_rand, M_name, All_list), - _ = ?MGR:test_react_to_env(MMM), + _ = ?MGR:trigger_react_to_env(MMM), %% Be more unfair by not %% sleeping here. % timer:sleep(S_max - Elapsed), diff --git a/test/machi_chain_manager1_pulse.erl b/test/machi_chain_manager1_pulse.erl index a985b85..9611dcc 100644 --- a/test/machi_chain_manager1_pulse.erl +++ b/test/machi_chain_manager1_pulse.erl @@ -82,7 +82,7 @@ command(S) -> { 1, {call, ?MODULE, change_partitions, [gen_old_threshold(), gen_no_partition_threshold()]}}, {50, {call, ?MODULE, do_ticks, - [choose(5, 200), S#state.pids, + [choose(5, 100), S#state.pids, gen_old_threshold(), gen_no_partition_threshold()]}} ]). @@ -149,11 +149,11 @@ setup(Num, Seed) -> %% do all the same server first, then round-robin evenly across %% servers. [begin - _QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P)) + _QQa = machi_chain_manager1:trigger_react_to_env(get_chmgr(P)) end || {P, _Dir} <- All_listE, _I <- lists:seq(1,20), _Repeat <- [1,2]], ?QC_FMT(",z~w", [?LINE]), [begin - _QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P)) + _QQa = machi_chain_manager1:trigger_react_to_env(get_chmgr(P)) end || _I <- lists:seq(1,20), {P, _Dir} <- All_listE, _Repeat <- [1,2]], ?QC_FMT(",z~w", [?LINE]), @@ -280,7 +280,7 @@ dump_state() -> prop_pulse() -> ?FORALL({Cmds0, Seed}, {non_empty(commands(?MODULE)), pulse:seed()}, - ?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 5, + ?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 10, begin ok = shutdown_hard(), %% PULSE can be really unfair, of course, including having exec_ticks @@ -402,7 +402,7 @@ exec_ticks(Num, All_listE) -> Max = 10, Elapsed = ?MGR:sleep_ranked_order(1, Max, M_name, all_list()), - Res = ?MGR:test_react_to_env(get_chmgr(P)), + Res = ?MGR:trigger_react_to_env(get_chmgr(P)), timer:sleep(erlang:max(0, Max - Elapsed)), Res=Res %% ?D({self(), Res}) end || _ <- lists:seq(1,Num)], @@ -421,10 +421,15 @@ private_projections_are_stable_check(ProxiesDict, All_listE) -> %% also check for flapping, and if yes, to see if all_hosed are %% all exactly equal. - _ = exec_ticks(40, All_listE), + %% gobble_calls() workaround: many small exec_ticks() calls + + %% sleep after each. + [begin + _ = exec_ticks(10, All_listE), + timer:sleep(10) + end|| _ <- lists:seq(1, 40)], Private1 = [?FLU_PC:get_latest_epochid(Proxy, private) || {_FLU, Proxy} <- orddict:to_list(ProxiesDict)], - _ = exec_ticks(5, All_listE), + _ = exec_ticks(3*20, All_listE), Private2 = [?FLU_PC:get_latest_epochid(Proxy, private) || {_FLU, Proxy} <- orddict:to_list(ProxiesDict)], diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index c4d52d9..8f58994 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -232,22 +232,23 @@ nonunanimous_setup_and_fix_test() -> %% we expect nothing to change when called again. {not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(Ma, true), - {now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Ma), - {no_change, _, EpochNum_a} = ?MGR:test_react_to_env(Ma), + {now_using, _, EpochNum_a} = ?MGR:trigger_react_to_env(Ma), + {no_change, _, EpochNum_a} = ?MGR:trigger_react_to_env(Ma), {unanimous,P2,_E2} = ?MGR:test_read_latest_public_projection(Ma, false), {ok, P2pa} = ?FLU_PC:read_latest_projection(Proxy_a, private), P2 = P2pa#projection_v1{dbg2=[]}, - %% FLUb should have nothing written to private because it hasn't - %% reacted yet. - {error, not_written} = ?FLU_PC:read_latest_projection(Proxy_b, private), + %% %% FLUb should have nothing written to private because it hasn't + %% %% reacted yet. + %% {error, not_written} = ?FLU_PC:read_latest_projection(Proxy_b, private), - %% Poke FLUb to react ... should be using the same private proj - %% as FLUa. - {now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Mb), + %% %% Poke FLUb to react ... should be using the same private proj + %% %% as FLUa. + %% {now_using, _, EpochNum_a} = ?MGR:trigger_react_to_env(Mb), {ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private), P2 = P2pb#projection_v1{dbg2=[]}, +timer:sleep(3000), ok after ok = ?MGR:stop(Ma), diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index a406bdd..7e5e593 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -51,9 +51,9 @@ smoke_test2() -> {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) end || {_,P} <- [hd(Ps)]], - [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], + [machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], machi_chain_manager1:set_chain_members(a_chmgr, orddict:from_list(Ps)), - [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], + [machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], ok after exit(SupPid, normal), @@ -99,17 +99,18 @@ partial_stop_restart2() -> {ok, {false, EpochID1}} = WedgeStatus(hd(Ps)), [{ok, {false, EpochID1}} = WedgeStatus(P) || P <- Ps], % *not* wedged [{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged + {ok, {_,_,File1}} = Append(hd(Ps), EpochID1), - {_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)), + {_,_,_} = machi_chain_manager1:trigger_react_to_env(hd(ChMgrs)), [begin - _QQa = machi_chain_manager1:test_react_to_env(ChMgr) + _QQa = machi_chain_manager1:trigger_react_to_env(ChMgr) end || _ <- lists:seq(1,125), ChMgr <- ChMgrs], %% All chain managers & projection stores should be using the %% same projection which is max projection in each store. - {no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env( + {no_change,_,Epoch_m} = machi_chain_manager1:trigger_react_to_env( hd(ChMgrs)), - [{no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env( + [{no_change,_,Epoch_m} = machi_chain_manager1:trigger_react_to_env( ChMgr )|| ChMgr <- ChMgrs], {ok, Proj_m} = machi_projection_store:read_latest_projection( hd(PStores), public), @@ -126,6 +127,13 @@ partial_stop_restart2() -> P <- Ps], {ok, {false, EpochID2}} = WedgeStatus(hd(Ps)), [{ok,_} = Append(P, EpochID2) || P <- Ps], % *not* wedged + %% The file we're assigned should be different with the epoch change. + {ok, {_,_,File2}} = Append(hd(Ps), EpochID2), + true = (File1 /= File2), + %% If we use the old epoch, then we're told that it's bad + {error, bad_epoch} = Append(hd(Ps), EpochID1), + %% If we use the current epoch again, then it's OK and given same File2 + {ok, {_,_,File2}} = Append(hd(Ps), EpochID2), %% Stop all but 'a'. [ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)], @@ -152,10 +160,13 @@ partial_stop_restart2() -> Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH), %% Iterate through humming consensus once - {now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env( + {now_using,_,Epoch_n} = machi_chain_manager1:trigger_react_to_env( hd(ChMgrs)), true = (Epoch_n > Epoch_m), {ok, {false, EpochID3}} = WedgeStatus(hd(Ps)), + %% The file we're assigned should be different with the epoch change. + {ok, {_,_,File3}} = Append(hd(Ps), EpochID3), + true = (File2 /= File3), %% Confirm that 'a' is *not* wedged {ok, _} = Append(hd(Ps), EpochID3), diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index e5cb7df..d70489f 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -47,7 +47,7 @@ smoke_test2() -> {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) end || P <- Ps], ok = machi_chain_manager1:set_chain_members(a_chmgr, D), - [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], + [machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], {ok, Clnt} = ?C:start_link(Ps), try diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 976b6ab..8550384 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -75,6 +75,11 @@ api_smoke_test() -> <<"foo-file">>, 99832, MyChunk_badcs), + %% Put kick_projection_reaction() in the middle of the test so + %% that any problems with its async nature will (hopefully) + %% cause problems later in the test. + ok = ?MUT:kick_projection_reaction(Prox1, []), + %% Alright, now for the rest of the API, whee BadFile = <<"no-such-file">>, {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),