Testing WIP

This commit is contained in:
Mark Allen 2015-08-24 12:17:56 -05:00
parent 772a7741f2
commit ff7a8e58c2
3 changed files with 388 additions and 30 deletions

View file

@ -46,6 +46,7 @@
%% public API
-export([
start_link/2,
stop/1,
sync/1,
sync/2,
read/3,
@ -65,7 +66,7 @@
code_change/3
]).
-define(TICK, 5*1000).
-define(TICK, 30*1000). %% XXX FIXME Should be something like 5 seconds
-define(TICK_THRESHOLD, 5). %% After this + 1 more quiescent ticks, shutdown
-define(TIMEOUT, 10*1000).
-define(TOO_MANY_ERRORS_RATIO, 50).
@ -104,28 +105,48 @@
start_link(Filename, DataDir) ->
gen_server:start_link(?MODULE, {Filename, DataDir}, []).
% @doc Request to stop an instance of the file proxy service.
-spec stop(Pid :: pid()) -> ok.
stop(Pid) when is_pid(Pid) ->
gen_server:call(Pid, {stop}, ?TIMEOUT).
% @doc Force a sync of all filehandles
-spec sync(Pid :: pid()) -> ok|{error, term()}.
sync(Pid) ->
sync(Pid, all).
sync(Pid) when is_pid(Pid) ->
sync(Pid, all);
sync(_Pid) ->
lager:warning("Bad pid to sync"),
{error, bad_arg}.
% @doc Force a sync of a specific filehandle type. Valid types are `all', `csum' and `data'.
-spec sync(Pid :: pid(), Type :: all|data|csum) -> ok|{error, term()}.
sync(Pid, Type) ->
gen_server:call(Pid, {sync, Type}, ?TIMEOUT).
sync(Pid, Type) when is_pid(Pid) andalso
( Type =:= all orelse Type =:= csum orelse Type =:= data ) ->
gen_server:call(Pid, {sync, Type}, ?TIMEOUT);
sync(_Pid, Type) ->
lager:warning("Bad arg to sync: Type ~p", [Type]),
{error, bad_arg}.
% @doc Read file at offset for length
-spec read(Pid :: pid(),
Offset :: non_neg_integer(),
Length :: non_neg_integer()) -> {ok, Data :: binary(), Checksum :: binary()} |
{error, Reason :: term()}.
read(Pid, Offset, Length) ->
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT).
read(Pid, Offset, Length) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_integer(Length) andalso Length > 0 ->
gen_server:call(Pid, {read, Offset, Length}, ?TIMEOUT);
read(_Pid, Offset, Length) ->
lager:warning("Bad args to read: Offset ~p, Length ~p", [Offset, Length]),
{error, bad_arg}.
% @doc Write data at offset
-spec write(Pid :: pid(), Offset :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
write(Pid, Offset, Data) ->
write(Pid, Offset, [], Data).
write(Pid, Offset, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_binary(Data) ->
write(Pid, Offset, [], Data);
write(_Pid, Offset, _Data) ->
lager:warning("Bad arg to write: Offset ~p", [Offset]),
{error, bad_arg}.
% @doc Write data at offset, including the client metadata. ClientMeta is a proplist
% that expects the following keys and values:
@ -135,34 +156,50 @@ write(Pid, Offset, Data) ->
% </ul>
-spec write(Pid :: pid(), Offset :: non_neg_integer(), ClientMeta :: proplists:proplist(),
Data :: binary()) -> ok|{error, term()}.
write(Pid, Offset, ClientMeta, Data) ->
gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT).
write(Pid, Offset, ClientMeta, Data) when is_pid(Pid) andalso is_integer(Offset) andalso Offset >= 0
andalso is_list(ClientMeta) andalso is_binary(Data) ->
gen_server:call(Pid, {write, Offset, ClientMeta, Data}, ?TIMEOUT);
write(_Pid, Offset, ClientMeta, _Data) ->
lager:warning("Bad arg to write: Offset ~p, ClientMeta: ~p", [Offset, ClientMeta]),
{error, bad_arg}.
% @doc Append data
-spec append(Pid :: pid(), Data :: binary()) -> ok|{error, term()}.
append(Pid, Data) ->
append(Pid, [], 0, Data).
-spec append(Pid :: pid(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
|{error, term()}.
append(Pid, Data) when is_pid(Pid) andalso is_binary(Data) ->
append(Pid, [], 0, Data);
append(_Pid, _Data) ->
lager:warning("Bad arguments to append/2"),
{error, bad_arg}.
% @doc Append data to file, supplying client metadata and (if desired) a
% reservation for additional space. ClientMeta is a proplist and expects the
% same keys as write/4.
-spec append(Pid :: pid(), ClientMeta :: proplists:proplist(),
Extra :: non_neg_integer(), Data :: binary()) -> ok|{error, term()}.
append(Pid, ClientMeta, Extra, Data) ->
gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT).
Extra :: non_neg_integer(), Data :: binary()) -> {ok, File :: string(), Offset :: non_neg_integer()}
|{error, term()}.
append(Pid, ClientMeta, Extra, Data) when is_pid(Pid) andalso is_list(ClientMeta)
andalso is_integer(Extra) andalso Extra >= 0
andalso is_binary(Data) ->
gen_server:call(Pid, {append, ClientMeta, Extra, Data}, ?TIMEOUT);
append(_Pid, ClientMeta, Extra, _Data) ->
lager:warning("Bad arg to append: ClientMeta ~p, Extra ~p", [ClientMeta, Extra]),
{error, bad_arg}.
%% gen_server callbacks
% @private
init({Filename, DataDir}) ->
CsumFile = machi_util:make_csum_filename(DataDir, Filename),
CsumFile = machi_util:make_checksum_filename(DataDir, Filename),
{_, DPath} = machi_util:make_data_filename(DataDir, Filename),
ok = filelib:ensure_dir(CsumFile),
ok = filelib:ensure_dir(DPath),
UnwrittenBytes = parse_csum_file(CsumFile),
{Eof, infinity} = lists:last(UnwrittenBytes),
{ok, FHd} = file:open(DPath, [read, write, binary, raw]),
{ok, FHc} = file:open(CsumFile, [append, binary, raw]),
Tref = schedule_tick(),
{ok, #state{
St = #state{
filename = Filename,
data_dir = DataDir,
data_path = DPath,
@ -171,9 +208,16 @@ init({Filename, DataDir}) ->
csum_filehandle = FHc,
tref = Tref,
unwritten_bytes = UnwrittenBytes,
eof_position = Eof}}.
eof_position = Eof},
lager:debug("Starting file proxy ~p for filename ~p, state = ~p",
[self(), Filename, St]),
{ok, St}.
% @private
handle_call({stop}, _From, State) ->
lager:debug("Requested to stop."),
{stop, normal, State};
handle_call({sync, data}, _From, State = #state{ data_filehandle = FHd }) ->
R = file:sync(FHd),
{reply, R, State};
@ -216,7 +260,7 @@ handle_call({read, Offset, Length}, _From,
}) when Offset + Length > Eof ->
lager:error("Read request at offset ~p for ~p bytes is past the last write offset of ~p",
[Offset, Length, Eof]),
{reply, {error, not_written}, State = #state{reads = {T + 1, Err + 1}}};
{reply, {error, not_written}, State#state{reads = {T + 1, Err + 1}}};
handle_call({read, Offset, Length}, _From,
State = #state{filename = F,
@ -463,17 +507,22 @@ insert_offsets({Offset, Length, Checksum}) ->
-spec parse_csum_file( Filename :: string() ) -> [byte_sequence()].
parse_csum_file(Filename) ->
%% using file:read_file works as long as the files are "small"
{ok, CsumData} = file:read_file(Filename),
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
Sort = lists:sort(DecodedCsums),
case Sort of
try
{ok, CsumData} = file:read_file(Filename),
{DecodedCsums, _Junk} = machi_flu1:split_checksum_list_blob_decode(CsumData),
Sort = lists:sort(DecodedCsums),
case Sort of
[] -> [{?MINIMUM_OFFSET, infinity}];
_ ->
map_offsets_to_csums(DecodedCsums),
{First, _, _} = hd(Sort),
build_unwritten_bytes_list(Sort, First, [])
end
catch
_:{badmatch, {error, enoent}} ->
[{?MINIMUM_OFFSET, infinity}]
end.
-spec handle_read(FHd :: file:filehandle(),
Filename :: string(),
TaggedCsum :: undefined|binary(),
@ -511,20 +560,23 @@ handle_read(FHd, Filename, TaggedCsum, Offset, Size, U) ->
do_read(FHd, Filename, TaggedCsum, Offset, Size)
end.
% @private Implements the disk read
do_read(FHd, Filename, TaggedCsum, Offset, Size) ->
case file:pread(FHd, Offset, Size) of
eof ->
eof;
{ok, Bytes} when byte_size(Bytes) == Size ->
{Type, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
case check_or_make_tagged_csum(Type, Ck, Bytes) of
{Tag, Ck} = machi_util:unmake_tagged_csum(TaggedCsum),
case check_or_make_tagged_csum(Tag, Ck, Bytes) of
{error, Bad} ->
lager:error("Bad checksum; got ~p, expected ~p",
[Bad, Ck]),
{error, bad_csum};
TaggedCsum ->
{ok, Bytes, TaggedCsum}
{ok, Bytes, TaggedCsum};
%% XXX FIXME: Should we return something other than
%% {ok, ....} in this case?
OtherCsum when Tag =:= ?CSUM_TAG_NONE ->
{ok, Bytes, OtherCsum}
end;
{ok, Partial} ->
lager:error("In file ~p, offset ~p, wanted to read ~p bytes, but got ~p",
@ -665,6 +717,8 @@ lookup_unwritten(Offset, Size, [_H|Rest]) ->
%% These are not the droids you're looking for.
lookup_unwritten(Offset, Size, Rest).
%%% if the pos is greater than offset + size then we're done. End early.
-spec update_unwritten( Offset :: non_neg_integer(),
Size :: pos_integer(),
Unwritten :: [byte_sequence()] ) -> NewUnwritten :: [byte_sequence()].

View file

@ -0,0 +1,201 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_file_proxy_eqc).
-ifdef(TEST).
-ifdef(EQC).
-compile(export_all).
-include("machi.hrl").
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> io:format(user, Str, Args) end, P)).
%% EUNIT TEST DEFINITION
eqc_test_() ->
{timeout, 60,
{spawn,
[
{timeout, 30, ?_assertEqual(true, eqc:quickcheck(eqc:testing_time(15, ?QC_OUT(prop_ok()))))}
]
}}.
%% SHELL HELPERS
test() ->
test(100).
test(N) ->
quickcheck(numtests(N, prop_ok())).
check() ->
check(prop_ok(), current_counterexample()).
%% GENERATORS
csum_type() ->
elements([?CSUM_TAG_NONE, ?CSUM_TAG_CLIENT_SHA, ?CSUM_TAG_SERVER_SHA]).
csum(Type, Binary) ->
case Type of
?CSUM_TAG_NONE -> <<>>;
_ -> machi_util:checksum_chunk(Binary)
end.
offset() ->
?SUCHTHAT(X, oneof([largeint(), int()]), X >= 0).
len() ->
?SUCHTHAT(X, oneof([largeint(), int()]), X >= 1).
data_with_csum() ->
?LET({B,T},{eqc_gen:largebinary(), csum_type()}, {B,T, csum(T, B)}).
small_data() ->
?LET(D, ?SUCHTHAT(S, int(), S >= 1 andalso S < 500), binary(D)).
-record(state, {pid, file = 0, written=[]}).
initial_state() -> #state{}.
precondition_common(S, Cmd) ->
S#state.pid /= undefined orelse Cmd == start.
%% check if an operation is permitted based on whether a write has
%% occurred
check_writes([], _Off, _L) ->
false;
check_writes([{Pos, Sz}|_T], Off, L) when Off >= Pos
andalso Off < (Pos + Sz)
andalso L < ( Sz - ( Off - Pos ) )->
true;
check_writes([{Pos, Sz}|_T], Off, _L) when Off > ( Pos + Sz ) ->
false;
check_writes([_H|T], Off, L) ->
check_writes(T, Off, L).
-define(TESTDIR, "./eqc").
cleanup() ->
[begin
Fs = filelib:wildcard(?TESTDIR ++ Glob),
[file:delete(F) || F <- Fs],
[file:del_dir(F) || F <- Fs]
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
_ = file:del_dir(?TESTDIR),
ok.
%% start
start_pre(S) ->
S#state.pid == undefined.
start_command(S) ->
File = "eqc_data." ++ integer_to_list(S#state.file),
{ok, Pid} = machi_file_proxy:start_link(File, ?TESTDIR),
unlink(Pid),
{ok, Pid}.
start_next(S, {ok, Pid}, _Args) ->
S#state{pid = Pid, file = S#state.file + 1}.
%% read
read_args(S) ->
[S#state.pid, offset(), len()].
read_ok(S, [_Pid, Off, L]) ->
case S#state.written of
[] -> false;
W -> check_writes(W, Off, L)
end.
read_post(S, [_Pid, Off, L], Res) ->
case read_ok(S, [Off, L]) of
true -> eq(Res, {ok, '_', '_'});
false -> eq(Res, {error, '_'})
end.
read_next(S, _Res, _Args) -> S.
read(Pid, Offset, Length) ->
machi_file_proxy:read(Pid, Offset, Length).
%% write
write_args(S) ->
[S#state.pid, offset(), data_with_csum()].
write_ok(S, [_Pid, Off, {Bin, _Tag, _Csum}]) ->
Size = iolist_size(Bin),
case S#state.written of
[] -> false;
W -> check_writes(W, Off, Size)
end.
write_post(S, Args, Res) ->
case write_ok(S, Args) of
true -> eq(Res, ok);
false -> eq(Res, {error, '_'})
end.
write_next(S, ok, [_Pid, Offset, {Bin, _Tag, _Csum}]) ->
S#state{written = lists:sort(S#state.written ++ {Offset, iolist_size(Bin)})};
write_next(S, _Res, _Args) -> S.
write(Pid, Offset, {Bin, Tag, Csum}) ->
Meta = [{client_csum_tag, Tag},
{client_csum, Csum}],
machi_file_proxy:write(Pid, Offset, Meta, Bin).
%% append
%% TODO - ensure offset is expected offset
append_args(S) ->
[S#state.pid, default(0, len()), data_with_csum()].
append(Pid, Extra, {Bin, Tag, Csum}) ->
Meta = [{client_csum_tag, Tag},
{client_csum, Csum}],
machi_file_proxy:write(Pid, Extra, Meta, Bin).
append_next(S, {ok, _File, Offset}, [_Pid, _Extra, {Bin, _Tag, _Csum}]) ->
S#state{written = lists:sort(S#state.written ++ {Offset, iolist_size(Bin)})};
append_next(S, _Res, _Args) -> S.
append_post(_S, _Args, Res) ->
eq(Res, {ok, '_', '_'}).
%% Property
prop_ok() ->
?FORALL(Cmds, commands(?MODULE),
begin
cleanup(),
{H, S, Res} = run_commands(?MODULE, Cmds),
pretty_commands(?MODULE, Cmds, {H, S, Res},
aggregate(command_names(Cmds), Res == ok))
end).
-endif. % EQC
-endif. % TEST

View file

@ -0,0 +1,103 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_file_proxy_test).
-ifdef(TEST).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include("machi.hrl").
clean_up_data_dir(DataDir) ->
[begin
Fs = filelib:wildcard(DataDir ++ Glob),
[file:delete(F) || F <- Fs],
[file:del_dir(F) || F <- Fs]
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
_ = file:del_dir(DataDir),
ok.
-ifndef(PULSE).
-define(TESTDIR, "./t").
-define(HYOOGE, 1 * 1024 * 1024 * 1024). % 1 long GB
random_binary_single() ->
%% OK, I guess it's not that random...
<<"Four score and seven years ago our fathers brought forth on this
continent a new nation, conceived in liberty, and dedicated to the
proposition that all men are created equal.
Now we are engaged in a great civil war, testing whether that nation, or any
nation so conceived and so dedicated, can long endure. We are met on a great
battlefield of that war. We have come to dedicate a portion of that field, as a
final resting place for those who here gave their lives that that nation
might live. It is altogether fitting and proper that we should do this.
But, in a larger sense, we can not dedicate, we can not consecrate, we can not
hallow this ground. The brave men, living and dead, who struggled here, have
consecrated it, far above our poor power to add or detract. The world will
little note, nor long remember what we say here, but it can never forget what
they did here. It is for us the living, rather, to be dedicated here to the
unfinished work which they who fought here have thus far so nobly advanced. It
is rather for us to be here dedicated to the great task remaining before us
that from these honored dead we take increased devotion to that cause for which
they gave the last full measure of devotion that we here highly resolve that
these dead shall not have died in vain that this nation, under God, shall have
a new birth of freedom and that government of the people, by the people, for
the people, shall not perish from the earth.">>.
random_binary(Start, End) ->
Size = byte_size(random_binary_single()) - 1,
case End > Size of
true ->
Copies = ( End div Size ) + 1,
D0 = binary:copy(random_binary_single(), Copies),
binary:part(<<D0/binary>>, Start, End);
false ->
binary:part(random_binary_single(), Start, End)
end.
machi_file_proxy_test_() ->
clean_up_data_dir(?TESTDIR),
{ok, Pid} = machi_file_proxy:start_link("test", ?TESTDIR),
[
?_assertEqual({error, bad_arg}, machi_file_proxy:read(Pid, -1, -1)),
?_assertEqual({error, bad_arg}, machi_file_proxy:write(Pid, -1, <<"yo">>)),
?_assertEqual({error, bad_arg}, machi_file_proxy:append(Pid, [], -1, <<"krep">>)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, 1)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1, 1024)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
?_assertMatch({ok, _, _}, machi_file_proxy:read(Pid, 1025, 1000)),
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, [], 1024, <<"mind the gap">>)),
?_assertEqual(ok, machi_file_proxy:write(Pid, 2060, [], random_binary(0, 1024))),
?_assertException(exit, {normal, _}, machi_file_proxy:stop(Pid))
].
-endif. % !PULSE
-endif. % TEST.