WIP: epoch ID added to write/delete/trunc protocol commands
This commit is contained in:
parent
3aaa2c3a3d
commit
7627ba08a3
3 changed files with 60 additions and 42 deletions
|
@ -110,8 +110,8 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
|
|||
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
|
||||
FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
|
||||
CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
|
||||
WriteFileLenLF = byte_size(Line) - 7 - 16 - 1 - 8 - 1 - 1,
|
||||
DelFileLenLF = byte_size(Line) - 14 - 1,
|
||||
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1,
|
||||
DelFileLenLF = byte_size(Line) - 14 - ?EpochIDSpace - 1,
|
||||
case Line of
|
||||
%% For normal use
|
||||
<<"A ",
|
||||
|
@ -137,14 +137,20 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
|
|||
catch gen_tcp:close(Sock),
|
||||
exit(normal);
|
||||
%% For "internal" replication only.
|
||||
<<"W-repl ", OffsetHex:16/binary, " ", LenHex:8/binary, " ",
|
||||
<<"W-repl ",
|
||||
_EpochIDRaw:(?EpochIDSpace)/binary,
|
||||
OffsetHex:16/binary, LenHex:8/binary,
|
||||
File:WriteFileLenLF/binary, "\n">> ->
|
||||
do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir);
|
||||
%% For data migration only.
|
||||
<<"DEL-migration ", File:DelFileLenLF/binary, "\n">> ->
|
||||
<<"DEL-migration ",
|
||||
_EpochIDRaw:(?EpochIDSpace)/binary,
|
||||
File:DelFileLenLF/binary, "\n">> ->
|
||||
do_net_server_delete_migration_only(Sock, File, DataDir);
|
||||
%% For erasure coding hackityhack
|
||||
<<"TRUNC-hack--- ", File:DelFileLenLF/binary, "\n">> ->
|
||||
<<"TRUNC-hack--- ",
|
||||
_EpochIDRaw:(?EpochIDSpace)/binary,
|
||||
File:DelFileLenLF/binary, "\n">> ->
|
||||
do_net_server_truncate_hackityhack(Sock, File, DataDir);
|
||||
_ ->
|
||||
machi_util:verb("Else Got: ~p\n", [Line]),
|
||||
|
|
|
@ -31,9 +31,9 @@
|
|||
]).
|
||||
%% For "internal" replication only.
|
||||
-export([
|
||||
write_chunk/4, write_chunk/5,
|
||||
delete_migration/2, delete_migration/3,
|
||||
trunc_hack/2, trunc_hack/3
|
||||
write_chunk/5, write_chunk/6,
|
||||
delete_migration/3, delete_migration/4,
|
||||
trunc_hack/3, trunc_hack/4
|
||||
]).
|
||||
|
||||
-type chunk() :: binary() | iolist(). % client can use either
|
||||
|
@ -151,22 +151,23 @@ quit(Sock) when is_port(Sock) ->
|
|||
%% @doc Restricted API: Write a chunk of already-sequenced data to
|
||||
%% `File' at `Offset'.
|
||||
|
||||
-spec write_chunk(port(), file_name(), file_offset(), chunk()) ->
|
||||
-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) ->
|
||||
ok | {error, term()}.
|
||||
write_chunk(Sock, File, Offset, Chunk)
|
||||
write_chunk(Sock, EpochID, File, Offset, Chunk)
|
||||
when Offset >= ?MINIMUM_OFFSET ->
|
||||
write_chunk2(Sock, File, Offset, Chunk).
|
||||
write_chunk2(Sock, EpochID, File, Offset, Chunk).
|
||||
|
||||
%% @doc Restricted API: Write a chunk of already-sequenced data to
|
||||
%% `File' at `Offset'.
|
||||
|
||||
-spec write_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk()) ->
|
||||
-spec write_chunk(inet_host(), inet_port(),
|
||||
epoch_id(), file_name(), file_offset(), chunk()) ->
|
||||
ok | {error, term()}.
|
||||
write_chunk(Host, TcpPort, File, Offset, Chunk)
|
||||
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk)
|
||||
when Offset >= ?MINIMUM_OFFSET ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
write_chunk2(Sock, File, Offset, Chunk)
|
||||
write_chunk2(Sock, EpochID, File, Offset, Chunk)
|
||||
after
|
||||
catch gen_tcp:close(Sock)
|
||||
end.
|
||||
|
@ -174,20 +175,20 @@ write_chunk(Host, TcpPort, File, Offset, Chunk)
|
|||
%% @doc Restricted API: Delete a file after it has been successfully
|
||||
%% migrated.
|
||||
|
||||
-spec delete_migration(port(), file_name()) ->
|
||||
-spec delete_migration(port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
delete_migration(Sock, File) when is_port(Sock) ->
|
||||
delete_migration2(Sock, File).
|
||||
delete_migration(Sock, EpochID, File) when is_port(Sock) ->
|
||||
delete_migration2(Sock, EpochID, File).
|
||||
|
||||
%% @doc Restricted API: Delete a file after it has been successfully
|
||||
%% migrated.
|
||||
|
||||
-spec delete_migration(inet_host(), inet_port(), file_name()) ->
|
||||
-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
delete_migration(Host, TcpPort, File) when is_integer(TcpPort) ->
|
||||
delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
delete_migration2(Sock, File)
|
||||
delete_migration2(Sock, EpochID, File)
|
||||
after
|
||||
catch gen_tcp:close(Sock)
|
||||
end.
|
||||
|
@ -195,20 +196,20 @@ delete_migration(Host, TcpPort, File) when is_integer(TcpPort) ->
|
|||
%% @doc Restricted API: Truncate a file after it has been successfully
|
||||
%% erasure coded.
|
||||
|
||||
-spec trunc_hack(port(), file_name()) ->
|
||||
-spec trunc_hack(port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
trunc_hack(Sock, File) when is_port(Sock) ->
|
||||
trunc_hack2(Sock, File).
|
||||
trunc_hack(Sock, EpochID, File) when is_port(Sock) ->
|
||||
trunc_hack2(Sock, EpochID, File).
|
||||
|
||||
%% @doc Restricted API: Truncate a file after it has been successfully
|
||||
%% erasure coded.
|
||||
|
||||
-spec trunc_hack(inet_host(), inet_port(), file_name()) ->
|
||||
-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
|
||||
ok | {error, term()}.
|
||||
trunc_hack(Host, TcpPort, File) when is_integer(TcpPort) ->
|
||||
trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
trunc_hack2(Sock, File)
|
||||
trunc_hack2(Sock, EpochID, File)
|
||||
after
|
||||
catch gen_tcp:close(Sock)
|
||||
end.
|
||||
|
@ -365,8 +366,10 @@ checksum_list_finish(Chunks) ->
|
|||
end || Line <- re:split(Bin, "\n", [{return, binary}]),
|
||||
Line /= <<>>].
|
||||
|
||||
write_chunk2(Sock, File0, Offset, Chunk0) ->
|
||||
write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
|
||||
try
|
||||
{EpochNum, EpochCSum} = EpochID,
|
||||
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
|
||||
%% TODO: add client-side checksum to the server's protocol
|
||||
%% _ = crypto:hash(md5, Chunk),
|
||||
File = machi_util:make_binary(File0),
|
||||
|
@ -376,9 +379,8 @@ write_chunk2(Sock, File0, Offset, Chunk0) ->
|
|||
Len = iolist_size(Chunk0),
|
||||
true = (Len =< ?MAX_CHUNK_SIZE),
|
||||
LenHex = machi_util:int_to_hexbin(Len, 32),
|
||||
Cmd = <<"W-repl ", OffsetHex/binary, " ",
|
||||
LenHex/binary, " ", File/binary, "\n">>,
|
||||
|
||||
Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex,
|
||||
LenHex, File, <<"\n">>],
|
||||
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
|
||||
{ok, Line} = gen_tcp:recv(Sock, 0),
|
||||
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
|
||||
|
@ -397,9 +399,12 @@ write_chunk2(Sock, File0, Offset, Chunk0) ->
|
|||
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
|
||||
end.
|
||||
|
||||
delete_migration2(Sock, File) ->
|
||||
delete_migration2(Sock, EpochID, File) ->
|
||||
try
|
||||
ok = gen_tcp:send(Sock, [<<"DEL-migration ">>, File, <<"\n">>]),
|
||||
{EpochNum, EpochCSum} = EpochID,
|
||||
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
|
||||
Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>],
|
||||
ok = gen_tcp:send(Sock, Cmd),
|
||||
ok = inet:setopts(Sock, [{packet, line}]),
|
||||
case gen_tcp:recv(Sock, 0) of
|
||||
{ok, <<"OK\n">>} ->
|
||||
|
@ -418,9 +423,12 @@ delete_migration2(Sock, File) ->
|
|||
{error, {badmatch, BadMatch}}
|
||||
end.
|
||||
|
||||
trunc_hack2(Sock, File) ->
|
||||
trunc_hack2(Sock, EpochID, File) ->
|
||||
try
|
||||
ok = gen_tcp:send(Sock, [<<"TRUNC-hack--- ">>, File, <<"\n">>]),
|
||||
{EpochNum, EpochCSum} = EpochID,
|
||||
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
|
||||
Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>],
|
||||
ok = gen_tcp:send(Sock, Cmd),
|
||||
ok = inet:setopts(Sock, [{packet, line}]),
|
||||
case gen_tcp:recv(Sock, 0) of
|
||||
{ok, <<"OK\n">>} ->
|
||||
|
|
|
@ -80,8 +80,9 @@ flu_smoke_test() ->
|
|||
Len2 = byte_size(Chunk2),
|
||||
Off2 = ?MINIMUM_OFFSET + 77,
|
||||
File2 = "smoke-prefix",
|
||||
ok = ?FLU_C:write_chunk(Host, TcpPort, File2, Off2, Chunk2),
|
||||
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort,
|
||||
ok = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||
File2, Off2, Chunk2),
|
||||
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||
BadFile, Off2, Chunk2),
|
||||
{ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||
File2, Off2, Len2),
|
||||
|
@ -94,15 +95,18 @@ flu_smoke_test() ->
|
|||
|
||||
%% We know that File1 still exists. Pretend that we've done a
|
||||
%% migration and exercise the delete_migration() API.
|
||||
ok = ?FLU_C:delete_migration(Host, TcpPort, File1),
|
||||
{error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort, File1),
|
||||
{error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort, BadFile),
|
||||
ok = ?FLU_C:delete_migration(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1),
|
||||
{error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH, File1),
|
||||
{error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH, BadFile),
|
||||
|
||||
%% We know that File2 still exists. Pretend that we've done a
|
||||
%% migration and exercise the trunc_hack() API.
|
||||
ok = ?FLU_C:trunc_hack(Host, TcpPort, File2),
|
||||
ok = ?FLU_C:trunc_hack(Host, TcpPort, File2),
|
||||
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort, BadFile),
|
||||
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
|
||||
ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2),
|
||||
{error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH, BadFile),
|
||||
|
||||
ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort))
|
||||
after
|
||||
|
|
Loading…
Reference in a new issue