Update read_chunk() PB protocol to return trimmed chunks #25
18 changed files with 285 additions and 178 deletions
|
@ -205,11 +205,15 @@ message Mpb_ReadChunkReq {
|
|||
// only makes sense if flag_no_checksum is not set).
|
||||
// TODO: not implemented yet.
|
||||
optional uint32 flag_no_chunk = 3 [default=0];
|
||||
|
||||
// TODO: not implemented yet.
|
||||
optional uint32 flag_needs_trimmed = 4 [default=0];
|
||||
}
|
||||
|
||||
message Mpb_ReadChunkResp {
|
||||
required Mpb_GeneralStatusCode status = 1;
|
||||
repeated Mpb_Chunk chunks = 2;
|
||||
repeated Mpb_ChunkPos trimmed = 3;
|
||||
}
|
||||
|
||||
// High level API: trim_chunk() request & response
|
||||
|
@ -410,11 +414,14 @@ message Mpb_LL_ReadChunkReq {
|
|||
// only makes sense if flag_checksum is not set).
|
||||
// TODO: not implemented yet.
|
||||
optional uint32 flag_no_chunk = 4 [default=0];
|
||||
|
||||
optional uint32 flag_needs_trimmed = 5 [default=0];
|
||||
}
|
||||
|
||||
message Mpb_LL_ReadChunkResp {
|
||||
required Mpb_GeneralStatusCode status = 1;
|
||||
repeated Mpb_Chunk chunks = 2;
|
||||
repeated Mpb_ChunkPos trimmed = 3;
|
||||
}
|
||||
|
||||
// Low level API: checksum_list()
|
||||
|
|
|
@ -76,7 +76,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
|
|||
ReadChunk = fun(F, Offset, Size) ->
|
||||
case file:pread(FH, Offset, Size) of
|
||||
{ok, Bin} ->
|
||||
{ok, [{F, Offset, Bin, undefined}]};
|
||||
{ok, {[{F, Offset, Bin, undefined}], []}};
|
||||
Err ->
|
||||
Err
|
||||
end
|
||||
|
@ -92,7 +92,7 @@ verify_file_checksums_local2(Sock1, EpochID, Path0) ->
|
|||
verify_file_checksums_remote2(Sock1, EpochID, File) ->
|
||||
ReadChunk = fun(File_name, Offset, Size) ->
|
||||
?FLU_C:read_chunk(Sock1, EpochID,
|
||||
File_name, Offset, Size)
|
||||
File_name, Offset, Size, [])
|
||||
end,
|
||||
verify_file_checksums_common(Sock1, EpochID, File, ReadChunk).
|
||||
|
||||
|
@ -117,7 +117,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
|||
verify_chunk_checksum(File, ReadChunk) ->
|
||||
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
||||
case ReadChunk(File, Offset, Size) of
|
||||
{ok, [{_, Offset, Chunk, _}]} ->
|
||||
{ok, {[{_, Offset, Chunk, _}], _}} ->
|
||||
CSum2 = machi_util:checksum_chunk(Chunk),
|
||||
if CSum == CSum2 ->
|
||||
Acc;
|
||||
|
|
|
@ -324,7 +324,8 @@ execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) ->
|
|||
_ -> ok
|
||||
end,
|
||||
_T1 = os:timestamp(),
|
||||
{ok, [{_, Offset, Chunk, _}]} =
|
||||
%% TODO: support case multiple written or trimmed chunks returned
|
||||
{ok, {[{_, Offset, Chunk, _}], _}} =
|
||||
machi_proxy_flu1_client:read_chunk(
|
||||
SrcP, EpochID, File, Offset, Size,
|
||||
?SHORT_TIMEOUT),
|
||||
|
|
|
@ -122,7 +122,7 @@
|
|||
append_chunk/3, append_chunk/4,
|
||||
append_chunk_extra/4, append_chunk_extra/5,
|
||||
write_chunk/4, write_chunk/5,
|
||||
read_chunk/4, read_chunk/5,
|
||||
read_chunk/5, read_chunk/6,
|
||||
trim_chunk/4, trim_chunk/5,
|
||||
checksum_list/2, checksum_list/3,
|
||||
list_files/1, list_files/2,
|
||||
|
@ -201,14 +201,14 @@ write_chunk(PidSpec, File, Offset, Chunk, Timeout0) ->
|
|||
|
||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
read_chunk(PidSpec, File, Offset, Size) ->
|
||||
read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
|
||||
read_chunk(PidSpec, File, Offset, Size, Opts) ->
|
||||
read_chunk(PidSpec, File, Offset, Size, Opts, ?DEFAULT_TIMEOUT).
|
||||
|
||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
|
||||
read_chunk(PidSpec, File, Offset, Size, Opts, Timeout0) ->
|
||||
{TO, Timeout} = timeout(Timeout0),
|
||||
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
|
||||
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, Opts, TO}},
|
||||
Timeout).
|
||||
|
||||
%% @doc Trim a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
@ -288,8 +288,8 @@ handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra, TO}, _From, S) ->
|
|||
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S);
|
||||
handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
|
||||
do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
|
||||
handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) ->
|
||||
do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
|
||||
handle_call2({read_chunk, File, Offset, Size, Opts, TO}, _From, S) ->
|
||||
do_read_chunk(File, Offset, Size, Opts, 0, os:timestamp(), TO, S);
|
||||
handle_call2({trim_chunk, File, Offset, Size, TO}, _From, S) ->
|
||||
do_trim_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
|
||||
handle_call2({checksum_list, File, TO}, _From, S) ->
|
||||
|
@ -503,11 +503,10 @@ do_write_head2(File, Offset, Chunk, Depth, STime, TO,
|
|||
iolist_size(Chunk)})
|
||||
end.
|
||||
|
||||
do_read_chunk(File, Offset, Size, 0=Depth, STime, TO,
|
||||
do_read_chunk(File, Offset, Size, Opts, 0=Depth, STime, TO,
|
||||
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
|
||||
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S);
|
||||
do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
|
||||
%% io:format(user, "read sleep1,", []),
|
||||
do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S);
|
||||
do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, #state{proj=P}=S) ->
|
||||
sleep_a_while(Depth),
|
||||
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||
if DiffMs > TO ->
|
||||
|
@ -517,22 +516,22 @@ do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
|
|||
case S2#state.proj of
|
||||
P2 when P2 == undefined orelse
|
||||
P2#projection_v1.upi == [] ->
|
||||
do_read_chunk(File, Offset, Size, Depth + 1, STime, TO, S2);
|
||||
do_read_chunk(File, Offset, Size, Opts, Depth + 1, STime, TO, S2);
|
||||
_ ->
|
||||
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S2)
|
||||
do_read_chunk2(File, Offset, Size, Opts, Depth + 1, STime, TO, S2)
|
||||
end
|
||||
end.
|
||||
|
||||
do_read_chunk2(File, Offset, Size, Depth, STime, TO,
|
||||
do_read_chunk2(File, Offset, Size, Opts, Depth, STime, TO,
|
||||
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
|
||||
UPI = readonly_flus(P),
|
||||
Tail = lists:last(UPI),
|
||||
ConsistencyMode = P#projection_v1.mode,
|
||||
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
|
||||
File, Offset, Size, ?TIMEOUT) of
|
||||
{ok, Chunks0} when is_list(Chunks0) ->
|
||||
File, Offset, Size, Opts, ?TIMEOUT) of
|
||||
{ok, {Chunks0, []}} when is_list(Chunks0) ->
|
||||
Chunks = trim_both_side(Chunks0, Offset, Offset + Size),
|
||||
{reply, {ok, Chunks}, S};
|
||||
{reply, {ok, {Chunks, []}}, S};
|
||||
%% {ok, BadChunk} ->
|
||||
%% %% TODO cleaner handling of bad chunks
|
||||
%% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
|
||||
|
@ -546,7 +545,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
|
|||
{reply, BadCS, S};
|
||||
{error, Retry}
|
||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||
do_read_chunk(File, Offset, Size, Depth, STime, TO, S);
|
||||
do_read_chunk(File, Offset, Size, Opts, Depth, STime, TO, S);
|
||||
{error, not_written} ->
|
||||
read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
|
||||
%% {reply, {error, not_written}, S};
|
||||
|
@ -624,10 +623,12 @@ read_repair2(ap_mode=ConsistencyMode,
|
|||
#state{proj=P}=S) ->
|
||||
Eligible = mutation_flus(P),
|
||||
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
|
||||
{ok, Chunks, GotItFrom} when is_list(Chunks) ->
|
||||
{ok, {Chunks, _Trimmed}, GotItFrom} when is_list(Chunks) ->
|
||||
ToRepair = mutation_flus(P) -- [GotItFrom],
|
||||
{Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
|
||||
{Reply0, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
|
||||
File, Depth, STime, S, {ok, Chunks}),
|
||||
{ok, Chunks} = Reply0,
|
||||
Reply = {ok, {Chunks, _Trimmed}},
|
||||
{reply, Reply, S1};
|
||||
{error, bad_checksum}=BadCS ->
|
||||
%% TODO: alternate strategy?
|
||||
|
@ -818,7 +819,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) ->
|
|||
run_middleworker_job(Fun, ArgList, WTimeout) ->
|
||||
Parent = self(),
|
||||
MiddleWorker =
|
||||
spawn(fun() ->
|
||||
spawn_link(fun() ->
|
||||
PidsMons =
|
||||
[spawn_monitor(fun() ->
|
||||
Res = (catch Fun(Arg)),
|
||||
|
@ -859,17 +860,19 @@ try_to_find_chunk(Eligible, File, Offset, Size,
|
|||
Work = fun(FLU) ->
|
||||
Proxy = orddict:fetch(FLU, PD),
|
||||
case ?FLU_PC:read_chunk(Proxy, EpochID,
|
||||
File, Offset, Size) of
|
||||
{ok, Chunks} when is_list(Chunks) ->
|
||||
{FLU, {ok, Chunks}};
|
||||
%% TODO Trimmed is required here
|
||||
File, Offset, Size, []) of
|
||||
{ok, {_Chunks, _} = ChunksAndTrimmed} ->
|
||||
{FLU, {ok, ChunksAndTrimmed}};
|
||||
Else ->
|
||||
{FLU, Else}
|
||||
end
|
||||
end,
|
||||
Rs = run_middleworker_job(Work, Eligible, Timeout),
|
||||
case [X || {_, {ok, [{_,_,B,_}]}}=X <- Rs, is_binary(B)] of
|
||||
[{FoundFLU, {ok, Chunk}}|_] ->
|
||||
{ok, Chunk, FoundFLU};
|
||||
|
||||
case [X || {_Fluname, {ok, {[{_,_,B,_}], _}}}=X <- Rs, is_binary(B)] of
|
||||
[{FoundFLU, {ok, ChunkAndTrimmed}}|_] ->
|
||||
{ok, ChunkAndTrimmed, FoundFLU};
|
||||
[] ->
|
||||
RetryErrs = [partition, bad_epoch, wedged],
|
||||
case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of
|
||||
|
|
|
@ -52,6 +52,7 @@
|
|||
sync/1,
|
||||
sync/2,
|
||||
read/3,
|
||||
read/4,
|
||||
write/3,
|
||||
write/4,
|
||||
append/2,
|
||||
|
@ -83,9 +84,9 @@
|
|||
wedged = false :: boolean(),
|
||||
csum_file :: string()|undefined,
|
||||
csum_path :: string()|undefined,
|
||||
eof_position = 0 :: non_neg_integer(),
|
||||
data_filehandle :: file:io_device(),
|
||||
csum_table :: machi_csum_table:table(),
|
||||
eof_position = 0 :: non_neg_integer(),
|
||||
tref :: reference(), %% timer ref
|
||||
ticks = 0 :: non_neg_integer(), %% ticks elapsed with no new operations
|
||||
ops = 0 :: non_neg_integer(), %% sum of all ops
|
||||
|
@ -135,11 +136,22 @@ sync(_Pid, Type) ->
|
|||
{ok, [{Filename::string(), Offset :: non_neg_integer(),
|
||||
Data :: binary(), Checksum :: binary()}]} |
|
||||
{error, Reason :: term()}.
|
||||
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
|
||||
andalso is_integer(Length) andalso Length > 0 ->
|
||||
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
|
||||
read(_Pid, Offset, Length) ->
|
||||
lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]),
|
||||
read(Pid, Offset, Length) ->
|
||||
read(Pid, Offset, Length, []).
|
||||
|
||||
-spec read(Pid :: pid(),
|
||||
Offset :: non_neg_integer(),
|
||||
Length :: non_neg_integer(),
|
||||
[{no_checksum|no_chunk|needs_trimmed, boolean()}]) ->
|
||||
{ok, [{Filename::string(), Offset :: non_neg_integer(),
|
||||
Data :: binary(), Checksum :: binary()}]} |
|
||||
{error, Reason :: term()}.
|
||||
read(Pid, Offset, Length, Opts) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
|
||||
andalso is_integer(Length) andalso Length > 0
|
||||
andalso is_list(Opts) ->
|
||||
gen_server:call(Pid, {read, Offset, Length, Opts}, ?TIMEOUT);
|
||||
read(_Pid, Offset, Length, Opts) ->
|
||||
lager:warning("Bad args to read: Offset ~p, Length ~p, Options ~p", [Offset, Length, Opts]),
|
||||
{error, bad_arg}.
|
||||
|
||||
% @doc Write data at offset
|
||||
|
@ -211,8 +223,8 @@ init({Filename, DataDir}) ->
|
|||
csum_table = CsumTable,
|
||||
tref = Tref,
|
||||
eof_position = Eof},
|
||||
lager:debug("Starting file proxy ~p for filename ~p, state = ~p",
|
||||
[self(), Filename, St]),
|
||||
lager:debug("Starting file proxy ~p for filename ~p, state = ~p, Eof = ~p",
|
||||
[self(), Filename, St, Eof]),
|
||||
{ok, St}.
|
||||
|
||||
% @private
|
||||
|
@ -250,13 +262,13 @@ handle_call({sync, all}, _From, State = #state{filename = F,
|
|||
|
||||
%%% READS
|
||||
|
||||
handle_call({read, _Offset, _Length}, _From,
|
||||
handle_call({read, _Offset, _Length, _}, _From,
|
||||
State = #state{wedged = true,
|
||||
reads = {T, Err}
|
||||
}) ->
|
||||
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
|
||||
|
||||
handle_call({read, Offset, Length}, _From,
|
||||
handle_call({read, Offset, Length, _Opts}, _From,
|
||||
State = #state{eof_position = Eof,
|
||||
reads = {T, Err}
|
||||
}) when Offset > Eof ->
|
||||
|
@ -265,23 +277,28 @@ handle_call({read, Offset, Length}, _From,
|
|||
[Offset, Length, Eof]),
|
||||
{reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
|
||||
|
||||
handle_call({read, Offset, Length}, _From,
|
||||
handle_call({read, Offset, Length, Opts}, _From,
|
||||
State = #state{filename = F,
|
||||
data_filehandle = FH,
|
||||
csum_table = CsumTable,
|
||||
eof_position = EofP,
|
||||
reads = {T, Err}
|
||||
}) ->
|
||||
NoChecksum = proplists:get_value(no_checksum, Opts, false),
|
||||
NoChunk = proplists:get_value(no_chunk, Opts, false),
|
||||
NeedsMerge = proplists:get_value(needs_trimmed, Opts, false),
|
||||
{Resp, NewErr} =
|
||||
case do_read(FH, F, CsumTable, Offset, Length) of
|
||||
{ok, []} ->
|
||||
case do_read(FH, F, CsumTable, Offset, Length, NoChecksum, NoChunk, NeedsMerge) of
|
||||
{ok, {[], []}} ->
|
||||
{{error, not_written}, Err + 1};
|
||||
{ok, Chunks} ->
|
||||
{ok, {Chunks, Trimmed}} ->
|
||||
%% Kludge to wrap read result in tuples, to support fragmented read
|
||||
%% XXX FIXME
|
||||
%% For now we are omiting the checksum data because it blows up
|
||||
%% protobufs.
|
||||
{{ok, Chunks}, Err};
|
||||
{{ok, {Chunks, Trimmed}}, Err};
|
||||
Error ->
|
||||
lager:error("Can't read ~p, ~p at File ~p", [Offset, Length, F]),
|
||||
{Error, Err + 1}
|
||||
end,
|
||||
{reply, Resp, State#state{reads = {T+1, NewErr}}};
|
||||
|
@ -298,6 +315,7 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
|
|||
State = #state{filename = F,
|
||||
writes = {T, Err},
|
||||
data_filehandle = FHd,
|
||||
eof_position=EofP,
|
||||
csum_table = CsumTable}) ->
|
||||
|
||||
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
|
||||
|
@ -318,6 +336,8 @@ handle_call({write, Offset, ClientMeta, Data}, _From,
|
|||
end
|
||||
end,
|
||||
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
|
||||
lager:debug("Wrote ~p bytes at ~p of file ~p, NewEOF = ~p~n",
|
||||
[iolist_size(Data), Offset, F, NewEof]),
|
||||
{reply, Resp, State#state{writes = {T+1, NewErr},
|
||||
eof_position = NewEof}};
|
||||
|
||||
|
@ -351,15 +371,14 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
|
|||
ok ->
|
||||
{{ok, F, EofP}, Err};
|
||||
Error ->
|
||||
{Error, Err + 1, EofP}
|
||||
{Error, Err + 1}
|
||||
end
|
||||
end,
|
||||
%% TODO: do we check this with calling
|
||||
%% machi_csum_table:calc_unwritten_bytes/1?
|
||||
NewEof = EofP + byte_size(Data) + Extra,
|
||||
lager:debug("appended ~p bytes at ~p file ~p. NewEofP = ~p",
|
||||
[iolist_size(Data), EofP, F, NewEof]),
|
||||
{reply, Resp, State#state{appends = {T+1, NewErr},
|
||||
eof_position = NewEof
|
||||
}};
|
||||
eof_position = NewEof}};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
lager:warning("Unknown call: ~p", [Req]),
|
||||
|
@ -500,7 +519,10 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
|||
Filename :: string(),
|
||||
CsumTable :: machi_csum_table:table(),
|
||||
Offset :: non_neg_integer(),
|
||||
Size :: non_neg_integer()
|
||||
Size :: non_neg_integer(),
|
||||
NoChecksum :: boolean(),
|
||||
NoChunk :: boolean(),
|
||||
NeedsTrimmed :: boolean()
|
||||
) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} |
|
||||
{error, bad_checksum} |
|
||||
{error, partial_read} |
|
||||
|
@ -519,6 +541,9 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
|||
% tuple is returned.</ul>
|
||||
% </li>
|
||||
%
|
||||
do_read(FHd, Filename, CsumTable, Offset, Size, _, _, _) ->
|
||||
do_read(FHd, Filename, CsumTable, Offset, Size).
|
||||
|
||||
do_read(FHd, Filename, CsumTable, Offset, Size) ->
|
||||
%% Note that find/3 only returns overlapping chunks, both borders
|
||||
%% are not aligned to original Offset and Size.
|
||||
|
@ -526,7 +551,8 @@ do_read(FHd, Filename, CsumTable, Offset, Size) ->
|
|||
read_all_ranges(FHd, Filename, ChunkCsums, []).
|
||||
|
||||
read_all_ranges(_, _, [], ReadChunks) ->
|
||||
{ok, lists:reverse(ReadChunks)};
|
||||
%% TODO: currently returns empty list of trimmed chunks
|
||||
{ok, {lists:reverse(ReadChunks), []}};
|
||||
|
||||
read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
|
||||
case file:pread(FHd, Offset, Size) of
|
||||
|
@ -592,12 +618,12 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
|
|||
{error, Reason}
|
||||
end;
|
||||
[{Offset, Size, TaggedCsum}] ->
|
||||
case do_read(FHd, Filename, CsumTable, Offset, Size) of
|
||||
case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of
|
||||
{error, _} = E ->
|
||||
lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written",
|
||||
[E, Offset, Filename]),
|
||||
{error, server_insanity};
|
||||
{ok, [{_, Offset, Data, TaggedCsum}]} ->
|
||||
{ok, {[{_, Offset, Data, TaggedCsum}], _}} ->
|
||||
%% TODO: what if different checksum got from do_read()?
|
||||
ok;
|
||||
{ok, _Other} ->
|
||||
|
@ -632,7 +658,6 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
|
|||
ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
|
||||
lager:debug("Successful write to checksum file for ~p",
|
||||
[Filename]),
|
||||
%% io:format(user, "here, heh ~p~n", [?LINE]),
|
||||
ok;
|
||||
Other ->
|
||||
lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
|
||||
|
|
|
@ -450,9 +450,9 @@ do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
|
|||
Chunk = {TaggedCSum, ChunkBin},
|
||||
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
|
||||
{Res, S};
|
||||
do_pb_hl_request2({high_read_chunk, File, Offset, Size},
|
||||
do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
|
||||
#state{high_clnt=Clnt}=S) ->
|
||||
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size),
|
||||
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
|
||||
{Res, S};
|
||||
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
|
||||
#state{high_clnt=Clnt}=S) ->
|
||||
|
@ -548,13 +548,13 @@ do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluNa
|
|||
{error, bad_arg}
|
||||
end.
|
||||
|
||||
do_server_read_chunk(File, Offset, Size, _Opts, #state{flu_name=FluName})->
|
||||
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
|
||||
%% TODO: Look inside Opts someday.
|
||||
case sanitize_file_string(File) of
|
||||
ok ->
|
||||
{ok, Pid} = machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}),
|
||||
case machi_file_proxy:read(Pid, Offset, Size) of
|
||||
{ok, Chunks} -> {ok, Chunks};
|
||||
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
|
||||
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
|
||||
Other -> Other
|
||||
end;
|
||||
_ ->
|
||||
|
|
|
@ -56,7 +56,7 @@
|
|||
%% File API
|
||||
append_chunk/4, append_chunk/5,
|
||||
append_chunk_extra/5, append_chunk_extra/6,
|
||||
read_chunk/5, read_chunk/6,
|
||||
read_chunk/6, read_chunk/7,
|
||||
checksum_list/3, checksum_list/4,
|
||||
list_files/2, list_files/3,
|
||||
wedge_status/1, wedge_status/2,
|
||||
|
@ -144,26 +144,28 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
|
|||
|
||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
|
||||
-spec read_chunk(port_wrap(), machi_dt:epoch_id(), machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
|
||||
proplists:proplist()) ->
|
||||
{ok, machi_dt:chunk_s()} |
|
||||
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
|
||||
{error, term()}.
|
||||
read_chunk(Sock, EpochID, File, Offset, Size)
|
||||
read_chunk(Sock, EpochID, File, Offset, Size, Opts)
|
||||
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
|
||||
read_chunk2(Sock, EpochID, File, Offset, Size).
|
||||
read_chunk2(Sock, EpochID, File, Offset, Size, Opts).
|
||||
|
||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
-spec read_chunk(machi_dt:inet_host(), machi_dt:inet_port(), machi_dt:epoch_id(),
|
||||
machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size()) ->
|
||||
machi_dt:file_name(), machi_dt:file_offset(), machi_dt:chunk_size(),
|
||||
proplists:proplist()) ->
|
||||
{ok, machi_dt:chunk_s()} |
|
||||
{error, machi_dt:error_general() | 'not_written' | 'partial_read'} |
|
||||
{error, term()}.
|
||||
read_chunk(Host, TcpPort, EpochID, File, Offset, Size)
|
||||
read_chunk(Host, TcpPort, EpochID, File, Offset, Size, Opts)
|
||||
when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
|
||||
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
|
||||
try
|
||||
read_chunk2(Sock, EpochID, File, Offset, Size)
|
||||
read_chunk2(Sock, EpochID, File, Offset, Size, Opts)
|
||||
after
|
||||
disconnect(Sock)
|
||||
end.
|
||||
|
@ -516,12 +518,12 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
|
|||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
read_chunk2(Sock, EpochID, File0, Offset, Size) ->
|
||||
read_chunk2(Sock, EpochID, File0, Offset, Size, Opts) ->
|
||||
ReqID = <<"id">>,
|
||||
File = machi_util:make_binary(File0),
|
||||
Req = machi_pb_translate:to_pb_request(
|
||||
ReqID,
|
||||
{low_read_chunk, EpochID, File, Offset, Size, []}),
|
||||
{low_read_chunk, EpochID, File, Offset, Size, Opts}),
|
||||
do_pb_request_common(Sock, ReqID, Req).
|
||||
|
||||
append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) ->
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
auth/3, auth/4,
|
||||
append_chunk/6, append_chunk/7,
|
||||
write_chunk/5, write_chunk/6,
|
||||
read_chunk/4, read_chunk/5,
|
||||
read_chunk/5, read_chunk/6,
|
||||
trim_chunk/4, trim_chunk/5,
|
||||
checksum_list/2, checksum_list/3,
|
||||
list_files/1, list_files/2
|
||||
|
@ -94,11 +94,18 @@ write_chunk(PidSpec, File, Offset, Chunk, CSum) ->
|
|||
write_chunk(PidSpec, File, Offset, Chunk, CSum, Timeout) ->
|
||||
send_sync(PidSpec, {write_chunk, File, Offset, Chunk, CSum}, Timeout).
|
||||
|
||||
read_chunk(PidSpec, File, Offset, Size) ->
|
||||
read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
|
||||
-spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
|
||||
[{flag_no_checksum | flag_no_chunk | needs_trimmed, boolean()}]) ->
|
||||
{ok, {list(), list()}} | {error, term()}.
|
||||
read_chunk(PidSpec, File, Offset, Size, Options) ->
|
||||
read_chunk(PidSpec, File, Offset, Size, Options, ?DEFAULT_TIMEOUT).
|
||||
|
||||
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
|
||||
send_sync(PidSpec, {read_chunk, File, Offset, Size}, Timeout).
|
||||
-spec read_chunk(pid(), string(), pos_integer(), pos_integer(),
|
||||
[{no_checksum | no_chunk | needs_trimmed, boolean()}],
|
||||
pos_integer()) ->
|
||||
{ok, {list(), list()}} | {error, term()}.
|
||||
read_chunk(PidSpec, File, Offset, Size, Options, Timeout) ->
|
||||
send_sync(PidSpec, {read_chunk, File, Offset, Size, Options}, Timeout).
|
||||
|
||||
trim_chunk(PidSpec, File, Offset, Size) ->
|
||||
trim_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT).
|
||||
|
@ -282,13 +289,19 @@ do_send_sync2({write_chunk, File, Offset, Chunk, CSum},
|
|||
Res = {bummer, {X, Y, erlang:get_stacktrace()}},
|
||||
{Res, S#state{count=Count+1}}
|
||||
end;
|
||||
do_send_sync2({read_chunk, File, Offset, Size},
|
||||
do_send_sync2({read_chunk, File, Offset, Size, Options},
|
||||
#state{sock=Sock, sock_id=Index, count=Count}=S) ->
|
||||
try
|
||||
ReqID = <<Index:64/big, Count:64/big>>,
|
||||
FlagNoChecksum = proplists:get_value(no_checksum, Options, false),
|
||||
FlagNoChunk = proplists:get_value(no_chunk, Options, false),
|
||||
NeedsTrimmed = proplists:get_value(needs_trimmed, Options, false),
|
||||
Req = #mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size}},
|
||||
chunk_size=Size},
|
||||
flag_no_checksum=machi_util:bool2int(FlagNoChecksum),
|
||||
flag_no_chunk=machi_util:bool2int(FlagNoChunk),
|
||||
flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)},
|
||||
R1a = #mpb_request{req_id=ReqID, do_not_alter=1,
|
||||
read_chunk=Req},
|
||||
Bin1a = machi_pb:encode_mpb_request(R1a),
|
||||
|
@ -416,7 +429,7 @@ convert_write_chunk_resp(#mpb_writechunkresp{status='OK'}) ->
|
|||
convert_write_chunk_resp(#mpb_writechunkresp{status=Status}) ->
|
||||
convert_general_status_code(Status).
|
||||
|
||||
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
|
||||
convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks, trimmed=PB_Trimmed}) ->
|
||||
Chunks = lists:map(fun(#mpb_chunk{offset=Offset,
|
||||
file_name=File,
|
||||
chunk=Chunk,
|
||||
|
@ -425,7 +438,12 @@ convert_read_chunk_resp(#mpb_readchunkresp{status='OK', chunks=PB_Chunks}) ->
|
|||
Csum = <<(machi_pb_translate:conv_to_csum_tag(T)):8, Ck/binary>>,
|
||||
{File, Offset, Chunk, Csum}
|
||||
end, PB_Chunks),
|
||||
{ok, Chunks};
|
||||
Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size}) ->
|
||||
{File, Offset, Size}
|
||||
end, PB_Trimmed),
|
||||
{ok, {Chunks, Trimmed}};
|
||||
convert_read_chunk_resp(#mpb_readchunkresp{status=Status}) ->
|
||||
convert_general_status_code(Status).
|
||||
|
||||
|
|
|
@ -78,10 +78,12 @@ from_pb_request(#mpb_ll_request{
|
|||
epoch_id=PB_EpochID,
|
||||
chunk_pos=ChunkPos,
|
||||
flag_no_checksum=PB_GetNoChecksum,
|
||||
flag_no_chunk=PB_GetNoChunk}}) ->
|
||||
flag_no_chunk=PB_GetNoChunk,
|
||||
flag_needs_trimmed=PB_NeedsTrimmed}}) ->
|
||||
EpochID = conv_to_epoch_id(PB_EpochID),
|
||||
Opts = [{no_checksum, conv_to_boolean(PB_GetNoChecksum)},
|
||||
{no_chunk, conv_to_boolean(PB_GetNoChunk)}],
|
||||
{no_chunk, conv_to_boolean(PB_GetNoChunk)},
|
||||
{needs_trimmed, conv_to_boolean(PB_NeedsTrimmed)}],
|
||||
#mpb_chunkpos{file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size} = ChunkPos,
|
||||
|
@ -177,8 +179,15 @@ from_pb_request(#mpb_request{req_id=ReqID,
|
|||
read_chunk=IR=#mpb_readchunkreq{}}) ->
|
||||
#mpb_readchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size}} = IR,
|
||||
{ReqID, {high_read_chunk, File, Offset, Size}};
|
||||
chunk_size=Size},
|
||||
flag_no_checksum=FlagNoChecksum,
|
||||
flag_no_chunk=FlagNoChunk,
|
||||
flag_needs_trimmed=NeedsTrimmed} = IR,
|
||||
%% I want MAPS
|
||||
Options = [{no_checksum, machi_util:int2bool(FlagNoChecksum)},
|
||||
{no_chunk, machi_util:int2bool(FlagNoChunk)},
|
||||
{needs_trimmed, machi_util:int2bool(NeedsTrimmed)}],
|
||||
{ReqID, {high_read_chunk, File, Offset, Size, Options}};
|
||||
from_pb_request(#mpb_request{req_id=ReqID,
|
||||
trim_chunk=IR=#mpb_trimchunkreq{}}) ->
|
||||
#mpb_trimchunkreq{chunk_pos=#mpb_chunkpos{file_name=File,
|
||||
|
@ -233,7 +242,8 @@ from_pb_response(#mpb_ll_response{
|
|||
from_pb_response(#mpb_ll_response{
|
||||
req_id=ReqID,
|
||||
read_chunk=#mpb_ll_readchunkresp{status=Status,
|
||||
chunks=PB_Chunks}}) ->
|
||||
chunks=PB_Chunks,
|
||||
trimmed=PB_Trimmed}}) ->
|
||||
case Status of
|
||||
'OK' ->
|
||||
Chunks = lists:map(fun(#mpb_chunk{file_name=File,
|
||||
|
@ -243,7 +253,12 @@ from_pb_response(#mpb_ll_response{
|
|||
Csum = <<(conv_to_csum_tag(T)):8, Ck/binary>>,
|
||||
{File, Offset, Bytes, Csum}
|
||||
end, PB_Chunks),
|
||||
{ReqID, {ok, Chunks}};
|
||||
Trimmed = lists:map(fun(#mpb_chunkpos{file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size}) ->
|
||||
{File, Offset, Size}
|
||||
end, PB_Trimmed),
|
||||
{ReqID, {ok, {Chunks, Trimmed}}};
|
||||
_ ->
|
||||
{ReqID, machi_pb_high_client:convert_general_status_code(Status)}
|
||||
end;
|
||||
|
@ -382,9 +397,12 @@ to_pb_request(ReqID, {low_write_chunk, EpochID, File, Offset, Chunk, CSum_tag, C
|
|||
offset=Offset,
|
||||
chunk=Chunk,
|
||||
csum=PB_CSum}}};
|
||||
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
|
||||
to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, Opts}) ->
|
||||
%% TODO: stop ignoring Opts ^_^
|
||||
PB_EpochID = conv_from_epoch_id(EpochID),
|
||||
FNChecksum = proplists:get_value(no_checksum, Opts, false),
|
||||
FNChunk = proplists:get_value(no_chunk, Opts, false),
|
||||
NeedsTrimmed = proplists:get_value(needs_merge, Opts, false),
|
||||
#mpb_ll_request{
|
||||
req_id=ReqID, do_not_alter=2,
|
||||
read_chunk=#mpb_ll_readchunkreq{
|
||||
|
@ -392,7 +410,10 @@ to_pb_request(ReqID, {low_read_chunk, EpochID, File, Offset, Size, _Opts}) ->
|
|||
chunk_pos=#mpb_chunkpos{
|
||||
file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size}}};
|
||||
chunk_size=Size},
|
||||
flag_no_checksum=machi_util:bool2int(FNChecksum),
|
||||
flag_no_chunk=machi_util:bool2int(FNChunk),
|
||||
flag_needs_trimmed=machi_util:bool2int(NeedsTrimmed)}};
|
||||
to_pb_request(ReqID, {low_checksum_list, EpochID, File}) ->
|
||||
PB_EpochID = conv_from_epoch_id(EpochID),
|
||||
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
||||
|
@ -478,7 +499,7 @@ to_pb_response(ReqID, {low_write_chunk, _EID, _Fl, _Off, _Ch, _CST, _CS},Resp)->
|
|||
write_chunk=#mpb_ll_writechunkresp{status=Status}};
|
||||
to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
|
||||
case Resp of
|
||||
{ok, Chunks} ->
|
||||
{ok, {Chunks, Trimmed}} ->
|
||||
PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
|
||||
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
|
||||
#mpb_chunk{file_name=File,
|
||||
|
@ -487,9 +508,15 @@ to_pb_response(ReqID, {low_read_chunk, _EID, _Fl, _Off, _Sz, _Opts}, Resp)->
|
|||
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag),
|
||||
csum=Ck}}
|
||||
end, Chunks),
|
||||
PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
|
||||
#mpb_chunkpos{file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size}
|
||||
end, Trimmed),
|
||||
#mpb_ll_response{req_id=ReqID,
|
||||
read_chunk=#mpb_ll_readchunkresp{status='OK',
|
||||
chunks=PB_Chunks}};
|
||||
chunks=PB_Chunks,
|
||||
trimmed=PB_Trimmed}};
|
||||
{error, _}=Error ->
|
||||
Status = conv_from_status(Error),
|
||||
#mpb_ll_response{req_id=ReqID,
|
||||
|
@ -654,10 +681,10 @@ to_pb_response(ReqID, {high_write_chunk, _File, _Offset, _Chunk, _TaggedCSum}, R
|
|||
_Else ->
|
||||
make_error_resp(ReqID, 66, io_lib:format("err ~p", [_Else]))
|
||||
end;
|
||||
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
|
||||
to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size, _}, Resp) ->
|
||||
case Resp of
|
||||
{ok, Chunks} ->
|
||||
MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
|
||||
{ok, {Chunks, Trimmed}} ->
|
||||
PB_Chunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
|
||||
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
|
||||
#mpb_chunk{
|
||||
offset=Offset,
|
||||
|
@ -665,9 +692,15 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
|
|||
chunk=Bytes,
|
||||
csum=#mpb_chunkcsum{type=conv_from_csum_tag(Tag), csum=Ck}}
|
||||
end, Chunks),
|
||||
PB_Trimmed = lists:map(fun({File, Offset, Size}) ->
|
||||
#mpb_chunkpos{file_name=File,
|
||||
offset=Offset,
|
||||
chunk_size=Size}
|
||||
end, Trimmed),
|
||||
#mpb_response{req_id=ReqID,
|
||||
read_chunk=#mpb_readchunkresp{status='OK',
|
||||
chunks=MpbChunks}};
|
||||
chunks=PB_Chunks,
|
||||
trimmed=PB_Trimmed}};
|
||||
{error, _}=Error ->
|
||||
Status = conv_from_status(Error),
|
||||
#mpb_response{req_id=ReqID,
|
||||
|
|
|
@ -59,7 +59,7 @@
|
|||
%% File API
|
||||
append_chunk/4, append_chunk/5,
|
||||
append_chunk_extra/5, append_chunk_extra/6,
|
||||
read_chunk/5, read_chunk/6,
|
||||
read_chunk/6, read_chunk/7,
|
||||
checksum_list/3, checksum_list/4,
|
||||
list_files/2, list_files/3,
|
||||
wedge_status/1, wedge_status/2,
|
||||
|
@ -130,13 +130,13 @@ append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
|
|||
|
||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
read_chunk(PidSpec, EpochID, File, Offset, Size) ->
|
||||
read_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
|
||||
read_chunk(PidSpec, EpochID, File, Offset, Size, Opts) ->
|
||||
read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, infinity).
|
||||
|
||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||
|
||||
read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
|
||||
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}},
|
||||
read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, Timeout) ->
|
||||
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size, Opts}},
|
||||
Timeout).
|
||||
|
||||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
@ -299,8 +299,8 @@ write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
|
|||
Timeout) of
|
||||
{error, written}=Err ->
|
||||
Size = byte_size(Chunk),
|
||||
case read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) of
|
||||
{ok, Chunk2} when Chunk2 == Chunk ->
|
||||
case read_chunk(PidSpec, EpochID, File, Offset, Size, [], Timeout) of
|
||||
{ok, {[{File, Offset, Chunk2, _}], []}} when Chunk2 == Chunk ->
|
||||
%% See equivalent comment inside write_projection().
|
||||
ok;
|
||||
_ ->
|
||||
|
@ -377,9 +377,9 @@ make_req_fun({append_chunk, EpochID, Prefix, Chunk},
|
|||
make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra},
|
||||
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
|
||||
fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end;
|
||||
make_req_fun({read_chunk, EpochID, File, Offset, Size},
|
||||
make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts},
|
||||
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
|
||||
fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size) end;
|
||||
fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size, Opts) end;
|
||||
make_req_fun({write_chunk, EpochID, File, Offset, Chunk},
|
||||
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
|
||||
fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end;
|
||||
|
|
|
@ -47,7 +47,9 @@
|
|||
combinations/1, ordered_combinations/1,
|
||||
mk_order/2,
|
||||
%% Other
|
||||
wait_for_death/2, wait_for_life/2
|
||||
wait_for_death/2, wait_for_life/2,
|
||||
bool2int/1,
|
||||
int2bool/1
|
||||
]).
|
||||
|
||||
-include("machi.hrl").
|
||||
|
@ -390,3 +392,9 @@ mk_order(UPI2, Repair1) ->
|
|||
error -> error
|
||||
end || X <- UPI2],
|
||||
UPI2_order.
|
||||
|
||||
%% C-style conversion for PB usage.
|
||||
bool2int(true) -> 1;
|
||||
bool2int(false) -> 0.
|
||||
int2bool(0) -> false;
|
||||
int2bool(I) when is_integer(I) -> true.
|
||||
|
|
|
@ -114,13 +114,14 @@ smoke_test2() ->
|
|||
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
||||
{error, bad_checksum} =
|
||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
||||
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
||||
{ok, {[{_, Off1, Chunk1, _}], []}} =
|
||||
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
|
||||
{ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0,
|
||||
private),
|
||||
%% Verify that the client's CR wrote to all of them.
|
||||
[{ok, [{_, Off1, Chunk1, _}]} =
|
||||
[{ok, {[{_, Off1, Chunk1, _}], []}} =
|
||||
machi_flu1_client:read_chunk(
|
||||
Host, PortBase+X, EpochID, File1, Off1, Size1) ||
|
||||
Host, PortBase+X, EpochID, File1, Off1, Size1, []) ||
|
||||
X <- [0,1,2] ],
|
||||
|
||||
%% Test read repair: Manually write to head, then verify that
|
||||
|
@ -128,13 +129,19 @@ smoke_test2() ->
|
|||
FooOff1 = Off1 + (1024*1024),
|
||||
[{error, not_written} = machi_flu1_client:read_chunk(
|
||||
Host, PortBase+X, EpochID,
|
||||
File1, FooOff1, Size1) || X <- [0,1,2] ],
|
||||
File1, FooOff1, Size1, []) || X <- [0,1,2] ],
|
||||
ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID,
|
||||
File1, FooOff1, Chunk1),
|
||||
{ok, [{_, FooOff1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1),
|
||||
[{X,{ok, [{_, FooOff1, Chunk1, _}]}} = {X,machi_flu1_client:read_chunk(
|
||||
{ok, {[{_, FooOff1, Chunk1, _}], []}} =
|
||||
machi_flu1_client:read_chunk(Host, PortBase+0, EpochID,
|
||||
File1, FooOff1, Size1, []),
|
||||
{ok, {[{_, FooOff1, Chunk1, _}], []}} =
|
||||
machi_cr_client:read_chunk(C1, File1, FooOff1, Size1, []),
|
||||
[?assertMatch({X,{ok, {[{_, FooOff1, Chunk1, _}], []}}},
|
||||
{X,machi_flu1_client:read_chunk(
|
||||
Host, PortBase+X, EpochID,
|
||||
File1, FooOff1, Size1)} || X <- [0,1,2] ],
|
||||
File1, FooOff1, Size1, [])})
|
||||
|| X <- [0,1,2] ],
|
||||
|
||||
%% Test read repair: Manually write to middle, then same checking.
|
||||
FooOff2 = Off1 + (2*1024*1024),
|
||||
|
@ -142,18 +149,19 @@ smoke_test2() ->
|
|||
Size2 = size(Chunk2),
|
||||
ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID,
|
||||
File1, FooOff2, Chunk2),
|
||||
{ok, [{_, FooOff2, Chunk2, _}]} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2),
|
||||
[{X,{ok, [{_, FooOff2, Chunk2, _}]}} = {X,machi_flu1_client:read_chunk(
|
||||
{ok, {[{_, FooOff2, Chunk2, _}], []}} =
|
||||
machi_cr_client:read_chunk(C1, File1, FooOff2, Size2, []),
|
||||
[{X,{ok, {[{_, FooOff2, Chunk2, _}], []}}} =
|
||||
{X,machi_flu1_client:read_chunk(
|
||||
Host, PortBase+X, EpochID,
|
||||
File1, FooOff2, Size2)} || X <- [0,1,2] ],
|
||||
File1, FooOff2, Size2, [])} || X <- [0,1,2] ],
|
||||
|
||||
%% Misc API smoke & minor regression checks
|
||||
{error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>,
|
||||
999999999, 1),
|
||||
{ok, [{_,Off1,Chunk1,_},
|
||||
{_,FooOff1,Chunk1,_},
|
||||
{_,FooOff2,Chunk2,_}]} = machi_cr_client:read_chunk(C1, File1,
|
||||
Off1, 88888888),
|
||||
999999999, 1, []),
|
||||
{ok, {[{_,Off1,Chunk1,_}, {_,FooOff1,Chunk1,_}, {_,FooOff2,Chunk2,_}],
|
||||
[]}} =
|
||||
machi_cr_client:read_chunk(C1, File1, Off1, 88888888, []),
|
||||
%% Checksum list return value is a primitive binary().
|
||||
{ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1),
|
||||
true = is_binary(KludgeBin),
|
||||
|
@ -169,8 +177,8 @@ smoke_test2() ->
|
|||
{ok, {Off10,Size10,File10}} =
|
||||
machi_cr_client:append_chunk_extra(C1, Prefix, Chunk10,
|
||||
Extra10 * Size10),
|
||||
{ok, [{_, Off10, Chunk10, _}]} =
|
||||
machi_cr_client:read_chunk(C1, File10, Off10, Size10),
|
||||
{ok, {[{_, Off10, Chunk10, _}], []}} =
|
||||
machi_cr_client:read_chunk(C1, File10, Off10, Size10, []),
|
||||
[begin
|
||||
Offx = Off10 + (Seq * Size10),
|
||||
%% TODO: uncomment written/not_written enforcement is available.
|
||||
|
@ -178,8 +186,8 @@ smoke_test2() ->
|
|||
%% Offx, Size10),
|
||||
{ok, {Offx,Size10,File10}} =
|
||||
machi_cr_client:write_chunk(C1, File10, Offx, Chunk10),
|
||||
{ok, [{_, Offx, Chunk10, _}]} =
|
||||
machi_cr_client:read_chunk(C1, File10, Offx, Size10)
|
||||
{ok, {[{_, Offx, Chunk10, _}], []}} =
|
||||
machi_cr_client:read_chunk(C1, File10, Offx, Size10, [])
|
||||
end || Seq <- lists:seq(1, Extra10)],
|
||||
{ok, {Off11,Size11,File11}} =
|
||||
machi_cr_client:append_chunk(C1, Prefix, Chunk10),
|
||||
|
@ -222,7 +230,8 @@ witness_smoke_test2() ->
|
|||
Chunk1_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, Chunk1},
|
||||
{error, bad_checksum} =
|
||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1_badcs),
|
||||
{ok, [{_, Off1, Chunk1, _}]} = machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
||||
{ok, {[{_, Off1, Chunk1, _}], []}} =
|
||||
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
|
||||
|
||||
%% Stop 'b' and let the chain reset.
|
||||
ok = machi_flu_psup:stop_flu_package(b),
|
||||
|
@ -248,8 +257,8 @@ witness_smoke_test2() ->
|
|||
end,
|
||||
|
||||
%% Chunk1 is still readable: not affected by wedged witness head.
|
||||
{ok, [{_, Off1, Chunk1, _}]} =
|
||||
machi_cr_client:read_chunk(C1, File1, Off1, Size1),
|
||||
{ok, {[{_, Off1, Chunk1, _}], []}} =
|
||||
machi_cr_client:read_chunk(C1, File1, Off1, Size1, []),
|
||||
%% But because the head is wedged, an append will fail.
|
||||
{error, partition} =
|
||||
machi_cr_client:append_chunk(C1, Prefix, Chunk1, 1*1000),
|
||||
|
|
|
@ -202,8 +202,9 @@ read_next(S, _Res, _Args) -> S.
|
|||
|
||||
read(Pid, Offset, Length) ->
|
||||
case machi_file_proxy:read(Pid, Offset, Length) of
|
||||
{ok, Chunks} ->
|
||||
[{_, Offset, Data, Csum}] = machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
|
||||
{ok, {Chunks, _}} ->
|
||||
[{_, Offset, Data, Csum}] =
|
||||
machi_cr_client:trim_both_side(Chunks, Offset, Offset+Length),
|
||||
{ok, Data, Csum};
|
||||
E ->
|
||||
E
|
||||
|
|
|
@ -92,7 +92,7 @@ machi_file_proxy_test_() ->
|
|||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
|
||||
?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
|
||||
?_assertMatch({ok, {[{_, _, _, _}], []}}, machi_file_proxy:read(Pid, 1025, 1000)),
|
||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
|
||||
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
|
||||
|
@ -108,11 +108,11 @@ multiple_chunks_read_test_() ->
|
|||
?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)),
|
||||
%% Freeza
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)),
|
||||
?_assertMatch({ok, [{"test", 1024, _, _},
|
||||
?_assertMatch({ok, {[{"test", 1024, _, _},
|
||||
{"test", 10000, <<"fail">>, _},
|
||||
{"test", 20000, <<"fail">>, _},
|
||||
{"test", 30000, <<"fail">>, _},
|
||||
{"test", 530000, <<"fail">>, _}]},
|
||||
{"test", 530000, <<"fail">>, _}], []}},
|
||||
machi_file_proxy:read(Pid, 1024, 530000)),
|
||||
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
|
||||
].
|
||||
|
|
|
@ -97,8 +97,8 @@ flu_smoke_test() ->
|
|||
{ok, {Off1,Len1,File1}} = ?FLU_C:append_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
Prefix, Chunk1),
|
||||
{ok, [{_, Off1, Chunk1, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||
File1, Off1, Len1),
|
||||
{ok, {[{_, Off1, Chunk1, _}], _}} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||
File1, Off1, Len1, []),
|
||||
{ok, KludgeBin} = ?FLU_C:checksum_list(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH, File1),
|
||||
true = is_binary(KludgeBin),
|
||||
|
@ -109,7 +109,7 @@ flu_smoke_test() ->
|
|||
Len1 = size(Chunk1),
|
||||
{error, not_written} = ?FLU_C:read_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
File1, Off1*983829323, Len1),
|
||||
File1, Off1*983829323, Len1, []),
|
||||
%% XXX FIXME
|
||||
%%
|
||||
%% This is failing because the read extends past the end of the file.
|
||||
|
@ -151,14 +151,14 @@ flu_smoke_test() ->
|
|||
File2, Off2, Chunk2),
|
||||
{error, bad_arg} = ?FLU_C:write_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||
BadFile, Off2, Chunk2),
|
||||
{ok, [{_, Off2, Chunk2, _}]} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH,
|
||||
File2, Off2, Len2),
|
||||
{ok, {[{_, Off2, Chunk2, _}], _}} =
|
||||
?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2, []),
|
||||
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
"no!!", Off2, Len2),
|
||||
"no!!", Off2, Len2, []),
|
||||
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort,
|
||||
?DUMMY_PV1_EPOCH,
|
||||
BadFile, Off2, Len2),
|
||||
BadFile, Off2, Len2, []),
|
||||
|
||||
%% We know that File1 still exists. Pretend that we've done a
|
||||
%% migration and exercise the delete_migration() API.
|
||||
|
@ -261,7 +261,7 @@ witness_test() ->
|
|||
Prefix, Chunk1),
|
||||
File = <<"foofile">>,
|
||||
{error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, EpochID1,
|
||||
File, 9999, 9999),
|
||||
File, 9999, 9999, []),
|
||||
{error, bad_arg} = ?FLU_C:checksum_list(Host, TcpPort, EpochID1,
|
||||
File),
|
||||
{error, bad_arg} = ?FLU_C:list_files(Host, TcpPort, EpochID1),
|
||||
|
|
|
@ -146,7 +146,7 @@ partial_stop_restart2() ->
|
|||
{_, #p_srvr{address=Addr_a, port=TcpPort_a}} = hd(Ps),
|
||||
{error, wedged} = machi_flu1_client:read_chunk(
|
||||
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH,
|
||||
<<>>, 99999999, 1),
|
||||
<<>>, 99999999, 1, []),
|
||||
{error, wedged} = machi_flu1_client:checksum_list(
|
||||
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH, <<>>),
|
||||
%% list_files() is permitted despite wedged status
|
||||
|
|
|
@ -81,8 +81,8 @@ smoke_test2() ->
|
|||
{iolist_to_binary(Chunk3), File3, Off3, Size3}],
|
||||
[begin
|
||||
File = binary_to_list(Fl),
|
||||
?assertMatch({ok, [{File, Off, Ch, _}]},
|
||||
?C:read_chunk(Clnt, Fl, Off, Sz))
|
||||
?assertMatch({ok, {[{File, Off, Ch, _}], []}},
|
||||
?C:read_chunk(Clnt, Fl, Off, Sz, []))
|
||||
end || {Ch, Fl, Off, Sz} <- Reads],
|
||||
|
||||
{ok, KludgeBin} = ?C:checksum_list(Clnt, File1),
|
||||
|
|
|
@ -60,14 +60,14 @@ api_smoke_test() ->
|
|||
{ok, {MyOff,MySize,MyFile}} =
|
||||
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
|
||||
infinity),
|
||||
{ok, [{_, MyOff, MyChunk, _}]} =
|
||||
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
|
||||
{ok, {[{_, MyOff, MyChunk, _}], []}} =
|
||||
?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize, []),
|
||||
MyChunk2 = <<"my chunk data, yeah, again">>,
|
||||
{ok, {MyOff2,MySize2,MyFile2}} =
|
||||
?MUT:append_chunk_extra(Prox1, FakeEpoch, Prefix,
|
||||
MyChunk2, 4242, infinity),
|
||||
{ok, [{_, MyOff2, MyChunk2, _}]} =
|
||||
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2),
|
||||
{ok, {[{_, MyOff2, MyChunk2, _}], []}} =
|
||||
?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2, []),
|
||||
MyChunk_badcs = {<<?CSUM_TAG_CLIENT_SHA:8, 0:(8*20)>>, MyChunk},
|
||||
{error, bad_checksum} = ?MUT:append_chunk(Prox1, FakeEpoch,
|
||||
Prefix, MyChunk_badcs),
|
||||
|
@ -245,13 +245,13 @@ flu_restart_test2() ->
|
|||
(stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch,
|
||||
<<"prefix">>, Data, 42, infinity)
|
||||
end,
|
||||
fun(run) -> {ok, [{_, Off1, Data, _}]} =
|
||||
fun(run) -> {ok, {[{_, Off1, Data, _}], []}} =
|
||||
?MUT:read_chunk(Prox1, FakeEpoch,
|
||||
File1, Off1, Size1),
|
||||
File1, Off1, Size1, []),
|
||||
ok;
|
||||
(line) -> io:format("line ~p, ", [?LINE]);
|
||||
(stop) -> ?MUT:read_chunk(Prox1, FakeEpoch,
|
||||
File1, Off1, Size1)
|
||||
File1, Off1, Size1, [])
|
||||
end,
|
||||
fun(run) -> {ok, KludgeBin} =
|
||||
?MUT:checksum_list(Prox1, FakeEpoch, File1),
|
||||
|
|
Loading…
Reference in a new issue