WIP: epoch ID added to append protocol command
This commit is contained in:
parent
030d2ecd10
commit
44bb5e1dae
5 changed files with 33 additions and 10 deletions
|
@ -24,6 +24,8 @@
|
|||
-type pv1_server() :: atom() | binary().
|
||||
-type pv1_timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}.
|
||||
|
||||
-define(DUMMY_PV1_EPOCH, {0,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>}).
|
||||
|
||||
-record(projection_v1, {
|
||||
epoch_number :: pv1_epoch_n(),
|
||||
epoch_csum :: pv1_csum(),
|
||||
|
|
|
@ -100,12 +100,15 @@ append_server_loop(#state{data_dir=DataDir}=S) ->
|
|||
append_server_loop(S)
|
||||
end.
|
||||
|
||||
-define(EpochIDSpace, (4+20)).
|
||||
|
||||
net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
|
||||
ok = inet:setopts(Sock, [{packet, line}]),
|
||||
case gen_tcp:recv(Sock, 0, 60*1000) of
|
||||
{ok, Line} ->
|
||||
%% machi_util:verb("Got: ~p\n", [Line]),
|
||||
PrefixLenLF = byte_size(Line) - 2 - 8 - 1 - 1,
|
||||
PrefixLenLF_E = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
|
||||
PrefixLenCRLF = byte_size(Line) - 2 - 8 - 1 - 2,
|
||||
FileLenLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 1,
|
||||
FileLenCRLF = byte_size(Line) - 2 - 16 - 1 - 8 - 1 - 2,
|
||||
|
@ -118,6 +121,13 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
|
|||
<<"A ", LenHex:8/binary, " ",
|
||||
Prefix:PrefixLenLF/binary, "\n">> ->
|
||||
do_net_server_append(RegName, Sock, LenHex, Prefix);
|
||||
%% BEGIN epoch-id hack
|
||||
<<"A ",
|
||||
_EpochIDRaw:(?EpochIDSpace)/binary,
|
||||
LenHex:8/binary,
|
||||
Prefix:PrefixLenLF_E/binary, "\n">> ->
|
||||
do_net_server_append(RegName, Sock, LenHex, Prefix);
|
||||
%% END epoch-id hack
|
||||
<<"A ", LenHex:8/binary, " ",
|
||||
Prefix:PrefixLenCRLF/binary, "\r\n">> ->
|
||||
do_net_server_append(RegName, Sock, LenHex, Prefix);
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
-include("machi.hrl").
|
||||
|
||||
-export([
|
||||
append_chunk/3, append_chunk/4,
|
||||
append_chunk/4, append_chunk/5,
|
||||
read_chunk/4, read_chunk/5,
|
||||
checksum_list/2, checksum_list/3,
|
||||
list_files/1, list_files/2,
|
||||
|
@ -41,6 +41,9 @@
|
|||
-type chunk_s() :: binary(). % server always uses binary()
|
||||
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
|
||||
-type chunk_size() :: non_neg_integer().
|
||||
-type epoch_csum() :: binary().
|
||||
-type epoch_num() :: non_neg_integer().
|
||||
-type epoch_id() :: {epoch_num(), epoch_csum()}.
|
||||
-type inet_host() :: inet:ip_address() | inet:hostname().
|
||||
-type inet_port() :: inet:port_number().
|
||||
-type file_info() :: {file_size(), file_name_s()}.
|
||||
|
@ -53,20 +56,21 @@
|
|||
%% @doc Append a chunk (binary- or iolist-style) of data to a file
|
||||
%% with `Prefix'.
|
||||
|
||||
-spec append_chunk(port(), file_prefix(), chunk()) ->
|
||||
-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
|
||||
{ok, chunk_pos()} | {error, term()}.
|
||||
append_chunk(Sock, Prefix, Chunk) ->
|
||||
append_chunk2(Sock, Prefix, Chunk).
|
||||
append_chunk(Sock, EpochID, Prefix, Chunk) ->
|
||||
append_chunk2(Sock, EpochID, Prefix, Chunk).
|
||||
|
||||
%% @doc Append a chunk (binary- or iolist-style) of data to a file
|
||||
%% with `Prefix'.
|
||||
|
||||
-spec append_chunk(inet_host(), inet_port(), file_prefix(), chunk()) ->
|
||||
-spec append_chunk(inet_host(), inet_port(),
|
||||
epoch_id(), file_prefix(), chunk()) ->
|
||||
{ok, chunk_pos()} | {error, term()}.
|
||||
append_chunk(Host, TcpPort, Prefix, Chunk) ->
|
||||
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
|
||||
Sock = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
append_chunk2(Sock, Prefix, Chunk)
|
||||
append_chunk2(Sock, EpochID, Prefix, Chunk)
|
||||
after
|
||||
catch gen_tcp:close(Sock)
|
||||
end.
|
||||
|
@ -208,7 +212,7 @@ trunc_hack(Host, TcpPort, File) when is_integer(TcpPort) ->
|
|||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
append_chunk2(Sock, Prefix0, Chunk0) ->
|
||||
append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
|
||||
try
|
||||
%% TODO: add client-side checksum to the server's protocol
|
||||
%% _ = crypto:hash(md5, Chunk),
|
||||
|
@ -216,8 +220,10 @@ append_chunk2(Sock, Prefix0, Chunk0) ->
|
|||
Chunk = machi_util:make_binary(Chunk0),
|
||||
Len = iolist_size(Chunk0),
|
||||
true = (Len =< ?MAX_CHUNK_SIZE),
|
||||
{EpochNum, EpochCSum} = EpochID,
|
||||
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
|
||||
LenHex = machi_util:int_to_hexbin(Len, 32),
|
||||
Cmd = <<"A ", LenHex/binary, " ", Prefix/binary, "\n">>,
|
||||
Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10],
|
||||
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
|
||||
{ok, Line} = gen_tcp:recv(Sock, 0),
|
||||
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
-ifdef(TEST).
|
||||
|
||||
-include("machi.hrl").
|
||||
-include("machi_projection.hrl").
|
||||
|
||||
-define(FLU, machi_flu1).
|
||||
-define(FLU_C, machi_flu1_client).
|
||||
|
@ -36,7 +37,8 @@ verify_file_checksums_test() ->
|
|||
Sock1 = machi_util:connect(Host, TcpPort),
|
||||
try
|
||||
Prefix = <<"verify_prefix">>,
|
||||
[{ok, _} = ?FLU_C:append_chunk(Sock1, Prefix, <<X:(X*8)/big>>) ||
|
||||
[{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,
|
||||
Prefix, <<X:(X*8)/big>>) ||
|
||||
X <- lists:seq(1,10)],
|
||||
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1),
|
||||
{ok, []} = machi_admin_util:verify_file_checksums_remote(
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
|
||||
-ifdef(TEST).
|
||||
-include("machi.hrl").
|
||||
-include("machi_projection.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
-define(FLU, machi_flu1).
|
||||
|
@ -55,10 +56,12 @@ flu_smoke_test() ->
|
|||
|
||||
Chunk1 = <<"yo!">>,
|
||||
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
Prefix, Chunk1),
|
||||
{ok, Chunk1} = ?FLU_C:read_chunk(Host, TcpPort, File1, Off1, Len1),
|
||||
{ok, [{_,_,_}]} = ?FLU_C:checksum_list(Host, TcpPort, File1),
|
||||
{error, bad_arg} = ?FLU_C:append_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
BadPrefix, Chunk1),
|
||||
{ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort),
|
||||
Len1 = size(Chunk1),
|
||||
|
|
Loading…
Reference in a new issue