Skeleton of PULSE test created, first bug (race in sequencer init) is found, huzzah!

This commit is contained in:
Scott Lystig Fritchie 2014-02-17 00:29:41 +09:00
parent feed231d5e
commit a294a0eff0
10 changed files with 792 additions and 128 deletions

View file

@ -21,3 +21,6 @@ test: deps compile eunit
eunit: eunit:
$(REBAR_BIN) -v skip_deps=true eunit $(REBAR_BIN) -v skip_deps=true eunit
pulse: compile
env BITCASK_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile
env BITCASK_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit

View file

@ -0,0 +1,54 @@
PulseBuild = case os:getenv("BITCASK_PULSE") of
false ->
false;
_ ->
true
end,
case PulseBuild of
true ->
PulseOpts =
[{pulse_no_side_effect,
[{erlang,display,1}
]},
{pulse_side_effect,
[ {corfurl_sequencer, get, 0}
, {corfurl_flu, write, 4}
, {corfurl_flu, read, 3}
, {corfurl_flu, seal, 2}
, {corfurl_flu, trim, 3}
, {corfurl_flu, fill, 3}
, {event_logger, event, '_'}
, {prim_file, '_', '_'}
, {file, '_', '_'}
, {filelib, '_', '_'}
, {os, '_', '_'} ]},
{pulse_replace_module,
[ {gen_server, pulse_gen_server}
, {application, pulse_application}
, {supervisor, pulse_supervisor} ]}
],
PulseCFlags = [{"CFLAGS", "$CFLAGS -DPULSE"}],
UpdConfig = case lists:keysearch(eunit_compile_opts, 1, CONFIG) of
{value, {eunit_compile_opts, Opts}} ->
lists:keyreplace(eunit_compile_opts,
1,
CONFIG,
{eunit_compile_opts, Opts ++ PulseOpts});
_ ->
[{eunit_compile_opts, PulseOpts} | CONFIG]
end,
case lists:keysearch(port_env, 1, UpdConfig) of
{value, {port_env, PortEnv}} ->
lists:keyreplace(port_env,
1,
UpdConfig,
{port_env, PortEnv ++ PulseCFlags});
_ ->
[{port_env, PulseCFlags} | UpdConfig]
end;
false ->
CONFIG
end.

View file

@ -30,6 +30,9 @@
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif. -endif.
append_page(Sequencer, P, Page) -> append_page(Sequencer, P, Page) ->

View file

