Add new API func, append_chunk_extra()

This commit is contained in:
Scott Lystig Fritchie 2015-05-17 14:10:42 +09:00
parent 5c2635346f
commit 6c07522359
6 changed files with 105 additions and 22 deletions

View file

@ -31,8 +31,8 @@ func, and pattern match Erlang style in that func.
*** DONE Preserve current test code (leave as-is? tiny changes?)
*** DONE Make chain manager code flexible enough to run "real world" or "sim"
** DONE Add projection wedging logic to each FLU.
** Started.... Implement real data repair, orchestrated by the chain manager
** TODO Change all protocol ops to enforce the epoch ID
** DONE Implement real data repair, orchestrated by the chain manager
** DONE Change all protocol ops to enforce the epoch ID
- Add no-wedging state to make testing easier?

View file

@ -214,12 +214,13 @@ listen_server_loop(LSock, S) ->
append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) ->
AppendServerPid = self(),
receive
{seq_append, From, _Prefix, _Chunk, _CSum} when Wedged_p ->
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p ->
From ! wedged,
append_server_loop(FluPid, S);
{seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir, AppendServerPid) end),
{seq_append, From, Prefix, Chunk, CSum, Extra} ->
spawn(fun() -> append_server_dispatch(From, Prefix,
Chunk, CSum, Extra,
DataDir, AppendServerPid) end),
append_server_loop(FluPid, S);
{wedge_state_change, Boolean, EpochId} ->
true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}),
@ -246,7 +247,7 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
case gen_tcp:recv(Sock, 0, 600*1000) of
{ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1,
FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1,
CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1,
WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1,
@ -255,10 +256,11 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
%% For normal use
<<"A ",
EpochIDHex:(?EpochIDSpace)/binary,
LenHex:8/binary,
LenHex:8/binary, ExtraHex:8/binary,
Prefix:PrefixLenLF/binary, "\n">> ->
_EpochID = decode_epoch_id(EpochIDHex),
do_net_server_append(FluName, Sock, LenHex, Prefix);
do_net_server_append(FluName, Sock, LenHex, ExtraHex,
Prefix);
<<"R ",
EpochIDHex:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary,
@ -317,16 +319,16 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
exit(normal)
end.
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, LinkPid) ->
append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) ->
Pid = write_server_get_pid(Prefix, DataDir, LinkPid),
Pid ! {seq_append, From, Prefix, Chunk, CSum},
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra},
exit(normal).
do_net_server_append(FluName, Sock, LenHex, Prefix) ->
do_net_server_append(FluName, Sock, LenHex, ExtraHex, Prefix) ->
%% TODO: robustify against other invalid path characters such as NUL
case sanitize_file_string(Prefix) of
ok ->
do_net_server_append2(FluName, Sock, LenHex, Prefix);
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix);
_ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
end.
@ -339,13 +341,14 @@ sanitize_file_string(Str) ->
error
end.
do_net_server_append2(FluName, Sock, LenHex, Prefix) ->
do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
<<Extra:32/big>> = machi_util:hexstr_to_bin(ExtraHex),
ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum_chunk(Chunk),
try
FluName ! {seq_append, self(), Prefix, Chunk, CSum}
FluName ! {seq_append, self(), Prefix, Chunk, CSum, Extra}
catch error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
end,
@ -707,8 +710,12 @@ seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
run_seq_append_server2(Prefix, DataDir);
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
receive
{seq_append, From, Prefix, Chunk, CSum} ->
ok = file:pwrite(FHd, Offset, Chunk),
{seq_append, From, Prefix, Chunk, CSum, Extra} ->
if Chunk /= <<>> ->
ok = file:pwrite(FHd, Offset, Chunk);
true ->
ok
end,
From ! {assignment, Offset, File},
Len = byte_size(Chunk),
OffsetHex = machi_util:bin_to_hexstr(<<Offset:64/big>>),
@ -717,7 +724,7 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10],
ok = file:write(FHc, CSum_info),
seq_append_server_loop(DataDir, Prefix, File, FH_,
FileNum, Offset + Len);
FileNum, Offset + Len + Extra);
{sync_stuff, FromPid, Ref} ->
file:sync(FHc),
FromPid ! {sync_finished, Ref},

