Move FLU client 'verify checksums + local path' code from prototype/demo-day-hack
This commit is contained in:
parent
76fcd4d931
commit
f8263c15cc
5 changed files with 98 additions and 26 deletions
3
Makefile
3
Makefile
|
@ -34,5 +34,8 @@ build_plt: deps compile
|
|||
dialyzer: deps compile
|
||||
dialyzer -Wno_return --plt $(PLT) ebin
|
||||
|
||||
dialyzer-test: deps compile
|
||||
dialyzer -Wno_return --plt $(PLT) .eunit
|
||||
|
||||
clean_plt:
|
||||
rm $(PLT)
|
||||
|
|
|
@ -20,7 +20,13 @@
|
|||
|
||||
-module(machi_admin_util).
|
||||
|
||||
%% TODO Move these types to a common header file? (also machi_flu1_client.erl?)
|
||||
-type inet_host() :: inet:ip_address() | inet:hostname().
|
||||
-type inet_port() :: inet:port_number().
|
||||
|
||||
-export([
|
||||
%% verify_file_checksums_local/2,
|
||||
verify_file_checksums_local/3,
|
||||
verify_file_checksums_remote/2, verify_file_checksums_remote/3
|
||||
]).
|
||||
-compile(export_all).
|
||||
|
@ -29,28 +35,69 @@
|
|||
|
||||
-define(FLU_C, machi_flu1_client).
|
||||
|
||||
-spec verify_file_checksums_local(inet_host(), inet_port(), binary()|list()) ->
|
||||
{ok, [tuple()]} | {error, term()}.
|
||||
verify_file_checksums_local(Host, TcpPort, Path) ->
|
||||
Sock1 = machi_util:connect(Host, TcpPort),
|
||||
verify_file_checksums_local2(Sock1, Path).
|
||||
|
||||
-spec verify_file_checksums_remote(port(), binary()|list()) ->
|
||||
{ok, [tuple()]} | {error, term()}.
|
||||
verify_file_checksums_remote(Sock1, File) when is_port(Sock1) ->
|
||||
verify_file_checksums_remote2(Sock1, File).
|
||||
|
||||
-spec verify_file_checksums_remote(inet_host(), inet_port(), binary()|list()) ->
|
||||
{ok, [tuple()]} | {error, term()}.
|
||||
verify_file_checksums_remote(Host, TcpPort, File) ->
|
||||
Sock1 = machi_util:connect(Host, TcpPort),
|
||||
verify_file_checksums_remote2(Sock1, File).
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
verify_file_checksums_local2(Sock1, Path0) ->
|
||||
Path = machi_util:make_string(Path0),
|
||||
case file:open(Path, [read, binary, raw]) of
|
||||
{ok, FH} ->
|
||||
File = re:replace(Path, ".*/", "", [{return, binary}]),
|
||||
try
|
||||
ReadChunk = fun(_File, Offset, Size) ->
|
||||
file:pread(FH, Offset, Size)
|
||||
end,
|
||||
verify_file_checksums_common(Sock1, File, ReadChunk)
|
||||
after
|
||||
file:close(FH)
|
||||
end;
|
||||
Else ->
|
||||
Else
|
||||
end.
|
||||
|
||||
verify_file_checksums_remote2(Sock1, File) ->
|
||||
ReadChunk = fun(File_name, Offset, Size) ->
|
||||
?FLU_C:read_chunk(Sock1, File_name, Offset, Size)
|
||||
end,
|
||||
verify_file_checksums_common(Sock1, File, ReadChunk).
|
||||
|
||||
verify_file_checksums_common(Sock1, File, ReadChunk) ->
|
||||
try
|
||||
{ok, Info} = ?FLU_C:checksum_list(Sock1, File),
|
||||
Res = lists:foldl(verify_chunk_checksum(Sock1, File), [], Info),
|
||||
{ok, Res}
|
||||
case ?FLU_C:checksum_list(Sock1, File) of
|
||||
{ok, Info} ->
|
||||
?FLU_C:checksum_list(Sock1, File),
|
||||
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
|
||||
[], Info),
|
||||
{ok, Res};
|
||||
{error, no_such_file}=Nope ->
|
||||
Nope;
|
||||
{error, _}=Else ->
|
||||
Else
|
||||
end
|
||||
catch
|
||||
What:Why ->
|
||||
{error, {What, Why, erlang:get_stacktrace()}}
|
||||
end.
|
||||
|
||||
verify_chunk_checksum(Sock1, File) ->
|
||||
verify_chunk_checksum(File, ReadChunk) ->
|
||||
fun({Offset, Size, CSum}, Acc) ->
|
||||
case ?FLU_C:read_chunk(Sock1, File, Offset, Size) of
|
||||
case ReadChunk(File, Offset, Size) of
|
||||
{ok, Chunk} ->
|
||||
CSum2 = machi_util:checksum(Chunk),
|
||||
if CSum == CSum2 ->
|
||||
|
|
|
@ -36,15 +36,18 @@
|
|||
trunc_hack/2, trunc_hack/3
|
||||
]).
|
||||
|
||||
-type chunk() :: iolist().
|
||||
-type chunk_s() :: binary().
|
||||
-type chunk() :: binary() | iolist(). % client can use either
|
||||
-type chunk_csum() :: {file_offset(), chunk_size(), binary()}.
|
||||
-type chunk_s() :: binary(). % server always uses binary()
|
||||
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
|
||||
-type chunk_size() :: non_neg_integer().
|
||||
-type inet_host() :: inet:ip_address() | inet:hostname().
|
||||
-type inet_port() :: inet:port_number().
|
||||
-type file_info() :: {file_size(), file_name_s()}.
|
||||
-type file_name() :: binary() | list().
|
||||
-type file_name_s() :: binary(). % server reply
|
||||
-type file_offset() :: non_neg_integer().
|
||||
-type file_size() :: non_neg_integer().
|
||||
-type file_prefix() :: binary() | list().
|
||||
|
||||
%% @doc Append a chunk (binary- or iolist-style) of data to a file
|
||||
|
@ -72,14 +75,16 @@ append_chunk(Host, TcpPort, Prefix, Chunk) ->
|
|||
|
||||
-spec read_chunk(port(), file_name(), file_offset(), chunk_size()) ->
|
||||
{ok, chunk_s()} | {error, term()}.
|
||||
read_chunk(Sock, File, Offset, Size) ->
|
||||
read_chunk(Sock, File, Offset, Size)
|
||||
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
|
||||
read_chunk2(Sock, File, Offset, Size).
|
||||
|
||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
-spec read_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk_size()) ->
|
||||
{ok, chunk_s()} | {error, term()}.
|
||||
read_chunk(Host, TcpPort, File, Offset, Size) ->
|
||||
read_chunk(Host, TcpPort, File, Offset, Size)
|
||||
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
read_chunk2(Sock, File, Offset, Size)
|
||||
|
@ -90,14 +95,14 @@ read_chunk(Host, TcpPort, File, Offset, Size) ->
|
|||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
||||
-spec checksum_list(port(), file_name()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
{ok, [chunk_csum()]} | {error, term()}.
|
||||
checksum_list(Sock, File) when is_port(Sock) ->
|
||||
checksum_list2(Sock, File).
|
||||
|
||||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
||||
-spec checksum_list(inet_host(), inet_port(), file_name()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
{ok, [chunk_csum()]} | {error, term()}.
|
||||
checksum_list(Host, TcpPort, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -109,14 +114,14 @@ checksum_list(Host, TcpPort, File) when is_integer(TcpPort) ->
|
|||
%% @doc Fetch the list of all files on the remote FLU.
|
||||
|
||||
-spec list_files(port()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
{ok, [file_info()]} | {error, term()}.
|
||||
list_files(Sock) when is_port(Sock) ->
|
||||
list2(Sock).
|
||||
|
||||
%% @doc Fetch the list of all files on the remote FLU.
|
||||
|
||||
-spec list_files(inet_host(), inet_port()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
{ok, [file_info()]} | {error, term()}.
|
||||
list_files(Host, TcpPort) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -140,16 +145,18 @@ quit(Sock) when is_port(Sock) ->
|
|||
%% `File' at `Offset'.
|
||||
|
||||
-spec write_chunk(port(), file_name(), file_offset(), chunk()) ->
|
||||
{ok, chunk_s()} | {error, term()}.
|
||||
write_chunk(Sock, File, Offset, Chunk) ->
|
||||
ok | {error, term()}.
|
||||
write_chunk(Sock, File, Offset, Chunk)
|
||||
when Offset >= ?MINIMUM_OFFSET ->
|
||||
write_chunk2(Sock, File, Offset, Chunk).
|
||||
|
||||
%% @doc Restricted API: Write a chunk of already-sequenced data to
|
||||
%% `File' at `Offset'.
|
||||
|
||||
-spec write_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk()) ->
|
||||
{ok, chunk_s()} | {error, term()}.
|
||||
write_chunk(Host, TcpPort, File, Offset, Chunk) ->
|
||||
ok | {error, term()}.
|
||||
write_chunk(Host, TcpPort, File, Offset, Chunk)
|
||||
when Offset >= ?MINIMUM_OFFSET ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
write_chunk2(Sock, File, Offset, Chunk)
|
||||
|
@ -161,7 +168,7 @@ write_chunk(Host, TcpPort, File, Offset, Chunk) ->
|
|||
%% migrated.
|
||||
|
||||
-spec delete_migration(port(), file_name()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
ok | {error, term()}.
|
||||
delete_migration(Sock, File) when is_port(Sock) ->
|
||||
delete_migration2(Sock, File).
|
||||
|
||||
|
@ -169,7 +176,7 @@ delete_migration(Sock, File) when is_port(Sock) ->
|
|||
%% migrated.
|
||||
|
||||
-spec delete_migration(inet_host(), inet_port(), file_name()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
ok | {error, term()}.
|
||||
delete_migration(Host, TcpPort, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
@ -182,7 +189,7 @@ delete_migration(Host, TcpPort, File) when is_integer(TcpPort) ->
|
|||
%% erasure coded.
|
||||
|
||||
-spec trunc_hack(port(), file_name()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
ok | {error, term()}.
|
||||
trunc_hack(Sock, File) when is_port(Sock) ->
|
||||
trunc_hack2(Sock, File).
|
||||
|
||||
|
@ -190,7 +197,7 @@ trunc_hack(Sock, File) when is_port(Sock) ->
|
|||
%% erasure coded.
|
||||
|
||||
-spec trunc_hack(inet_host(), inet_port(), file_name()) ->
|
||||
{ok, [file_name()]} | {error, term()}.
|
||||
ok | {error, term()}.
|
||||
trunc_hack(Host, TcpPort, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
checksum/1,
|
||||
hexstr_to_bin/1, bin_to_hexstr/1,
|
||||
hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
|
||||
make_binary/1,
|
||||
make_binary/1, make_string/1,
|
||||
make_regname/1,
|
||||
make_checksum_filename/2, make_data_filename/2,
|
||||
read_max_filenum/2, increment_max_filenum/2,
|
||||
|
@ -118,6 +118,11 @@ make_binary(X) when is_binary(X) ->
|
|||
make_binary(X) when is_list(X) ->
|
||||
iolist_to_binary(X).
|
||||
|
||||
make_string(X) when is_list(X) ->
|
||||
lists:flatten(X);
|
||||
make_string(X) when is_binary(X) ->
|
||||
binary_to_list(X).
|
||||
|
||||
hexstr_to_int(X) ->
|
||||
B = hexstr_to_bin(X),
|
||||
B_size = byte_size(B) * 8,
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
-define(FLU, machi_flu1).
|
||||
-define(FLU_C, machi_flu1_client).
|
||||
|
||||
verify_file_checksums_remote_test() ->
|
||||
verify_file_checksums_test() ->
|
||||
Host = "localhost",
|
||||
TcpPort = 32958,
|
||||
DataDir = "./data",
|
||||
|
@ -49,10 +49,20 @@ verify_file_checksums_remote_test() ->
|
|||
ok = file:write(FH, "yo"),
|
||||
ok = file:write(FH, "yo!"),
|
||||
ok = file:close(FH),
|
||||
{ok, [_,_,_]} = machi_admin_util:verify_file_checksums_remote(
|
||||
Host, TcpPort, File)
|
||||
|
||||
%% Check the local flavor of the API
|
||||
{ok, Res1} = machi_admin_util:verify_file_checksums_local(
|
||||
Host, TcpPort, Path),
|
||||
3 = length(Res1),
|
||||
|
||||
%% Check the remote flavor of the API
|
||||
{ok, Res2} = machi_admin_util:verify_file_checksums_remote(
|
||||
Host, TcpPort, File),
|
||||
3 = length(Res2),
|
||||
|
||||
ok
|
||||
after
|
||||
catch ?FLU_C:quick(Sock1),
|
||||
catch ?FLU_C:quit(Sock1),
|
||||
ok = ?FLU:stop(FLU1)
|
||||
end.
|
||||
|
||||
|
|
Loading…
Reference in a new issue