Update read_chunk() PB protocol to return trimmed chunks #25

Merged
kuenishi merged 1 commit from ku/trim-pb-protocol-2 into master 2015-10-23 08:10:21 +00:00
18 changed files with 285 additions and 178 deletions

View file

@ -205,11 +205,15 @@ message Mpb_ReadChunkReq {
// only makes sense if flag_no_checksum is not set). // only makes sense if flag_no_checksum is not set).
// TODO: not implemented yet. // TODO: not implemented yet.
optional uint32 flag_no_chunk = 3 [default=0]; optional uint32 flag_no_chunk = 3 [default=0];
// TODO: not implemented yet.
optional uint32 flag_needs_trimmed = 4 [default=0];
} }
message Mpb_ReadChunkResp { message Mpb_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1; required Mpb_GeneralStatusCode status = 1;
repeated Mpb_Chunk chunks = 2; repeated Mpb_Chunk chunks = 2;
repeated Mpb_ChunkPos trimmed = 3;
} }
// High level API: trim_chunk() request & response // High level API: trim_chunk() request & response
@ -410,11 +414,14 @@ message Mpb_LL_ReadChunkReq {
// only makes sense if flag_checksum is not set). // only makes sense if flag_checksum is not set).
// TODO: not implemented yet. // TODO: not implemented yet.
optional uint32 flag_no_chunk = 4 [default=0]; optional uint32 flag_no_chunk = 4 [default=0];
optional uint32 flag_needs_trimmed = 5 [default=0];
} }
message Mpb_LL_ReadChunkResp { message Mpb_LL_ReadChunkResp {
required Mpb_GeneralStatusCode status = 1; required Mpb_GeneralStatusCode status = 1;
repeated Mpb_Chunk chunks = 2; repeated Mpb_Chunk chunks = 2;
repeated Mpb_ChunkPos trimmed = 3;
} }
// Low level API: checksum_list() // Low level API: checksum_list()

View file

@ -76,7 +76,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
ReadChunk = fun(F, Offset, Size) -> ReadChunk = fun(F, Offset, Size) ->
case file:pread(FH, Offset, Size) of case file:pread(FH, Offset, Size) of
{ok, Bin} -> {ok, Bin} ->
{ok, [{F, Offset, Bin, undefined}]}; {ok, {[{F, Offset, Bin, undefined}], []}};
Err -> Err ->
Err Err
end end
@ -92,7 +92,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
verify_file_checksums_remote2(Sock1, EpochID, File) -> verify_file_checksums_remote2(Sock1, EpochID, File) ->
ReadChunk = fun(File_name, Offset, Size) -> ReadChunk = fun(File_name, Offset, Size) ->
?FLU_C:read_chunk(Sock1, EpochID, ?FLU_C:read_chunk(Sock1, EpochID,
File_name, Offset, Size) File_name, Offset, Size, [])
end, end,
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk). verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).
@ -117,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
verify_chunk_checksum(File, ReadChunk) -> verify_chunk_checksum(File, ReadChunk) ->
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) -> fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
case ReadChunk(File, Offset, Size) of case ReadChunk(File, Offset, Size) of
{ok, [{_, Offset, Chunk, _}]} -> {ok, {[{_, Offset, Chunk, _}], _}} ->
CSum2 = machi_util:checksum_chunk(Chunk), CSum2 = machi_util:checksum_chunk(Chunk),
if CSum == CSum2 -> if CSum == CSum2 ->
Acc; Acc;

View file

@ -324,7 +324,8 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
_ -> ok _ -> ok
end, end,
_T1 = os:timestamp(), _T1 = os:timestamp(),
{ok, [{_, Offset, Chunk, _}]} = %% TODO: support case multiple written or trimmed chunks returned
{ok, {[{_, Offset, Chunk, _}], _}} =
machi_proxy_flu1_client:read_chunk( machi_proxy_flu1_client:read_chunk(
SrcP, EpochID, File, Offset, Size, SrcP, EpochID, File, Offset, Size,
?SHORT_TIMEOUT), ?SHORT_TIMEOUT),

View file

