Support arbitrary bytes write by using find_(left|right)neighbor/2

This commit is contained in:
UENISHI Kota 2015-10-26 13:53:05 +09:00
parent 9fb19aa8ee
commit 8a61055f55
3 changed files with 189 additions and 24 deletions

View file

@ -1,12 +1,16 @@
-module(machi_csum_table).
-export([open/2,
find/3, write/4, trim/3,
find/3,
write/6, write/4, trim/5, trim/3,
find_leftneighbor/2, find_rightneighbor/2,
all_trimmed/3, any_trimmed/3,
all_trimmed/2,
sync/1,
calc_unwritten_bytes/1,
split_checksum_list_blob_decode/1,
close/1, delete/1]).
close/1, delete/1,
foldl_chunks/3]).
-export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3,
decode_csum_file_entry/1]).
@ -14,6 +18,7 @@
-include("machi.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-export([all/1]).
-endif.
@ -76,22 +81,69 @@ all(#machi_csum_table{table=T}) ->
ets:tab2list(T).
-endif.
write(#machi_csum_table{fd=Fd, table=T} = CsumT,
Offset, Size, CSum,
LeftUpdate, RightUpdate) ->
Binary =
[encode_csum_file_entry_bin(Offset, Size, CSum),
case LeftUpdate of
{LO, LS, LCsum} when LO + LS =:= Offset ->
encode_csum_file_entry_bin(LO, LS, LCsum);
undefined ->
<<>>
end,
case RightUpdate of
{RO, RS, RCsum} when RO =:= Offset + Size ->
encode_csum_file_entry_bin(RO, RS, RCsum);
undefined ->
<<>>
end],
case file:write(Fd, Binary) of
ok ->
Chunks = find(CsumT, Offset, Size),
lists:foreach(fun({O, _, _}) ->
ets:delete(T, O)
end, Chunks),
case LeftUpdate of
{LO1, LS1, _} when LO1 + LS1 =:= Offset ->
ets:insert(T, LeftUpdate);
undefined -> noop
end,
case RightUpdate of
{RO1, _, _} when RO1 =:= Offset + Size ->
ets:insert(T, RightUpdate);
undefined -> noop
end,
true = ets:insert(T, {Offset, Size, CSum}),
ok;
Error ->
io:format(user, "boob *********************", []),
Error
end.
find_leftneighbor(CsumT, Offset) ->
case find(CsumT, Offset, 1) of
[] -> undefined;
[{Offset, _, _}] -> undefined;
[{LOffset, _, CsumOrTrimmed}] -> {LOffset, Offset - LOffset, CsumOrTrimmed}
end.
find_rightneighbor(CsumT, Offset) ->
case find(CsumT, Offset, 1) of
[] -> undefined;
[{Offset, _, _}] -> undefined;
[{ROffset, RSize, CsumOrTrimmed}] ->
{Offset, ROffset + RSize - Offset, CsumOrTrimmed}
end.
-spec write(table(), machi_dt:file_offset(), machi_dt:file_size(),
machi_dt:chunk_csum()) ->
ok | {error, used|file:posix()}.
write(#machi_csum_table{fd=Fd, table=T}, Offset, Size, CSum) ->
Binary = encode_csum_file_entry_bin(Offset, Size, CSum),
case file:write(Fd, Binary) of
ok ->
case ets:insert_new(T, {Offset, Size, CSum}) of
true ->
ok;
false ->
{error, written}
end;
Error ->
Error
end.
write(CsumT, Offset, Size, CSum) ->
write(CsumT, Offset, Size, CSum, undefined, undefined).
trim(CsumT, Offset, Size, LeftUpdate, RightUpdate) ->
write(CsumT, Offset, Size, trimmed, LeftUpdate, RightUpdate).
-spec trim(table(), machi_dt:file_offset(), machi_dt:file_size()) ->
ok | {error, file:posix()}.
@ -105,10 +157,21 @@ trim(#machi_csum_table{fd=Fd, table=T}, Offset, Size) ->
Error
end.
-spec all_trimmed(table(), machi_dt:chunk_pos(), machi_dt:chunk_pos()) -> boolean().
all_trimmed(#machi_csum_table{table=T}, Left, Right) ->
runthru(ets:tab2list(T), Left, Right).
-spec all_trimmed(table(), machi_dt:chunk_pos()) -> boolean().
all_trimmed(#machi_csum_table{table=T}, Pos) ->
runthru(ets:tab2list(T), 0, Pos).
-spec any_trimmed(table(),
machi_dt:chunk_pos(),
machi_dt:chunk_size()) -> boolean().
any_trimmed(CsumT, Offset, Size) ->
Chunks = find(CsumT, Offset, Size),
lists:any(fun({_, _, State}) -> State =:= trimmed end, Chunks).
-spec sync(table()) -> ok | {error, file:posix()}.
sync(#machi_csum_table{fd=Fd}) ->
file:sync(Fd).
@ -137,6 +200,13 @@ delete(#machi_csum_table{file=F} = C) ->
E -> E
end.
-spec foldl_chunks(fun(({non_neg_integer(), non_neg_integer(), term()},
Acc0 :: term())
-> Acc :: term()),
Acc0 :: term(), table()) -> Acc :: term().
foldl_chunks(Fun, Acc0, #machi_csum_table{table=T}) ->
ets:foldl(Fun, Acc0, T).
%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
%% internal storage by the FLU.
@ -236,9 +306,9 @@ build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
%% TODO: test with EQC
runthru([], Pos, Pos) -> true;
runthru([], Pos0, Pos) when Pos0 < Pos -> false;
runthru([{Offset, Size, trimmed}|T], Offset, Pos) ->
runthru(T, Offset+Size, Pos);
runthru(_, _, _) ->
runthru([{Offset0, Size0, trimmed}|T], Offset, Pos) when Offset0 =< Offset ->
runthru(T, Offset0+Size0, Pos);
runthru(_L, _O, _P) ->
false.
%% @doc If you want to find an overlap among two areas [x, y] and [a,

View file

@ -601,7 +601,7 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks) ->
handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
Size = iolist_size(Data),
case machi_csum_table:find(CsumTable, Offset, Size) of
[] ->
[] -> %% Nothing should be there
try
do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data)
catch
@ -614,7 +614,8 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
[{Offset, Size, TaggedCsum}] ->
case do_read(FHd, Filename, CsumTable, Offset, Size, false, false, false) of
{error, _} = E ->
lager:warning("This should never happen: got ~p while reading at offset ~p in file ~p that's supposedly written",
lager:warning("This should never happen: got ~p while reading"
" at offset ~p in file ~p that's supposedly written",
[E, Offset, Filename]),
{error, server_insanity};
{ok, {[{_, Offset, Data, TaggedCsum}], _}} ->
@ -626,12 +627,25 @@ handle_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Data) ->
end;
[{Offset, Size, OtherCsum}] ->
%% Got a checksum, but it doesn't match the data block's
lager:error("During a potential write at offset ~p in file ~p, a check for unwritten bytes gave us checksum ~p but the data we were trying to trying to write has checksum ~p",
lager:error("During a potential write at offset ~p in file ~p,"
" a check for unwritten bytes gave us checksum ~p"
" but the data we were trying to write has checksum ~p",
[Offset, Filename, OtherCsum, TaggedCsum]),
{error, written};
_Chunks ->
%% TODO: Do we try to read all continuous chunks to see
%% wether its total checksum matches client-provided checksum?
case machi_csum_table:any_trimmed(CsumTable, Offset, Size) of
true ->
%% More than a byte is trimmed, besides, do we
%% have to return exact written bytes? No. Clients
%% must issue read_chunk() with needs_trimmed
%% option as true
{error, trimmed};
false ->
%% No byte is trimmed, but at least one byte is written
{error, written}
end
end.
% @private Implements the disk writes for both the write and append
@ -649,7 +663,18 @@ do_write(FHd, CsumTable, Filename, TaggedCsum, Offset, Size, Data) ->
ok ->
lager:debug("Successful write in file ~p at offset ~p, length ~p",
[Filename, Offset, Size]),
ok = machi_csum_table:write(CsumTable, Offset, Size, TaggedCsum),
%% Overlapping chunk; calculate checksum
%% read {LOffset, Offset - LOffset} and make csum
%% as server_sha
LUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_leftneighbor(CsumTable, Offset)),
RUpdate = maybe_regenerate_checksum(
FHd,
machi_csum_table:find_rightneighbor(CsumTable, Offset+Size)),
ok = machi_csum_table:write(CsumTable, Offset, Size,
TaggedCsum, LUpdate, RUpdate),
lager:debug("Successful write to checksum file for ~p",
[Filename]),
ok;
@ -683,3 +708,20 @@ slice_both_side(Chunks, LeftPos, RightPos) when LeftPos =< RightPos ->
true ->
Chunks
end.
maybe_regenerate_checksum(_, undefined) ->
undefined;
maybe_regenerate_checksum(_, {_, _, trimmed} = Change) ->
Change;
maybe_regenerate_checksum(FHd, {Offset, Size, _Csum}) ->
case file:pread(FHd, Offset, Size) of
eof ->
error(eof);
{ok, Bytes} when byte_size(Bytes) =:= Size ->
TaggedCsum = machi_util:make_tagged_csum(server_regen_sha,
machi_util:checksum_chunk(Bytes)),
{Offset, Size, TaggedCsum};
Error ->
throw(Error)
end.

View file

@ -54,4 +54,57 @@ smoke2_test() ->
ok = machi_csum_table:close(MC),
ok = machi_csum_table:delete(MC).
smoke3_test() ->
Filename = "./temp-checksum-dumb-file-4",
_ = file:delete(Filename),
{ok, MC} = machi_csum_table:open(Filename, []),
Scenario =
[%% Command, {Offset, Size, Csum}, LeftNeighbor, RightNeibor
{?LINE, write, {2000, 10, <<"heh">>}, undefined, undefined},
{?LINE, write, {3000, 10, <<"heh">>}, undefined, undefined},
{?LINE, write, {4000, 10, <<"heh2">>}, undefined, undefined},
{?LINE, write, {4000, 10, <<"heh2">>}, undefined, undefined},
{?LINE, write, {4005, 10, <<"heh3">>}, {4000, 5, <<"heh2">>}, undefined},
{?LINE, write, {4005, 10, <<"heh3">>}, undefined, undefined},
{?LINE, trim, {3005, 10, <<>>}, {3000, 5, <<"heh">>}, undefined},
{?LINE, trim, {2000, 10, <<>>}, undefined, undefined},
{?LINE, trim, {2005, 5, <<>>}, {2000, 5, trimmed}, undefined},
{?LINE, trim, {3000, 5, <<>>}, undefined, undefined},
{?LINE, trim, {4000, 10, <<>>}, undefined, {4010, 5, <<"heh3">>}},
{?LINE, trim, {4010, 5, <<>>}, undefined, undefined},
{?LINE, trim, {0, 1024, <<>>}, undefined, undefined}
],
[ begin
%% ?debugVal({Line, Chunk}),
{Offset, Size, Csum} = Chunk,
?assertEqual(LeftN0,
machi_csum_table:find_leftneighbor(MC, Offset)),
?assertEqual(RightN0,
machi_csum_table:find_rightneighbor(MC, Offset+Size)),
LeftN = case LeftN0 of
{OffsL, SizeL, _} -> {OffsL, SizeL, <<"boom">>};
OtherL -> OtherL
end,
RightN = case RightN0 of
{OffsR, SizeR, _} -> {OffsR, SizeR, <<"boot">>};
OtherR -> OtherR
end,
case Cmd of
write ->
ok = machi_csum_table:write(MC, Offset, Size, Csum,
LeftN, RightN);
trim ->
ok = machi_csum_table:trim(MC, Offset, Size,
LeftN, RightN)
end
end || {_Line, Cmd, Chunk, LeftN0, RightN0} <- Scenario ],
?assert(not machi_csum_table:all_trimmed(MC, 10000)),
machi_csum_table:trim(MC, 0, 10000, undefined, undefined),
?assert(machi_csum_table:all_trimmed(MC, 10000)),
ok = machi_csum_table:close(MC),
ok = machi_csum_table:delete(MC).
%% TODO: add quickcheck test here