From ebb9bc3f5aac12243b9a3f619ae5f54047181140 Mon Sep 17 00:00:00 2001
From: UENISHI Kota
Date: Tue, 20 Oct 2015 17:38:09 +0900
Subject: [PATCH] Allow reading multiple chunks at once

* When repairing multiple chunks at once, if the repair of any single
  chunk fails, the whole read request and repair work fails
* Rename read_repair3 and read_repair4 to do_repair_chunk and
  do_repair_chunk2 in machi_cr_client, and add do_repair_chunks to
  drive repair of a list of chunks
* This pull request changes the return semantics of read_chunk(),
  which now returns every chunk that overlaps the requested range
* The first and last chunks may be cut to fit the requested range
* In machi_file_proxy, the unwritten_bytes list is removed and
  replaced by machi_csum_table
---
 src/machi_cr_client.erl        | 137 ++++++++--------
 src/machi_csum_table.erl       |  48 ++++--
 src/machi_file_proxy.erl       | 283 +++++++++++----------------------
 src/machi_pb_translate.erl     |   1 -
 test/machi_cr_client_test.erl  |   6 +-
 test/machi_csum_table_test.erl |  44 ++++--
 test/machi_file_proxy_eqc.erl  |   3 +-
 test/machi_file_proxy_test.erl |  25 ++-
 8 files changed, 265 insertions(+), 282 deletions(-)

diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl
index 91f3a23..ef9894d 100644
--- a/src/machi_cr_client.erl
+++ b/src/machi_cr_client.erl
@@ -112,6 +112,7 @@
 
 -ifdef(TEST).
 -include_lib("eunit/include/eunit.hrl").
+-export([trim_both_side/3]).
 -endif. % TEST.
 
 -export([start_link/1]).
@@ -432,8 +433,8 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
                     %% We know what the chunk ought to be, so jump to the
                     %% middle of read-repair.
                     Resume = {append, Offset, iolist_size(Chunk), File},
-                    read_repair3(FLUs, Resume, Chunk, [], File, Offset,
-                                 iolist_size(Chunk), Depth, STime, S);
+                    do_repair_chunk(FLUs, Resume, Chunk, [], File, Offset,
+                                    iolist_size(Chunk), Depth, STime, S);
                 {error, not_written} ->
                     exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
             end.
@@ -529,14 +530,13 @@ do_read_chunk2(File, Offset, Size, Depth, STime, TO, ConsistencyMode = P#projection_v1.mode, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of - {ok, [{_, _, Chunk, _Csum}] = Chunks} when byte_size(Chunk) == Size -> + {ok, Chunks0} when is_list(Chunks0) -> + Chunks = trim_both_side(Chunks0, Offset, Size), {reply, {ok, Chunks}, S}; - {ok, Chunks} when is_list(Chunks) -> - {reply, {ok, Chunks}, S}; - {ok, BadChunk} -> - %% TODO cleaner handling of bad chunks - exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, - got, byte_size(BadChunk)}); + %% {ok, BadChunk} -> + %% %% TODO cleaner handling of bad chunks + %% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, + %% got, byte_size(BadChunk)}); {error, bad_arg} = BadArg -> {reply, BadArg, S}; {error, partial_read}=Err -> @@ -599,13 +599,14 @@ read_repair2(cp_mode=ConsistencyMode, Tail = lists:last(readonly_flus(P)), case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of - {ok, [{_, Offset, Chunk, _}]} when byte_size(Chunk) == Size -> + {ok, Chunks} when is_list(Chunks) -> ToRepair = mutation_flus(P) -- [Tail], - read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset, - Size, Depth, STime, S); - {ok, BadChunk} -> - exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, - Size, got, byte_size(BadChunk)}); + {Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, + [Tail], File, Depth, STime, S, {ok, Chunks}), + {reply, Reply, S1}; + %% {ok, BadChunk} -> + %% exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, + %% Size, got, byte_size(BadChunk)}); {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? {reply, BadCS, S}; @@ -623,26 +624,11 @@ read_repair2(ap_mode=ConsistencyMode, #state{proj=P}=S) -> Eligible = mutation_flus(P), case try_to_find_chunk(Eligible, File, Offset, Size, S) of - {ok, [{File0,Offset0,Chunk0,Csum}], GotItFrom} when byte_size(Chunk0) == Size -> + {ok, Chunks, GotItFrom} when is_list(Chunks) -> ToRepair = mutation_flus(P) -- [GotItFrom], - %% TODO: stop matching single-size list - %% {RepairedChunks, S2} = - %% lists:foldl(fun({_, Offset0, Chunk0, Csum}, {Chunks0, S0}) -> - %% Size0 = byte_size(Chunk0), - %% {reply, {ok, Chunk1}, S1} = - %% read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File, - %% Offset0, Size0, Depth, STime, S0), - %% {[{File, Offset0, Chunk1, Csum}|Chunks0], S1} - %% end, - %% {[], S}, Chunks), - %% {reply, {ok, RepairedChunks}, S2}; - {reply, {ok, RepairedChunk}, S2} - = read_repair3(ToRepair, ReturnMode, Chunk0, [GotItFrom], File0, - Offset0, Size, Depth, STime, S), - {reply, {ok, [{File0, Offset0, RepairedChunk, Csum}]}, S2}; - {ok, BadChunk, _GotItFrom} -> - exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, - Offset, Size, got, byte_size(BadChunk)}); + {Reply, S1} = do_repair_chunks(Chunks, ToRepair, ReturnMode, [GotItFrom], + File, Depth, STime, S, {ok, Chunks}), + {reply, Reply, S1}; {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? {reply, BadCS, S}; @@ -656,19 +642,21 @@ read_repair2(ap_mode=ConsistencyMode, exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. -read_repair3([], ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, S) -> - read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, S); -%% Never matches because Depth is always incremented beyond 0 prior to -%% getting here. 
-%% -%% read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, -%% Size, 0=Depth, STime, S) -> -%% read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, -%% Size, Depth + 1, STime, S); -read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, #state{proj=P}=S) -> +do_repair_chunks([], _, _, _, _, _, _, S, Reply) -> + {Reply, S}; +do_repair_chunks([{_, Offset, Chunk, _Csum}|T], + ToRepair, ReturnMode, [GotItFrom], File, Depth, STime, S, Reply) -> + Size = iolist_size(Chunk), + case do_repair_chunk(ToRepair, ReturnMode, Chunk, [GotItFrom], File, Offset, + Size, Depth, STime, S) of + {ok, Chunk, S1} -> + do_repair_chunks(T, ToRepair, ReturnMode, [GotItFrom], File, Depth, STime, S1, Reply); + Error -> + Error + end. + +do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "read_repair3 sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, @@ -679,43 +667,43 @@ read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> - read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, - Offset, Size, Depth + 1, STime, S2); + do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2); P2 -> ToRepair2 = mutation_flus(P2) -- Repaired, - read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File, - Offset, Size, Depth + 1, STime, S2) + do_repair_chunk2(ToRepair2, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2) end end. -read_repair4([], ReturnMode, Chunk, _Repaired, File, Offset, - _IgnoreSize, _Depth, _STime, S) -> +do_repair_chunk2([], ReturnMode, Chunk, _Repaired, File, Offset, + _IgnoreSize, _Depth, _STime, S) -> %% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc? case ReturnMode of read -> - {reply, {ok, Chunk}, S}; + {ok, Chunk, S}; {append, Offset, Size, File} -> - {reply, {ok, {Offset, Size, File}}, S} + {ok, {Offset, Size, File}, S} end; -read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> +do_repair_chunk2([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(First, PD), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of ok -> - read_repair4(Rest, ReturnMode, Chunk, [First|Repaired], File, - Offset, Size, Depth, STime, S); + do_repair_chunk2(Rest, ReturnMode, Chunk, [First|Repaired], File, + Offset, Size, Depth, STime, S); {error, bad_checksum}=BadCS -> %% TODO: alternate strategy? - {reply, BadCS, S}; + {BadCS, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, - Offset, Size, Depth, STime, S); + do_repair_chunk(ToRepair, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth, STime, S); {error, written} -> %% TODO: To be very paranoid, read the chunk here to verify %% that it is exactly our Chunk. - read_repair4(Rest, ReturnMode, Chunk, Repaired, File, - Offset, Size, Depth, STime, S); + do_repair_chunk2(Rest, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth, STime, S); {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. 
@@ -872,7 +860,7 @@ try_to_find_chunk(Eligible, File, Offset, Size,
             Proxy = orddict:fetch(FLU, PD),
             case ?FLU_PC:read_chunk(Proxy, EpochID,
                                     File, Offset, Size) of
-                {ok, [{_, Offset, Chunk, _}] = Chunks} when byte_size(Chunk) == Size ->
+                {ok, Chunks} when is_list(Chunks) ->
                     {FLU, {ok, Chunks}};
                 Else ->
                     {FLU, Else}
@@ -935,3 +923,22 @@ timeout(infinity) ->
     timeout(15*60*1000);                        % close enough to infinity
 timeout(Timeout0) ->
     {Timeout0, Timeout0 + 30*1000}.
+
+trim_both_side([], _Offset, _Size) -> [];
+trim_both_side([{F, Offset0, Chunk, _Csum}|L], Offset, Size)
+  when Offset0 < Offset ->
+    TrashLen = 8 * (Offset - Offset0),
+    <<_:TrashLen, NewChunk/binary>> = Chunk,
+    NewH = {F, Offset, NewChunk, <<>>},
+    trim_both_side([NewH|L], Offset, Size);
+trim_both_side(Chunks, Offset, Size) ->
+    %% TODO: optimize
+    [{F, Offset1, Chunk1, _Csum1}|L] = lists:reverse(Chunks),
+    Size1 = iolist_size(Chunk1),
+    if Offset + Size < Offset1 + Size1 ->
+            Size2 = Offset + Size - Offset1,
+            <<NewChunk1:Size2/binary, _/binary>> = Chunk1,
+            lists:reverse([{F, Offset1, NewChunk1, <<>>}|L]);
+       true ->
+            Chunks
+    end.
diff --git a/src/machi_csum_table.erl b/src/machi_csum_table.erl
index 9720ddc..3ce391d 100644
--- a/src/machi_csum_table.erl
+++ b/src/machi_csum_table.erl
@@ -2,6 +2,7 @@
 
 -export([open/2,
          find/3, write/4, trim/3,
+         all_trimmed/2,
          sync/1,
          calc_unwritten_bytes/1,
          close/1, delete/1]).
@@ -11,7 +12,7 @@
 -include("machi.hrl").
 
 -ifdef(TEST).
--export([split_checksum_list_blob_decode/1]).
+-export([split_checksum_list_blob_decode/1, all/1]).
 -endif.
 
 -record(machi_csum_table,
@@ -29,6 +30,9 @@
           {ok, table()} | {error, file:posix()}.
 open(CSumFilename, _Opts) ->
     T = ets:new(?MODULE, [private, ordered_set]),
+    CSum = machi_util:make_tagged_csum(none),
+    %% Dummy entry for headers
+    true = ets:insert_new(T, {0, ?MINIMUM_OFFSET, CSum}),
     C0 = #machi_csum_table{
             file=CSumFilename,
             table=T},
@@ -57,15 +61,18 @@ open(CSumFilename, _Opts) ->
     {ok, C0#machi_csum_table{fd=Fd}}.
 
 -spec find(table(), machi_dt:chunk_pos(), machi_dt:chunk_size()) ->
-                  {ok, machi_dt:chunk_csum()} | {error, trimmed|notfound}.
+                  list({machi_dt:chunk_pos(),
+                        machi_dt:chunk_size(),
+                        machi_dt:csum()}).
 find(#machi_csum_table{table=T}, Offset, Size) ->
-    %% TODO: Check whether all bytes here are written or not
-    case ets:lookup(T, Offset) of
-        [{Offset, Size, trimmed}] -> {error, trimmed};
-        [{Offset, Size, Checksum}] -> {ok, Checksum};
-        [{Offset, _, _}] -> {error, unknown_chunk};
-        [] -> {error, unknown_chunk}
-    end.
+    ets:select(T, [{{'$1', '$2', '$3'},
+                    [inclusion_match_spec(Offset, Size)],
+                    ['$_']}]).
+
+-ifdef(TEST).
+all(#machi_csum_table{table=T}) ->
+    ets:tab2list(T).
+-endif.
 
 -spec write(table(), machi_dt:chunk_pos(), machi_dt:chunk_size(),
             machi_dt:chunk_csum()) ->
@@ -96,6 +103,10 @@ trim(#machi_csum_table{fd=Fd, table=T}, Offset, Size) ->
             Error
     end.
 
+-spec all_trimmed(table(), machi_dt:chunk_pos()) -> boolean().
+all_trimmed(#machi_csum_table{table=T}, Pos) ->
+    runthru(ets:tab2list(T), 0, Pos).
+
 -spec sync(table()) -> ok | {error, file:posix()}.
 sync(#machi_csum_table{fd=Fd}) ->
     file:sync(Fd).
@@ -218,3 +229,22 @@ build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffse
     build_unwritten_bytes_list(Rest, (CurrentOffset+CurrentSize), [{LastOffset, Hole}|Acc]);
 build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
     build_unwritten_bytes_list(Rest, CO + CS, Acc).
+
+%% @doc make sure all trimmed chunks are continuously chained
+%% TODO: test with EQC
+runthru([], Pos, Pos) -> true;
+runthru([], Pos0, Pos) when Pos0 < Pos -> false;
+runthru([{Offset, Size, trimmed}|T], Offset, Pos) ->
+    runthru(T, Offset+Size, Pos);
+runthru(_, _, _) ->
+    false.
+
+%% @doc To find an overlap between two areas [x, y] and [a, b] where
+%% x < y and a < b: if (a-y)*(b-x) < 0 then there is an overlap; if it
+%% is > 0 then there is no overlap. The border condition = 0 counts as
+%% no overlap in this offset-size case.
+inclusion_match_spec(Offset, Size) ->
+    {'>', 0,
+     {'*',
+      {'-', Offset + Size, '$1'},
+      {'-', Offset, {'+', '$1', '$2'}}}}.
diff --git a/src/machi_file_proxy.erl b/src/machi_file_proxy.erl
index 11fdf2a..90630ae 100644
--- a/src/machi_file_proxy.erl
+++ b/src/machi_file_proxy.erl
@@ -76,9 +76,6 @@
 -type op_stats()      :: { Total  :: non_neg_integer(),
                            Errors :: non_neg_integer() }.
 
--type byte_sequence() :: { Offset :: non_neg_integer(),
-                           Size   :: pos_integer()|infinity }.
-
 -record(state, {
         data_dir              :: string() | undefined,
         filename              :: string() | undefined,
@@ -87,7 +84,6 @@
         csum_file             :: string()|undefined,
         csum_path             :: string()|undefined,
         eof_position = 0      :: non_neg_integer(),
-        unwritten_bytes = []  :: [byte_sequence()],
         data_filehandle       :: file:filehandle(),
         csum_table            :: machi_csum_table:table(),
         tref                  :: reference(), %% timer ref
@@ -129,7 +125,10 @@ sync(_Pid, Type) ->
     lager:warning("Bad arg to sync: Type ~p", [Type]),
     {error, bad_arg}.
 
-% @doc Read file at offset for length
+% @doc Read file at offset for length. This returns a sequence of all
+% chunks that overlap the requested offset and length. Note that the
+% borders are not aligned, so as not to mess up checksum-based repair
+% at the cr_client; they should be cut at the cr_client.
-spec read(Pid :: pid(), Offset :: non_neg_integer(), Length :: non_neg_integer()) -> @@ -211,7 +210,6 @@ init({Filename, DataDir}) -> data_filehandle = FHd, csum_table = CsumTable, tref = Tref, - unwritten_bytes = UnwrittenBytes, eof_position = Eof}, lager:debug("Starting file proxy ~p for filename ~p, state = ~p", [self(), Filename, St]), @@ -261,7 +259,8 @@ handle_call({read, _Offset, _Length}, _From, handle_call({read, Offset, Length}, _From, State = #state{eof_position = Eof, reads = {T, Err} - }) when Offset + Length > Eof -> + }) when Offset > Eof -> + %% make sure [Offset, Offset+Length) has an overlap with file range lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p", [Offset, Length, Eof]), {reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}}; @@ -270,19 +269,12 @@ handle_call({read, Offset, Length}, _From, State = #state{filename = F, data_filehandle = FH, csum_table = CsumTable, - unwritten_bytes = U, reads = {T, Err} }) -> - - Checksum = case machi_csum_table:find(CsumTable, Offset, Length) of - {ok, Checksum0} -> - Checksum0; - _ -> - undefined - end, - {Resp, NewErr} = - case handle_read(FH, F, Checksum, Offset, Length, U) of + case do_read(FH, F, CsumTable, Offset, Length) of + {ok, []} -> + {{error, not_written}, Err + 1}; {ok, Chunks} -> %% Kludge to wrap read result in tuples, to support fragmented read %% XXX FIXME @@ -305,35 +297,31 @@ handle_call({write, _Offset, _ClientMeta, _Data}, _From, {reply, {error, wedged}, State#state{writes = {T + 1, Err + 1}}}; handle_call({write, Offset, ClientMeta, Data}, _From, - State = #state{unwritten_bytes = U, - filename = F, + State = #state{filename = F, writes = {T, Err}, data_filehandle = FHd, - csum_table = CsumTable - }) -> + csum_table = CsumTable}) -> ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), - {Resp, NewErr, NewU} = + {Resp, NewErr} = case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of {error, {bad_csum, Bad}} -> lager:error("Bad checksum on write; client sent ~p, we computed ~p", [ClientCsum, Bad]), - {{error, bad_checksum}, Err + 1, U}; + {{error, bad_checksum}, Err + 1}; TaggedCsum -> - case handle_write(FHd, CsumTable, F, TaggedCsum, Offset, Data, U) of - {ok, NewU1} -> - {ok, Err, NewU1}; + case handle_write(FHd, CsumTable, F, TaggedCsum, Offset, Data) of + ok -> + {ok, Err}; Error -> - {Error, Err + 1, U} + {Error, Err + 1} end end, - {NewEof, infinity} = lists:last(NewU), + {NewEof, infinity} = lists:last(machi_csum_table:calc_unwritten_bytes(CsumTable)), {reply, Resp, State#state{writes = {T+1, NewErr}, - eof_position = NewEof, - unwritten_bytes = NewU - }}; + eof_position = NewEof}}; %% APPENDS @@ -345,7 +333,6 @@ handle_call({append, _ClientMeta, _Extra, _Data}, _From, handle_call({append, ClientMeta, Extra, Data}, _From, State = #state{eof_position = EofP, - unwritten_bytes = U, filename = F, appends = {T, Err}, data_filehandle = FHd, @@ -355,24 +342,25 @@ handle_call({append, ClientMeta, Extra, Data}, _From, ClientCsumTag = proplists:get_value(client_csum_tag, ClientMeta, ?CSUM_TAG_NONE), ClientCsum = proplists:get_value(client_csum, ClientMeta, <<>>), - {Resp, NewErr, NewU} = + {Resp, NewErr} = case check_or_make_tagged_csum(ClientCsumTag, ClientCsum, Data) of {error, {bad_csum, Bad}} -> lager:error("Bad checksum; client sent ~p, we computed ~p", [ClientCsum, Bad]), - {{error, bad_checksum}, Err + 1, U}; + {{error, bad_checksum}, Err + 1}; 
            TaggedCsum ->
-                case handle_write(FHd, CsumTable, F, TaggedCsum, EofP, Data, U) of
-                    {ok, NewU1} ->
-                        {{ok, F, EofP}, Err, NewU1};
+                case handle_write(FHd, CsumTable, F, TaggedCsum, EofP, Data) of
+                    ok ->
+                        {{ok, F, EofP}, Err};
                     Error ->
-                        {Error, Err + 1, EofP, U}
+                        {Error, Err + 1}
                 end
         end,
-    {NewEof, infinity} = lists:last(NewU),
+    %% TODO: should we check this by calling
+    %% machi_csum_table:calc_unwritten_bytes/1?
+    NewEof = EofP + byte_size(Data) + Extra,
     {reply, Resp, State#state{appends = {T+1, NewErr},
-                              eof_position = NewEof + Extra,
-                              unwritten_bytes = NewU
+                              eof_position = NewEof
                              }};
 
 handle_call(Req, _From, State) ->
@@ -510,17 +498,15 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
     lager:warning("Unknown checksum tag ~p", [OtherTag]),
     {error, bad_checksum}.
 
--spec handle_read(FHd :: file:filehandle(),
-                  Filename :: string(),
-                  TaggedCsum :: undefined|binary(),
-                  Offset :: non_neg_integer(),
-                  Size :: non_neg_integer(),
-                  Unwritten :: [byte_sequence()]
-                 ) -> {ok, Bytes :: binary(), Csum :: binary()} |
-                      eof |
+-spec do_read(FHd :: file:filehandle(),
+              Filename :: string(),
+              CsumTable :: machi_csum_table:table(),
+              Offset :: non_neg_integer(),
+              Size :: non_neg_integer()
+             ) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} |
                       {error, bad_checksum} |
                       {error, partial_read} |
-                      {error, not_written} |
+                      {error, file:posix()} |
                       {error, Other :: term() }.
 % @private Attempt a read operation on the given offset and length.
 %
 % <li>
@@ -535,22 +521,19 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
 %   tuple is returned.</li>
 %
 % <li>
 %
-% On success, `{ok, Bytes, Checksum}' is returned.
-handle_read(FHd, Filename, undefined, Offset, Size, U) ->
-    handle_read(FHd, Filename, machi_util:make_tagged_csum(none), Offset, Size, U);
+do_read(FHd, Filename, CsumTable, Offset, Size) ->
+    %% Note that find/3 only returns overlapping chunks; the borders
+    %% are not aligned to the original Offset and Size.
+    ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size),
+    read_all_ranges(FHd, Filename, ChunkCsums, []).
 
-handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
-    case is_byte_range_unwritten(Offset, Size, U) of
-        true ->
-            {error, not_written};
-        false ->
-            do_read(FHd, Filename, TaggedCsum, Offset, Size)
-    end.
+read_all_ranges(_, _, [], ReadChunks) ->
+    {ok, lists:reverse(ReadChunks)};
+
-do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
+read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
     case file:pread(FHd, Offset, Size) of
         eof ->
-            eof;
+            read_all_ranges(FHd, Filename, T, ReadChunks);
         {ok, Bytes} when byte_size(Bytes) == Size ->
             {Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
             case check_or_make_tagged_csum(Tag, Ck, Bytes) of
@@ -559,11 +542,13 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
                         [Bad, Ck]),
                     {error, bad_checksum};
                 TaggedCsum ->
-                    {ok, [{Filename, Offset, Bytes, TaggedCsum}]};
+                    read_all_ranges(FHd, Filename, T,
+                                    [{Filename, Offset, Bytes, TaggedCsum}|ReadChunks]);
                 OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
                     %% XXX FIXME: Should we return something other than
                     %% {ok, ....} in this case?
-                    {ok, [{Filename, Offset, Bytes, OtherCsum}]}
+                    read_all_ranges(FHd, Filename, T,
+                                    [{Filename, Offset, Bytes, OtherCsum}|ReadChunks])
             end;
         {ok, Partial} ->
             lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
@@ -580,9 +565,8 @@ do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
                   Filename :: string(),
                   TaggedCsum :: binary(),
                   Offset :: non_neg_integer(),
-                  Data :: binary(),
-                  Unwritten :: [byte_sequence()]
-                 ) -> {ok, NewU :: [byte_sequence()]} |
+                  Data :: binary()
+                 ) -> ok |
                       {error, written} |
                       {error, Reason :: term()}.
 % @private Implements the write and append operation. The first task is to
 % determine if the offset and data size has been written. If not, the write
 % is allowed proceed.  A special case is made when an offset and data size
 % match a checksum. In that case we read the data off the disk, validate the
 % checksum and return a "fake" ok response as if the write had been performed
 % when it hasn't really.
 %
-% If a write proceeds, the offset, size and checksum are written to a metadata
-% file, and the internal list of unwritten bytes is modified to reflect the
-% just-performed write. This is then returned to the caller as
-% `{ok, NewUnwritten}' where NewUnwritten is the revised unwritten byte list.
-handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
+% If a write proceeds, the offset, size and checksum are written to a
+% metadata file, and the checksum table is updated to reflect the
+% just-performed write. `ok' is then returned to the
+% caller.
+handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
     Size = iolist_size(Data),
-
-    case is_byte_range_unwritten(Offset, Size, U) of
-        false ->
-            case machi_csum_table:find(CsumTable, Offset, Size) of
-                {error, trimmed} = Error ->
-                    Error;
-                {error, unknown_chunk} ->
-                    %% The specified has some bytes written, while
-                    %% it's not in the checksum table. Trust U and
-                    %% return as it is used.
-                    {error, written};
-                {ok, TaggedCsum} ->
-                    case do_read(FHd, Filename, TaggedCsum, Offset, Size) of
-                        eof ->
-                            lager:warning("This should never happen: got eof while reading at offset ~p in file ~p that's supposedly written",
-                                          [Offset, Filename]),
-                            {error, server_insanity};
-                        {ok, _} ->
-                            {ok, U};
-                        _ ->
-                            {error, written}
-                    end;
-                {ok, OtherCsum} ->
-                    %% Got a checksum, but it doesn't match the data block's
-                    lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
-                                [Offset, Filename, OtherCsum, TaggedCsum]),
-                    {error, written}
-            end;
-        true ->
+    case machi_csum_table:find(CsumTable, Offset, Size) of
+        [] ->
             try
-                do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data, U)
+                do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data)
             catch
-                %%% XXX FIXME: be more specific on badmatch that might
-                %%% occur around line 593 when we write the checksum
-                %%% file entry for the data blob we just put on the disk
+                %% XXX FIXME: be more specific on badmatch that might
+                %% occur around line 593 when we write the checksum
+                %% file entry for the data blob we just put on the disk
                 error:Reason ->
                     {error, Reason}
-            end
+            end;
+        [{Offset, Size, TaggedCsum}] ->
+            case do_read(FHd, Filename, CsumTable, Offset, Size) of
+                {error, _} = E ->
+                    lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written",
+                                  [E, Offset, Filename]),
+                    {error, server_insanity};
+                {ok, [{_, Offset, Data, TaggedCsum}]} ->
+                    %% TODO: what if we get a different checksum from do_read()?
+                    ok;
+                {ok, _Other} ->
+                    %% TODO: leave some debug/warning message here?
+                    {error, written}
+            end;
+        [{Offset, Size, OtherCsum}] ->
+            %% Got a checksum, but it doesn't match the data block's
+            lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to write has checksum ~p",
+                        [Offset, Filename, OtherCsum, TaggedCsum]),
+            {error, written};
+        _Chunks ->
+            %% No byte is trimmed, but at least one byte is written
+            {error, written}
     end.
 
 % @private Implements the disk writes for both the write and append
@@ -646,99 +624,20 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data, U) ->
                 TaggedCsum :: binary(),
                 Offset :: non_neg_integer(),
                 Size :: non_neg_integer(),
-                Data :: binary(),
-                Unwritten :: [byte_sequence()]
-               ) -> {ok, NewUnwritten :: [byte_sequence()]} |
-                    {error, Reason :: term()}.
-do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data, U) ->
+                Data :: binary()
+               ) -> ok | {error, Reason :: term()}.
+do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
     case file:pwrite(FHd, Offset, Data) of
         ok ->
             lager:debug("Successful write in file ~p at offset ~p, length ~p",
                         [Filename, Offset, Size]),
             ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
-            NewU = update_unwritten(Offset, Size, U),
-            lager:debug("Successful write to checksum file for ~p; unwritten bytes are now: ~p",
-                        [Filename, NewU]),
-            {ok, NewU};
+            lager:debug("Successful write to checksum file for ~p",
+                        [Filename]),
+            %% io:format(user, "here, heh ~p~n", [?LINE]),
+            ok;
         Other ->
             lager:error("Got ~p during write to file ~p at offset ~p, length ~p",
                         [Other, Filename, Offset, Size]),
             {error, Other}
     end.
-
--spec is_byte_range_unwritten( Offset :: non_neg_integer(),
-                               Size :: pos_integer(),
-                               Unwritten :: [byte_sequence()] ) -> boolean().
-% @private Given an offset and a size, return `true' if a byte range has -% not been written. Otherwise, return `false'. -is_byte_range_unwritten(Offset, Size, Unwritten) -> - case Unwritten of - [] -> - lager:critical("Unwritten byte list has 0 entries! This should never happen."), - false; - [{Eof, infinity}] -> - Offset >= Eof; - _ -> - case lookup_unwritten(Offset, Size, Unwritten) of - {ok, _} -> true; - not_found -> false - end - end. - --spec lookup_unwritten( Offset :: non_neg_integer(), - Size :: pos_integer(), - Unwritten :: [byte_sequence()] - ) -> {ok, byte_sequence()} | not_found. -% @private Given an offset and a size, scan the list of unwritten bytes and -% look for a "hole" where a write might be allowed if any exist. If a -% suitable byte sequence is found, the function returns a tuple of {ok, -% {Position, Space}} is returned. `not_found' is returned if no suitable -% space is located. -lookup_unwritten(_Offset, _Size, []) -> - not_found; -lookup_unwritten(Offset, _Size, [H={Pos, infinity}|_Rest]) when Offset >= Pos -> - {ok, H}; -lookup_unwritten(Offset, Size, [H={Pos, Space}|_Rest]) - when Offset >= Pos andalso Offset < Pos+Space - andalso Size =< (Space - (Offset - Pos)) -> - {ok, H}; -lookup_unwritten(Offset, Size, [_H|Rest]) -> - %% These are not the droids you're looking for. - lookup_unwritten(Offset, Size, Rest). - -%%% if the pos is greater than offset + size then we're done. End early. - --spec update_unwritten( Offset :: non_neg_integer(), - Size :: pos_integer(), - Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()]. -% @private Given an offset, a size and the unwritten byte list, return an updated -% and sorted unwritten byte list accounting for any completed write operation. -update_unwritten(Offset, Size, Unwritten) -> - case lookup_unwritten(Offset, Size, Unwritten) of - not_found -> - lager:error("Couldn't find byte sequence tuple for a write which earlier found a valid spot to write!!! This should never happen!"), - Unwritten; - {ok, {Offset, Size}} -> - %% we neatly filled in our hole... - lists:keydelete(Offset, 1, Unwritten); - {ok, S={Pos, _}} -> - lists:sort(lists:keydelete(Pos, 1, Unwritten) ++ - update_byte_range(Offset, Size, S)) - end. - --spec update_byte_range( Offset :: non_neg_integer(), - Size :: pos_integer(), - Sequence :: byte_sequence() ) -> Updates :: [byte_sequence()]. -% @private Given an offset and size and a byte sequence tuple where a -% write took place, return a list of updates to the list of unwritten bytes -% accounting for the space occupied by the just completed write. -update_byte_range(Offset, Size, {Eof, infinity}) when Offset == Eof -> - [{Offset + Size, infinity}]; -update_byte_range(Offset, Size, {Eof, infinity}) when Offset > Eof -> - [{Eof, (Offset - Eof)}, {Offset+Size, infinity}]; -update_byte_range(Offset, Size, {Pos, Space}) when Offset == Pos andalso Size < Space -> - [{Offset + Size, Space - Size}]; -update_byte_range(Offset, Size, {Pos, Space}) when Offset > Pos -> - [{Pos, Offset - Pos}, {Offset+Size, ( (Pos+Space) - (Offset + Size) )}]. 
- - diff --git a/src/machi_pb_translate.erl b/src/machi_pb_translate.erl index 0c5392e..be0d75e 100644 --- a/src/machi_pb_translate.erl +++ b/src/machi_pb_translate.erl @@ -659,7 +659,6 @@ to_pb_response(ReqID, {high_read_chunk, _File, _Offset, _Size}, Resp) -> {ok, Chunks} -> MpbChunks = lists:map(fun({File, Offset, Bytes, Csum}) -> {Tag, Ck} = machi_util:unmake_tagged_csum(Csum), - io:format(user, "~p oom~n", [Csum]), #mpb_chunk{ offset=Offset, file_name=File, diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index 41f1bda..fb07052 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -149,8 +149,10 @@ smoke_test2() -> %% Misc API smoke & minor regression checks {error, bad_arg} = machi_cr_client:read_chunk(C1, <<"no">>, 999999999, 1), - {error, not_written} = machi_cr_client:read_chunk(C1, File1, - Off1, 88888888), + {ok, [{_,Off1,Chunk1,_}, + {_,FooOff1,Chunk1,_}, + {_,FooOff2,Chunk2,_}]} = machi_cr_client:read_chunk(C1, File1, + Off1, 88888888), %% Checksum list return value is a primitive binary(). {ok, KludgeBin} = machi_cr_client:checksum_list(C1, File1), true = is_binary(KludgeBin), diff --git a/test/machi_csum_table_test.erl b/test/machi_csum_table_test.erl index 9846e1a..cf43aeb 100644 --- a/test/machi_csum_table_test.erl +++ b/test/machi_csum_table_test.erl @@ -1,16 +1,21 @@ -module(machi_csum_table_test). -compile(export_all). +-include_lib("eunit/include/eunit.hrl"). +-define(HDR, {0, 1024, <<0>>}). + smoke_test() -> Filename = "./temp-checksum-dumb-file", _ = file:delete(Filename), {ok, MC} = machi_csum_table:open(Filename, []), - {Offset, Size, Checksum} = {64, 34, <<"deadbeef">>}, - {error, unknown_chunk} = machi_csum_table:find(MC, Offset, Size), + [{1024, infinity}] = machi_csum_table:calc_unwritten_bytes(MC), + Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>}, + [] = machi_csum_table:find(MC, Offset, Size), ok = machi_csum_table:write(MC, Offset, Size, Checksum), - {ok, Checksum} = machi_csum_table:find(MC, Offset, Size), + [{1024, 40}, {1098, infinity}] = machi_csum_table:calc_unwritten_bytes(MC), + [Entry] = machi_csum_table:find(MC, Offset, Size), ok = machi_csum_table:trim(MC, Offset, Size), - {error, trimmed} = machi_csum_table:find(MC, Offset, Size), + [{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size), ok = machi_csum_table:close(MC), ok = machi_csum_table:delete(MC). @@ -18,14 +23,35 @@ close_test() -> Filename = "./temp-checksum-dumb-file-2", _ = file:delete(Filename), {ok, MC} = machi_csum_table:open(Filename, []), - {Offset, Size, Checksum} = {64, 34, <<"deadbeef">>}, - {error, unknown_chunk} = machi_csum_table:find(MC, Offset, Size), + Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>}, + [] = machi_csum_table:find(MC, Offset, Size), ok = machi_csum_table:write(MC, Offset, Size, Checksum), - {ok, Checksum} = machi_csum_table:find(MC, Offset, Size), + [Entry] = machi_csum_table:find(MC, Offset, Size), ok = machi_csum_table:close(MC), {ok, MC2} = machi_csum_table:open(Filename, []), - {ok, Checksum} = machi_csum_table:find(MC2, Offset, Size), + [Entry] = machi_csum_table:find(MC2, Offset, Size), ok = machi_csum_table:trim(MC2, Offset, Size), - {error, trimmed} = machi_csum_table:find(MC2, Offset, Size), + [{Offset, Size, trimmed}] = machi_csum_table:find(MC2, Offset, Size), ok = machi_csum_table:delete(MC2). 
+ +smoke2_test() -> + Filename = "./temp-checksum-dumb-file-3", + _ = file:delete(Filename), + {ok, MC} = machi_csum_table:open(Filename, []), + Entry = {Offset, Size, Checksum} = {1025, 10, <<"deadbeef">>}, + ok = machi_csum_table:write(MC, Offset, Size, Checksum), + [] = machi_csum_table:find(MC, 0, 0), + [?HDR] = machi_csum_table:find(MC, 0, 1), + [Entry] = machi_csum_table:find(MC, Offset, Size), + [?HDR] = machi_csum_table:find(MC, 1, 1024), + [?HDR, Entry] = machi_csum_table:find(MC, 1023, 1024), + [Entry] = machi_csum_table:find(MC, 1024, 1024), + [Entry] = machi_csum_table:find(MC, 1025, 1024), + + ok = machi_csum_table:trim(MC, Offset, Size), + [{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size), + ok = machi_csum_table:close(MC), + ok = machi_csum_table:delete(MC). + +%% TODO: add quickcheck test here diff --git a/test/machi_file_proxy_eqc.erl b/test/machi_file_proxy_eqc.erl index 725f4ac..d9c3a3b 100644 --- a/test/machi_file_proxy_eqc.erl +++ b/test/machi_file_proxy_eqc.erl @@ -202,7 +202,8 @@ read_next(S, _Res, _Args) -> S. read(Pid, Offset, Length) -> case machi_file_proxy:read(Pid, Offset, Length) of - {ok, [{_, Offset, Data, Csum}]} -> + {ok, Chunks} -> + [{_, Offset, Data, Csum}] = machi_cr_client:trim_both_side(Chunks, Offset, Length), {ok, Data, Csum}; E -> E diff --git a/test/machi_file_proxy_test.erl b/test/machi_file_proxy_test.erl index ea6f784..017effc 100644 --- a/test/machi_file_proxy_test.erl +++ b/test/machi_file_proxy_test.erl @@ -89,8 +89,8 @@ machi_file_proxy_test_() -> ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)), ?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))), - ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), - ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), + ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)), ?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)), ?_assertMatch({ok, [{_, _, _, _}]}, machi_file_proxy:read(Pid, 1025, 1000)), ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)), @@ -98,6 +98,25 @@ machi_file_proxy_test_() -> ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) ]. +multiple_chunks_read_test_() -> + clean_up_data_dir(?TESTDIR), + {ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR), + [ + ?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))), + ?_assertEqual(ok, machi_file_proxy:write(Pid, 10000, <<"fail">>)), + ?_assertEqual(ok, machi_file_proxy:write(Pid, 20000, <<"fail">>)), + ?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)), + %% Freeza + ?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)), + ?_assertMatch({ok, [{"test", 1024, _, _}, + {"test", 10000, <<"fail">>, _}, + {"test", 20000, <<"fail">>, _}, + {"test", 30000, <<"fail">>, _}, + {"test", 530000, <<"fail">>, _}]}, + machi_file_proxy:read(Pid, 1024, 530000)), + ?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid)) + ]. + + -endif. % !PULSE -endif. % TEST. -
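
A minimal plain-Erlang sketch of the overlap predicate that inclusion_match_spec/2 encodes
as an ets match spec. The module name overlap_sketch and demo/0 are hypothetical and not
part of this patch; the demo values mirror smoke2_test/0 above, assuming the dummy header
chunk occupies [0, 1024).

-module(overlap_sketch).
-export([overlaps/4, demo/0]).

%% Chunk [COff, COff+CSize) overlaps request [Off, Off+Size) iff
%% (Off+Size - COff) * (Off - (COff+CSize)) < 0, i.e. each range
%% starts strictly before the other ends. A product of exactly 0
%% means the ranges only share a border, which counts as no overlap.
overlaps(Off, Size, COff, CSize) ->
    (Off + Size - COff) * (Off - (COff + CSize)) < 0.

demo() ->
    true  = overlaps(1023, 1024, 0, 1024),  %% reaches 1 byte into the header
    false = overlaps(1024, 1024, 0, 1024),  %% shares only the border at 1024
    true  = overlaps(1024, 1024, 1025, 10), %% chunk lies inside the request
    false = overlaps(0, 0, 0, 1024),        %% zero-length read matches nothing
    ok.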