@ -122,7 +122,7 @@
append_chunk/3, append_chunk/4, append_chunk/3, append_chunk/4,
append_chunk_extra/4, append_chunk_extra/5, append_chunk_extra/4, append_chunk_extra/5,
write_chunk/4, write_chunk/5, write_chunk/4, write_chunk/5,
read_chunk/4, read_chunk/5, read_chunk/5, read_chunk/6,
trim_chunk/4, trim_chunk/5, trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3, checksum_list/2, checksum_list/3,
list_files/1, list_files/2, list_files/1, list_files/2,
@ -201,14 +201,14 @@ write_chunk(PidSpec, File, Offset, Chunk, Timeout0) ->
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size) -> read_chunk(PidSpec, File, Offset, Size, Opts) ->
read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). read_chunk(PidSpec, File, Offset, Size, Opts, ?DEFAULT_TIMEOUT).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, File, Offset, Size, Timeout0) -> read_chunk(PidSpec, File, Offset, Size, Opts, Timeout0) ->
{TO, Timeout} = timeout(Timeout0), {TO, Timeout} = timeout(Timeout0),
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}}, gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, Opts, TO}},
Timeout). Timeout).
%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'. %% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.
@ -288,8 +288,8 @@ handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra, TO}, _From, S) ->
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S); do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S);
handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) -> handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S); do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) -> handle_call2({read_chunk, File, Offset, Size, Opts, TO}, _From, S) ->
do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S); do_read_chunk(File, Offset, Size, Opts, 0, os:timestamp(), TO, S);
handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) -> handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) ->
do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S); do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
handle_call2({checksum_list, File, TO}, _From, S) -> handle_call2({checksum_list, File, TO}, _From, S) ->
@ -503,11 +503,10 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO,
iolist_size(Chunk)}) iolist_size(Chunk)})
end. end.
do_read_chunk(File, Offset, Size, 0=Depth, STime, TO, do_read_chunk(File, Offset, Size, Opts, 0=Depth, STime, TO,
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S); do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S);
do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) -> do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, #state{proj=P}=S) ->
%% io:format(user, "read sleep1,", []),
sleep_a_while(Depth), sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
if DiffMs > TO -> if DiffMs > TO ->
@ -517,22 +516,22 @@ do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
case S2#state.proj of case S2#state.proj of
P2 when P2 == undefined orelse P2 when P2 == undefined orelse
P2#projection_v1.upi == [] -> P2#projection_v1.upi == [] ->
do_read_chunk(File, Offset, Size, Depth + 1, STime, TO, S2); do_read_chunk(File, Offset, Size, Opts, Depth + 1, STime, TO, S2);
_ -> _ ->
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S2) do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S2)
end end
end. end.
do_read_chunk2(File, Offset, Size, Depth, STime, TO, do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO,
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
UPI = readonly_flus(P), UPI = readonly_flus(P),
Tail = lists:last(UPI), Tail = lists:last(UPI),
ConsistencyMode = P#projection_v1.mode, ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of File, Offset, Size, Opts, ?TIMEOUT) of
{ok, Chunks0} when is_list(Chunks0) -> {ok, {Chunks0, []}} when is_list(Chunks0) ->
Chunks = trim_both_side(Chunks0, Offset, Offset + Size), Chunks = trim_both_side(Chunks0, Offset, Offset + Size),
{reply, {ok, Chunks}, S}; {reply, {ok, {Chunks, []}}, S};
%% {ok, BadChunk} -> %% {ok, BadChunk} ->
%% %% TODO cleaner handling of bad chunks %% %% TODO cleaner handling of bad chunks
%% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, %% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
@ -546,7 +545,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
{reply, BadCS, S}; {reply, BadCS, S};
{error, Retry} {error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged -> when Retry == partition; Retry == bad_epoch; Retry == wedged ->
do_read_chunk(File, Offset, Size, Depth, STime, TO, S); do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, S);
{error, not_written} -> {error, not_written} ->
read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S); read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
%% {reply, {error, not_written}, S}; %% {reply, {error, not_written}, S};
@ -624,10 +623,12 @@ read_repair2(ap_mode=ConsistencyMode,
#state{proj=P}=S) -> #state{proj=P}=S) ->
Eligible = mutation_flus(P), Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, Chunks, GotItFrom} when is_list(Chunks) -> {ok, {Chunks, _Trimmed}, GotItFrom} when is_list(Chunks) ->
ToRepair = mutation_flus(P) -- [GotItFrom], ToRepair = mutation_flus(P) -- [GotItFrom],
{Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom], {Reply0, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
File, Depth, STime, S, {ok, Chunks}), File, Depth, STime, S, {ok, Chunks}),
{ok, Chunks} = Reply0,
Reply = {ok, {Chunks, _Trimmed}},
{reply, Reply, S1}; {reply, Reply, S1};
{error, bad_checksum}=BadCS -> {error, bad_checksum}=BadCS ->
%% TODO: alternate strategy? %% TODO: alternate strategy?
@ -818,7 +819,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
run_middleworker_job(Fun, ArgList, WTimeout) -> run_middleworker_job(Fun, ArgList, WTimeout) ->
Parent = self(), Parent = self(),
MiddleWorker = MiddleWorker =
spawn(fun() -> spawn_link(fun() ->
PidsMons = PidsMons =
[spawn_monitor(fun() -> [spawn_monitor(fun() ->
Res = (catch Fun(Arg)), Res = (catch Fun(Arg)),
@ -859,17 +860,19 @@ try_to_find_chunk(Eligible, File, Offset, Size,
Work = fun(FLU) -> Work = fun(FLU) ->
Proxy = orddict:fetch(FLU, PD), Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:read_chunk(Proxy, EpochID, case ?FLU_PC:read_chunk(Proxy, EpochID,
File, Offset, Size) of %% TODO Trimmed is required here
{ok, Chunks} when is_list(Chunks) -> File, Offset, Size, []) of
{FLU, {ok, Chunks}}; {ok, {_Chunks, _} = ChunksAndTrimmed} ->
{FLU, {ok, ChunksAndTrimmed}};
Else -> Else ->
{FLU, Else} {FLU, Else}
end end
end, end,
Rs = run_middleworker_job(Work, Eligible, Timeout), Rs = run_middleworker_job(Work, Eligible, Timeout),
case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
[{FoundFLU, {ok, Chunk}}|_] -> case [X || {_Fluname, {ok, {[{_,_,B,_}], _}}}=X <- Rs, is_binary(B)] of
{ok, Chunk, FoundFLU}; [{FoundFLU, {ok, ChunkAndTrimmed}}|_] ->
{ok, ChunkAndTrimmed, FoundFLU};
[] -> [] ->
RetryErrs = [partition, bad_epoch, wedged], RetryErrs = [partition, bad_epoch, wedged],
case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of

View file

@ -52,6 +52,7 @@
sync/1, sync/1,
sync/2, sync/2,
read/3, read/3,
read/4,
write/3, write/3,
write/4, write/4,
append/2, append/2,
@ -83,9 +84,9 @@
wedged = false :: boolean(), wedged = false :: boolean(),
csum_file :: string()|undefined, csum_file :: string()|undefined,
csum_path :: string()|undefined, csum_path :: string()|undefined,
eof_position = 0 :: non_neg_integer(),
data_filehandle :: file:io_device(), data_filehandle :: file:io_device(),
csum_table :: machi_csum_table:table(), csum_table :: machi_csum_table:table(),
eof_position = 0 :: non_neg_integer(),
tref :: reference(), %% timer ref tref :: reference(), %% timer ref
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
ops = 0 :: non_neg_integer(), %% sum of all ops ops = 0 :: non_neg_integer(), %% sum of all ops
@ -135,11 +136,22 @@ sync(_Pid, Type) ->
{ok, [{Filename::string(), Offset :: non_neg_integer(), {ok, [{Filename::string(), Offset :: non_neg_integer(),
Data :: binary(), Checksum :: binary()}]} | Data :: binary(), Checksum :: binary()}]} |
{error, Reason :: term()}. {error, Reason :: term()}.
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0 read(Pid, Offset, Length) ->
andalso is_integer(Length) andalso Length > 0 -> read(Pid, Offset, Length, []).
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
read(_Pid, Offset, Length) -> -spec read(Pid :: pid(),
lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]), Offset :: non_neg_integer(),
Length :: non_neg_integer(),
[{no_checksum|no_chunk|needs_trimmed, boolean()}]) ->
{ok, [{Filename::string(), Offset :: non_neg_integer(),
Data :: binary(), Checksum :: binary()}]} |
{error, Reason :: term()}.
read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0
andalso is_list(Opts) ->
gen_server:call(Pid, {read, Offset, Length, Opts}, ?TIMEOUT);
read(_Pid, Offset, Length, Opts) ->
lager:warning("Bad args to read: Offset ~p, Length ~p, Options ~p", [Offset, Length, Opts]),
{error, bad_arg}. {error, bad_arg}.
% @doc Write data at offset % @doc Write data at offset
@ -211,8 +223,8 @@ init({Filename, DataDir}) ->
csum_table = CsumTable, csum_table = CsumTable,
tref = Tref, tref = Tref,
eof_position = Eof}, eof_position = Eof},
lager:debug("Starting file proxy ~p for filename ~p, state = ~p", lager:debug("Starting file proxy ~p for filename ~p, state = ~p, Eof = ~p",
[self(), Filename, St]), [self(), Filename, St, Eof]),
{ok, St}. {ok, St}.
% @private % @private
@ -250,13 +262,13 @@ handle_call({sync, all}, _From, State = #state{filename = F,
%%% READS %%% READS
handle_call({read, _Offset, _Length}, _From, handle_call({read, _Offset, _Length, _}, _From,
State = #state{wedged = true, State = #state{wedged = true,
reads = {T, Err} reads = {T, Err}
}) -> }) ->
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
handle_call({read, Offset, Length}, _From, handle_call({read, Offset, Length, _Opts}, _From,
State = #state{eof_position = Eof, State = #state{eof_position = Eof,
reads = {T, Err} reads = {T, Err}
}) when Offset > Eof -> }) when Offset > Eof ->
@ -265,23 +277,28 @@ handle_call({read, Offset, Length}, _From,
[Offset, Length, Eof]), [Offset, Length, Eof]),
{reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}}; {reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
handle_call({read, Offset, Length}, _From, handle_call({read, Offset, Length, Opts}, _From,
State = #state{filename = F, State = #state{filename = F,
data_filehandle = FH, data_filehandle = FH,
csum_table = CsumTable, csum_table = CsumTable,
eof_position = EofP,
reads = {T, Err} reads = {T, Err}
}) -> }) ->
NoChecksum = proplists:get_value(no_checksum, Opts, false),
NoChunk = proplists:get_value(no_chunk, Opts, false),
NeedsMerge = proplists:get_value(needs_trimmed, Opts, false),
{Resp, NewErr} = {Resp, NewErr} =
case do_read(FH, F, CsumTable, Offset, Length) of case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsMerge) of
{ok, []} -> {ok, {[], []}} ->
{{error, not_written}, Err + 1}; {{error, not_written}, Err + 1};
{ok, Chunks} -> {ok, {Chunks, Trimmed}} ->
%% Kludge to wrap read result in tuples, to support fragmented read %% Kludge to wrap read result in tuples, to support fragmented read
%% XXX FIXME %% XXX FIXME
%% For now we are omiting the checksum data because it blows up %% For now we are omiting the checksum data because it blows up
%% protobufs. %% protobufs.
{{ok, Chunks}, Err}; {{ok, {Chunks, Trimmed}}, Err};
Error -> Error ->
lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]),
{Error, Err + 1} {Error, Err + 1}
end, end,
{reply, Resp, State#state{reads = {T+1, NewErr}}}; {reply, Resp, State#state{reads = {T+1, NewErr}}};
@ -298,6 +315,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
State = #state{filename = F, State = #state{filename = F,
writes = {T, Err}, writes = {T, Err},
data_filehandle = FHd, data_filehandle = FHd,
eof_position=EofP,
csum_table = CsumTable}) -> csum_table = CsumTable}) ->
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
@ -318,6 +336,8 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
end end
end, end,
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)), {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n",
[iolist_size(Data), Offset, F, NewEof]),
{reply, Resp, State#state{writes = {T+1, NewErr}, {reply, Resp, State#state{writes = {T+1, NewErr},
eof_position = NewEof}}; eof_position = NewEof}};
@ -351,15 +371,14 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
ok -> ok ->
{{ok, F, EofP}, Err}; {{ok, F, EofP}, Err};
Error -> Error ->
{Error, Err + 1, EofP} {Error, Err + 1}
end end
end, end,
%% TODO: do we check this with calling
%% machi_csum_table:calc_unwritten_bytes/1?
NewEof = EofP + byte_size(Data) + Extra, NewEof = EofP + byte_size(Data) + Extra,
lager:debug("appended ~p bytes at ~p file ~p. NewEofP = ~p",
[iolist_size(Data), EofP, F, NewEof]),
{reply, Resp, State#state{appends = {T+1, NewErr}, {reply, Resp, State#state{appends = {T+1, NewErr},
eof_position = NewEof eof_position = NewEof}};
}};
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:warning("Unknown call: ~p", [Req]), lager:warning("Unknown call: ~p", [Req]),
@ -500,7 +519,10 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
Filename :: string(), Filename :: string(),
CsumTable :: machi_csum_table:table(), CsumTable :: machi_csum_table:table(),
Offset :: non_neg_integer(), Offset :: non_neg_integer(),
Size :: non_neg_integer() Size :: non_neg_integer(),
NoChecksum :: boolean(),
NoChunk :: boolean(),
NeedsTrimmed :: boolean()
) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} | ) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} |
{error, bad_checksum} | {error, bad_checksum} |
{error, partial_read} | {error, partial_read} |
@ -519,6 +541,9 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
% tuple is returned.</ul> % tuple is returned.</ul>
% </li> % </li>
% %
do_read(FHd, Filename, CsumTable, Offset, Size, _, _, _) ->
do_read(FHd, Filename, CsumTable, Offset, Size).
do_read(FHd, Filename, CsumTable, Offset, Size) -> do_read(FHd, Filename, CsumTable, Offset, Size) ->
%% Note that find/3 only returns overlapping chunks, both borders %% Note that find/3 only returns overlapping chunks, both borders
%% are not aligned to original Offset and Size. %% are not aligned to original Offset and Size.
@ -526,7 +551,8 @@ do_read(FHd, Filename, CsumTable, Offset, Size) ->
read_all_ranges(FHd, Filename, ChunkCsums, []). read_all_ranges(FHd, Filename, ChunkCsums, []).
read_all_ranges(_, _, [], ReadChunks) -> read_all_ranges(_, _, [], ReadChunks) ->
{ok, lists:reverse(ReadChunks)}; %% TODO: currently returns empty list of trimmed chunks
{ok, {lists:reverse(ReadChunks), []}};
read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) -> read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
case file:pread(FHd, Offset, Size) of case file:pread(FHd, Offset, Size) of
@ -592,12 +618,12 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
{error, Reason} {error, Reason}
end; end;
[{Offset, Size, TaggedCsum}] -> [{Offset, Size, TaggedCsum}] ->
case do_read(FHd, Filename, CsumTable, Offset, Size) of case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of
{error, _} = E -> {error, _} = E ->
lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written", lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written",
[E, Offset, Filename]), [E, Offset, Filename]),
{error, server_insanity}; {error, server_insanity};
{ok, [{_, Offset, Data, TaggedCsum}]} -> {ok, {[{_, Offset, Data, TaggedCsum}], _}} ->
%% TODO: what if different checksum got from do_read()? %% TODO: what if different checksum got from do_read()?
ok; ok;
{ok, _Other} -> {ok, _Other} ->
@ -632,7 +658,6 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum), ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
lager:debug("Successful write to checksum file for ~p", lager:debug("Successful write to checksum file for ~p",
[Filename]), [Filename]),
%% io:format(user, "here, heh ~p~n", [?LINE]),
ok; ok;
Other -> Other ->
lager:error("Got ~p during write to file ~p at offset ~p, length ~p", lager:error("Got ~p during write to file ~p at offset ~p, length ~p",

View file

@ -450,9 +450,9 @@ do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
Chunk = {TaggedCSum, ChunkBin}, Chunk = {TaggedCSum, ChunkBin},
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk), Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
{Res, S}; {Res, S};
do_pb_hl_request2({high_read_chunk, File, Offset, Size}, do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
#state{high_clnt=Clnt}=S) -> #state{high_clnt=Clnt}=S) ->
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size), Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
{Res, S}; {Res, S};
do_pb_hl_request2({high_trim_chunk, File, Offset, Size}, do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
#state{high_clnt=Clnt}=S) -> #state{high_clnt=Clnt}=S) ->
@ -548,13 +548,13 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluNa
{error, bad_arg} {error, bad_arg}
end. end.
do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})-> do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
%% TODO: Look inside Opts someday. %% TODO: Look inside Opts someday.
case sanitize_file_string(File) of case sanitize_file_string(File) of
ok -> ok ->
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}), {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
case machi_file_proxy:read(Pid, Offset, Size) of case machi_file_proxy:read(Pid, Offset, Size, Opts) of
{ok, Chunks} -> {ok, Chunks}; {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
Other -> Other Other -> Other
end; end;
_ -> _ ->

View file

@ -56,7 +56,7 @@
%% File API %% File API
append_chunk/4, append_chunk/5, append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6, append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6, read_chunk/6, read_chunk/7,
checksum_list/3, checksum_list/4, checksum_list/3, checksum_list/4,
list_files/2, list_files/3, list_files/2, list_files/3,
wedge_status/1, wedge_status/2, wedge_status/1, wedge_status/2,
@ -144,26 +144,28 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> -spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
proplists:proplist()) ->
{ok, machi_dt:chunk_s()} | {ok, machi_dt:chunk_s()} |
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}. {error, term()}.
read_chunk(Sock, EpochID, File, Offset, Size) read_chunk(Sock, EpochID, File, Offset, Size, Opts)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
read_chunk2(Sock, EpochID, File, Offset, Size). read_chunk2(Sock, EpochID, File, Offset, Size, Opts).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
-spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(), -spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(),
machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) -> machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
proplists:proplist()) ->
{ok, machi_dt:chunk_s()} | {ok, machi_dt:chunk_s()} |
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} | {error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
{error, term()}. {error, term()}.
read_chunk(Host, TcpPort, EpochID, File, Offset, Size) read_chunk(Host, TcpPort, EpochID, File, Offset, Size, Opts)
when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
try try
read_chunk2(Sock, EpochID, File, Offset, Size) read_chunk2(Sock, EpochID, File, Offset, Size, Opts)
after after
disconnect(Sock) disconnect(Sock)
end. end.
@ -516,12 +518,12 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%
read_chunk2(Sock, EpochID, File0, Offset, Size) -> read_chunk2(Sock, EpochID, File0, Offset, Size, Opts) ->
ReqID = <<"id">>, ReqID = <<"id">>,
File = machi_util:make_binary(File0), File = machi_util:make_binary(File0),
Req = machi_pb_translate:to_pb_request( Req = machi_pb_translate:to_pb_request(
ReqID, ReqID,
{low_read_chunk, EpochID, File, Offset, Size, []}), {low_read_chunk, EpochID, File, Offset, Size, Opts}),
do_pb_request_common(Sock, ReqID, Req). do_pb_request_common(Sock, ReqID, Req).
append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) -> append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->

