Rudimentary client-side checksum and server-side checksum type tags

This commit is contained in:
Scott Lystig Fritchie 2015-06-01 14:25:55 +09:00
parent 98d0b735f9
commit e3162fdcda
5 changed files with 153 additions and 34 deletions

View file

@ -24,3 +24,9 @@
-define(DATA_DIR, "./data").
-define(MINIMUM_OFFSET, 1024).
%% 0th draft of checksum typing with 1st byte.
-define(CSUM_TAG_NONE, 0). % No csum provided by client
-define(CSUM_TAG_CLIENT_GEN, 1). % Client-generated csum
-define(CSUM_TAG_SERVER_GEN, 2). % Server-genereated csum
-define(CSUM_TAG_SERVER_REGEN, 3). % Server-regenerated csum

View file

@ -113,7 +113,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
end.
verify_chunk_checksum(File, ReadChunk) ->
fun({Offset, Size, CSum}, Acc) ->
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
case ReadChunk(File, Offset, Size) of
{ok, Chunk} ->
CSum2 = machi_util:checksum_chunk(Chunk),

View file

@ -260,6 +260,7 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
end.
-define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee!
-define(CSumSpace, ((1*2)+(20*2))). % hexencodingwhee!
decode_epoch_id(EpochIDHex) ->
<<EpochNum:(4*8)/big, EpochCSum/binary>> =
@ -273,20 +274,23 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1,
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - ?CSumSpace
- 8 - 8 - 1,
FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1,
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - ?CSumSpace
- 16 - 8 - 1,
DelFileLenLF = byte_size(Line) - 14 - ?EpochIDSpace - 1,
case Line of
%% For normal use
<<"A ",
EpochIDHex:(?EpochIDSpace)/binary,
CSumHex:(?CSumSpace)/binary,
LenHex:8/binary, ExtraHex:8/binary,
Prefix:PrefixLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_append(FluName, Sock, LenHex, ExtraHex,
Prefix);
do_net_server_append(FluName, Sock, CSumHex,
LenHex, ExtraHex, Prefix);
<<"R ",
EpochIDHex:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
@ -311,10 +315,12 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
%% For "internal" replication only.
<<"W-repl ",
EpochIDHex:(?EpochIDSpace)/binary,
CSumHex:(?CSumSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
File:WriteFileLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir,
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex,
File, DataDir,
<<"fixme1">>, false, <<"fixme2">>);
%% For data migration only.
<<"DEL-migration ",
@ -354,11 +360,12 @@ append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) ->
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra},
exit(normal).
do_net_server_append(FluName, Sock, LenHex, ExtraHex, Prefix) ->
do_net_server_append(FluName, Sock, CSumHex, LenHex, ExtraHex, Prefix) ->
%% TODO: robustify against other invalid path characters such as NUL
case sanitize_file_string(Prefix) of
ok ->
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix);
do_net_server_append2(FluName, Sock, CSumHex,
LenHex, ExtraHex, Prefix);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
end.
@ -371,21 +378,48 @@ sanitize_file_string(Str) ->
error
end.
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix) ->
do_net_server_append2(FluName, Sock, CSumHex, LenHex, ExtraHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
<<Extra:32/big>> = machi_util:hexstr_to_bin(ExtraHex),
ClientCSum = machi_util:hexstr_to_bin(CSumHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum_chunk(Chunk),
try
CSum = case ClientCSum of
<<?CSUM_TAG_NONE:8, _/binary>> ->
%% TODO: If the client was foolish enough to use
%% this type of non-checksum, then the client gets
%% what it deserves wrt data integrity, alas. In
%% the client-side Chain Replication method, each
%% server will calculated this independently, which
%% isn't exactly what ought to happen for best data
%% integrity checking. In server-side CR, the csum
%% should be calculated by the head and passed down
%% the chain together with the value.
CS = machi_util:checksum_chunk(Chunk),
<<?CSUM_TAG_SERVER_GEN:8, CS/binary>>;
<<?CSUM_TAG_CLIENT_GEN:8, ClientCS/binary>> ->
CS = machi_util:checksum_chunk(Chunk),
if CS == ClientCS ->
ClientCSum;
true ->
throw({bad_csum, CS})
end;
_ ->
ClientCSum
end,
FluName ! {seq_append, self(), Prefix, Chunk, CSum, Extra}
catch error:badarg ->
catch
throw:{bad_csum, _CS} ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-CHECKSUM\n">>),
exit(normal);
error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
end,
receive
{assignment, Offset, File} ->
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
Out = io_lib:format("OK ~s ~s\n", [OffsetHex, File]),
Out = io_lib:format("OK ~s ~s ~s\n", [CSumHex, OffsetHex, File]),
ok = gen_tcp:send(Sock, Out);
wedged ->
ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>)
@ -478,6 +512,9 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
{ok, FH} ->
try
DoItFun(FH, Offset, Len)
catch
throw:{bad_csum, _CS} ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-CHECKSUM\n">>)
after
file:close(FH)
end;
@ -492,29 +529,55 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
ok = BadIoFun(Sock)
end.
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex, FileBin, DataDir,
EpochID, Wedged_p, CurrentEpochId) ->
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
{ok, FHc} ->
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochID, Wedged_p, CurrentEpochId);
do_net_server_write2(Sock, CSumHex, OffsetHex, LenHex, FileBin,
DataDir, FHc, EpochID, Wedged_p,
CurrentEpochId);
{error, enoent} ->
ok = filelib:ensure_dir(CSumPath),
do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir,
EpochID, Wedged_p, CurrentEpochId)
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex, FileBin,
DataDir, EpochID, Wedged_p,
CurrentEpochId)
end.
do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc,
do_net_server_write2(Sock, CSumHex, OffsetHex, LenHex, FileBin, DataDir, FHc,
EpochID, Wedged_p, CurrentEpochId) ->
ClientCSum = machi_util:hexstr_to_bin(CSumHex),
DoItFun = fun(FHd, Offset, Len) ->
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len),
CSum = machi_util:checksum_chunk(Chunk),
CSum = case ClientCSum of
<<?CSUM_TAG_NONE:8, _/binary>> ->
%% TODO: If the client was foolish enough to use
%% this type of non-checksum, then the client gets
%% what it deserves wrt data integrity, alas. In
%% the client-side Chain Replication method, each
%% server will calculated this independently, which
%% isn't exactly what ought to happen for best data
%% integrity checking. In server-side CR, the csum
%% should be calculated by the head and passed down
%% the chain together with the value.
CS = machi_util:checksum_chunk(Chunk),
<<?CSUM_TAG_SERVER_GEN:8, CS/binary>>;
<<?CSUM_TAG_CLIENT_GEN:8, ClientCS/binary>> ->
CS = machi_util:checksum_chunk(Chunk),
if CS == ClientCS ->
ClientCSum;
true ->
throw({bad_csum, CS})
end;
_ ->
ClientCSum
end,
case file:pwrite(FHd, Offset, Chunk) of
ok ->
CSumHex = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
CSumHex2 = machi_util:bin_to_hexstr(CSum),
CSum_info = [OffsetHex, 32, LenHex, 32,
CSumHex2, 10],
ok = file:write(FHc, CSum_info),
ok = file:close(FHc),
gen_tcp:send(Sock, <<"OK\n">>);