View file

@ -28,6 +28,7 @@
-export([
%% File API
append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
@ -79,7 +80,7 @@
-spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk2(Sock, EpochID, Prefix, Chunk).
append_chunk2(Sock, EpochID, Prefix, Chunk, 0).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
@ -90,7 +91,41 @@ append_chunk(Sock, EpochID, Prefix, Chunk) ->
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
Sock = machi_util:connect(Host, TcpPort),
try
append_chunk2(Sock, EpochID, Prefix, Chunk)
append_chunk2(Sock, EpochID, Prefix, Chunk, 0)
after
catch gen_tcp:close(Sock)
end.
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' and also request an additional `Extra' bytes.
%%
%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then
%% the file offsets that follow `Chunk''s position for the following 4K will
%% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API.
-spec append_chunk_extra(port(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' and also request an additional `Extra' bytes.
%%
%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then
%% the file offsets that follow `Chunk''s position for the following 4K will
%% be reserved by the file sequencer for later write(s) by the
%% `write_chunk()' API.
-spec append_chunk_extra(inet_host(), inet_port(),
epoch_id(), file_prefix(), chunk(), chunk_size()) ->
{ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
Sock = machi_util:connect(Host, TcpPort),
try
append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra)
after
catch gen_tcp:close(Sock)
end.
@ -395,7 +430,7 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%
append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
erase(bad_sock),
try
%% TODO: add client-side checksum to the server's protocol
@ -408,7 +443,8 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
EpochIDHex = machi_util:bin_to_hexstr(
<<EpochNum:(4*8)/big, EpochCSum/binary>>),
LenHex = machi_util:int_to_hexbin(Len, 32),
Cmd = [<<"A ">>, EpochIDHex, LenHex, Prefix, 10],
ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32),
Cmd = [<<"A ">>, EpochIDHex, LenHex, ExtraHex, Prefix, 10],
ok = gen_tcp:send(Sock, [Cmd, Chunk]),
{ok, Line} = gen_tcp:recv(Sock, 0),
PathLen = byte_size(Line) - 3 - 16 - 1 - 1,

View file

@ -51,6 +51,7 @@
-export([
%% File API
append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
@ -103,6 +104,21 @@ append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk, EpochID, Prefix, Chunk}},
Timeout).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, infinity).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk_extra, EpochID, Prefix,
Chunk, ChunkExtra}},
Timeout).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size) ->
@ -294,6 +310,8 @@ do_req(Req, S) ->
make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->
fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end;
make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra}, #state{sock=Sock}) ->
fun() -> ?FLU_C:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({write_chunk, EpochID, File, Offset, Chunk}, #state{sock=Sock}) ->

View file

@ -87,6 +87,23 @@ flu_smoke_test() ->
?DUMMY_PV1_EPOCH,
File1, Off1, Len1*984),
{ok, {Off1b,Len1b,File1b}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1),
Extra = 42,
{ok, {Off1c,Len1c,File1c}} = ?FLU_C:append_chunk_extra(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1, Extra),
{ok, {Off1d,Len1d,File1d}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1),
if File1b == File1c, File1c == File1d ->
true = (Off1c == Off1b + Len1b),
true = (Off1d == Off1c + Len1c + Extra);
true ->
exit(not_mandatory_but_test_expected_same_file_fixme)
end,
Chunk2 = <<"yo yo">>,
Len2 = byte_size(Chunk2),
Off2 = ?MINIMUM_OFFSET + 77,

View file

@ -63,6 +63,11 @@ api_smoke_test() ->
?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk,
infinity),
{ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
MyChunk2 = <<"my chunk data, yeah, again">>,
{ok, {MyOff2,MySize2,MyFile2}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch, <<"prefix">>,
MyChunk2, 4242, infinity),
{ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
%% Alright, now for the rest of the API, whee
BadFile = <<"no-such-file">>,