From 3d05f543df2a7543829ac0d02c8ce34127abd05d Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 22 Jun 2015 17:49:07 +0900 Subject: [PATCH] WIP: new test case is failing, quick fix soon --- src/machi_flu1.erl | 3 +- src/machi_pb_high_client.erl | 146 ++++++++++++++++++ ...test.erl => machi_pb_high_client_test.erl} | 31 ++-- 3 files changed, 158 insertions(+), 22 deletions(-) create mode 100644 src/machi_pb_high_client.erl rename test/{machi_pb_cr_client_test.erl => machi_pb_high_client_test.erl} (60%) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index af507c2..2955341 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -346,7 +346,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> http_server_hack(FluName, PutLine, Sock, S); <<"PROTOCOL-BUFFERS\n">> -> ok = gen_tcp:send(Sock, <<"OK\n">>), - ok = inet:setopts(Sock, [{packet, 4}]), + ok = inet:setopts(Sock, [{packet, 4}, + {packet_size, 33*1024*1024}]), protocol_buffers_loop(Sock, S); _ -> machi_util:verb("Else Got: ~p\n", [Line]), diff --git a/src/machi_pb_high_client.erl b/src/machi_pb_high_client.erl new file mode 100644 index 0000000..def6843 --- /dev/null +++ b/src/machi_pb_high_client.erl @@ -0,0 +1,146 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Machi PB (Protocol Buffers) high-level client (prototype, API TBD) + +-module(machi_pb_high_client). + +-include("machi.hrl"). +-include("machi_pb.hrl"). +-include("machi_projection.hrl"). + +-define(DEFAULT_TIMEOUT, 10*1000). + +-export([start_link/1, quit/1, + connected_p/1, + echo/2, echo/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, { + server_list :: p_srvr_dict(), + sock :: 'undefined' | port() + }). + +start_link(P_srvr_list) -> + gen_server:start_link(?MODULE, [P_srvr_list], []). + +quit(PidSpec) -> + gen_server:call(PidSpec, quit, infinity). + +connected_p(PidSpec) -> + gen_server:call(PidSpec, connected_p, infinity). + +echo(PidSpec, String) -> + echo(PidSpec, String, ?DEFAULT_TIMEOUT). + +echo(PidSpec, String, Timeout) -> + send_sync(PidSpec, {echo, String}, Timeout). + +send_sync(PidSpec, Cmd, Timeout) -> + gen_server:call(PidSpec, {send_sync, Cmd}, Timeout). + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init([P_srvr_list]) -> + {ok, #state{server_list=P_srvr_list}}. + +handle_call(quit, _From, S) -> + {stop, normal, ok, S}; +handle_call(connected_p, _From, #state{sock=Port}=S) + when is_port(Port) -> + {reply, true, S}; +handle_call(connected_p, _From, #state{sock=undefined}=S) -> + S2 = try_connect(S), + {reply, is_port(S2#state.sock), S2}; +handle_call({send_sync, _Cmd}, _From, #state{sock=undefined}=S) -> + {reply, not_connected, S}; +handle_call({send_sync, Cmd}, _From, S) -> + {Reply, S2} = do_send_sync(Cmd, S), + {reply, Reply, S2}; +handle_call(_Request, _From, S) -> + Reply = whaaaaaaaaaaaaaaaaaaaa, + {reply, Reply, S}. + +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +try_connect(#state{sock=Sock}=S) when is_port(Sock) -> + S; +try_connect(#state{server_list=Ps}=S) -> + case lists:foldl(fun(_, Sock) when is_port(Sock) -> + Sock; + (P, _) -> + do_connect_to_pb_listener(P) + end, unused, Ps) of + Sock when is_port(Sock) -> + S#state{sock=Sock}; + _Else -> + S + end. + +do_connect_to_pb_listener(P) -> + try + {ok, Sock} = gen_tcp:connect(P#p_srvr.address, P#p_srvr.port, + [{packet, line}, binary, {active, false}]), + ok = gen_tcp:send(Sock, <<"PROTOCOL-BUFFERS\n">>), + {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0), + ok = inet:setopts(Sock, [{packet,4}]), + Sock + catch _X:_Y -> + io:format(user, "\n~p ~p @ ~p\n", [_X, _Y, erlang:get_stacktrace()]), + bummer + end. + +%% {Reply, S2} = do_send_sync(Cmd, S), + +do_send_sync({echo, String}, #state{sock=Sock}=S) -> + try + ReqID = <<0>>, + R1a = #mpb_request{req_id=ReqID, + echo=#mpb_echoreq{message=String}}, + Bin1a = machi_pb:encode_mpb_request(R1a), + ok = gen_tcp:send(Sock, Bin1a), + {ok, Bin1B} = gen_tcp:recv(Sock, 0), + case (catch machi_pb:decode_mpb_response(Bin1B)) of + #mpb_response{req_id=ReqID, echo=Echo} = _R1b -> + io:format(user, "do_send_sync ~p\n", [_R1b]), + {Echo#mpb_echoresp.message, S} + end + catch X:Y -> + Res = {bummer, {X, Y, erlang:get_stacktrace()}}, + {Res, S} + end. + + diff --git a/test/machi_pb_cr_client_test.erl b/test/machi_pb_high_client_test.erl similarity index 60% rename from test/machi_pb_cr_client_test.erl rename to test/machi_pb_high_client_test.erl index 483ab9d..6aa4e6c 100644 --- a/test/machi_pb_cr_client_test.erl +++ b/test/machi_pb_high_client_test.erl @@ -18,7 +18,7 @@ %% %% ------------------------------------------------------------------- --module(machi_pb_cr_client_test). +-module(machi_pb_high_client_test). -compile(export_all). -ifdef(TEST). @@ -28,45 +28,34 @@ -include("machi_projection.hrl"). -include_lib("eunit/include/eunit.hrl"). --define(C, machi_pb_cr_client). +-define(C, machi_pb_high_client). smoke_test_() -> {timeout, 5*60, fun() -> smoke_test2() end}. smoke_test2() -> Port = 5720, - Ps = [{a,#p_srvr{name=a, address="localhost", port=Port, props="./data.a"}} + Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"} ], - [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], + [os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), try [begin #p_srvr{name=Name, port=Port, props=Dir} = P, {ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, []) - end || {_,P} <- Ps], + end || P <- Ps], [machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)], - {_, P_a} = hd(Ps), - {ok, Sock} = gen_tcp:connect(P_a#p_srvr.address, P_a#p_srvr.port, - [{packet, line}, binary, {active, false}]), + {ok, Clnt} = ?C:start_link(Ps), try - Prefix = <<"pre">>, - Chunk1 = <<"yochunk">>, - ok = gen_tcp:send(Sock, <<"PROTOCOL-BUFFERS\n">>), - {ok, <<"OK\n">>} = gen_tcp:recv(Sock, 0), - ok = inet:setopts(Sock, [{packet,4}]), - R1a = #mpb_request{req_id= <<0>>, - echo=#mpb_echoreq{message = <<"Hello, world!">>}}, - Bin1a = machi_pb:encode_mpb_request(R1a), - ok = gen_tcp:send(Sock, Bin1a), - {ok, Bin1B} = gen_tcp:recv(Sock, 0), - R1b = machi_pb:decode_mpb_response(Bin1B), - true = is_record(R1b, mpb_response), + true = ?C:connected_p(Clnt), + String = <<"yo, dawg">>, + String = ?C:echo(Clnt, String), ok after - (catch gen_tcp:close(Sock)) + (catch ?C:quit(Clnt)) end after exit(SupPid, normal),