WIP: giant hairball 6
This commit is contained in:
parent
77b4da16c3
commit
920a5c33d7
3 changed files with 114 additions and 127 deletions
|
@ -325,10 +325,13 @@ do_pb_request2(EpochID, CMD, S) ->
|
||||||
|
|
||||||
do_pb_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
do_pb_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
||||||
{Msg, S};
|
{Msg, S};
|
||||||
do_pb_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag, CSum,
|
do_pb_request3({low_append_chunk, _EpochID, PKey, Prefix, Chunk, CSum_tag,
|
||||||
ChunkExtra}, S) ->
|
CSum, ChunkExtra}, S) ->
|
||||||
{do_pb_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
|
{do_pb_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
|
||||||
ChunkExtra, S), S};
|
ChunkExtra, S), S};
|
||||||
|
do_pb_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
|
||||||
|
CSum}, S) ->
|
||||||
|
{do_pb_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
|
||||||
do_pb_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
|
do_pb_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts}, S) ->
|
||||||
{do_pb_server_read_chunk(File, Offset, Size, Opts, S), S};
|
{do_pb_server_read_chunk(File, Offset, Size, Opts, S), S};
|
||||||
do_pb_request3({low_checksum_list, _EpochID, File}, S) ->
|
do_pb_request3({low_checksum_list, _EpochID, File}, S) ->
|
||||||
|
@ -368,7 +371,8 @@ do_pb_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
|
||||||
?CSUM_TAG_CLIENT_SHA ->
|
?CSUM_TAG_CLIENT_SHA ->
|
||||||
CS = machi_util:checksum_chunk(Chunk),
|
CS = machi_util:checksum_chunk(Chunk),
|
||||||
if CS == Client_CSum ->
|
if CS == Client_CSum ->
|
||||||
Client_CSum;
|
machi_util:make_tagged_csum(server_sha,
|
||||||
|
Client_CSum);
|
||||||
true ->
|
true ->
|
||||||
throw({bad_csum, CS})
|
throw({bad_csum, CS})
|
||||||
end
|
end
|
||||||
|
@ -376,11 +380,7 @@ do_pb_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
|
||||||
FluName ! {seq_append, self(), Prefix, Chunk, CSum, ChunkExtra},
|
FluName ! {seq_append, self(), Prefix, Chunk, CSum, ChunkExtra},
|
||||||
receive
|
receive
|
||||||
{assignment, Offset, File} ->
|
{assignment, Offset, File} ->
|
||||||
Size = if is_binary(Chunk) ->
|
Size = iolist_size(Chunk),
|
||||||
byte_size(Chunk);
|
|
||||||
is_list(Chunk) ->
|
|
||||||
iolist_size(Chunk)
|
|
||||||
end,
|
|
||||||
{ok, {Offset, Size, File}};
|
{ok, {Offset, Size, File}};
|
||||||
wedged ->
|
wedged ->
|
||||||
{error, wedged}
|
{error, wedged}
|
||||||
|
@ -395,6 +395,81 @@ do_pb_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
|
||||||
{error, bad_arg}
|
{error, bad_arg}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_pb_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum,
|
||||||
|
#state{data_dir=DataDir}=S) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
CSumPath = machi_util:make_checksum_filename(DataDir, File),
|
||||||
|
case file:open(CSumPath, [write, read, binary, raw]) of
|
||||||
|
{ok, FHc} ->
|
||||||
|
Path = DataDir ++ "/data/" ++
|
||||||
|
machi_util:make_string(File),
|
||||||
|
{ok, FHd} = file:open(Path, [write, binary, raw]),
|
||||||
|
try
|
||||||
|
do_pb_server_write_chunk2(
|
||||||
|
File, Offset, Chunk, CSum_tag, CSum, DataDir,
|
||||||
|
FHc, FHd)
|
||||||
|
after
|
||||||
|
(catch file:close(FHc)),
|
||||||
|
(catch file:close(FHd))
|
||||||
|
end;
|
||||||
|
{error, enoent} ->
|
||||||
|
ok = filelib:ensure_dir(CSumPath),
|
||||||
|
do_pb_server_write_chunk(File, Offset, Chunk, CSum_tag,
|
||||||
|
CSum, S)
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_pb_server_write_chunk2(_File, Offset, Chunk, CSum_tag,
|
||||||
|
Client_CSum, _DataDir, FHc, FHd) ->
|
||||||
|
try
|
||||||
|
CSum = case CSum_tag of
|
||||||
|
?CSUM_TAG_NONE ->
|
||||||
|
%% TODO: If the client was foolish enough to use
|
||||||
|
%% this type of non-checksum, then the client gets
|
||||||
|
%% what it deserves wrt data integrity, alas. In
|
||||||
|
%% the client-side Chain Replication method, each
|
||||||
|
%% server will calculated this independently, which
|
||||||
|
%% isn't exactly what ought to happen for best data
|
||||||
|
%% integrity checking. In server-side CR, the csum
|
||||||
|
%% should be calculated by the head and passed down
|
||||||
|
%% the chain together with the value.
|
||||||
|
CS = machi_util:checksum_chunk(Chunk),
|
||||||
|
machi_util:make_tagged_csum(server_sha,CS);
|
||||||
|
?CSUM_TAG_CLIENT_SHA ->
|
||||||
|
CS = machi_util:checksum_chunk(Chunk),
|
||||||
|
if CS == Client_CSum ->
|
||||||
|
machi_util:make_tagged_csum(server_sha,
|
||||||
|
Client_CSum);
|
||||||
|
true ->
|
||||||
|
throw({bad_csum, CS})
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
Size = iolist_size(Chunk),
|
||||||
|
case file:pwrite(FHd, Offset, Chunk) of
|
||||||
|
ok ->
|
||||||
|
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
|
||||||
|
LenHex = machi_util:bin_to_hexstr(<<Size:32/big>>),
|
||||||
|
CSumHex2 = machi_util:bin_to_hexstr(CSum),
|
||||||
|
CSum_info = [OffsetHex, 32, LenHex, 32,
|
||||||
|
CSumHex2, 10],
|
||||||
|
ok = file:write(FHc, CSum_info),
|
||||||
|
ok;
|
||||||
|
_Else3 ->
|
||||||
|
machi_util:verb("Else3 ~p ~p ~p\n",
|
||||||
|
[Offset, Size, _Else3]),
|
||||||
|
{error, bad_arg}
|
||||||
|
end
|
||||||
|
catch
|
||||||
|
throw:{bad_csum, _CS} ->
|
||||||
|
{error, bad_checksum};
|
||||||
|
error:badarg ->
|
||||||
|
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]),
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
do_pb_server_read_chunk(File, Offset, Size, _Opts, #state{data_dir=DataDir})->
|
do_pb_server_read_chunk(File, Offset, Size, _Opts, #state{data_dir=DataDir})->
|
||||||
%% TODO: Look inside Opts someday.
|
%% TODO: Look inside Opts someday.
|
||||||
case sanitize_file_string(File) of
|
case sanitize_file_string(File) of
|
||||||
|
@ -510,15 +585,15 @@ net_server_loop_old(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
|
||||||
catch gen_tcp:close(Sock),
|
catch gen_tcp:close(Sock),
|
||||||
exit(normal);
|
exit(normal);
|
||||||
%% For "internal" replication only.
|
%% For "internal" replication only.
|
||||||
<<"W-repl ",
|
%% <<"W-repl ",
|
||||||
EpochIDHex:(?EpochIDSpace)/binary,
|
%% EpochIDHex:(?EpochIDSpace)/binary,
|
||||||
CSumHex:(?CSumSpace)/binary,
|
%% CSumHex:(?CSumSpace)/binary,
|
||||||
OffsetHex:16/binary, LenHex:8/binary,
|
%% OffsetHex:16/binary, LenHex:8/binary,
|
||||||
File:WriteFileLenLF/binary, "\n">> ->
|
%% File:WriteFileLenLF/binary, "\n">> ->
|
||||||
_EpochID = decode_epoch_id(EpochIDHex),
|
%% _EpochID = decode_epoch_id(EpochIDHex),
|
||||||
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex,
|
%% do_net_server_write(Sock, CSumHex, OffsetHex, LenHex,
|
||||||
File, DataDir,
|
%% File, DataDir,
|
||||||
<<"fixme1">>, false, <<"fixme2">>);
|
%% <<"fixme1">>, false, <<"fixme2">>);
|
||||||
%% For data migration only.
|
%% For data migration only.
|
||||||
<<"DEL-migration ",
|
<<"DEL-migration ",
|
||||||
EpochIDHex:(?EpochIDSpace)/binary,
|
EpochIDHex:(?EpochIDSpace)/binary,
|
||||||
|
@ -713,67 +788,6 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir,
|
||||||
ok = BadIoFun(Sock)
|
ok = BadIoFun(Sock)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex, FileBin, DataDir,
|
|
||||||
EpochID, Wedged_p, CurrentEpochId) ->
|
|
||||||
CSumPath = machi_util:make_checksum_filename(DataDir, FileBin),
|
|
||||||
case file:open(CSumPath, [append, raw, binary, delayed_write]) of
|
|
||||||
{ok, FHc} ->
|
|
||||||
do_net_server_write2(Sock, CSumHex, OffsetHex, LenHex, FileBin,
|
|
||||||
DataDir, FHc, EpochID, Wedged_p,
|
|
||||||
CurrentEpochId);
|
|
||||||
{error, enoent} ->
|
|
||||||
ok = filelib:ensure_dir(CSumPath),
|
|
||||||
do_net_server_write(Sock, CSumHex, OffsetHex, LenHex, FileBin,
|
|
||||||
DataDir, EpochID, Wedged_p,
|
|
||||||
CurrentEpochId)
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_net_server_write2(Sock, CSumHex, OffsetHex, LenHex, FileBin, DataDir, FHc,
|
|
||||||
EpochID, Wedged_p, CurrentEpochId) ->
|
|
||||||
ClientCSum = machi_util:hexstr_to_bin(CSumHex),
|
|
||||||
DoItFun = fun(FHd, Offset, Len) ->
|
|
||||||
ok = inet:setopts(Sock, [{packet, raw}]),
|
|
||||||
{ok, Chunk} = gen_tcp:recv(Sock, Len),
|
|
||||||
CSum = case ClientCSum of
|
|
||||||
<<?CSUM_TAG_NONE:8, _/binary>> ->
|
|
||||||
%% TODO: If the client was foolish enough to use
|
|
||||||
%% this type of non-checksum, then the client gets
|
|
||||||
%% what it deserves wrt data integrity, alas. In
|
|
||||||
%% the client-side Chain Replication method, each
|
|
||||||
%% server will calculated this independently, which
|
|
||||||
%% isn't exactly what ought to happen for best data
|
|
||||||
%% integrity checking. In server-side CR, the csum
|
|
||||||
%% should be calculated by the head and passed down
|
|
||||||
%% the chain together with the value.
|
|
||||||
CS = machi_util:checksum_chunk(Chunk),
|
|
||||||
machi_util:make_tagged_csum(server_sha,CS);
|
|
||||||
<<?CSUM_TAG_CLIENT_SHA:8, ClientCS/binary>> ->
|
|
||||||
CS = machi_util:checksum_chunk(Chunk),
|
|
||||||
if CS == ClientCS ->
|
|
||||||
ClientCSum;
|
|
||||||
true ->
|
|
||||||
throw({bad_csum, CS})
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
ClientCSum
|
|
||||||
end,
|
|
||||||
case file:pwrite(FHd, Offset, Chunk) of
|
|
||||||
ok ->
|
|
||||||
CSumHex2 = machi_util:bin_to_hexstr(CSum),
|
|
||||||
CSum_info = [OffsetHex, 32, LenHex, 32,
|
|
||||||
CSumHex2, 10],
|
|
||||||
ok = file:write(FHc, CSum_info),
|
|
||||||
ok = file:close(FHc),
|
|
||||||
gen_tcp:send(Sock, <<"OK\n">>);
|
|
||||||
_Else3 ->
|
|
||||||
machi_util:verb("Else3 ~p ~p ~p\n",
|
|
||||||
[Offset, Len, _Else3]),
|
|
||||||
ok = gen_tcp:send(Sock, "ERROR BAD-PWRITE\n")
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir,
|
|
||||||
[write, read, binary, raw], DoItFun,
|
|
||||||
EpochID, Wedged_p, CurrentEpochId).
|
|
||||||
|
|
||||||
perhaps_do_net_server_ec_read(Sock, FH) ->
|
perhaps_do_net_server_ec_read(Sock, FH) ->
|
||||||
case file:pread(FH, 0, ?MINIMUM_OFFSET) of
|
case file:pread(FH, 0, ?MINIMUM_OFFSET) of
|
||||||
|
|
|
@ -504,6 +504,23 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
|
||||||
ChunkExtra}),
|
ChunkExtra}),
|
||||||
do_pb_request_common(Sock, ReqID, Req).
|
do_pb_request_common(Sock, ReqID, Req).
|
||||||
|
|
||||||
|
write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
|
||||||
|
ReqID = <<"id">>,
|
||||||
|
File = machi_util:make_binary(File0),
|
||||||
|
true = (Offset >= ?MINIMUM_OFFSET),
|
||||||
|
{Chunk, CSum_tag, CSum} =
|
||||||
|
case Chunk0 of
|
||||||
|
X when is_binary(X) ->
|
||||||
|
{Chunk0, ?CSUM_TAG_NONE, <<>>};
|
||||||
|
{ChunkCSum, Chk} ->
|
||||||
|
{Tag, CS} = machi_util:unmake_tagged_csum(ChunkCSum),
|
||||||
|
{Chk, Tag, CS}
|
||||||
|
end,
|
||||||
|
Req = machi_pb_translate:to_pb_request(
|
||||||
|
ReqID,
|
||||||
|
{low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, CSum}),
|
||||||
|
do_pb_request_common(Sock, ReqID, Req).
|
||||||
|
|
||||||
list2(Sock, EpochID) ->
|
list2(Sock, EpochID) ->
|
||||||
ReqID = <<"id">>,
|
ReqID = <<"id">>,
|
||||||
Req = machi_pb_translate:to_pb_request(
|
Req = machi_pb_translate:to_pb_request(
|
||||||
|
@ -553,54 +570,6 @@ checksum_list_finish(Chunks) ->
|
||||||
end || Line <- re:split(Bin, "\n", [{return, binary}]),
|
end || Line <- re:split(Bin, "\n", [{return, binary}]),
|
||||||
Line /= <<>>].
|
Line /= <<>>].
|
||||||
|
|
||||||
write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
|
|
||||||
erase(bad_sock),
|
|
||||||
try
|
|
||||||
{EpochNum, EpochCSum} = EpochID,
|
|
||||||
EpochIDHex = machi_util:bin_to_hexstr(
|
|
||||||
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
|
|
||||||
%% TODO: add client-side checksum to the server's protocol
|
|
||||||
%% _ = machi_util:checksum_chunk(Chunk),
|
|
||||||
File = machi_util:make_binary(File0),
|
|
||||||
true = (Offset >= ?MINIMUM_OFFSET),
|
|
||||||
OffsetHex = machi_util:int_to_hexbin(Offset, 64),
|
|
||||||
{CSum, Chunk} = case Chunk0 of
|
|
||||||
{_,_} ->
|
|
||||||
Chunk0;
|
|
||||||
XX when is_binary(XX) ->
|
|
||||||
SHA = machi_util:checksum_chunk(Chunk0),
|
|
||||||
{<<?CSUM_TAG_CLIENT_SHA:8, SHA/binary>>, Chunk0}
|
|
||||||
end,
|
|
||||||
CSumHex = machi_util:bin_to_hexstr(CSum),
|
|
||||||
Len = iolist_size(Chunk),
|
|
||||||
true = (Len =< ?MAX_CHUNK_SIZE),
|
|
||||||
LenHex = machi_util:int_to_hexbin(Len, 32),
|
|
||||||
Cmd = [<<"W-repl ">>, EpochIDHex, CSumHex, OffsetHex,
|
|
||||||
LenHex, File, <<"\n">>],
|
|
||||||
ok = w_send(Sock, [Cmd, Chunk]),
|
|
||||||
{ok, Line} = w_recv(Sock, 0),
|
|
||||||
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
|
|
||||||
case Line of
|
|
||||||
<<"OK\n">> ->
|
|
||||||
ok;
|
|
||||||
<<"ERROR BAD-ARG", _/binary>> ->
|
|
||||||
{error, bad_arg};
|
|
||||||
<<"ERROR WEDGED", _/binary>> ->
|
|
||||||
{error, wedged};
|
|
||||||
<<"ERROR BAD-CHECKSUM", _/binary>> ->
|
|
||||||
{error, bad_checksum};
|
|
||||||
<<"ERROR ", _/binary>>=Else ->
|
|
||||||
{error, {server_said, Else}}
|
|
||||||
end
|
|
||||||
catch
|
|
||||||
throw:Error ->
|
|
||||||
put(bad_sock, Sock),
|
|
||||||
Error;
|
|
||||||
error:{badmatch,_}=BadMatch ->
|
|
||||||
put(bad_sock, Sock),
|
|
||||||
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
|
|
||||||
end.
|
|
||||||
|
|
||||||
delete_migration2(Sock, EpochID, File) ->
|
delete_migration2(Sock, EpochID, File) ->
|
||||||
erase(bad_sock),
|
erase(bad_sock),
|
||||||
try
|
try
|
||||||
|
|
|
@ -147,7 +147,7 @@ io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
||||||
Chunk2 = <<"yo yo">>,
|
Chunk2 = <<"yo yo">>,
|
||||||
Len2 = byte_size(Chunk2),
|
Len2 = byte_size(Chunk2),
|
||||||
Off2 = ?MINIMUM_OFFSET + 77,
|
Off2 = ?MINIMUM_OFFSET + 77,
|
||||||
File2 = "smoke-prefix",
|
File2 = "smoke-whole-file",
|
||||||
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
||||||
ok = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
ok = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||||
File2, Off2, Chunk2),
|
File2, Off2, Chunk2),
|
||||||
|
@ -179,11 +179,15 @@ io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
||||||
|
|
||||||
%% We know that File2 still exists. Pretend that we've done a
|
%% We know that File2 still exists. Pretend that we've done a
|
||||||
%% migration and exercise the trunc_hack() API.
|
%% migration and exercise the trunc_hack() API.
|
||||||
|
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
||||||
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
|
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
|
||||||
|
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
||||||
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
|
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
|
||||||
|
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
||||||
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort,
|
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort,
|
||||||
?DUMMY_PV1_EPOCH, BadFile),
|
?DUMMY_PV1_EPOCH, BadFile),
|
||||||
|
|
||||||
|
io:format(user, "LINE ~s ~p\n", [?MODULE, ?LINE]),
|
||||||
ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host,
|
ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host,
|
||||||
port=TcpPort}))
|
port=TcpPort}))
|
||||||
after
|
after
|
||||||
|
|
Loading…
Reference in a new issue