View file

@ -40,7 +40,7 @@
auth/3, auth/4, auth/3, auth/4,
append_chunk/6, append_chunk/7, append_chunk/6, append_chunk/7,
write_chunk/5, write_chunk/6, write_chunk/5, write_chunk/6,
read_chunk/4, read_chunk/5, read_chunk/5, read_chunk/6,
trim_chunk/4, trim_chunk/5, trim_chunk/4, trim_chunk/5,
checksum_list/2, checksum_list/3, checksum_list/2, checksum_list/3,
list_files/1, list_files/2 list_files/1, list_files/2
@ -94,11 +94,18 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) -> write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout). send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).
read_chunk(PidSpec, File, Offset, Size) -> -spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). [{flag_no_checksum | flag_no_chunk | needs_trimmed, boolean()}]) ->
{ok, {list(), list()}} | {error, term()}.
read_chunk(PidSpec, File, Offset, Size, Options) ->
read_chunk(PidSpec, File, Offset, Size, Options, ?DEFAULT_TIMEOUT).
read_chunk(PidSpec, File, Offset, Size, Timeout) -> -spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout). [{no_checksum | no_chunk | needs_trimmed, boolean()}],
pos_integer()) ->
{ok, {list(), list()}} | {error, term()}.
read_chunk(PidSpec, File, Offset, Size, Options, Timeout) ->
send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout).
trim_chunk(PidSpec, File, Offset, Size) -> trim_chunk(PidSpec, File, Offset, Size) ->
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
@ -282,13 +289,19 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
Res = {bummer, {X, Y, erlang:get_stacktrace()}}, Res = {bummer, {X, Y, erlang:get_stacktrace()}},
{Res, S#state{count=Count+1}} {Res, S#state{count=Count+1}}
end; end;
do_send_sync2({read_chunk, File, Offset, Size}, do_send_sync2({read_chunk, File, Offset, Size, Options},
#state{sock=Sock, sock_id=Index, count=Count}=S) -> #state{sock=Sock, sock_id=Index, count=Count}=S) ->
try try
ReqID = <<Index:64/big, Count:64/big>>, ReqID = <<Index:64/big, Count:64/big>>,
FlagNoChecksum = proplists:get_value(no_checksum, Options, false),
FlagNoChunk = proplists:get_value(no_chunk, Options, false),
NeedsTrimmed = proplists:get_value(needs_trimmed, Options, false),
Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset, offset=Offset,
chunk_size=Size}}, chunk_size=Size},
flag_no_checksum=machi_util:bool2int(FlagNoChecksum),
flag_no_chunk=machi_util:bool2int(FlagNoChunk),
flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)},
R1a = #mpb_request{req_id=ReqID, do_not_alter=1, R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
read_chunk=Req}, read_chunk=Req},
Bin1a = machi_pb:encode_mpb_request(R1a), Bin1a = machi_pb:encode_mpb_request(R1a),
@ -416,7 +429,7 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) -> convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) -> convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks, trimmed=PB_Trimmed}) ->
Chunks = lists:map(fun(#mpb_chunk{offset=Offset, Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
file_name=File, file_name=File,
chunk=Chunk, chunk=Chunk,
@ -425,7 +438,12 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>, Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Chunk, Csum} {File, Offset, Chunk, Csum}
end, PB_Chunks), end, PB_Chunks),
{ok, Chunks}; Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}) ->
{File, Offset, Size}
end, PB_Trimmed),
{ok, {Chunks, Trimmed}};
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) -> convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
convert_general_status_code(Status). convert_general_status_code(Status).

