From 7627ba08a3fea58cb5adc1d4a0a61090300a656e Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Thu, 2 Apr 2015 21:18:41 +0900 Subject: [PATCH] WIP: epoch ID added to write/delete/trunc protocol commands --- src/machi_flu1.erl | 16 +++++++--- src/machi_flu1_client.erl | 66 ++++++++++++++++++++++----------------- test/machi_flu1_test.erl | 20 +++++++----- 3 files changed, 60 insertions(+), 42 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 5ba5dd9..78fc3b9 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -110,8 +110,8 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1, FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1, CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1, - WriteFileLenLF = byte_size(Line) - 7 - 16 - 1 - 8 - 1 - 1, - DelFileLenLF = byte_size(Line) - 14 - 1, + WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1, + DelFileLenLF = byte_size(Line) - 14 - ?EpochIDSpace - 1, case Line of %% For normal use <<"A ", @@ -137,14 +137,20 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> catch gen_tcp:close(Sock), exit(normal); %% For "internal" replication only. - <<"W-repl ", OffsetHex:16/binary, " ", LenHex:8/binary, " ", + <<"W-repl ", + _EpochIDRaw:(?EpochIDSpace)/binary, + OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir); %% For data migration only. - <<"DEL-migration ", File:DelFileLenLF/binary, "\n">> -> + <<"DEL-migration ", + _EpochIDRaw:(?EpochIDSpace)/binary, + File:DelFileLenLF/binary, "\n">> -> do_net_server_delete_migration_only(Sock, File, DataDir); %% For erasure coding hackityhack - <<"TRUNC-hack--- ", File:DelFileLenLF/binary, "\n">> -> + <<"TRUNC-hack--- ", + _EpochIDRaw:(?EpochIDSpace)/binary, + File:DelFileLenLF/binary, "\n">> -> do_net_server_truncate_hackityhack(Sock, File, DataDir); _ -> machi_util:verb("Else Got: ~p\n", [Line]), diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 6652ed0..d999bab 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -31,9 +31,9 @@ ]). %% For "internal" replication only. -export([ - write_chunk/4, write_chunk/5, - delete_migration/2, delete_migration/3, - trunc_hack/2, trunc_hack/3 + write_chunk/5, write_chunk/6, + delete_migration/3, delete_migration/4, + trunc_hack/3, trunc_hack/4 ]). -type chunk() :: binary() | iolist(). % client can use either @@ -151,22 +151,23 @@ quit(Sock) when is_port(Sock) -> %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(port(), file_name(), file_offset(), chunk()) -> +-spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) -> ok | {error, term()}. -write_chunk(Sock, File, Offset, Chunk) +write_chunk(Sock, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> - write_chunk2(Sock, File, Offset, Chunk). + write_chunk2(Sock, EpochID, File, Offset, Chunk). %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(inet_host(), inet_port(), file_name(), file_offset(), chunk()) -> +-spec write_chunk(inet_host(), inet_port(), + epoch_id(), file_name(), file_offset(), chunk()) -> ok | {error, term()}. -write_chunk(Host, TcpPort, File, Offset, Chunk) +write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> Sock = machi_util:connect(Host, TcpPort), try - write_chunk2(Sock, File, Offset, Chunk) + write_chunk2(Sock, EpochID, File, Offset, Chunk) after catch gen_tcp:close(Sock) end. @@ -174,20 +175,20 @@ write_chunk(Host, TcpPort, File, Offset, Chunk) %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(port(), file_name()) -> +-spec delete_migration(port(), epoch_id(), file_name()) -> ok | {error, term()}. -delete_migration(Sock, File) when is_port(Sock) -> - delete_migration2(Sock, File). +delete_migration(Sock, EpochID, File) when is_port(Sock) -> + delete_migration2(Sock, EpochID, File). %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(inet_host(), inet_port(), file_name()) -> +-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) -> ok | {error, term()}. -delete_migration(Host, TcpPort, File) when is_integer(TcpPort) -> +delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = machi_util:connect(Host, TcpPort), try - delete_migration2(Sock, File) + delete_migration2(Sock, EpochID, File) after catch gen_tcp:close(Sock) end. @@ -195,20 +196,20 @@ delete_migration(Host, TcpPort, File) when is_integer(TcpPort) -> %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(port(), file_name()) -> +-spec trunc_hack(port(), epoch_id(), file_name()) -> ok | {error, term()}. -trunc_hack(Sock, File) when is_port(Sock) -> - trunc_hack2(Sock, File). +trunc_hack(Sock, EpochID, File) when is_port(Sock) -> + trunc_hack2(Sock, EpochID, File). %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(inet_host(), inet_port(), file_name()) -> +-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) -> ok | {error, term()}. -trunc_hack(Host, TcpPort, File) when is_integer(TcpPort) -> +trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> Sock = machi_util:connect(Host, TcpPort), try - trunc_hack2(Sock, File) + trunc_hack2(Sock, EpochID, File) after catch gen_tcp:close(Sock) end. @@ -365,8 +366,10 @@ checksum_list_finish(Chunks) -> end || Line <- re:split(Bin, "\n", [{return, binary}]), Line /= <<>>]. -write_chunk2(Sock, File0, Offset, Chunk0) -> +write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> try + {EpochNum, EpochCSum} = EpochID, + EpochIDRaw = <>, %% TODO: add client-side checksum to the server's protocol %% _ = crypto:hash(md5, Chunk), File = machi_util:make_binary(File0), @@ -376,9 +379,8 @@ write_chunk2(Sock, File0, Offset, Chunk0) -> Len = iolist_size(Chunk0), true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), - Cmd = <<"W-repl ", OffsetHex/binary, " ", - LenHex/binary, " ", File/binary, "\n">>, - + Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex, + LenHex, File, <<"\n">>], ok = gen_tcp:send(Sock, [Cmd, Chunk]), {ok, Line} = gen_tcp:recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, @@ -397,9 +399,12 @@ write_chunk2(Sock, File0, Offset, Chunk0) -> {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. -delete_migration2(Sock, File) -> +delete_migration2(Sock, EpochID, File) -> try - ok = gen_tcp:send(Sock, [<<"DEL-migration ">>, File, <<"\n">>]), + {EpochNum, EpochCSum} = EpochID, + EpochIDRaw = <>, + Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>], + ok = gen_tcp:send(Sock, Cmd), ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0) of {ok, <<"OK\n">>} -> @@ -418,9 +423,12 @@ delete_migration2(Sock, File) -> {error, {badmatch, BadMatch}} end. -trunc_hack2(Sock, File) -> +trunc_hack2(Sock, EpochID, File) -> try - ok = gen_tcp:send(Sock, [<<"TRUNC-hack--- ">>, File, <<"\n">>]), + {EpochNum, EpochCSum} = EpochID, + EpochIDRaw = <>, + Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>], + ok = gen_tcp:send(Sock, Cmd), ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0) of {ok, <<"OK\n">>} -> diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index e98dd6c..0552fad 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -80,8 +80,9 @@ flu_smoke_test() -> Len2 = byte_size(Chunk2), Off2 = ?MINIMUM_OFFSET + 77, File2 = "smoke-prefix", - ok = ?FLU_C:write_chunk(Host, TcpPort, File2, Off2, Chunk2), - {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, + ok = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, + File2, Off2, Chunk2), + {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, BadFile, Off2, Chunk2), {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2), @@ -94,15 +95,18 @@ flu_smoke_test() -> %% We know that File1 still exists. Pretend that we've done a %% migration and exercise the delete_migration() API. - ok = ?FLU_C:delete_migration(Host, TcpPort, File1), - {error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort, File1), - {error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort, BadFile), + ok = ?FLU_C:delete_migration(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1), + {error, no_such_file} = ?FLU_C:delete_migration(Host, TcpPort, + ?DUMMY_PV1_EPOCH, File1), + {error, bad_arg} = ?FLU_C:delete_migration(Host, TcpPort, + ?DUMMY_PV1_EPOCH, BadFile), %% We know that File2 still exists. Pretend that we've done a %% migration and exercise the trunc_hack() API. - ok = ?FLU_C:trunc_hack(Host, TcpPort, File2), - ok = ?FLU_C:trunc_hack(Host, TcpPort, File2), - {error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort, BadFile), + ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2), + ok = ?FLU_C:trunc_hack(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2), + {error, bad_arg} = ?FLU_C:trunc_hack(Host, TcpPort, + ?DUMMY_PV1_EPOCH, BadFile), ok = ?FLU_C:quit(machi_util:connect(Host, TcpPort)) after