From a06055ac23bd30605856d059da86cd40824be869 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sun, 17 May 2015 16:18:30 +0900 Subject: [PATCH] WIP: rearrange client code to approach some semblance of modularity someday --- include/machi_projection.hrl | 2 +- src/machi_admin_util.erl | 8 +- src/machi_flu1_client.erl | 237 ++++++++++++++++---------- src/machi_proxy_flu1_client.erl | 34 ++-- test/machi_admin_util_test.erl | 2 +- test/machi_chain_manager1_test.erl | 2 +- test/machi_proxy_flu1_client_test.erl | 2 +- 7 files changed, 161 insertions(+), 126 deletions(-) diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl index 548d716..e20908a 100644 --- a/include/machi_projection.hrl +++ b/include/machi_projection.hrl @@ -29,7 +29,7 @@ -record(p_srvr, { name :: pv1_server(), - proto = 'ipv4' :: 'ipv4' | 'disterl', % disterl? Hrm. + proto_mod = 'machi_flu1_client' :: atom(), % Module name address :: term(), % Protocol-specific port :: term(), % Protocol-specific props = [] :: list() % proplist for other related info diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl index f0db9d0..25c96be 100644 --- a/src/machi_admin_util.erl +++ b/src/machi_admin_util.erl @@ -46,11 +46,11 @@ verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) -> machi_flu1_client:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_local(Host, TcpPort, EpochID, Path) -> - Sock1 = machi_util:connect(Host, TcpPort), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try verify_file_checksums_local2(Sock1, EpochID, Path) after - catch gen_tcp:close(Sock1) + catch ?FLU_C:disconnect(Sock1) end. -spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) -> @@ -62,11 +62,11 @@ verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) -> machi_flu1_client:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_remote(Host, TcpPort, EpochID, File) -> - Sock1 = machi_util:connect(Host, TcpPort), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try verify_file_checksums_remote2(Sock1, EpochID, File) after - catch gen_tcp:close(Sock1) + catch ?FLU_C:disconnect(Sock1) end. %%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 0aeb156..6d6741d 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -43,7 +43,10 @@ list_all_projections/2, list_all_projections/3, %% Common API - quit/1 + quit/1, + + %% Connection management API + connected_p/1, connect/1, disconnect/1 ]). %% For "internal" replication only. -export([ @@ -69,6 +72,7 @@ -type file_prefix() :: binary() | list(). -type inet_host() :: inet:ip_address() | inet:hostname(). -type inet_port() :: inet:port_number(). +-type port_wrap() :: {w,atom(),term()}. -type projection() :: #projection_v1{}. -type projection_type() :: 'public' | 'private'. @@ -77,7 +81,7 @@ %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) -> +-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) -> {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(Sock, EpochID, Prefix, Chunk) -> append_chunk2(Sock, EpochID, Prefix, Chunk, 0). @@ -89,11 +93,11 @@ append_chunk(Sock, EpochID, Prefix, Chunk) -> epoch_id(), file_prefix(), chunk()) -> {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try append_chunk2(Sock, EpochID, Prefix, Chunk, 0) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Append a chunk (binary- or iolist-style) of data to a file @@ -104,7 +108,7 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> %% be reserved by the file sequencer for later write(s) by the %% `write_chunk()' API. --spec append_chunk_extra(port(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> +-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> @@ -123,16 +127,16 @@ append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) -> +-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) -> {ok, chunk_s()} | {error, error_general() | 'no_such_file' | 'partial_read'} | {error, term()}. @@ -149,20 +153,20 @@ read_chunk(Sock, EpochID, File, Offset, Size) {error, term()}. read_chunk(Host, TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try read_chunk2(Sock, EpochID, File, Offset, Size) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(port(), epoch_id(), file_name()) -> +-spec checksum_list(port_wrap(), epoch_id(), file_name()) -> {ok, [chunk_csum()]} | {error, error_general() | 'no_such_file' | 'partial_read'} | {error, term()}. -checksum_list(Sock, EpochID, File) when is_port(Sock) -> +checksum_list(Sock, EpochID, File) -> checksum_list2(Sock, EpochID, File). %% @doc Fetch the list of chunk checksums for `File'. @@ -171,18 +175,18 @@ checksum_list(Sock, EpochID, File) when is_port(Sock) -> {ok, [chunk_csum()]} | {error, error_general() | 'no_such_file'} | {error, term()}. checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try checksum_list2(Sock, EpochID, File) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Fetch the list of all files on the remote FLU. --spec list_files(port(), epoch_id()) -> +-spec list_files(port_wrap(), epoch_id()) -> {ok, [file_info()]} | {error, term()}. -list_files(Sock, EpochID) when is_port(Sock) -> +list_files(Sock, EpochID) -> list2(Sock, EpochID). %% @doc Fetch the list of all files on the remote FLU. @@ -190,19 +194,19 @@ list_files(Sock, EpochID) when is_port(Sock) -> -spec list_files(inet_host(), inet_port(), epoch_id()) -> {ok, [file_info()]} | {error, term()}. list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try list2(Sock, EpochID) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Fetch the wedge status from the remote FLU. --spec wedge_status(port()) -> +-spec wedge_status(port_wrap()) -> {ok, {boolean(), pv1_epoch()}} | {error, term()}. -wedge_status(Sock) when is_port(Sock) -> +wedge_status(Sock) -> wedge_status2(Sock). %% @doc Fetch the wedge status from the remote FLU. @@ -210,16 +214,16 @@ wedge_status(Sock) when is_port(Sock) -> -spec wedge_status(inet_host(), inet_port()) -> {ok, {boolean(), pv1_epoch()}} | {error, term()}. wedge_status(Host, TcpPort) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try wedge_status2(Sock) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(port(), projection_type()) -> +-spec get_latest_epoch(port_wrap(), projection_type()) -> {ok, epoch_id()} | {error, term()}. get_latest_epoch(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -232,16 +236,16 @@ get_latest_epoch(Sock, ProjType) {ok, epoch_id()} | {error, term()}. get_latest_epoch(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try get_latest_epoch2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(port(), projection_type()) -> +-spec read_latest_projection(port_wrap(), projection_type()) -> {ok, projection()} | {error, not_written} | {error, term()}. read_latest_projection(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -254,16 +258,16 @@ read_latest_projection(Sock, ProjType) {ok, projection()} | {error, not_written} | {error, term()}. read_latest_projection(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try read_latest_projection2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(port(), projection_type(), epoch_num()) -> +-spec read_projection(port_wrap(), projection_type(), epoch_num()) -> {ok, projection()} | {error, written} | {error, term()}. read_projection(Sock, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> @@ -276,16 +280,16 @@ read_projection(Sock, ProjType, Epoch) {ok, projection()} | {error, written} | {error, term()}. read_projection(Host, TcpPort, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try read_projection2(Sock, ProjType, Epoch) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(port(), projection_type(), projection()) -> +-spec write_projection(port_wrap(), projection_type(), projection()) -> 'ok' | {error, written} | {error, term()}. write_projection(Sock, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', @@ -300,16 +304,16 @@ write_projection(Sock, ProjType, Proj) write_projection(Host, TcpPort, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try write_projection2(Sock, ProjType, Proj) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(port(), projection_type()) -> +-spec get_all_projections(port_wrap(), projection_type()) -> {ok, [projection()]} | {error, term()}. get_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -322,16 +326,16 @@ get_all_projections(Sock, ProjType) {ok, [projection()]} | {error, term()}. get_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try get_all_projections2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all_projections(port(), projection_type()) -> +-spec list_all_projections(port_wrap(), projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -344,20 +348,37 @@ list_all_projections(Sock, ProjType) {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try list_all_projections2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Quit & close the connection to remote FLU. --spec quit(port()) -> +-spec quit(port_wrap()) -> ok. -quit(Sock) when is_port(Sock) -> - catch (_ = gen_tcp:send(Sock, <<"QUIT\n">>)), - catch gen_tcp:close(Sock), +quit(Sock) -> + catch (_ = w_send(Sock, <<"QUIT\n">>)), + disconnect(Sock), + ok. + +connected_p({w,tcp,Sock}) -> + case (catch inet:peername(Sock)) of + {ok, _} -> true; + _ -> false + end; +connected_p(_) -> + false. + +connect(#p_srvr{}=P) -> + w_connect(P). + +disconnect({w,tcp,_Sock}=WS) -> + w_close(WS), + ok; +disconnect(_) -> ok. %%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -365,7 +386,7 @@ quit(Sock) when is_port(Sock) -> %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) -> +-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) -> ok | {error, error_general()} | {error, term()}. write_chunk(Sock, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> @@ -379,19 +400,19 @@ write_chunk(Sock, EpochID, File, Offset, Chunk) ok | {error, error_general()} | {error, term()}. write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try write_chunk2(Sock, EpochID, File, Offset, Chunk) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(port(), epoch_id(), file_name()) -> +-spec delete_migration(port_wrap(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. -delete_migration(Sock, EpochID, File) when is_port(Sock) -> +delete_migration(Sock, EpochID, File) -> delete_migration2(Sock, EpochID, File). %% @doc Restricted API: Delete a file after it has been successfully @@ -400,19 +421,19 @@ delete_migration(Sock, EpochID, File) when is_port(Sock) -> -spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try delete_migration2(Sock, EpochID, File) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(port(), epoch_id(), file_name()) -> +-spec trunc_hack(port_wrap(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. -trunc_hack(Sock, EpochID, File) when is_port(Sock) -> +trunc_hack(Sock, EpochID, File) -> trunc_hack2(Sock, EpochID, File). %% @doc Restricted API: Truncate a file after it has been successfully @@ -421,11 +442,11 @@ trunc_hack(Sock, EpochID, File) when is_port(Sock) -> -spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try trunc_hack2(Sock, EpochID, File) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -445,8 +466,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) -> LenHex = machi_util:int_to_hexbin(Len, 32), ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32), Cmd = [<<"A ">>, EpochIDHex, LenHex, ExtraHex, Prefix, 10], - ok = gen_tcp:send(Sock, [Cmd, Chunk]), - {ok, Line} = gen_tcp:recv(Sock, 0), + ok = w_send(Sock, [Cmd, Chunk]), + {ok, Line} = w_recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, case Line of <<"OK ", OffsetHex:16/binary, " ", @@ -479,16 +500,16 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> PrefixHex = machi_util:int_to_hexbin(Offset, 64), SizeHex = machi_util:int_to_hexbin(Size, 32), CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10], - ok = gen_tcp:send(Sock, CmdLF), - case gen_tcp:recv(Sock, 3) of + ok = w_send(Sock, CmdLF), + case w_recv(Sock, 3) of {ok, <<"OK\n">>} -> - {ok, _Chunk}=Res = gen_tcp:recv(Sock, Size), + {ok, _Chunk}=Res = w_recv(Sock, Size), Res; {ok, Else} -> - {ok, OldOpts} = inet:getopts(Sock, [packet]), - ok = inet:setopts(Sock, [{packet, line}]), - {ok, Else2} = gen_tcp:recv(Sock, 0), - ok = inet:setopts(Sock, OldOpts), + {ok, OldOpts} = w_getopts(Sock, [packet]), + ok = w_setopts(Sock, [{packet, line}]), + {ok, Else2} = w_recv(Sock, 0), + ok = w_setopts(Sock, OldOpts), case Else of <<"ERA">> -> {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); @@ -525,12 +546,12 @@ list2(Sock, EpochID) -> {EpochNum, EpochCSum} = EpochID, EpochIDHex = machi_util:bin_to_hexstr( <>), - ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + ok = w_send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK\n">>} -> - Res = list3(gen_tcp:recv(Sock, 0), Sock), - ok = inet:setopts(Sock, [{packet, raw}]), + Res = list3(w_recv(Sock, 0), Sock), + ok = w_setopts(Sock, [{packet, raw}]), {ok, Res}; {ok, <<"ERROR WEDGED\n">>} -> {error, wedged}; @@ -550,19 +571,19 @@ list3({ok, Line}, Sock) -> FileLen = byte_size(Line) - 16 - 1 - 1, <> = Line, Size = machi_util:hexstr_to_int(SizeHex), - [{Size, File}|list3(gen_tcp:recv(Sock, 0), Sock)]; + [{Size, File}|list3(w_recv(Sock, 0), Sock)]; list3(Else, _Sock) -> throw({server_protocol_error, Else}). wedge_status2(Sock) -> try - ok = gen_tcp:send(Sock, [<<"WEDGE-STATUS\n">>]), - ok = inet:setopts(Sock, [{packet, line}]), + ok = w_send(Sock, [<<"WEDGE-STATUS\n">>]), + ok = w_setopts(Sock, [{packet, line}]), {ok, <<"OK ", BooleanHex:2/binary, " ", EpochHex:8/binary, " ", - CSumHex:40/binary, "\n">>} = gen_tcp:recv(Sock, 0), - ok = inet:setopts(Sock, [{packet, raw}]), + CSumHex:40/binary, "\n">>} = w_recv(Sock, 0), + ok = w_setopts(Sock, [{packet, raw}]), Boolean = if BooleanHex == <<"00">> -> false; BooleanHex == <<"01">> -> true end, @@ -582,15 +603,15 @@ checksum_list2(Sock, EpochID, File) -> {EpochNum, EpochCSum} = EpochID, EpochIDHex = machi_util:bin_to_hexstr( <>), - ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + ok = w_send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK ", Rest/binary>> = Line} -> put(status, ok), % may be unset later RestLen = byte_size(Rest) - 1, <> = Rest, <> = machi_util:hexstr_to_bin(LenHex), - ok = inet:setopts(Sock, [{packet, raw}]), + ok = w_setopts(Sock, [{packet, raw}]), {ok, checksum_list_finish(checksum_list_fast(Sock, Len))}; {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> {error, no_such_file}; @@ -615,11 +636,11 @@ checksum_list2(Sock, EpochID, File) -> end. checksum_list_fast(Sock, 0) -> - {ok, <<".\n">> = _Line} = gen_tcp:recv(Sock, 2), + {ok, <<".\n">> = _Line} = w_recv(Sock, 2), []; checksum_list_fast(Sock, Remaining) -> Num = erlang:min(Remaining, 1024*1024), - {ok, Bytes} = gen_tcp:recv(Sock, Num), + {ok, Bytes} = w_recv(Sock, Num), [Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))]. checksum_list_finish(Chunks) -> @@ -656,8 +677,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> LenHex = machi_util:int_to_hexbin(Len, 32), Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex, LenHex, File, <<"\n">>], - ok = gen_tcp:send(Sock, [Cmd, Chunk]), - {ok, Line} = gen_tcp:recv(Sock, 0), + ok = w_send(Sock, [Cmd, Chunk]), + {ok, Line} = w_recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, case Line of <<"OK\n">> -> @@ -685,9 +706,9 @@ delete_migration2(Sock, EpochID, File) -> EpochIDHex = machi_util:bin_to_hexstr( <>), Cmd = [<<"DEL-migration ">>, EpochIDHex, File, <<"\n">>], - ok = gen_tcp:send(Sock, Cmd), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + ok = w_send(Sock, Cmd), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK\n">>} -> ok; {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> @@ -715,9 +736,9 @@ trunc_hack2(Sock, EpochID, File) -> EpochIDHex = machi_util:bin_to_hexstr( <>), Cmd = [<<"TRUNC-hack--- ">>, EpochIDHex, File, <<"\n">>], - ok = gen_tcp:send(Sock, Cmd), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + ok = w_send(Sock, Cmd), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK\n">>} -> ok; {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> @@ -770,16 +791,16 @@ do_projection_common(Sock, ProjCmd) -> true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), Cmd = [<<"PROJ ">>, LenHex, <<"\n">>], - ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + ok = w_send(Sock, [Cmd, ProjCmdBin]), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, Line} -> case Line of <<"OK ", ResLenHex:8/binary, "\n">> -> ResLen = machi_util:hexstr_to_int(ResLenHex), - ok = inet:setopts(Sock, [{packet, raw}]), - {ok, ResBin} = gen_tcp:recv(Sock, ResLen), - ok = inet:setopts(Sock, [{packet, line}]), + ok = w_setopts(Sock, [{packet, raw}]), + {ok, ResBin} = w_recv(Sock, ResLen), + ok = w_setopts(Sock, [{packet, line}]), binary_to_term(ResBin); Else -> {error, Else} @@ -795,3 +816,31 @@ do_projection_common(Sock, ProjCmd) -> put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}) -> + try + Sock = machi_util:connect(Host, TcpPort), + {w,tcp,Sock} + catch + _:_ -> + undefined + end. + +w_close({w,tcp,Sock}) -> + catch gen_tcp:close(Sock), + ok. + +w_recv({w,tcp,Sock}, Amt) -> + gen_tcp:recv(Sock, Amt). + +w_send({w,tcp,Sock}, IoData) -> + gen_tcp:send(Sock, IoData). + +w_getopts({w,tcp,Sock}, Opts) -> + inet:getopts(Sock, Opts). + +w_setopts({w,tcp,Sock}, Opts) -> + inet:setopts(Sock, Opts). + diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 0ca85ce..f82a7b9 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -347,35 +347,21 @@ make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) -> fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end. connected_p(#state{sock=SockMaybe, - i=#p_srvr{proto=ipv4}=_I}=_S) -> - is_port(SockMaybe); -connected_p(#state{i=#p_srvr{proto=disterl, - name=_NodeName}=_I}=_S) -> - true. - %% case net_adm:ping(NodeName) of - %% ping -> - %% true; - %% _ -> - %% false - %% end. + i=#p_srvr{proto_mod=Mod}=_I}=_S) -> + Mod:connected_p(SockMaybe). try_connect(#state{sock=undefined, - i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) -> - try - Sock = machi_util:connect(Host, TcpPort), - S#state{sock=Sock} - catch - _:_ -> - S - end; + i=#p_srvr{proto_mod=Mod}=P}=S) -> + Sock = Mod:connect(P), + S#state{sock=Sock}; try_connect(S) -> %% If we're connection-based, we're already connected. %% If we're not connection-based, then there's nothing to do. S. +disconnect(#state{sock=undefined}=S) -> + S; disconnect(#state{sock=Sock, - i=#p_srvr{proto=ipv4}=_I}=S) -> - (catch gen_tcp:close(Sock)), - S#state{sock=undefined}; -disconnect(S) -> - S. + i=#p_srvr{proto_mod=Mod}=_I}=S) -> + Mod:disconnect(Sock), + S#state{sock=undefined}. diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 09d4717..9ce952f 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -38,7 +38,7 @@ verify_file_checksums_test() -> W_props = [{initial_wedged, false}], FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir, W_props), - Sock1 = machi_util:connect(Host, TcpPort), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try Prefix = <<"verify_prefix">>, [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index c51559d..ef9f92f 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -140,7 +140,7 @@ smoke0_test() -> Host = "localhost", TcpPort = 6623, {ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]), - Pa = #p_srvr{name=a, proto=ipv4, address=Host, port=TcpPort}, + Pa = #p_srvr{name=a, address=Host, port=TcpPort}, Members_Dict = machi_projection:make_members_dict([Pa]), %% Egadz, more racing on startup, yay. TODO fix. timer:sleep(1), diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index e4bfdb5..4b0c7c7 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -39,7 +39,7 @@ api_smoke_test() -> erase(flu_pid), try - I = #p_srvr{name=RegName, proto=ipv4, address=Host, port=TcpPort}, + I = #p_srvr{name=RegName, address=Host, port=TcpPort}, {ok, Prox1} = ?MUT:start_link(I), try FakeEpoch = ?DUMMY_PV1_EPOCH,