View file

@ -75,13 +75,15 @@ from_pb_request(#mpb_ll_request{
from_pb_request(#mpb_ll_request{ from_pb_request(#mpb_ll_request{
req_id=ReqID, req_id=ReqID,
read_chunk=#mpb_ll_readchunkreq{ read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID, epoch_id=PB_EpochID,
chunk_pos=ChunkPos, chunk_pos=ChunkPos,
flag_no_checksum=PB_GetNoChecksum, flag_no_checksum=PB_GetNoChecksum,
flag_no_chunk=PB_GetNoChunk}}) -> flag_no_chunk=PB_GetNoChunk,
flag_needs_trimmed=PB_NeedsTrimmed}}) ->
EpochID = conv_to_epoch_id(PB_EpochID), EpochID = conv_to_epoch_id(PB_EpochID),
Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)}, Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
{no_chunk, conv_to_boolean(PB_GetNoChunk)}], {no_chunk, conv_to_boolean(PB_GetNoChunk)},
{needs_trimmed, conv_to_boolean(PB_NeedsTrimmed)}],
#mpb_chunkpos{file_name=File, #mpb_chunkpos{file_name=File,
offset=Offset, offset=Offset,
chunk_size=Size} = ChunkPos, chunk_size=Size} = ChunkPos,
@ -177,8 +179,15 @@ from_pb_request(#mpb_request{req_id=ReqID,
read_chunk=IR=#mpb_readchunkreq{}}) -> read_chunk=IR=#mpb_readchunkreq{}}) ->
#mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
offset=Offset, offset=Offset,
chunk_size=Size}} = IR, chunk_size=Size},
{ReqID, {high_read_chunk, File, Offset, Size}}; flag_no_checksum=FlagNoChecksum,
flag_no_chunk=FlagNoChunk,
flag_needs_trimmed=NeedsTrimmed} = IR,
%% I want MAPS
Options = [{no_checksum, machi_util:int2bool(FlagNoChecksum)},
{no_chunk, machi_util:int2bool(FlagNoChunk)},
{needs_trimmed, machi_util:int2bool(NeedsTrimmed)}],
{ReqID, {high_read_chunk, File, Offset, Size, Options}};
from_pb_request(#mpb_request{req_id=ReqID, from_pb_request(#mpb_request{req_id=ReqID,
trim_chunk=IR=#mpb_trimchunkreq{}}) -> trim_chunk=IR=#mpb_trimchunkreq{}}) ->
#mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File, #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
@ -233,7 +242,8 @@ from_pb_response(#mpb_ll_response{
from_pb_response(#mpb_ll_response{ from_pb_response(#mpb_ll_response{
req_id=ReqID, req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status=Status, read_chunk=#mpb_ll_readchunkresp{status=Status,
chunks=PB_Chunks}}) -> chunks=PB_Chunks,
trimmed=PB_Trimmed}}) ->
case Status of case Status of
'OK' -> 'OK' ->
Chunks = lists:map(fun(#mpb_chunk{file_name=File, Chunks = lists:map(fun(#mpb_chunk{file_name=File,
@ -243,7 +253,12 @@ from_pb_response(#mpb_ll_response{
Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>, Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
{File, Offset, Bytes, Csum} {File, Offset, Bytes, Csum}
end, PB_Chunks), end, PB_Chunks),
{ReqID, {ok, Chunks}}; Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}) ->
{File, Offset, Size}
end, PB_Trimmed),
{ReqID, {ok, {Chunks, Trimmed}}};
_ -> _ ->
{ReqID, machi_pb_high_client:convert_general_status_code(Status)} {ReqID, machi_pb_high_client:convert_general_status_code(Status)}
end; end;
@ -382,17 +397,23 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
offset=Offset, offset=Offset,
chunk=Chunk, chunk=Chunk,
csum=PB_CSum}}}; csum=PB_CSum}}};
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) -> to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) ->
%% TODO: stop ignoring Opts ^_^ %% TODO: stop ignoring Opts ^_^
PB_EpochID = conv_from_epoch_id(EpochID), PB_EpochID = conv_from_epoch_id(EpochID),
FNChecksum = proplists:get_value(no_checksum, Opts, false),
FNChunk = proplists:get_value(no_chunk, Opts, false),
NeedsTrimmed = proplists:get_value(needs_merge, Opts, false),
#mpb_ll_request{ #mpb_ll_request{
req_id=ReqID, do_not_alter=2, req_id=ReqID, do_not_alter=2,
read_chunk=#mpb_ll_readchunkreq{ read_chunk=#mpb_ll_readchunkreq{
epoch_id=PB_EpochID, epoch_id=PB_EpochID,
chunk_pos=#mpb_chunkpos{ chunk_pos=#mpb_chunkpos{
file_name=File, file_name=File,
offset=Offset, offset=Offset,
chunk_size=Size}}}; chunk_size=Size},
flag_no_checksum=machi_util:bool2int(FNChecksum),
flag_no_chunk=machi_util:bool2int(FNChunk),
flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}};
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) -> to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
PB_EpochID = conv_from_epoch_id(EpochID), PB_EpochID = conv_from_epoch_id(EpochID),
#mpb_ll_request{req_id=ReqID, do_not_alter=2, #mpb_ll_request{req_id=ReqID, do_not_alter=2,
@ -478,7 +499,7 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
write_chunk=#mpb_ll_writechunkresp{status=Status}}; write_chunk=#mpb_ll_writechunkresp{status=Status}};
to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)-> to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
case Resp of case Resp of
{ok, Chunks} -> {ok, {Chunks, Trimmed}} ->
PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) -> PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum), {Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
#mpb_chunk{file_name=File, #mpb_chunk{file_name=File,
@ -487,9 +508,15 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
csum=Ck}} csum=Ck}}
end, Chunks), end, Chunks),
PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}
end, Trimmed),
#mpb_ll_response{req_id=ReqID, #mpb_ll_response{req_id=ReqID,
read_chunk=#mpb_ll_readchunkresp{status='OK', read_chunk=#mpb_ll_readchunkresp{status='OK',
chunks=PB_Chunks}}; chunks=PB_Chunks,
trimmed=PB_Trimmed}};
{error, _}=Error -> {error, _}=Error ->
Status = conv_from_status(Error), Status = conv_from_status(Error),
#mpb_ll_response{req_id=ReqID, #mpb_ll_response{req_id=ReqID,
@ -654,10 +681,10 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
_Else -> _Else ->
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else])) make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
end; end;
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) -> to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size, _}, Resp) ->
case Resp of case Resp of
{ok, Chunks} -> {ok, {Chunks, Trimmed}} ->
MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) -> PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum), {Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
#mpb_chunk{ #mpb_chunk{
offset=Offset, offset=Offset,
@ -665,9 +692,15 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
chunk=Bytes, chunk=Bytes,
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}} csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
end, Chunks), end, Chunks),
PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
#mpb_chunkpos{file_name=File,
offset=Offset,
chunk_size=Size}
end, Trimmed),
#mpb_response{req_id=ReqID, #mpb_response{req_id=ReqID,
read_chunk=#mpb_readchunkresp{status='OK', read_chunk=#mpb_readchunkresp{status='OK',
chunks=MpbChunks}}; chunks=PB_Chunks,
trimmed=PB_Trimmed}};
{error, _}=Error -> {error, _}=Error ->
Status = conv_from_status(Error), Status = conv_from_status(Error),
#mpb_response{req_id=ReqID, #mpb_response{req_id=ReqID,

