From a8785e44b1ff25b067ecfe6234a47cb05c16f108 Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Mon, 7 Dec 2015 13:19:30 +0900 Subject: [PATCH 1/9] Set longer timeout for hyooge binary write test case --- test/machi_file_proxy_test.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl index 1bf6cde..a04d880 100644 --- a/test/machi_file_proxy_test.erl +++ b/test/machi_file_proxy_test.erl @@ -98,7 +98,8 @@ machi_file_proxy_test_() -> ?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), - ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), + {timeout, 10, + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE)))}, ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), From 9579b1b8b259806876eb57aa4fb4a4b7e6ce1a07 Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Wed, 18 Nov 2015 15:16:04 +0900 Subject: [PATCH 2/9] Introduce ranch and add transport callback --- rebar.config | 1 + src/machi_flu1.erl | 465 +-------------------------- src/machi_flu_psup.erl | 13 +- src/machi_pb_protocol.erl | 645 ++++++++++++++++++++++++++++++++++++++ src/machi_sup.erl | 16 +- 5 files changed, 666 insertions(+), 474 deletions(-) create mode 100644 src/machi_pb_protocol.erl diff --git a/rebar.config b/rebar.config index 7bd81ce..be9b0d3 100644 --- a/rebar.config +++ b/rebar.config @@ -11,6 +11,7 @@ {lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}}, {protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}}, {riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}}, + {ranch, ".*", {git, "git://github.com/ninenines/ranch.git", {branch, "master"}}}, {node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}}, {eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}}, {cluster_info, ".*", {git, "git://github.com/basho/cluster_info", {branch, "develop"}}} diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index bfe599b..c02772d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -60,7 +60,8 @@ update_wedge_state/3, wedge_myself/2]). -export([make_listener_regname/1, make_projection_server_regname/1]). %% TODO: remove or replace in OTP way after gen_*'ified --export([main2/4, run_append_server/2, run_listen_server/1, +-export([main2/4, run_append_server/2, + %% run_listen_server/1, current_state/1, format_state/1]). -record(state, { @@ -188,27 +189,13 @@ main2(FluName, TcpPort, DataDir, Props) -> (catch exit(ListenPid, kill)), ok. -start_listen_server(S) -> - proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT). - start_append_server(S, AckPid) -> proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT). -run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> - register(make_listener_regname(FluName), self()), - SockOpts = ?PB_PACKET_OPTS ++ - [{reuseaddr, true}, {mode, binary}, {active, false}, - {backlog,8192}], - case gen_tcp:listen(TcpPort, SockOpts) of - {ok, LSock} -> - proc_lib:init_ack({ok, self()}), - listen_server_loop(LSock, S); - Else -> - error_logger:warning_msg("~s:run_listen_server: " - "listen to TCP port ~w: ~w\n", - [?MODULE, TcpPort, Else]), - exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) - end. +start_listen_server(_S) -> + %% FIXMEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE + %% proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT). + {ok, dummy}. run_append_server(FluPid, #state{flu_name=Name, wedged=Wedged_p,epoch_id=EpochId}=S) -> @@ -220,11 +207,6 @@ run_append_server(FluPid, #state{flu_name=Name, proc_lib:init_ack({ok, self()}), append_server_loop(FluPid, S#state{etstab=TID}). -listen_server_loop(LSock, S) -> - {ok, Sock} = gen_tcp:accept(LSock), - spawn_link(fun() -> net_server_loop(Sock, S) end), - listen_server_loop(LSock, S). - append_server_loop(FluPid, #state{wedged=Wedged_p, witness=Witness_p, epoch_id=OldEpochId, flu_name=FluName}=S) -> @@ -292,398 +274,6 @@ append_server_loop(FluPid, #state{wedged=Wedged_p, append_server_loop(FluPid, S) end. -net_server_loop(Sock, S) -> - case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of - {ok, Bin} -> - {RespBin, S2} = - case machi_pb:decode_mpb_ll_request(Bin) of - LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> - {R, NewS} = do_pb_ll_request(LL_req, S), - {maybe_encode_response(R), mode(low, NewS)}; - _ -> - HL_req = machi_pb:decode_mpb_request(Bin), - 1 = HL_req#mpb_request.do_not_alter, - {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), - {machi_pb:encode_mpb_response(R), mode(high, NewS)} - end, - if RespBin == async_no_response -> - net_server_loop(Sock, S2); - true -> - case gen_tcp:send(Sock, RespBin) of - ok -> - net_server_loop(Sock, S2); - {error, _} -> - (catch gen_tcp:close(Sock)), - exit(normal) - end - end; - {error, SockError} -> - Msg = io_lib:format("Socket error ~w", [SockError]), - R = #mpb_ll_response{req_id= <<>>, - generic=#mpb_errorresp{code=1, msg=Msg}}, - _Resp = machi_pb:encode_mpb_ll_response(R), - %% TODO: Weird that sometimes neither catch nor try/catch - %% can prevent OTP's SASL from logging an error here. - %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... - %% TODO: is this what causes the intermittent PULSE deadlock errors? - %% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), - (catch gen_tcp:close(Sock)), - exit(normal) - end. - -maybe_encode_response(async_no_response=X) -> - X; -maybe_encode_response(R) -> - machi_pb:encode_mpb_ll_response(R). - -mode(Mode, #state{pb_mode=undefined}=S) -> - S#state{pb_mode=Mode}; -mode(_, S) -> - S. - -make_high_clnt(#state{high_clnt=undefined}=S) -> - {ok, Proj} = machi_projection_store:read_latest_projection( - S#state.proj_store, private), - Ps = [P_srvr || {_, P_srvr} <- orddict:to_list( - Proj#projection_v1.members_dict)], - {ok, Clnt} = machi_cr_client:start_link(Ps), - S#state{high_clnt=Clnt}; -make_high_clnt(S) -> - S. - -do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> - Result = {high_error, 41, "Low protocol request while in high mode"}, - {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; -do_pb_ll_request(PB_request, S) -> - Req = machi_pb_translate:from_pb_request(PB_request), - {ReqID, Cmd, Result, S2} = - case Req of - {RqID, {LowCmd, _}=CMD} - when LowCmd == low_proj; - LowCmd == low_wedge_status; LowCmd == low_list_files -> - %% Skip wedge check for projection commands! - %% Skip wedge check for these unprivileged commands - {Rs, NewS} = do_pb_ll_request3(CMD, S), - {RqID, CMD, Rs, NewS}; - {RqID, CMD} -> - EpochID = element(2, CMD), % by common convention - {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), - {RqID, CMD, Rs, NewS} - end, - {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. - -do_pb_ll_request2(EpochID, CMD, S) -> - {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), - if Wedged_p == true -> - {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; - is_tuple(EpochID) - andalso - EpochID /= CurrentEpochID -> - {Epoch, _} = EpochID, - {CurrentEpoch, _} = CurrentEpochID, - if Epoch < CurrentEpoch -> - ok; - true -> - %% We're at same epoch # but different checksum, or - %% we're at a newer/bigger epoch #. - _ = wedge_myself(S#state.flu_name, CurrentEpochID), - ok - end, - {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; - true -> - do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) - end. - -%% Witness status does not matter below. -do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> - {Msg, S}; -do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) -> - {-6, S}; -do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> - {do_server_wedge_status(S), S}; -do_pb_ll_request3({low_proj, PCMD}, S) -> - {do_server_proj_request(PCMD, S), S}; -%% Witness status *matters* below -do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, - CSum, ChunkExtra}, - #state{witness=false}=S) -> - {do_server_append_chunk(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S), S}; -do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, - CSum}, - #state{witness=false}=S) -> - {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; -do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, - #state{witness=false} = S) -> - {do_server_read_chunk(File, Offset, Size, Opts, S), S}; -do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC}, - #state{witness=false}=S) -> - {do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S}; -do_pb_ll_request3({low_checksum_list, _EpochID, File}, - #state{witness=false}=S) -> - {do_server_checksum_listing(File, S), S}; -do_pb_ll_request3({low_list_files, _EpochID}, - #state{witness=false}=S) -> - {do_server_list_files(S), S}; -do_pb_ll_request3({low_delete_migration, _EpochID, File}, - #state{witness=false}=S) -> - {do_server_delete_migration(File, S), - #state{witness=false}=S}; -do_pb_ll_request3({low_trunc_hack, _EpochID, File}, - #state{witness=false}=S) -> - {do_server_trunc_hack(File, S), S}; -do_pb_ll_request3(_, #state{witness=true}=S) -> - {{error, bad_arg}, S}. % TODO: new status code?? - -do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> - Result = {low_error, 41, "High protocol request while in low mode"}, - {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; -do_pb_hl_request(PB_request, S) -> - {ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request), - {Result, S2} = do_pb_hl_request2(Cmd, S), - {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. - -do_pb_hl_request2({high_echo, Msg}, S) -> - {Msg, S}; -do_pb_hl_request2({high_auth, _User, _Pass}, S) -> - {-77, S}; -do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator, - Prefix, ChunkBin, TaggedCSum, - ChunkExtra}, #state{high_clnt=Clnt}=S) -> - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator, - Prefix, Chunk, - ChunkExtra), - {Res, S}; -do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, - #state{high_clnt=Clnt}=S) -> - Chunk = {TaggedCSum, ChunkBin}, - Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), - {Res, S}; -do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts}, - #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts), - {Res, S}; -do_pb_hl_request2({high_trim_chunk, File, Offset, Size}, - #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size), - {Res, S}; -do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:checksum_list(Clnt, File), - {Res, S}; -do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) -> - Res = machi_cr_client:list_files(Clnt), - {Res, S}. - -do_server_proj_request({get_latest_epochid, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_latest_epochid(ProjStore, ProjType); -do_server_proj_request({read_latest_projection, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read_latest_projection(ProjStore, ProjType); -do_server_proj_request({read_projection, ProjType, Epoch}, - #state{proj_store=ProjStore}) -> - machi_projection_store:read(ProjStore, ProjType, Epoch); -do_server_proj_request({write_projection, ProjType, Proj}, - #state{flu_name=FluName, proj_store=ProjStore}) -> - if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH -> - %% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]), - Chmgr = machi_flu_psup:make_fitness_regname(FluName), - [Map] = Proj#projection_v1.dbg, - catch machi_fitness:send_fitness_update_spam( - Chmgr, Proj#projection_v1.author_server, Map); - true -> - catch machi_projection_store:write(ProjStore, ProjType, Proj) - end; -do_server_proj_request({get_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:get_all_projections(ProjStore, ProjType); -do_server_proj_request({list_all_projections, ProjType}, - #state{proj_store=ProjStore}) -> - machi_projection_store:list_all_projections(ProjStore, ProjType); -do_server_proj_request({kick_projection_reaction}, - #state{flu_name=FluName}) -> - %% Tell my chain manager that it might want to react to - %% this new world. - Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), - spawn(fun() -> - catch machi_chain_manager1:trigger_react_to_env(Chmgr) - end), - async_no_response. - -do_server_append_chunk(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S) -> - case sanitize_prefix(Prefix) of - ok -> - do_server_append_chunk2(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, CSum, - ChunkExtra, S); - _ -> - {error, bad_arg} - end. - -do_server_append_chunk2(CoC_Namespace, CoC_Locator, - Prefix, Chunk, CSum_tag, Client_CSum, - ChunkExtra, #state{flu_name=FluName, - epoch_id=EpochID}=_S) -> - %% TODO: Do anything with PKey? - try - TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), - R = {seq_append, self(), CoC_Namespace, CoC_Locator, - Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID}, - FluName ! R, - receive - {assignment, Offset, File} -> - Size = iolist_size(Chunk), - {ok, {Offset, Size, File}}; - witness -> - {error, bad_arg}; - wedged -> - {error, wedged} - after 10*1000 -> - {error, partition} - end - catch - throw:{bad_csum, _CS} -> - {error, bad_checksum}; - error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), - {error, bad_arg} - end. - -do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) -> - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}], - machi_file_proxy:write(Pid, Offset, Meta, Chunk); - {error, trimmed} = Error -> - Error - end; - _ -> - {error, bad_arg} - end. - -do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})-> - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - case machi_file_proxy:read(Pid, Offset, Size, Opts) of - %% XXX FIXME - %% For now we are omiting the checksum data because it blows up - %% protobufs. - {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed}; - Other -> Other - end; - {error, trimmed} = Error -> - Error - end; - _ -> - {error, bad_arg} - end. - -do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) -> - lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n", - [File, Offset, Size, TriggerGC]), - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - machi_file_proxy:trim(Pid, Offset, Size, TriggerGC); - {error, trimmed} = Trimmed -> - %% Should be returned back to (maybe) trigger repair - Trimmed - end; - _ -> - {error, bad_arg} - end. - -do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of - {ok, Pid} -> - {ok, List} = machi_file_proxy:checksum_list(Pid), - Bin = erlang:term_to_binary(List), - if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) -> - %% TODO: Fix this limitation by streaming the - %% binary in multiple smaller PB messages. - %% Also, don't read the file all at once. ^_^ - error_logger:error_msg("~s:~w oversize ~s\n", - [?MODULE, ?LINE, DataDir]), - {error, bad_arg}; - true -> - {ok, Bin} - end; - {error, trimmed} -> - {error, trimmed} - end; - _ -> - {error, bad_arg} - end. - -do_server_list_files(#state{data_dir=DataDir}=_S) -> - {_, WildPath} = machi_util:make_data_filename(DataDir, ""), - Files = filelib:wildcard("*", WildPath), - {ok, [begin - {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), - Size = FI#file_info.size, - {Size, File} - end || File <- Files]}. - -do_server_wedge_status(S) -> - {Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2), - CurrentEpochID = if CurrentEpochID0 == undefined -> - ?DUMMY_PV1_EPOCH; - true -> - CurrentEpochID0 - end, - {Wedged_p, CurrentEpochID}. - -do_server_delete_migration(File, #state{data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:delete(Path) of - ok -> - ok; - {error, enoent} -> - {error, no_such_file}; - _ -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - -do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> - case sanitize_file_string(File) of - ok -> - {_, Path} = machi_util:make_data_filename(DataDir, File), - case file:open(Path, [read, write, binary, raw]) of - {ok, FH} -> - try - {ok, ?MINIMUM_OFFSET} = file:position(FH, - ?MINIMUM_OFFSET), - ok = file:truncate(FH), - ok - after - file:close(FH) - end; - {error, enoent} -> - {error, no_such_file}; - _ -> - {error, bad_arg} - end; - _ -> - {error, bad_arg} - end. - append_server_dispatch(From, CoC_Namespace, CoC_Locator, Prefix, Chunk, CSum, Extra, FluName, EpochId) -> Result = case handle_append(CoC_Namespace, CoC_Locator, @@ -717,29 +307,6 @@ handle_append(CoC_Namespace, CoC_Locator, Error end. -sanitize_file_string(Str) -> - case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of - true -> ok; - false -> error - end. - -has_no_prohibited_chars(Str) -> - case re:run(Str, "/") of - nomatch -> - true; - _ -> - true - end. - -sanitize_prefix(Prefix) -> - %% We are using '^' as our component delimiter - case re:run(Prefix, "/|\\^") of - nomatch -> - ok; - _ -> - error - end. - make_listener_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_listener"). @@ -753,26 +320,6 @@ make_listener_regname(BaseName) -> make_projection_server_regname(BaseName) -> list_to_atom(atom_to_list(BaseName) ++ "_pstore"). -check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) -> - %% TODO: If the client was foolish enough to use - %% this type of non-checksum, then the client gets - %% what it deserves wrt data integrity, alas. In - %% the client-side Chain Replication method, each - %% server will calculated this independently, which - %% isn't exactly what ought to happen for best data - %% integrity checking. In server-side CR, the csum - %% should be calculated by the head and passed down - %% the chain together with the value. - CS = machi_util:checksum_chunk(Chunk), - machi_util:make_tagged_csum(server_sha, CS); -check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> - CS = machi_util:checksum_chunk(Chunk), - if CS == Client_CSum -> - machi_util:make_tagged_csum(server_sha, - Client_CSum); - true -> - throw({bad_csum, CS}) - end. -ifdef(TEST). diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index 9e568cd..b2ccdcc 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -143,16 +143,21 @@ init([FluName, TcpPort, DataDir, Props0]) -> FProxySupSpec = machi_file_proxy_sup:child_spec(FluName), + ListenerRegName = machi_flu1:make_listener_regname(FluName), + NbAcceptors = 100, + ListenerSpec = ranch:child_spec(ListenerRegName, NbAcceptors, + ranch_tcp, [{port, TcpPort}], + machi_pb_protocol, []), FluSpec = {FluName, {machi_flu1, start_link, - [ [{FluName, TcpPort, DataDir}|Props] ]}, + [ [{FluName, TcpPort+1, DataDir}|Props] ]}, permanent, ?SHUTDOWN, worker, []}, {ok, {SupFlags, [ - ProjSpec, FitnessSpec, MgrSpec, - FProxySupSpec, FNameMgrSpec, MetaSupSpec, - FluSpec]}}. + ProjSpec, FitnessSpec, MgrSpec, + FProxySupSpec, FNameMgrSpec, MetaSupSpec, + FluSpec, ListenerSpec]}}. make_flu_regname(FluName) when is_atom(FluName) -> FluName. diff --git a/src/machi_pb_protocol.erl b/src/machi_pb_protocol.erl new file mode 100644 index 0000000..28ce626 --- /dev/null +++ b/src/machi_pb_protocol.erl @@ -0,0 +1,645 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Ranch protocol callback module to handle PB protocol over +%% transport + +%% TODO +%% - Two modes, high and low should be separated at listener level? + +-module(machi_pb_protocol). + +-behaviour(gen_server). +-behaviour(ranch_protocol). +-export([start_link/4]). +-export([init/1]). +-export([handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("machi.hrl"). +-include("machi_pb.hrl"). +-include("machi_projection.hrl"). +-define(V(X,Y), ok). +%% -include("machi_verbose.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. % TEST + +-record(state, {ref, + socket, + transport, + opts, + pb_mode, + data_dir, + witness, + %% - Used in projection related requests in low mode + %% - Used in spawning CR client in high mode + proj_store, + %%%% Low mode only + %% Current best knowledge, used for wedge_self / bad_epoch check + epoch_id, + %% Used in dispatching append_chunk* reqs to the + %% append serializing process + flu_name, + %% Stored in ETS before factorization, can be stored in the recored? + wedged, + %% Used in server_wedge_status to lookup the table + etstab, + %%%% High mode only + high_clnt, + %%%% to be removed + eof + }). + +%% -record(state, { + %% used in append serializer to trigger chain mgr react_to_env +%% flu_name :: atom(), +%% proj_store :: pid(), +%% witness = false :: boolean(), +%% append_pid :: pid(), +%% tcp_port :: non_neg_integer(), +%% data_dir :: string(), +%% wedged = true :: boolean(), +%% etstab :: ets:tid(), +%% epoch_id :: 'undefined' | machi_dt:epoch_id(), +%% pb_mode = undefined :: 'undefined' | 'high' | 'low', +%% high_clnt :: 'undefined' | pid(), +%% trim_table :: ets:tid(), +%% props = [] :: list() % proplist +%% }). + +-spec start_link(ranch:ref(), any(), module(), any()) -> {ok, pid()}. +start_link(Ref, Socket, Transport, Opts) -> + proc_lib:start_link(?MODULE, init, [#state{ref=Ref, socket=Socket, + transport=Transport, + opts=Opts}]). + +init(#state{ref=Ref, socket=Socket, transport=Transport, opts=_Opts}=State) -> + ok = proc_lib:init_ack({ok, self()}), + %% TODO: Perform any required state initialization here. + ok = ranch:accept_ack(Ref), + ok = Transport:setopts(Socket, [{active, once}]), + gen_server:enter_loop(?MODULE, [], State). + +handle_call(Request, _From, S) -> + Reply = {error, {unknown_message, Request}}, + {reply, Reply, S}. + +handle_cast(_Msg, S) -> + io:format(user, "~s:handle_cast: ~p\n", [?MODULE, _Msg]), + {noreply, S}. + +handle_info({tcp, Sock, Data}=_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + transport_received(Sock, Data, S); +handle_info({tcp_closed, Sock}=_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + transport_closed(Sock, S); +handle_info({tcp_error, Sock, Reason}=_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + transport_error(Sock, Reason, S); +handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + {noreply, S}. + +terminate(_Reason, _S) -> + io:format(user, "~s:terminate: ~p\n", [?MODULE, _Reason]), + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%% Internal functions, or copy-n-paste'd thingie + +%%%% Just copied and will be removed %%%% + +%% TODO: sock opts should be migrated to ranch equivalent +%% run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> +%% register(make_listener_regname(FluName), self()), +%% SockOpts = ?PB_PACKET_OPTS ++ +%% [{reuseaddr, true}, {mode, binary}, {active, false}, +%% {backlog,8192}], +%% case gen_tcp:listen(TcpPort, SockOpts) of +%% {ok, LSock} -> +%% proc_lib:init_ack({ok, self()}), +%% listen_server_loop(LSock, S); +%% Else -> +%% error_logger:warning_msg("~s:run_listen_server: " +%% "listen to TCP port ~w: ~w\n", +%% [?MODULE, TcpPort, Else]), +%% exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) +%% end. + +%% listen_server_loop(LSock, S) -> +%% {ok, Sock} = gen_tcp:accept(LSock), +%% spawn_link(fun() -> net_server_loop(Sock, S) end), +%% listen_server_loop(LSock, S). + +%% net_server_loop(Sock, S) -> +%% case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of +%% {ok, Bin} -> +%% {RespBin, S2} = +%% case machi_pb:decode_mpb_ll_request(Bin) of +%% LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> +%% {R, NewS} = do_pb_ll_request(LL_req, S), +%% {maybe_encode_response(R), mode(low, NewS)}; +%% _ -> +%% HL_req = machi_pb:decode_mpb_request(Bin), +%% 1 = HL_req#mpb_request.do_not_alter, +%% {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), +%% {machi_pb:encode_mpb_response(R), mode(high, NewS)} +%% end, +%% if RespBin == async_no_response -> +%% net_server_loop(Sock, S2); +%% true -> +%% case gen_tcp:send(Sock, RespBin) of +%% ok -> +%% net_server_loop(Sock, S2); +%% {error, _} -> +%% (catch gen_tcp:close(Sock)), +%% exit(normal) +%% end +%% end; +%% {error, SockError} -> +%% Msg = io_lib:format("Socket error ~w", [SockError]), +%% R = #mpb_ll_response{req_id= <<>>, +%% generic=#mpb_errorresp{code=1, msg=Msg}}, +%% _Resp = machi_pb:encode_mpb_ll_response(R), +%% %% TODO: Weird that sometimes neither catch nor try/catch +%% %% can prevent OTP's SASL from logging an error here. +%% %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... +%% %% TODO: is this what causes the intermittent PULSE deadlock errors? +%% %% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), +%% (catch gen_tcp:close(Sock)), +%% exit(normal) +%% end. + +%%%% Common transport handling + +transport_received(Sock, Bin, #state{transport=Transport}=S) -> + {RespBin, S2} = + case machi_pb:decode_mpb_ll_request(Bin) of + LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> + {R, NewS} = do_pb_ll_request(LL_req, S), + {maybe_encode_response(R), mode(low, NewS)}; + _ -> + HL_req = machi_pb:decode_mpb_request(Bin), + 1 = HL_req#mpb_request.do_not_alter, + {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), + {machi_pb:encode_mpb_response(R), mode(high, NewS)} + end, + if RespBin == async_no_response -> + {noreply, S2}; + true -> + case Transport:send(Sock, RespBin) of + ok -> + {noreply, S2}; + {error, Reason} -> + transport_error(Sock, Reason, S2) + end + end. + +transport_closed(Sock, S) -> + (catch gen_tcp:close(Sock)), + {stop, normal, S#state{sock=undefined}}. + +transport_error(Sock, Reason, S) -> + Msg = io_lib:format("Socket error ~w", [SockError]), + R = #mpb_ll_response{req_id= <<>>, + generic=#mpb_errorresp{code=1, msg=Msg}}, + _Resp = machi_pb:encode_mpb_ll_response(R), + %% TODO of TODO comments: comments below with four %s are copy-n-paste'd, + %% then it should be considered they are still open and should be addressed. + %%%% TODO: Weird that sometimes neither catch nor try/catch + %%%% can prevent OTP's SASL from logging an error here. + %%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... + %%%% TODO: is this what causes the intermittent PULSE deadlock errors? + %%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), + (catch gen_tcp:close(Sock)), + %% TODO: better to exit with `Reason'? + exit(normal). + +maybe_encode_response(async_no_response=X) -> + X; +maybe_encode_response(R) -> + machi_pb:encode_mpb_ll_response(R). + +%%%% Not categorized / not-yet-well-understood items +%% TODO: may be external API +mode(Mode, #state{pb_mode=undefined}=S) -> + S#state{pb_mode=Mode}; +mode(_, S) -> + S. + + +%%%% Low PB mode %%%% + +do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> + Result = {high_error, 41, "Low protocol request while in high mode"}, + {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; +do_pb_ll_request(PB_request, S) -> + Req = machi_pb_translate:from_pb_request(PB_request), + {ReqID, Cmd, Result, S2} = + case Req of + {RqID, {LowCmd, _}=CMD} + when LowCmd == low_proj; + LowCmd == low_wedge_status; LowCmd == low_list_files -> + %% Skip wedge check for projection commands! + %% Skip wedge check for these unprivileged commands + {Rs, NewS} = do_pb_ll_request3(CMD, S), + {RqID, CMD, Rs, NewS}; + {RqID, CMD} -> + EpochID = element(2, CMD), % by common convention + {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), + {RqID, CMD, Rs, NewS} + end, + {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. + +do_pb_ll_request2(EpochID, CMD, S) -> + {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), + if Wedged_p == true -> + {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; + is_tuple(EpochID) + andalso + EpochID /= CurrentEpochID -> + {Epoch, _} = EpochID, + {CurrentEpoch, _} = CurrentEpochID, + if Epoch < CurrentEpoch -> + ok; + true -> + %% We're at same epoch # but different checksum, or + %% we're at a newer/bigger epoch #. + _ = wedge_myself(S#state.flu_name, CurrentEpochID), + ok + end, + {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; + true -> + do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) + end. + +%% Witness status does not matter below. +do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> + {Msg, S}; +do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) -> + {-6, S}; +do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> + {do_server_wedge_status(S), S}; +do_pb_ll_request3({low_proj, PCMD}, S) -> + {do_server_proj_request(PCMD, S), S}; +%% Witness status *matters* below +do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, + CSum, ChunkExtra}, + #state{witness=false}=S) -> + {do_server_append_chunk(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S), S}; +do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag, + CSum}, + #state{witness=false}=S) -> + {do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S}; +do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, + #state{witness=false} = S) -> + {do_server_read_chunk(File, Offset, Size, Opts, S), S}; +do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC}, + #state{witness=false}=S) -> + {do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S}; +do_pb_ll_request3({low_checksum_list, _EpochID, File}, + #state{witness=false}=S) -> + {do_server_checksum_listing(File, S), S}; +do_pb_ll_request3({low_list_files, _EpochID}, + #state{witness=false}=S) -> + {do_server_list_files(S), S}; +do_pb_ll_request3({low_delete_migration, _EpochID, File}, + #state{witness=false}=S) -> + {do_server_delete_migration(File, S), + #state{witness=false}=S}; +do_pb_ll_request3({low_trunc_hack, _EpochID, File}, + #state{witness=false}=S) -> + {do_server_trunc_hack(File, S), S}; +do_pb_ll_request3(_, #state{witness=true}=S) -> + {{error, bad_arg}, S}. % TODO: new status code?? + +do_server_proj_request({get_latest_epochid, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:get_latest_epochid(ProjStore, ProjType); +do_server_proj_request({read_latest_projection, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:read_latest_projection(ProjStore, ProjType); +do_server_proj_request({read_projection, ProjType, Epoch}, + #state{proj_store=ProjStore}) -> + machi_projection_store:read(ProjStore, ProjType, Epoch); +do_server_proj_request({write_projection, ProjType, Proj}, + #state{flu_name=FluName, proj_store=ProjStore}) -> + if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH -> + %% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]), + Chmgr = machi_flu_psup:make_fitness_regname(FluName), + [Map] = Proj#projection_v1.dbg, + catch machi_fitness:send_fitness_update_spam( + Chmgr, Proj#projection_v1.author_server, Map); + true -> + catch machi_projection_store:write(ProjStore, ProjType, Proj) + end; +do_server_proj_request({get_all_projections, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:get_all_projections(ProjStore, ProjType); +do_server_proj_request({list_all_projections, ProjType}, + #state{proj_store=ProjStore}) -> + machi_projection_store:list_all_projections(ProjStore, ProjType); +do_server_proj_request({kick_projection_reaction}, + #state{flu_name=FluName}) -> + %% Tell my chain manager that it might want to react to + %% this new world. + Chmgr = machi_chain_manager1:make_chmgr_regname(FluName), + spawn(fun() -> + catch machi_chain_manager1:trigger_react_to_env(Chmgr) + end), + async_no_response. + +do_server_append_chunk(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S) -> + case sanitize_prefix(Prefix) of + ok -> + do_server_append_chunk2(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, CSum, + ChunkExtra, S); + _ -> + {error, bad_arg} + end. + +do_server_append_chunk2(CoC_Namespace, CoC_Locator, + Prefix, Chunk, CSum_tag, Client_CSum, + ChunkExtra, #state{flu_name=FluName, + epoch_id=EpochID}=_S) -> + %% TODO: Do anything with PKey? + try + TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk), + R = {seq_append, self(), CoC_Namespace, CoC_Locator, + Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID}, + FluName ! R, + receive + {assignment, Offset, File} -> + Size = iolist_size(Chunk), + {ok, {Offset, Size, File}}; + witness -> + {error, bad_arg}; + wedged -> + {error, wedged} + after 10*1000 -> + {error, partition} + end + catch + throw:{bad_csum, _CS} -> + {error, bad_checksum}; + error:badarg -> + error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), + {error, bad_arg} + end. + +do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) -> + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}], + machi_file_proxy:write(Pid, Offset, Meta, Chunk); + {error, trimmed} = Error -> + Error + end; + _ -> + {error, bad_arg} + end. + +do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})-> + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + case machi_file_proxy:read(Pid, Offset, Size, Opts) of + %% XXX FIXME + %% For now we are omiting the checksum data because it blows up + %% protobufs. + {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed}; + Other -> Other + end; + {error, trimmed} = Error -> + Error + end; + _ -> + {error, bad_arg} + end. + +do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) -> + lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n", + [File, Offset, Size, TriggerGC]), + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + machi_file_proxy:trim(Pid, Offset, Size, TriggerGC); + {error, trimmed} = Trimmed -> + %% Should be returned back to (maybe) trigger repair + Trimmed + end; + _ -> + {error, bad_arg} + end. + +do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) -> + case sanitize_file_string(File) of + ok -> + case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of + {ok, Pid} -> + {ok, List} = machi_file_proxy:checksum_list(Pid), + Bin = erlang:term_to_binary(List), + if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) -> + %% TODO: Fix this limitation by streaming the + %% binary in multiple smaller PB messages. + %% Also, don't read the file all at once. ^_^ + error_logger:error_msg("~s:~w oversize ~s\n", + [?MODULE, ?LINE, DataDir]), + {error, bad_arg}; + true -> + {ok, Bin} + end; + {error, trimmed} -> + {error, trimmed} + end; + _ -> + {error, bad_arg} + end. + +do_server_list_files(#state{data_dir=DataDir}=_S) -> + {_, WildPath} = machi_util:make_data_filename(DataDir, ""), + Files = filelib:wildcard("*", WildPath), + {ok, [begin + {ok, FI} = file:read_file_info(WildPath ++ "/" ++ File), + Size = FI#file_info.size, + {Size, File} + end || File <- Files]}. + +do_server_wedge_status(S) -> + {Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2), + CurrentEpochID = if CurrentEpochID0 == undefined -> + ?DUMMY_PV1_EPOCH; + true -> + CurrentEpochID0 + end, + {Wedged_p, CurrentEpochID}. + +do_server_delete_migration(File, #state{data_dir=DataDir}=_S) -> + case sanitize_file_string(File) of + ok -> + {_, Path} = machi_util:make_data_filename(DataDir, File), + case file:delete(Path) of + ok -> + ok; + {error, enoent} -> + {error, no_such_file}; + _ -> + {error, bad_arg} + end; + _ -> + {error, bad_arg} + end. + +do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) -> + case sanitize_file_string(File) of + ok -> + {_, Path} = machi_util:make_data_filename(DataDir, File), + case file:open(Path, [read, write, binary, raw]) of + {ok, FH} -> + try + {ok, ?MINIMUM_OFFSET} = file:position(FH, + ?MINIMUM_OFFSET), + ok = file:truncate(FH), + ok + after + file:close(FH) + end; + {error, enoent} -> + {error, no_such_file}; + _ -> + {error, bad_arg} + end; + _ -> + {error, bad_arg} + end. + +sanitize_file_string(Str) -> + case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of + true -> ok; + false -> error + end. + +has_no_prohibited_chars(Str) -> + case re:run(Str, "/") of + nomatch -> + true; + _ -> + true + end. + +sanitize_prefix(Prefix) -> + %% We are using '^' as our component delimiter + case re:run(Prefix, "/|\\^") of + nomatch -> + ok; + _ -> + error + end. + +check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) -> + %% TODO: If the client was foolish enough to use + %% this type of non-checksum, then the client gets + %% what it deserves wrt data integrity, alas. In + %% the client-side Chain Replication method, each + %% server will calculated this independently, which + %% isn't exactly what ought to happen for best data + %% integrity checking. In server-side CR, the csum + %% should be calculated by the head and passed down + %% the chain together with the value. + CS = machi_util:checksum_chunk(Chunk), + machi_util:make_tagged_csum(server_sha, CS); +check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> + CS = machi_util:checksum_chunk(Chunk), + if CS == Client_CSum -> + machi_util:make_tagged_csum(server_sha, + Client_CSum); + true -> + throw({bad_csum, CS}) + end. + + + + +%%%% High PB mode %%%% + +do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> + Result = {low_error, 41, "High protocol request while in low mode"}, + {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; +do_pb_hl_request(PB_request, S) -> + {ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request), + {Result, S2} = do_pb_hl_request2(Cmd, S), + {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. + +do_pb_hl_request2({high_echo, Msg}, S) -> + {Msg, S}; +do_pb_hl_request2({high_auth, _User, _Pass}, S) -> + {-77, S}; +do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator, + Prefix, ChunkBin, TaggedCSum, + ChunkExtra}, #state{high_clnt=Clnt}=S) -> + Chunk = {TaggedCSum, ChunkBin}, + Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator, + Prefix, Chunk, + ChunkExtra), + {Res, S}; +do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum}, + #state{high_clnt=Clnt}=S) -> + Chunk = {TaggedCSum, ChunkBin}, + Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), + {Res, S}; +do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts}, + #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts), + {Res, S}; +do_pb_hl_request2({high_trim_chunk, File, Offset, Size}, + #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size), + {Res, S}; +do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:checksum_list(Clnt, File), + {Res, S}; +do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) -> + Res = machi_cr_client:list_files(Clnt), + {Res, S}. + +make_high_clnt(#state{high_clnt=undefined}=S) -> + {ok, Proj} = machi_projection_store:read_latest_projection( + S#state.proj_store, private), + Ps = [P_srvr || {_, P_srvr} <- orddict:to_list( + Proj#projection_v1.members_dict)], + {ok, Clnt} = machi_cr_client:start_link(Ps), + S#state{high_clnt=Clnt}; +make_high_clnt(S) -> + S. diff --git a/src/machi_sup.erl b/src/machi_sup.erl index d4ab71a..729b6a7 100644 --- a/src/machi_sup.erl +++ b/src/machi_sup.erl @@ -47,8 +47,6 @@ start_link() -> supervisor:start_link({local, ?SERVER}, ?MODULE, []). init([]) -> - %% {_, Ps} = process_info(self(), links), - %% [unlink(P) || P <- Ps], RestartStrategy = one_for_one, MaxRestarts = 1000, MaxSecondsBetweenRestarts = 3600, @@ -59,12 +57,8 @@ init([]) -> Shutdown = ?SHUTDOWN, Type = supervisor, - ServerSup = - {machi_flu_sup, {machi_flu_sup, start_link, []}, - Restart, Shutdown, Type, []}, - - {ok, {SupFlags, [ServerSup]}}. - - %% AChild = {'AName', {'AModule', start_link, []}, - %% Restart, Shutdown, Type, ['AModule']}, - %% {ok, {SupFlags, [AChild]}}. + FluSup = {machi_flu_sup, {machi_flu_sup, start_link, []}, + Restart, Shutdown, Type, []}, + RanchSup = {ranch_sup, {ranch_sup, start_link, []}, + Restart, Shutdown, supervisor, [ranch_sup]}, + {ok, {SupFlags, [FluSup, RanchSup]}}. From 7614910f36ca2074ae523ff6d2abe04de44ec3f4 Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Wed, 18 Nov 2015 17:00:14 +0900 Subject: [PATCH 3/9] Initialize FLU package with ranch listener --- src/machi_flu1.erl | 29 ++--- src/machi_flu_psup.erl | 14 +- src/machi_listener_sup.erl | 78 ++++++++++++ src/machi_pb_protocol.erl | 253 +++++++++++++++---------------------- 4 files changed, 193 insertions(+), 181 deletions(-) create mode 100644 src/machi_listener_sup.erl diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index c02772d..56c412c 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -58,10 +58,9 @@ -export([start_link/1, stop/1, update_wedge_state/3, wedge_myself/2]). --export([make_listener_regname/1, make_projection_server_regname/1]). +-export([make_projection_server_regname/1]). %% TODO: remove or replace in OTP way after gen_*'ified -export([main2/4, run_append_server/2, - %% run_listen_server/1, current_state/1, format_state/1]). -record(state, { @@ -69,14 +68,9 @@ proj_store :: pid(), witness = false :: boolean(), append_pid :: pid(), - tcp_port :: non_neg_integer(), - data_dir :: string(), wedged = true :: boolean(), etstab :: ets:tid(), epoch_id :: 'undefined' | machi_dt:epoch_id(), - pb_mode = undefined :: 'undefined' | 'high' | 'low', - high_clnt :: 'undefined' | pid(), - trim_table :: ets:tid(), props = [] :: list() % proplist }). @@ -153,8 +147,6 @@ main2(FluName, TcpPort, DataDir, Props) -> S0 = #state{flu_name=FluName, proj_store=ProjectionPid, - tcp_port=TcpPort, - data_dir=DataDir, wedged=Wedged_p, witness=Witness_p, etstab=ets_table_name(FluName), @@ -168,7 +160,8 @@ main2(FluName, TcpPort, DataDir, Props) -> ok end, S1 = S0#state{append_pid=AppendPid}, - {ok, ListenPid} = start_listen_server(S1), + {ok, ListenerPid} = start_listen_server(TcpPort, DataDir, S1), + io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]), Config_e = machi_util:make_config_filename(DataDir, "unused"), ok = filelib:ensure_dir(Config_e), @@ -180,22 +173,23 @@ main2(FluName, TcpPort, DataDir, Props) -> put(flu_flu_name, FluName), put(flu_append_pid, S1#state.append_pid), put(flu_projection_pid, ProjectionPid), - put(flu_listen_pid, ListenPid), + put(flu_listen_pid, ListenerPid), proc_lib:init_ack({ok, self()}), receive killme -> ok end, (catch exit(S1#state.append_pid, kill)), (catch exit(ProjectionPid, kill)), - (catch exit(ListenPid, kill)), + (catch exit(ListenerPid, kill)), ok. start_append_server(S, AckPid) -> proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT). -start_listen_server(_S) -> - %% FIXMEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE - %% proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT). - {ok, dummy}. +start_listen_server(TcpPort, DataDir, + #state{flu_name=FluName, witness=Witness, etstab=EtsTab, + proj_store=ProjStore}=_S) -> + machi_listener_sup:start_listener(FluName, TcpPort, Witness, DataDir, + EtsTab, ProjStore). run_append_server(FluPid, #state{flu_name=Name, wedged=Wedged_p,epoch_id=EpochId}=S) -> @@ -307,9 +301,6 @@ handle_append(CoC_Namespace, CoC_Locator, Error end. -make_listener_regname(BaseName) -> - list_to_atom(atom_to_list(BaseName) ++ "_listener"). - %% This is the name of the projection store that is spawned by the %% *flu*, for use primarily in testing scenarios. In normal use, we %% ought to be using the OTP style of managing processes, via diff --git a/src/machi_flu_psup.erl b/src/machi_flu_psup.erl index b2ccdcc..55584d5 100644 --- a/src/machi_flu_psup.erl +++ b/src/machi_flu_psup.erl @@ -143,21 +143,19 @@ init([FluName, TcpPort, DataDir, Props0]) -> FProxySupSpec = machi_file_proxy_sup:child_spec(FluName), - ListenerRegName = machi_flu1:make_listener_regname(FluName), - NbAcceptors = 100, - ListenerSpec = ranch:child_spec(ListenerRegName, NbAcceptors, - ranch_tcp, [{port, TcpPort}], - machi_pb_protocol, []), + ListenerSupSpec = {machi_listener_sup:make_listener_sup_name(FluName), + {machi_listener_sup, start_link, [FluName]}, + permanent, ?SHUTDOWN, supervisor, []}, + FluSpec = {FluName, {machi_flu1, start_link, - [ [{FluName, TcpPort+1, DataDir}|Props] ]}, + [ [{FluName, TcpPort, DataDir}|Props] ]}, permanent, ?SHUTDOWN, worker, []}, - {ok, {SupFlags, [ ProjSpec, FitnessSpec, MgrSpec, FProxySupSpec, FNameMgrSpec, MetaSupSpec, - FluSpec, ListenerSpec]}}. + ListenerSupSpec, FluSpec]}}. make_flu_regname(FluName) when is_atom(FluName) -> FluName. diff --git a/src/machi_listener_sup.erl b/src/machi_listener_sup.erl new file mode 100644 index 0000000..dc004ce --- /dev/null +++ b/src/machi_listener_sup.erl @@ -0,0 +1,78 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc This is the supervisor to hold ranch listener for sigle FLU, +%% holds at most one child worker. + +%% TODO: This supervisor is maybe useless. First introduced by workaround +%% to start listener dynamically in flu1 initialization time. +%% Because psup is blocked in flu1 initialization time, adding a child +%% to psup leads to deadlock. +%% By the result of refactoring process, if initialization can be done +%% only by static arguments, then this supervisor should be removed +%% and add listener as a direct child of psup. + +-module(machi_listener_sup). +-behaviour(supervisor). + +%% public API +-export([start_link/1, + start_listener/6, + stop_listener/1, + make_listener_sup_name/1, + make_listener_name/1]). + +%% supervisor callback +-export([init/1]). + +-define(BACKLOG, 8192). + +start_link(FluName) -> + supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []). + +start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> + supervisor:start_child(make_listener_sup_name(FluName), + child_spec(FluName, TcpPort, Witness, DataDir, + EpochTab, ProjStore)). + +stop_listener(FluName) -> + SupName = make_listener_sup_name(FluName), + ListenerName = make_listener_name(FluName), + ok = supervisor:terminate_child(SupName, ListenerName), + ok = supervisor:delete_child(SupName, ListenerName). + +make_listener_sup_name(FluName) when is_atom(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_listener_sup"). + +make_listener_name(FluName) -> + list_to_atom(atom_to_list(FluName) ++ "_listener"). + +init([]) -> + SupFlags = {one_for_one, 1000, 10}, + {ok, {SupFlags, []}}. + +child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> + ListenerName = make_listener_name(FluName), + NbAcceptors = 100, + TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}], + ProtoOpts = [FluName, Witness, DataDir, EpochTab, ProjStore], + ranch:child_spec(ListenerName, NbAcceptors, + ranch_tcp, TcpOpts, + machi_pb_protocol, ProtoOpts). diff --git a/src/machi_pb_protocol.erl b/src/machi_pb_protocol.erl index 28ce626..9368a2a 100644 --- a/src/machi_pb_protocol.erl +++ b/src/machi_pb_protocol.erl @@ -19,7 +19,7 @@ %% ------------------------------------------------------------------- %% @doc Ranch protocol callback module to handle PB protocol over -%% transport +%% transport, including both high and low modes. %% TODO %% - Two modes, high and low should be separated at listener level? @@ -28,172 +28,114 @@ -behaviour(gen_server). -behaviour(ranch_protocol). + -export([start_link/4]). -export([init/1]). -export([handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-include_lib("kernel/include/file.hrl"). + -include("machi.hrl"). -include("machi_pb.hrl"). -include("machi_projection.hrl"). --define(V(X,Y), ok). -%% -include("machi_verbose.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. % TEST --record(state, {ref, - socket, - transport, - opts, - pb_mode, - data_dir, - witness, - %% - Used in projection related requests in low mode - %% - Used in spawning CR client in high mode - proj_store, - %%%% Low mode only - %% Current best knowledge, used for wedge_self / bad_epoch check - epoch_id, - %% Used in dispatching append_chunk* reqs to the - %% append serializing process - flu_name, - %% Stored in ETS before factorization, can be stored in the recored? - wedged, - %% Used in server_wedge_status to lookup the table - etstab, - %%%% High mode only - high_clnt, - %%%% to be removed - eof - }). +-record(state, { + %% Transport related items passed from Ranch + ref :: ranch:ref(), + socket :: socket(), + transport :: module(), -%% -record(state, { - %% used in append serializer to trigger chain mgr react_to_env -%% flu_name :: atom(), -%% proj_store :: pid(), -%% witness = false :: boolean(), -%% append_pid :: pid(), -%% tcp_port :: non_neg_integer(), -%% data_dir :: string(), -%% wedged = true :: boolean(), -%% etstab :: ets:tid(), -%% epoch_id :: 'undefined' | machi_dt:epoch_id(), -%% pb_mode = undefined :: 'undefined' | 'high' | 'low', -%% high_clnt :: 'undefined' | pid(), -%% trim_table :: ets:tid(), -%% props = [] :: list() % proplist -%% }). + %% Machi application related items below + data_dir :: string(), + witness :: boolean(), + pb_mode :: undefined | high | low, + %% - Used in projection related requests in low mode + %% - Used in spawning CR client in high mode + proj_store :: pid(), --spec start_link(ranch:ref(), any(), module(), any()) -> {ok, pid()}. -start_link(Ref, Socket, Transport, Opts) -> - proc_lib:start_link(?MODULE, init, [#state{ref=Ref, socket=Socket, - transport=Transport, - opts=Opts}]). + %% Low mode only + %% Current best knowledge, used for wedge_self / bad_epoch check + epoch_id :: undefined | machi_dt:epoch_id(), + %% Used in dispatching append_chunk* reqs to the + %% append serializing process + flu_name :: atom(), + %% Used in server_wedge_status to lookup the table + epoch_tab :: ets:tid(), -init(#state{ref=Ref, socket=Socket, transport=Transport, opts=_Opts}=State) -> - ok = proc_lib:init_ack({ok, self()}), - %% TODO: Perform any required state initialization here. - ok = ranch:accept_ack(Ref), - ok = Transport:setopts(Socket, [{active, once}]), - gen_server:enter_loop(?MODULE, [], State). + %% High mode only + high_clnt :: pid(), + + %% anything you want + props = [] :: list() % proplist + }). + +-type socket() :: any(). +-type state() :: #state{}. + +-spec start_link(ranch:ref(), socket(), module(), [term()]) -> {ok, pid()}. +start_link(Ref, Socket, Transport, [FluName, Witness, DataDir, EpochTab, ProjStore]) -> + proc_lib:start_link(?MODULE, init, [#state{ref=Ref, + socket=Socket, + transport=Transport, + flu_name=FluName, + witness=Witness, + data_dir=DataDir, + epoch_tab=EpochTab, + proj_store=ProjStore}]). + +-spec init(state()) -> no_return(). +init(#state{ref=Ref, socket=Socket, transport=Transport}=State) -> + ok = proc_lib:init_ack({ok, self()}), + ok = ranch:accept_ack(Ref), + {_Wedged_p, CurrentEpochID} = lookup_epoch(State), + ok = Transport:setopts(Socket, [{active, once}|?PB_PACKET_OPTS]), + gen_server:enter_loop(?MODULE, [], State#state{epoch_id=CurrentEpochID}). handle_call(Request, _From, S) -> + lager:warning("~s:handle_call UNKNOWN message: ~w", [?MODULE, Request]), Reply = {error, {unknown_message, Request}}, {reply, Reply, S}. handle_cast(_Msg, S) -> - io:format(user, "~s:handle_cast: ~p\n", [?MODULE, _Msg]), + lager:warning("~s:handle_cast UNKNOWN message: ~w", [?MODULE, _Msg]), {noreply, S}. -handle_info({tcp, Sock, Data}=_Info, S) -> - io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), +%% TODO: Other transport support needed?? TLS/SSL, SCTP +handle_info({tcp, Sock, Data}=_Info, #state{socket=Sock}=S) -> + lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), transport_received(Sock, Data, S); -handle_info({tcp_closed, Sock}=_Info, S) -> - io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), +handle_info({tcp_closed, Sock}=_Info, #state{socket=Sock}=S) -> + lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), transport_closed(Sock, S); -handle_info({tcp_error, Sock, Reason}=_Info, S) -> - io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), +handle_info({tcp_error, Sock, Reason}=_Info, #state{socket=Sock}=S) -> + lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), transport_error(Sock, Reason, S); handle_info(_Info, S) -> - io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + lager:warning("~s:handle_info UNKNOWN message: ~w", [?MODULE, _Info]), {noreply, S}. -terminate(_Reason, _S) -> - io:format(user, "~s:terminate: ~p\n", [?MODULE, _Reason]), +terminate(_Reason, #state{socket=undefined}=_S) -> + lager:debug("~s:terminate: ~w", [?MODULE, _Reason]), + ok; +terminate(_Reason, #state{socket=Socket}=_S) -> + lager:debug("~s:terminate: ~w", [?MODULE, _Reason]), + (catch gen_tcp:close(Socket)), ok. code_change(_OldVsn, S, _Extra) -> {ok, S}. -%% Internal functions, or copy-n-paste'd thingie - -%%%% Just copied and will be removed %%%% - -%% TODO: sock opts should be migrated to ranch equivalent -%% run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> -%% register(make_listener_regname(FluName), self()), -%% SockOpts = ?PB_PACKET_OPTS ++ -%% [{reuseaddr, true}, {mode, binary}, {active, false}, -%% {backlog,8192}], -%% case gen_tcp:listen(TcpPort, SockOpts) of -%% {ok, LSock} -> -%% proc_lib:init_ack({ok, self()}), -%% listen_server_loop(LSock, S); -%% Else -> -%% error_logger:warning_msg("~s:run_listen_server: " -%% "listen to TCP port ~w: ~w\n", -%% [?MODULE, TcpPort, Else]), -%% exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) -%% end. - -%% listen_server_loop(LSock, S) -> -%% {ok, Sock} = gen_tcp:accept(LSock), -%% spawn_link(fun() -> net_server_loop(Sock, S) end), -%% listen_server_loop(LSock, S). - -%% net_server_loop(Sock, S) -> -%% case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of -%% {ok, Bin} -> -%% {RespBin, S2} = -%% case machi_pb:decode_mpb_ll_request(Bin) of -%% LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> -%% {R, NewS} = do_pb_ll_request(LL_req, S), -%% {maybe_encode_response(R), mode(low, NewS)}; -%% _ -> -%% HL_req = machi_pb:decode_mpb_request(Bin), -%% 1 = HL_req#mpb_request.do_not_alter, -%% {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), -%% {machi_pb:encode_mpb_response(R), mode(high, NewS)} -%% end, -%% if RespBin == async_no_response -> -%% net_server_loop(Sock, S2); -%% true -> -%% case gen_tcp:send(Sock, RespBin) of -%% ok -> -%% net_server_loop(Sock, S2); -%% {error, _} -> -%% (catch gen_tcp:close(Sock)), -%% exit(normal) -%% end -%% end; -%% {error, SockError} -> -%% Msg = io_lib:format("Socket error ~w", [SockError]), -%% R = #mpb_ll_response{req_id= <<>>, -%% generic=#mpb_errorresp{code=1, msg=Msg}}, -%% _Resp = machi_pb:encode_mpb_ll_response(R), -%% %% TODO: Weird that sometimes neither catch nor try/catch -%% %% can prevent OTP's SASL from logging an error here. -%% %% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... -%% %% TODO: is this what causes the intermittent PULSE deadlock errors? -%% %% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), -%% (catch gen_tcp:close(Sock)), -%% exit(normal) -%% end. +%% -- private %%%% Common transport handling +-spec transport_received(socket(), machi_dt:chunk(), state()) -> + {noreply, state()}. transport_received(Sock, Bin, #state{transport=Transport}=S) -> {RespBin, S2} = case machi_pb:decode_mpb_ll_request(Bin) of @@ -207,33 +149,36 @@ transport_received(Sock, Bin, #state{transport=Transport}=S) -> {machi_pb:encode_mpb_response(R), mode(high, NewS)} end, if RespBin == async_no_response -> + Transport:setopts(Sock, [{active, once}]), {noreply, S2}; true -> case Transport:send(Sock, RespBin) of ok -> + Transport:setopts(Sock, [{active, once}]), {noreply, S2}; {error, Reason} -> transport_error(Sock, Reason, S2) end end. -transport_closed(Sock, S) -> - (catch gen_tcp:close(Sock)), - {stop, normal, S#state{sock=undefined}}. +-spec transport_closed(socket(), state()) -> {stop, term(), state()}. +transport_closed(_Socket, S) -> + {stop, normal, S}. -transport_error(Sock, Reason, S) -> - Msg = io_lib:format("Socket error ~w", [SockError]), +-spec transport_error(socket(), term(), state()) -> no_return(). +transport_error(Sock, Reason, #state{transport=Transport}=_S) -> + Msg = io_lib:format("Socket error ~w", [Reason]), R = #mpb_ll_response{req_id= <<>>, generic=#mpb_errorresp{code=1, msg=Msg}}, _Resp = machi_pb:encode_mpb_ll_response(R), - %% TODO of TODO comments: comments below with four %s are copy-n-paste'd, + %% TODO for TODO comments: comments below with four %s are copy-n-paste'd, %% then it should be considered they are still open and should be addressed. %%%% TODO: Weird that sometimes neither catch nor try/catch %%%% can prevent OTP's SASL from logging an error here. %%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... %%%% TODO: is this what causes the intermittent PULSE deadlock errors? %%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), - (catch gen_tcp:close(Sock)), + (catch Transport:close(Sock)), %% TODO: better to exit with `Reason'? exit(normal). @@ -242,14 +187,11 @@ maybe_encode_response(async_no_response=X) -> maybe_encode_response(R) -> machi_pb:encode_mpb_ll_response(R). -%%%% Not categorized / not-yet-well-understood items -%% TODO: may be external API mode(Mode, #state{pb_mode=undefined}=S) -> S#state{pb_mode=Mode}; mode(_, S) -> S. - %%%% Low PB mode %%%% do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> @@ -257,24 +199,27 @@ do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) -> {machi_pb_translate:to_pb_response(ReqID, unused, Result), S}; do_pb_ll_request(PB_request, S) -> Req = machi_pb_translate:from_pb_request(PB_request), + %% io:format(user, "[~w] do_pb_ll_request Req: ~w~n", [S#state.flu_name, Req]), {ReqID, Cmd, Result, S2} = case Req of - {RqID, {LowCmd, _}=CMD} - when LowCmd == low_proj; - LowCmd == low_wedge_status; LowCmd == low_list_files -> + {RqID, {LowCmd, _}=Cmd0} + when LowCmd =:= low_proj; + LowCmd =:= low_wedge_status; + LowCmd =:= low_list_files -> %% Skip wedge check for projection commands! %% Skip wedge check for these unprivileged commands - {Rs, NewS} = do_pb_ll_request3(CMD, S), - {RqID, CMD, Rs, NewS}; - {RqID, CMD} -> - EpochID = element(2, CMD), % by common convention - {Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S), - {RqID, CMD, Rs, NewS} + {Rs, NewS} = do_pb_ll_request3(Cmd0, S), + {RqID, Cmd0, Rs, NewS}; + {RqID, Cmd0} -> + EpochID = element(2, Cmd0), % by common convention + {Rs, NewS} = do_pb_ll_request2(EpochID, Cmd0, S), + {RqID, Cmd0, Rs, NewS} end, {machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}. do_pb_ll_request2(EpochID, CMD, S) -> - {Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2), + {Wedged_p, CurrentEpochID} = lookup_epoch(S), + %% io:format(user, "{Wedged_p, CurrentEpochID}: ~w~n", [{Wedged_p, CurrentEpochID}]), if Wedged_p == true -> {{error, wedged}, S#state{epoch_id=CurrentEpochID}}; is_tuple(EpochID) @@ -287,7 +232,7 @@ do_pb_ll_request2(EpochID, CMD, S) -> true -> %% We're at same epoch # but different checksum, or %% we're at a newer/bigger epoch #. - _ = wedge_myself(S#state.flu_name, CurrentEpochID), + _ = machi_flu1:wedge_myself(S#state.flu_name, CurrentEpochID), ok end, {{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}}; @@ -295,6 +240,9 @@ do_pb_ll_request2(EpochID, CMD, S) -> do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID}) end. +lookup_epoch(#state{epoch_tab=T}) -> + ets:lookup_element(T, epoch, 2). + %% Witness status does not matter below. do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) -> {Msg, S}; @@ -498,7 +446,7 @@ do_server_list_files(#state{data_dir=DataDir}=_S) -> end || File <- Files]}. do_server_wedge_status(S) -> - {Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2), + {Wedged_p, CurrentEpochID0} = lookup_epoch(S), CurrentEpochID = if CurrentEpochID0 == undefined -> ?DUMMY_PV1_EPOCH; true -> @@ -589,9 +537,6 @@ check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) -> throw({bad_csum, CS}) end. - - - %%%% High PB mode %%%% do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) -> From a1f5a6ce6201f2760176addacb82fb83c44c3ce8 Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Thu, 19 Nov 2015 17:42:01 +0900 Subject: [PATCH 4/9] Fix unit test cases around flu1 startup --- src/machi_flu1.erl | 2 +- test/machi_admin_util_test.erl | 4 +- test/machi_chain_manager1_test.erl | 66 +++++++++---------------- test/machi_cinfo_test.erl | 2 +- test/machi_cr_client_test.erl | 4 +- test/machi_flu_psup_test.erl | 4 +- test/machi_pb_high_client_test.erl | 2 +- test/machi_proxy_flu1_client_test.erl | 8 ++- test/machi_psup_test_util.erl | 71 +++++++++++++++++++++++++++ 9 files changed, 109 insertions(+), 54 deletions(-) create mode 100644 test/machi_psup_test_util.erl diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 56c412c..41e5125 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -161,7 +161,7 @@ main2(FluName, TcpPort, DataDir, Props) -> end, S1 = S0#state{append_pid=AppendPid}, {ok, ListenerPid} = start_listen_server(TcpPort, DataDir, S1), - io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]), + %% io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]), Config_e = machi_util:make_config_filename(DataDir, "unused"), ok = filelib:ensure_dir(Config_e), diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index e00fa31..8a43c39 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -44,6 +44,7 @@ verify_file_checksums_test2() -> TcpPort = 32958, DataDir = "./data", W_props = [{initial_wedged, false}], + {ok, SupPid} = machi_sup:start_link(), machi_flu1_test:start_flu_package(verify1_flu, TcpPort, DataDir, W_props), Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), @@ -80,7 +81,8 @@ verify_file_checksums_test2() -> ok after catch ?FLU_C:quit(Sock1), - catch machi_flu1_test:stop_flu_package(verify1_flu) + catch machi_flu1_test:stop_flu_package(verify1_flu), + exit(SupPid, normal) end. -endif. % !PULSE diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index e13193f..3f95955 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -274,42 +274,30 @@ make_prop_ets() -> -endif. % EQC smoke0_test() -> - {ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50), - Host = "localhost", TcpPort = 6623, - {ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]), - Pa = #p_srvr{name=a, address=Host, port=TcpPort}, - Members_Dict = machi_projection:make_members_dict([Pa]), - %% Egadz, more racing on startup, yay. TODO fix. - timer:sleep(1), + {_, [Pa], [M0]} = machi_psup_test_util:start_flu_packages( + 1, "./data.", TcpPort, []), {ok, FLUaP} = ?FLU_PC:start_link(Pa), - {ok, M0} = ?MGR:start_link(a, Members_Dict, [{active_mode, false}]), try pong = ?MGR:ping(M0) after - ok = ?MGR:stop(M0), - ok = machi_flu1:stop(FLUa), ok = ?FLU_PC:quit(FLUaP), - ok = machi_partition_simulator:stop() + machi_psup_test_util:stop_flu_packages() end. smoke1_test_() -> {timeout, 1*60, fun() -> smoke1_test2() end}. smoke1_test2() -> - machi_partition_simulator:start_link({1,2,3}, 100, 0), TcpPort = 62777, - FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}, {c,TcpPort+2,"./data.c"}], - P_s = [#p_srvr{name=Name, address="localhost", port=Port} || - {Name,Port,_Dir} <- FluInfo], - - [machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo], - FLUs = [element(2, machi_flu1:start_link([{Name,Port,Dir}])) || - {Name,Port,Dir} <- FluInfo], - MembersDict = machi_projection:make_members_dict(P_s), - {ok, M0} = ?MGR:start_link(a, MembersDict, [{active_mode,false}]), + MgrOpts = [{active_mode,false}], + {_, Ps, MgrNames} = machi_psup_test_util:start_flu_packages( + 3, "./data.", TcpPort, MgrOpts), + MembersDict = machi_projection:make_members_dict(Ps), + [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- MgrNames], + Ma = hd(MgrNames), try - {ok, P1} = ?MGR:test_calc_projection(M0, false), + {ok, P1} = ?MGR:test_calc_projection(Ma, false), % DERP! Check for race with manager's proxy vs. proj listener ok = lists:foldl( fun(_, {_,{true,[{c,ok},{b,ok},{a,ok}]}}) -> @@ -318,37 +306,28 @@ smoke1_test2() -> ok; % Skip remaining! (_, _Else) -> timer:sleep(10), - ?MGR:test_write_public_projection(M0, P1) + ?MGR:test_write_public_projection(Ma, P1) end, not_ok, lists:seq(1, 1000)), %% Writing the exact same projection multiple times returns ok: %% no change! - {_,{true,[{c,ok},{b,ok},{a,ok}]}} = ?MGR:test_write_public_projection(M0, P1), - {unanimous, P1, Extra1} = ?MGR:test_read_latest_public_projection(M0, false), + {_,{true,[{c,ok},{b,ok},{a,ok}]}} = ?MGR:test_write_public_projection(Ma, P1), + {unanimous, P1, Extra1} = ?MGR:test_read_latest_public_projection(Ma, false), ok after - ok = ?MGR:stop(M0), - [ok = machi_flu1:stop(X) || X <- FLUs], - ok = machi_partition_simulator:stop() + machi_psup_test_util:stop_flu_packages() end. nonunanimous_setup_and_fix_test() -> - machi_partition_simulator:start_link({1,2,3}, 100, 0), TcpPort = 62877, - FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}], - P_s = [#p_srvr{name=Name, address="localhost", port=Port} || - {Name,Port,_Dir} <- FluInfo], - - [machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo], - {ok, SupPid} = machi_flu_sup:start_link(), - Opts = [{active_mode, false}], - %% {ok, Mb} = ?MGR:start_link(b, MembersDict, [{active_mode, false}]++XX), - [{ok,_}=machi_flu_psup:start_flu_package(Name, Port, Dir, Opts) || - {Name,Port,Dir} <- FluInfo], + MgrOpts = [{active_mode,false}], + {_, Ps, [Ma,Mb]} = machi_psup_test_util:start_flu_packages( + 2, "./data.", TcpPort, MgrOpts), + MembersDict = machi_projection:make_members_dict(Ps), + [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- [Ma, Mb]], + [Proxy_a, Proxy_b] = Proxies = - [element(2,?FLU_PC:start_link(P)) || P <- P_s], - MembersDict = machi_projection:make_members_dict(P_s), - [Ma,Mb] = [a_chmgr, b_chmgr], + [element(2, ?FLU_PC:start_link(P)) || P <- Ps], ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []), ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []), try @@ -394,9 +373,8 @@ nonunanimous_setup_and_fix_test() -> ok after - exit(SupPid, normal), [ok = ?FLU_PC:quit(X) || X <- Proxies], - ok = machi_partition_simulator:stop() + machi_psup_test_util:stop_flu_packages() end. unanimous_report_test() -> diff --git a/test/machi_cinfo_test.erl b/test/machi_cinfo_test.erl index dcb611e..9699df3 100644 --- a/test/machi_cinfo_test.erl +++ b/test/machi_cinfo_test.erl @@ -48,7 +48,7 @@ setup() -> {c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}} ], [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], - {ok, SupPid} = machi_flu_sup:start_link(), + {ok, SupPid} = machi_sup:start_link(), %% Only run a, don't run b & c so we have 100% failures talking to them [begin #p_srvr{name=Name, port=Port, props=Dir} = P, diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index e127032..a370436 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -96,7 +96,7 @@ run_ticks(MgrList) -> ok. smoke_test2() -> - {ok, SupPid} = machi_flu_sup:start_link(), + {ok, SupPid} = machi_sup:start_link(), error_logger:tty(false), try Prefix = <<"pre">>, @@ -208,7 +208,7 @@ io:format(user, "\nFiles = ~p\n", [machi_cr_client:list_files(C1)]), witness_smoke_test_() -> {timeout, 1*60, fun() -> witness_smoke_test2() end}. witness_smoke_test2() -> - SupPid = case machi_flu_sup:start_link() of + SupPid = case machi_sup:start_link() of {ok, P} -> P; {error, {already_started, P1}} -> P1; Other -> error(Other) diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 1c7b015..afd2d0f 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -43,7 +43,7 @@ smoke_test2() -> {c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}} ], [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], - {ok, SupPid} = machi_flu_sup:start_link(), + {ok, SupPid} = machi_sup:start_link(), try %% Only run a, don't run b & c so we have 100% failures talking to them [begin @@ -74,7 +74,7 @@ partial_stop_restart2() -> PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps], Dict = orddict:from_list(Ps), [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], - {ok, SupPid} = machi_flu_sup:start_link(), + {ok, SupPid} = machi_sup:start_link(), DbgProps = [{initial_wedged, true}], Start = fun({_,P}) -> #p_srvr{name=Name, port=Port, props=Dir} = P, diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 9f6984b..3273bc2 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -41,7 +41,7 @@ smoke_test2() -> ok = application:set_env(machi, max_file_size, 1024*1024), [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], - {ok, SupPid} = machi_flu_sup:start_link(), + {ok, SupPid} = machi_sup:start_link(), try [begin #p_srvr{name=Name, port=Port, props=Dir} = P, diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index f505f55..422dce1 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -38,6 +38,7 @@ api_smoke_test() -> W_props = [{active_mode, false},{initial_wedged, false}], Prefix = <<"prefix">>, + {ok, SupPid} = machi_sup:start_link(), machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props), try @@ -102,7 +103,8 @@ api_smoke_test() -> _ = (catch ?MUT:quit(Prox1)) end after - (catch machi_flu1_test:stop_flu_package(RegName)) + (catch machi_flu1_test:stop_flu_package(RegName)), + exit(SupPid, normal) end. flu_restart_test_() -> @@ -114,6 +116,7 @@ flu_restart_test2() -> TcpPort = 57125, DataDir = "./data.api_smoke_flu2", W_props = [{initial_wedged, false}, {active_mode, false}], + {ok, SupPid} = machi_sup:start_link(), machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props), try @@ -306,7 +309,8 @@ flu_restart_test2() -> end || Fun <- ExpectedOps ], ok after - _ = (catch ?MUT:quit(Prox1)) + _ = (catch ?MUT:quit(Prox1)), + exit(SupPid, normal) end after (catch machi_flu1_test:stop_flu_package(RegName)) diff --git a/test/machi_psup_test_util.erl b/test/machi_psup_test_util.erl new file mode 100644 index 0000000..6446bf7 --- /dev/null +++ b/test/machi_psup_test_util.erl @@ -0,0 +1,71 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_psup_test_util). +-compile(export_all). + +-ifdef(TEST). +-ifndef(PULSE). + +-include_lib("eunit/include/eunit.hrl"). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-define(FLU, machi_flu1). +-define(FLU_C, machi_flu1_client). + +-spec start_flu_packages(pos_integer(), string(), inet:port(), list()) -> + {MachiSup::pid(), Ps::[#p_srvr{}], MgrNames::[atom()]}. +start_flu_packages(FluCount, DirPrefix, BaseTcpPort, MgrOpts) -> + FluInfo = flu_info(FluCount, DirPrefix, BaseTcpPort), + _ = stop_machi_sup(), + _ = [machi_flu1_test:clean_up_data_dir(Dir) || {_, Dir, _} <- FluInfo], + {ok, SupPid} = machi_sup:start_link(), + [{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, MgrOpts) || + {#p_srvr{name=Name, port=Port}, Dir, _} <- FluInfo], + {Ps, _Dirs, MgrNames} = lists:unzip3(FluInfo), + {SupPid, Ps, MgrNames}. + +stop_flu_packages() -> + stop_machi_sup(). + +flu_info(FluCount, DirPrefix, BaseTcpPort) -> + [begin + FLUNameStr = [$a + I - 1], + FLUName = list_to_atom(FLUNameStr), + MgrName = machi_flu_psup:make_mgr_supname(FLUName), + {#p_srvr{name=FLUName, address="localhost", port=BaseTcpPort + I, + props=[{chmgr, MgrName}]}, + DirPrefix ++ "/data.eqc." ++ FLUNameStr, + MgrName} + end || I <- lists:seq(1, FluCount)]. + +stop_machi_sup() -> + case whereis(machi_sup) of + undefined -> ok; + Pid -> + catch exit(whereis(machi_sup), normal), + machi_util:wait_for_death(Pid, 30) + end. + +-endif. % !PULSE +-endif. % TEST + From 14765a7279ec028609b7779deece6fa39ab7deed Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Mon, 7 Dec 2015 14:46:08 +0900 Subject: [PATCH 5/9] Change ranch callback module name --- src/machi_flu1.erl | 2 +- ...i_pb_protocol.erl => machi_flu1_net_server.erl} | 14 ++++++++------ src/machi_listener_sup.erl | 4 ++-- 3 files changed, 11 insertions(+), 9 deletions(-) rename src/{machi_pb_protocol.erl => machi_flu1_net_server.erl} (98%) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 41e5125..fade024 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -207,7 +207,7 @@ append_server_loop(FluPid, #state{wedged=Wedged_p, receive {seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID} when Witness_p -> - %% The FLU's net_server_loop() process ought to filter all + %% The FLU's machi_flu1_net_server process ought to filter all %% witness states, but we'll keep this clause for extra %% paranoia. From ! witness, diff --git a/src/machi_pb_protocol.erl b/src/machi_flu1_net_server.erl similarity index 98% rename from src/machi_pb_protocol.erl rename to src/machi_flu1_net_server.erl index 9368a2a..3f11500 100644 --- a/src/machi_pb_protocol.erl +++ b/src/machi_flu1_net_server.erl @@ -24,7 +24,7 @@ %% TODO %% - Two modes, high and low should be separated at listener level? --module(machi_pb_protocol). +-module(machi_flu1_net_server). -behaviour(gen_server). -behaviour(ranch_protocol). @@ -136,7 +136,9 @@ code_change(_OldVsn, S, _Extra) -> -spec transport_received(socket(), machi_dt:chunk(), state()) -> {noreply, state()}. -transport_received(Sock, Bin, #state{transport=Transport}=S) -> +transport_received(Socket, <<"QUIT\n">>, #state{socket=Socket}=S) -> + {stop, normal, S}; +transport_received(Socket, Bin, #state{transport=Transport}=S) -> {RespBin, S2} = case machi_pb:decode_mpb_ll_request(Bin) of LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> @@ -149,15 +151,15 @@ transport_received(Sock, Bin, #state{transport=Transport}=S) -> {machi_pb:encode_mpb_response(R), mode(high, NewS)} end, if RespBin == async_no_response -> - Transport:setopts(Sock, [{active, once}]), + Transport:setopts(Socket, [{active, once}]), {noreply, S2}; true -> - case Transport:send(Sock, RespBin) of + case Transport:send(Socket, RespBin) of ok -> - Transport:setopts(Sock, [{active, once}]), + Transport:setopts(Socket, [{active, once}]), {noreply, S2}; {error, Reason} -> - transport_error(Sock, Reason, S2) + transport_error(Socket, Reason, S2) end end. diff --git a/src/machi_listener_sup.erl b/src/machi_listener_sup.erl index dc004ce..f227611 100644 --- a/src/machi_listener_sup.erl +++ b/src/machi_listener_sup.erl @@ -72,7 +72,7 @@ child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> ListenerName = make_listener_name(FluName), NbAcceptors = 100, TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}], - ProtoOpts = [FluName, Witness, DataDir, EpochTab, ProjStore], + NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore], ranch:child_spec(ListenerName, NbAcceptors, ranch_tcp, TcpOpts, - machi_pb_protocol, ProtoOpts). + machi_flu1_net_server, NetServerOpts). From aa0a0413d1b57873e6d59f0a15ebc489519af39e Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Mon, 7 Dec 2015 15:26:39 +0900 Subject: [PATCH 6/9] Cosmetics of comments, specs, whitespaces and unit tests refactoring --- src/machi_flu1_net_server.erl | 53 +++++++------ src/machi_listener_sup.erl | 29 ++++--- test/machi_admin_util_test.erl | 67 ++++++++-------- test/machi_ap_repair_eqc.erl | 2 +- test/machi_chain_manager1_test.erl | 30 ++++---- test/machi_flu1_test.erl | 60 ++------------- test/machi_pb_high_client_test.erl | 31 +++----- test/machi_projection_store_test.erl | 4 +- test/machi_proxy_flu1_client_test.erl | 35 ++++----- test/machi_psup_test_util.erl | 71 ----------------- test/machi_test_util.erl | 107 ++++++++++++++++++++++++++ 11 files changed, 243 insertions(+), 246 deletions(-) delete mode 100644 test/machi_psup_test_util.erl create mode 100644 test/machi_test_util.erl diff --git a/src/machi_flu1_net_server.erl b/src/machi_flu1_net_server.erl index 3f11500..6bc9d3b 100644 --- a/src/machi_flu1_net_server.erl +++ b/src/machi_flu1_net_server.erl @@ -45,12 +45,12 @@ -endif. % TEST -record(state, { - %% Transport related items passed from Ranch + %% Ranch's transport management stuff ref :: ranch:ref(), socket :: socket(), transport :: module(), - %% Machi application related items below + %% Machi FLU configurations, common for low and high data_dir :: string(), witness :: boolean(), pb_mode :: undefined | high | low, @@ -58,14 +58,14 @@ %% - Used in spawning CR client in high mode proj_store :: pid(), - %% Low mode only + %% Low mode only items %% Current best knowledge, used for wedge_self / bad_epoch check epoch_id :: undefined | machi_dt:epoch_id(), %% Used in dispatching append_chunk* reqs to the %% append serializing process - flu_name :: atom(), + flu_name :: pv1_server(), %% Used in server_wedge_status to lookup the table - epoch_tab :: ets:tid(), + epoch_tab :: ets:tab(), %% High mode only high_clnt :: pid(), @@ -106,15 +106,15 @@ handle_cast(_Msg, S) -> {noreply, S}. %% TODO: Other transport support needed?? TLS/SSL, SCTP -handle_info({tcp, Sock, Data}=_Info, #state{socket=Sock}=S) -> +handle_info({tcp, Socket, Data}=_Info, #state{socket=Socket}=S) -> lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), - transport_received(Sock, Data, S); -handle_info({tcp_closed, Sock}=_Info, #state{socket=Sock}=S) -> + transport_received(Socket, Data, S); +handle_info({tcp_closed, Socket}=_Info, #state{socket=Socket}=S) -> lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), - transport_closed(Sock, S); -handle_info({tcp_error, Sock, Reason}=_Info, #state{socket=Sock}=S) -> + transport_closed(Socket, S); +handle_info({tcp_error, Socket, Reason}=_Info, #state{socket=Socket}=S) -> lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), - transport_error(Sock, Reason, S); + transport_error(Socket, Reason, S); handle_info(_Info, S) -> lager:warning("~s:handle_info UNKNOWN message: ~w", [?MODULE, _Info]), {noreply, S}. @@ -143,17 +143,18 @@ transport_received(Socket, Bin, #state{transport=Transport}=S) -> case machi_pb:decode_mpb_ll_request(Bin) of LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> {R, NewS} = do_pb_ll_request(LL_req, S), - {maybe_encode_response(R), mode(low, NewS)}; + {maybe_encode_response(R), set_mode(low, NewS)}; _ -> HL_req = machi_pb:decode_mpb_request(Bin), 1 = HL_req#mpb_request.do_not_alter, {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), - {machi_pb:encode_mpb_response(R), mode(high, NewS)} + {machi_pb:encode_mpb_response(R), set_mode(high, NewS)} end, - if RespBin == async_no_response -> + case RespBin of + async_no_response -> Transport:setopts(Socket, [{active, once}]), {noreply, S2}; - true -> + _ -> case Transport:send(Socket, RespBin) of ok -> Transport:setopts(Socket, [{active, once}]), @@ -168,7 +169,7 @@ transport_closed(_Socket, S) -> {stop, normal, S}. -spec transport_error(socket(), term(), state()) -> no_return(). -transport_error(Sock, Reason, #state{transport=Transport}=_S) -> +transport_error(Socket, Reason, #state{transport=Transport}=_S) -> Msg = io_lib:format("Socket error ~w", [Reason]), R = #mpb_ll_response{req_id= <<>>, generic=#mpb_errorresp{code=1, msg=Msg}}, @@ -180,18 +181,20 @@ transport_error(Sock, Reason, #state{transport=Transport}=_S) -> %%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... %%%% TODO: is this what causes the intermittent PULSE deadlock errors? %%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), - (catch Transport:close(Sock)), - %% TODO: better to exit with `Reason'? + (catch Transport:close(Socket)), + _ = lager:warning("Socket error (~w -> ~w): ~w", + [Transport:sockname(Socket), Transport:peername(Socket), Reason]), + %% TODO: better to exit with `Reason' without logging? exit(normal). -maybe_encode_response(async_no_response=X) -> - X; +maybe_encode_response(async_no_response=R) -> + R; maybe_encode_response(R) -> machi_pb:encode_mpb_ll_response(R). -mode(Mode, #state{pb_mode=undefined}=S) -> +set_mode(Mode, #state{pb_mode=undefined}=S) -> S#state{pb_mode=Mode}; -mode(_, S) -> +set_mode(_, S) -> S. %%%% Low PB mode %%%% @@ -208,7 +211,6 @@ do_pb_ll_request(PB_request, S) -> when LowCmd =:= low_proj; LowCmd =:= low_wedge_status; LowCmd =:= low_list_files -> - %% Skip wedge check for projection commands! %% Skip wedge check for these unprivileged commands {Rs, NewS} = do_pb_ll_request3(Cmd0, S), {RqID, Cmd0, Rs, NewS}; @@ -254,6 +256,7 @@ do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> {do_server_wedge_status(S), S}; do_pb_ll_request3({low_proj, PCMD}, S) -> {do_server_proj_request(PCMD, S), S}; + %% Witness status *matters* below do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk, CSum_tag, @@ -285,6 +288,7 @@ do_pb_ll_request3({low_delete_migration, _EpochID, File}, do_pb_ll_request3({low_trunc_hack, _EpochID, File}, #state{witness=false}=S) -> {do_server_trunc_hack(File, S), S}; + do_pb_ll_request3(_, #state{witness=true}=S) -> {{error, bad_arg}, S}. % TODO: new status code?? @@ -361,7 +365,8 @@ do_server_append_chunk2(CoC_Namespace, CoC_Locator, throw:{bad_csum, _CS} -> {error, bad_checksum}; error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), + lager:error("badarg at ~w:do_server_append_chunk2:~w ~w", + [?MODULE, ?LINE, erlang:get_stacktrace()]), {error, bad_arg} end. diff --git a/src/machi_listener_sup.erl b/src/machi_listener_sup.erl index f227611..f2362ad 100644 --- a/src/machi_listener_sup.erl +++ b/src/machi_listener_sup.erl @@ -18,16 +18,15 @@ %% %% ------------------------------------------------------------------- -%% @doc This is the supervisor to hold ranch listener for sigle FLU, -%% holds at most one child worker. +%% @doc A supervisor to hold ranch listener for sigle FLU. +%% It holds at most one child worker. -%% TODO: This supervisor is maybe useless. First introduced by workaround -%% to start listener dynamically in flu1 initialization time. -%% Because psup is blocked in flu1 initialization time, adding a child -%% to psup leads to deadlock. -%% By the result of refactoring process, if initialization can be done -%% only by static arguments, then this supervisor should be removed -%% and add listener as a direct child of psup. +%% TODO: This supervisor is maybe useless. First introduced for +%% workaround to start listener dynamically in flu1 initialization +%% time. Because psup is being blocked in flu1 initialization time, +%% adding a child to psup leads to deadlock. If initialization can be +%% done only by static arguments, then this supervisor should be +%% removed and added as a direct child of `machi_flu_psup'. -module(machi_listener_sup). -behaviour(supervisor). @@ -42,32 +41,44 @@ %% supervisor callback -export([init/1]). +-include("machi_projection.hrl"). + -define(BACKLOG, 8192). +-spec start_link(pv1_server()) -> {ok, pid()}. start_link(FluName) -> supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []). +-spec start_listener(pv1_server(), inet:port_number(), boolean(), + string(), ets:tab(), atom() | pid()) -> {ok, pid()}. start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> supervisor:start_child(make_listener_sup_name(FluName), child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore)). +-spec stop_listener(pv1_server()) -> ok. stop_listener(FluName) -> SupName = make_listener_sup_name(FluName), ListenerName = make_listener_name(FluName), ok = supervisor:terminate_child(SupName, ListenerName), ok = supervisor:delete_child(SupName, ListenerName). +-spec make_listener_name(pv1_server()) -> atom(). make_listener_sup_name(FluName) when is_atom(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_listener_sup"). +-spec make_listener_sup_name(pv1_server()) -> atom(). make_listener_name(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_listener"). +%% Supervisor callback + init([]) -> SupFlags = {one_for_one, 1000, 10}, {ok, {SupFlags, []}}. +-spec child_spec(pv1_server(), inet:port_number(), boolean(), + string(), ets:tab(), atom() | pid()) -> supervisor:child_spec(). child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> ListenerName = make_listener_name(FluName), NbAcceptors = 100, diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 8a43c39..1ebbbf3 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -44,45 +44,46 @@ verify_file_checksums_test2() -> TcpPort = 32958, DataDir = "./data", W_props = [{initial_wedged, false}], - {ok, SupPid} = machi_sup:start_link(), - machi_flu1_test:start_flu_package(verify1_flu, TcpPort, DataDir, - W_props), - Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try - Prefix = <<"verify_prefix">>, - NumChunks = 10, - [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, - Prefix, <>) || - X <- lists:seq(1, NumChunks)], - {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH), - ?assertEqual({ok, []}, - machi_admin_util:verify_file_checksums_remote( - Host, TcpPort, ?DUMMY_PV1_EPOCH, File)), + machi_test_util:start_flu_package(verify1_flu, TcpPort, DataDir, + W_props), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), + try + Prefix = <<"verify_prefix">>, + NumChunks = 10, + [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, + Prefix, <>) || + X <- lists:seq(1, NumChunks)], + {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH), + ?assertEqual({ok, []}, + machi_admin_util:verify_file_checksums_remote( + Host, TcpPort, ?DUMMY_PV1_EPOCH, File)), - %% Clobber the first 3 chunks, which are sizes 1/2/3. - {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)), - {ok, FH} = file:open(Path, [read,write]), - {ok, _} = file:position(FH, ?MINIMUM_OFFSET), - ok = file:write(FH, "y"), - ok = file:write(FH, "yo"), - ok = file:write(FH, "yo!"), - ok = file:close(FH), + %% Clobber the first 3 chunks, which are sizes 1/2/3. + {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)), + {ok, FH} = file:open(Path, [read,write]), + {ok, _} = file:position(FH, ?MINIMUM_OFFSET), + ok = file:write(FH, "y"), + ok = file:write(FH, "yo"), + ok = file:write(FH, "yo!"), + ok = file:close(FH), - %% Check the local flavor of the API: should be 3 bad checksums - {ok, Res1} = machi_admin_util:verify_file_checksums_local( - Host, TcpPort, ?DUMMY_PV1_EPOCH, Path), - 3 = length(Res1), + %% Check the local flavor of the API: should be 3 bad checksums + {ok, Res1} = machi_admin_util:verify_file_checksums_local( + Host, TcpPort, ?DUMMY_PV1_EPOCH, Path), + 3 = length(Res1), - %% Check the remote flavor of the API: should be 3 bad checksums - {ok, Res2} = machi_admin_util:verify_file_checksums_remote( - Host, TcpPort, ?DUMMY_PV1_EPOCH, File), - 3 = length(Res2), + %% Check the remote flavor of the API: should be 3 bad checksums + {ok, Res2} = machi_admin_util:verify_file_checksums_remote( + Host, TcpPort, ?DUMMY_PV1_EPOCH, File), + 3 = length(Res2), - ok + ok + after + catch ?FLU_C:quit(Sock1) + end after - catch ?FLU_C:quit(Sock1), - catch machi_flu1_test:stop_flu_package(verify1_flu), - exit(SupPid, normal) + catch machi_test_util:stop_flu_package() end. -endif. % !PULSE diff --git a/test/machi_ap_repair_eqc.erl b/test/machi_ap_repair_eqc.erl index a42d7cc..23e0e93 100644 --- a/test/machi_ap_repair_eqc.erl +++ b/test/machi_ap_repair_eqc.erl @@ -342,7 +342,7 @@ setup_target(Num, Seed, Verbose) -> setup_chain(Seed, AllListE, FLUNames, MgrNames, Dict) -> ok = shutdown_hard(), [begin - machi_flu1_test:clean_up_data_dir(Dir), + machi_test_util:clean_up_dir(Dir), filelib:ensure_dir(Dir ++ "/not-used") end || {_P, Dir} <- AllListE], [catch ets:delete(T) || T <- tabs()], diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index 3f95955..99ecb6a 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -275,14 +275,14 @@ make_prop_ets() -> smoke0_test() -> TcpPort = 6623, - {_, [Pa], [M0]} = machi_psup_test_util:start_flu_packages( - 1, "./data.", TcpPort, []), + {[Pa], [M0], _Dirs} = machi_test_util:start_flu_packages( + 1, TcpPort, "./data.", []), {ok, FLUaP} = ?FLU_PC:start_link(Pa), try pong = ?MGR:ping(M0) after ok = ?FLU_PC:quit(FLUaP), - machi_psup_test_util:stop_flu_packages() + machi_test_util:stop_flu_packages() end. smoke1_test_() -> @@ -291,12 +291,13 @@ smoke1_test_() -> smoke1_test2() -> TcpPort = 62777, MgrOpts = [{active_mode,false}], - {_, Ps, MgrNames} = machi_psup_test_util:start_flu_packages( - 3, "./data.", TcpPort, MgrOpts), - MembersDict = machi_projection:make_members_dict(Ps), - [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- MgrNames], - Ma = hd(MgrNames), try + {Ps, MgrNames, _Dirs} = machi_test_util:start_flu_packages( + 3, TcpPort, "./data.", MgrOpts), + MembersDict = machi_projection:make_members_dict(Ps), + [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- MgrNames], + Ma = hd(MgrNames), + {ok, P1} = ?MGR:test_calc_projection(Ma, false), % DERP! Check for race with manager's proxy vs. proj listener ok = lists:foldl( @@ -315,22 +316,23 @@ smoke1_test2() -> ok after - machi_psup_test_util:stop_flu_packages() + machi_test_util:stop_flu_packages() end. nonunanimous_setup_and_fix_test() -> TcpPort = 62877, MgrOpts = [{active_mode,false}], - {_, Ps, [Ma,Mb]} = machi_psup_test_util:start_flu_packages( - 2, "./data.", TcpPort, MgrOpts), + {Ps, [Ma,Mb], _Dirs} = machi_test_util:start_flu_packages( + 2, TcpPort, "./data.", MgrOpts), MembersDict = machi_projection:make_members_dict(Ps), [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- [Ma, Mb]], [Proxy_a, Proxy_b] = Proxies = [element(2, ?FLU_PC:start_link(P)) || P <- Ps], - ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []), - ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []), + try + ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []), + ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []), {ok, P1} = ?MGR:test_calc_projection(Ma, false), P1a = machi_projection:update_checksum( @@ -374,7 +376,7 @@ nonunanimous_setup_and_fix_test() -> ok after [ok = ?FLU_PC:quit(X) || X <- Proxies], - machi_psup_test_util:stop_flu_packages() + machi_test_util:stop_flu_packages() end. unanimous_report_test() -> diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 5491e2b..942d2da 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -30,46 +30,6 @@ -define(FLU, machi_flu1). -define(FLU_C, machi_flu1_client). -clean_up_data_dir(DataDir) -> - [begin - Fs = filelib:wildcard(DataDir ++ Glob), - [file:delete(F) || F <- Fs], - [file:del_dir(F) || F <- Fs] - end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], - _ = file:del_dir(DataDir), - ok. - -start_flu_package(RegName, TcpPort, DataDir) -> - start_flu_package(RegName, TcpPort, DataDir, []). - -start_flu_package(RegName, TcpPort, DataDir, Props) -> - case proplists:get_value(save_data_dir, Props) of - true -> - ok; - _ -> - clean_up_data_dir(DataDir) - end, - - maybe_start_sup(), - machi_flu_psup:start_flu_package(RegName, TcpPort, DataDir, Props). - -stop_flu_package(FluName) -> - machi_flu_psup:stop_flu_package(FluName), - Pid = whereis(machi_sup), - exit(Pid, normal), - machi_util:wait_for_death(Pid, 100). - -maybe_start_sup() -> - case whereis(machi_sup) of - undefined -> - machi_sup:start_link(), - %% evil but we have to let stuff start up - timer:sleep(10), - maybe_start_sup(); - Pid -> Pid - end. - - -ifndef(PULSE). flu_smoke_test() -> @@ -78,10 +38,9 @@ flu_smoke_test() -> DataDir = "./data", Prefix = <<"prefix!">>, BadPrefix = BadFile = "no/good", - W_props = [{initial_wedged, false}], - start_flu_package(smoke_flu, TcpPort, DataDir, W_props), try + _ = machi_test_util:start_flu_package(smoke_flu, TcpPort, DataDir, W_props), Msg = "Hello, world!", Msg = ?FLU_C:echo(Host, TcpPort, Msg), {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, @@ -178,22 +137,21 @@ flu_smoke_test() -> ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host, port=TcpPort})) after - stop_flu_package(smoke_flu) + machi_test_util:stop_flu_package() end. flu_projection_smoke_test() -> Host = "localhost", TcpPort = 32959, DataDir = "./data.projst", - - start_flu_package(projection_test_flu, TcpPort, DataDir), try + machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir), [ok = flu_projection_common(Host, TcpPort, T) || T <- [public, private] ] %% , {ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort), %% io:format(user, "EpochID1 ~p\n", [EpochID1]) after - stop_flu_package(projection_test_flu) + machi_test_util:stop_flu_package() end. flu_projection_common(Host, TcpPort, T) -> @@ -223,10 +181,9 @@ bad_checksum_test() -> Host = "localhost", TcpPort = 32960, DataDir = "./data.bct", - Opts = [{initial_wedged, false}], - start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), try + machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, Chunk1_badcs = {<>, Chunk1}, @@ -235,17 +192,16 @@ bad_checksum_test() -> Prefix, Chunk1_badcs), ok after - stop_flu_package(projection_test_flu) + machi_test_util:stop_flu_package() end. witness_test() -> Host = "localhost", TcpPort = 32961, DataDir = "./data.witness", - Opts = [{initial_wedged, false}, {witness_mode, true}], - start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), try + machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, @@ -276,7 +232,7 @@ witness_test() -> ok after - stop_flu_package(projection_test_flu) + machi_test_util:stop_flu_package() end. %% The purpose of timing_pb_encoding_test_ and timing_bif_encoding_test_ is diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 3273bc2..16b125c 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -34,21 +34,15 @@ smoke_test_() -> {timeout, 5*60, fun() -> smoke_test2() end}. smoke_test2() -> - Port = 5720, - Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"} - ], - D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]), + PortBase = 5720, ok = application:set_env(machi, max_file_size, 1024*1024), - - [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], - {ok, SupPid} = machi_sup:start_link(), try - [begin - #p_srvr{name=Name, port=Port, props=Dir} = P, - {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) - end || P <- Ps], - ok = machi_chain_manager1:set_chain_members(a_chmgr, D), - [machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], + {Ps, MgrNames, Dirs} = machi_test_util:start_flu_packages( + 1, PortBase, "./data.", []), + D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]), + M0 = hd(MgrNames), + ok = machi_chain_manager1:set_chain_members(M0, D), + [machi_chain_manager1:trigger_react_to_env(M0) || _ <-lists:seq(1,5)], {ok, Clnt} = ?C:start_link(Ps), try @@ -94,7 +88,8 @@ smoke_test2() -> File1Bin = binary_to_list(File1), [begin - #p_srvr{name=Name, port=Port, props=Dir} = P, + #p_srvr{name=Name, props=Props} = P, + Dir = proplists:get_value(data_dir, Props), ?assertEqual({ok, [File1Bin]}, file:list_dir(filename:join([Dir, "data"]))), FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]), @@ -122,7 +117,8 @@ smoke_test2() -> %% Make sure everything was trimmed File = binary_to_list(Filex), [begin - #p_srvr{name=Name, port=_Port, props=Dir} = P, + #p_srvr{name=Name, props=Props} = P, + Dir = proplists:get_value(data_dir, Props), ?assertEqual({ok, []}, file:list_dir(filename:join([Dir, "data"]))), FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]), @@ -139,10 +135,7 @@ smoke_test2() -> (catch ?C:quit(Clnt)) end after - exit(SupPid, normal), - [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], - machi_util:wait_for_death(SupPid, 100), - ok + machi_test_util:stop_flu_packages() end. -endif. % !PULSE diff --git a/test/machi_projection_store_test.erl b/test/machi_projection_store_test.erl index 665553b..eab42a9 100644 --- a/test/machi_projection_store_test.erl +++ b/test/machi_projection_store_test.erl @@ -33,7 +33,7 @@ smoke_test() -> Dir = "./data.a", Os = [{ignore_stability_time, true}, {active_mode, false}], os:cmd("rm -rf " ++ Dir), - machi_flu1_test:start_flu_package(a, PortBase, "./data.a", Os), + machi_test_util:start_flu_package(a, PortBase, "./data.a", Os), try P1 = machi_projection:new(1, a, [], [], [], [], []), @@ -58,7 +58,7 @@ smoke_test() -> ok after - machi_flu1_test:stop_flu_package(a) + machi_test_util:stop_flu_package() end. -endif. % !PULSE diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 422dce1..dc0bdba 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -32,17 +32,14 @@ api_smoke_test() -> RegName = api_smoke_flu, - Host = "localhost", TcpPort = 57124, DataDir = "./data.api_smoke_flu", W_props = [{active_mode, false},{initial_wedged, false}], Prefix = <<"prefix">>, - {ok, SupPid} = machi_sup:start_link(), - machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props), - try - I = #p_srvr{name=RegName, address=Host, port=TcpPort}, + {[I], _, _} = machi_test_util:start_flu_package( + RegName, TcpPort, DataDir, W_props), {ok, Prox1} = ?MUT:start_link(I), try FakeEpoch = ?DUMMY_PV1_EPOCH, @@ -50,13 +47,13 @@ api_smoke_test() -> FakeEpoch, Prefix, <<"data">>, infinity) || _ <- lists:seq(1,5)], %% Stop the FLU, what happens? - machi_flu1_test:stop_flu_package(RegName), + machi_test_util:stop_flu_package(), [{error,partition} = ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, <<"data-stopped1">>, infinity) || _ <- lists:seq(1,3)], %% Start the FLU again, we should be able to do stuff immediately - machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, - [save_data_dir|W_props]), + machi_test_util:start_flu_package(RegName, TcpPort, DataDir, + [save_data_dir|W_props]), MyChunk = <<"my chunk data">>, {ok, {MyOff,MySize,MyFile}} = ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk, @@ -103,8 +100,7 @@ api_smoke_test() -> _ = (catch ?MUT:quit(Prox1)) end after - (catch machi_flu1_test:stop_flu_package(RegName)), - exit(SupPid, normal) + (catch machi_test_util:stop_flu_package()) end. flu_restart_test_() -> @@ -112,15 +108,13 @@ flu_restart_test_() -> flu_restart_test2() -> RegName = a, - Host = "localhost", TcpPort = 57125, DataDir = "./data.api_smoke_flu2", W_props = [{initial_wedged, false}, {active_mode, false}], - {ok, SupPid} = machi_sup:start_link(), - machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props), try - I = #p_srvr{name=RegName, address=Host, port=TcpPort}, + {[I], _, _} = machi_test_util:start_flu_package( + RegName, TcpPort, DataDir, W_props), {ok, Prox1} = ?MUT:start_link(I), try FakeEpoch = ?DUMMY_PV1_EPOCH, @@ -140,7 +134,7 @@ flu_restart_test2() -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1), {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public), {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private), - ok = machi_flu1_test:stop_flu_package(RegName), timer:sleep(50), + ok = machi_test_util:stop_flu_package(), timer:sleep(50), %% Now that the last proxy op was successful and only %% after did we stop the FLU, let's check that both the @@ -296,25 +290,24 @@ flu_restart_test2() -> ], [begin - machi_flu1_test:start_flu_package( + machi_test_util:start_flu_package( RegName, TcpPort, DataDir, [save_data_dir|W_props]), _ = Fun(line), ok = Fun(run), ok = Fun(run), - ok = machi_flu1_test:stop_flu_package(RegName), + ok = machi_test_util:stop_flu_package(), {error, partition} = Fun(stop), {error, partition} = Fun(stop), ok end || Fun <- ExpectedOps ], ok after - _ = (catch ?MUT:quit(Prox1)), - exit(SupPid, normal) + _ = (catch ?MUT:quit(Prox1)) end after - (catch machi_flu1_test:stop_flu_package(RegName)) + (catch machi_test_util:stop_flu_package()) end. - + -endif. % !PULSE -endif. % TEST diff --git a/test/machi_psup_test_util.erl b/test/machi_psup_test_util.erl deleted file mode 100644 index 6446bf7..0000000 --- a/test/machi_psup_test_util.erl +++ /dev/null @@ -1,71 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - --module(machi_psup_test_util). --compile(export_all). - --ifdef(TEST). --ifndef(PULSE). - --include_lib("eunit/include/eunit.hrl"). - --include("machi.hrl"). --include("machi_projection.hrl"). - --define(FLU, machi_flu1). --define(FLU_C, machi_flu1_client). - --spec start_flu_packages(pos_integer(), string(), inet:port(), list()) -> - {MachiSup::pid(), Ps::[#p_srvr{}], MgrNames::[atom()]}. -start_flu_packages(FluCount, DirPrefix, BaseTcpPort, MgrOpts) -> - FluInfo = flu_info(FluCount, DirPrefix, BaseTcpPort), - _ = stop_machi_sup(), - _ = [machi_flu1_test:clean_up_data_dir(Dir) || {_, Dir, _} <- FluInfo], - {ok, SupPid} = machi_sup:start_link(), - [{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, MgrOpts) || - {#p_srvr{name=Name, port=Port}, Dir, _} <- FluInfo], - {Ps, _Dirs, MgrNames} = lists:unzip3(FluInfo), - {SupPid, Ps, MgrNames}. - -stop_flu_packages() -> - stop_machi_sup(). - -flu_info(FluCount, DirPrefix, BaseTcpPort) -> - [begin - FLUNameStr = [$a + I - 1], - FLUName = list_to_atom(FLUNameStr), - MgrName = machi_flu_psup:make_mgr_supname(FLUName), - {#p_srvr{name=FLUName, address="localhost", port=BaseTcpPort + I, - props=[{chmgr, MgrName}]}, - DirPrefix ++ "/data.eqc." ++ FLUNameStr, - MgrName} - end || I <- lists:seq(1, FluCount)]. - -stop_machi_sup() -> - case whereis(machi_sup) of - undefined -> ok; - Pid -> - catch exit(whereis(machi_sup), normal), - machi_util:wait_for_death(Pid, 30) - end. - --endif. % !PULSE --endif. % TEST - diff --git a/test/machi_test_util.erl b/test/machi_test_util.erl new file mode 100644 index 0000000..21d5ca9 --- /dev/null +++ b/test/machi_test_util.erl @@ -0,0 +1,107 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_test_util). +-compile(export_all). + +-ifdef(TEST). +-ifndef(PULSE). + +-include_lib("eunit/include/eunit.hrl"). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-define(FLU, machi_flu1). +-define(FLU_C, machi_flu1_client). + +-spec start_flu_package(atom(), inet:port_number(), string()) -> + {Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}. +start_flu_package(FluName, TcpPort, DataDir) -> + start_flu_package(FluName, TcpPort, DataDir, []). + +-spec start_flu_package(atom(), inet:port_number(), string(), list()) -> + {Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}. +start_flu_package(FluName, TcpPort, DataDir, Props) -> + MgrName = machi_flu_psup:make_mgr_supname(FluName), + FluInfo = [{#p_srvr{name=FluName, address="localhost", port=TcpPort, + props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]}, + DataDir, MgrName}], + start_flu_packages(FluInfo). + +-spec start_flu_packages(pos_integer(), inet:port_number(), string(), list()) -> + {Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}. +start_flu_packages(FluCount, BaseTcpPort, DirPrefix, Props) -> + FluInfo = flu_info(FluCount, BaseTcpPort, DirPrefix, Props), + start_flu_packages(FluInfo). + +start_flu_packages(FluInfo) -> + _ = stop_machi_sup(), + clean_up_data_dirs(FluInfo), + {ok, _SupPid} = machi_sup:start_link(), + [{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, Props) || + {#p_srvr{name=Name, port=Port, props=Props}, Dir, _} <- FluInfo], + {Ps, Dirs, MgrNames} = lists:unzip3(FluInfo), + {Ps, MgrNames, Dirs}. + +stop_flu_package() -> + stop_flu_packages(). + +stop_flu_packages() -> + stop_machi_sup(). + +flu_info(FluCount, BaseTcpPort, DirPrefix, Props) -> + [begin + FLUNameStr = [$a + I - 1], + FLUName = list_to_atom(FLUNameStr), + MgrName = machi_flu_psup:make_mgr_supname(FLUName), + DataDir = DirPrefix ++ "/data.eqc." ++ FLUNameStr, + {#p_srvr{name=FLUName, address="localhost", port=BaseTcpPort + I, + props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]}, + DataDir, MgrName} + end || I <- lists:seq(1, FluCount)]. + +stop_machi_sup() -> + case whereis(machi_sup) of + undefined -> ok; + Pid -> + catch exit(whereis(machi_sup), normal), + machi_util:wait_for_death(Pid, 30) + end. + +clean_up_data_dirs(FluInfo) -> + _ = [case proplists:get_value(save_data_dir, Propx) of + true -> ok; + _ -> clean_up_dir(Dir) + end || {#p_srvr{props=Propx}, Dir, _} <- FluInfo], + ok. + +clean_up_dir(Dir) -> + [begin + Fs = filelib:wildcard(Dir ++ Glob), + [file:delete(F) || F <- Fs], + [file:del_dir(F) || F <- Fs] + end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], + _ = file:del_dir(Dir), + ok. + +-endif. % !PULSE +-endif. % TEST + From ade4430d303d0ad2df3fe1dafc05802eeaa40d7d Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Tue, 8 Dec 2015 15:38:27 +0900 Subject: [PATCH 7/9] More cleaner clean up --- src/machi_flu1.erl | 7 ++++++- test/machi_proxy_flu1_client_test.erl | 6 +++--- test/machi_test_util.erl | 16 ++++++++++------ 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index fade024..e620308 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -82,7 +82,12 @@ start_link([{FluName, TcpPort, DataDir}|Rest]) proc_lib:start_link(?MODULE, main2, [FluName, TcpPort, DataDir, Rest], ?INIT_TIMEOUT). -stop(Pid) -> +stop(RegName) when is_atom(RegName) -> + case whereis(RegName) of + undefined -> ok; + Pid -> stop(Pid) + end; +stop(Pid) when is_pid(Pid) -> case erlang:is_process_alive(Pid) of true -> Pid ! killme, diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index dc0bdba..439b1a7 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -53,7 +53,7 @@ api_smoke_test() -> infinity) || _ <- lists:seq(1,3)], %% Start the FLU again, we should be able to do stuff immediately machi_test_util:start_flu_package(RegName, TcpPort, DataDir, - [save_data_dir|W_props]), + [no_cleanup|W_props]), MyChunk = <<"my chunk data">>, {ok, {MyOff,MySize,MyFile}} = ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk, @@ -148,7 +148,7 @@ flu_restart_test2() -> ExpectedOps = [ - fun(run) -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1), + fun(run) -> ?assertEqual({ok, EpochID}, ?MUT:get_epoch_id(Prox1)), ok; (line) -> io:format("line ~p, ", [?LINE]); (stop) -> ?MUT:get_epoch_id(Prox1) end, @@ -292,7 +292,7 @@ flu_restart_test2() -> [begin machi_test_util:start_flu_package( RegName, TcpPort, DataDir, - [save_data_dir|W_props]), + [no_cleanup|W_props]), _ = Fun(line), ok = Fun(run), ok = Fun(run), diff --git a/test/machi_test_util.erl b/test/machi_test_util.erl index 21d5ca9..ff908b7 100644 --- a/test/machi_test_util.erl +++ b/test/machi_test_util.erl @@ -54,7 +54,7 @@ start_flu_packages(FluCount, BaseTcpPort, DirPrefix, Props) -> start_flu_packages(FluInfo) -> _ = stop_machi_sup(), - clean_up_data_dirs(FluInfo), + clean_up(FluInfo), {ok, _SupPid} = machi_sup:start_link(), [{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, Props) || {#p_srvr{name=Name, port=Port, props=Props}, Dir, _} <- FluInfo], @@ -86,11 +86,15 @@ stop_machi_sup() -> machi_util:wait_for_death(Pid, 30) end. -clean_up_data_dirs(FluInfo) -> - _ = [case proplists:get_value(save_data_dir, Propx) of - true -> ok; - _ -> clean_up_dir(Dir) - end || {#p_srvr{props=Propx}, Dir, _} <- FluInfo], +clean_up(FluInfo) -> + _ = [begin + case proplists:get_value(no_cleanup, Props) of + true -> ok; + _ -> + _ = machi_flu1:stop(FLUName), + clean_up_dir(Dir) + end + end || {#p_srvr{name=FLUName, props=Props}, Dir, _} <- FluInfo], ok. clean_up_dir(Dir) -> From 2e2d282afc8385c224c01cea7d75024407c86830 Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Wed, 9 Dec 2015 18:04:50 +0900 Subject: [PATCH 8/9] Use outside of ephemeral port range to listen on When there is TCP_WAIT connection whose local part has port to be listened, listen (bind) will fail by eaddrinuse _on Linux_ (won't on Mac OS X). This commit also adds some logs and pattern matches. Reference - Ephemeral port - Wikipedia, the free encyclopedia https://en.wikipedia.org/wiki/Ephemeral_port "Many Linux kernels use the port range 32768 to 61000.[note 2] FreeBSD has used the IANA port range since release 4.6. Previous versions, including the Berkeley Software Distribution (BSD), use ports 1024 to 5000 as ephemeral ports.[2]" - Demostration of collision between already-closed ephemeral port and listen port on Linux (Mac OS X allows) https://gist.github.com/shino/36ae1e01608366d52236 --- src/machi_flu1_net_server.erl | 15 ++++++++++----- test/machi_flu1_test.erl | 16 ++++++++-------- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/src/machi_flu1_net_server.erl b/src/machi_flu1_net_server.erl index 6bc9d3b..a244a5f 100644 --- a/src/machi_flu1_net_server.erl +++ b/src/machi_flu1_net_server.erl @@ -113,17 +113,22 @@ handle_info({tcp_closed, Socket}=_Info, #state{socket=Socket}=S) -> lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), transport_closed(Socket, S); handle_info({tcp_error, Socket, Reason}=_Info, #state{socket=Socket}=S) -> - lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), + lager:warning("~s:handle_info (socket=~w) tcp_error: ~w", [?MODULE, Socket, Reason]), transport_error(Socket, Reason, S); handle_info(_Info, S) -> lager:warning("~s:handle_info UNKNOWN message: ~w", [?MODULE, _Info]), {noreply, S}. -terminate(_Reason, #state{socket=undefined}=_S) -> - lager:debug("~s:terminate: ~w", [?MODULE, _Reason]), +terminate(normal, #state{socket=undefined}=_S) -> ok; -terminate(_Reason, #state{socket=Socket}=_S) -> - lager:debug("~s:terminate: ~w", [?MODULE, _Reason]), +terminate(Reason, #state{socket=undefined}=_S) -> + lager:warning("~s:terminate (socket=undefined): ~w", [?MODULE, Reason]), + ok; +terminate(normal, #state{socket=Socket}=_S) -> + (catch gen_tcp:close(Socket)), + ok; +terminate(Reason, #state{socket=Socket}=_S) -> + lager:warning("~s:terminate (socket=Socket): ~w", [?MODULE, Reason]), (catch gen_tcp:close(Socket)), ok. diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 942d2da..0577033 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -34,13 +34,13 @@ flu_smoke_test() -> Host = "localhost", - TcpPort = 32957, + TcpPort = 12957, DataDir = "./data", Prefix = <<"prefix!">>, BadPrefix = BadFile = "no/good", W_props = [{initial_wedged, false}], + {_, _, _} = machi_test_util:start_flu_package(smoke_flu, TcpPort, DataDir, W_props), try - _ = machi_test_util:start_flu_package(smoke_flu, TcpPort, DataDir, W_props), Msg = "Hello, world!", Msg = ?FLU_C:echo(Host, TcpPort, Msg), {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, @@ -142,10 +142,10 @@ flu_smoke_test() -> flu_projection_smoke_test() -> Host = "localhost", - TcpPort = 32959, + TcpPort = 12959, DataDir = "./data.projst", + {_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir), try - machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir), [ok = flu_projection_common(Host, TcpPort, T) || T <- [public, private] ] %% , {ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort), @@ -179,11 +179,11 @@ flu_projection_common(Host, TcpPort, T) -> bad_checksum_test() -> Host = "localhost", - TcpPort = 32960, + TcpPort = 12960, DataDir = "./data.bct", Opts = [{initial_wedged, false}], + {_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), try - machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, Chunk1_badcs = {<>, Chunk1}, @@ -197,11 +197,11 @@ bad_checksum_test() -> witness_test() -> Host = "localhost", - TcpPort = 32961, + TcpPort = 12961, DataDir = "./data.witness", Opts = [{initial_wedged, false}, {witness_mode, true}], + {_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), try - machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, From eef00e4f8f27189bb7f09e030d1ec629cc57e721 Mon Sep 17 00:00:00 2001 From: Shunichi Shinohara Date: Thu, 10 Dec 2015 15:58:17 +0900 Subject: [PATCH 9/9] Add TODO comment for possible race condition --- src/machi_flu1_net_server.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/src/machi_flu1_net_server.erl b/src/machi_flu1_net_server.erl index a244a5f..93e3675 100644 --- a/src/machi_flu1_net_server.erl +++ b/src/machi_flu1_net_server.erl @@ -250,6 +250,7 @@ do_pb_ll_request2(EpochID, CMD, S) -> end. lookup_epoch(#state{epoch_tab=T}) -> + %% TODO: race in shutdown to access ets table after owner dies ets:lookup_element(T, epoch, 2). %% Witness status does not matter below.