@ -36,8 +36,10 @@
-include("corfurl.hrl"). -include("corfurl.hrl").
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]). -export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]).
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif. -endif.
-include_lib("kernel/include/file.hrl"). -include_lib("kernel/include/file.hrl").
@ -117,8 +119,14 @@ init({Dir, ExpPageSize, ExpMaxMem}) ->
end end
catch catch
X:Y -> X:Y ->
io:format("init: caught ~p ~p @ ~p\n", if X == error,
[X, Y, erlang:get_stacktrace()]), Y == {case_clause,{error,enoent}} ->
ok;
true ->
%% TODO: log-ify this
io:format("init: caught ~p ~p @ ~p\n",
[X, Y, erlang:get_stacktrace()])
end,
{no_version_number, 0, ExpPageSize, ExpMaxMem, 0} {no_version_number, 0, ExpPageSize, ExpMaxMem, 0}
end, end,
State = #state{dir=Dir, mem_fh=FH, min_epoch=MinEpoch, page_size=PageSize, State = #state{dir=Dir, mem_fh=FH, min_epoch=MinEpoch, page_size=PageSize,
@ -368,107 +376,3 @@ trim_page(Op, LogicalPN, #state{max_mem=MaxMem, mem_fh=FH} = S) ->
true -> true ->
badarg badarg
end. end.
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
-ifdef(TEST).
startstop_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = start_link(Dir),
try
{ok, _} = status(P1),
ok = stop(P1),
{'EXIT', _} = (catch stop(P1)),
{ok, P2} = start_link(Dir),
0 = get__mlp(P2),
0 = get__min_epoch(P2),
ok = stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
basic_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = start_link(Dir),
try
Epoch1 = 1,
Epoch2 = 2,
LPN = 1,
Bin1 = <<42:64>>,
Bin2 = <<42042:64>>,
error_unwritten = read(P1, Epoch1, LPN),
error_unwritten = trim(P1, Epoch1, LPN),
error_unwritten = trim(P1, Epoch1, LPN+77),
ok = write(P1, Epoch1, LPN, Bin1),
error_overwritten = write(P1, Epoch1, LPN, Bin1),
error_overwritten = fill(P1, Epoch1, LPN),
LPN = get__mlp(P1),
0 = get__min_epoch(P1),
0 = get__trim_watermark(P1),
{ok, LPN} = seal(P1, Epoch1),
1 = get__min_epoch(P1),
error_overwritten = write(P1, Epoch2, LPN, Bin1),
ok = write(P1, Epoch2, LPN+1, Bin2),
Epoch1 = get__min_epoch(P1),
{ok, Bin1} = read(P1, Epoch1, LPN),
{ok, Bin2} = read(P1, Epoch2, LPN+1),
error_unwritten = read(P1, Epoch2, LPN+2),
badarg = read(P1, Epoch2, 1 bsl 2982),
error_badepoch = seal(P1, Epoch1),
{ok, _} = seal(P1, Epoch2),
error_badepoch = seal(P1, Epoch2),
error_badepoch = read(P1, Epoch1, LPN),
error_badepoch = read(P1, Epoch1, LPN+1),
{ok, Bin1} = read(P1, Epoch2, LPN),
{ok, Bin2} = read(P1, Epoch2, LPN+1),
error_badepoch = trim(P1, Epoch1, LPN+1),
ok = trim(P1, Epoch2, LPN+1),
error_trimmed = trim(P1, Epoch2, LPN+1),
%% Current watermark processing is broken. But we'll test what's
%% there now.
ExpectedWaterFixMe = LPN+1,
ExpectedWaterFixMe = get__trim_watermark(P1),
ok = fill(P1, Epoch2, LPN+3),
error_trimmed = read(P1, Epoch2, LPN+3),
error_trimmed = fill(P1, Epoch2, LPN+3),
error_trimmed = trim(P1, Epoch2, LPN+3),
Epoch2 = get__min_epoch(P1),
ok = stop(P1),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
seal_persistence_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = start_link(Dir),
try
0 = get__min_epoch(P1),
Epoch = 665,
{ok, LPN} = seal(P1, Epoch),
Epoch = get__min_epoch(P1),
ok = stop(P1),
{ok, P2} = start_link(Dir),
Epoch = get__min_epoch(P2),
ok = stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
-endif. % TEST

View file

@ -29,12 +29,16 @@
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-ifdef(PULSE).
-compile({parse_transform, pulse_instrument}).
-endif.
-endif. -endif.
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
start_link(FLUs) -> start_link(FLUs) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, {FLUs}, []). %% gen_server:start_link({local, ?SERVER}, ?MODULE, {FLUs}, []).
gen_server:start_link(?MODULE, {FLUs}, []).
stop(Pid) -> stop(Pid) ->
gen_server:call(Pid, stop, infinity). gen_server:call(Pid, stop, infinity).
@ -46,6 +50,7 @@ get(Pid, NumPages) ->
init({FLUs}) -> init({FLUs}) ->
MLP = get_max_logical_page(FLUs), MLP = get_max_logical_page(FLUs),
io:format(user, "~s:init: MLP = ~p\n", [?MODULE, MLP]),
{ok, MLP + 1}. {ok, MLP + 1}.
handle_call({get, NumPages}, _From, MLP) -> handle_call({get, NumPages}, _From, MLP) ->
@ -78,6 +83,7 @@ get_max_logical_page(FLUs) ->
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
-ifdef(TEST). -ifdef(TEST).
-ifndef(PULSE).
smoke_test() -> smoke_test() ->
BaseDir = "/tmp/" ++ atom_to_list(?MODULE) ++ ".", BaseDir = "/tmp/" ++ atom_to_list(?MODULE) ++ ".",
@ -120,4 +126,5 @@ smoke_test() ->
Del() Del()
end. end.
-endif. % not PULSE
-endif. % TEST -endif. % TEST