View file

@ -59,7 +59,7 @@
%% File API %% File API
append_chunk/4, append_chunk/5, append_chunk/4, append_chunk/5,
append_chunk_extra/5, append_chunk_extra/6, append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6, read_chunk/6, read_chunk/7,
checksum_list/3, checksum_list/4, checksum_list/3, checksum_list/4,
list_files/2, list_files/3, list_files/2, list_files/3,
wedge_status/1, wedge_status/2, wedge_status/1, wedge_status/2,
@ -130,13 +130,13 @@ append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size) -> read_chunk(PidSpec, EpochID, File, Offset, Size, Opts) ->
read_chunk(PidSpec, EpochID, File, Offset, Size, infinity). read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, infinity).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) -> read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}}, gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size, Opts}},
Timeout). Timeout).
%% @doc Fetch the list of chunk checksums for `File'. %% @doc Fetch the list of chunk checksums for `File'.
@ -299,8 +299,8 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
Timeout) of Timeout) of
{error, written}=Err -> {error, written}=Err ->
Size = byte_size(Chunk), Size = byte_size(Chunk),
case read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) of case read_chunk(PidSpec, EpochID, File, Offset, Size, [], Timeout) of
{ok, Chunk2} when Chunk2 == Chunk -> {ok, {[{File, Offset, Chunk2, _}], []}} when Chunk2 == Chunk ->
%% See equivalent comment inside write_projection(). %% See equivalent comment inside write_projection().
ok; ok;
_ -> _ ->
@ -377,9 +377,9 @@ make_req_fun({append_chunk, EpochID, Prefix, Chunk},
make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra}, make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end; fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size}, make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size) end; fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size, Opts) end;
make_req_fun({write_chunk, EpochID, File, Offset, Chunk}, make_req_fun({write_chunk, EpochID, File, Offset, Chunk},
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end; fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end;

View file

@ -47,7 +47,9 @@
combinations/1, ordered_combinations/1, combinations/1, ordered_combinations/1,
mk_order/2, mk_order/2,
%% Other %% Other
wait_for_death/2, wait_for_life/2 wait_for_death/2, wait_for_life/2,
bool2int/1,
int2bool/1
]). ]).
-include("machi.hrl"). -include("machi.hrl").
@ -390,3 +392,9 @@ mk_order(UPI2, Repair1) ->
error -> error error -> error
end || X <- UPI2], end || X <- UPI2],
UPI2_order. UPI2_order.
%% C-style conversion for PB usage.
bool2int(true) -> 1;
bool2int(false) -> 0.
int2bool(0) -> false;
int2bool(I) when is_integer(I) -> true.

