diff --git a/src/machi_flu1_net_server.erl b/src/machi_flu1_net_server.erl index 3f11500..6bc9d3b 100644 --- a/src/machi_flu1_net_server.erl +++ b/src/machi_flu1_net_server.erl @@ -45,12 +45,12 @@ -endif. % TEST -record(state, { - %% Transport related items passed from Ranch + %% Ranch's transport management stuff ref :: ranch:ref(), socket :: socket(), transport :: module(), - %% Machi application related items below + %% Machi FLU configurations, common for low and high data_dir :: string(), witness :: boolean(), pb_mode :: undefined | high | low, @@ -58,14 +58,14 @@ %% - Used in spawning CR client in high mode proj_store :: pid(), - %% Low mode only + %% Low mode only items %% Current best knowledge, used for wedge_self / bad_epoch check epoch_id :: undefined | machi_dt:epoch_id(), %% Used in dispatching append_chunk* reqs to the %% append serializing process - flu_name :: atom(), + flu_name :: pv1_server(), %% Used in server_wedge_status to lookup the table - epoch_tab :: ets:tid(), + epoch_tab :: ets:tab(), %% High mode only high_clnt :: pid(), @@ -106,15 +106,15 @@ handle_cast(_Msg, S) -> {noreply, S}. %% TODO: Other transport support needed?? TLS/SSL, SCTP -handle_info({tcp, Sock, Data}=_Info, #state{socket=Sock}=S) -> +handle_info({tcp, Socket, Data}=_Info, #state{socket=Socket}=S) -> lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), - transport_received(Sock, Data, S); -handle_info({tcp_closed, Sock}=_Info, #state{socket=Sock}=S) -> + transport_received(Socket, Data, S); +handle_info({tcp_closed, Socket}=_Info, #state{socket=Socket}=S) -> lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), - transport_closed(Sock, S); -handle_info({tcp_error, Sock, Reason}=_Info, #state{socket=Sock}=S) -> + transport_closed(Socket, S); +handle_info({tcp_error, Socket, Reason}=_Info, #state{socket=Socket}=S) -> lager:debug("~s:handle_info: ~w", [?MODULE, _Info]), - transport_error(Sock, Reason, S); + transport_error(Socket, Reason, S); handle_info(_Info, S) -> lager:warning("~s:handle_info UNKNOWN message: ~w", [?MODULE, _Info]), {noreply, S}. @@ -143,17 +143,18 @@ transport_received(Socket, Bin, #state{transport=Transport}=S) -> case machi_pb:decode_mpb_ll_request(Bin) of LL_req when LL_req#mpb_ll_request.do_not_alter == 2 -> {R, NewS} = do_pb_ll_request(LL_req, S), - {maybe_encode_response(R), mode(low, NewS)}; + {maybe_encode_response(R), set_mode(low, NewS)}; _ -> HL_req = machi_pb:decode_mpb_request(Bin), 1 = HL_req#mpb_request.do_not_alter, {R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)), - {machi_pb:encode_mpb_response(R), mode(high, NewS)} + {machi_pb:encode_mpb_response(R), set_mode(high, NewS)} end, - if RespBin == async_no_response -> + case RespBin of + async_no_response -> Transport:setopts(Socket, [{active, once}]), {noreply, S2}; - true -> + _ -> case Transport:send(Socket, RespBin) of ok -> Transport:setopts(Socket, [{active, once}]), @@ -168,7 +169,7 @@ transport_closed(_Socket, S) -> {stop, normal, S}. -spec transport_error(socket(), term(), state()) -> no_return(). -transport_error(Sock, Reason, #state{transport=Transport}=_S) -> +transport_error(Socket, Reason, #state{transport=Transport}=_S) -> Msg = io_lib:format("Socket error ~w", [Reason]), R = #mpb_ll_response{req_id= <<>>, generic=#mpb_errorresp{code=1, msg=Msg}}, @@ -180,18 +181,20 @@ transport_error(Sock, Reason, #state{transport=Transport}=_S) -> %%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,....... %%%% TODO: is this what causes the intermittent PULSE deadlock errors? %%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000), - (catch Transport:close(Sock)), - %% TODO: better to exit with `Reason'? + (catch Transport:close(Socket)), + _ = lager:warning("Socket error (~w -> ~w): ~w", + [Transport:sockname(Socket), Transport:peername(Socket), Reason]), + %% TODO: better to exit with `Reason' without logging? exit(normal). -maybe_encode_response(async_no_response=X) -> - X; +maybe_encode_response(async_no_response=R) -> + R; maybe_encode_response(R) -> machi_pb:encode_mpb_ll_response(R). -mode(Mode, #state{pb_mode=undefined}=S) -> +set_mode(Mode, #state{pb_mode=undefined}=S) -> S#state{pb_mode=Mode}; -mode(_, S) -> +set_mode(_, S) -> S. %%%% Low PB mode %%%% @@ -208,7 +211,6 @@ do_pb_ll_request(PB_request, S) -> when LowCmd =:= low_proj; LowCmd =:= low_wedge_status; LowCmd =:= low_list_files -> - %% Skip wedge check for projection commands! %% Skip wedge check for these unprivileged commands {Rs, NewS} = do_pb_ll_request3(Cmd0, S), {RqID, Cmd0, Rs, NewS}; @@ -254,6 +256,7 @@ do_pb_ll_request3({low_wedge_status, _EpochID}, S) -> {do_server_wedge_status(S), S}; do_pb_ll_request3({low_proj, PCMD}, S) -> {do_server_proj_request(PCMD, S), S}; + %% Witness status *matters* below do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk, CSum_tag, @@ -285,6 +288,7 @@ do_pb_ll_request3({low_delete_migration, _EpochID, File}, do_pb_ll_request3({low_trunc_hack, _EpochID, File}, #state{witness=false}=S) -> {do_server_trunc_hack(File, S), S}; + do_pb_ll_request3(_, #state{witness=true}=S) -> {{error, bad_arg}, S}. % TODO: new status code?? @@ -361,7 +365,8 @@ do_server_append_chunk2(CoC_Namespace, CoC_Locator, throw:{bad_csum, _CS} -> {error, bad_checksum}; error:badarg -> - error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]), + lager:error("badarg at ~w:do_server_append_chunk2:~w ~w", + [?MODULE, ?LINE, erlang:get_stacktrace()]), {error, bad_arg} end. diff --git a/src/machi_listener_sup.erl b/src/machi_listener_sup.erl index f227611..f2362ad 100644 --- a/src/machi_listener_sup.erl +++ b/src/machi_listener_sup.erl @@ -18,16 +18,15 @@ %% %% ------------------------------------------------------------------- -%% @doc This is the supervisor to hold ranch listener for sigle FLU, -%% holds at most one child worker. +%% @doc A supervisor to hold ranch listener for sigle FLU. +%% It holds at most one child worker. -%% TODO: This supervisor is maybe useless. First introduced by workaround -%% to start listener dynamically in flu1 initialization time. -%% Because psup is blocked in flu1 initialization time, adding a child -%% to psup leads to deadlock. -%% By the result of refactoring process, if initialization can be done -%% only by static arguments, then this supervisor should be removed -%% and add listener as a direct child of psup. +%% TODO: This supervisor is maybe useless. First introduced for +%% workaround to start listener dynamically in flu1 initialization +%% time. Because psup is being blocked in flu1 initialization time, +%% adding a child to psup leads to deadlock. If initialization can be +%% done only by static arguments, then this supervisor should be +%% removed and added as a direct child of `machi_flu_psup'. -module(machi_listener_sup). -behaviour(supervisor). @@ -42,32 +41,44 @@ %% supervisor callback -export([init/1]). +-include("machi_projection.hrl"). + -define(BACKLOG, 8192). +-spec start_link(pv1_server()) -> {ok, pid()}. start_link(FluName) -> supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []). +-spec start_listener(pv1_server(), inet:port_number(), boolean(), + string(), ets:tab(), atom() | pid()) -> {ok, pid()}. start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> supervisor:start_child(make_listener_sup_name(FluName), child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore)). +-spec stop_listener(pv1_server()) -> ok. stop_listener(FluName) -> SupName = make_listener_sup_name(FluName), ListenerName = make_listener_name(FluName), ok = supervisor:terminate_child(SupName, ListenerName), ok = supervisor:delete_child(SupName, ListenerName). +-spec make_listener_name(pv1_server()) -> atom(). make_listener_sup_name(FluName) when is_atom(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_listener_sup"). +-spec make_listener_sup_name(pv1_server()) -> atom(). make_listener_name(FluName) -> list_to_atom(atom_to_list(FluName) ++ "_listener"). +%% Supervisor callback + init([]) -> SupFlags = {one_for_one, 1000, 10}, {ok, {SupFlags, []}}. +-spec child_spec(pv1_server(), inet:port_number(), boolean(), + string(), ets:tab(), atom() | pid()) -> supervisor:child_spec(). child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) -> ListenerName = make_listener_name(FluName), NbAcceptors = 100, diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 8a43c39..1ebbbf3 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -44,45 +44,46 @@ verify_file_checksums_test2() -> TcpPort = 32958, DataDir = "./data", W_props = [{initial_wedged, false}], - {ok, SupPid} = machi_sup:start_link(), - machi_flu1_test:start_flu_package(verify1_flu, TcpPort, DataDir, - W_props), - Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try - Prefix = <<"verify_prefix">>, - NumChunks = 10, - [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, - Prefix, <>) || - X <- lists:seq(1, NumChunks)], - {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH), - ?assertEqual({ok, []}, - machi_admin_util:verify_file_checksums_remote( - Host, TcpPort, ?DUMMY_PV1_EPOCH, File)), + machi_test_util:start_flu_package(verify1_flu, TcpPort, DataDir, + W_props), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), + try + Prefix = <<"verify_prefix">>, + NumChunks = 10, + [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, + Prefix, <>) || + X <- lists:seq(1, NumChunks)], + {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH), + ?assertEqual({ok, []}, + machi_admin_util:verify_file_checksums_remote( + Host, TcpPort, ?DUMMY_PV1_EPOCH, File)), - %% Clobber the first 3 chunks, which are sizes 1/2/3. - {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)), - {ok, FH} = file:open(Path, [read,write]), - {ok, _} = file:position(FH, ?MINIMUM_OFFSET), - ok = file:write(FH, "y"), - ok = file:write(FH, "yo"), - ok = file:write(FH, "yo!"), - ok = file:close(FH), + %% Clobber the first 3 chunks, which are sizes 1/2/3. + {_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)), + {ok, FH} = file:open(Path, [read,write]), + {ok, _} = file:position(FH, ?MINIMUM_OFFSET), + ok = file:write(FH, "y"), + ok = file:write(FH, "yo"), + ok = file:write(FH, "yo!"), + ok = file:close(FH), - %% Check the local flavor of the API: should be 3 bad checksums - {ok, Res1} = machi_admin_util:verify_file_checksums_local( - Host, TcpPort, ?DUMMY_PV1_EPOCH, Path), - 3 = length(Res1), + %% Check the local flavor of the API: should be 3 bad checksums + {ok, Res1} = machi_admin_util:verify_file_checksums_local( + Host, TcpPort, ?DUMMY_PV1_EPOCH, Path), + 3 = length(Res1), - %% Check the remote flavor of the API: should be 3 bad checksums - {ok, Res2} = machi_admin_util:verify_file_checksums_remote( - Host, TcpPort, ?DUMMY_PV1_EPOCH, File), - 3 = length(Res2), + %% Check the remote flavor of the API: should be 3 bad checksums + {ok, Res2} = machi_admin_util:verify_file_checksums_remote( + Host, TcpPort, ?DUMMY_PV1_EPOCH, File), + 3 = length(Res2), - ok + ok + after + catch ?FLU_C:quit(Sock1) + end after - catch ?FLU_C:quit(Sock1), - catch machi_flu1_test:stop_flu_package(verify1_flu), - exit(SupPid, normal) + catch machi_test_util:stop_flu_package() end. -endif. % !PULSE diff --git a/test/machi_ap_repair_eqc.erl b/test/machi_ap_repair_eqc.erl index a42d7cc..23e0e93 100644 --- a/test/machi_ap_repair_eqc.erl +++ b/test/machi_ap_repair_eqc.erl @@ -342,7 +342,7 @@ setup_target(Num, Seed, Verbose) -> setup_chain(Seed, AllListE, FLUNames, MgrNames, Dict) -> ok = shutdown_hard(), [begin - machi_flu1_test:clean_up_data_dir(Dir), + machi_test_util:clean_up_dir(Dir), filelib:ensure_dir(Dir ++ "/not-used") end || {_P, Dir} <- AllListE], [catch ets:delete(T) || T <- tabs()], diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index 3f95955..99ecb6a 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -275,14 +275,14 @@ make_prop_ets() -> smoke0_test() -> TcpPort = 6623, - {_, [Pa], [M0]} = machi_psup_test_util:start_flu_packages( - 1, "./data.", TcpPort, []), + {[Pa], [M0], _Dirs} = machi_test_util:start_flu_packages( + 1, TcpPort, "./data.", []), {ok, FLUaP} = ?FLU_PC:start_link(Pa), try pong = ?MGR:ping(M0) after ok = ?FLU_PC:quit(FLUaP), - machi_psup_test_util:stop_flu_packages() + machi_test_util:stop_flu_packages() end. smoke1_test_() -> @@ -291,12 +291,13 @@ smoke1_test_() -> smoke1_test2() -> TcpPort = 62777, MgrOpts = [{active_mode,false}], - {_, Ps, MgrNames} = machi_psup_test_util:start_flu_packages( - 3, "./data.", TcpPort, MgrOpts), - MembersDict = machi_projection:make_members_dict(Ps), - [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- MgrNames], - Ma = hd(MgrNames), try + {Ps, MgrNames, _Dirs} = machi_test_util:start_flu_packages( + 3, TcpPort, "./data.", MgrOpts), + MembersDict = machi_projection:make_members_dict(Ps), + [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- MgrNames], + Ma = hd(MgrNames), + {ok, P1} = ?MGR:test_calc_projection(Ma, false), % DERP! Check for race with manager's proxy vs. proj listener ok = lists:foldl( @@ -315,22 +316,23 @@ smoke1_test2() -> ok after - machi_psup_test_util:stop_flu_packages() + machi_test_util:stop_flu_packages() end. nonunanimous_setup_and_fix_test() -> TcpPort = 62877, MgrOpts = [{active_mode,false}], - {_, Ps, [Ma,Mb]} = machi_psup_test_util:start_flu_packages( - 2, "./data.", TcpPort, MgrOpts), + {Ps, [Ma,Mb], _Dirs} = machi_test_util:start_flu_packages( + 2, TcpPort, "./data.", MgrOpts), MembersDict = machi_projection:make_members_dict(Ps), [machi_chain_manager1:set_chain_members(M, MembersDict) || M <- [Ma, Mb]], [Proxy_a, Proxy_b] = Proxies = [element(2, ?FLU_PC:start_link(P)) || P <- Ps], - ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []), - ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []), + try + ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []), + ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []), {ok, P1} = ?MGR:test_calc_projection(Ma, false), P1a = machi_projection:update_checksum( @@ -374,7 +376,7 @@ nonunanimous_setup_and_fix_test() -> ok after [ok = ?FLU_PC:quit(X) || X <- Proxies], - machi_psup_test_util:stop_flu_packages() + machi_test_util:stop_flu_packages() end. unanimous_report_test() -> diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 5491e2b..942d2da 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -30,46 +30,6 @@ -define(FLU, machi_flu1). -define(FLU_C, machi_flu1_client). -clean_up_data_dir(DataDir) -> - [begin - Fs = filelib:wildcard(DataDir ++ Glob), - [file:delete(F) || F <- Fs], - [file:del_dir(F) || F <- Fs] - end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], - _ = file:del_dir(DataDir), - ok. - -start_flu_package(RegName, TcpPort, DataDir) -> - start_flu_package(RegName, TcpPort, DataDir, []). - -start_flu_package(RegName, TcpPort, DataDir, Props) -> - case proplists:get_value(save_data_dir, Props) of - true -> - ok; - _ -> - clean_up_data_dir(DataDir) - end, - - maybe_start_sup(), - machi_flu_psup:start_flu_package(RegName, TcpPort, DataDir, Props). - -stop_flu_package(FluName) -> - machi_flu_psup:stop_flu_package(FluName), - Pid = whereis(machi_sup), - exit(Pid, normal), - machi_util:wait_for_death(Pid, 100). - -maybe_start_sup() -> - case whereis(machi_sup) of - undefined -> - machi_sup:start_link(), - %% evil but we have to let stuff start up - timer:sleep(10), - maybe_start_sup(); - Pid -> Pid - end. - - -ifndef(PULSE). flu_smoke_test() -> @@ -78,10 +38,9 @@ flu_smoke_test() -> DataDir = "./data", Prefix = <<"prefix!">>, BadPrefix = BadFile = "no/good", - W_props = [{initial_wedged, false}], - start_flu_package(smoke_flu, TcpPort, DataDir, W_props), try + _ = machi_test_util:start_flu_package(smoke_flu, TcpPort, DataDir, W_props), Msg = "Hello, world!", Msg = ?FLU_C:echo(Host, TcpPort, Msg), {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, @@ -178,22 +137,21 @@ flu_smoke_test() -> ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host, port=TcpPort})) after - stop_flu_package(smoke_flu) + machi_test_util:stop_flu_package() end. flu_projection_smoke_test() -> Host = "localhost", TcpPort = 32959, DataDir = "./data.projst", - - start_flu_package(projection_test_flu, TcpPort, DataDir), try + machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir), [ok = flu_projection_common(Host, TcpPort, T) || T <- [public, private] ] %% , {ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort), %% io:format(user, "EpochID1 ~p\n", [EpochID1]) after - stop_flu_package(projection_test_flu) + machi_test_util:stop_flu_package() end. flu_projection_common(Host, TcpPort, T) -> @@ -223,10 +181,9 @@ bad_checksum_test() -> Host = "localhost", TcpPort = 32960, DataDir = "./data.bct", - Opts = [{initial_wedged, false}], - start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), try + machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, Chunk1_badcs = {<>, Chunk1}, @@ -235,17 +192,16 @@ bad_checksum_test() -> Prefix, Chunk1_badcs), ok after - stop_flu_package(projection_test_flu) + machi_test_util:stop_flu_package() end. witness_test() -> Host = "localhost", TcpPort = 32961, DataDir = "./data.witness", - Opts = [{initial_wedged, false}, {witness_mode, true}], - start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), try + machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts), Prefix = <<"some prefix">>, Chunk1 = <<"yo yo yo">>, @@ -276,7 +232,7 @@ witness_test() -> ok after - stop_flu_package(projection_test_flu) + machi_test_util:stop_flu_package() end. %% The purpose of timing_pb_encoding_test_ and timing_bif_encoding_test_ is diff --git a/test/machi_pb_high_client_test.erl b/test/machi_pb_high_client_test.erl index 3273bc2..16b125c 100644 --- a/test/machi_pb_high_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -34,21 +34,15 @@ smoke_test_() -> {timeout, 5*60, fun() -> smoke_test2() end}. smoke_test2() -> - Port = 5720, - Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"} - ], - D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]), + PortBase = 5720, ok = application:set_env(machi, max_file_size, 1024*1024), - - [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], - {ok, SupPid} = machi_sup:start_link(), try - [begin - #p_srvr{name=Name, port=Port, props=Dir} = P, - {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) - end || P <- Ps], - ok = machi_chain_manager1:set_chain_members(a_chmgr, D), - [machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], + {Ps, MgrNames, Dirs} = machi_test_util:start_flu_packages( + 1, PortBase, "./data.", []), + D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]), + M0 = hd(MgrNames), + ok = machi_chain_manager1:set_chain_members(M0, D), + [machi_chain_manager1:trigger_react_to_env(M0) || _ <-lists:seq(1,5)], {ok, Clnt} = ?C:start_link(Ps), try @@ -94,7 +88,8 @@ smoke_test2() -> File1Bin = binary_to_list(File1), [begin - #p_srvr{name=Name, port=Port, props=Dir} = P, + #p_srvr{name=Name, props=Props} = P, + Dir = proplists:get_value(data_dir, Props), ?assertEqual({ok, [File1Bin]}, file:list_dir(filename:join([Dir, "data"]))), FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]), @@ -122,7 +117,8 @@ smoke_test2() -> %% Make sure everything was trimmed File = binary_to_list(Filex), [begin - #p_srvr{name=Name, port=_Port, props=Dir} = P, + #p_srvr{name=Name, props=Props} = P, + Dir = proplists:get_value(data_dir, Props), ?assertEqual({ok, []}, file:list_dir(filename:join([Dir, "data"]))), FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]), @@ -139,10 +135,7 @@ smoke_test2() -> (catch ?C:quit(Clnt)) end after - exit(SupPid, normal), - [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], - machi_util:wait_for_death(SupPid, 100), - ok + machi_test_util:stop_flu_packages() end. -endif. % !PULSE diff --git a/test/machi_projection_store_test.erl b/test/machi_projection_store_test.erl index 665553b..eab42a9 100644 --- a/test/machi_projection_store_test.erl +++ b/test/machi_projection_store_test.erl @@ -33,7 +33,7 @@ smoke_test() -> Dir = "./data.a", Os = [{ignore_stability_time, true}, {active_mode, false}], os:cmd("rm -rf " ++ Dir), - machi_flu1_test:start_flu_package(a, PortBase, "./data.a", Os), + machi_test_util:start_flu_package(a, PortBase, "./data.a", Os), try P1 = machi_projection:new(1, a, [], [], [], [], []), @@ -58,7 +58,7 @@ smoke_test() -> ok after - machi_flu1_test:stop_flu_package(a) + machi_test_util:stop_flu_package() end. -endif. % !PULSE diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 422dce1..dc0bdba 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -32,17 +32,14 @@ api_smoke_test() -> RegName = api_smoke_flu, - Host = "localhost", TcpPort = 57124, DataDir = "./data.api_smoke_flu", W_props = [{active_mode, false},{initial_wedged, false}], Prefix = <<"prefix">>, - {ok, SupPid} = machi_sup:start_link(), - machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props), - try - I = #p_srvr{name=RegName, address=Host, port=TcpPort}, + {[I], _, _} = machi_test_util:start_flu_package( + RegName, TcpPort, DataDir, W_props), {ok, Prox1} = ?MUT:start_link(I), try FakeEpoch = ?DUMMY_PV1_EPOCH, @@ -50,13 +47,13 @@ api_smoke_test() -> FakeEpoch, Prefix, <<"data">>, infinity) || _ <- lists:seq(1,5)], %% Stop the FLU, what happens? - machi_flu1_test:stop_flu_package(RegName), + machi_test_util:stop_flu_package(), [{error,partition} = ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, <<"data-stopped1">>, infinity) || _ <- lists:seq(1,3)], %% Start the FLU again, we should be able to do stuff immediately - machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, - [save_data_dir|W_props]), + machi_test_util:start_flu_package(RegName, TcpPort, DataDir, + [save_data_dir|W_props]), MyChunk = <<"my chunk data">>, {ok, {MyOff,MySize,MyFile}} = ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk, @@ -103,8 +100,7 @@ api_smoke_test() -> _ = (catch ?MUT:quit(Prox1)) end after - (catch machi_flu1_test:stop_flu_package(RegName)), - exit(SupPid, normal) + (catch machi_test_util:stop_flu_package()) end. flu_restart_test_() -> @@ -112,15 +108,13 @@ flu_restart_test_() -> flu_restart_test2() -> RegName = a, - Host = "localhost", TcpPort = 57125, DataDir = "./data.api_smoke_flu2", W_props = [{initial_wedged, false}, {active_mode, false}], - {ok, SupPid} = machi_sup:start_link(), - machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props), try - I = #p_srvr{name=RegName, address=Host, port=TcpPort}, + {[I], _, _} = machi_test_util:start_flu_package( + RegName, TcpPort, DataDir, W_props), {ok, Prox1} = ?MUT:start_link(I), try FakeEpoch = ?DUMMY_PV1_EPOCH, @@ -140,7 +134,7 @@ flu_restart_test2() -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1), {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public), {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private), - ok = machi_flu1_test:stop_flu_package(RegName), timer:sleep(50), + ok = machi_test_util:stop_flu_package(), timer:sleep(50), %% Now that the last proxy op was successful and only %% after did we stop the FLU, let's check that both the @@ -296,25 +290,24 @@ flu_restart_test2() -> ], [begin - machi_flu1_test:start_flu_package( + machi_test_util:start_flu_package( RegName, TcpPort, DataDir, [save_data_dir|W_props]), _ = Fun(line), ok = Fun(run), ok = Fun(run), - ok = machi_flu1_test:stop_flu_package(RegName), + ok = machi_test_util:stop_flu_package(), {error, partition} = Fun(stop), {error, partition} = Fun(stop), ok end || Fun <- ExpectedOps ], ok after - _ = (catch ?MUT:quit(Prox1)), - exit(SupPid, normal) + _ = (catch ?MUT:quit(Prox1)) end after - (catch machi_flu1_test:stop_flu_package(RegName)) + (catch machi_test_util:stop_flu_package()) end. - + -endif. % !PULSE -endif. % TEST diff --git a/test/machi_psup_test_util.erl b/test/machi_psup_test_util.erl deleted file mode 100644 index 6446bf7..0000000 --- a/test/machi_psup_test_util.erl +++ /dev/null @@ -1,71 +0,0 @@ -%% ------------------------------------------------------------------- -%% -%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. -%% -%% This file is provided to you under the Apache License, -%% Version 2.0 (the "License"); you may not use this file -%% except in compliance with the License. You may obtain -%% a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, -%% software distributed under the License is distributed on an -%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -%% KIND, either express or implied. See the License for the -%% specific language governing permissions and limitations -%% under the License. -%% -%% ------------------------------------------------------------------- - --module(machi_psup_test_util). --compile(export_all). - --ifdef(TEST). --ifndef(PULSE). - --include_lib("eunit/include/eunit.hrl"). - --include("machi.hrl"). --include("machi_projection.hrl"). - --define(FLU, machi_flu1). --define(FLU_C, machi_flu1_client). - --spec start_flu_packages(pos_integer(), string(), inet:port(), list()) -> - {MachiSup::pid(), Ps::[#p_srvr{}], MgrNames::[atom()]}. -start_flu_packages(FluCount, DirPrefix, BaseTcpPort, MgrOpts) -> - FluInfo = flu_info(FluCount, DirPrefix, BaseTcpPort), - _ = stop_machi_sup(), - _ = [machi_flu1_test:clean_up_data_dir(Dir) || {_, Dir, _} <- FluInfo], - {ok, SupPid} = machi_sup:start_link(), - [{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, MgrOpts) || - {#p_srvr{name=Name, port=Port}, Dir, _} <- FluInfo], - {Ps, _Dirs, MgrNames} = lists:unzip3(FluInfo), - {SupPid, Ps, MgrNames}. - -stop_flu_packages() -> - stop_machi_sup(). - -flu_info(FluCount, DirPrefix, BaseTcpPort) -> - [begin - FLUNameStr = [$a + I - 1], - FLUName = list_to_atom(FLUNameStr), - MgrName = machi_flu_psup:make_mgr_supname(FLUName), - {#p_srvr{name=FLUName, address="localhost", port=BaseTcpPort + I, - props=[{chmgr, MgrName}]}, - DirPrefix ++ "/data.eqc." ++ FLUNameStr, - MgrName} - end || I <- lists:seq(1, FluCount)]. - -stop_machi_sup() -> - case whereis(machi_sup) of - undefined -> ok; - Pid -> - catch exit(whereis(machi_sup), normal), - machi_util:wait_for_death(Pid, 30) - end. - --endif. % !PULSE --endif. % TEST - diff --git a/test/machi_test_util.erl b/test/machi_test_util.erl new file mode 100644 index 0000000..21d5ca9 --- /dev/null +++ b/test/machi_test_util.erl @@ -0,0 +1,107 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_test_util). +-compile(export_all). + +-ifdef(TEST). +-ifndef(PULSE). + +-include_lib("eunit/include/eunit.hrl"). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-define(FLU, machi_flu1). +-define(FLU_C, machi_flu1_client). + +-spec start_flu_package(atom(), inet:port_number(), string()) -> + {Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}. +start_flu_package(FluName, TcpPort, DataDir) -> + start_flu_package(FluName, TcpPort, DataDir, []). + +-spec start_flu_package(atom(), inet:port_number(), string(), list()) -> + {Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}. +start_flu_package(FluName, TcpPort, DataDir, Props) -> + MgrName = machi_flu_psup:make_mgr_supname(FluName), + FluInfo = [{#p_srvr{name=FluName, address="localhost", port=TcpPort, + props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]}, + DataDir, MgrName}], + start_flu_packages(FluInfo). + +-spec start_flu_packages(pos_integer(), inet:port_number(), string(), list()) -> + {Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}. +start_flu_packages(FluCount, BaseTcpPort, DirPrefix, Props) -> + FluInfo = flu_info(FluCount, BaseTcpPort, DirPrefix, Props), + start_flu_packages(FluInfo). + +start_flu_packages(FluInfo) -> + _ = stop_machi_sup(), + clean_up_data_dirs(FluInfo), + {ok, _SupPid} = machi_sup:start_link(), + [{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, Props) || + {#p_srvr{name=Name, port=Port, props=Props}, Dir, _} <- FluInfo], + {Ps, Dirs, MgrNames} = lists:unzip3(FluInfo), + {Ps, MgrNames, Dirs}. + +stop_flu_package() -> + stop_flu_packages(). + +stop_flu_packages() -> + stop_machi_sup(). + +flu_info(FluCount, BaseTcpPort, DirPrefix, Props) -> + [begin + FLUNameStr = [$a + I - 1], + FLUName = list_to_atom(FLUNameStr), + MgrName = machi_flu_psup:make_mgr_supname(FLUName), + DataDir = DirPrefix ++ "/data.eqc." ++ FLUNameStr, + {#p_srvr{name=FLUName, address="localhost", port=BaseTcpPort + I, + props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]}, + DataDir, MgrName} + end || I <- lists:seq(1, FluCount)]. + +stop_machi_sup() -> + case whereis(machi_sup) of + undefined -> ok; + Pid -> + catch exit(whereis(machi_sup), normal), + machi_util:wait_for_death(Pid, 30) + end. + +clean_up_data_dirs(FluInfo) -> + _ = [case proplists:get_value(save_data_dir, Propx) of + true -> ok; + _ -> clean_up_dir(Dir) + end || {#p_srvr{props=Propx}, Dir, _} <- FluInfo], + ok. + +clean_up_dir(Dir) -> + [begin + Fs = filelib:wildcard(Dir ++ Glob), + [file:delete(F) || F <- Fs], + [file:del_dir(F) || F <- Fs] + end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ], + _ = file:del_dir(Dir), + ok. + +-endif. % !PULSE +-endif. % TEST +