View file

@ -0,0 +1,134 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_flu_test).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(export_all).
-endif.
-include("corfurl.hrl").
-define(M, corfurl_flu).
-ifdef(TEST).
-ifndef(PULSE).
startstop_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
{ok, _} = ?M:status(P1),
ok = ?M:stop(P1),
{'EXIT', _} = (catch ?M:stop(P1)),
{ok, P2} = ?M:start_link(Dir),
0 = ?M:get__mlp(P2),
0 = ?M:get__min_epoch(P2),
ok = ?M:stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
basic_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
Epoch1 = 1,
Epoch2 = 2,
LPN = 1,
Bin1 = <<42:64>>,
Bin2 = <<42042:64>>,
error_unwritten = ?M:read(P1, Epoch1, LPN),
error_unwritten = ?M:trim(P1, Epoch1, LPN),
error_unwritten = ?M:trim(P1, Epoch1, LPN+77),
ok = ?M:write(P1, Epoch1, LPN, Bin1),
error_overwritten = ?M:write(P1, Epoch1, LPN, Bin1),
error_overwritten = ?M:fill(P1, Epoch1, LPN),
LPN = ?M:get__mlp(P1),
0 = ?M:get__min_epoch(P1),
0 = ?M:get__trim_watermark(P1),
{ok, LPN} = ?M:seal(P1, Epoch1),
1 = ?M:get__min_epoch(P1),
error_overwritten = ?M:write(P1, Epoch2, LPN, Bin1),
ok = ?M:write(P1, Epoch2, LPN+1, Bin2),
Epoch1 = ?M:get__min_epoch(P1),
{ok, Bin1} = ?M:read(P1, Epoch1, LPN),
{ok, Bin2} = ?M:read(P1, Epoch2, LPN+1),
error_unwritten = ?M:read(P1, Epoch2, LPN+2),
badarg = ?M:read(P1, Epoch2, 1 bsl 2982),
error_badepoch = ?M:seal(P1, Epoch1),
{ok, _} = ?M:seal(P1, Epoch2),
error_badepoch = ?M:seal(P1, Epoch2),
error_badepoch = ?M:read(P1, Epoch1, LPN),
error_badepoch = ?M:read(P1, Epoch1, LPN+1),
{ok, Bin1} = ?M:read(P1, Epoch2, LPN),
{ok, Bin2} = ?M:read(P1, Epoch2, LPN+1),
error_badepoch = ?M:trim(P1, Epoch1, LPN+1),
ok = ?M:trim(P1, Epoch2, LPN+1),
error_trimmed = ?M:trim(P1, Epoch2, LPN+1),
%% Current watermark processing is broken. But we'll test what's
%% there now.
ExpectedWaterFixMe = LPN+1,
ExpectedWaterFixMe = ?M:get__trim_watermark(P1),
ok = ?M:fill(P1, Epoch2, LPN+3),
error_trimmed = ?M:read(P1, Epoch2, LPN+3),
error_trimmed = ?M:fill(P1, Epoch2, LPN+3),
error_trimmed = ?M:trim(P1, Epoch2, LPN+3),
Epoch2 = ?M:get__min_epoch(P1),
ok = ?M:stop(P1),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
seal_persistence_test() ->
Dir = "/tmp/flu." ++ os:getpid(),
{ok, P1} = ?M:start_link(Dir),
try
0 = ?M:get__min_epoch(P1),
Epoch = 665,
{ok, LPN} = ?M:seal(P1, Epoch),
Epoch = ?M:get__min_epoch(P1),
ok = ?M:stop(P1),
{ok, P2} = ?M:start_link(Dir),
Epoch = ?M:get__min_epoch(P2),
ok = ?M:stop(P2),
ok
after
ok = corfurl_util:delete_dir(Dir)
end.
-endif. % not PULSE
-endif. % TEST