View file

@ -114,13 +114,14 @@ smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1}, Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} = {error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), {ok, {[{_, Off1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0, {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
private), private),
%% Verify that the client's CR wrote to all of them. %% Verify that the client's CR wrote to all of them.
[{ok, [{_, Off1, Chunk1, _}]} = [{ok, {[{_, Off1, Chunk1, _}], []}} =
machi_flu1_client:read_chunk( machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, File1, Off1, Size1) || Host, PortBase+X, EpochID, File1, Off1, Size1, []) ||
X <- [0,1,2] ], X <- [0,1,2] ],
%% Test read repair: Manually write to head, then verify that %% Test read repair: Manually write to head, then verify that
@ -128,13 +129,19 @@ smoke_test2() ->
FooOff1 = Off1 + (1024*1024), FooOff1 = Off1 + (1024*1024),
[{error, not_written} = machi_flu1_client:read_chunk( [{error, not_written} = machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, Host, PortBase+X, EpochID,
File1, FooOff1, Size1) || X <- [0,1,2] ], File1, FooOff1, Size1, []) || X <- [0,1,2] ],
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID, ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
File1, FooOff1, Chunk1), File1, FooOff1, Chunk1),
{ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1), {ok, {[{_, FooOff1, Chunk1, _}], []}} =
[{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk( machi_flu1_client:read_chunk(Host, PortBase+0, EpochID,
File1, FooOff1, Size1, []),
{ok, {[{_, FooOff1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, FooOff1, Size1, []),
[?assertMatch({X,{ok, {[{_, FooOff1, Chunk1, _}], []}}},
{X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID, Host, PortBase+X, EpochID,
File1, FooOff1, Size1)} || X <- [0,1,2] ], File1, FooOff1, Size1, [])})
|| X <- [0,1,2] ],
%% Test read repair: Manually write to middle, then same checking. %% Test read repair: Manually write to middle, then same checking.
FooOff2 = Off1 + (2*1024*1024), FooOff2 = Off1 + (2*1024*1024),
@ -142,18 +149,19 @@ smoke_test2() ->
Size2 = size(Chunk2), Size2 = size(Chunk2),
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID, ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
File1, FooOff2, Chunk2), File1, FooOff2, Chunk2),
{ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2), {ok, {[{_, FooOff2, Chunk2, _}], []}} =
[{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk( machi_cr_client:read_chunk(C1, File1, FooOff2, Size2, []),
Host, PortBase+X, EpochID, [{X,{ok, {[{_, FooOff2, Chunk2, _}], []}}} =
File1, FooOff2, Size2)} || X <- [0,1,2] ], {X,machi_flu1_client:read_chunk(
Host, PortBase+X, EpochID,
File1, FooOff2, Size2, [])} || X <- [0,1,2] ],
%% Misc API smoke & minor regression checks %% Misc API smoke & minor regression checks
{error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>, {error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>,
999999999, 1), 999999999, 1, []),
{ok, [{_,Off1,Chunk1,_}, {ok, {[{_,Off1,Chunk1,_}, {_,FooOff1,Chunk1,_}, {_,FooOff2,Chunk2,_}],
{_,FooOff1,Chunk1,_}, []}} =
{_,FooOff2,Chunk2,_}]} = machi_cr_client:read_chunk(C1, File1, machi_cr_client:read_chunk(C1, File1, Off1, 88888888, []),
Off1, 88888888),
%% Checksum list return value is a primitive binary(). %% Checksum list return value is a primitive binary().
{ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1), {ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1),
true = is_binary(KludgeBin), true = is_binary(KludgeBin),
@ -169,8 +177,8 @@ smoke_test2() ->
{ok, {Off10,Size10,File10}} = {ok, {Off10,Size10,File10}} =
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10, machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
Extra10 * Size10), Extra10 * Size10),
{ok, [{_, Off10, Chunk10, _}]} = {ok, {[{_, Off10, Chunk10, _}], []}} =
machi_cr_client:read_chunk(C1, File10, Off10, Size10), machi_cr_client:read_chunk(C1, File10, Off10, Size10, []),
[begin [begin
Offx = Off10 + (Seq * Size10), Offx = Off10 + (Seq * Size10),
%% TODO: uncomment written/not_written enforcement is available. %% TODO: uncomment written/not_written enforcement is available.
@ -178,8 +186,8 @@ smoke_test2() ->
%% Offx, Size10), %% Offx, Size10),
{ok, {Offx,Size10,File10}} = {ok, {Offx,Size10,File10}} =
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10), machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
{ok, [{_, Offx, Chunk10, _}]} = {ok, {[{_, Offx, Chunk10, _}], []}} =
machi_cr_client:read_chunk(C1, File10, Offx, Size10) machi_cr_client:read_chunk(C1, File10, Offx, Size10, [])
end || Seq <- lists:seq(1, Extra10)], end || Seq <- lists:seq(1, Extra10)],
{ok, {Off11,Size11,File11}} = {ok, {Off11,Size11,File11}} =
machi_cr_client:append_chunk(C1, Prefix, Chunk10), machi_cr_client:append_chunk(C1, Prefix, Chunk10),
@ -222,7 +230,8 @@ witness_smoke_test2() ->
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1}, Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
{error, bad_checksum} = {error, bad_checksum} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs), machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), {ok, {[{_, Off1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
%% Stop 'b' and let the chain reset. %% Stop 'b' and let the chain reset.
ok = machi_flu_psup:stop_flu_package(b), ok = machi_flu_psup:stop_flu_package(b),
@ -248,8 +257,8 @@ witness_smoke_test2() ->
end, end,
%% Chunk1 is still readable: not affected by wedged witness head. %% Chunk1 is still readable: not affected by wedged witness head.
{ok, [{_, Off1, Chunk1, _}]} = {ok, {[{_, Off1, Chunk1, _}], []}} =
machi_cr_client:read_chunk(C1, File1, Off1, Size1), machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
%% But because the head is wedged, an append will fail. %% But because the head is wedged, an append will fail.
{error, partition} = {error, partition} =
machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000), machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),

View file

@ -202,8 +202,9 @@ read_next(S, _Res, _Args) -> S.
read(Pid, Offset, Length) -> read(Pid, Offset, Length) ->
case machi_file_proxy:read(Pid, Offset, Length) of case machi_file_proxy:read(Pid, Offset, Length) of
{ok, Chunks} -> {ok, {Chunks, _}} ->
[{_, Offset, Data, Csum}] = machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length), [{_, Offset, Data, Csum}] =
machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
{ok, Data, Csum}; {ok, Data, Csum};
E -> E ->
E E

View file

@ -80,22 +80,22 @@ machi_file_proxy_test_() ->
clean_up_data_dir(?TESTDIR), clean_up_data_dir(?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR), {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
[ [
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)), ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)), ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)), ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)), ?_assertMatch({ok, {[{_, _, _, _}], []}}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))), ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
]. ].
multiple_chunks_read_test_() -> multiple_chunks_read_test_() ->
@ -108,11 +108,11 @@ multiple_chunks_read_test_() ->
?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)), ?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)),
%% Freeza %% Freeza
?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)), ?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)),
?_assertMatch({ok, [{"test", 1024, _, _}, ?_assertMatch({ok, {[{"test", 1024, _, _},
{"test", 10000, <<"fail">>, _}, {"test", 10000, <<"fail">>, _},
{"test", 20000, <<"fail">>, _}, {"test", 20000, <<"fail">>, _},
{"test", 30000, <<"fail">>, _}, {"test", 30000, <<"fail">>, _},
{"test", 530000, <<"fail">>, _}]}, {"test", 530000, <<"fail">>, _}], []}},
machi_file_proxy:read(Pid, 1024, 530000)), machi_file_proxy:read(Pid, 1024, 530000)),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
]. ].