View file

@ -85,12 +85,16 @@
trunc_hack/3, trunc_hack/4
]).
-type chunk() :: binary() | iolist(). % client can use either
-type chunk_csum() :: {file_offset(), chunk_size(), binary()}.
%% TODO: Hrm, this kind of API use ... is it a bad idea? We really want to
%% encourage client-side checksums; thus it ought to be dead easy.
-type chunk() :: chunk_bin() | {chunk_csum(), chunk_bin()}.
-type chunk_bin() :: binary() | iolist(). % client can use either
-type chunk_csum() :: binary(). % 1 byte tag, N-1 bytes checksum
-type chunk_summary() :: {file_offset(), chunk_size(), binary()}.
-type chunk_s() :: binary(). % server always uses binary()
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type error_general() :: 'bad_arg' | 'wedged'.
-type error_general() :: 'bad_arg' | 'wedged' | 'bad_checksum'.
-type epoch_csum() :: binary().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
@ -193,7 +197,7 @@ read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(port_wrap(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{ok, [chunk_summary()]} |
{error, error_general() | 'no_such_file' | 'partial_read'} |
{error, term()}.
checksum_list(Sock, EpochID, File) ->
@ -202,7 +206,7 @@ checksum_list(Sock, EpochID, File) ->
%% @doc Fetch the list of chunk checksums for `File'.
-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
{ok, [chunk_csum()]} |
{ok, [chunk_summary()]} |
{error, error_general() | 'no_such_file'} | {error, term()}.
checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
@ -486,20 +490,28 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
%% TODO: add client-side checksum to the server's protocol
%% _ = machi_util:checksum_chunk(Chunk),
Prefix = machi_util:make_binary(Prefix0),
Chunk = machi_util:make_binary(Chunk0),
Len = iolist_size(Chunk0),
{CSum, Chunk} = case Chunk0 of
{_,_} ->
Chunk0;
XX when is_binary(XX) ->
SHA = machi_util:checksum_chunk(Chunk0),
{<<?CSUM_TAG_CLIENT_GEN:8, SHA/binary>>, Chunk0}
end,
Len = iolist_size(Chunk),
true = (Len =< ?MAX_CHUNK_SIZE),
{EpochNum, EpochCSum} = EpochID,
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
CSumHex = machi_util:bin_to_hexstr(CSum),
LenHex = machi_util:int_to_hexbin(Len, 32),
ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32),
Cmd = [<<"A ">>, EpochIDHex, LenHex, ExtraHex, Prefix, 10],
Cmd = [<<"A ">>, EpochIDHex, CSumHex, LenHex, ExtraHex, Prefix, 10],
ok = w_send(Sock, [Cmd, Chunk]),
{ok, Line} = w_recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
PathLen = byte_size(Line) - 3 - (2*(1+20)) - 16 - 1 - 1 - 1,
case Line of
<<"OK ", OffsetHex:16/binary, " ",
<<"OK ", ServerCSum:(2*(1+20))/binary, " ",
OffsetHex:16/binary, " ",
Path:PathLen/binary, _:1/binary>> ->
Offset = machi_util:hexstr_to_int(OffsetHex),
{ok, {Offset, Len, Path}};
@ -507,6 +519,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
{error, bad_arg};
<<"ERROR WEDGED", _/binary>> ->
{error, wedged};
<<"ERROR BAD-CHECKSUM", _/binary>> ->
{error, bad_checksum};
<<"ERROR ", Rest/binary>> ->
{error, Rest}
end
@ -711,11 +725,18 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
File = machi_util:make_binary(File0),
true = (Offset >= ?MINIMUM_OFFSET),
OffsetHex = machi_util:int_to_hexbin(Offset, 64),
Chunk = machi_util:make_binary(Chunk0),
Len = iolist_size(Chunk0),
{CSum, Chunk} = case Chunk0 of
{_,_} ->
Chunk0;
XX when is_binary(XX) ->
SHA = machi_util:checksum_chunk(Chunk0),
{<<?CSUM_TAG_CLIENT_GEN:8, SHA/binary>>, Chunk0}
end,
CSumHex = machi_util:bin_to_hexstr(CSum),
Len = iolist_size(Chunk),
true = (Len =< ?MAX_CHUNK_SIZE),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex,
Cmd = [<<"W-repl ">>, EpochIDHex, CSumHex, OffsetHex,
LenHex, File, <<"\n">>],
ok = w_send(Sock, [Cmd, Chunk]),
{ok, Line} = w_recv(Sock, 0),
@ -727,6 +748,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
{error, bad_arg};
<<"ERROR WEDGED", _/binary>> ->
{error, wedged};
<<"ERROR BAD-CHECKSUM", _/binary>> ->
{error, bad_checksum};
<<"ERROR ", _/binary>>=Else ->
{error, {server_said, Else}}
end

View file

@ -104,6 +104,11 @@ flu_smoke_test() ->
exit(not_mandatory_but_test_expected_same_file_fixme)
end,
Chunk1_cs = {<<?CSUM_TAG_NONE:8, 0:(8*20)>>, Chunk1},
{ok, {Off1e,Len1e,File1e}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1_cs),
Chunk2 = <<"yo yo">>,
Len2 = byte_size(Chunk2),
Off2 = ?MINIMUM_OFFSET + 77,
@ -170,6 +175,28 @@ flu_projection_smoke_test() ->
ok = ?FLU:stop(FLU1)
end.
bad_checksum_test() ->
Host = "localhost",
TcpPort = 32960,
DataDir = "./data",
FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir),
try
Prefix = <<"some prefix">>,
Chunk1 = <<"yo yo yo">>,
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_GEN:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1_badcs),
{error, bad_checksum} = ?FLU_C:write_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
<<"foo-file">>, 99832,
Chunk1_badcs),
ok
after
ok = ?FLU:stop(FLU1)
end.
clean_up_data_dir(DataDir) ->
[begin
Fs = filelib:wildcard(DataDir ++ Glob),