diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl index 5f5b11b..2e35aed 100644 --- a/include/machi_projection.hrl +++ b/include/machi_projection.hrl @@ -24,6 +24,8 @@ -type pv1_server() :: atom() | binary(). -type pv1_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}. +-define(DUMMY_PV1_EPOCH, {0,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}). + -record(projection_v1, { epoch_number :: pv1_epoch_n(), epoch_csum :: pv1_csum(), diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 7ad649f..5a8ba7e 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -100,12 +100,15 @@ append_server_loop(#state{data_dir=DataDir}=S) -> append_server_loop(S) end. +-define(EpochIDSpace, (4+20)). + net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0, 60*1000) of {ok, Line} -> %% machi_util:verb("Got: ~p\n", [Line]), PrefixLenLF = byte_size(Line) - 2 - 8 - 1 - 1, + PrefixLenLF_E = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1, PrefixLenCRLF = byte_size(Line) - 2 - 8 - 1 - 2, FileLenLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 1, FileLenCRLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 2, @@ -118,6 +121,13 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> <<"A ", LenHex:8/binary, " ", Prefix:PrefixLenLF/binary, "\n">> -> do_net_server_append(RegName, Sock, LenHex, Prefix); +%% BEGIN epoch-id hack + <<"A ", + _EpochIDRaw:(?EpochIDSpace)/binary, + LenHex:8/binary, + Prefix:PrefixLenLF_E/binary, "\n">> -> + do_net_server_append(RegName, Sock, LenHex, Prefix); +%% END epoch-id hack <<"A ", LenHex:8/binary, " ", Prefix:PrefixLenCRLF/binary, "\r\n">> -> do_net_server_append(RegName, Sock, LenHex, Prefix); diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 87accc0..1bbc06d 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -23,7 +23,7 @@ -include("machi.hrl"). -export([ - append_chunk/3, append_chunk/4, + append_chunk/4, append_chunk/5, read_chunk/4, read_chunk/5, checksum_list/2, checksum_list/3, list_files/1, list_files/2, @@ -41,6 +41,9 @@ -type chunk_s() :: binary(). % server always uses binary() -type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. -type chunk_size() :: non_neg_integer(). +-type epoch_csum() :: binary(). +-type epoch_num() :: non_neg_integer(). +-type epoch_id() :: {epoch_num(), epoch_csum()}. -type inet_host() :: inet:ip_address() | inet:hostname(). -type inet_port() :: inet:port_number(). -type file_info() :: {file_size(), file_name_s()}. @@ -53,20 +56,21 @@ %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(port(), file_prefix(), chunk()) -> +-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) -> {ok, chunk_pos()} | {error, term()}. -append_chunk(Sock, Prefix, Chunk) -> - append_chunk2(Sock, Prefix, Chunk). +append_chunk(Sock, EpochID, Prefix, Chunk) -> + append_chunk2(Sock, EpochID, Prefix, Chunk). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(inet_host(), inet_port(), file_prefix(), chunk()) -> +-spec append_chunk(inet_host(), inet_port(), + epoch_id(), file_prefix(), chunk()) -> {ok, chunk_pos()} | {error, term()}. -append_chunk(Host, TcpPort, Prefix, Chunk) -> +append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> Sock = machi_util:connect(Host, TcpPort), try - append_chunk2(Sock, Prefix, Chunk) + append_chunk2(Sock, EpochID, Prefix, Chunk) after catch gen_tcp:close(Sock) end. @@ -208,7 +212,7 @@ trunc_hack(Host, TcpPort, File) when is_integer(TcpPort) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% -append_chunk2(Sock, Prefix0, Chunk0) -> +append_chunk2(Sock, EpochID, Prefix0, Chunk0) -> try %% TODO: add client-side checksum to the server's protocol %% _ = crypto:hash(md5, Chunk), @@ -216,8 +220,10 @@ append_chunk2(Sock, Prefix0, Chunk0) -> Chunk = machi_util:make_binary(Chunk0), Len = iolist_size(Chunk0), true = (Len =< ?MAX_CHUNK_SIZE), + {EpochNum, EpochCSum} = EpochID, + EpochIDRaw = <>, LenHex = machi_util:int_to_hexbin(Len, 32), - Cmd = <<"A ", LenHex/binary, " ", Prefix/binary, "\n">>, + Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10], ok = gen_tcp:send(Sock, [Cmd, Chunk]), {ok, Line} = gen_tcp:recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 640e760..dd46af2 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -24,6 +24,7 @@ -ifdef(TEST). -include("machi.hrl"). +-include("machi_projection.hrl"). -define(FLU, machi_flu1). -define(FLU_C, machi_flu1_client). @@ -36,7 +37,8 @@ verify_file_checksums_test() -> Sock1 = machi_util:connect(Host, TcpPort), try Prefix = <<"verify_prefix">>, - [{ok, _} = ?FLU_C:append_chunk(Sock1, Prefix, <>) || + [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, + Prefix, <>) || X <- lists:seq(1,10)], {ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1), {ok, []} = machi_admin_util:verify_file_checksums_remote( diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index ef960b7..e006adc 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -23,6 +23,7 @@ -ifdef(TEST). -include("machi.hrl"). +-include("machi_projection.hrl"). -include_lib("eunit/include/eunit.hrl"). -define(FLU, machi_flu1). @@ -55,10 +56,12 @@ flu_smoke_test() -> Chunk1 = <<"yo!">>, {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, + ?DUMMY_PV1_EPOCH, Prefix, Chunk1), {ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, File1, Off1, Len1), {ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort, File1), {error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort, + ?DUMMY_PV1_EPOCH, BadPrefix, Chunk1), {ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort), Len1 = size(Chunk1),