View file

@ -97,8 +97,8 @@ flu_smoke_test() ->
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort, {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH, ?DUMMY_PV1_EPOCH,
Prefix, Chunk1), Prefix, Chunk1),
{ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, {ok, {[{_, Off1, Chunk1, _}], _}} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
File1, Off1, Len1), File1, Off1, Len1, []),
{ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort, {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
?DUMMY_PV1_EPOCH, File1), ?DUMMY_PV1_EPOCH, File1),
true = is_binary(KludgeBin), true = is_binary(KludgeBin),
@ -109,7 +109,7 @@ flu_smoke_test() ->
Len1 = size(Chunk1), Len1 = size(Chunk1),
{error, not_written} = ?FLU_C:read_chunk(Host, TcpPort, {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH, ?DUMMY_PV1_EPOCH,
File1, Off1*983829323, Len1), File1, Off1*983829323, Len1, []),
%% XXX FIXME %% XXX FIXME
%% %%
%% This is failing because the read extends past the end of the file. %% This is failing because the read extends past the end of the file.
@ -151,14 +151,14 @@ flu_smoke_test() ->
File2, Off2, Chunk2), File2, Off2, Chunk2),
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Chunk2), BadFile, Off2, Chunk2),
{ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, {ok, {[{_, Off2, Chunk2, _}], _}} =
File2, Off2, Len2), ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2, []),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH, ?DUMMY_PV1_EPOCH,
"no!!", Off2, Len2), "no!!", Off2, Len2, []),
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
?DUMMY_PV1_EPOCH, ?DUMMY_PV1_EPOCH,
BadFile, Off2, Len2), BadFile, Off2, Len2, []),
%% We know that File1 still exists. Pretend that we've done a %% We know that File1 still exists. Pretend that we've done a
%% migration and exercise the delete_migration() API. %% migration and exercise the delete_migration() API.
@ -261,7 +261,7 @@ witness_test() ->
Prefix, Chunk1), Prefix, Chunk1),
File = <<"foofile">>, File = <<"foofile">>,
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1, {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1,
File, 9999, 9999), File, 9999, 9999, []),
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, EpochID1, {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, EpochID1,
File), File),
{error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1), {error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1),

