Update read_chunk() PB protocol to return trimmed chunks #25

Merged
kuenishi merged 1 commit from ku/trim-pb-protocol-2 into master 2015-10-23 08:10:21 +00:00
18 changed files with 285 additions and 178 deletions

View file

@@ -205,11 +205,15 @@ message Mpb_ReadChunkReq {
// only makes sense if flag_no_checksum is not set).
// TODO: not implemented yet.
optional uint32 flag_no_chunk = 3 [default=0];
// TODO: not implemented yet.
optional uint32 flag_needs_trimmed = 4 [default=0];
}
message Mpb_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1;
repeated Mpb_Chunk chunks = 2;
repeated Mpb_ChunkPos trimmed = 3;
}
// High level API: trim_chunk() request & response
@@ -410,11 +414,14 @@ message Mpb_LL_ReadChunkReq {
// only makes sense if flag_no_checksum is not set).
// TODO: not implemented yet.
optional uint32 flag_no_chunk = 4 [default=0];
optional uint32 flag_needs_trimmed = 5 [default=0];
}
message Mpb_LL_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1;
repeated Mpb_Chunk chunks = 2;
repeated Mpb_ChunkPos trimmed = 3;
}
// Low level API: checksum_list()
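
For orientation, a sketch of how the new `trimmed` field surfaces on the Erlang side (not part of this diff; Sock, EpochID, File, Offset, and Size are placeholders). Chunk tuples decode to `{File, Offset, Bytes, Csum}` and trimmed positions to `{File, Offset, Size}`; as the TODOs below note, the server currently always sends an empty trimmed list.

%% Sketch only: a low-level read that asks for trimmed ranges now
%% yields two lists instead of one.
{ok, {Chunks, Trimmed}} =
    machi_flu1_client:read_chunk(Sock, EpochID, File, Offset, Size,
                                 [{needs_trimmed, true}]),
[{_, _Off, _Bin, _Csum}|_] = Chunks, %% written data within the range
[] = Trimmed.                        %% empty until trim support lands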

View file

@@ -76,7 +76,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
ReadChunk = fun(F, Offset, Size) ->
case file:pread(FH, Offset, Size) of
{ok, Bin} ->
{ok, [{F, Offset, Bin, undefined}]};
{ok, {[{F, Offset, Bin, undefined}], []}};
Err ->
Err
end
@@ -92,7 +92,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
verify_file_checksums_remote2(Sock1, EpochID, File) ->
ReadChunk = fun(File_name, Offset, Size) ->
?FLU_C:read_chunk(Sock1, EpochID,
File_name, Offset, Size)
File_name, Offset, Size, [])
end,
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).
@@ -117,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
verify_chunk_checksum(File, ReadChunk) ->
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
case ReadChunk(File, Offset, Size) of
{ok, [{_, Offset, Chunk, _}]} ->
{ok, {[{_, Offset, Chunk, _}], _}} ->
CSum2 = machi_util:checksum_chunk(Chunk),
if CSum == CSum2 ->
Acc;

View file

@@ -324,7 +324,8 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
_ -> ok
end,
_T1 = os:timestamp(),
{ok, [{_, Offset, Chunk, _}]} =
%% TODO: support the case where multiple written or trimmed chunks are returned
{ok, {[{_, Offset, Chunk, _}], _}} =
machi_proxy_flu1_client:read_chunk(
SrcP, EpochID, File, Offset, Size,
?SHORT_TIMEOUT),

View file

@@ -122,7 +122,7 @@
append_chunk/3, append_chunk/4,
append_chunk_extra/4, append_chunk_extra/5,
write_chunk/4, write_chunk/5,
read_chunk/4, read_chunk/5,
read_chunk/5, read_chunk/6,
trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3,
list_files/1, list_files/2,
@@ -201,14 +201,14 @@ write_chunk(PidSpec, File, Offset, Chunk, Timeout0) ->
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size) ->
read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
read_chunk(PidSpec, File, Offset, Size, Opts) ->
read_chunk(PidSpec, File, Offset, Size, Opts, ?DEFAULT_TIMEOUT).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
read_chunk(PidSpec, File, Offset, Size, Opts, Timeout0) ->
{TO, Timeout} = timeout(Timeout0),
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, Opts, TO}},
Timeout).
%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.
@@ -288,8 +288,8 @@ handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra, TO}, _From, S) ->
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S);
handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) ->
do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
handle_call2({read_chunk, File, Offset, Size, Opts, TO}, _From, S) ->
do_read_chunk(File, Offset, Size, Opts, 0, os:timestamp(), TO, S);
handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) ->
do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
handle_call2({checksum_list, File, TO}, _From, S) ->
@@ -503,11 +503,10 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO,
iolist_size(Chunk)})
end.
do_read_chunk(File, Offset, Size, 0=Depth, STime, TO,
do_read_chunk(File, Offset, Size, Opts, 0=Depth, STime, TO,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S);
do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
%% io:format(user, "read sleep1,", []),
do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S);
do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, #state{proj=P}=S) ->
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO ->
@@ -517,22 +516,22 @@ do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
do_read_chunk(File, Offset, Size, Depth + 1, STime, TO, S2);
do_read_chunk(File, Offset, Size, Opts, Depth + 1, STime, TO, S2);
_ ->
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S2)
do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S2)
end
end.
do_read_chunk2(File, Offset, Size, Depth, STime, TO,
do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
UPI = readonly_flus(P),
Tail = lists:last(UPI),
ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, Chunks0} when is_list(Chunks0) ->
File, Offset, Size, Opts, ?TIMEOUT) of
{ok, {Chunks0, []}} when is_list(Chunks0) ->
Chunks = trim_both_side(Chunks0, Offset, Offset + Size),
{reply, {ok, Chunks}, S};
{reply, {ok, {Chunks, []}}, S};
%% {ok, BadChunk} ->
%% %% TODO cleaner handling of bad chunks
%% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
@@ -546,7 +545,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
{reply, BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_read_chunk(File, Offset, Size, Depth, STime, TO, S);
do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, S);
{error, not_written} ->
read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
%% {reply, {error, not_written}, S};
@@ -624,10 +623,12 @@ read_repair2(ap_mode=ConsistencyMode,
#state{proj=P}=S) ->
Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, Chunks, GotItFrom} when is_list(Chunks) ->
{ok, {Chunks, _Trimmed}, GotItFrom} when is_list(Chunks) ->
ToRepair = mutation_flus(P) -- [GotItFrom],
{Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
{Reply0, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
File, Depth, STime, S, {ok, Chunks}),
{ok, Chunks} = Reply0,
Reply = {ok, {Chunks, _Trimmed}},
{reply, Reply, S1};
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
@@ -818,7 +819,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
run_middleworker_job(Fun, ArgList, WTimeout) ->
Parent = self(),
MiddleWorker =
spawn(fun() ->
spawn_link(fun() ->
PidsMons =
[spawn_monitor(fun() ->
Res = (catch Fun(Arg)),
@@ -859,17 +860,19 @@ try_to_find_chunk(Eligible, File, Offset, Size,
Work = fun(FLU) ->
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:read_chunk(Proxy, EpochID,
File, Offset, Size) of
{ok, Chunks} when is_list(Chunks) ->
{FLU, {ok, Chunks}};
%% TODO Trimmed is required here
File, Offset, Size, []) of
{ok, {_Chunks, _} = ChunksAndTrimmed} ->
{FLU, {ok, ChunksAndTrimmed}};
Else ->
{FLU, Else}
end
end,
Rs = run_middleworker_job(Work, Eligible, Timeout),
case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
[{FoundFLU, {ok, Chunk}}|_] ->
{ok, Chunk, FoundFLU};
case [X || {_Fluname, {ok, {[{_,_,B,_}], _}}}=X <- Rs, is_binary(B)] of
[{FoundFLU, {ok, ChunkAndTrimmed}}|_] ->
{ok, ChunkAndTrimmed, FoundFLU};
[] ->
RetryErrs = [partition, bad_epoch, wedged],
case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of
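
To make the updated calling convention concrete, a minimal sketch (C1 and the file coordinates are placeholders; `[]` keeps the default flag behavior):

%% Sketch: the options proplist is new; the timeout stays last.
{ok, {Chunks1, _Trimmed1}} =
    machi_cr_client:read_chunk(C1, File, Off, Size, []),
{ok, {Chunks2, _Trimmed2}} =
    machi_cr_client:read_chunk(C1, File, Off, Size,
                               [{needs_trimmed, true}], 5*1000).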

View file

@@ -52,6 +52,7 @@
sync/1,
sync/2,
read/3,
read/4,
write/3,
write/4,
append/2,
@@ -83,9 +84,9 @@
wedged = false :: boolean(),
csum_file :: string()|undefined,
csum_path :: string()|undefined,
eof_position = 0 :: non_neg_integer(),
data_filehandle :: file:io_device(),
csum_table :: machi_csum_table:table(),
eof_position = 0 :: non_neg_integer(),
tref :: reference(), %% timer ref
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
ops = 0 :: non_neg_integer(), %% sum of all ops
@@ -135,11 +136,22 @@ sync(_Pid, Type) ->
{ok, {Chunks :: [{Filename::string(), Offset :: non_neg_integer(),
Data :: binary(), Checksum :: binary()}],
Trimmed :: [{Filename::string(), Offset :: non_neg_integer(),
Size :: non_neg_integer()}]}} |
{error, Reason :: term()}.
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0 ->
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
read(_Pid, Offset, Length) ->
lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]),
read(Pid, Offset, Length) ->
read(Pid, Offset, Length, []).
-spec read(Pid :: pid(),
Offset :: non_neg_integer(),
Length :: non_neg_integer(),
[{no_checksum|no_chunk|needs_trimmed, boolean()}]) ->
{ok, {Chunks :: [{Filename::string(), Offset :: non_neg_integer(),
Data :: binary(), Checksum :: binary()}],
Trimmed :: [{Filename::string(), Offset :: non_neg_integer(),
Size :: non_neg_integer()}]}} |
{error, Reason :: term()}.
read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0
andalso is_list(Opts) ->
gen_server:call(Pid, {read, Offset, Length, Opts}, ?TIMEOUT);
read(_Pid, Offset, Length, Opts) ->
lager:warning("Bad args to read: Offset ~p, Length ~p, Options ~p", [Offset, Length, Opts]),
{error, bad_arg}.
% @doc Write data at offset
@@ -211,8 +223,8 @@ init({Filename, DataDir}) ->
csum_table = CsumTable,
tref = Tref,
eof_position = Eof},
lager:debug("Starting file proxy ~p for filename ~p, state = ~p",
[self(), Filename, St]),
lager:debug("Starting file proxy ~p for filename ~p, state = ~p, Eof = ~p",
[self(), Filename, St, Eof]),
{ok, St}.
% @private
@@ -250,13 +262,13 @@ handle_call({sync, all}, _From, State = #state{filename = F,
%%% READS
handle_call({read, _Offset, _Length}, _From,
handle_call({read, _Offset, _Length, _}, _From,
State = #state{wedged = true,
reads = {T, Err}
}) ->
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
handle_call({read, Offset, Length}, _From,
handle_call({read, Offset, Length, _Opts}, _From,
State = #state{eof_position = Eof,
reads = {T, Err}
}) when Offset > Eof ->
@@ -265,23 +277,28 @@ handle_call({read, Offset, Length}, _From,
[Offset, Length, Eof]),
{reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
handle_call({read, Offset, Length}, _From,
handle_call({read, Offset, Length, Opts}, _From,
State = #state{filename = F,
data_filehandle = FH,
csum_table = CsumTable,
eof_position = EofP,
reads = {T, Err}
}) ->
NoChecksum = proplists:get_value(no_checksum, Opts, false),
NoChunk = proplists:get_value(no_chunk, Opts, false),
NeedsMerge = proplists:get_value(needs_trimmed, Opts, false),
{Resp, NewErr} =
case do_read(FH, F, CsumTable, Offset, Length) of
{ok, []} ->
case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsMerge) of
{ok, {[], []}} ->
{{error, not_written}, Err + 1};
{ok, Chunks} ->
%% Kludge to wrap read result in tuples, to support fragmented read
%% XXX FIXME
%% For now we are omitting the checksum data because it blows up
%% protobufs.
{{ok, Chunks}, Err};
{ok, {Chunks, Trimmed}} ->
%% Kludge to wrap read result in tuples, to support fragmented read
%% XXX FIXME
%% For now we are omitting the checksum data because it blows up
%% protobufs.
{{ok, {Chunks, Trimmed}}, Err};
Error ->
lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]),
{Error, Err + 1}
end,
{reply, Resp, State#state{reads = {T+1, NewErr}}};
@@ -298,6 +315,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
State = #state{filename = F,
writes = {T, Err},
data_filehandle = FHd,
eof_position=EofP,
csum_table = CsumTable}) ->
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
@@ -318,6 +336,8 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
end
end,
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n",
[iolist_size(Data), Offset, F, NewEof]),
{reply, Resp, State#state{writes = {T+1, NewErr},
eof_position = NewEof}};
@@ -351,15 +371,14 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
ok ->
{{ok, F, EofP}, Err};
Error ->
{Error, Err + 1, EofP}
{Error, Err + 1}
end
end,
%% TODO: should we double-check this by calling
%% machi_csum_table:calc_unwritten_bytes/1?
NewEof = EofP + byte_size(Data) + Extra,
lager:debug("appended ~p bytes at ~p file ~p. NewEofP = ~p",
[iolist_size(Data), EofP, F, NewEof]),
{reply, Resp, State#state{appends = {T+1, NewErr},
eof_position = NewEof
}};
eof_position = NewEof}};
handle_call(Req, _From, State) ->
lager:warning("Unknown call: ~p", [Req]),
@@ -500,7 +519,10 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
Filename :: string(),
CsumTable :: machi_csum_table:table(),
Offset :: non_neg_integer(),
Size :: non_neg_integer()
Size :: non_neg_integer(),
NoChecksum :: boolean(),
NoChunk :: boolean(),
NeedsTrimmed :: boolean()
) -> {ok, {Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}],
Trimmed :: [{string(), non_neg_integer(), non_neg_integer()}]}} |
{error, bad_checksum} |
{error, partial_read} |
@@ -519,6 +541,9 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
% tuple is returned.</ul>
% </li>
%
do_read(FHd, Filename, CsumTable, Offset, Size, _, _, _) ->
do_read(FHd, Filename, CsumTable, Offset, Size).
do_read(FHd, Filename, CsumTable, Offset, Size) ->
%% Note that find/3 only returns overlapping chunks, both borders
%% are not aligned to original Offset and Size.
@@ -526,7 +551,8 @@ do_read(FHd, Filename, CsumTable, Offset, Size) ->
read_all_ranges(FHd, Filename, ChunkCsums, []).
read_all_ranges(_, _, [], ReadChunks) ->
{ok, lists:reverse(ReadChunks)};
%% TODO: currently returns empty list of trimmed chunks
{ok, {lists:reverse(ReadChunks), []}};
read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
case file:pread(FHd, Offset, Size) of
@@ -592,12 +618,12 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
{error, Reason}
end;
[{Offset, Size, TaggedCsum}] ->
case do_read(FHd, Filename, CsumTable, Offset, Size) of
case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of
{error, _} = E ->
lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written",
[E, Offset, Filename]),
{error, server_insanity};
{ok, [{_, Offset, Data, TaggedCsum}]} ->
{ok, {[{_, Offset, Data, TaggedCsum}], _}} ->
%% TODO: what if different checksum got from do_read()?
ok;
{ok, _Other} ->
@@ -632,7 +658,6 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
lager:debug("Successful write to checksum file for ~p",
[Filename]),
%% io:format(user, "here, heh ~p~n", [?LINE]),
ok;
Other ->
lager:error("Got ~p during write to file ~p at offset ~p, length ~p",

View file

@@ -450,9 +450,9 @@ do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
Chunk = {TaggedCSum, ChunkBin},
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
{Res, S};
do_pb_hl_request2({high_read_chunk, File, Offset, Size},
do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
#state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
{Res, S};
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
#state{high_clnt=Clnt}=S) ->
@@ -548,13 +548,13 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluNa
{error, bad_arg}
end.
do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
%% TODO: Look inside Opts someday.
case sanitize_file_string(File) of
ok ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case machi_file_proxy:read(Pid, Offset, Size) of
{ok, Chunks} -> {ok, Chunks};
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
Other -> Other
end;
_ ->

View file

@@ -56,7 +56,7 @@
%% File API
append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
read_chunk/6, read_chunk/7,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,
@@ -144,26 +144,28 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
proplists:proplist()) ->
{ok, machi_dt:chunk_s()} |
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}.
read_chunk(Sock, EpochID, File, Offset, Size)
read_chunk(Sock, EpochID, File, Offset, Size, Opts)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
read_chunk2(Sock, EpochID, File, Offset, Size).
read_chunk2(Sock, EpochID, File, Offset, Size, Opts).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(),
machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
proplists:proplist()) ->
{ok, machi_dt:chunk_s()} |
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
read_chunk(Host, TcpPort, EpochID, File, Offset, Size, Opts)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try
read_chunk2(Sock, EpochID, File, Offset, Size)
read_chunk2(Sock, EpochID, File, Offset, Size, Opts)
after
disconnect(Sock)
end.
@@ -516,12 +518,12 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%
read_chunk2(Sock, EpochID, File0, Offset, Size) ->
read_chunk2(Sock, EpochID, File0, Offset, Size, Opts) ->
ReqID = <<"id">>,
File = machi_util:make_binary(File0),
Req = machi_pb_translate:to_pb_request(
ReqID,
{low_read_chunk, EpochID, File, Offset, Size, []}),
{low_read_chunk, EpochID, File, Offset, Size, Opts}),
do_pb_request_common(Sock, ReqID, Req).
append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->

View file

@@ -40,7 +40,7 @@
auth/3, auth/4,
append_chunk/6, append_chunk/7,
write_chunk/5, write_chunk/6,
read_chunk/4, read_chunk/5,
read_chunk/5, read_chunk/6,
trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3,
list_files/1, list_files/2
@@ -94,11 +94,18 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).
read_chunk(PidSpec, File, Offset, Size) ->
read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
-spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
[{no_checksum | no_chunk | needs_trimmed, boolean()}]) ->
{ok, {list(), list()}} | {error, term()}.
read_chunk(PidSpec, File, Offset, Size, Options) ->
read_chunk(PidSpec, File, Offset, Size, Options, ?DEFAULT_TIMEOUT).
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout).
-spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
[{no_checksum | no_chunk | needs_trimmed, boolean()}],
pos_integer()) ->
{ok, {list(), list()}} | {error, term()}.
read_chunk(PidSpec, File, Offset, Size, Options, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout).
trim_chunk(PidSpec, File, Offset, Size) ->
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
@@ -282,13 +289,19 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}}
end;
do_send_sync2({read_chunk, File, Offset, Size},
do_send_sync2({read_chunk, File, Offset, Size, Options},
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
try
ReqID = <<Index:64/big, Count:64/big>>,
FlagNoChecksum = proplists:get_value(no_checksum, Options, false),
FlagNoChunk = proplists:get_value(no_chunk, Options, false),
NeedsTrimmed = proplists:get_value(needs_trimmed, Options, false),
Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}},
chunk_size=Size},
flag_no_checksum=machi_util:bool2int(FlagNoChecksum),
flag_no_chunk=machi_util:bool2int(FlagNoChunk),
flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
read_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a),
@@ -416,7 +429,7 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
convert_general_status_code(Status).
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks, trimmed=PB_Trimmed}) ->
Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
file_name=File,
chunk=Chunk,
@@ -425,7 +438,12 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Chunk, Csum}
end, PB_Chunks),
{ok, Chunks};
Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}) ->
{File, Offset, Size}
end, PB_Trimmed),
{ok, {Chunks, Trimmed}};
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
convert_general_status_code(Status).
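
End to end, the high-level client folds its option list into the three PB flags via machi_util:bool2int/1 and unpacks the response into a `{Chunks, Trimmed}` pair. A sketch, with Clnt a connected machi_pb_high_client pid (placeholder names):

%% Sketch: these options become flag_no_checksum / flag_no_chunk /
%% flag_needs_trimmed on the encoded #mpb_readchunkreq{}.
{ok, {Chunks, Trimmed}} =
    machi_pb_high_client:read_chunk(Clnt, File, Off, Size,
                                    [{needs_trimmed, true}], 10*1000).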

View file

@@ -75,13 +75,15 @@ from_pb_request(#mpb_ll_request{
from_pb_request(#mpb_ll_request{
req_id=ReqID,
read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID,
chunk_pos=ChunkPos,
flag_no_checksum=PB_GetNoChecksum,
flag_no_chunk=PB_GetNoChunk}}) ->
epoch_id=PB_EpochID,
chunk_pos=ChunkPos,
flag_no_checksum=PB_GetNoChecksum,
flag_no_chunk=PB_GetNoChunk,
flag_needs_trimmed=PB_NeedsTrimmed}}) ->
EpochID = conv_to_epoch_id(PB_EpochID),
Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
{no_chunk, conv_to_boolean(PB_GetNoChunk)}],
{no_chunk, conv_to_boolean(PB_GetNoChunk)},
{needs_trimmed, conv_to_boolean(PB_NeedsTrimmed)}],
#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size} = ChunkPos,
@@ -177,8 +179,15 @@ from_pb_request(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) ->
#mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}} = IR,
{ReqID, {high_read_chunk, File, Offset, Size}};
chunk_size=Size},
flag_no_checksum=FlagNoChecksum,
flag_no_chunk=FlagNoChunk,
flag_needs_trimmed=NeedsTrimmed} = IR,
%% I want MAPS
Options = [{no_checksum, machi_util:int2bool(FlagNoChecksum)},
{no_chunk, machi_util:int2bool(FlagNoChunk)},
{needs_trimmed, machi_util:int2bool(NeedsTrimmed)}],
{ReqID, {high_read_chunk, File, Offset, Size, Options}};
from_pb_request(#mpb_request{req_id=ReqID,
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
#mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
@@ -233,7 +242,8 @@ from_pb_response(#mpb_ll_response{
from_pb_response(#mpb_ll_response{
req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status=Status,
chunks=PB_Chunks}}) ->
chunks=PB_Chunks,
trimmed=PB_Trimmed}}) ->
case Status of
'OK' ->
Chunks = lists:map(fun(#mpb_chunk{file_name=File,
@@ -243,7 +253,12 @@ from_pb_response(#mpb_ll_response{
Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Bytes, Csum}
end, PB_Chunks),
{ReqID, {ok, Chunks}};
Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}) ->
{File, Offset, Size}
end, PB_Trimmed),
{ReqID, {ok, {Chunks, Trimmed}}};
_ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end;
@@ -382,17 +397,23 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
offset=Offset,
chunk=Chunk,
csum=PB_CSum}}};
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
FNChecksum = proplists:get_value(no_checksum, Opts, false),
FNChunk = proplists:get_value(no_chunk, Opts, false),
NeedsTrimmed = proplists:get_value(needs_trimmed, Opts, false),
#mpb_ll_request{
req_id=ReqID, do_not_alter=2,
read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID,
chunk_pos=#mpb_chunkpos{
file_name=File,
offset=Offset,
chunk_size=Size}}};
epoch_id=PB_EpochID,
chunk_pos=#mpb_chunkpos{
file_name=File,
offset=Offset,
chunk_size=Size},
flag_no_checksum=machi_util:bool2int(FNChecksum),
flag_no_chunk=machi_util:bool2int(FNChunk),
flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}};
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
@@ -478,7 +499,7 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
write_chunk=#mpb_ll_writechunkresp{status=Status}};
to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
case Resp of
{ok, Chunks} ->
{ok, {Chunks, Trimmed}} ->
PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
#mpb_chunk{file_name=File,
@@ -487,9 +508,15 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
csum=Ck}}
end, Chunks),
PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}
end, Trimmed),
#mpb_ll_response{req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status='OK',
chunks=PB_Chunks}};
chunks=PB_Chunks,
trimmed=PB_Trimmed}};
{error, _}=Error ->
Status = conv_from_status(Error),
#mpb_ll_response{req_id=ReqID,
@@ -654,10 +681,10 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
_Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end;
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size, _}, Resp) ->
case Resp of
{ok, Chunks} ->
MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{ok, {Chunks, Trimmed}} ->
PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
#mpb_chunk{
offset=Offset,
@@ -665,9 +692,15 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
chunk=Bytes,
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
end, Chunks),
PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}
end, Trimmed),
#mpb_response{req_id=ReqID,
read_chunk=#mpb_readchunkresp{status='OK',
chunks=MpbChunks}};
chunks=PB_Chunks,
trimmed=PB_Trimmed}};
{error, _}=Error ->
Status = conv_from_status(Error),
#mpb_response{req_id=ReqID,

View file

@@ -59,7 +59,7 @@
%% File API
append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
read_chunk/6, read_chunk/7,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,
@@ -130,13 +130,13 @@ append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size) ->
read_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
read_chunk(PidSpec, EpochID, File, Offset, Size, Opts) ->
read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, infinity).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}},
read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size, Opts}},
Timeout).
%% @doc Fetch the list of chunk checksums for `File'.
@@ -299,8 +299,8 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
Timeout) of
{error, written}=Err ->
Size = byte_size(Chunk),
case read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) of
{ok, Chunk2} when Chunk2 == Chunk ->
case read_chunk(PidSpec, EpochID, File, Offset, Size, [], Timeout) of
{ok, {[{File, Offset, Chunk2, _}], []}} when Chunk2 == Chunk ->
%% See equivalent comment inside write_projection().
ok;
_ ->
@@ -377,9 +377,9 @@ make_req_fun({append_chunk, EpochID, Prefix, Chunk},
make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size},
make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size) end;
fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size, Opts) end;
make_req_fun({write_chunk, EpochID, File, Offset, Chunk},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end;
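
The proxy client mirrors the same shape. A sketch, assuming Prox came from machi_proxy_flu1_client:start_link/1 (placeholder variables):

%% Sketch: read_chunk/6 defaults the timeout to infinity;
%% read_chunk/7 makes it explicit.
{ok, {Chunks1, _Trimmed1}} =
    machi_proxy_flu1_client:read_chunk(Prox, EpochID, File, Off, Size, []),
{ok, {Chunks2, _Trimmed2}} =
    machi_proxy_flu1_client:read_chunk(Prox, EpochID, File, Off, Size,
                                       [], infinity).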

View file

@@ -47,7 +47,9 @@
combinations/1, ordered_combinations/1,
mk_order/2,
%% Other
wait_for_death/2, wait_for_life/2
wait_for_death/2, wait_for_life/2,
bool2int/1,
int2bool/1
]).
-include("machi.hrl").
@@ -390,3 +392,9 @@ mk_order(UPI2, Repair1) ->
error -> error
end || X <- UPI2],
UPI2_order.
%% C-style conversion for PB usage.
bool2int(true) -> 1;
bool2int(false) -> 0.
int2bool(0) -> false;
int2bool(I) when is_integer(I) -> true.
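
Because protobufs encodes these flags as uint32 rather than bool, the helpers follow the C convention: 0 is false, any other integer is true. For example:

1 = machi_util:bool2int(true),
0 = machi_util:bool2int(false),
false = machi_util:int2bool(0),
true = machi_util:int2bool(1).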

View file

@@ -114,13 +114,14 @@ smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, {[{_, Off1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
private),
%% Verify that the client's CR wrote to all of them.
[{ok, [{_, Off1, Chunk1, _}]} =
[{ok, {[{_, Off1, Chunk1, _}], []}} =
machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, File1, Off1, Size1) ||
Host, PortBase+X, EpochID, File1, Off1, Size1, []) ||
X <- [0,1,2] ],
%% Test read repair: Manually write to head, then verify that
@@ -128,13 +129,19 @@ smoke_test2() ->
FooOff1 = Off1 + (1024*1024),
[{error, not_written} = machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff1, Size1) || X <- [0,1,2] ],
File1, FooOff1, Size1, []) || X <- [0,1,2] ],
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
File1, FooOff1, Chunk1),
{ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
[{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk(
{ok, {[{_, FooOff1, Chunk1, _}], []}} =
machi_flu1_client:read_chunk(Host, PortBase+0, EpochID,
File1, FooOff1, Size1, []),
{ok, {[{_, FooOff1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, FooOff1, Size1, []),
[?assertMatch({X,{ok, {[{_, FooOff1, Chunk1, _}], []}}},
{X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff1, Size1)} || X <- [0,1,2] ],
File1, FooOff1, Size1, [])})
|| X <- [0,1,2] ],
%% Test read repair: Manually write to middle, then same checking.
FooOff2 = Off1 + (2*1024*1024),
@@ -142,18 +149,19 @@ smoke_test2() ->
Size2 = size(Chunk2),
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
File1, FooOff2, Chunk2),
{ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
[{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff2, Size2)} || X <- [0,1,2] ],
{ok, {[{_, FooOff2, Chunk2, _}], []}} =
machi_cr_client:read_chunk(C1, File1, FooOff2, Size2, []),
[{X,{ok, {[{_, FooOff2, Chunk2, _}], []}}} =
{X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff2, Size2, [])} || X <- [0,1,2] ],
%% Misc API smoke & minor regression checks
{error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>,
999999999, 1),
{ok, [{_,Off1,Chunk1,_},
{_,FooOff1,Chunk1,_},
{_,FooOff2,Chunk2,_}]} = machi_cr_client:read_chunk(C1, File1,
Off1, 88888888),
999999999, 1, []),
{ok, {[{_,Off1,Chunk1,_}, {_,FooOff1,Chunk1,_}, {_,FooOff2,Chunk2,_}],
[]}} =
machi_cr_client:read_chunk(C1, File1, Off1, 88888888, []),
%% Checksum list return value is a primitive binary().
{ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1),
true = is_binary(KludgeBin),
@@ -169,8 +177,8 @@ smoke_test2() ->
{ok, {Off10,Size10,File10}} =
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
Extra10 * Size10),
{ok, [{_, Off10, Chunk10, _}]} =
machi_cr_client:read_chunk(C1, File10, Off10, Size10),
{ok, {[{_, Off10, Chunk10, _}], []}} =
machi_cr_client:read_chunk(C1, File10, Off10, Size10, []),
[begin
Offx = Off10 + (Seq * Size10),
%% TODO: uncomment written/not_written enforcement is available.
@@ -178,8 +186,8 @@ smoke_test2() ->
%% Offx, Size10),
{ok, {Offx,Size10,File10}} =
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
{ok, [{_, Offx, Chunk10, _}]} =
machi_cr_client:read_chunk(C1, File10, Offx, Size10)
{ok, {[{_, Offx, Chunk10, _}], []}} =
machi_cr_client:read_chunk(C1, File10, Offx, Size10, [])
end || Seq <- lists:seq(1, Extra10)],
{ok, {Off11,Size11,File11}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk10),
@@ -222,7 +230,8 @@ witness_smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, {[{_, Off1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
%% Stop 'b' and let the chain reset.
ok = machi_flu_psup:stop_flu_package(b),
@ -248,8 +257,8 @@ witness_smoke_test2() ->
end,
%% Chunk1 is still readable: not affected by wedged witness head.
{ok, [{_, Off1, Chunk1, _}]} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1),
{ok, {[{_, Off1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
%% But because the head is wedged, an append will fail.
{error, partition} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),

View file

@@ -202,8 +202,9 @@ read_next(S, _Res, _Args) -> S.
read(Pid, Offset, Length) ->
case machi_file_proxy:read(Pid, Offset, Length) of
{ok, Chunks} ->
[{_, Offset, Data, Csum}] = machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
{ok, {Chunks, _}} ->
[{_, Offset, Data, Csum}] =
machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
{ok, Data, Csum};
E ->
E

View file

@@ -80,22 +80,22 @@ machi_file_proxy_test_() ->
clean_up_data_dir(?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
[
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, {[{_, _, _, _}], []}}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
].
multiple_chunks_read_test_() ->
@@ -108,11 +108,11 @@ multiple_chunks_read_test_() ->
?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)),
%% Freeza
?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)),
?_assertMatch({ok, [{"test", 1024, _, _},
{"test", 10000, <<"fail">>, _},
{"test", 20000, <<"fail">>, _},
{"test", 30000, <<"fail">>, _},
{"test", 530000, <<"fail">>, _}]},
?_assertMatch({ok, {[{"test", 1024, _, _},
{"test", 10000, <<"fail">>, _},
{"test", 20000, <<"fail">>, _},
{"test", 30000, <<"fail">>, _},
{"test", 530000, <<"fail">>, _}], []}},
machi_file_proxy:read(Pid, 1024, 530000)),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
].

View file

@@ -97,8 +97,8 @@ flu_smoke_test() ->
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
Prefix, Chunk1),
{ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1),
{ok, {[{_, Off1, Chunk1, _}], _}} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1, []),
{ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1),
true = is_binary(KludgeBin),
@@ -109,7 +109,7 @@ flu_smoke_test() ->
Len1 = size(Chunk1),
{error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
File1, Off1*983829323, Len1),
File1, Off1*983829323, Len1, []),
%% XXX FIXME
%%
%% This is failing because the read extends past the end of the file.
@@ -151,14 +151,14 @@ flu_smoke_test() ->
File2, Off2, Chunk2),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Chunk2),
{ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File2, Off2, Len2),
{ok, {[{_, Off2, Chunk2, _}], _}} =
?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2, []),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
"no!!", Off2, Len2),
"no!!", Off2, Len2, []),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH,
BadFile, Off2, Len2),
BadFile, Off2, Len2, []),
%% We know that File1 still exists. Pretend that we've done a
%% migration and exercise the delete_migration() API.
@@ -261,7 +261,7 @@ witness_test() ->
Prefix, Chunk1),
File = <<"foofile">>,
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1,
File, 9999, 9999),
File, 9999, 9999, []),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, EpochID1,
File),
{error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1),

View file

@@ -146,7 +146,7 @@ partial_stop_restart2() ->
{_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
{error, wedged} = machi_flu1_client:read_chunk(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
<<>>, 99999999, 1),
<<>>, 99999999, 1, []),
{error, wedged} = machi_flu1_client:checksum_list(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
%% list_files() is permitted despite wedged status

View file

@@ -81,8 +81,8 @@ smoke_test2() ->
{iolist_to_binary(Chunk3), File3, Off3, Size3}],
[begin
File = binary_to_list(Fl),
?assertMatch({ok, [{File, Off, Ch, _}]},
?C:read_chunk(Clnt, Fl, Off, Sz))
?assertMatch({ok, {[{File, Off, Ch, _}], []}},
?C:read_chunk(Clnt, Fl, Off, Sz, []))
end || {Ch, Fl, Off, Sz} <- Reads],
{ok, KludgeBin} = ?C:checksum_list(Clnt, File1),

View file

@@ -60,14 +60,14 @@ api_smoke_test() ->
{ok, {MyOff,MySize,MyFile}} =
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
infinity),
{ok, [{_, MyOff, MyChunk, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
{ok, {[{_, MyOff, MyChunk, _}], []}} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize, []),
MyChunk2 = <<"my chunk data, yeah, again">>,
{ok, {MyOff2,MySize2,MyFile2}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
MyChunk2, 4242, infinity),
{ok, [{_, MyOff2, MyChunk2, _}]} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
{ok, {[{_, MyOff2, MyChunk2, _}], []}} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2, []),
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
Prefix, MyChunk_badcs),
@@ -245,13 +245,13 @@ flu_restart_test2() ->
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity)
end,
fun(run) -> {ok, [{_, Off1, Data, _}]} =
fun(run) -> {ok, {[{_, Off1, Data, _}], []}} =
?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1),
File1, Off1, Size1, []),
ok;
(line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1)
File1, Off1, Size1, [])
end,
fun(run) -> {ok, KludgeBin} =
?MUT:checksum_list(Prox1, FakeEpoch, File1),