WIP: rearrange client code to approach some semblance of modularity someday

This commit is contained in:
Scott Lystig Fritchie 2015-05-17 16:18:30 +09:00
parent c7d4131a44
commit a06055ac23
7 changed files with 161 additions and 126 deletions

View file

@ -29,7 +29,7 @@
-record(p_srvr, {
name :: pv1_server(),
proto = 'ipv4' :: 'ipv4' | 'disterl', % disterl? Hrm.
proto_mod = 'machi_flu1_client' :: atom(), % Module name
address :: term(), % Protocol-specific
port :: term(), % Protocol-specific
props = [] :: list() % proplist for other related info

View file

@ -46,11 +46,11 @@ verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) ->
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_local(Host, TcpPort, EpochID, Path) ->
Sock1 = machi_util:connect(Host, TcpPort),
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
try
verify_file_checksums_local2(Sock1, EpochID, Path)
after
catch gen_tcp:close(Sock1)
catch ?FLU_C:disconnect(Sock1)
end.
-spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
@ -62,11 +62,11 @@ verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) ->
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Host, TcpPort, EpochID, File) ->
Sock1 = machi_util:connect(Host, TcpPort),
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
try
verify_file_checksums_remote2(Sock1, EpochID, File)
after
catch gen_tcp:close(Sock1)
catch ?FLU_C:disconnect(Sock1)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -43,7 +43,10 @@
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1
quit/1,
%% Connection management API
connected_p/1, connect/1, disconnect/1
]).
%% For "internal" replication only.
-export([
@ -69,6 +72,7 @@
-type file_prefix() :: binary() | list().
-type inet_host() :: inet:ip_address() | inet:hostname().
-type inet_port() :: inet:port_number().
-type port_wrap() :: {w,atom(),term()}.
-type projection() :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.
@ -77,7 +81,7 @@
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk2(Sock, EpochID, Prefix, Chunk, 0).
@ -89,11 +93,11 @@ append_chunk(Sock, EpochID, Prefix, Chunk) ->
epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
append_chunk2(Sock, EpochID, Prefix, Chunk, 0)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Append a chunk (binary- or iolist-style) of data to a file
@ -104,7 +108,7 @@ append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
%% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API.
-spec append_chunk_extra(port(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
@ -123,16 +127,16 @@ append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
{ok, chunk_s()} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
@ -149,20 +153,20 @@ read_chunk(Sock, EpochID, File, Offset, Size)
{error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
read_chunk2(Sock, EpochID, File, Offset, Size)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port(), epoch_id(), file_name()) ->
-spec checksum_list(port_wrap(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
checksum_list(Sock, EpochID, File) when is_port(Sock) ->
checksum_list(Sock, EpochID, File) ->
checksum_list2(Sock, EpochID, File).
%% @doc Fetch the list of chunk checksums for `File'.
@ -171,18 +175,18 @@ checksum_list(Sock, EpochID, File) when is_port(Sock) ->
{ok, [chunk_csum()]} |
{error, error_general() | 'no_such_file'} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
checksum_list2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Fetch the list of all files on the remote FLU.
-spec list_files(port(), epoch_id()) ->
-spec list_files(port_wrap(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(Sock, EpochID) when is_port(Sock) ->
list_files(Sock, EpochID) ->
list2(Sock, EpochID).
%% @doc Fetch the list of all files on the remote FLU.
@ -190,19 +194,19 @@ list_files(Sock, EpochID) when is_port(Sock) ->
-spec list_files(inet_host(), inet_port(), epoch_id()) ->
{ok, [file_info()]} | {error, term()}.
list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
list2(Sock, EpochID)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Fetch the wedge status from the remote FLU.
-spec wedge_status(port()) ->
-spec wedge_status(port_wrap()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(Sock) when is_port(Sock) ->
wedge_status(Sock) ->
wedge_status2(Sock).
%% @doc Fetch the wedge status from the remote FLU.
@ -210,16 +214,16 @@ wedge_status(Sock) when is_port(Sock) ->
-spec wedge_status(inet_host(), inet_port()) ->
{ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(Host, TcpPort) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
wedge_status2(Sock)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
-spec get_latest_epoch(port(), projection_type()) ->
-spec get_latest_epoch(port_wrap(), projection_type()) ->
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -232,16 +236,16 @@ get_latest_epoch(Sock, ProjType)
{ok, epoch_id()} | {error, term()}.
get_latest_epoch(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
get_latest_epoch2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
-spec read_latest_projection(port(), projection_type()) ->
-spec read_latest_projection(port_wrap(), projection_type()) ->
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -254,16 +258,16 @@ read_latest_projection(Sock, ProjType)
{ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
read_latest_projection2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Read a projection `Proj' of type `ProjType'.
-spec read_projection(port(), projection_type(), epoch_num()) ->
-spec read_projection(port_wrap(), projection_type(), epoch_num()) ->
{ok, projection()} | {error, written} | {error, term()}.
read_projection(Sock, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -276,16 +280,16 @@ read_projection(Sock, ProjType, Epoch)
{ok, projection()} | {error, written} | {error, term()}.
read_projection(Host, TcpPort, ProjType, Epoch)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
read_projection2(Sock, ProjType, Epoch)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Write a projection `Proj' of type `ProjType'.
-spec write_projection(port(), projection_type(), projection()) ->
-spec write_projection(port_wrap(), projection_type(), projection()) ->
'ok' | {error, written} | {error, term()}.
write_projection(Sock, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
@ -300,16 +304,16 @@ write_projection(Sock, ProjType, Proj)
write_projection(Host, TcpPort, ProjType, Proj)
when ProjType == 'public' orelse ProjType == 'private',
is_record(Proj, projection_v1) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
write_projection2(Sock, ProjType, Proj)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get all projections from the FLU's projection store.
-spec get_all_projections(port(), projection_type()) ->
-spec get_all_projections(port_wrap(), projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -322,16 +326,16 @@ get_all_projections(Sock, ProjType)
{ok, [projection()]} | {error, term()}.
get_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
get_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all_projections(port(), projection_type()) ->
-spec list_all_projections(port_wrap(), projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
@ -344,20 +348,37 @@ list_all_projections(Sock, ProjType)
{ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
list_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Quit & close the connection to remote FLU.
-spec quit(port()) ->
-spec quit(port_wrap()) ->
ok.
quit(Sock) when is_port(Sock) ->
catch (_ = gen_tcp:send(Sock, <<"QUIT\n">>)),
catch gen_tcp:close(Sock),
quit(Sock) ->
catch (_ = w_send(Sock, <<"QUIT\n">>)),
disconnect(Sock),
ok.
connected_p({w,tcp,Sock}) ->
case (catch inet:peername(Sock)) of
{ok, _} -> true;
_ -> false
end;
connected_p(_) ->
false.
connect(#p_srvr{}=P) ->
w_connect(P).
disconnect({w,tcp,_Sock}=WS) ->
w_close(WS),
ok;
disconnect(_) ->
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -365,7 +386,7 @@ quit(Sock) when is_port(Sock) ->
%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) ->
-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) ->
ok | {error, error_general()} | {error, term()}.
write_chunk(Sock, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
@ -379,19 +400,19 @@ write_chunk(Sock, EpochID, File, Offset, Chunk)
ok | {error, error_general()} | {error, term()}.
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
when Offset >= ?MINIMUM_OFFSET ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
write_chunk2(Sock, EpochID, File, Offset, Chunk)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
-spec delete_migration(port(), epoch_id(), file_name()) ->
-spec delete_migration(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Sock, EpochID, File) when is_port(Sock) ->
delete_migration(Sock, EpochID, File) ->
delete_migration2(Sock, EpochID, File).
%% @doc Restricted API: Delete a file after it has been successfully
@ -400,19 +421,19 @@ delete_migration(Sock, EpochID, File) when is_port(Sock) ->
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
delete_migration2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
-spec trunc_hack(port(), epoch_id(), file_name()) ->
-spec trunc_hack(port_wrap(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
trunc_hack(Sock, EpochID, File) ->
trunc_hack2(Sock, EpochID, File).
%% @doc Restricted API: Truncate a file after it has been successfully
@ -421,11 +442,11 @@ trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
trunc_hack2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
disconnect(Sock)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -445,8 +466,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
LenHex = machi_util:int_to_hexbin(Len, 32),
ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32),
Cmd = [<<"A ">>, EpochIDHex, LenHex, ExtraHex, Prefix, 10],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
ok = w_send(Sock, [Cmd, Chunk]),
{ok, Line} = w_recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK ", OffsetHex:16/binary, " ",
@ -479,16 +500,16 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) ->
PrefixHex = machi_util:int_to_hexbin(Offset, 64),
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10],
ok = gen_tcp:send(Sock, CmdLF),
case gen_tcp:recv(Sock, 3) of
ok = w_send(Sock, CmdLF),
case w_recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
{ok, _Chunk}=Res = w_recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = inet:getopts(Sock, [packet]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Else2} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, OldOpts),
{ok, OldOpts} = w_getopts(Sock, [packet]),
ok = w_setopts(Sock, [{packet, line}]),
{ok, Else2} = w_recv(Sock, 0),
ok = w_setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
@ -525,12 +546,12 @@ list2(Sock, EpochID) ->
{EpochNum, EpochCSum} = EpochID,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
ok = w_send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK\n">>} ->
Res = list3(gen_tcp:recv(Sock, 0), Sock),
ok = inet:setopts(Sock, [{packet, raw}]),
Res = list3(w_recv(Sock, 0), Sock),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, Res};
{ok, <<"ERROR WEDGED\n">>} ->
{error, wedged};
@ -550,19 +571,19 @@ list3({ok, Line}, Sock) ->
FileLen = byte_size(Line) - 16 - 1 - 1,
<<SizeHex:16/binary, " ", File:FileLen/binary, _/binary>> = Line,
Size = machi_util:hexstr_to_int(SizeHex),
[{Size, File}|list3(gen_tcp:recv(Sock, 0), Sock)];
[{Size, File}|list3(w_recv(Sock, 0), Sock)];
list3(Else, _Sock) ->
throw({server_protocol_error, Else}).
wedge_status2(Sock) ->
try
ok = gen_tcp:send(Sock, [<<"WEDGE-STATUS\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
ok = w_send(Sock, [<<"WEDGE-STATUS\n">>]),
ok = w_setopts(Sock, [{packet, line}]),
{ok, <<"OK ",
BooleanHex:2/binary, " ",
EpochHex:8/binary, " ",
CSumHex:40/binary, "\n">>} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, [{packet, raw}]),
CSumHex:40/binary, "\n">>} = w_recv(Sock, 0),
ok = w_setopts(Sock, [{packet, raw}]),
Boolean = if BooleanHex == <<"00">> -> false;
BooleanHex == <<"01">> -> true
end,
@ -582,15 +603,15 @@ checksum_list2(Sock, EpochID, File) ->
{EpochNum, EpochCSum} = EpochID,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
ok = w_send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK ", Rest/binary>> = Line} ->
put(status, ok), % may be unset later
RestLen = byte_size(Rest) - 1,
<<LenHex:RestLen/binary, _:1/binary>> = Rest,
<<Len:64/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, checksum_list_finish(checksum_list_fast(Sock, Len))};
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
{error, no_such_file};
@ -615,11 +636,11 @@ checksum_list2(Sock, EpochID, File) ->
end.
checksum_list_fast(Sock, 0) ->
{ok, <<".\n">> = _Line} = gen_tcp:recv(Sock, 2),
{ok, <<".\n">> = _Line} = w_recv(Sock, 2),
[];
checksum_list_fast(Sock, Remaining) ->
Num = erlang:min(Remaining, 1024*1024),
{ok, Bytes} = gen_tcp:recv(Sock, Num),
{ok, Bytes} = w_recv(Sock, Num),
[Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))].
checksum_list_finish(Chunks) ->
@ -656,8 +677,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex,
LenHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
ok = w_send(Sock, [Cmd, Chunk]),
{ok, Line} = w_recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
case Line of
<<"OK\n">> ->
@ -685,9 +706,9 @@ delete_migration2(Sock, EpochID, File) ->
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
Cmd = [<<"DEL-migration ">>, EpochIDHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
ok = w_send(Sock, Cmd),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
@ -715,9 +736,9 @@ trunc_hack2(Sock, EpochID, File) ->
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
Cmd = [<<"TRUNC-hack--- ">>, EpochIDHex, File, <<"\n">>],
ok = gen_tcp:send(Sock, Cmd),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
ok = w_send(Sock, Cmd),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, <<"OK\n">>} ->
ok;
{ok, <<"ERROR NO-SUCH-FILE", _/binary>>} ->
@ -770,16 +791,16 @@ do_projection_common(Sock, ProjCmd) ->
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"PROJ ">>, LenHex, <<"\n">>],
ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
ok = w_send(Sock, [Cmd, ProjCmdBin]),
ok = w_setopts(Sock, [{packet, line}]),
case w_recv(Sock, 0) of
{ok, Line} ->
case Line of
<<"OK ", ResLenHex:8/binary, "\n">> ->
ResLen = machi_util:hexstr_to_int(ResLenHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, ResBin} = gen_tcp:recv(Sock, ResLen),
ok = inet:setopts(Sock, [{packet, line}]),
ok = w_setopts(Sock, [{packet, raw}]),
{ok, ResBin} = w_recv(Sock, ResLen),
ok = w_setopts(Sock, [{packet, line}]),
binary_to_term(ResBin);
Else ->
{error, Else}
@ -795,3 +816,31 @@ do_projection_common(Sock, ProjCmd) ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}) ->
try
Sock = machi_util:connect(Host, TcpPort),
{w,tcp,Sock}
catch
_:_ ->
undefined
end.
w_close({w,tcp,Sock}) ->
catch gen_tcp:close(Sock),
ok.
w_recv({w,tcp,Sock}, Amt) ->
gen_tcp:recv(Sock, Amt).
w_send({w,tcp,Sock}, IoData) ->
gen_tcp:send(Sock, IoData).
w_getopts({w,tcp,Sock}, Opts) ->
inet:getopts(Sock, Opts).
w_setopts({w,tcp,Sock}, Opts) ->
inet:setopts(Sock, Opts).

View file

@ -347,35 +347,21 @@ make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end.
connected_p(#state{sock=SockMaybe,
i=#p_srvr{proto=ipv4}=_I}=_S) ->
is_port(SockMaybe);
connected_p(#state{i=#p_srvr{proto=disterl,
name=_NodeName}=_I}=_S) ->
true.
%% case net_adm:ping(NodeName) of
%% ping ->
%% true;
%% _ ->
%% false
%% end.
i=#p_srvr{proto_mod=Mod}=_I}=_S) ->
Mod:connected_p(SockMaybe).
try_connect(#state{sock=undefined,
i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) ->
try
Sock = machi_util:connect(Host, TcpPort),
S#state{sock=Sock}
catch
_:_ ->
S
end;
i=#p_srvr{proto_mod=Mod}=P}=S) ->
Sock = Mod:connect(P),
S#state{sock=Sock};
try_connect(S) ->
%% If we're connection-based, we're already connected.
%% If we're not connection-based, then there's nothing to do.
S.
disconnect(#state{sock=undefined}=S) ->
S;
disconnect(#state{sock=Sock,
i=#p_srvr{proto=ipv4}=_I}=S) ->
(catch gen_tcp:close(Sock)),
S#state{sock=undefined};
disconnect(S) ->
S.
i=#p_srvr{proto_mod=Mod}=_I}=S) ->
Mod:disconnect(Sock),
S#state{sock=undefined}.

View file

@ -38,7 +38,7 @@ verify_file_checksums_test() ->
W_props = [{initial_wedged, false}],
FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir,
W_props),
Sock1 = machi_util:connect(Host, TcpPort),
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
try
Prefix = <<"verify_prefix">>,
[{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,

View file

@ -140,7 +140,7 @@ smoke0_test() ->
Host = "localhost",
TcpPort = 6623,
{ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]),
Pa = #p_srvr{name=a, proto=ipv4, address=Host, port=TcpPort},
Pa = #p_srvr{name=a, address=Host, port=TcpPort},
Members_Dict = machi_projection:make_members_dict([Pa]),
%% Egadz, more racing on startup, yay. TODO fix.
timer:sleep(1),

View file

@ -39,7 +39,7 @@ api_smoke_test() ->
erase(flu_pid),
try
I = #p_srvr{name=RegName, proto=ipv4, address=Host, port=TcpPort},
I = #p_srvr{name=RegName, address=Host, port=TcpPort},
{ok, Prox1} = ?MUT:start_link(I),
try
FakeEpoch = ?DUMMY_PV1_EPOCH,