View file

@ -146,7 +146,7 @@ partial_stop_restart2() ->
{_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps), {_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
{error, wedged} = machi_flu1_client:read_chunk( {error, wedged} = machi_flu1_client:read_chunk(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
<<>>, 99999999, 1), <<>>, 99999999, 1, []),
{error, wedged} = machi_flu1_client:checksum_list( {error, wedged} = machi_flu1_client:checksum_list(
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>), Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
%% list_files() is permitted despite wedged status %% list_files() is permitted despite wedged status

View file

@ -81,8 +81,8 @@ smoke_test2() ->
{iolist_to_binary(Chunk3), File3, Off3, Size3}], {iolist_to_binary(Chunk3), File3, Off3, Size3}],
[begin [begin
File = binary_to_list(Fl), File = binary_to_list(Fl),
?assertMatch({ok, [{File, Off, Ch, _}]}, ?assertMatch({ok, {[{File, Off, Ch, _}], []}},
?C:read_chunk(Clnt, Fl, Off, Sz)) ?C:read_chunk(Clnt, Fl, Off, Sz, []))
end || {Ch, Fl, Off, Sz} <- Reads], end || {Ch, Fl, Off, Sz} <- Reads],
{ok, KludgeBin} = ?C:checksum_list(Clnt, File1), {ok, KludgeBin} = ?C:checksum_list(Clnt, File1),

View file

@ -60,14 +60,14 @@ api_smoke_test() ->
{ok, {MyOff,MySize,MyFile}} = {ok, {MyOff,MySize,MyFile}} =
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk, ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
infinity), infinity),
{ok, [{_, MyOff, MyChunk, _}]} = {ok, {[{_, MyOff, MyChunk, _}], []}} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize), ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize, []),
MyChunk2 = <<"my chunk data, yeah, again">>, MyChunk2 = <<"my chunk data, yeah, again">>,
{ok, {MyOff2,MySize2,MyFile2}} = {ok, {MyOff2,MySize2,MyFile2}} =
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix, ?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
MyChunk2, 4242, infinity), MyChunk2, 4242, infinity),
{ok, [{_, MyOff2, MyChunk2, _}]} = {ok, {[{_, MyOff2, MyChunk2, _}], []}} =
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2), ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2, []),
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk}, MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch, {error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
Prefix, MyChunk_badcs), Prefix, MyChunk_badcs),
@ -245,13 +245,13 @@ flu_restart_test2() ->
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch, (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
<<"prefix">>, Data, 42, infinity) <<"prefix">>, Data, 42, infinity)
end, end,
fun(run) -> {ok, [{_, Off1, Data, _}]} = fun(run) -> {ok, {[{_, Off1, Data, _}], []}} =
?MUT:read_chunk(Prox1, FakeEpoch, ?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1), File1, Off1, Size1, []),
ok; ok;
(line) -> io:format("line ~p, ", [?LINE]); (line) -> io:format("line ~p, ", [?LINE]);
(stop) -> ?MUT:read_chunk(Prox1, FakeEpoch, (stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
File1, Off1, Size1) File1, Off1, Size1, [])
end, end,
fun(run) -> {ok, KludgeBin} = fun(run) -> {ok, KludgeBin} =
?MUT:checksum_list(Prox1, FakeEpoch, File1), ?MUT:checksum_list(Prox1, FakeEpoch, File1),