WIP: epoch ID added to checksum protocol command

This commit is contained in:
Scott Lystig Fritchie 2015-04-02 20:49:45 +09:00
parent 9479baac46
commit 6b8a3cf2a4
5 changed files with 65 additions and 46 deletions

View file

@ -25,9 +25,8 @@
-type inet_port() :: inet:port_number().
-export([
%% verify_file_checksums_local/2,
verify_file_checksums_local/3,
verify_file_checksums_remote/2, verify_file_checksums_remote/3
verify_file_checksums_local/3, verify_file_checksums_local/4,
verify_file_checksums_remote/3, verify_file_checksums_remote/4
]).
-compile(export_all).
@ -36,26 +35,41 @@
-define(FLU_C, machi_flu1_client).
-spec verify_file_checksums_local(inet_host(), inet_port(), binary()|list()) ->
-spec verify_file_checksums_local(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_local(Host, TcpPort, Path) ->
Sock1 = machi_util:connect(Host, TcpPort),
verify_file_checksums_local2(Sock1, Path).
verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) ->
verify_file_checksums_local2(Sock1, EpochID, Path).
-spec verify_file_checksums_remote(port(), binary()|list()) ->
-spec verify_file_checksums_local(inet_host(), inet_port(),
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Sock1, File) when is_port(Sock1) ->
verify_file_checksums_remote2(Sock1, File).
-spec verify_file_checksums_remote(inet_host(), inet_port(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Host, TcpPort, File) ->
verify_file_checksums_local(Host, TcpPort, EpochID, Path) ->
Sock1 = machi_util:connect(Host, TcpPort),
verify_file_checksums_remote2(Sock1, File).
try
verify_file_checksums_local2(Sock1, EpochID, Path)
after
catch gen_tcp:close(Sock1)
end.
-spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) ->
verify_file_checksums_remote2(Sock1, EpochID, File).
-spec verify_file_checksums_remote(inet_host(), inet_port(),
machi_flu1_client:epoch_id(), binary()|list()) ->
{ok, [tuple()]} | {error, term()}.
verify_file_checksums_remote(Host, TcpPort, EpochID, File) ->
Sock1 = machi_util:connect(Host, TcpPort),
try
verify_file_checksums_remote2(Sock1, EpochID, File)
after
catch gen_tcp:close(Sock1)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
verify_file_checksums_local2(Sock1, Path0) ->
verify_file_checksums_local2(Sock1, EpochID, Path0) ->
Path = machi_util:make_string(Path0),
case file:open(Path, [read, binary, raw]) of
{ok, FH} ->
@ -64,7 +78,7 @@ verify_file_checksums_local2(Sock1, Path0) ->
ReadChunk = fun(_File, Offset, Size) ->
file:pread(FH, Offset, Size)
end,
verify_file_checksums_common(Sock1, File, ReadChunk)
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk)
after
file:close(FH)
end;
@ -72,18 +86,17 @@ verify_file_checksums_local2(Sock1, Path0) ->
Else
end.
verify_file_checksums_remote2(Sock1, File) ->
verify_file_checksums_remote2(Sock1, EpochID, File) ->
ReadChunk = fun(File_name, Offset, Size) ->
?FLU_C:read_chunk(Sock1, ?DUMMY_PV1_EPOCH,
?FLU_C:read_chunk(Sock1, EpochID,
File_name, Offset, Size)
end,
verify_file_checksums_common(Sock1, File, ReadChunk).
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).
verify_file_checksums_common(Sock1, File, ReadChunk) ->
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
try
case ?FLU_C:checksum_list(Sock1, File) of
case ?FLU_C:checksum_list(Sock1, EpochID, File) of
{ok, Info} ->
?FLU_C:checksum_list(Sock1, File),
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
[], Info),
{ok, Res};

View file