View file

@ -0,0 +1,274 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(corfurl_pulse).
-ifdef(TEST).
-ifdef(PULSE).
-compile(export_all).
-include_lib("eqc/include/eqc.hrl").
-include_lib("eqc/include/eqc_statem.hrl").
-include("corfurl.hrl").
-include_lib("eunit/include/eunit.hrl").
-compile({parse_transform, pulse_instrument}).
%% -compile({pulse_replace_module,
%% [{application, pulse_application}]}).
-compile({pulse_skip,[{prop_pulse_test_,0},{really_delete_bitcask,0},{copy_bitcask_app,0}]}).
-compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}).
%% Used for output within EUnit...
-define(QC_FMT(Fmt, Args),
io:format(user, Fmt, Args)).
%% And to force EUnit to output QuickCheck output...
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)).
-record(run, {
seq, % Sequencer
proj, % Projection
flus % List of FLUs
}).
-record(state, {
is_setup = false :: boolean(),
num_chains = 0 :: integer(),
chain_len = 0 :: integer(),
page_size = 0 :: integer(),
run :: #run{}
}).
initial_state() ->
#state{}.
gen_page(PageSize) ->
binary(PageSize).
command(#state{run=Run} = S) ->
?LET({NumChains, ChainLen, PageSize},
{parameter(num_chains), parameter(chain_len), parameter(page_size)},
frequency(
[{10, {call, ?MODULE, setup, [NumChains, ChainLen, PageSize]}}
|| not S#state.is_setup] ++
[{10, {call, ?MODULE, append, [Run, gen_page(PageSize)]}}
|| S#state.is_setup] ++
[])).
%% Precondition, checked before a command is added to the command sequence.
precondition(S, {call, _, setup, _}) ->
not S#state.is_setup;
precondition(S, {call, _, _, _}) ->
S#state.is_setup.
%% Next state transformation, S is the current state and V is the result of the
%% command.
next_state(S, Res, {call, _, setup, [NumChains, ChainLen, PageSize]}) ->
S#state{is_setup=true,
num_chains=NumChains,
chain_len=ChainLen,
page_size=PageSize,
run=Res};
next_state(S, _, {call, _, append, _}) ->
S.
eqeq(X, X) -> true;
eqeq(X, Y) -> {X, '/=', Y}.
postcondition(_S, {call, _, setup, _}, #run{} = _V) ->
true;
postcondition(_S, {call, _, append, _}, {ok, LPN}) when is_integer(LPN) ->
true;
postcondition(_S, {call, _, append, _}, V) ->
eqeq(V, todoTODO_fixit).
run_commands_on_node(_LocalOrSlave, Cmds, Seed) ->
%% AfterTime = if LocalOrSlave == local -> 50000;
%% LocalOrSlave == slave -> 1000000
%% end,
event_logger:start_link(),
pulse:start(),
error_logger:tty(false),
error_logger:add_report_handler(handle_errors),
event_logger:start_logging(),
X =
try
{H, S, Res, Trace} = pulse:run(fun() ->
%% application:start(my_test_app),
%% receive after AfterTime -> ok end,
{H, S, R} = run_parallel_commands(?MODULE, Cmds),
%% io:format(user, "Yooo: H = ~p\n", [H]),
%% io:format(user, "Yooo: S = ~p\n", [S]),
%% io:format(user, "Yooo: R = ~p\n", [R]),
%% receive after AfterTime -> ok end,
Trace = event_logger:get_events(),
%% receive after AfterTime -> ok end,
catch exit(pulse_application_controller, shutdown),
{H, S, R, Trace}
end, [{seed, Seed},
{strategy, unfair}]),
Schedule = pulse:get_schedule(),
Errors = gen_event:call(error_logger, handle_errors, get_errors, 60*1000),
[clean_up_runtime(S) || S#state.run /= undefined],
{H, S, Res, Trace, Schedule, Errors}
catch
_:Err ->
{'EXIT', Err}
end,
X.
prop_pulse() ->
prop_pulse(local).
prop_pulse(LocalOrSlave) ->
?FORALL({NumChains, ChainLen, PageSize},
{choose(1, 3), choose(1, 3), choose(1, 16)},
begin
P = ?FORALL({Cmds, Seed},
{with_parameters([{num_chains, NumChains},
{chain_len, ChainLen},
{page_size, PageSize}], parallel_commands(?MODULE)),
pulse:seed()},
begin
case run_commands_on_node(LocalOrSlave, Cmds, Seed) of
{'EXIT', Err} ->
equals({'EXIT', Err}, ok);
{_H, S, Res, Trace, Schedule, Errors} ->
CheckTrace = check_trace(Trace, Cmds, Seed),
?WHENFAIL(
?QC_FMT("\nState: ~p\n", [S]),
measure(schedule, length(Schedule),
conjunction(
[{simple_result, equals(Res, ok)},
{errors, equals(Errors, [])},
{events, CheckTrace} ])))
end
end),
P
end).
prop_pulse_test_() ->
Timeout = case os:getenv("PULSE_TIME") of
false -> 60;
Val -> list_to_integer(Val)
end,
ExtraTO = case os:getenv("PULSE_SHRINK_TIME") of
false -> 0;
Val2 -> list_to_integer(Val2)
end,
io:format(user, "prop_pulse_test time: ~p + ~p seconds\n",
[Timeout, ExtraTO]),
{timeout, (Timeout+ExtraTO) + 60,
fun() ->
?assert(eqc:quickcheck(eqc:testing_time(Timeout,?QC_OUT(prop_pulse()))))
end}.
check_trace(Trace, _Cmds, _Seed) ->
%% TODO: yeah
Results = [X || {_TS, {result, _Pid, X}} <- Trace],
lists:sort(Results) == lists:usort(Results).
%% Presenting command data statistics in a nicer way
command_data({set, _, {call, _, Fun, _}}, {_S, _V}) ->
Fun.
%% Convenience functions for running tests
test() ->
test({20, sec}).
test(N) when is_integer(N) ->
quickcheck(numtests(N, prop_pulse()));
test({Time, sec}) ->
quickcheck(eqc:testing_time(Time, prop_pulse()));
test({Time, min}) ->
test({Time * 60, sec});
test({Time, h}) ->
test({Time * 60, min}).
check() ->
check(current_counterexample()).
verbose() ->
verbose(current_counterexample()).
verbose(CE) ->
erlang:put(verbose, true),
Ok = check(CE),
erlang:put(verbose, false),
Ok.
check(CE) ->
check(on_output(fun("OK" ++ _, []) -> ok; (Fmt, Args) -> io:format(Fmt, Args) end,
prop_pulse(true == erlang:get(verbose))),
CE).
recheck() ->
recheck(prop_pulse()).
zipwith(F, [X|Xs], [Y|Ys]) ->
[F(X, Y)|zipwith(F, Xs, Ys)];
zipwith(_, _, _) -> [].
clean_up_runtime(#state{run=R} = _S) ->
%% io:format(user, "clean_up_runtime: run = ~p\n", [R]),
catch corfurl_sequencer:stop(R#run.seq),
[catch corfurl_flu:stop(F) || F <- R#run.flus],
corfurl_test:setup_del_all(length(R#run.flus)).
make_chains(ChainLen, FLUs) ->
make_chains(ChainLen, FLUs, [], []).
make_chains(_ChainLen, [], SmallAcc, BigAcc) ->
lists:reverse([SmallAcc|BigAcc]);
make_chains(ChainLen, [H|T], SmallAcc, BigAcc) ->
if length(SmallAcc) == ChainLen ->
make_chains(ChainLen, T, [H], [SmallAcc|BigAcc]);
true ->
make_chains(ChainLen, T, [H|SmallAcc], BigAcc)
end.
setup(NumChains, ChainLen, PageSize) ->
N = NumChains * ChainLen,
FLUs = corfurl_test:setup_basic_flus(N, PageSize, 50000),
{ok, Seq} = corfurl_sequencer:start_link(FLUs),
Chains = make_chains(ChainLen, FLUs),
%% io:format(user, "Cs = ~p\n", [Chains]),
Proj = corfurl:new_simple_projection(1, 1, 50000, Chains),
#run{seq=Seq, proj=Proj, flus=FLUs}.
-define(LOG(Tag, MkCall),
event_logger:event({call, self(), Tag}),
__Result = MkCall,
event_logger:event({result, self(), __Result}),
__Result).
append(#run{seq=Seq,proj=Proj}, Page) ->
?LOG({append, Page},
corfurl:append_page(Seq, Proj, Page)).
-endif. % PULSE
-endif. % TEST

View file

@ -23,15 +23,33 @@
-include("corfurl.hrl"). -include("corfurl.hrl").
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-compile(export_all). -compile(export_all).
-endif.
-define(M, corfurl). -define(M, corfurl).
%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%%
-ifdef(TEST).
setup_flu_basedir() ->
"/tmp/" ++ atom_to_list(?MODULE) ++ ".".
setup_flu_dir(N) ->
setup_flu_basedir() ++ integer_to_list(N).
setup_del_all(NumFLUs) ->
[ok = corfurl_util:delete_dir(setup_flu_dir(N)) ||
N <- lists:seq(1, NumFLUs)].
setup_basic_flus(NumFLUs, PageSize, NumPages) ->
setup_del_all(NumFLUs),
[begin
element(2, corfurl_flu:start_link(setup_flu_dir(X),
PageSize, NumPages * (PageSize * ?PAGE_OVERHEAD)))
end || X <- lists:seq(1, NumFLUs)].
-ifndef(PULSE).
save_read_test() -> save_read_test() ->
Dir = "/tmp/" ++ atom_to_list(?MODULE) ++".save-read", Dir = "/tmp/" ++ atom_to_list(?MODULE) ++".save-read",
@ -51,23 +69,6 @@ save_read_test() ->
ok = corfurl_util:delete_dir(Dir) ok = corfurl_util:delete_dir(Dir)
end. end.
setup_flu_basedir() ->
"/tmp/" ++ atom_to_list(?MODULE) ++ ".".
setup_flu_dir(N) ->
setup_flu_basedir() ++ integer_to_list(N).
setup_del_all(NumFLUs) ->
[ok = corfurl_util:delete_dir(setup_flu_dir(N)) ||
N <- lists:seq(1, NumFLUs)].
setup_basic_flus(NumFLUs, PageSize, NumPages) ->
setup_del_all(NumFLUs),
[begin
element(2, corfurl_flu:start_link(setup_flu_dir(X),
PageSize, NumPages * (PageSize * ?PAGE_OVERHEAD)))
end || X <- lists:seq(1, NumFLUs)].
smoke1_test() -> smoke1_test() ->
NumFLUs = 6, NumFLUs = 6,
PageSize = 8, PageSize = 8,
@ -211,5 +212,5 @@ forfun(NumProcs) ->
end. end.
-endif. % TIMING_TEST -endif. % TIMING_TEST
-endif. % not PULSE
-endif. % TEST -endif. % TEST

View file

@ -0,0 +1,131 @@
%%% File : handle_errors.erl
%%% Author : Ulf Norell
%%% Description :
%%% Created : 26 Mar 2012 by Ulf Norell
-module(event_logger).
-compile(export_all).
-behaviour(gen_server).
%% API
-export([start_link/0, event/1, get_events/0, start_logging/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { start_time, events = [] }).
-record(event, { timestamp, data }).
%%====================================================================
%% API
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
start_logging() ->
gen_server:call(?MODULE, {start, timestamp()}).
event(EventData) ->
gen_server:call(?MODULE,
#event{ timestamp = timestamp(), data = EventData }).
async_event(EventData) ->
gen_server:cast(?MODULE,
#event{ timestamp = timestamp(), data = EventData }).
get_events() ->
gen_server:call(?MODULE, get_events).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) ->
%% {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} |
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
handle_call(Event = #event{}, _From, State) ->
{reply, ok, add_event(Event, State)};
handle_call({start, Now}, _From, S) ->
{reply, ok, S#state{ events = [], start_time = Now }};
handle_call(get_events, _From, S) ->
{reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]),
S#state{ events = [] }};
handle_call(Request, _From, State) ->
{reply, {error, {bad_call, Request}}, State}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
handle_cast(Event = #event{}, State) ->
{noreply, add_event(Event, State)};
handle_cast(_Msg, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State}
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{noreply, State}.
%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> void()
%% Description: This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
add_event(#event{timestamp = Now, data = Data}, State) ->
Event = #event{ timestamp = Now - State#state.start_time, data = Data },
State#state{ events = [Event|State#state.events] }.
timestamp() ->
{A, B, C} = erlang:now(),
1000000 * (1000000 * A + B) + C.

View file

@ -0,0 +1,153 @@
%%%-------------------------------------------------------------------
%%% @author Hans Svensson <>
%%% @copyright (C) 2012, Hans Svensson
%%% @doc
%%%
%%% @end
%%% Created : 19 Mar 2012 by Hans Svensson <>
%%%-------------------------------------------------------------------
-module(handle_errors).
-behaviour(gen_event).
%% API
-export([start_link/0, add_handler/0]).
%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, { errors = [] }).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @doc
%% Creates an event manager
%%
%% @spec start_link() -> {ok, Pid} | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link() ->
gen_event:start_link({local, ?SERVER}).
%%--------------------------------------------------------------------
%% @doc
%% Adds an event handler
%%
%% @spec add_handler() -> ok | {'EXIT', Reason} | term()
%% @end
%%--------------------------------------------------------------------
add_handler() ->
gen_event:add_handler(?SERVER, ?MODULE, []).
%%%===================================================================
%%% gen_event callbacks
%%%===================================================================
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever a new event handler is added to an event manager,
%% this function is called to initialize the event handler.
%%
%% @spec init(Args) -> {ok, State}
%% @end
%%--------------------------------------------------------------------
init([]) ->
{ok, #state{}}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives an event sent using
%% gen_event:notify/2 or gen_event:sync_notify/2, this function is
%% called for each installed event handler to handle the event.
%%
%% @spec handle_event(Event, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) ->
{ok, State};
handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) ->
{ok, State};
handle_event({error, _, Event}, State) ->
{ok, State#state{ errors = [Event|State#state.errors] }};
handle_event(_Event, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event manager receives a request sent using
%% gen_event:call/3,4, this function is called for the specified
%% event handler to handle the request.
%%
%% @spec handle_call(Request, State) ->
%% {ok, Reply, State} |
%% {swap_handler, Reply, Args1, State1, Mod2, Args2} |
%% {remove_handler, Reply}
%% @end
%%--------------------------------------------------------------------
handle_call(get_errors, S) ->
{ok, S#state.errors, S#state{ errors = [] }};
handle_call(_Request, State) ->
Reply = ok,
{ok, Reply, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% This function is called for each installed event handler when
%% an event manager receives any other message than an event or a
%% synchronous request (or a system message).
%%
%% @spec handle_info(Info, State) ->
%% {ok, State} |
%% {swap_handler, Args1, State1, Mod2, Args2} |
%% remove_handler
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
{ok, State}.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Whenever an event handler is deleted from an event manager, this
%% function is called. It should be the opposite of Module:init/1 and
%% do any necessary cleaning up.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%--------------------------------------------------------------------
%% @private
%% @doc
%% Convert process state when code is changed
%%
%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState}
%% @end
%%--------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================