commit
e27a59e20f
12 changed files with 340 additions and 342 deletions
|
@ -5,6 +5,9 @@
|
|||
{edoc_opts, [{dir, "./edoc"}]}.
|
||||
|
||||
{deps, [
|
||||
{cuttlefish, ".*", {git, "git://github.com/basho/cuttlefish.git", {branch, "develop"}}},
|
||||
{sext, ".*", {git, "git://github.com/basho/sext.git", {branch, "master"}}},
|
||||
{eleveldb, ".*", {git, "git://github.com/basho/eleveldb.git", {branch, "develop"}}},
|
||||
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}},
|
||||
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
|
||||
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
|
||||
|
|
|
@ -100,7 +100,7 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
|||
try
|
||||
case ?FLU_C:checksum_list(Sock1, EpochID, File) of
|
||||
{ok, InfoBin} ->
|
||||
{Info, _} = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||
Info = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||
Res = lists:foldl(verify_chunk_checksum(File, ReadChunk),
|
||||
[], Info),
|
||||
{ok, Res};
|
||||
|
@ -115,7 +115,9 @@ verify_file_checksums_common(Sock1, EpochID, File, ReadChunk) ->
|
|||
end.
|
||||
|
||||
verify_chunk_checksum(File, ReadChunk) ->
|
||||
fun({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
||||
fun({0, ?MINIMUM_OFFSET, none}, []) ->
|
||||
[];
|
||||
({Offset, Size, <<_Tag:1/binary, CSum/binary>>}, Acc) ->
|
||||
case ReadChunk(File, Offset, Size) of
|
||||
{ok, {[{_, Offset, Chunk, _}], _}} ->
|
||||
CSum2 = machi_util:checksum_chunk(Chunk),
|
||||
|
|
|
@ -136,7 +136,7 @@ load_ets_table(Conn, ETS) ->
|
|||
{ok, Fs} = machi_cr_client:list_files(Conn),
|
||||
[begin
|
||||
{ok, InfoBin} = machi_cr_client:checksum_list(Conn, File),
|
||||
{PosList, _} = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||
PosList = machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||
StartKey = ets:update_counter(ETS, max_key, 0),
|
||||
%% _EndKey = lists:foldl(fun({Off,Sz,CSum}, K) ->
|
||||
%% V = {File, Off, Sz, CSum},
|
||||
|
|
|
@ -209,9 +209,7 @@ make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID,
|
|||
case machi_proxy_flu1_client:checksum_list(
|
||||
Proxy, EpochID, File, ?LONG_TIMEOUT) of
|
||||
{ok, InfoBin} ->
|
||||
{Info, _} =
|
||||
machi_csum_table:split_checksum_list_blob_decode(InfoBin),
|
||||
Info;
|
||||
machi_csum_table:split_checksum_list_blob_decode(InfoBin);
|
||||
{error, no_such_file} ->
|
||||
[]
|
||||
end,
|
||||
|
|
|
@ -2,34 +2,32 @@
|
|||
|
||||
-export([open/2,
|
||||
find/3,
|
||||
write/6, write/4, trim/5, trim/3,
|
||||
write/6, write/4, trim/5,
|
||||
find_leftneighbor/2, find_rightneighbor/2,
|
||||
all_trimmed/3, any_trimmed/3,
|
||||
all_trimmed/2,
|
||||
sync/1,
|
||||
calc_unwritten_bytes/1,
|
||||
split_checksum_list_blob_decode/1,
|
||||
all/1,
|
||||
close/1, delete/1,
|
||||
foldl_chunks/3]).
|
||||
|
||||
-export([encode_csum_file_entry/3, encode_csum_file_entry_bin/3,
|
||||
decode_csum_file_entry/1]).
|
||||
|
||||
-include("machi.hrl").
|
||||
|
||||
-ifdef(TEST).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-export([all/1]).
|
||||
-endif.
|
||||
|
||||
-record(machi_csum_table,
|
||||
{file :: string(),
|
||||
fd :: file:io_device(),
|
||||
table :: ets:tid()}).
|
||||
table :: eleveldb:db_ref()}).
|
||||
|
||||
-type table() :: #machi_csum_table{}.
|
||||
-type byte_sequence() :: { Offset :: non_neg_integer(),
|
||||
Size :: pos_integer()|infinity }.
|
||||
-type chunk() :: {Offset :: machi_dt:file_offset(),
|
||||
Size :: machi_dt:chunk_size(),
|
||||
machi_dt:chunk_csum() | trimmed | none}.
|
||||
|
||||
-export_type([table/0]).
|
||||
|
||||
|
@ -37,100 +35,101 @@
|
|||
{ok, table()} | {error, file:posix()}.
|
||||
|
||||
open(CSumFilename, _Opts) ->
|
||||
T = ets:new(?MODULE, [private, ordered_set]),
|
||||
CSum = machi_util:make_tagged_csum(none),
|
||||
%% Dummy entry for headers
|
||||
true = ets:insert_new(T, {0, ?MINIMUM_OFFSET, CSum}),
|
||||
LevelDBOptions = [{create_if_missing, true},
|
||||
%% Keep this table small so as not to interfere
|
||||
%% operating system's file cache, which is for
|
||||
%% Machi's main read efficiency
|
||||
{total_leveldb_mem_percent, 10}],
|
||||
{ok, T} = eleveldb:open(CSumFilename, LevelDBOptions),
|
||||
%% Dummy entry for reserved headers
|
||||
ok = eleveldb:put(T,
|
||||
sext:encode({0, ?MINIMUM_OFFSET}),
|
||||
sext:encode(?CSUM_TAG_NONE_ATOM),
|
||||
[{sync, true}]),
|
||||
C0 = #machi_csum_table{
|
||||
file=CSumFilename,
|
||||
table=T},
|
||||
case file:read_file(CSumFilename) of
|
||||
{ok, Bin} ->
|
||||
List = case split_checksum_list_blob_decode(Bin) of
|
||||
{List0, <<>>} ->
|
||||
List0;
|
||||
{List0, _Junk} ->
|
||||
%% Partially written, needs repair TODO
|
||||
List0
|
||||
end,
|
||||
%% assuming all entries are strictly ordered by offset,
|
||||
%% trim command should always come after checksum entry.
|
||||
%% *if* by any chance that order cound not be kept, we
|
||||
%% still can do ordering check and monotonic merge here.
|
||||
%% TODO: make some random injection tests?
|
||||
[begin %% Replay all file contents, Same logic as in write/6
|
||||
Chunks = find(C0, Offset, Size),
|
||||
lists:foreach(fun({O, _, _}) ->
|
||||
ets:delete(T, O)
|
||||
end, Chunks),
|
||||
true = ets:insert(T, {Offset, Size, CsumOrTrimmed})
|
||||
end
|
||||
|| {Offset, Size, CsumOrTrimmed} <- List],
|
||||
ok;
|
||||
{error, enoent} ->
|
||||
ok;
|
||||
Error ->
|
||||
throw(Error)
|
||||
end,
|
||||
{ok, Fd} = file:open(CSumFilename, [raw, binary, append]),
|
||||
{ok, C0#machi_csum_table{fd=Fd}}.
|
||||
{ok, C0}.
|
||||
|
||||
-spec find(table(), machi_dt:file_offset(), machi_dt:file_size()) ->
|
||||
list({machi_dt:file_offset(),
|
||||
machi_dt:file_size(),
|
||||
machi_dt:chunk_csum()|trimmed}).
|
||||
-spec split_checksum_list_blob_decode(binary())-> [chunk()].
|
||||
split_checksum_list_blob_decode(Bin) ->
|
||||
erlang:binary_to_term(Bin).
|
||||
|
||||
|
||||
-define(has_overlap(LeftOffset, LeftSize, RightOffset, RightSize),
|
||||
((LeftOffset - (RightOffset+RightSize)) * (LeftOffset+LeftSize - RightOffset) < 0)).
|
||||
|
||||
-spec find(table(), machi_dt:file_offset(), machi_dt:chunk_size())
|
||||
-> [chunk()].
|
||||
find(#machi_csum_table{table=T}, Offset, Size) ->
|
||||
ets:select(T, [{{'$1', '$2', '$3'},
|
||||
[inclusion_match_spec(Offset, Size)],
|
||||
['$_']}]).
|
||||
{ok, I} = eleveldb:iterator(T, [], keys_only),
|
||||
EndKey = sext:encode({Offset+Size, 0}),
|
||||
StartKey = sext:encode({Offset, Size}),
|
||||
|
||||
-ifdef(TEST).
|
||||
all(#machi_csum_table{table=T}) ->
|
||||
ets:tab2list(T).
|
||||
-endif.
|
||||
{ok, FirstKey} = case eleveldb:iterator_move(I, StartKey) of
|
||||
{error, invalid_iterator} ->
|
||||
eleveldb:iterator_move(I, first);
|
||||
{ok, _} = R0 ->
|
||||
case eleveldb:iterator_move(I, prev) of
|
||||
{error, invalid_iterator} ->
|
||||
R0;
|
||||
{ok, _} = R1 ->
|
||||
R1
|
||||
end
|
||||
end,
|
||||
_ = eleveldb:iterator_close(I),
|
||||
FoldFun = fun({K, V}, Acc) ->
|
||||
{TargetOffset, TargetSize} = sext:decode(K),
|
||||
case ?has_overlap(TargetOffset, TargetSize, Offset, Size) of
|
||||
true ->
|
||||
[{TargetOffset, TargetSize, sext:decode(V)}|Acc];
|
||||
false ->
|
||||
Acc
|
||||
end;
|
||||
(_K, Acc) ->
|
||||
lager:error("~p wrong option", [_K]),
|
||||
Acc
|
||||
end,
|
||||
lists:reverse(eleveldb_fold(T, FirstKey, EndKey, FoldFun, [])).
|
||||
|
||||
write(#machi_csum_table{fd=Fd, table=T} = CsumT,
|
||||
Offset, Size, CSum,
|
||||
|
||||
%% @doc Updates all chunk info, by deleting existing entries if exists
|
||||
%% and putting new chunk info
|
||||
-spec write(table(),
|
||||
machi_dt:file_offset(), machi_dt:chunk_size(),
|
||||
machi_dt:chunk_csum()|'none'|'trimmed',
|
||||
undefined|chunk(), undefined|chunk()) ->
|
||||
ok | {error, term()}.
|
||||
write(#machi_csum_table{table=T} = CsumT, Offset, Size, CSum,
|
||||
LeftUpdate, RightUpdate) ->
|
||||
Binary =
|
||||
[encode_csum_file_entry_bin(Offset, Size, CSum),
|
||||
case LeftUpdate of
|
||||
{LO, LS, LCsum} when LO + LS =:= Offset ->
|
||||
encode_csum_file_entry_bin(LO, LS, LCsum);
|
||||
undefined ->
|
||||
<<>>
|
||||
end,
|
||||
case RightUpdate of
|
||||
{RO, RS, RCsum} when RO =:= Offset + Size ->
|
||||
encode_csum_file_entry_bin(RO, RS, RCsum);
|
||||
undefined ->
|
||||
<<>>
|
||||
end],
|
||||
case file:write(Fd, Binary) of
|
||||
ok ->
|
||||
Chunks = find(CsumT, Offset, Size),
|
||||
lists:foreach(fun({O, _, _}) ->
|
||||
ets:delete(T, O)
|
||||
PutOps =
|
||||
[{put,
|
||||
sext:encode({Offset, Size}),
|
||||
sext:encode(CSum)}]
|
||||
++ case LeftUpdate of
|
||||
{LO, LS, LCsum} when LO + LS =:= Offset ->
|
||||
[{put,
|
||||
sext:encode({LO, LS}),
|
||||
sext:encode(LCsum)}];
|
||||
undefined ->
|
||||
[]
|
||||
end
|
||||
++ case RightUpdate of
|
||||
{RO, RS, RCsum} when RO =:= Offset + Size ->
|
||||
[{put,
|
||||
sext:encode({RO, RS}),
|
||||
sext:encode(RCsum)}];
|
||||
undefined ->
|
||||
[]
|
||||
end,
|
||||
Chunks = find(CsumT, Offset, Size),
|
||||
DeleteOps = lists:map(fun({O, L, _}) ->
|
||||
{delete, sext:encode({O, L})}
|
||||
end, Chunks),
|
||||
case LeftUpdate of
|
||||
{LO1, LS1, _} when LO1 + LS1 =:= Offset ->
|
||||
ets:insert(T, LeftUpdate);
|
||||
undefined -> noop
|
||||
end,
|
||||
case RightUpdate of
|
||||
{RO1, _, _} when RO1 =:= Offset + Size ->
|
||||
ets:insert(T, RightUpdate);
|
||||
undefined -> noop
|
||||
end,
|
||||
true = ets:insert(T, {Offset, Size, CSum}),
|
||||
ok;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
eleveldb:write(T, DeleteOps ++ PutOps, [{sync, true}]).
|
||||
|
||||
-spec find_leftneighbor(table(), non_neg_integer()) ->
|
||||
undefined |
|
||||
{non_neg_integer(), machi_dt:chunk_size(), trimmed|machi_dt:chunk_csum()}.
|
||||
undefined | chunk().
|
||||
find_leftneighbor(CsumT, Offset) ->
|
||||
case find(CsumT, Offset, 1) of
|
||||
[] -> undefined;
|
||||
|
@ -139,8 +138,7 @@ find_leftneighbor(CsumT, Offset) ->
|
|||
end.
|
||||
|
||||
-spec find_rightneighbor(table(), non_neg_integer()) ->
|
||||
undefined |
|
||||
{non_neg_integer(), machi_dt:chunk_size(), trimmed|machi_dt:chunk_csum()}.
|
||||
undefined | chunk().
|
||||
find_rightneighbor(CsumT, Offset) ->
|
||||
case find(CsumT, Offset, 1) of
|
||||
[] -> undefined;
|
||||
|
@ -150,41 +148,47 @@ find_rightneighbor(CsumT, Offset) ->
|
|||
end.
|
||||
|
||||
-spec write(table(), machi_dt:file_offset(), machi_dt:file_size(),
|
||||
machi_dt:chunk_csum()|trimmed) ->
|
||||
machi_dt:chunk_csum()|none|trimmed) ->
|
||||
ok | {error, trimmed|file:posix()}.
|
||||
write(CsumT, Offset, Size, CSum) ->
|
||||
write(CsumT, Offset, Size, CSum, undefined, undefined).
|
||||
|
||||
trim(CsumT, Offset, Size, LeftUpdate, RightUpdate) ->
|
||||
write(CsumT, Offset, Size, trimmed, LeftUpdate, RightUpdate).
|
||||
|
||||
-spec trim(table(), machi_dt:file_offset(), machi_dt:file_size()) ->
|
||||
ok | {error, file:posix()}.
|
||||
trim(#machi_csum_table{fd=Fd, table=T}, Offset, Size) ->
|
||||
Binary = encode_csum_file_entry_bin(Offset, Size, trimmed),
|
||||
case file:write(Fd, Binary) of
|
||||
ok ->
|
||||
true = ets:insert(T, {Offset, Size, trimmed}),
|
||||
ok;
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
write(CsumT, Offset, Size,
|
||||
trimmed, %% Should this be much smaller like $t or just 't'
|
||||
LeftUpdate, RightUpdate).
|
||||
|
||||
%% @doc returns whether all bytes in a specific window is continously
|
||||
%% trimmed or not
|
||||
-spec all_trimmed(table(), non_neg_integer(), non_neg_integer()) -> boolean().
|
||||
all_trimmed(#machi_csum_table{table=T}, Left, Right) ->
|
||||
runthru(ets:tab2list(T), Left, Right).
|
||||
|
||||
-spec all_trimmed(table(), non_neg_integer()) -> boolean().
|
||||
all_trimmed(#machi_csum_table{table=T}, Pos) ->
|
||||
case ets:tab2list(T) of
|
||||
[{0, ?MINIMUM_OFFSET, _}|L] ->
|
||||
%% tl/1 to remove header space {0, 1024, <<0>>}
|
||||
runthru(L, ?MINIMUM_OFFSET, Pos);
|
||||
List ->
|
||||
%% In case a header is removed;
|
||||
runthru(List, 0, Pos)
|
||||
FoldFun = fun({_, _}, false) ->
|
||||
false;
|
||||
({K, V}, Pos) when is_integer(Pos) andalso Pos =< Right ->
|
||||
case {sext:decode(K), sext:decode(V)} of
|
||||
{{Pos, Size}, trimmed} ->
|
||||
Pos + Size;
|
||||
{{Offset, Size}, _}
|
||||
when Offset + Size =< Left ->
|
||||
Left;
|
||||
_Eh ->
|
||||
false
|
||||
end
|
||||
end,
|
||||
case eleveldb:fold(T, FoldFun, Left, [{verify_checksums, true}]) of
|
||||
false -> false;
|
||||
Right -> true;
|
||||
LastTrimmed when LastTrimmed < Right -> false;
|
||||
_ -> %% LastTrimmed > Pos0, which is a irregular case but ok
|
||||
true
|
||||
end.
|
||||
|
||||
%% @doc returns whether all bytes 0-Pos0 is continously trimmed or
|
||||
%% not, including header.
|
||||
-spec all_trimmed(table(), non_neg_integer()) -> boolean().
|
||||
all_trimmed(CsumT, Pos0) ->
|
||||
all_trimmed(CsumT, 0, Pos0).
|
||||
|
||||
-spec any_trimmed(table(),
|
||||
pos_integer(),
|
||||
machi_dt:chunk_size()) -> boolean().
|
||||
|
@ -192,13 +196,9 @@ any_trimmed(CsumT, Offset, Size) ->
|
|||
Chunks = find(CsumT, Offset, Size),
|
||||
lists:any(fun({_, _, State}) -> State =:= trimmed end, Chunks).
|
||||
|
||||
-spec sync(table()) -> ok | {error, file:posix()}.
|
||||
sync(#machi_csum_table{fd=Fd}) ->
|
||||
file:sync(Fd).
|
||||
|
||||
-spec calc_unwritten_bytes(table()) -> [byte_sequence()].
|
||||
calc_unwritten_bytes(#machi_csum_table{table=T}) ->
|
||||
case lists:sort(ets:tab2list(T)) of
|
||||
calc_unwritten_bytes(#machi_csum_table{table=_} = CsumT) ->
|
||||
case lists:sort(all(CsumT)) of
|
||||
[] ->
|
||||
[{?MINIMUM_OFFSET, infinity}];
|
||||
Sorted ->
|
||||
|
@ -206,101 +206,34 @@ calc_unwritten_bytes(#machi_csum_table{table=T}) ->
|
|||
build_unwritten_bytes_list(Sorted, LastOffset, [])
|
||||
end.
|
||||
|
||||
all(CsumT) ->
|
||||
FoldFun = fun(E, Acc) -> [E|Acc] end,
|
||||
lists:reverse(foldl_chunks(FoldFun, [], CsumT)).
|
||||
|
||||
-spec close(table()) -> ok.
|
||||
close(#machi_csum_table{table=T, fd=Fd}) ->
|
||||
true = ets:delete(T),
|
||||
ok = file:close(Fd).
|
||||
close(#machi_csum_table{table=T}) ->
|
||||
ok = eleveldb:close(T).
|
||||
|
||||
-spec delete(table()) -> ok.
|
||||
delete(#machi_csum_table{file=F} = C) ->
|
||||
catch close(C),
|
||||
case file:delete(F) of
|
||||
ok -> ok;
|
||||
{error, enoent} -> ok;
|
||||
delete(#machi_csum_table{table=T, file=F}) ->
|
||||
catch eleveldb:close(T),
|
||||
%% TODO change this to directory walk
|
||||
case os:cmd("rm -rf " ++ F) of
|
||||
"" -> ok;
|
||||
E -> E
|
||||
end.
|
||||
|
||||
-spec foldl_chunks(fun(({non_neg_integer(), non_neg_integer(), term()},
|
||||
Acc0 :: term())
|
||||
-> Acc :: term()),
|
||||
-spec foldl_chunks(fun((chunk(), Acc0 :: term()) -> Acc :: term()),
|
||||
Acc0 :: term(), table()) -> Acc :: term().
|
||||
foldl_chunks(Fun, Acc0, #machi_csum_table{table=T}) ->
|
||||
ets:foldl(Fun, Acc0, T).
|
||||
|
||||
%% @doc Encode `Offset + Size + TaggedCSum' into an `iolist()' type for
|
||||
%% internal storage by the FLU.
|
||||
|
||||
-spec encode_csum_file_entry(
|
||||
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
||||
iolist().
|
||||
encode_csum_file_entry(Offset, Size, TaggedCSum) ->
|
||||
Len = 8 + 4 + byte_size(TaggedCSum),
|
||||
[<<$w, Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big>>,
|
||||
TaggedCSum].
|
||||
|
||||
%% @doc Encode `Offset + Size + TaggedCSum' into an `binary()' type for
|
||||
%% internal storage by the FLU.
|
||||
|
||||
-spec encode_csum_file_entry_bin(
|
||||
machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()) ->
|
||||
binary().
|
||||
encode_csum_file_entry_bin(Offset, Size, trimmed) ->
|
||||
<<$t, Offset:64/unsigned-big, Size:32/unsigned-big>>;
|
||||
encode_csum_file_entry_bin(Offset, Size, TaggedCSum) ->
|
||||
Len = 8 + 4 + byte_size(TaggedCSum),
|
||||
<<$w, Len:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big,
|
||||
TaggedCSum/binary>>.
|
||||
|
||||
%% @doc Decode a single `binary()' blob into an
|
||||
%% `{Offset,Size,TaggedCSum}' tuple.
|
||||
%%
|
||||
%% The internal encoding (which is currently exposed to the outside world
|
||||
%% via this function and related ones) is:
|
||||
%%
|
||||
%% <ul>
|
||||
%% <li> 1 byte: record length
|
||||
%% </li>
|
||||
%% <li> 8 bytes (unsigned big-endian): byte offset
|
||||
%% </li>
|
||||
%% <li> 4 bytes (unsigned big-endian): chunk size
|
||||
%% </li>
|
||||
%% <li> all remaining bytes: tagged checksum (1st byte = type tag)
|
||||
%% </li>
|
||||
%% </ul>
|
||||
%%
|
||||
%% See `machi.hrl' for the tagged checksum types, e.g.,
|
||||
%% `?CSUM_TAG_NONE'.
|
||||
|
||||
-spec decode_csum_file_entry(binary()) ->
|
||||
error |
|
||||
{machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}.
|
||||
decode_csum_file_entry(<<_:8/unsigned-big, Offset:64/unsigned-big, Size:32/unsigned-big, TaggedCSum/binary>>) ->
|
||||
{Offset, Size, TaggedCSum};
|
||||
decode_csum_file_entry(_Else) ->
|
||||
error.
|
||||
|
||||
%% @doc Split a `binary()' blob of `checksum_list' data into a list of
|
||||
%% `{Offset,Size,TaggedCSum}' tuples.
|
||||
|
||||
-spec split_checksum_list_blob_decode(binary()) ->
|
||||
{list({machi_dt:file_offset(), machi_dt:chunk_size(), machi_dt:chunk_s()}),
|
||||
TrailingJunk::binary()}.
|
||||
split_checksum_list_blob_decode(Bin) ->
|
||||
split_checksum_list_blob_decode(Bin, []).
|
||||
|
||||
split_checksum_list_blob_decode(<<$w, Len:8/unsigned-big, Part:Len/binary, Rest/binary>>, Acc)->
|
||||
One = <<Len:8/unsigned-big, Part/binary>>,
|
||||
case decode_csum_file_entry(One) of
|
||||
error ->
|
||||
split_checksum_list_blob_decode(Rest, Acc);
|
||||
DecOne ->
|
||||
split_checksum_list_blob_decode(Rest, [DecOne|Acc])
|
||||
end;
|
||||
split_checksum_list_blob_decode(<<$t, Offset:64/unsigned-big, Size:32/unsigned-big, Rest/binary>>, Acc) ->
|
||||
%% trimmed offset
|
||||
split_checksum_list_blob_decode(Rest, [{Offset, Size, trimmed}|Acc]);
|
||||
split_checksum_list_blob_decode(Rest, Acc) ->
|
||||
{lists:reverse(Acc), Rest}.
|
||||
FoldFun = fun({K, V}, Acc) ->
|
||||
{Offset, Len} = sext:decode(K),
|
||||
Fun({Offset, Len, sext:decode(V)}, Acc);
|
||||
(_K, Acc) ->
|
||||
_ = lager:error("~p: wrong option?", [_K]),
|
||||
Acc
|
||||
end,
|
||||
eleveldb:fold(T, FoldFun, Acc0, [{verify_checksums, true}]).
|
||||
|
||||
-spec build_unwritten_bytes_list( CsumData :: [{ Offset :: non_neg_integer(),
|
||||
Size :: pos_integer(),
|
||||
|
@ -322,21 +255,46 @@ build_unwritten_bytes_list([{CurrentOffset, CurrentSize, _Csum}|Rest], LastOffse
|
|||
build_unwritten_bytes_list([{CO, CS, _Ck}|Rest], _LastOffset, Acc) ->
|
||||
build_unwritten_bytes_list(Rest, CO + CS, Acc).
|
||||
|
||||
%% @doc make sure all trimmed chunks are continously chained
|
||||
%% TODO: test with EQC
|
||||
runthru([], Pos, Pos) -> true;
|
||||
runthru([], Pos0, Pos) when Pos0 < Pos -> false;
|
||||
runthru([{Offset0, Size0, trimmed}|T], Offset, Pos) when Offset0 =< Offset ->
|
||||
runthru(T, Offset0+Size0, Pos);
|
||||
runthru(_L, _O, _P) ->
|
||||
false.
|
||||
|
||||
%% @doc If you want to find an overlap among two areas [x, y] and [a,
|
||||
%% b] where x < y and a < b; if (a-y)*(b-x) < 0 then there's a
|
||||
%% overlap, else, > 0 then there're no overlap. border condition = 0
|
||||
%% is not overlap in this offset-size case.
|
||||
inclusion_match_spec(Offset, Size) ->
|
||||
{'>', 0,
|
||||
{'*',
|
||||
{'-', Offset + Size, '$1'},
|
||||
{'-', Offset, {'+', '$1', '$2'}}}}.
|
||||
%% inclusion_match_spec(Offset, Size) ->
|
||||
%% {'>', 0,
|
||||
%% {'*',
|
||||
%% {'-', Offset + Size, '$1'},
|
||||
%% {'-', Offset, {'+', '$1', '$2'}}}}.
|
||||
|
||||
-spec eleveldb_fold(eleveldb:db_ref(), binary(), binary(),
|
||||
fun(({binary(), binary()}, AccType::term()) -> AccType::term()),
|
||||
AccType0::term()) ->
|
||||
AccType::term().
|
||||
eleveldb_fold(Ref, Start, End, FoldFun, InitAcc) ->
|
||||
{ok, Iterator} = eleveldb:iterator(Ref, []),
|
||||
try
|
||||
eleveldb_do_fold(eleveldb:iterator_move(Iterator, Start),
|
||||
Iterator, End, FoldFun, InitAcc)
|
||||
catch throw:IteratorClosed ->
|
||||
{error, IteratorClosed}
|
||||
after
|
||||
eleveldb:iterator_close(Iterator)
|
||||
end.
|
||||
|
||||
-spec eleveldb_do_fold({ok, binary(), binary()}|{error, iterator_closed|invalid_iterator}|{ok,binary()},
|
||||
eleveldb:itr_ref(), binary(),
|
||||
fun(({binary(), binary()}, AccType::term()) -> AccType::term()),
|
||||
AccType::term()) ->
|
||||
AccType::term().
|
||||
eleveldb_do_fold({ok, Key, Value}, _, End, FoldFun, Acc)
|
||||
when End < Key ->
|
||||
FoldFun({Key, Value}, Acc);
|
||||
eleveldb_do_fold({ok, Key, Value}, Iterator, End, FoldFun, Acc) ->
|
||||
eleveldb_do_fold(eleveldb:iterator_move(Iterator, next),
|
||||
Iterator, End, FoldFun,
|
||||
FoldFun({Key, Value}, Acc));
|
||||
eleveldb_do_fold({error, iterator_closed}, _, _, _, Acc) ->
|
||||
%% It's really an error which is not expected
|
||||
throw({iterator_closed, Acc});
|
||||
eleveldb_do_fold({error, invalid_iterator}, _, _, _, Acc) ->
|
||||
%% Probably reached to end
|
||||
Acc.
|
||||
|
|
|
@ -57,7 +57,8 @@
|
|||
write/4,
|
||||
trim/4,
|
||||
append/2,
|
||||
append/4
|
||||
append/4,
|
||||
checksum_list/1
|
||||
]).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
@ -210,6 +211,10 @@ append(_Pid, ClientMeta, Extra, _Data) ->
|
|||
lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]),
|
||||
{error, bad_arg}.
|
||||
|
||||
-spec checksum_list(pid()) -> {ok, list()}.
|
||||
checksum_list(Pid) ->
|
||||
gen_server:call(Pid, {checksum_list}, ?TIMEOUT).
|
||||
|
||||
%% gen_server callbacks
|
||||
|
||||
% @private
|
||||
|
@ -249,27 +254,22 @@ handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
|
|||
R = file:sync(FHd),
|
||||
{reply, R, State};
|
||||
|
||||
handle_call({sync, csum}, _From, State = #state{ csum_table = T }) ->
|
||||
R = machi_csum_table:sync(T),
|
||||
{reply, R, State};
|
||||
handle_call({sync, csum}, _From, State) ->
|
||||
%% machi_csum_table always writes in {sync, true} option, so here
|
||||
%% explicit sync isn't actually needed.
|
||||
{reply, ok, State};
|
||||
|
||||
handle_call({sync, all}, _From, State = #state{filename = F,
|
||||
data_filehandle = FHd,
|
||||
csum_table = T
|
||||
csum_table = _T
|
||||
}) ->
|
||||
R = machi_csum_table:sync(T),
|
||||
R1 = file:sync(FHd),
|
||||
Resp = case {R, R1} of
|
||||
{ok, ok} -> ok;
|
||||
{ok, O1} ->
|
||||
lager:error("Got ~p during a data file sync on file ~p", [O1, F]),
|
||||
O1;
|
||||
{O2, ok} ->
|
||||
lager:error("Got ~p during a csum file sync on file ~p", [O2, F]),
|
||||
O2;
|
||||
{O3, O4} ->
|
||||
lager:error("Got ~p ~p syncing all files for file ~p", [O3, O4, F]),
|
||||
{O3, O4}
|
||||
Resp = case file:sync(FHd) of
|
||||
ok ->
|
||||
ok;
|
||||
Error ->
|
||||
lager:error("Got ~p syncing all files for file ~p",
|
||||
[Error, F]),
|
||||
Error
|
||||
end,
|
||||
{reply, Resp, State};
|
||||
|
||||
|
@ -435,6 +435,10 @@ handle_call({append, ClientMeta, Extra, Data}, _From,
|
|||
{reply, Resp, State#state{appends = {T+1, NewErr},
|
||||
eof_position = NewEof}};
|
||||
|
||||
handle_call({checksum_list}, _FRom, State = #state{csum_table=T}) ->
|
||||
All = machi_csum_table:all(T),
|
||||
{reply, {ok, All}, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
lager:warning("Unknown call: ~p", [Req]),
|
||||
{reply, whoaaaaaaaaaaaa, State}.
|
||||
|
@ -545,7 +549,6 @@ terminate(Reason, #state{filename = F,
|
|||
undefined ->
|
||||
noop; %% file deleted
|
||||
_ ->
|
||||
ok = machi_csum_table:sync(T),
|
||||
ok = machi_csum_table:close(T)
|
||||
end,
|
||||
ok.
|
||||
|
@ -597,7 +600,8 @@ check_or_make_tagged_csum(OtherTag, _ClientCsum, _Data) ->
|
|||
Size :: non_neg_integer(),
|
||||
NoChunk :: boolean(),
|
||||
NoChecksum :: boolean()
|
||||
) -> {ok, Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}]} |
|
||||
) -> {ok, {Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum :: binary()}],
|
||||
Trimmed :: [{string(), Offset::non_neg_integer(), Size::non_neg_integer()}]}} |
|
||||
{error, bad_checksum} |
|
||||
{error, partial_read} |
|
||||
{error, file:posix()} |
|
||||
|
@ -621,6 +625,14 @@ do_read(FHd, Filename, CsumTable, Offset, Size, _, _) ->
|
|||
ChunkCsums = machi_csum_table:find(CsumTable, Offset, Size),
|
||||
read_all_ranges(FHd, Filename, ChunkCsums, [], []).
|
||||
|
||||
-spec read_all_ranges(file:io_device(), string(),
|
||||
[{non_neg_integer(),non_neg_integer(),trimmed|binary()}],
|
||||
Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum::binary()}],
|
||||
Trimmed :: [{string(), Offset::non_neg_integer(), Size::non_neg_integer()}]) ->
|
||||
{ok, {
|
||||
Chunks :: [{string(), Offset::non_neg_integer(), binary(), Csum::binary()}],
|
||||
Trimmed :: [{string(), Offset::non_neg_integer(), Size::non_neg_integer()}]}} |
|
||||
{erorr, term()|partial_read}.
|
||||
read_all_ranges(_, _, [], ReadChunks, TrimmedChunks) ->
|
||||
%% TODO: currently returns empty list of trimmed chunks
|
||||
{ok, {lists:reverse(ReadChunks), lists:reverse(TrimmedChunks)}};
|
||||
|
@ -632,6 +644,11 @@ read_all_ranges(FHd, Filename, [{Offset, Size, TaggedCsum}|T], ReadChunks, Trimm
|
|||
case file:pread(FHd, Offset, Size) of
|
||||
eof ->
|
||||
read_all_ranges(FHd, Filename, T, ReadChunks, TrimmedChunks);
|
||||
{ok, Bytes} when byte_size(Bytes) == Size, TaggedCsum =:= none ->
|
||||
read_all_ranges(FHd, Filename, T,
|
||||
[{Filename, Offset, Bytes,
|
||||
machi_util:make_tagged_csum(none, <<>>)}|ReadChunks],
|
||||
TrimmedChunks);
|
||||
{ok, Bytes} when byte_size(Bytes) == Size ->
|
||||
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
|
||||
case check_or_make_tagged_csum(Tag, Ck, Bytes) of
|
||||
|
@ -827,7 +844,7 @@ maybe_gc(Reply, S = #state{fluname=FluName,
|
|||
filename = Filename,
|
||||
eof_position = Eof,
|
||||
csum_table=CsumTable}) ->
|
||||
case machi_csum_table:all_trimmed(CsumTable, Eof) of
|
||||
case machi_csum_table:all_trimmed(CsumTable, ?MINIMUM_OFFSET, Eof) of
|
||||
true ->
|
||||
lager:debug("GC? Let's do it: ~p.~n", [Filename]),
|
||||
%% Before unlinking a file, it should inform
|
||||
|
|
|
@ -595,26 +595,22 @@ do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) ->
|
|||
do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) ->
|
||||
case sanitize_file_string(File) of
|
||||
ok ->
|
||||
ok = sync_checksum_file(FluName, File),
|
||||
CSumPath = machi_util:make_checksum_filename(DataDir, File),
|
||||
%% TODO: If this file is legitimately bigger than our
|
||||
%% {packet_size,N} limit, then we'll have a difficult time, eh?
|
||||
case file:read_file(CSumPath) of
|
||||
{ok, Bin} ->
|
||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||
{ok, Pid} ->
|
||||
{ok, List} = machi_file_proxy:checksum_list(Pid),
|
||||
Bin = erlang:term_to_binary(List),
|
||||
if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) ->
|
||||
%% TODO: Fix this limitation by streaming the
|
||||
%% binary in multiple smaller PB messages.
|
||||
%% Also, don't read the file all at once. ^_^
|
||||
error_logger:error_msg("~s:~w oversize ~s\n",
|
||||
[?MODULE, ?LINE, CSumPath]),
|
||||
[?MODULE, ?LINE, DataDir]),
|
||||
{error, bad_arg};
|
||||
true ->
|
||||
{ok, Bin}
|
||||
end;
|
||||
{error, enoent} ->
|
||||
{error, no_such_file};
|
||||
{error, _} ->
|
||||
{error, bad_arg}
|
||||
{error, trimmed} ->
|
||||
{error, trimmed}
|
||||
end;
|
||||
_ ->
|
||||
{error, bad_arg}
|
||||
|
@ -728,21 +724,6 @@ sanitize_prefix(Prefix) ->
|
|||
error
|
||||
end.
|
||||
|
||||
sync_checksum_file(FluName, File) ->
|
||||
%% We just lookup the pid here - we don't start a proxy server. If
|
||||
%% there isn't a pid for this file, then we just return ok. The
|
||||
%% csum file was synced when the proxy was shutdown.
|
||||
%%
|
||||
%% If there *is* a pid, we call the sync function to ensure the
|
||||
%% csum file is sync'd before we return. (Or an error if we get
|
||||
%% an error).
|
||||
case machi_flu_metadata_mgr:lookup_proxy_pid(FluName, {file, File}) of
|
||||
undefined ->
|
||||
ok;
|
||||
Pid ->
|
||||
machi_file_proxy:sync(Pid, csum)
|
||||
end.
|
||||
|
||||
make_listener_regname(BaseName) ->
|
||||
list_to_atom(atom_to_list(BaseName) ++ "_listener").
|
||||
|
||||
|
|
|
@ -180,9 +180,9 @@ checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) ->
|
|||
MaxOffset ->
|
||||
C = machi_util:make_tagged_csum(client_sha,
|
||||
make_csum(Name, ChunkSize)),
|
||||
Cs = [machi_csum_table:encode_csum_file_entry_bin(Offset, ChunkSize, C) ||
|
||||
Offset <- lists:seq(?MINIMUM_OFFSET, MaxOffset, ChunkSize)],
|
||||
{ok, Cs}
|
||||
Cs = [{Offset, ChunkSize, C} ||
|
||||
Offset <- lists:seq(?MINIMUM_OFFSET, MaxOffset, ChunkSize)],
|
||||
{ok, term_to_binary(Cs)}
|
||||
end.
|
||||
|
||||
%% @doc Fetch the list of chunk checksums for `File'.
|
||||
|
|
|
@ -33,7 +33,11 @@
|
|||
-define(FLU_C, machi_flu1_client).
|
||||
|
||||
verify_file_checksums_test_() ->
|
||||
{timeout, 60, fun() -> verify_file_checksums_test2() end}.
|
||||
{setup,
|
||||
fun() -> os:cmd("rm -rf ./data") end,
|
||||
fun(_) -> os:cmd("rm -rf ./data") end,
|
||||
{timeout, 60, fun() -> verify_file_checksums_test2() end}
|
||||
}.
|
||||
|
||||
verify_file_checksums_test2() ->
|
||||
Host = "localhost",
|
||||
|
@ -50,8 +54,9 @@ verify_file_checksums_test2() ->
|
|||
Prefix, <<X:(X*8)/big>>) ||
|
||||
X <- lists:seq(1, NumChunks)],
|
||||
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH),
|
||||
{ok, []} = machi_admin_util:verify_file_checksums_remote(
|
||||
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
|
||||
?assertEqual({ok, []},
|
||||
machi_admin_util:verify_file_checksums_remote(
|
||||
Host, TcpPort, ?DUMMY_PV1_EPOCH, File)),
|
||||
|
||||
%% Clobber the first 3 chunks, which are sizes 1/2/3.
|
||||
{_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
|
||||
|
|
|
@ -2,26 +2,31 @@
|
|||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-define(HDR, {0, 1024, <<0>>}).
|
||||
-define(HDR, {0, 1024, none}).
|
||||
|
||||
cleanup(Dir) ->
|
||||
os:cmd("rm -rf " ++ Dir).
|
||||
|
||||
smoke_test() ->
|
||||
Filename = "./temp-checksum-dumb-file",
|
||||
_ = file:delete(Filename),
|
||||
_ = cleanup(Filename),
|
||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
||||
[{1024, infinity}] = machi_csum_table:calc_unwritten_bytes(MC),
|
||||
?assertEqual([{1024, infinity}],
|
||||
machi_csum_table:calc_unwritten_bytes(MC)),
|
||||
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
|
||||
[] = machi_csum_table:find(MC, Offset, Size),
|
||||
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
|
||||
[{1024, 40}, {1098, infinity}] = machi_csum_table:calc_unwritten_bytes(MC),
|
||||
[Entry] = machi_csum_table:find(MC, Offset, Size),
|
||||
ok = machi_csum_table:trim(MC, Offset, Size),
|
||||
[{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size),
|
||||
?assertEqual([Entry], machi_csum_table:find(MC, Offset, Size)),
|
||||
ok = machi_csum_table:trim(MC, Offset, Size, undefined, undefined),
|
||||
?assertEqual([{Offset, Size, trimmed}],
|
||||
machi_csum_table:find(MC, Offset, Size)),
|
||||
ok = machi_csum_table:close(MC),
|
||||
ok = machi_csum_table:delete(MC).
|
||||
|
||||
close_test() ->
|
||||
Filename = "./temp-checksum-dumb-file-2",
|
||||
_ = file:delete(Filename),
|
||||
_ = cleanup(Filename),
|
||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
||||
Entry = {Offset, Size, Checksum} = {1064, 34, <<"deadbeef">>},
|
||||
[] = machi_csum_table:find(MC, Offset, Size),
|
||||
|
@ -31,32 +36,33 @@ close_test() ->
|
|||
|
||||
{ok, MC2} = machi_csum_table:open(Filename, []),
|
||||
[Entry] = machi_csum_table:find(MC2, Offset, Size),
|
||||
ok = machi_csum_table:trim(MC2, Offset, Size),
|
||||
ok = machi_csum_table:trim(MC2, Offset, Size, undefined, undefined),
|
||||
[{Offset, Size, trimmed}] = machi_csum_table:find(MC2, Offset, Size),
|
||||
ok = machi_csum_table:delete(MC2).
|
||||
|
||||
smoke2_test() ->
|
||||
Filename = "./temp-checksum-dumb-file-3",
|
||||
_ = file:delete(Filename),
|
||||
_ = cleanup(Filename),
|
||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
||||
Entry = {Offset, Size, Checksum} = {1025, 10, <<"deadbeef">>},
|
||||
ok = machi_csum_table:write(MC, Offset, Size, Checksum),
|
||||
[] = machi_csum_table:find(MC, 0, 0),
|
||||
[?HDR] = machi_csum_table:find(MC, 0, 1),
|
||||
?assertEqual([], machi_csum_table:find(MC, 0, 0)),
|
||||
?assertEqual([?HDR], machi_csum_table:find(MC, 0, 1)),
|
||||
[Entry] = machi_csum_table:find(MC, Offset, Size),
|
||||
[?HDR] = machi_csum_table:find(MC, 1, 1024),
|
||||
[?HDR, Entry] = machi_csum_table:find(MC, 1023, 1024),
|
||||
?assertEqual([?HDR, Entry],
|
||||
machi_csum_table:find(MC, 1023, 1024)),
|
||||
[Entry] = machi_csum_table:find(MC, 1024, 1024),
|
||||
[Entry] = machi_csum_table:find(MC, 1025, 1024),
|
||||
|
||||
ok = machi_csum_table:trim(MC, Offset, Size),
|
||||
ok = machi_csum_table:trim(MC, Offset, Size, undefined, undefined),
|
||||
[{Offset, Size, trimmed}] = machi_csum_table:find(MC, Offset, Size),
|
||||
ok = machi_csum_table:close(MC),
|
||||
ok = machi_csum_table:delete(MC).
|
||||
|
||||
smoke3_test() ->
|
||||
Filename = "./temp-checksum-dumb-file-4",
|
||||
_ = file:delete(Filename),
|
||||
_ = cleanup(Filename),
|
||||
{ok, MC} = machi_csum_table:open(Filename, []),
|
||||
Scenario =
|
||||
[%% Command, {Offset, Size, Csum}, LeftNeighbor, RightNeibor
|
||||
|
@ -107,3 +113,19 @@ smoke3_test() ->
|
|||
ok = machi_csum_table:delete(MC).
|
||||
|
||||
%% TODO: add quickcheck test here
|
||||
|
||||
%% Previous implementation
|
||||
-spec all_trimmed2(machi_csum_table:table(),
|
||||
non_neg_integer(), non_neg_integer()) -> boolean().
|
||||
all_trimmed2(CsumT, Left, Right) ->
|
||||
Chunks = machi_csum_table:find(CsumT, Left, Right),
|
||||
runthru(Chunks, Left, Right).
|
||||
|
||||
%% @doc make sure all trimmed chunks are continously chained
|
||||
%% TODO: test with EQC
|
||||
runthru([], Pos, Pos) -> true;
|
||||
runthru([], Pos0, Pos) when Pos0 < Pos -> false;
|
||||
runthru([{Offset0, Size0, trimmed}|T], Offset, Pos) when Offset0 =< Offset ->
|
||||
runthru(T, Offset0+Size0, Pos);
|
||||
runthru(_L, _O, _P) ->
|
||||
false.
|
||||
|
|
|
@ -38,7 +38,7 @@ eqc_test_() ->
|
|||
{timeout, 60,
|
||||
{spawn,
|
||||
[
|
||||
{timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))}
|
||||
?_assertEqual(true, eqc:quickcheck(eqc:testing_time(30, ?QC_OUT(prop_ok()))))
|
||||
]
|
||||
}}.
|
||||
|
||||
|
|
|
@ -76,54 +76,66 @@ random_binary(Start, End) ->
|
|||
binary:part(random_binary_single(), Start, End)
|
||||
end.
|
||||
|
||||
setup() ->
|
||||
{ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR),
|
||||
Pid.
|
||||
|
||||
teardown(Pid) ->
|
||||
catch machi_file_proxy:stop(Pid).
|
||||
|
||||
machi_file_proxy_test_() ->
|
||||
clean_up_data_dir(?TESTDIR),
|
||||
{ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR),
|
||||
[
|
||||
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
|
||||
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
|
||||
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
|
||||
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1)),
|
||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
|
||||
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)),
|
||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
|
||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
|
||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
|
||||
?_assertMatch({ok, {[{_, _, _, _}], []}}, machi_file_proxy:read(Pid, 1025, 1000)),
|
||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
|
||||
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
|
||||
].
|
||||
{setup,
|
||||
fun setup/0,
|
||||
fun teardown/1,
|
||||
fun(Pid) ->
|
||||
[
|
||||
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
|
||||
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
|
||||
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
|
||||
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1)),
|
||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
|
||||
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)),
|
||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
|
||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
|
||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
|
||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
|
||||
?_assertMatch({ok, {[{_, _, _, _}], []}}, machi_file_proxy:read(Pid, 1025, 1000)),
|
||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024)))
|
||||
]
|
||||
end}.
|
||||
|
||||
multiple_chunks_read_test_() ->
|
||||
clean_up_data_dir(?TESTDIR),
|
||||
{ok, Pid} = machi_file_proxy:start_link(fluname, "test", ?TESTDIR),
|
||||
[
|
||||
?_assertEqual(ok, machi_file_proxy:trim(Pid, 0, 1, false)),
|
||||
?_assertMatch({ok, {[], [{"test", 0, 1}]}},
|
||||
machi_file_proxy:read(Pid, 0, 1,
|
||||
[{needs_trimmed, true}])),
|
||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 10000, <<"fail">>)),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 20000, <<"fail">>)),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)),
|
||||
%% Freeza
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)),
|
||||
?_assertMatch({ok, {[{"test", 1024, _, _},
|
||||
{"test", 10000, <<"fail">>, _},
|
||||
{"test", 20000, <<"fail">>, _},
|
||||
{"test", 30000, <<"fail">>, _},
|
||||
{"test", 530000, <<"fail">>, _}], []}},
|
||||
machi_file_proxy:read(Pid, 1024, 530000)),
|
||||
?_assertMatch({ok, {[{"test", 1, _, _}], [{"test", 0, 1}]}},
|
||||
machi_file_proxy:read(Pid, 0, 1024,
|
||||
[{needs_trimmed, true}])),
|
||||
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
|
||||
].
|
||||
|
||||
{setup,
|
||||
fun setup/0,
|
||||
fun teardown/1,
|
||||
fun(Pid) ->
|
||||
[
|
||||
?_assertEqual(ok, machi_file_proxy:trim(Pid, 0, 1, false)),
|
||||
?_assertMatch({ok, {[], [{"test", 0, 1}]}},
|
||||
machi_file_proxy:read(Pid, 0, 1,
|
||||
[{needs_trimmed, true}])),
|
||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 10000, <<"fail">>)),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 20000, <<"fail">>)),
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 30000, <<"fail">>)),
|
||||
%% Freeza
|
||||
?_assertEqual(ok, machi_file_proxy:write(Pid, 530000, <<"fail">>)),
|
||||
?_assertMatch({ok, {[{"test", 1024, _, _},
|
||||
{"test", 10000, <<"fail">>, _},
|
||||
{"test", 20000, <<"fail">>, _},
|
||||
{"test", 30000, <<"fail">>, _},
|
||||
{"test", 530000, <<"fail">>, _}], []}},
|
||||
machi_file_proxy:read(Pid, 1024, 530000)),
|
||||
?_assertMatch({ok, {[{"test", 1, _, _}], [{"test", 0, 1}]}},
|
||||
machi_file_proxy:read(Pid, 0, 1024,
|
||||
[{needs_trimmed, true}]))
|
||||
]
|
||||
end}.
|
||||
|
||||
-endif. % !PULSE
|
||||
-endif. % TEST.
|
||||
|
|
Loading…
Reference in a new issue