Allow reading multiple chunks at once

* When repairing multiple chunks at once and any of its repair
  failed, the whole read request and repair work will fail
* Rename read_repair3 and read_repair4 to do_repair_chunks and
  do_repair chunk in machi_file_proxy
* This pull request changes return semantics of read_chunk(), that
  returns any chunk included in requested range
* First and last chunk may be cut to fit the requested range
* In machi_file_proxy, unwritten_bytes are removed and replaced by
  machi_csum_table
This commit is contained in:
UENISHI Kota 2015-10-20 17:38:09 +09:00
parent 1193fb8510
commit ebb9bc3f5a
8 changed files with 265 additions and 282 deletions

View file

@ -112,6 +112,7 @@
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-export([trim_both_side/3]).
-endif. % TEST.
-export([start_link/1]).
@ -432,8 +433,8 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
%% We know what the chunk ought to be, so jump to the
%% middle of read-repair.
Resume = {append, Offset, iolist_size(Chunk), File},
read_repair3(FLUs, Resume, Chunk, [], File, Offset,
iolist_size(Chunk), Depth, STime, S);
do_repair_chunk(FLUs, Resume, Chunk, [], File, Offset,
iolist_size(Chunk), Depth, STime, S);
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
end.
@ -529,14 +530,13 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO,
ConsistencyMode = P#projection_v1.mode,
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, [{_, _, Chunk, _Csum}] = Chunks} when byte_size(Chunk) == Size ->
{ok, Chunks0} when is_list(Chunks0) ->
Chunks = trim_both_side(Chunks0, Offset, Size),
{reply, {ok, Chunks}, S};
{ok, Chunks} when is_list(Chunks) ->
{reply, {ok, Chunks}, S};
{ok, BadChunk} ->
%% TODO cleaner handling of bad chunks
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
got, byte_size(BadChunk)});
%% {ok, BadChunk} ->
%% %% TODO cleaner handling of bad chunks
%% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size,
%% got, byte_size(BadChunk)});
{error, bad_arg} = BadArg ->
{reply, BadArg, S};
{error, partial_read}=Err ->
@ -599,13 +599,14 @@ read_repair2(cp_mode=ConsistencyMode,
Tail = lists:last(readonly_flus(P)),
case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID,
File, Offset, Size, ?TIMEOUT) of
{ok, [{_, Offset, Chunk, _}]} when byte_size(Chunk) == Size ->
{ok, Chunks} when is_list(Chunks) ->
ToRepair = mutation_flus(P) -- [Tail],
read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset,
Size, Depth, STime, S);
{ok, BadChunk} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset,
Size, got, byte_size(BadChunk)});
{Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode,
[Tail], File, Depth, STime, S, {ok, Chunks}),
{reply, Reply, S1};
%% {ok, BadChunk} ->
%% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset,
%% Size, got, byte_size(BadChunk)});
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
{reply, BadCS, S};
@ -623,26 +624,11 @@ read_repair2(ap_mode=ConsistencyMode,
#state{proj=P}=S) ->
Eligible = mutation_flus(P),
case try_to_find_chunk(Eligible, File, Offset, Size, S) of
{ok, [{File0,Offset0,Chunk0,Csum}], GotItFrom} when byte_size(Chunk0) == Size ->
{ok, Chunks, GotItFrom} when is_list(Chunks) ->
ToRepair = mutation_flus(P) -- [GotItFrom],
%% TODO: stop matching single-size list
%% {RepairedChunks, S2} =
%% lists:foldl(fun({_, Offset0, Chunk0, Csum}, {Chunks0, S0}) ->
%% Size0 = byte_size(Chunk0),
%% {reply, {ok, Chunk1}, S1} =
%% read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File,
%% Offset0, Size0, Depth, STime, S0),
%% {[{File, Offset0, Chunk1, Csum}|Chunks0], S1}
%% end,
%% {[], S}, Chunks),
%% {reply, {ok, RepairedChunks}, S2};
{reply, {ok, RepairedChunk}, S2}
= read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File0,
Offset0, Size, Depth, STime, S),
{reply, {ok, [{File0, Offset0, RepairedChunk, Csum}]}, S2};
{ok, BadChunk, _GotItFrom} ->
exit({todo, bad_chunk_size, ?MODULE, ?LINE, File,
Offset, Size, got, byte_size(BadChunk)});
{Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom],
File, Depth, STime, S, {ok, Chunks}),
{reply, Reply, S1};
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
{reply, BadCS, S};
@ -656,19 +642,21 @@ read_repair2(ap_mode=ConsistencyMode,
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
read_repair3([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S) ->
read_repair4([], ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, S);
%% Never matches because Depth is always incremented beyond 0 prior to
%% getting here.
%%
%% read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
%% Size, 0=Depth, STime, S) ->
%% read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
%% Size, Depth + 1, STime, S);
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{proj=P}=S) ->
do_repair_chunks([], _, _, _, _, _, _, S, Reply) ->
{Reply, S};
do_repair_chunks([{_, Offset, Chunk, _Csum}|T],
ToRepair, ReturnMode, [GotItFrom], File, Depth, STime, S, Reply) ->
Size = iolist_size(Chunk),
case do_repair_chunk(ToRepair, ReturnMode, Chunk, [GotItFrom], File, Offset,
Size, Depth, STime, S) of
{ok, Chunk, S1} ->
do_repair_chunks(T, ToRepair, ReturnMode, [GotItFrom], File, Depth, STime, S1, Reply);
Error ->
Error
end.
do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{proj=P}=S) ->
%% io:format(user, "read_repair3 sleep1,", []),
sleep_a_while(Depth),
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
@ -679,43 +667,43 @@ read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
case S2#state.proj of
P2 when P2 == undefined orelse
P2#projection_v1.upi == [] ->
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2);
do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2);
P2 ->
ToRepair2 = mutation_flus(P2) -- Repaired,
read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2)
do_repair_chunk2(ToRepair2, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth + 1, STime, S2)
end
end.
read_repair4([], ReturnMode, Chunk, _Repaired, File, Offset,
_IgnoreSize, _Depth, _STime, S) ->
do_repair_chunk2([], ReturnMode, Chunk, _Repaired, File, Offset,
_IgnoreSize, _Depth, _STime, S) ->
%% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc?
case ReturnMode of
read ->
{reply, {ok, Chunk}, S};
{ok, Chunk, S};
{append, Offset, Size, File} ->
{reply, {ok, {Offset, Size, File}}, S}
{ok, {Offset, Size, File}, S}
end;
read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
Proxy = orddict:fetch(First, PD),
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
ok ->
read_repair4(Rest, ReturnMode, Chunk, [First|Repaired], File,
Offset, Size, Depth, STime, S);
do_repair_chunk2(Rest, ReturnMode, Chunk, [First|Repaired], File,
Offset, Size, Depth, STime, S);
{error, bad_checksum}=BadCS ->
%% TODO: alternate strategy?
{reply, BadCS, S};
{BadCS, S};
{error, Retry}
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S);
do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S);
{error, written} ->
%% TODO: To be very paranoid, read the chunk here to verify
%% that it is exactly our Chunk.
read_repair4(Rest, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S);
do_repair_chunk2(Rest, ReturnMode, Chunk, Repaired, File,
Offset, Size, Depth, STime, S);
{error, not_written} ->
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
end.
@ -872,7 +860,7 @@ try_to_find_chunk(Eligible, File, Offset, Size,
Proxy = orddict:fetch(FLU, PD),
case ?FLU_PC:read_chunk(Proxy, EpochID,
File, Offset, Size) of
{ok, [{_, Offset, Chunk, _}] = Chunks} when byte_size(Chunk) == Size ->
{ok, Chunks} when is_list(Chunks) ->
{FLU, {ok, Chunks}};
Else ->
{FLU, Else}
@ -935,3 +923,22 @@ timeout(infinity) ->
timeout(15*60*1000); % close enough to infinity
timeout(Timeout0) ->
{Timeout0, Timeout0 + 30*1000}.
trim_both_side([], _Offset, _Size) -> [];
trim_both_side([{F, Offset0, Chunk, _Csum}|L], Offset, Size)
when Offset0 < Offset ->
TrashLen = 8 * (Offset - Offset0),
<<_:TrashLen/binary, NewChunk/binary>> = Chunk,
NewH = {F, Offset, NewChunk, <<>>},
trim_both_side([NewH|L], Offset, Size);
trim_both_side(Chunks, Offset, Size) ->
%% TODO: optimize
[{F, Offset1, Chunk1, _Csum1}|L] = lists:reverse(Chunks),
Size1 = iolist_size(Chunk1),
if Offset + Size < Offset1 + Size1 ->
Size2 = Offset + Size - Offset1,
<<NewChunk1:Size2/binary, _/binary>> = Chunk1,
lists:reverse([{F, Offset1, NewChunk1, <<>>}|L]);
true ->
Chunks
end.

View file

@ -2,6 +2,7 @@
-export([open/2,
find/3, write/4, trim/3,
all_trimmed/2,
sync/1,
calc_unwritten_bytes/1,
close/1, delete/1]).
@ -11,7 +12,7 @@
-include("machi.hrl").
-ifdef(TEST).
-export([split_checksum_list_blob_decode/1]).
-export([split_checksum_list_blob_decode/1, all/1]).
-endif.
-record(machi_csum_table,
@ -29,6 +30,9 @@
{ok, table()} | {error, file:posix()}.
open(CSumFilename, _Opts) ->
T = ets:new(?MODULE, [private, ordered_set]),
CSum = machi_util:make_tagged_csum(none),
%% Dummy entry for headers
true = ets:insert_new(T, {0, ?MINIMUM_OFFSET, CSum}),
C0 = #machi_csum_table{
file=CSumFilename,
table=T},
@ -57,15 +61,18 @@ open(CSumFilename, _Opts) ->
{ok, C0#machi_csum_table{fd=Fd}}.
-spec find(table(), machi_dt:chunk_pos(), machi_dt:chunk_size()) ->
{ok, machi_dt:chunk_csum()} | {error, trimmed|notfound}.
list({machi_dt:chunk_pos(),
machi_dt:chunk_size(),
machi_dt:csum()}).
find(#machi_csum_table{table=T}, Offset, Size) ->
%% TODO: Check whether all bytes here are written or not
case ets:lookup(T, Offset) of
[{Offset, Size, trimmed}] -> {error, trimmed};
[{Offset, Size, Checksum}] -> {ok, Checksum};
[{Offset, _, _}] -> {error, unknown_chunk};
[] -> {error, unknown_chunk}
end.
ets:select(T, [{{'$1', '$2', '$3'},
[inclusion_match_spec(Offset, Size)],
['$_']}]).
-ifdef(TEST).
all(#machi_csum_table{table=T}) ->
ets:tab2list(T).
-endif.
-spec write(table(), machi_dt:chunk_pos(), machi_dt:chunk_size(),
machi_dt:chunk_csum()) ->
@ -96,6 +103,10 @@ trim(#machi_csum_table{fd=Fd, table=T}, Offset, Size) ->
Error
end.
-spec all_trimmed(table(), machi_dt:chunk_pos()) -> boolean().
all_trimmed(#machi_csum_table{table=T}, Pos) ->
runthru(ets:tab2list(T), 0, Pos).
-spec sync(table()) -> ok | {error, file:posix()}.
sync(#machi_csum_table{fd=Fd}) ->
file:sync(Fd).
@ -218,3 +229,22 @@ build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffse
build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]);
build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
build_unwritten_bytes_list(Rest, CO + CS, Acc).
%% @doc make sure all trimmed chunks are continously chained
%% TODO: test with EQC
runthru([], Pos, Pos) -> true;
runthru([], Pos0, Pos) when Pos0 < Pos -> false;
runthru([{Offset, Size, trimmed}|T], Offset, Pos) ->
runthru(T, Offset+Size, Pos);
runthru(_, _, _) ->
false.
%% @doc If you want to find an overlap among two areas [x, y] and [a,
%% b] where x < y and a < b; if (a-y)*(b-x) < 0 then there's a
%% overlap, else, > 0 then there're no overlap. border condition = 0
%% is not overlap in this offset-size case.
inclusion_match_spec(Offset, Size) ->
{'>', 0,
{'*',
{'-', Offset + Size, '$1'},
{'-', Offset, {'+', '$1', '$2'}}}}.

View file

@ -76,9 +76,6 @@
-type op_stats() :: { Total :: non_neg_integer(),
Errors :: non_neg_integer() }.
-type byte_sequence() :: { Offset :: non_neg_integer(),
Size :: pos_integer()|infinity }.
-record(state, {
data_dir :: string() | undefined,
filename :: string() | undefined,
@ -87,7 +84,6 @@
csum_file :: string()|undefined,
csum_path :: string()|undefined,
eof_position = 0 :: non_neg_integer(),
unwritten_bytes = [] :: [byte_sequence()],
data_filehandle :: file:filehandle(),
csum_table :: machi_csum_table:table(),
tref :: reference(), %% timer ref
@ -129,7 +125,10 @@ sync(_Pid, Type) ->
lager:warning("Bad arg to sync: Type ~p", [Type]),
{error, bad_arg}.
% @doc Read file at offset for length
% @doc Read file at offset for length. This returns a sequence of all
% chunks that overlaps with requested offset and length. Note that
% borders are not alinged, not to mess up repair at cr_client with
% checksums. They should be cut at cr_client.
-spec read(Pid :: pid(),
Offset :: non_neg_integer(),
Length :: non_neg_integer()) ->
@ -211,7 +210,6 @@ init({Filename, DataDir}) ->
data_filehandle = FHd,
csum_table = CsumTable,
tref = Tref,
unwritten_bytes = UnwrittenBytes,
eof_position = Eof},
lager:debug("Starting file proxy ~p for filename ~p, state = ~p",
[self(), Filename, St]),
@ -261,7 +259,8 @@ handle_call({read, _Offset, _Length}, _From,
handle_call({read, Offset, Length}, _From,
State = #state{eof_position = Eof,
reads = {T, Err}
}) when Offset + Length > Eof ->
}) when Offset > Eof ->
%% make sure [Offset, Offset+Length) has an overlap with file range
lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p",
[Offset, Length, Eof]),
{reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
@ -270,19 +269,12 @@ handle_call({read, Offset, Length}, _From,
State = #state{filename = F,
data_filehandle = FH,
csum_table = CsumTable,
unwritten_bytes = U,
reads = {T, Err}
}) ->
Checksum = case machi_csum_table:find(CsumTable, Offset, Length) of
{ok, Checksum0} ->
Checksum0;
_ ->
undefined
end,
{Resp, NewErr} =
case handle_read(FH, F, Checksum, Offset, Length, U) of
case do_read(FH, F, CsumTable, Offset, Length) of
{ok, []} ->
{{error, not_written}, Err + 1};
{ok, Chunks} ->
%% Kludge to wrap read result in tuples, to support fragmented read
%% XXX FIXME
@ -305,35 +297,31 @@ handle_call({write, _Offset, _ClientMeta, _Data}, _From,
{reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}};
handle_call({write, Offset, ClientMeta, Data}, _From,
State = #state{unwritten_bytes = U,
filename = F,
State = #state{filename = F,
writes = {T, Err},
data_filehandle = FHd,
csum_table = CsumTable
}) ->
csum_table = CsumTable}) ->
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
{Resp, NewErr, NewU} =
{Resp, NewErr} =
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
{error, {bad_csum, Bad}} ->
lager:error("Bad checksum on write; client sent ~p, we computed ~p",
[ClientCsum, Bad]),
{{error, bad_checksum}, Err + 1, U};
{{error, bad_checksum}, Err + 1};
TaggedCsum ->
case handle_write(FHd, CsumTable, F, TaggedCsum, Offset, Data, U) of
{ok, NewU1} ->
{ok, Err, NewU1};
case handle_write(FHd, CsumTable, F, TaggedCsum, Offset, Data) of
ok ->
{ok, Err};
Error ->
{Error, Err + 1, U}
{Error, Err + 1}
end
end,
{NewEof, infinity} = lists:last(NewU),
{NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)),
{reply, Resp, State#state{writes = {T+1, NewErr},
eof_position = NewEof,
unwritten_bytes = NewU
}};
eof_position = NewEof}};
%% APPENDS
@ -345,7 +333,6 @@ handle_call({append, _ClientMeta, _Extra, _Data}, _From,
handle_call({append, ClientMeta, Extra, Data}, _From,
State = #state{eof_position = EofP,
unwritten_bytes = U,
filename = F,
appends = {T, Err},
data_filehandle = FHd,
@ -355,24 +342,25 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE),
ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>),
{Resp, NewErr, NewU} =
{Resp, NewErr} =
case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of
{error, {bad_csum, Bad}} ->
lager:error("Bad checksum; client sent ~p, we computed ~p",
[ClientCsum, Bad]),
{{error, bad_checksum}, Err + 1, U};
{{error, bad_checksum}, Err + 1};
TaggedCsum ->
case handle_write(FHd, CsumTable, F, TaggedCsum, EofP, Data, U) of
{ok, NewU1} ->
{{ok, F, EofP}, Err, NewU1};
case handle_write(FHd, CsumTable, F, TaggedCsum, EofP, Data) of
ok ->
{{ok, F, EofP}, Err};
Error ->
{Error, Err + 1, EofP, U}
{Error, Err + 1, EofP}
end
end,
{NewEof, infinity} = lists:last(NewU),
%% TODO: do we check this with calling
%% machi_csum_table:calc_unwritten_bytes/1?
NewEof = EofP + byte_size(Data) + Extra,
{reply, Resp, State#state{appends = {T+1, NewErr},
eof_position = NewEof + Extra,
unwritten_bytes = NewU
eof_position = NewEof
}};
handle_call(Req, _From, State) ->
@ -510,17 +498,15 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
lager:warning("Unknown checksum tag ~p", [OtherTag]),
{error, bad_checksum}.
-spec handle_read(FHd :: file:filehandle(),
Filename :: string(),
TaggedCsum :: undefined|binary(),
Offset :: non_neg_integer(),
Size :: non_neg_integer(),
Unwritten :: [byte_sequence()]
) -> {ok, Bytes :: binary(), Csum :: binary()} |
eof |
-spec do_read(FHd :: file:filehandle(),
Filename :: string(),
CsumTable :: machi_csum_table:table(),
Offset :: non_neg_integer(),
Size :: non_neg_integer()
) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} |
{error, bad_checksum} |
{error, partial_read} |
{error, not_written} |
{error, file:posix()} |
{error, Other :: term() }.
% @private Attempt a read operation on the given offset and length.
% <li>
@ -535,22 +521,19 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
% tuple is returned.</ul>
% </li>
%
% On success, `{ok, Bytes, Checksum}' is returned.
handle_read(FHd, Filename, undefined, Offset, Size, U) ->
handle_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size, U);
do_read(FHd, Filename, CsumTable, Offset, Size) ->
%% Note that find/3 only returns overlapping chunks, both borders
%% are not aligned to original Offset and Size.
ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size),
read_all_ranges(FHd, Filename, ChunkCsums, []).
handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
case is_byte_range_unwritten(Offset, Size, U) of
true ->
{error, not_written};
false ->
do_read(FHd, Filename, TaggedCsum, Offset, Size)
end.
read_all_ranges(_, _, [], ReadChunks) ->
{ok, lists:reverse(ReadChunks)};
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
case file:pread(FHd, Offset, Size) of
eof ->
eof;
read_all_ranges(FHd, Filename, T, ReadChunks);
{ok, Bytes} when byte_size(Bytes) == Size ->
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
case check_or_make_tagged_csum(Tag, Ck, Bytes) of
@ -559,11 +542,13 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
[Bad, Ck]),
{error, bad_checksum};
TaggedCsum ->
{ok, [{Filename, Offset, Bytes, TaggedCsum}]};
read_all_ranges(FHd, Filename, T,
[{Filename, Offset, Bytes, TaggedCsum}|ReadChunks]);
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
%% XXX FIXME: Should we return something other than
%% {ok, ....} in this case?
{ok, [{Filename, Offset, Bytes, OtherCsum}]}
read_all_ranges(FHd, Filename, T,
[{Filename, Offset, Bytes, OtherCsum}|ReadChunks])
end;
{ok, Partial} ->
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
@ -580,9 +565,8 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
Filename :: string(),
TaggedCsum :: binary(),
Offset :: non_neg_integer(),
Data :: binary(),
Unwritten :: [byte_sequence()]
) -> {ok, NewU :: [byte_sequence()]} |
Data :: binary()
) -> ok |
{error, written} |
{error, Reason :: term()}.
% @private Implements the write and append operation. The first task is to
@ -592,50 +576,44 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
% checksum and return a "fake" ok response as if the write had been performed
% when it hasn't really.
%
% If a write proceeds, the offset, size and checksum are written to a metadata
% file, and the internal list of unwritten bytes is modified to reflect the
% just-performed write. This is then returned to the caller as
% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
% If a write proceeds, the offset, size and checksum are written to a
% metadata file, and the internal list of unwritten bytes is modified
% to reflect the just-performed write. This is then returned to the
% caller as `ok'
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
Size = iolist_size(Data),
case is_byte_range_unwritten(Offset, Size, U) of
false ->
case machi_csum_table:find(CsumTable, Offset, Size) of
{error, trimmed} = Error ->
Error;
{error, unknown_chunk} ->
%% The specified has some bytes written, while
%% it's not in the checksum table. Trust U and
%% return as it is used.
{error, written};
{ok, TaggedCsum} ->
case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
eof ->
lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
[Offset, Filename]),
{error, server_insanity};
{ok, _} ->
{ok, U};
_ ->
{error, written}
end;
{ok, OtherCsum} ->
%% Got a checksum, but it doesn't match the data block's
lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
[Offset, Filename, OtherCsum, TaggedCsum]),
{error, written}
end;
true ->
case machi_csum_table:find(CsumTable, Offset, Size) of
[] ->
try
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data, U)
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data)
catch
%%% XXX FIXME: be more specific on badmatch that might
%%% occur around line 593 when we write the checksum
%%% file entry for the data blob we just put on the disk
%% XXX FIXME: be more specific on badmatch that might
%% occur around line 593 when we write the checksum
%% file entry for the data blob we just put on the disk
error:Reason ->
{error, Reason}
end
end;
[{Offset, Size, TaggedCsum}] ->
case do_read(FHd, Filename, CsumTable, Offset, Size) of
{error, _} = E ->
lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written",
[E, Offset, Filename]),
{error, server_insanity};
{ok, [{_, Offset, Data, TaggedCsum}]} ->
%% TODO: what if different checksum got from do_read()?
ok;
{ok, _Other} ->
%% TODO: leave some debug/warning message here?
{error, written}
end;
[{Offset, Size, OtherCsum}] ->
%% Got a checksum, but it doesn't match the data block's
lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
[Offset, Filename, OtherCsum, TaggedCsum]),
{error, written};
_Chunks ->
%% No byte is trimmed, but at least one byte is written
{error, written}
end.
% @private Implements the disk writes for both the write and append
@ -646,99 +624,20 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
TaggedCsum :: binary(),
Offset :: non_neg_integer(),
Size :: non_neg_integer(),
Data :: binary(),
Unwritten :: [byte_sequence()]
) -> {ok, NewUnwritten :: [byte_sequence()]} |
{error, Reason :: term()}.
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data, U) ->
Data :: binary()
) -> ok | {error, Reason :: term()}.
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
case file:pwrite(FHd, Offset, Data) of
ok ->
lager:debug("Successful write in file ~p at offset ~p, length ~p",
[Filename, Offset, Size]),
ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
NewU = update_unwritten(Offset, Size, U),
lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
[Filename, NewU]),
{ok, NewU};
lager:debug("Successful write to checksum file for ~p",
[Filename]),
%% io:format(user, "here, heh ~p~n", [?LINE]),
ok;
Other ->
lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
[Other, Filename, Offset, Size]),
{error, Other}
end.
-spec is_byte_range_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
Unwritten :: [byte_sequence()] ) -> boolean().
% @private Given an offset and a size, return `true' if a byte range has
% <b>not</b> been written. Otherwise, return `false'.
is_byte_range_unwritten(Offset, Size, Unwritten) ->
case Unwritten of
[] ->
lager:critical("Unwritten byte list has 0 entries! This should never happen."),
false;
[{Eof, infinity}] ->
Offset >= Eof;
_ ->
case lookup_unwritten(Offset, Size, Unwritten) of
{ok, _} -> true;
not_found -> false
end
end.
-spec lookup_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
Unwritten :: [byte_sequence()]
) -> {ok, byte_sequence()} | not_found.
% @private Given an offset and a size, scan the list of unwritten bytes and
% look for a "hole" where a write might be allowed if any exist. If a
% suitable byte sequence is found, the function returns a tuple of {ok,
% {Position, Space}} is returned. `not_found' is returned if no suitable
% space is located.
lookup_unwritten(_Offset, _Size, []) ->
not_found;
lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos ->
{ok, H};
lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest])
when Offset >= Pos andalso Offset < Pos+Space
andalso Size =< (Space - (Offset - Pos)) ->
{ok, H};
lookup_unwritten(Offset, Size, [_H|Rest]) ->
%% These are not the droids you're looking for.
lookup_unwritten(Offset, Size, Rest).
%%% if the pos is greater than offset + size then we're done. End early.
-spec update_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()].
% @private Given an offset, a size and the unwritten byte list, return an updated
% and sorted unwritten byte list accounting for any completed write operation.
update_unwritten(Offset, Size, Unwritten) ->
case lookup_unwritten(Offset, Size, Unwritten) of
not_found ->
lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"),
Unwritten;
{ok, {Offset, Size}} ->
%% we neatly filled in our hole...
lists:keydelete(Offset, 1, Unwritten);
{ok, S={Pos, _}} ->
lists:sort(lists:keydelete(Pos, 1, Unwritten) ++
update_byte_range(Offset, Size, S))
end.
-spec update_byte_range( Offset :: non_neg_integer(),
Size :: pos_integer(),
Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()].
% @private Given an offset and size and a byte sequence tuple where a
% write took place, return a list of updates to the list of unwritten bytes
% accounting for the space occupied by the just completed write.
update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof ->
[{Offset + Size, infinity}];
update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof ->
[{Eof, (Offset - Eof)}, {Offset+Size, infinity}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space ->
[{Offset + Size, Space - Size}];
update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos ->
[{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}].

View file

@ -659,7 +659,6 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) ->
{ok, Chunks} ->
MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) ->
{Tag, Ck} = machi_util:unmake_tagged_csum(Csum),
io:format(user, "~p oom~n", [Csum]),
#mpb_chunk{
offset=Offset,
file_name=File,

View file

@ -149,8 +149,10 @@ smoke_test2() ->
%% Misc API smoke & minor regression checks
{error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>,
999999999, 1),
{error, not_written} = machi_cr_client:read_chunk(C1, File1,
Off1, 88888888),
{ok, [{_,Off1,Chunk1,_},
{_,FooOff1,Chunk1,_},
{_,FooOff2,Chunk2,_}]} = machi_cr_client:read_chunk(C1, File1,
Off1, 88888888),
%% Checksum list return value is a primitive binary().
{ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1),
true = is_binary(KludgeBin),

View file

@ -1,16 +1,21 @@
-module(machi_csum_table_test).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-define(HDR, {0, 1024, <<0>>}).
smoke_test() ->
Filename = "./temp-checksum-dumb-file",
_ = file:delete(Filename),
{ok, MC} = machi_csum_table:open(Filename, []),
{Offset, Size, Checksum} = {64, 34, <<"deadbeef">>},
{error, unknown_chunk} = machi_csum_table:find(MC, Offset, Size),
[{1024, infinity}] = machi_csum_table:calc_unwritten_bytes(MC),
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
[] = machi_csum_table:find(MC, Offset, Size),
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
{ok, Checksum} = machi_csum_table:find(MC, Offset, Size),
[{1024, 40}, {1098, infinity}] = machi_csum_table:calc_unwritten_bytes(MC),
[Entry] = machi_csum_table:find(MC, Offset, Size),
ok = machi_csum_table:trim(MC, Offset, Size),
{error, trimmed} = machi_csum_table:find(MC, Offset, Size),
[{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size),
ok = machi_csum_table:close(MC),
ok = machi_csum_table:delete(MC).
@ -18,14 +23,35 @@ close_test() ->
Filename = "./temp-checksum-dumb-file-2",
_ = file:delete(Filename),
{ok, MC} = machi_csum_table:open(Filename, []),
{Offset, Size, Checksum} = {64, 34, <<"deadbeef">>},
{error, unknown_chunk} = machi_csum_table:find(MC, Offset, Size),
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
[] = machi_csum_table:find(MC, Offset, Size),
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
{ok, Checksum} = machi_csum_table:find(MC, Offset, Size),
[Entry] = machi_csum_table:find(MC, Offset, Size),
ok = machi_csum_table:close(MC),
{ok, MC2} = machi_csum_table:open(Filename, []),
{ok, Checksum} = machi_csum_table:find(MC2, Offset, Size),
[Entry] = machi_csum_table:find(MC2, Offset, Size),
ok = machi_csum_table:trim(MC2, Offset, Size),
{error, trimmed} = machi_csum_table:find(MC2, Offset, Size),
[{Offset, Size, trimmed}] = machi_csum_table:find(MC2, Offset, Size),
ok = machi_csum_table:delete(MC2).
smoke2_test() ->
Filename = "./temp-checksum-dumb-file-3",
_ = file:delete(Filename),
{ok, MC} = machi_csum_table:open(Filename, []),
Entry = {Offset, Size, Checksum} = {1025, 10, <<"deadbeef">>},
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
[] = machi_csum_table:find(MC, 0, 0),
[?HDR] = machi_csum_table:find(MC, 0, 1),
[Entry] = machi_csum_table:find(MC, Offset, Size),
[?HDR] = machi_csum_table:find(MC, 1, 1024),
[?HDR, Entry] = machi_csum_table:find(MC, 1023, 1024),
[Entry] = machi_csum_table:find(MC, 1024, 1024),
[Entry] = machi_csum_table:find(MC, 1025, 1024),
ok = machi_csum_table:trim(MC, Offset, Size),
[{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size),
ok = machi_csum_table:close(MC),
ok = machi_csum_table:delete(MC).
%% TODO: add quickcheck test here

View file

@ -202,7 +202,8 @@ read_next(S, _Res, _Args) -> S.
read(Pid, Offset, Length) ->
case machi_file_proxy:read(Pid, Offset, Length) of
{ok, [{_, Offset, Data, Csum}]} ->
{ok, Chunks} ->
[{_, Offset, Data, Csum}] = machi_cr_client:trim_both_side(Chunks, Offset, Length),
{ok, Data, Csum};
E ->
E

View file

@ -89,8 +89,8 @@ machi_file_proxy_test_() ->
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
@ -98,6 +98,25 @@ machi_file_proxy_test_() ->
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
].
multiple_chunks_read_test_() ->
clean_up_data_dir(?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
[
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual(ok, machi_file_proxy:write(Pid, 10000, <<"fail">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 20000, <<"fail">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)),
%% Freeza
?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)),
?_assertMatch({ok, [{"test", 1024, _, _},
{"test", 10000, <<"fail">>, _},
{"test", 20000, <<"fail">>, _},
{"test", 30000, <<"fail">>, _},
{"test", 530000, <<"fail">>, _}]},
machi_file_proxy:read(Pid, 1024, 530000)),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
].
-endif. % !PULSE
-endif. % TEST.