Merge branch 'slf/checksum-typing'
This commit is contained in:
commit
c62e9c98bd
9 changed files with 228 additions and 45 deletions
|
@ -24,3 +24,9 @@
|
|||
-define(DATA_DIR, "./data").
|
||||
-define(MINIMUM_OFFSET, 1024).
|
||||
|
||||
%% 0th draft of checksum typing with 1st byte.
|
||||
-define(CSUM_TAG_NONE, 0). % No csum provided by client
|
||||
-define(CSUM_TAG_CLIENT_GEN, 1). % Client-generated csum
|
||||
-define(CSUM_TAG_SERVER_GEN, 2). % Server-genereated csum
|
||||
-define(CSUM_TAG_SERVER_REGEN, 3). % Server-regenerated csum
|
||||
|
||||
|
|
|
@ -113,7 +113,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
|||
end.
|
||||
|
||||
verify_chunk_checksum(File, ReadChunk) ->
|
||||
fun({Offset, Size, CSum}, Acc) ->
|
||||
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
||||
case ReadChunk(File, Offset, Size) of
|
||||
{ok, Chunk} ->
|
||||
CSum2 = machi_util:checksum_chunk(Chunk),
|
||||
|
|
|
@ -295,6 +295,8 @@ do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime,
|
|||
%% io:format(user, "append ~w,", [HeadFLU]),
|
||||
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||
[HeadFLU], 0, STime, S);
|
||||
{error, bad_checksum}=BadCS ->
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
|
||||
|
@ -359,6 +361,9 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
|
|||
%% io:format(user, "write ~w,", [FLU]),
|
||||
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk,
|
||||
ChunkExtra, [FLU|Ws], Depth, STime, S);
|
||||
{error, bad_checksum}=BadCS ->
|
||||
%% TODO: alternate strategy?
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
do_append_midtail(FLUs, Prefix, File, Offset, Chunk,
|
||||
|
@ -407,6 +412,9 @@ do_read_chunk2(File, Offset, Size, Depth, STime,
|
|||
got, byte_size(BadChunk)});
|
||||
{error, partial_read}=Err ->
|
||||
{reply, Err, S};
|
||||
{error, bad_checksum}=BadCS ->
|
||||
%% TODO: alternate strategy?
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
do_read_chunk(File, Offset, Size, Depth, STime, S);
|
||||
|
@ -461,6 +469,9 @@ read_repair2(cp_mode=ConsistencyMode,
|
|||
{ok, BadChunk} ->
|
||||
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset,
|
||||
Size, got, byte_size(BadChunk)});
|
||||
{error, bad_checksum}=BadCS ->
|
||||
%% TODO: alternate strategy?
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
read_repair(ConsistencyMode, ReturnMode, File, Offset,
|
||||
|
@ -482,6 +493,9 @@ read_repair2(ap_mode=ConsistencyMode,
|
|||
{ok, BadChunk} ->
|
||||
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
|
||||
Offset, Size, got, byte_size(BadChunk)});
|
||||
{error, bad_checksum}=BadCS ->
|
||||
%% TODO: alternate strategy?
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
read_repair(ConsistencyMode, ReturnMode, File,
|
||||
|
@ -537,6 +551,9 @@ read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
|
|||
ok ->
|
||||
read_repair4(Rest, ReturnMode, Chunk, [First|Repaired], File,
|
||||
Offset, Size, Depth, STime, S);
|
||||
{error, bad_checksum}=BadCS ->
|
||||
%% TODO: alternate strategy?
|
||||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
|
||||
|
|
|
@ -97,7 +97,8 @@
|
|||
}).
|
||||
|
||||
-record(http_goop, {
|
||||
len % content-length
|
||||
len, % content-length
|
||||
x_csum % x-checksum
|
||||
}).
|
||||
|
||||
start_link([{FluName, TcpPort, DataDir}|Rest])
|
||||
|
@ -260,6 +261,7 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
|
|||
end.
|
||||
|
||||
-define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee!
|
||||
-define(CSumSpace, ((1*2)+(20*2))). % hexencodingwhee!
|
||||
|
||||
decode_epoch_id(EpochIDHex) ->
|
||||
<<EpochNum:(4*8)/big, EpochCSum/binary>> =
|
||||
|
@ -273,20 +275,23 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
|
|||
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
|
||||
{ok, Line} ->
|
||||
%% machi_util:verb("Got: ~p\n", [Line]),
|
||||
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1,
|
||||
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - ?CSumSpace
|
||||
- 8 - 8 - 1,
|
||||
FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
|
||||
CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
|
||||
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1,
|
||||
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - ?CSumSpace
|
||||
- 16 - 8 - 1,
|
||||
DelFileLenLF = byte_size(Line) - 14 - ?EpochIDSpace - 1,
|
||||
case Line of
|
||||
%% For normal use
|
||||
<<"A ",
|
||||
EpochIDHex:(?EpochIDSpace)/binary,
|
||||
CSumHex:(?CSumSpace)/binary,
|
||||
LenHex:8/binary, ExtraHex:8/binary,
|
||||
Prefix:PrefixLenLF/binary, "\n">> ->
|
||||
_EpochID = decode_epoch_id(EpochIDHex),
|
||||
do_net_server_append(FluName, Sock, LenHex, ExtraHex,
|
||||
Prefix);
|
||||
do_net_server_append(FluName, Sock, CSumHex,
|
||||
LenHex, ExtraHex, Prefix);
|
||||
<<"R ",
|
||||
EpochIDHex:(?EpochIDSpace)/binary,
|
||||
OffsetHex:16/binary, LenHex:8/binary,
|
||||
|
@ -311,10 +316,12 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
|
|||
%% For "internal" replication only.
|
||||
<<"W-repl ",
|
||||
EpochIDHex:(?EpochIDSpace)/binary,
|
||||
CSumHex:(?CSumSpace)/binary,
|
||||
OffsetHex:16/binary, LenHex:8/binary,
|
||||
File:WriteFileLenLF/binary, "\n">> ->
|
||||
_EpochID = decode_epoch_id(EpochIDHex),
|
||||
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
|
||||
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex,
|
||||
File, DataDir,
|
||||
<<"fixme1">>, false, <<"fixme2">>);
|
||||
%% For data migration only.
|
||||
<<"DEL-migration ",
|
||||
|
@ -354,11 +361,12 @@ append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) ->
|
|||
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra},
|
||||
exit(normal).
|
||||
|
||||
do_net_server_append(FluName, Sock, LenHex, ExtraHex, Prefix) ->
|
||||
do_net_server_append(FluName, Sock, CSumHex, LenHex, ExtraHex, Prefix) ->
|
||||
%% TODO: robustify against other invalid path characters such as NUL
|
||||
case sanitize_file_string(Prefix) of
|
||||
ok ->
|
||||
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix);
|
||||
do_net_server_append2(FluName, Sock, CSumHex,
|
||||
LenHex, ExtraHex, Prefix);
|
||||
_ ->
|
||||
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
|
||||
end.
|
||||
|
@ -371,21 +379,48 @@ sanitize_file_string(Str) ->
|
|||
error
|
||||
end.
|
||||
|
||||
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix) ->
|
||||
do_net_server_append2(FluName, Sock, CSumHex, LenHex, ExtraHex, Prefix) ->
|
||||
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
|
||||
<<Extra:32/big>> = machi_util:hexstr_to_bin(ExtraHex),
|
||||
ClientCSum = machi_util:hexstr_to_bin(CSumHex),
|
||||
ok = inet:setopts(Sock, [{packet, raw}]),
|
||||
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
|
||||
CSum = machi_util:checksum_chunk(Chunk),
|
||||
try
|
||||
CSum = case ClientCSum of
|
||||
<<?CSUM_TAG_NONE:8, _/binary>> ->
|
||||
%% TODO: If the client was foolish enough to use
|
||||
%% this type of non-checksum, then the client gets
|
||||
%% what it deserves wrt data integrity, alas. In
|
||||
%% the client-side Chain Replication method, each
|
||||
%% server will calculated this independently, which
|
||||
%% isn't exactly what ought to happen for best data
|
||||
%% integrity checking. In server-side CR, the csum
|
||||
%% should be calculated by the head and passed down
|
||||
%% the chain together with the value.
|
||||
CS = machi_util:checksum_chunk(Chunk),
|
||||
machi_util:make_tagged_csum(server_gen, CS);
|
||||
<<?CSUM_TAG_CLIENT_GEN:8, ClientCS/binary>> ->
|
||||
CS = machi_util:checksum_chunk(Chunk),
|
||||
if CS == ClientCS ->
|
||||
ClientCSum;
|
||||
true ->
|
||||
throw({bad_csum, CS})
|
||||
end;
|
||||
_ ->
|
||||
ClientCSum
|
||||
end,
|
||||
FluName ! {seq_append, self(), Prefix, Chunk, CSum, Extra}
|
||||
catch error:badarg ->
|
||||
catch
|
||||
throw:{bad_csum, _CS} ->
|
||||
ok = gen_tcp:send(Sock, <<"ERROR BAD-CHECKSUM\n">>),
|
||||
exit(normal);
|
||||
error:badarg ->
|
||||
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
|
||||
end,
|
||||
receive
|
||||
{assignment, Offset, File} ->
|
||||
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
|
||||
Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
|
||||
Out = io_lib:format("OK ~s ~s ~s\n", [CSumHex, OffsetHex, File]),
|
||||
ok = gen_tcp:send(Sock, Out);
|
||||
wedged ->
|
||||
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>)
|
||||
|
@ -478,6 +513,9 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
|
|||
{ok, FH} ->
|
||||
try
|
||||
DoItFun(FH, Offset, Len)
|
||||
catch
|
||||
throw:{bad_csum, _CS} ->
|
||||
ok = gen_tcp:send(Sock, <<"ERROR BAD-CHECKSUM\n">>)
|
||||
after
|
||||
file:close(FH)
|
||||
end;
|
||||
|
@ -492,29 +530,55 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
|
|||
ok = BadIoFun(Sock)
|
||||
end.
|
||||
|
||||
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
|
||||
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex, FileBin, DataDir,
|
||||
EpochID, Wedged_p, CurrentEpochId) ->
|
||||
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
|
||||
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
|
||||
{ok, FHc} ->
|
||||
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
|
||||
EpochID, Wedged_p, CurrentEpochId);
|
||||
do_net_server_write2(Sock, CSumHex, OffsetHex, LenHex, FileBin,
|
||||
DataDir, FHc, EpochID, Wedged_p,
|
||||
CurrentEpochId);
|
||||
{error, enoent} ->
|
||||
ok = filelib:ensure_dir(CSumPath),
|
||||
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
|
||||
EpochID, Wedged_p, CurrentEpochId)
|
||||
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex, FileBin,
|
||||
DataDir, EpochID, Wedged_p,
|
||||
CurrentEpochId)
|
||||
end.
|
||||
|
||||
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
|
||||
do_net_server_write2(Sock, CSumHex, OffsetHex, LenHex, FileBin, DataDir, FHc,
|
||||
EpochID, Wedged_p, CurrentEpochId) ->
|
||||
ClientCSum = machi_util:hexstr_to_bin(CSumHex),
|
||||
DoItFun = fun(FHd, Offset, Len) ->
|
||||
ok = inet:setopts(Sock, [{packet, raw}]),
|
||||
{ok, Chunk} = gen_tcp:recv(Sock, Len),
|
||||
CSum = machi_util:checksum_chunk(Chunk),
|
||||
CSum = case ClientCSum of
|
||||
<<?CSUM_TAG_NONE:8, _/binary>> ->
|
||||
%% TODO: If the client was foolish enough to use
|
||||
%% this type of non-checksum, then the client gets
|
||||
%% what it deserves wrt data integrity, alas. In
|
||||
%% the client-side Chain Replication method, each
|
||||
%% server will calculated this independently, which
|
||||
%% isn't exactly what ought to happen for best data
|
||||
%% integrity checking. In server-side CR, the csum
|
||||
%% should be calculated by the head and passed down
|
||||
%% the chain together with the value.
|
||||
CS = machi_util:checksum_chunk(Chunk),
|
||||
machi_util:make_tagged_csum(server_gen,CS);
|
||||
<<?CSUM_TAG_CLIENT_GEN:8, ClientCS/binary>> ->
|
||||
CS = machi_util:checksum_chunk(Chunk),
|
||||
if CS == ClientCS ->
|
||||
ClientCSum;
|
||||
true ->
|
||||
throw({bad_csum, CS})
|
||||
end;
|
||||
_ ->
|
||||
ClientCSum
|
||||
end,
|
||||
case file:pwrite(FHd, Offset, Chunk) of
|
||||
ok ->
|
||||
CSumHex = machi_util:bin_to_hexstr(CSum),
|
||||
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
|
||||
CSumHex2 = machi_util:bin_to_hexstr(CSum),
|
||||
CSum_info = [OffsetHex, 32, LenHex, 32,
|
||||
CSumHex2, 10],
|
||||
ok = file:write(FHc, CSum_info),
|
||||
ok = file:close(FHc),
|
||||
gen_tcp:send(Sock, <<"OK\n">>);
|
||||
|
@ -861,18 +925,35 @@ http_server_hack(FluName, Line1, Sock, S) ->
|
|||
http_server_hack_put(Sock, G, FluName, MyURI) ->
|
||||
ok = inet:setopts(Sock, [{packet, raw}]),
|
||||
{ok, Chunk} = gen_tcp:recv(Sock, G#http_goop.len, 60*1000),
|
||||
CSum = machi_util:checksum_chunk(Chunk),
|
||||
CSum0 = machi_util:checksum_chunk(Chunk),
|
||||
try
|
||||
CSum = case G#http_goop.x_csum of
|
||||
undefined ->
|
||||
machi_util:make_tagged_csum(server_gen, CSum0);
|
||||
XX when is_binary(XX) ->
|
||||
if XX == CSum0 ->
|
||||
machi_util:make_tagged_csum(client_gen, CSum0);
|
||||
true ->
|
||||
throw({bad_csum, XX})
|
||||
end
|
||||
end,
|
||||
FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0}
|
||||
catch error:badarg ->
|
||||
catch
|
||||
throw:{bad_csum, _CS} ->
|
||||
Out = "HTTP/1.0 412 Precondition failed\r\n"
|
||||
"X-Reason: bad checksum\r\n\r\n",
|
||||
ok = gen_tcp:send(Sock, Out),
|
||||
ok = gen_tcp:close(Sock),
|
||||
exit(normal);
|
||||
error:badarg ->
|
||||
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
|
||||
end,
|
||||
receive
|
||||
{assignment, Offset, File} ->
|
||||
Out = io_lib:format("HTTP/1.0 201 Created\r\nLocation: ~s\r\n"
|
||||
Msg = io_lib:format("HTTP/1.0 201 Created\r\nLocation: ~s\r\n"
|
||||
"X-Offset: ~w\r\nX-Size: ~w\r\n\r\n",
|
||||
[File, Offset, byte_size(Chunk)]),
|
||||
ok = gen_tcp:send(Sock, Out);
|
||||
ok = gen_tcp:send(Sock, Msg);
|
||||
wedged ->
|
||||
ok = gen_tcp:send(Sock, <<"HTTP/1.0 499 WEDGED\r\n\r\n">>)
|
||||
after 10*1000 ->
|
||||
|
@ -932,6 +1013,10 @@ digest_header_goop([], G) ->
|
|||
G;
|
||||
digest_header_goop([{http_header, _, 'Content-Length', _, Str}|T], G) ->
|
||||
digest_header_goop(T, G#http_goop{len=list_to_integer(Str)});
|
||||
digest_header_goop([{http_header, _, "X-Checksum", _, Str}|T], G) ->
|
||||
SHA = machi_util:hexstr_to_bin(Str),
|
||||
CSum = machi_util:make_tagged_csum(client_gen, SHA),
|
||||
digest_header_goop(T, G#http_goop{x_csum=CSum});
|
||||
digest_header_goop([_H|T], G) ->
|
||||
digest_header_goop(T, G).
|
||||
|
||||
|
|
|
@ -85,12 +85,16 @@
|
|||
trunc_hack/3, trunc_hack/4
|
||||
]).
|
||||
|
||||
-type chunk() :: binary() | iolist(). % client can use either
|
||||
-type chunk_csum() :: {file_offset(), chunk_size(), binary()}.
|
||||
%% TODO: Hrm, this kind of API use ... is it a bad idea? We really want to
|
||||
%% encourage client-side checksums; thus it ought to be dead easy.
|
||||
-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}.
|
||||
-type chunk_bin() :: binary() | iolist(). % client can use either
|
||||
-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum
|
||||
-type chunk_summary() :: {file_offset(), chunk_size(), binary()}.
|
||||
-type chunk_s() :: binary(). % server always uses binary()
|
||||
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
|
||||
-type chunk_size() :: non_neg_integer().
|
||||
-type error_general() :: 'bad_arg' | 'wedged'.
|
||||
-type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'.
|
||||
-type epoch_csum() :: binary().
|
||||
-type epoch_num() :: -1 | non_neg_integer().
|
||||
-type epoch_id() :: {epoch_num(), epoch_csum()}.
|
||||
|
@ -193,7 +197,7 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
|
|||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
||||
-spec checksum_list(port_wrap(), epoch_id(), file_name()) ->
|
||||
{ok, [chunk_csum()]} |
|
||||
{ok, [chunk_summary()]} |
|
||||
{error, error_general() | 'no_such_file' | 'partial_read'} |
|
||||
{error, term()}.
|
||||
checksum_list(Sock, EpochID, File) ->
|
||||
|
@ -202,7 +206,7 @@ checksum_list(Sock, EpochID, File) ->
|
|||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
||||
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
|
||||
{ok, [chunk_csum()]} |
|
||||
{ok, [chunk_summary()]} |
|
||||
{error, error_general() | 'no_such_file'} | {error, term()}.
|
||||
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
||||
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
|
||||
|
@ -486,20 +490,28 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
|
|||
%% TODO: add client-side checksum to the server's protocol
|
||||
%% _ = machi_util:checksum_chunk(Chunk),
|
||||
Prefix = machi_util:make_binary(Prefix0),
|
||||
Chunk = machi_util:make_binary(Chunk0),
|
||||
Len = iolist_size(Chunk0),
|
||||
{CSum, Chunk} = case Chunk0 of
|
||||
{_,_} ->
|
||||
Chunk0;
|
||||
XX when is_binary(XX) ->
|
||||
SHA = machi_util:checksum_chunk(Chunk0),
|
||||
{<<?CSUM_TAG_CLIENT_GEN:8, SHA/binary>>, Chunk0}
|
||||
end,
|
||||
Len = iolist_size(Chunk),
|
||||
true = (Len =< ?MAX_CHUNK_SIZE),
|
||||
{EpochNum, EpochCSum} = EpochID,
|
||||
EpochIDHex = machi_util:bin_to_hexstr(
|
||||
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
|
||||
CSumHex = machi_util:bin_to_hexstr(CSum),
|
||||
LenHex = machi_util:int_to_hexbin(Len, 32),
|
||||
ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32),
|
||||
Cmd = [<<"A ">>, EpochIDHex, LenHex, ExtraHex, Prefix, 10],
|
||||
Cmd = [<<"A ">>, EpochIDHex, CSumHex, LenHex, ExtraHex, Prefix, 10],
|
||||
ok = w_send(Sock, [Cmd, Chunk]),
|
||||
{ok, Line} = w_recv(Sock, 0),
|
||||
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
|
||||
PathLen = byte_size(Line) - 3 - (2*(1+20)) - 16 - 1 - 1 - 1,
|
||||
case Line of
|
||||
<<"OK ", OffsetHex:16/binary, " ",
|
||||
<<"OK ", ServerCSum:(2*(1+20))/binary, " ",
|
||||
OffsetHex:16/binary, " ",
|
||||
Path:PathLen/binary, _:1/binary>> ->
|
||||
Offset = machi_util:hexstr_to_int(OffsetHex),
|
||||
{ok, {Offset, Len, Path}};
|
||||
|
@ -507,6 +519,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
|
|||
{error, bad_arg};
|
||||
<<"ERROR WEDGED", _/binary>> ->
|
||||
{error, wedged};
|
||||
<<"ERROR BAD-CHECKSUM", _/binary>> ->
|
||||
{error, bad_checksum};
|
||||
<<"ERROR ", Rest/binary>> ->
|
||||
{error, Rest}
|
||||
end
|
||||
|
@ -711,11 +725,18 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
|
|||
File = machi_util:make_binary(File0),
|
||||
true = (Offset >= ?MINIMUM_OFFSET),
|
||||
OffsetHex = machi_util:int_to_hexbin(Offset, 64),
|
||||
Chunk = machi_util:make_binary(Chunk0),
|
||||
Len = iolist_size(Chunk0),
|
||||
{CSum, Chunk} = case Chunk0 of
|
||||
{_,_} ->
|
||||
Chunk0;
|
||||
XX when is_binary(XX) ->
|
||||
SHA = machi_util:checksum_chunk(Chunk0),
|
||||
{<<?CSUM_TAG_CLIENT_GEN:8, SHA/binary>>, Chunk0}
|
||||
end,
|
||||
CSumHex = machi_util:bin_to_hexstr(CSum),
|
||||
Len = iolist_size(Chunk),
|
||||
true = (Len =< ?MAX_CHUNK_SIZE),
|
||||
LenHex = machi_util:int_to_hexbin(Len, 32),
|
||||
Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex,
|
||||
Cmd = [<<"W-repl ">>, EpochIDHex, CSumHex, OffsetHex,
|
||||
LenHex, File, <<"\n">>],
|
||||
ok = w_send(Sock, [Cmd, Chunk]),
|
||||
{ok, Line} = w_recv(Sock, 0),
|
||||
|
@ -727,6 +748,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
|
|||
{error, bad_arg};
|
||||
<<"ERROR WEDGED", _/binary>> ->
|
||||
{error, wedged};
|
||||
<<"ERROR BAD-CHECKSUM", _/binary>> ->
|
||||
{error, bad_checksum};
|
||||
<<"ERROR ", _/binary>>=Else ->
|
||||
{error, {server_said, Else}}
|
||||
end
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
|
||||
-export([
|
||||
checksum_chunk/1,
|
||||
make_tagged_csum/2,
|
||||
hexstr_to_bin/1, bin_to_hexstr/1,
|
||||
hexstr_to_int/1, int_to_hexstr/2, int_to_hexbin/2,
|
||||
make_binary/1, make_string/1,
|
||||
|
@ -217,6 +218,17 @@ int_to_hexbin(I, I_size) ->
|
|||
checksum_chunk(Chunk) when is_binary(Chunk); is_list(Chunk) ->
|
||||
crypto:hash(sha, Chunk).
|
||||
|
||||
%% @doc Create a tagged checksum
|
||||
|
||||
make_tagged_csum(none, SHA) ->
|
||||
<<?CSUM_TAG_NONE:8, SHA/binary>>;
|
||||
make_tagged_csum(client_gen, SHA) ->
|
||||
<<?CSUM_TAG_CLIENT_GEN:8, SHA/binary>>;
|
||||
make_tagged_csum(server_gen, SHA) ->
|
||||
<<?CSUM_TAG_SERVER_GEN:8, SHA/binary>>;
|
||||
make_tagged_csum(server_regen, SHA) ->
|
||||
<<?CSUM_TAG_SERVER_REGEN:8, SHA/binary>>.
|
||||
|
||||
%% @doc Log a verbose message.
|
||||
|
||||
-spec verb(string()) -> term().
|
||||
|
|
|
@ -27,7 +27,9 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
|
||||
smoke_test() ->
|
||||
smoke_test_() -> {timeout, 1*60, fun() -> smoke_test2() end}.
|
||||
|
||||
smoke_test2() ->
|
||||
os:cmd("rm -rf ./data.a ./data.b ./data.c"),
|
||||
{ok, SupPid} = machi_flu_sup:start_link(),
|
||||
error_logger:tty(false),
|
||||
|
@ -86,6 +88,9 @@ smoke_test() ->
|
|||
machi_cr_client:append_chunk(C1, Prefix, Chunk1),
|
||||
{ok, {Off1,Size1,File1}} =
|
||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1),
|
||||
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_GEN:8, 0:(8*20)>>, Chunk1},
|
||||
{error, bad_checksum} =
|
||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
||||
{ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
||||
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
|
||||
private),
|
||||
|
|
|
@ -104,6 +104,11 @@ flu_smoke_test() ->
|
|||
exit(not_mandatory_but_test_expected_same_file_fixme)
|
||||
end,
|
||||
|
||||
Chunk1_cs = {<<?CSUM_TAG_NONE:8, 0:(8*20)>>, Chunk1},
|
||||
{ok, {Off1e,Len1e,File1e}} = ?FLU_C:append_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
Prefix, Chunk1_cs),
|
||||
|
||||
Chunk2 = <<"yo yo">>,
|
||||
Len2 = byte_size(Chunk2),
|
||||
Off2 = ?MINIMUM_OFFSET + 77,
|
||||
|
@ -170,6 +175,28 @@ flu_projection_smoke_test() ->
|
|||
ok = ?FLU:stop(FLU1)
|
||||
end.
|
||||
|
||||
bad_checksum_test() ->
|
||||
Host = "localhost",
|
||||
TcpPort = 32960,
|
||||
DataDir = "./data",
|
||||
|
||||
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
|
||||
try
|
||||
Prefix = <<"some prefix">>,
|
||||
Chunk1 = <<"yo yo yo">>,
|
||||
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_GEN:8, 0:(8*20)>>, Chunk1},
|
||||
{error, bad_checksum} = ?FLU_C:append_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
Prefix, Chunk1_badcs),
|
||||
{error, bad_checksum} = ?FLU_C:write_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
<<"foo-file">>, 99832,
|
||||
Chunk1_badcs),
|
||||
ok
|
||||
after
|
||||
ok = ?FLU:stop(FLU1)
|
||||
end.
|
||||
|
||||
clean_up_data_dir(DataDir) ->
|
||||
[begin
|
||||
Fs = filelib:wildcard(DataDir ++ Glob),
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
-module(machi_proxy_flu1_client_test).
|
||||
-compile(export_all).
|
||||
|
||||
-include("machi.hrl").
|
||||
-include("machi_projection.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
|
@ -34,6 +35,7 @@ api_smoke_test() ->
|
|||
TcpPort = 57124,
|
||||
DataDir = "./data.api_smoke_flu",
|
||||
W_props = [{initial_wedged, false}],
|
||||
Prefix = <<"prefix">>,
|
||||
FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
|
||||
W_props),
|
||||
erase(flu_pid),
|
||||
|
@ -44,15 +46,15 @@ api_smoke_test() ->
|
|||
try
|
||||
FakeEpoch = ?DUMMY_PV1_EPOCH,
|
||||
[{ok, {_,_,_}} = ?MUT:append_chunk(Prox1,
|
||||
FakeEpoch, <<"prefix">>, <<"data">>,
|
||||
FakeEpoch, Prefix, <<"data">>,
|
||||
infinity) || _ <- lists:seq(1,5)],
|
||||
%% Stop the FLU, what happens?
|
||||
machi_flu1:stop(FLU1),
|
||||
{error,_} = ?MUT:append_chunk(Prox1,
|
||||
FakeEpoch, <<"prefix">>, <<"data">>,
|
||||
FakeEpoch, Prefix, <<"data">>,
|
||||
infinity),
|
||||
{error,partition} = ?MUT:append_chunk(Prox1,
|
||||
FakeEpoch, <<"prefix">>, <<"data">>,
|
||||
FakeEpoch, Prefix, <<"data">>,
|
||||
infinity),
|
||||
%% Start the FLU again, we should be able to do stuff immediately
|
||||
FLU1b = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
|
||||
|
@ -60,14 +62,20 @@ api_smoke_test() ->
|
|||
put(flu_pid, FLU1b),
|
||||
MyChunk = <<"my chunk data">>,
|
||||
{ok, {MyOff,MySize,MyFile}} =
|
||||
?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk,
|
||||
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
|
||||
infinity),
|
||||
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
|
||||
MyChunk2 = <<"my chunk data, yeah, again">>,
|
||||
{ok, {MyOff2,MySize2,MyFile2}} =
|
||||
?MUT:append_chunk_extra(Prox1, FakeEpoch, <<"prefix">>,
|
||||
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
|
||||
MyChunk2, 4242, infinity),
|
||||
{ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
|
||||
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_GEN:8, 0:(8*20)>>, MyChunk},
|
||||
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
|
||||
Prefix, MyChunk_badcs),
|
||||
{error, bad_checksum} = ?MUT:write_chunk(Prox1, FakeEpoch,
|
||||
<<"foo-file">>, 99832,
|
||||
MyChunk_badcs),
|
||||
|
||||
%% Alright, now for the rest of the API, whee
|
||||
BadFile = <<"no-such-file">>,
|
||||
|
|
Loading…
Reference in a new issue