Update read_chunk() PB protocol to return trimmed chunks #25
18 changed files with 285 additions and 178 deletions
@@ -205,11 +205,15 @@ message Mpb_ReadChunkReq {
     // only makes sense if flag_no_checksum is not set).
     // TODO: not implemented yet.
     optional uint32 flag_no_chunk = 3 [default=0];
+
+    // TODO: not implemented yet.
+    optional uint32 flag_needs_trimmed = 4 [default=0];
 }
 
 message Mpb_ReadChunkResp {
     required Mpb_GeneralStatusCode status = 1;
     repeated Mpb_Chunk chunks = 2;
+    repeated Mpb_ChunkPos trimmed = 3;
 }
 
 // High level API: trim_chunk() request & response
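Note: with `trimmed = 3` added to `Mpb_ReadChunkResp`, a successful read now carries two lists instead of one — the written chunks overlapping the requested range, plus the positions of any bytes in that range that were trimmed away. Every Erlang call site in this PR pattern-matches the pair; a shell sketch of the new shape (values illustrative, not from a real run):

```erlang
%% The new success shape used throughout this diff:
%%   Chunks  :: [{FileName, Offset, Bytes, TaggedCsum}]
%%   Trimmed :: [{FileName, Offset, ChunkSize}]
1> {ok, {Chunks, Trimmed}} =
       {ok, {[{<<"f">>, 0, <<"data">>, <<0:8>>}],  % one written chunk
             [{<<"f">>, 4096, 512}]}}.             % one trimmed range
2> Trimmed.
[{<<"f">>,4096,512}]
```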
@@ -410,11 +414,14 @@ message Mpb_LL_ReadChunkReq {
     // only makes sense if flag_checksum is not set).
     // TODO: not implemented yet.
     optional uint32 flag_no_chunk = 4 [default=0];
+
+    optional uint32 flag_needs_trimmed = 5 [default=0];
 }
 
 message Mpb_LL_ReadChunkResp {
     required Mpb_GeneralStatusCode status = 1;
     repeated Mpb_Chunk chunks = 2;
+    repeated Mpb_ChunkPos trimmed = 3;
 }
 
 // Low level API: checksum_list()
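The `flag_needs_trimmed` fields follow the existing convention of carrying booleans as `uint32` 0/1 on the wire; inside Erlang the option travels as a proplist entry. A sketch of the conversion at the PB boundary, using the `machi_util:bool2int/1` / `int2bool/1` helpers added near the end of this PR:

```erlang
%% Client side: proplist -> wire flag (an absent key defaults to false/0).
Opts = [{no_checksum, false}, {no_chunk, false}, {needs_trimmed, true}],
1 = machi_util:bool2int(proplists:get_value(needs_trimmed, Opts, false)),
%% Server side: wire flag -> proplist entry.
{needs_trimmed, true} = {needs_trimmed, machi_util:int2bool(1)}.
```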
@@ -76,7 +76,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
     ReadChunk = fun(F, Offset, Size) ->
                         case file:pread(FH, Offset, Size) of
                             {ok, Bin} ->
-                                {ok, [{F, Offset, Bin, undefined}]};
+                                {ok, {[{F, Offset, Bin, undefined}], []}};
                             Err ->
                                 Err
                         end

@@ -92,7 +92,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
 verify_file_checksums_remote2(Sock1, EpochID, File) ->
     ReadChunk = fun(File_name, Offset, Size) ->
                         ?FLU_C:read_chunk(Sock1, EpochID,
-                                          File_name, Offset, Size)
+                                          File_name, Offset, Size, [])
                 end,
     verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).
 

@@ -117,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
 verify_chunk_checksum(File, ReadChunk) ->
     fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
             case ReadChunk(File, Offset, Size) of
-                {ok, [{_, Offset, Chunk, _}]} ->
+                {ok, {[{_, Offset, Chunk, _}], _}} ->
                     CSum2 = machi_util:checksum_chunk(Chunk),
                     if CSum == CSum2 ->
                             Acc;
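Both verification paths keep the `ReadChunk(File, Offset, Size)` callback shape; only the result widens, and the trimmed list is simply ignored there. For callers that still want the old single-list result, a small adapter is enough (sketch only; the helper name is hypothetical, not part of this PR):

```erlang
%% Drop the trimmed positions and recover the pre-PR return shape.
-spec chunks_only({ok, {list(), list()}} | {error, term()}) ->
          {ok, list()} | {error, term()}.
chunks_only({ok, {Chunks, _Trimmed}}) -> {ok, Chunks};
chunks_only({error, _} = Err)         -> Err.
```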
@@ -324,7 +324,8 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
                  _ -> ok
              end,
              _T1 = os:timestamp(),
-             {ok, [{_, Offset, Chunk, _}]} =
+             %% TODO: support case multiple written or trimmed chunks returned
+             {ok, {[{_, Offset, Chunk, _}], _}} =
                  machi_proxy_flu1_client:read_chunk(
                    SrcP, EpochID, File, Offset, Size,
                    ?SHORT_TIMEOUT),
@@ -122,7 +122,7 @@
          append_chunk/3, append_chunk/4,
          append_chunk_extra/4, append_chunk_extra/5,
          write_chunk/4, write_chunk/5,
-         read_chunk/4, read_chunk/5,
+         read_chunk/5, read_chunk/6,
          trim_chunk/4, trim_chunk/5,
          checksum_list/2, checksum_list/3,
          list_files/1, list_files/2,

@@ -201,14 +201,14 @@ write_chunk(PidSpec, File, Offset, Chunk, Timeout0) ->
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
-read_chunk(PidSpec, File, Offset, Size) ->
-    read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
+read_chunk(PidSpec, File, Offset, Size, Opts) ->
+    read_chunk(PidSpec, File, Offset, Size, Opts, ?DEFAULT_TIMEOUT).
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
-read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
+read_chunk(PidSpec, File, Offset, Size, Opts, Timeout0) ->
     {TO, Timeout} = timeout(Timeout0),
-    gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
+    gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, Opts, TO}},
                     Timeout).
 
 %% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.

@@ -288,8 +288,8 @@ handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra, TO}, _From, S) ->
     do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S);
 handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
     do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
-handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) ->
-    do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
+handle_call2({read_chunk, File, Offset, Size, Opts, TO}, _From, S) ->
+    do_read_chunk(File, Offset, Size, Opts, 0, os:timestamp(), TO, S);
 handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) ->
     do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
 handle_call2({checksum_list, File, TO}, _From, S) ->
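Callers of `machi_cr_client:read_chunk` therefore gain an options argument; passing `[]` preserves the old behavior. A minimal usage sketch (the client pid `C` comes from whatever `machi_cr_client` start call the caller already uses):

```erlang
%% Sketch: [] keeps the old behavior; the chain client currently always
%% reports an empty trimmed list (see do_read_chunk2 below).
{ok, {Chunks, []}} =
    machi_cr_client:read_chunk(C, File, Offset, Size, []),
%% Once trimmed reporting matters to the caller:
{ok, {_Chunks2, Trimmed}} =
    machi_cr_client:read_chunk(C, File, Offset, Size, [{needs_trimmed, true}]).
```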
@@ -503,11 +503,10 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO,
                                   iolist_size(Chunk)})
     end.
 
-do_read_chunk(File, Offset, Size, 0=Depth, STime, TO,
+do_read_chunk(File, Offset, Size, Opts, 0=Depth, STime, TO,
               #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
-    do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S);
-do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
-    %% io:format(user, "read sleep1,", []),
+    do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S);
+do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, #state{proj=P}=S) ->
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
     if DiffMs > TO ->

@@ -517,22 +516,22 @@ do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
             case S2#state.proj of
                 P2 when P2 == undefined orelse
                         P2#projection_v1.upi == [] ->
-                    do_read_chunk(File, Offset, Size, Depth + 1, STime, TO, S2);
+                    do_read_chunk(File, Offset, Size, Opts, Depth + 1, STime, TO, S2);
                 _ ->
-                    do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S2)
+                    do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S2)
             end
     end.
 
-do_read_chunk2(File, Offset, Size, Depth, STime, TO,
+do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO,
                #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
     UPI = readonly_flus(P),
     Tail = lists:last(UPI),
     ConsistencyMode = P#projection_v1.mode,
     case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
-                            File, Offset, Size, ?TIMEOUT) of
-        {ok, Chunks0} when is_list(Chunks0) ->
+                            File, Offset, Size, Opts, ?TIMEOUT) of
+        {ok, {Chunks0, []}} when is_list(Chunks0) ->
             Chunks = trim_both_side(Chunks0, Offset, Offset + Size),
-            {reply, {ok, Chunks}, S};
+            {reply, {ok, {Chunks, []}}, S};
         %% {ok, BadChunk} ->
         %%     %% TODO cleaner handling of bad chunks
         %%     exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,

@@ -546,7 +545,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
             {reply, BadCS, S};
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            do_read_chunk(File, Offset, Size, Depth, STime, TO, S);
+            do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, S);
         {error, not_written} ->
             read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
             %% {reply, {error, not_written}, S};
@@ -624,10 +623,12 @@ read_repair2(ap_mode=ConsistencyMode,
              #state{proj=P}=S) ->
     Eligible = mutation_flus(P),
     case try_to_find_chunk(Eligible, File, Offset, Size, S) of
-        {ok, Chunks, GotItFrom} when is_list(Chunks) ->
+        {ok, {Chunks, _Trimmed}, GotItFrom} when is_list(Chunks) ->
             ToRepair = mutation_flus(P) -- [GotItFrom],
-            {Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
+            {Reply0, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
                                            File, Depth, STime, S, {ok, Chunks}),
+            {ok, Chunks} = Reply0,
+            Reply = {ok, {Chunks, _Trimmed}},
             {reply, Reply, S1};
         {error, bad_checksum}=BadCS ->
             %% TODO: alternate strategy?
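A note on the `_Trimmed` rebinding above: an underscore-prefixed Erlang variable is still a real binding, so reusing it in `Reply = {ok, {Chunks, _Trimmed}}` is legal — the prefix only suppresses the unused-variable warning. Standalone sketch of the same idiom:

```erlang
%% Underscore-prefixed variables bind normally and may be read again.
demo() ->
    {ok, {Chunks, _Trimmed}} = {ok, {[a, b], [c]}},
    {ok, {Chunks, _Trimmed}}.   %% -> {ok, {[a,b],[c]}}
```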
@@ -818,7 +819,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
 run_middleworker_job(Fun, ArgList, WTimeout) ->
     Parent = self(),
     MiddleWorker =
-        spawn(fun() ->
+        spawn_link(fun() ->
                   PidsMons =
                       [spawn_monitor(fun() ->
                                              Res = (catch Fun(Arg)),

@@ -859,17 +860,19 @@ try_to_find_chunk(Eligible, File, Offset, Size,
     Work = fun(FLU) ->
                    Proxy = orddict:fetch(FLU, PD),
                    case ?FLU_PC:read_chunk(Proxy, EpochID,
-                                           File, Offset, Size) of
-                       {ok, Chunks} when is_list(Chunks) ->
-                           {FLU, {ok, Chunks}};
+                                           %% TODO Trimmed is required here
+                                           File, Offset, Size, []) of
+                       {ok, {_Chunks, _} = ChunksAndTrimmed} ->
+                           {FLU, {ok, ChunksAndTrimmed}};
                        Else ->
                            {FLU, Else}
                    end
            end,
     Rs = run_middleworker_job(Work, Eligible, Timeout),
-    case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
-        [{FoundFLU, {ok, Chunk}}|_] ->
-            {ok, Chunk, FoundFLU};
+    case [X || {_Fluname, {ok, {[{_,_,B,_}], _}}}=X <- Rs, is_binary(B)] of
+        [{FoundFLU, {ok, ChunkAndTrimmed}}|_] ->
+            {ok, ChunkAndTrimmed, FoundFLU};
         [] ->
             RetryErrs = [partition, bad_epoch, wedged],
             case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of
@@ -52,6 +52,7 @@
     sync/1,
     sync/2,
     read/3,
+    read/4,
     write/3,
     write/4,
     append/2,

@@ -83,9 +84,9 @@
     wedged = false :: boolean(),
     csum_file :: string()|undefined,
     csum_path :: string()|undefined,
-    eof_position = 0 :: non_neg_integer(),
     data_filehandle :: file:io_device(),
     csum_table :: machi_csum_table:table(),
+    eof_position = 0 :: non_neg_integer(),
     tref :: reference(), %% timer ref
     ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
     ops = 0 :: non_neg_integer(), %% sum of all ops

@@ -135,11 +136,22 @@ sync(_Pid, Type) ->
           {ok, [{Filename::string(), Offset :: non_neg_integer(),
                  Data :: binary(), Checksum :: binary()}]} |
           {error, Reason :: term()}.
-read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
-                               andalso is_integer(Length) andalso Length > 0 ->
-    gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
-read(_Pid, Offset, Length) ->
-    lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]),
+read(Pid, Offset, Length) ->
+    read(Pid, Offset, Length, []).
+
+-spec read(Pid :: pid(),
+           Offset :: non_neg_integer(),
+           Length :: non_neg_integer(),
+           [{no_checksum|no_chunk|needs_trimmed, boolean()}]) ->
+          {ok, [{Filename::string(), Offset :: non_neg_integer(),
+                 Data :: binary(), Checksum :: binary()}]} |
+          {error, Reason :: term()}.
+read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
+                                     andalso is_integer(Length) andalso Length > 0
+                                     andalso is_list(Opts) ->
+    gen_server:call(Pid, {read, Offset, Length, Opts}, ?TIMEOUT);
+read(_Pid, Offset, Length, Opts) ->
+    lager:warning("Bad args to read: Offset ~p, Length ~p, Options ~p", [Offset, Length, Opts]),
     {error, bad_arg}.
 
 % @doc Write data at offset
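`machi_file_proxy:read/3` stays as a thin wrapper, so existing callers compile unchanged. A shell sketch of both arities (`start_link/2` arguments as in the EUnit suite at the bottom of this PR; results follow the handlers below):

```erlang
1> {ok, Pid} = machi_file_proxy:start_link("test", "./data").
2> machi_file_proxy:read(Pid, 1, 1).        %% same as read(Pid, 1, 1, [])
{error, not_written}
3> machi_file_proxy:read(Pid, -1, 1, []).   %% bad args still rejected
{error, bad_arg}
```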
@@ -211,8 +223,8 @@ init({Filename, DataDir}) ->
                  csum_table = CsumTable,
                  tref = Tref,
                  eof_position = Eof},
-    lager:debug("Starting file proxy ~p for filename ~p, state = ~p",
-                [self(), Filename, St]),
+    lager:debug("Starting file proxy ~p for filename ~p, state = ~p, Eof = ~p",
+                [self(), Filename, St, Eof]),
     {ok, St}.
 
 % @private

@@ -250,13 +262,13 @@ handle_call({sync, all}, _From, State = #state{filename = F,
 
 %%% READS
 
-handle_call({read, _Offset, _Length}, _From,
+handle_call({read, _Offset, _Length, _}, _From,
             State = #state{wedged = true,
                            reads = {T, Err}
                           }) ->
     {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
 
-handle_call({read, Offset, Length}, _From,
+handle_call({read, Offset, Length, _Opts}, _From,
             State = #state{eof_position = Eof,
                            reads = {T, Err}
                           }) when Offset > Eof ->

@@ -265,23 +277,28 @@ handle_call({read, Offset, Length}, _From,
                   [Offset, Length, Eof]),
     {reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
 
-handle_call({read, Offset, Length}, _From,
+handle_call({read, Offset, Length, Opts}, _From,
             State = #state{filename = F,
                            data_filehandle = FH,
                            csum_table = CsumTable,
+                           eof_position = EofP,
                            reads = {T, Err}
                           }) ->
+    NoChecksum = proplists:get_value(no_checksum, Opts, false),
+    NoChunk = proplists:get_value(no_chunk, Opts, false),
+    NeedsTrimmed = proplists:get_value(needs_trimmed, Opts, false),
     {Resp, NewErr} =
-        case do_read(FH, F, CsumTable, Offset, Length) of
-            {ok, []} ->
+        case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsTrimmed) of
+            {ok, {[], []}} ->
                 {{error, not_written}, Err + 1};
-            {ok, Chunks} ->
+            {ok, {Chunks, Trimmed}} ->
                 %% Kludge to wrap read result in tuples, to support fragmented read
                 %% XXX FIXME
                 %% For now we are omiting the checksum data because it blows up
                 %% protobufs.
-                {{ok, Chunks}, Err};
+                {{ok, {Chunks, Trimmed}}, Err};
             Error ->
+                lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]),
                 {Error, Err + 1}
         end,
     {reply, Resp, State#state{reads = {T+1, NewErr}}};
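Because each option is fetched with the three-argument `proplists:get_value/3`, an empty option list reproduces the pre-PR behavior exactly — all three flags default to `false`:

```erlang
%% Stdlib behavior the read handler above relies on:
false = proplists:get_value(no_checksum, [], false),
false = proplists:get_value(no_chunk, [], false),
true  = proplists:get_value(needs_trimmed, [{needs_trimmed, true}], false).
```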
@@ -298,6 +315,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
             State = #state{filename = F,
                            writes = {T, Err},
                            data_filehandle = FHd,
+                           eof_position=EofP,
                            csum_table = CsumTable}) ->
 
     ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),

@@ -318,6 +336,8 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
                 end
         end,
     {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
+    lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n",
+                [iolist_size(Data), Offset, F, NewEof]),
     {reply, Resp, State#state{writes = {T+1, NewErr},
                               eof_position = NewEof}};
 

@@ -351,15 +371,14 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
                     ok ->
                         {{ok, F, EofP}, Err};
                     Error ->
-                        {Error, Err + 1, EofP}
+                        {Error, Err + 1}
                 end
         end,
-    %% TODO: do we check this with calling
-    %% machi_csum_table:calc_unwritten_bytes/1?
     NewEof = EofP + byte_size(Data) + Extra,
+    lager:debug("appended ~p bytes at ~p file ~p. NewEofP = ~p",
+                [iolist_size(Data), EofP, F, NewEof]),
     {reply, Resp, State#state{appends = {T+1, NewErr},
-                              eof_position = NewEof
-                             }};
+                              eof_position = NewEof}};
 
 handle_call(Req, _From, State) ->
     lager:warning("Unknown call: ~p", [Req]),
@@ -500,7 +519,10 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
                Filename     :: string(),
                CsumTable    :: machi_csum_table:table(),
                Offset       :: non_neg_integer(),
-               Size         :: non_neg_integer()
+               Size         :: non_neg_integer(),
+               NoChecksum   :: boolean(),
+               NoChunk      :: boolean(),
+               NeedsTrimmed :: boolean()
               ) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} |
                    {error, bad_checksum} |
                    {error, partial_read} |

@@ -519,6 +541,9 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
 % tuple is returned.</ul>
 % </li>
 %
+do_read(FHd, Filename, CsumTable, Offset, Size, _, _, _) ->
+    do_read(FHd, Filename, CsumTable, Offset, Size).
+
 do_read(FHd, Filename, CsumTable, Offset, Size) ->
     %% Note that find/3 only returns overlapping chunks, both borders
     %% are not aligned to original Offset and Size.

@@ -526,7 +551,8 @@ do_read(FHd, Filename, CsumTable, Offset, Size) ->
     read_all_ranges(FHd, Filename, ChunkCsums, []).
 
 read_all_ranges(_, _, [], ReadChunks) ->
-    {ok, lists:reverse(ReadChunks)};
+    %% TODO: currently returns empty list of trimmed chunks
+    {ok, {lists:reverse(ReadChunks), []}};
 
 read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
     case file:pread(FHd, Offset, Size) of

@@ -592,12 +618,12 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
                     {error, Reason}
             end;
         [{Offset, Size, TaggedCsum}] ->
-            case do_read(FHd, Filename, CsumTable, Offset, Size) of
+            case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of
                 {error, _} = E ->
                     lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written",
                                   [E, Offset, Filename]),
                     {error, server_insanity};
-                {ok, [{_, Offset, Data, TaggedCsum}]} ->
+                {ok, {[{_, Offset, Data, TaggedCsum}], _}} ->
                     %% TODO: what if different checksum got from do_read()?
                     ok;
                 {ok, _Other} ->

@@ -632,7 +658,6 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
             ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
             lager:debug("Successful write to checksum file for ~p",
                         [Filename]),
-            %% io:format(user, "here, heh ~p~n", [?LINE]),
             ok;
         Other ->
             lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
@@ -450,9 +450,9 @@ do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
     Chunk = {TaggedCSum, ChunkBin},
     Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
     {Res, S};
-do_pb_hl_request2({high_read_chunk, File, Offset, Size},
+do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
                   #state{high_clnt=Clnt}=S) ->
-    Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
+    Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
     {Res, S};
 do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
                   #state{high_clnt=Clnt}=S) ->

@@ -548,13 +548,13 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluNa
             {error, bad_arg}
     end.
 
-do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
+do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
     %% TODO: Look inside Opts someday.
     case sanitize_file_string(File) of
         ok ->
             {ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
-            case machi_file_proxy:read(Pid, Offset, Size) of
-                {ok, Chunks} -> {ok, Chunks};
+            case machi_file_proxy:read(Pid, Offset, Size, Opts) of
+                {ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
                 Other -> Other
             end;
         _ ->

@@ -56,7 +56,7 @@
     %% File API
     append_chunk/4, append_chunk/5,
     append_chunk_extra/5, append_chunk_extra/6,
-    read_chunk/5, read_chunk/6,
+    read_chunk/6, read_chunk/7,
     checksum_list/3, checksum_list/4,
     list_files/2, list_files/3,
     wedge_status/1, wedge_status/2,
@@ -144,26 +144,28 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
--spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
+-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
+                 proplists:proplist()) ->
       {ok, machi_dt:chunk_s()} |
       {error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
       {error, term()}.
-read_chunk(Sock, EpochID, File, Offset, Size)
+read_chunk(Sock, EpochID, File, Offset, Size, Opts)
   when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
-    read_chunk2(Sock, EpochID, File, Offset, Size).
+    read_chunk2(Sock, EpochID, File, Offset, Size, Opts).
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
 -spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(),
-                 machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
+                 machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
+                 proplists:proplist()) ->
       {ok, machi_dt:chunk_s()} |
       {error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
      {error, term()}.
-read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
+read_chunk(Host, TcpPort, EpochID, File, Offset, Size, Opts)
   when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
     Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
     try
-        read_chunk2(Sock, EpochID, File, Offset, Size)
+        read_chunk2(Sock, EpochID, File, Offset, Size, Opts)
     after
         disconnect(Sock)
     end.

@@ -516,12 +518,12 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%
 
-read_chunk2(Sock, EpochID, File0, Offset, Size) ->
+read_chunk2(Sock, EpochID, File0, Offset, Size, Opts) ->
     ReqID = <<"id">>,
     File = machi_util:make_binary(File0),
     Req = machi_pb_translate:to_pb_request(
             ReqID,
-            {low_read_chunk, EpochID, File, Offset, Size, []}),
+            {low_read_chunk, EpochID, File, Offset, Size, Opts}),
     do_pb_request_common(Sock, ReqID, Req).
 
 append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
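With both the socket- and host-flavored arities widened, a direct FLU read looks like this (hedged sketch; `Host`, `TcpPort`, and `EpochID` as used in the smoke tests further down):

```erlang
%% One-shot read against a single FLU. [] keeps the old behavior;
%% [{needs_trimmed, true}] asks the server to report trimmed ranges too.
{ok, {Chunks, Trimmed}} =
    machi_flu1_client:read_chunk(Host, TcpPort, EpochID,
                                 File, Offset, Size,
                                 [{needs_trimmed, true}]).
```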
@@ -40,7 +40,7 @@
          auth/3, auth/4,
          append_chunk/6, append_chunk/7,
          write_chunk/5, write_chunk/6,
-         read_chunk/4, read_chunk/5,
+         read_chunk/5, read_chunk/6,
          trim_chunk/4, trim_chunk/5,
          checksum_list/2, checksum_list/3,
          list_files/1, list_files/2

@@ -94,11 +94,18 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
 write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
     send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).
 
-read_chunk(PidSpec, File, Offset, Size) ->
-    read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
+-spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
+                 [{no_checksum | no_chunk | needs_trimmed, boolean()}]) ->
+      {ok, {list(), list()}} | {error, term()}.
+read_chunk(PidSpec, File, Offset, Size, Options) ->
+    read_chunk(PidSpec, File, Offset, Size, Options, ?DEFAULT_TIMEOUT).
 
-read_chunk(PidSpec, File, Offset, Size, Timeout) ->
-    send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout).
+-spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
+                 [{no_checksum | no_chunk | needs_trimmed, boolean()}],
+                 pos_integer()) ->
+      {ok, {list(), list()}} | {error, term()}.
+read_chunk(PidSpec, File, Offset, Size, Options, Timeout) ->
+    send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout).
 
 trim_chunk(PidSpec, File, Offset, Size) ->
     trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).

@@ -282,13 +289,19 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
             Res = {bummer, {X, Y, erlang:get_stacktrace()}},
             {Res, S#state{count=Count+1}}
     end;
-do_send_sync2({read_chunk, File, Offset, Size},
+do_send_sync2({read_chunk, File, Offset, Size, Options},
               #state{sock=Sock, sock_id=Index, count=Count}=S) ->
     try
         ReqID = <<Index:64/big, Count:64/big>>,
+        FlagNoChecksum = proplists:get_value(no_checksum, Options, false),
+        FlagNoChunk = proplists:get_value(no_chunk, Options, false),
+        NeedsTrimmed = proplists:get_value(needs_trimmed, Options, false),
        Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
                                                         offset=Offset,
-                                                        chunk_size=Size}},
+                                                        chunk_size=Size},
+                                flag_no_checksum=machi_util:bool2int(FlagNoChecksum),
+                                flag_no_chunk=machi_util:bool2int(FlagNoChunk),
+                                flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)},
         R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
                            read_chunk=Req},
         Bin1a = machi_pb:encode_mpb_request(R1a),
@@ -416,7 +429,7 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
 convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
     convert_general_status_code(Status).
 
-convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
+convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks, trimmed=PB_Trimmed}) ->
     Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
                                       file_name=File,
                                       chunk=Chunk,

@@ -425,7 +438,12 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
                                Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
                                {File, Offset, Chunk, Csum}
                        end, PB_Chunks),
-    {ok, Chunks};
+    Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
+                                          offset=Offset,
+                                          chunk_size=Size}) ->
+                                {File, Offset, Size}
+                        end, PB_Trimmed),
+    {ok, {Chunks, Trimmed}};
 convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
     convert_general_status_code(Status).
 
@@ -75,13 +75,15 @@ from_pb_request(#mpb_ll_request{
 from_pb_request(#mpb_ll_request{
                   req_id=ReqID,
                   read_chunk=#mpb_ll_readchunkreq{
                                epoch_id=PB_EpochID,
                                chunk_pos=ChunkPos,
                                flag_no_checksum=PB_GetNoChecksum,
-                               flag_no_chunk=PB_GetNoChunk}}) ->
+                               flag_no_chunk=PB_GetNoChunk,
+                               flag_needs_trimmed=PB_NeedsTrimmed}}) ->
     EpochID = conv_to_epoch_id(PB_EpochID),
     Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
-            {no_chunk, conv_to_boolean(PB_GetNoChunk)}],
+            {no_chunk, conv_to_boolean(PB_GetNoChunk)},
+            {needs_trimmed, conv_to_boolean(PB_NeedsTrimmed)}],
     #mpb_chunkpos{file_name=File,
                   offset=Offset,
                   chunk_size=Size} = ChunkPos,

@@ -177,8 +179,15 @@ from_pb_request(#mpb_request{req_id=ReqID,
                 read_chunk=IR=#mpb_readchunkreq{}}) ->
     #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
                                               offset=Offset,
-                                              chunk_size=Size}} = IR,
-    {ReqID, {high_read_chunk, File, Offset, Size}};
+                                              chunk_size=Size},
+                      flag_no_checksum=FlagNoChecksum,
+                      flag_no_chunk=FlagNoChunk,
+                      flag_needs_trimmed=NeedsTrimmed} = IR,
+    %% I want MAPS
+    Options = [{no_checksum, machi_util:int2bool(FlagNoChecksum)},
+               {no_chunk, machi_util:int2bool(FlagNoChunk)},
+               {needs_trimmed, machi_util:int2bool(NeedsTrimmed)}],
+    {ReqID, {high_read_chunk, File, Offset, Size, Options}};
 from_pb_request(#mpb_request{req_id=ReqID,
                 trim_chunk=IR=#mpb_trimchunkreq{}}) ->
     #mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,

@@ -233,7 +242,8 @@ from_pb_response(#mpb_ll_response{
 from_pb_response(#mpb_ll_response{
                    req_id=ReqID,
                    read_chunk=#mpb_ll_readchunkresp{status=Status,
-                                                    chunks=PB_Chunks}}) ->
+                                                    chunks=PB_Chunks,
+                                                    trimmed=PB_Trimmed}}) ->
     case Status of
         'OK' ->
             Chunks = lists:map(fun(#mpb_chunk{file_name=File,

@@ -243,7 +253,12 @@ from_pb_response(#mpb_ll_response{
                                        Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
                                        {File, Offset, Bytes, Csum}
                                end, PB_Chunks),
-            {ReqID, {ok, Chunks}};
+            Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
+                                                  offset=Offset,
+                                                  chunk_size=Size}) ->
+                                        {File, Offset, Size}
+                                end, PB_Trimmed),
+            {ReqID, {ok, {Chunks, Trimmed}}};
         _ ->
             {ReqID, machi_pb_high_client:convert_general_status_code(Status)}
     end;
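Compatibility note: `trimmed` is a `repeated` field, and the usual PB decoding convention (which the generated `machi_pb` code already follows for `chunks`) is that an absent repeated field decodes to `[]`. If that holds, a response from a server built before this change still converts cleanly — the map over `PB_Trimmed` is just a no-op:

```erlang
%% An unset repeated field arrives as [], so decoding degrades gracefully:
Trimmed = lists:map(fun(#mpb_chunkpos{file_name=F, offset=O, chunk_size=S}) ->
                            {F, O, S}
                    end, []),
[] = Trimmed.
```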
@@ -382,17 +397,23 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
                       offset=Offset,
                       chunk=Chunk,
                       csum=PB_CSum}}};
-to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
+to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) ->
     %% TODO: stop ignoring Opts ^_^
     PB_EpochID = conv_from_epoch_id(EpochID),
+    FNChecksum = proplists:get_value(no_checksum, Opts, false),
+    FNChunk = proplists:get_value(no_chunk, Opts, false),
+    NeedsTrimmed = proplists:get_value(needs_trimmed, Opts, false),
     #mpb_ll_request{
        req_id=ReqID, do_not_alter=2,
        read_chunk=#mpb_ll_readchunkreq{
                     epoch_id=PB_EpochID,
                     chunk_pos=#mpb_chunkpos{
                                 file_name=File,
                                 offset=Offset,
-                                chunk_size=Size}}};
+                                chunk_size=Size},
+                    flag_no_checksum=machi_util:bool2int(FNChecksum),
+                    flag_no_chunk=machi_util:bool2int(FNChunk),
+                    flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}};
 to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
     PB_EpochID = conv_from_epoch_id(EpochID),
     #mpb_ll_request{req_id=ReqID, do_not_alter=2,

@@ -478,7 +499,7 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
                      write_chunk=#mpb_ll_writechunkresp{status=Status}};
 to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
     case Resp of
-        {ok, Chunks} ->
+        {ok, {Chunks, Trimmed}} ->
             PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
                                           {Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
                                           #mpb_chunk{file_name=File,

@@ -487,9 +508,15 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
                                                      csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
                                                                          csum=Ck}}
                                   end, Chunks),
+            PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
+                                           #mpb_chunkpos{file_name=File,
+                                                         offset=Offset,
+                                                         chunk_size=Size}
+                                   end, Trimmed),
             #mpb_ll_response{req_id=ReqID,
                              read_chunk=#mpb_ll_readchunkresp{status='OK',
-                                                              chunks=PB_Chunks}};
+                                                              chunks=PB_Chunks,
+                                                              trimmed=PB_Trimmed}};
         {error, _}=Error ->
             Status = conv_from_status(Error),
             #mpb_ll_response{req_id=ReqID,
@@ -654,10 +681,10 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
         _Else ->
             make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
     end;
-to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
+to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size, _}, Resp) ->
     case Resp of
-        {ok, Chunks} ->
-            MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
+        {ok, {Chunks, Trimmed}} ->
+            PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
                                           {Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
                                           #mpb_chunk{
                                              offset=Offset,

@@ -665,9 +692,15 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
                                              chunk=Bytes,
                                              csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
                                   end, Chunks),
+            PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
+                                           #mpb_chunkpos{file_name=File,
+                                                         offset=Offset,
+                                                         chunk_size=Size}
+                                   end, Trimmed),
             #mpb_response{req_id=ReqID,
                           read_chunk=#mpb_readchunkresp{status='OK',
-                                                        chunks=MpbChunks}};
+                                                        chunks=PB_Chunks,
+                                                        trimmed=PB_Trimmed}};
         {error, _}=Error ->
             Status = conv_from_status(Error),
             #mpb_response{req_id=ReqID,
@@ -59,7 +59,7 @@
     %% File API
     append_chunk/4, append_chunk/5,
     append_chunk_extra/5, append_chunk_extra/6,
-    read_chunk/5, read_chunk/6,
+    read_chunk/6, read_chunk/7,
     checksum_list/3, checksum_list/4,
     list_files/2, list_files/3,
     wedge_status/1, wedge_status/2,

@@ -130,13 +130,13 @@ append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
-read_chunk(PidSpec, EpochID, File, Offset, Size) ->
-    read_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
+read_chunk(PidSpec, EpochID, File, Offset, Size, Opts) ->
+    read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, infinity).
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
-read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
-    gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}},
+read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, Timeout) ->
+    gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size, Opts}},
                     Timeout).
 
 %% @doc Fetch the list of chunk checksums for `File'.

@@ -299,8 +299,8 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
                      Timeout) of
         {error, written}=Err ->
             Size = byte_size(Chunk),
-            case read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) of
-                {ok, Chunk2} when Chunk2 == Chunk ->
+            case read_chunk(PidSpec, EpochID, File, Offset, Size, [], Timeout) of
+                {ok, {[{File, Offset, Chunk2, _}], []}} when Chunk2 == Chunk ->
                     %% See equivalent comment inside write_projection().
                     ok;
                 _ ->

@@ -377,9 +377,9 @@ make_req_fun({append_chunk, EpochID, Prefix, Chunk},
 make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra},
              #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
     fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end;
-make_req_fun({read_chunk, EpochID, File, Offset, Size},
+make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts},
             #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
-    fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size) end;
+    fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size, Opts) end;
 make_req_fun({write_chunk, EpochID, File, Offset, Chunk},
              #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
     fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end;
@@ -47,7 +47,9 @@
          combinations/1, ordered_combinations/1,
          mk_order/2,
          %% Other
-         wait_for_death/2, wait_for_life/2
+         wait_for_death/2, wait_for_life/2,
+         bool2int/1,
+         int2bool/1
         ]).
 
 -include("machi.hrl").

@@ -390,3 +392,9 @@ mk_order(UPI2, Repair1) ->
                       error -> error
                   end || X <- UPI2],
     UPI2_order.
+
+%% C-style conversion for PB usage.
+bool2int(true) -> 1;
+bool2int(false) -> 0.
+int2bool(0) -> false;
+int2bool(I) when is_integer(I) -> true.
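Quick shell check of the new helpers — note that `int2bool/1` maps every nonzero integer to `true`, a deliberately C-style reading of whatever a peer put on the wire:

```erlang
1> machi_util:bool2int(true).
1
2> machi_util:int2bool(0).
false
3> machi_util:int2bool(42).
true
```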
@@ -114,13 +114,14 @@ smoke_test2() ->
         Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
         {error, bad_checksum} =
             machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
-        {ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
+        {ok, {[{_, Off1, Chunk1, _}], []}} =
+            machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
         {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
                                                              private),
         %% Verify that the client's CR wrote to all of them.
-        [{ok, [{_, Off1, Chunk1, _}]} =
+        [{ok, {[{_, Off1, Chunk1, _}], []}} =
              machi_flu1_client:read_chunk(
-               Host, PortBase+X, EpochID, File1, Off1, Size1) ||
+               Host, PortBase+X, EpochID, File1, Off1, Size1, []) ||
             X <- [0,1,2] ],
 
         %% Test read repair: Manually write to head, then verify that

@@ -128,13 +129,19 @@ smoke_test2() ->
         FooOff1 = Off1 + (1024*1024),
         [{error, not_written} = machi_flu1_client:read_chunk(
                                   Host, PortBase+X, EpochID,
-                                  File1, FooOff1, Size1) || X <- [0,1,2] ],
+                                  File1, FooOff1, Size1, []) || X <- [0,1,2] ],
         ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
                                            File1, FooOff1, Chunk1),
-        {ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
-        [{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk(
+        {ok, {[{_, FooOff1, Chunk1, _}], []}} =
+            machi_flu1_client:read_chunk(Host, PortBase+0, EpochID,
+                                         File1, FooOff1, Size1, []),
+        {ok, {[{_, FooOff1, Chunk1, _}], []}} =
+            machi_cr_client:read_chunk(C1, File1, FooOff1, Size1, []),
+        [?assertMatch({X,{ok, {[{_, FooOff1, Chunk1, _}], []}}},
+                      {X,machi_flu1_client:read_chunk(
                            Host, PortBase+X, EpochID,
-                           File1, FooOff1, Size1)} || X <- [0,1,2] ],
+                           File1, FooOff1, Size1, [])})
+            || X <- [0,1,2] ],
 
         %% Test read repair: Manually write to middle, then same checking.
         FooOff2 = Off1 + (2*1024*1024),
@@ -142,18 +149,19 @@ smoke_test2() ->
         Size2 = size(Chunk2),
         ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
                                            File1, FooOff2, Chunk2),
-        {ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
-        [{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk(
-                           Host, PortBase+X, EpochID,
-                           File1, FooOff2, Size2)} || X <- [0,1,2] ],
+        {ok, {[{_, FooOff2, Chunk2, _}], []}} =
+            machi_cr_client:read_chunk(C1, File1, FooOff2, Size2, []),
+        [{X,{ok, {[{_, FooOff2, Chunk2, _}], []}}} =
+             {X,machi_flu1_client:read_chunk(
+                  Host, PortBase+X, EpochID,
+                  File1, FooOff2, Size2, [])} || X <- [0,1,2] ],
 
         %% Misc API smoke & minor regression checks
         {error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>,
-                                                      999999999, 1),
-        {ok, [{_,Off1,Chunk1,_},
-              {_,FooOff1,Chunk1,_},
-              {_,FooOff2,Chunk2,_}]} = machi_cr_client:read_chunk(C1, File1,
-                                                                  Off1, 88888888),
+                                                      999999999, 1, []),
+        {ok, {[{_,Off1,Chunk1,_}, {_,FooOff1,Chunk1,_}, {_,FooOff2,Chunk2,_}],
+              []}} =
+            machi_cr_client:read_chunk(C1, File1, Off1, 88888888, []),
         %% Checksum list return value is a primitive binary().
         {ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1),
         true = is_binary(KludgeBin),

@@ -169,8 +177,8 @@ smoke_test2() ->
         {ok, {Off10,Size10,File10}} =
             machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
                                                Extra10 * Size10),
-        {ok, [{_, Off10, Chunk10, _}]} =
-            machi_cr_client:read_chunk(C1, File10, Off10, Size10),
+        {ok, {[{_, Off10, Chunk10, _}], []}} =
+            machi_cr_client:read_chunk(C1, File10, Off10, Size10, []),
         [begin
              Offx = Off10 + (Seq * Size10),
              %% TODO: uncomment written/not_written enforcement is available.

@@ -178,8 +186,8 @@ smoke_test2() ->
              %%                Offx, Size10),
              {ok, {Offx,Size10,File10}} =
                  machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
-             {ok, [{_, Offx, Chunk10, _}]} =
-                 machi_cr_client:read_chunk(C1, File10, Offx, Size10)
+             {ok, {[{_, Offx, Chunk10, _}], []}} =
+                 machi_cr_client:read_chunk(C1, File10, Offx, Size10, [])
         end || Seq <- lists:seq(1, Extra10)],
         {ok, {Off11,Size11,File11}} =
             machi_cr_client:append_chunk(C1, Prefix, Chunk10),
@ -222,7 +230,8 @@ witness_smoke_test2() ->
|
||||||
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
||||||
{error, bad_checksum} =
|
{error, bad_checksum} =
|
||||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
||||||
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
{ok, {[{_, Off1, Chunk1, _}], []}} =
|
||||||
|
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
|
||||||
|
|
||||||
%% Stop 'b' and let the chain reset.
|
%% Stop 'b' and let the chain reset.
|
||||||
ok = machi_flu_psup:stop_flu_package(b),
|
ok = machi_flu_psup:stop_flu_package(b),
|
||||||
@@ -248,8 +257,8 @@ witness_smoke_test2() ->
         end,
 
         %% Chunk1 is still readable: not affected by wedged witness head.
-        {ok, [{_, Off1, Chunk1, _}]} =
-            machi_cr_client:read_chunk(C1, File1, Off1, Size1),
+        {ok, {[{_, Off1, Chunk1, _}], []}} =
+            machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
         %% But because the head is wedged, an append will fail.
         {error, partition} =
             machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),
 
@@ -202,8 +202,9 @@ read_next(S, _Res, _Args) -> S.
 
 read(Pid, Offset, Length) ->
     case machi_file_proxy:read(Pid, Offset, Length) of
-        {ok, Chunks} ->
-            [{_, Offset, Data, Csum}] = machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
+        {ok, {Chunks, _}} ->
+            [{_, Offset, Data, Csum}] =
+                machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
             {ok, Data, Csum};
         E ->
             E
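Where a helper only needs the chunk list, the smallest adaptation is to
destructure and discard the second element, as the read() wrapper above does.
A hypothetical shim for call sites that still expect the pre-#25 shape:

    %% Sketch only: collapse {ok, {Chunks, Trimmed}} to the old {ok, Chunks}.
    read_chunks_only(Pid, Offset, Length) ->
        case machi_file_proxy:read(Pid, Offset, Length) of
            {ok, {Chunks, _Trimmed}} -> {ok, Chunks};
            Err                      -> Err
        end.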
 
@@ -80,22 +80,22 @@ machi_file_proxy_test_() ->
     clean_up_data_dir(?TESTDIR),
     {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
     [
      ?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
      ?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
      ?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
      ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
      ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
      ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)),
      ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
      ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
      ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
      ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
      ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
      ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
-     ?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
+     ?_assertMatch({ok, {[{_, _, _, _}], []}}, machi_file_proxy:read(Pid, 1025, 1000)),
      ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
      ?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
      ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
     ].
 
 multiple_chunks_read_test_() ->
@@ -108,11 +108,11 @@ multiple_chunks_read_test_() ->
      ?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)),
      %% Freeza
      ?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)),
-     ?_assertMatch({ok, [{"test", 1024, _, _},
+     ?_assertMatch({ok, {[{"test", 1024, _, _},
                          {"test", 10000, <<"fail">>, _},
                          {"test", 20000, <<"fail">>, _},
                          {"test", 30000, <<"fail">>, _},
-                         {"test", 530000, <<"fail">>, _}]},
+                         {"test", 530000, <<"fail">>, _}], []}},
                    machi_file_proxy:read(Pid, 1024, 530000)),
      ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
     ].
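A read spanning unwritten gaps returns one {File, Offset, Data, Checksum}
tuple per written run and nothing for the holes, while the second element
stays [] because no range here was trimmed. The same pattern outside eunit,
with illustrative variable names:

    show_runs(Pid) ->
        {ok, {Chunks, []}} = machi_file_proxy:read(Pid, 1024, 530000),
        %% One entry per written run; unwritten gaps are simply absent.
        [io:format("chunk at ~p: ~p bytes~n", [Off, byte_size(Data)])
         || {_File, Off, Data, _Csum} <- Chunks].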
 
@@ -97,8 +97,8 @@ flu_smoke_test() ->
     {ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
                                                   ?DUMMY_PV1_EPOCH,
                                                   Prefix, Chunk1),
-    {ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
-                                                     File1, Off1, Len1),
+    {ok, {[{_, Off1, Chunk1, _}], _}} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
+                                                          File1, Off1, Len1, []),
     {ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
                                            ?DUMMY_PV1_EPOCH, File1),
     true = is_binary(KludgeBin),
@@ -109,7 +109,7 @@ flu_smoke_test() ->
     Len1 = size(Chunk1),
     {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
                                              ?DUMMY_PV1_EPOCH,
-                                             File1, Off1*983829323, Len1),
+                                             File1, Off1*983829323, Len1, []),
     %% XXX FIXME
     %%
     %% This is failing because the read extends past the end of the file.
@@ -151,14 +151,14 @@ flu_smoke_test() ->
                                           File2, Off2, Chunk2),
     {error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
                                           BadFile, Off2, Chunk2),
-    {ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
-                                                     File2, Off2, Len2),
+    {ok, {[{_, Off2, Chunk2, _}], _}} =
+        ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2, []),
     {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
                                          ?DUMMY_PV1_EPOCH,
-                                         "no!!", Off2, Len2),
+                                         "no!!", Off2, Len2, []),
     {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
                                          ?DUMMY_PV1_EPOCH,
-                                         BadFile, Off2, Len2),
+                                         BadFile, Off2, Len2, []),
 
     %% We know that File1 still exists. Pretend that we've done a
     %% migration and exercise the delete_migration() API.
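At the FLU client layer the host/port/epoch prefix is unchanged; the only new
parameter is the trailing options list. A sketch of both call styles — the
needs_trimmed option name is an assumption inferred from the PR's stated
purpose, not something these hunks show:

    read_both_ways(Host, TcpPort, EpochID, File, Off, Len) ->
        %% Plain read; the caller ignores trim information.
        {ok, {Chunks, _}} =
            machi_flu1_client:read_chunk(Host, TcpPort, EpochID,
                                         File, Off, Len, []),
        %% Assumed form: ask the server to report trimmed ranges explicitly.
        {ok, {_Chunks, Trimmed}} =
            machi_flu1_client:read_chunk(Host, TcpPort, EpochID,
                                         File, Off, Len, [{needs_trimmed, true}]),
        {Chunks, Trimmed}.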
@@ -261,7 +261,7 @@ witness_test() ->
                                       Prefix, Chunk1),
     File = <<"foofile">>,
     {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1,
-                                         File, 9999, 9999),
+                                         File, 9999, 9999, []),
     {error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, EpochID1,
                                             File),
     {error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1),
 
@@ -146,7 +146,7 @@ partial_stop_restart2() ->
     {_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
     {error, wedged} = machi_flu1_client:read_chunk(
                         Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
-                        <<>>, 99999999, 1),
+                        <<>>, 99999999, 1, []),
     {error, wedged} = machi_flu1_client:checksum_list(
                         Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
     %% list_files() is permitted despite wedged status
 
@@ -81,8 +81,8 @@ smoke_test2() ->
                  {iolist_to_binary(Chunk3), File3, Off3, Size3}],
         [begin
              File = binary_to_list(Fl),
-             ?assertMatch({ok, [{File, Off, Ch, _}]},
-                          ?C:read_chunk(Clnt, Fl, Off, Sz))
+             ?assertMatch({ok, {[{File, Off, Ch, _}], []}},
+                          ?C:read_chunk(Clnt, Fl, Off, Sz, []))
         end || {Ch, Fl, Off, Sz} <- Reads],
 
         {ok, KludgeBin} = ?C:checksum_list(Clnt, File1),
 
@@ -60,14 +60,14 @@ api_smoke_test() ->
     {ok, {MyOff,MySize,MyFile}} =
         ?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
                           infinity),
-    {ok, [{_, MyOff, MyChunk, _}]} =
-        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
+    {ok, {[{_, MyOff, MyChunk, _}], []}} =
+        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize, []),
     MyChunk2 = <<"my chunk data, yeah, again">>,
     {ok, {MyOff2,MySize2,MyFile2}} =
         ?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
                                 MyChunk2, 4242, infinity),
-    {ok, [{_, MyOff2, MyChunk2, _}]} =
-        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
+    {ok, {[{_, MyOff2, MyChunk2, _}], []}} =
+        ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2, []),
     MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
     {error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
                                               Prefix, MyChunk_badcs),
@@ -245,13 +245,13 @@ flu_restart_test2() ->
      (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
                                        <<"prefix">>, Data, 42, infinity)
      end,
-     fun(run) -> {ok, [{_, Off1, Data, _}]} =
+     fun(run) -> {ok, {[{_, Off1, Data, _}], []}} =
                      ?MUT:read_chunk(Prox1, FakeEpoch,
-                                     File1, Off1, Size1),
+                                     File1, Off1, Size1, []),
                  ok;
        (line) -> io:format("line ~p, ", [?LINE]);
        (stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
-                                 File1, Off1, Size1)
+                                 File1, Off1, Size1, [])
      end,
     fun(run) -> {ok, KludgeBin} =
                     ?MUT:checksum_list(Prox1, FakeEpoch, File1),
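The proxy client (?MUT here) keeps the same argument order as the raw FLU
client but runs it behind a reconnecting proxy process, which is why the
restart test can issue identical read_chunk calls before and after a FLU
bounce. A sketch of the calling convention, with illustrative bindings;
it assumes Prox is a running proxy for a live FLU:

    read_one(Prox, Epoch, File, Off, Size) ->
        {ok, {[{_F, Off, Data, _Csum}], []}} =
            machi_proxy_flu1_client:read_chunk(Prox, Epoch, File, Off, Size, []),
        Data.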