@ -107,10 +107,9 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
case gen_tcp:recv(Sock, 0, 60*1000) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF_E = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
FileLenLF_E = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
CSumFileLenLF = byte_size(Line) - 2 - 1,
CSumFileLenCRLF = byte_size(Line) - 2 - 2,
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
WriteFileLenLF = byte_size(Line) - 7 - 16 - 1 - 8 - 1 - 1,
DelFileLenLF = byte_size(Line) - 14 - 1,
case Line of
@ -118,20 +117,20 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
<<"A ",
_EpochIDRaw:(?EpochIDSpace)/binary,
LenHex:8/binary,
Prefix:PrefixLenLF_E/binary, "\n">> ->
Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(RegName, Sock, LenHex, Prefix);
<<"R ",
_EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:FileLenLF_E/binary, "\n">> ->
File:FileLenLF/binary, "\n">> ->
do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir);
<<"L\n">> ->
do_net_server_listing(Sock, DataDir);
<<"L\r\n">> ->
do_net_server_listing(Sock, DataDir);
<<"C ", File:CSumFileLenLF/binary, "\n">> ->
do_net_server_checksum_listing(Sock, File, DataDir);
<<"C ", File:CSumFileLenCRLF/binary, "\n">> ->
<<"C ",
_EpochIDRaw:(?EpochIDSpace)/binary,
File:CSumFileLenLF/binary, "\n">> ->
do_net_server_checksum_listing(Sock, File, DataDir);
<<"QUIT\n">> ->
catch gen_tcp:close(Sock),

View file

@ -25,7 +25,7 @@
-export([
append_chunk/4, append_chunk/5,
read_chunk/5, read_chunk/6,
checksum_list/2, checksum_list/3,
checksum_list/3, checksum_list/4,
list_files/1, list_files/2,
quit/1
]).
@ -53,6 +53,8 @@
-type file_size() :: non_neg_integer().
-type file_prefix() :: binary() | list().
-export_type([epoch_id/0]).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
@ -99,19 +101,19 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port(), file_name()) ->
-spec checksum_list(port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} | {error, term()}.
checksum_list(Sock, File) when is_port(Sock) ->
checksum_list2(Sock, File).
checksum_list(Sock, EpochID, File) when is_port(Sock) ->
checksum_list2(Sock, EpochID, File).
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), file_name()) ->
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} | {error, term()}.
checksum_list(Host, TcpPort, File) when is_integer(TcpPort) ->
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = machi_util:connect(Host, TcpPort),
try
checksum_list2(Sock, File)
checksum_list2(Sock, EpochID, File)
after
catch gen_tcp:close(Sock)
end.
@ -308,9 +310,11 @@ list2({ok, Line}, Sock) ->
list2(Else, _Sock) ->
throw({server_protocol_error, Else}).
checksum_list2(Sock, File) ->
checksum_list2(Sock, EpochID, File) ->
try
ok = gen_tcp:send(Sock, [<<"C ">>, File, <<"\n">>]),
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDRaw, File, <<"\n">>]),
ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0) of
{ok, <<"OK ", Rest/binary>> = Line} ->

View file

@ -42,7 +42,7 @@ verify_file_checksums_test() ->
X <- lists:seq(1,10)],
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1),
{ok, []} = machi_admin_util:verify_file_checksums_remote(
Host, TcpPort, File),
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
Path = DataDir ++ "/" ++ binary_to_list(File),
{ok, FH} = file:open(Path, [read,write]),
@ -54,12 +54,12 @@ verify_file_checksums_test() ->
%% Check the local flavor of the API
{ok, Res1} = machi_admin_util:verify_file_checksums_local(
Host, TcpPort, Path),
Host, TcpPort, ?DUMMY_PV1_EPOCH, Path),
3 = length(Res1),
%% Check the remote flavor of the API
{ok, Res2} = machi_admin_util:verify_file_checksums_remote(
Host, TcpPort, File),
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
3 = length(Res2),
ok

View file

@ -49,8 +49,10 @@ flu_smoke_test() ->
FLU1 = setup_test_flu(smoke_flu, TcpPort, DataDir),
try
{error, no_such_file} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH,
"does-not-exist"),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, BadFile),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, BadFile),
{ok, []} = ?FLU_C:list_files(Host, TcpPort),
@ -60,7 +62,8 @@ flu_smoke_test() ->
Prefix, Chunk1),
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1),
{ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort, File1),
{ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1),
{error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
BadPrefix, Chunk1),