diff --git a/prototype/corfurl/Makefile b/prototype/corfurl/Makefile index ef51767..5a67094 100644 --- a/prototype/corfurl/Makefile +++ b/prototype/corfurl/Makefile @@ -21,3 +21,6 @@ test: deps compile eunit eunit: $(REBAR_BIN) -v skip_deps=true eunit +pulse: compile + env BITCASK_PULSE=1 $(REBAR_BIN) skip_deps=true clean compile + env BITCASK_PULSE=1 $(REBAR_BIN) skip_deps=true -D PULSE eunit diff --git a/prototype/corfurl/rebar.config.script b/prototype/corfurl/rebar.config.script new file mode 100644 index 0000000..2155bb5 --- /dev/null +++ b/prototype/corfurl/rebar.config.script @@ -0,0 +1,54 @@ +PulseBuild = case os:getenv("BITCASK_PULSE") of + false -> + false; + _ -> + true + end, +case PulseBuild of + true -> + PulseOpts = + [{pulse_no_side_effect, + [{erlang,display,1} + ]}, + {pulse_side_effect, + [ {corfurl_sequencer, get, 0} + , {corfurl_flu, write, 4} + , {corfurl_flu, read, 3} + , {corfurl_flu, seal, 2} + , {corfurl_flu, trim, 3} + , {corfurl_flu, fill, 3} + + , {event_logger, event, '_'} + + , {prim_file, '_', '_'} + , {file, '_', '_'} + , {filelib, '_', '_'} + , {os, '_', '_'} ]}, + + {pulse_replace_module, + [ {gen_server, pulse_gen_server} + , {application, pulse_application} + , {supervisor, pulse_supervisor} ]} + ], + PulseCFlags = [{"CFLAGS", "$CFLAGS -DPULSE"}], + UpdConfig = case lists:keysearch(eunit_compile_opts, 1, CONFIG) of + {value, {eunit_compile_opts, Opts}} -> + lists:keyreplace(eunit_compile_opts, + 1, + CONFIG, + {eunit_compile_opts, Opts ++ PulseOpts}); + _ -> + [{eunit_compile_opts, PulseOpts} | CONFIG] + end, + case lists:keysearch(port_env, 1, UpdConfig) of + {value, {port_env, PortEnv}} -> + lists:keyreplace(port_env, + 1, + UpdConfig, + {port_env, PortEnv ++ PulseCFlags}); + _ -> + [{port_env, PulseCFlags} | UpdConfig] + end; + false -> + CONFIG +end. diff --git a/prototype/corfurl/src/corfurl.erl b/prototype/corfurl/src/corfurl.erl index c242875..bbf84c2 100644 --- a/prototype/corfurl/src/corfurl.erl +++ b/prototype/corfurl/src/corfurl.erl @@ -30,6 +30,9 @@ -ifdef(TEST). -compile(export_all). +-ifdef(PULSE). +-compile({parse_transform, pulse_instrument}). +-endif. -endif. append_page(Sequencer, P, Page) -> diff --git a/prototype/corfurl/src/corfurl_flu.erl b/prototype/corfurl/src/corfurl_flu.erl index 01e3d06..4c02531 100644 --- a/prototype/corfurl/src/corfurl_flu.erl +++ b/prototype/corfurl/src/corfurl_flu.erl @@ -36,8 +36,10 @@ -include("corfurl.hrl"). -ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). -export([get__mlp/1, get__min_epoch/1, get__trim_watermark/1]). +-ifdef(PULSE). +-compile({parse_transform, pulse_instrument}). +-endif. -endif. -include_lib("kernel/include/file.hrl"). @@ -117,8 +119,14 @@ init({Dir, ExpPageSize, ExpMaxMem}) -> end catch X:Y -> - io:format("init: caught ~p ~p @ ~p\n", - [X, Y, erlang:get_stacktrace()]), + if X == error, + Y == {case_clause,{error,enoent}} -> + ok; + true -> + %% TODO: log-ify this + io:format("init: caught ~p ~p @ ~p\n", + [X, Y, erlang:get_stacktrace()]) + end, {no_version_number, 0, ExpPageSize, ExpMaxMem, 0} end, State = #state{dir=Dir, mem_fh=FH, min_epoch=MinEpoch, page_size=PageSize, @@ -368,107 +376,3 @@ trim_page(Op, LogicalPN, #state{max_mem=MaxMem, mem_fh=FH} = S) -> true -> badarg end. - -%%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% - --ifdef(TEST). - -startstop_test() -> - Dir = "/tmp/flu." ++ os:getpid(), - {ok, P1} = start_link(Dir), - try - {ok, _} = status(P1), - ok = stop(P1), - {'EXIT', _} = (catch stop(P1)), - - {ok, P2} = start_link(Dir), - 0 = get__mlp(P2), - 0 = get__min_epoch(P2), - ok = stop(P2), - - ok - after - ok = corfurl_util:delete_dir(Dir) - end. - -basic_test() -> - Dir = "/tmp/flu." ++ os:getpid(), - {ok, P1} = start_link(Dir), - try - Epoch1 = 1, - Epoch2 = 2, - LPN = 1, - Bin1 = <<42:64>>, - Bin2 = <<42042:64>>, - - error_unwritten = read(P1, Epoch1, LPN), - error_unwritten = trim(P1, Epoch1, LPN), - error_unwritten = trim(P1, Epoch1, LPN+77), - - ok = write(P1, Epoch1, LPN, Bin1), - error_overwritten = write(P1, Epoch1, LPN, Bin1), - error_overwritten = fill(P1, Epoch1, LPN), - LPN = get__mlp(P1), - 0 = get__min_epoch(P1), - 0 = get__trim_watermark(P1), - {ok, LPN} = seal(P1, Epoch1), - 1 = get__min_epoch(P1), - - error_overwritten = write(P1, Epoch2, LPN, Bin1), - ok = write(P1, Epoch2, LPN+1, Bin2), - Epoch1 = get__min_epoch(P1), - - {ok, Bin1} = read(P1, Epoch1, LPN), - {ok, Bin2} = read(P1, Epoch2, LPN+1), - error_unwritten = read(P1, Epoch2, LPN+2), - badarg = read(P1, Epoch2, 1 bsl 2982), - - error_badepoch = seal(P1, Epoch1), - {ok, _} = seal(P1, Epoch2), - error_badepoch = seal(P1, Epoch2), - - error_badepoch = read(P1, Epoch1, LPN), - error_badepoch = read(P1, Epoch1, LPN+1), - {ok, Bin1} = read(P1, Epoch2, LPN), - {ok, Bin2} = read(P1, Epoch2, LPN+1), - - error_badepoch = trim(P1, Epoch1, LPN+1), - ok = trim(P1, Epoch2, LPN+1), - error_trimmed = trim(P1, Epoch2, LPN+1), - %% Current watermark processing is broken. But we'll test what's - %% there now. - ExpectedWaterFixMe = LPN+1, - ExpectedWaterFixMe = get__trim_watermark(P1), - - ok = fill(P1, Epoch2, LPN+3), - error_trimmed = read(P1, Epoch2, LPN+3), - error_trimmed = fill(P1, Epoch2, LPN+3), - error_trimmed = trim(P1, Epoch2, LPN+3), - - Epoch2 = get__min_epoch(P1), - ok = stop(P1), - ok - after - ok = corfurl_util:delete_dir(Dir) - end. - -seal_persistence_test() -> - Dir = "/tmp/flu." ++ os:getpid(), - {ok, P1} = start_link(Dir), - try - 0 = get__min_epoch(P1), - Epoch = 665, - {ok, LPN} = seal(P1, Epoch), - Epoch = get__min_epoch(P1), - ok = stop(P1), - - {ok, P2} = start_link(Dir), - Epoch = get__min_epoch(P2), - - ok = stop(P2), - ok - after - ok = corfurl_util:delete_dir(Dir) - end. - --endif. % TEST diff --git a/prototype/corfurl/src/corfurl_sequencer.erl b/prototype/corfurl/src/corfurl_sequencer.erl index 92426d0..a4bee05 100644 --- a/prototype/corfurl/src/corfurl_sequencer.erl +++ b/prototype/corfurl/src/corfurl_sequencer.erl @@ -29,12 +29,16 @@ -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). +-ifdef(PULSE). +-compile({parse_transform, pulse_instrument}). +-endif. -endif. -define(SERVER, ?MODULE). start_link(FLUs) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, {FLUs}, []). + %% gen_server:start_link({local, ?SERVER}, ?MODULE, {FLUs}, []). + gen_server:start_link(?MODULE, {FLUs}, []). stop(Pid) -> gen_server:call(Pid, stop, infinity). @@ -46,6 +50,7 @@ get(Pid, NumPages) -> init({FLUs}) -> MLP = get_max_logical_page(FLUs), + io:format(user, "~s:init: MLP = ~p\n", [?MODULE, MLP]), {ok, MLP + 1}. handle_call({get, NumPages}, _From, MLP) -> @@ -78,6 +83,7 @@ get_max_logical_page(FLUs) -> %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% -ifdef(TEST). +-ifndef(PULSE). smoke_test() -> BaseDir = "/tmp/" ++ atom_to_list(?MODULE) ++ ".", @@ -120,4 +126,5 @@ smoke_test() -> Del() end. +-endif. % not PULSE -endif. % TEST diff --git a/prototype/corfurl/test/corfurl_flu_test.erl b/prototype/corfurl/test/corfurl_flu_test.erl new file mode 100644 index 0000000..21d0f15 --- /dev/null +++ b/prototype/corfurl/test/corfurl_flu_test.erl @@ -0,0 +1,134 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(corfurl_flu_test). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-compile(export_all). +-endif. + +-include("corfurl.hrl"). + +-define(M, corfurl_flu). + +-ifdef(TEST). +-ifndef(PULSE). + +startstop_test() -> + Dir = "/tmp/flu." ++ os:getpid(), + {ok, P1} = ?M:start_link(Dir), + try + {ok, _} = ?M:status(P1), + ok = ?M:stop(P1), + {'EXIT', _} = (catch ?M:stop(P1)), + + {ok, P2} = ?M:start_link(Dir), + 0 = ?M:get__mlp(P2), + 0 = ?M:get__min_epoch(P2), + ok = ?M:stop(P2), + + ok + after + ok = corfurl_util:delete_dir(Dir) + end. + +basic_test() -> + Dir = "/tmp/flu." ++ os:getpid(), + {ok, P1} = ?M:start_link(Dir), + try + Epoch1 = 1, + Epoch2 = 2, + LPN = 1, + Bin1 = <<42:64>>, + Bin2 = <<42042:64>>, + + error_unwritten = ?M:read(P1, Epoch1, LPN), + error_unwritten = ?M:trim(P1, Epoch1, LPN), + error_unwritten = ?M:trim(P1, Epoch1, LPN+77), + + ok = ?M:write(P1, Epoch1, LPN, Bin1), + error_overwritten = ?M:write(P1, Epoch1, LPN, Bin1), + error_overwritten = ?M:fill(P1, Epoch1, LPN), + LPN = ?M:get__mlp(P1), + 0 = ?M:get__min_epoch(P1), + 0 = ?M:get__trim_watermark(P1), + {ok, LPN} = ?M:seal(P1, Epoch1), + 1 = ?M:get__min_epoch(P1), + + error_overwritten = ?M:write(P1, Epoch2, LPN, Bin1), + ok = ?M:write(P1, Epoch2, LPN+1, Bin2), + Epoch1 = ?M:get__min_epoch(P1), + + {ok, Bin1} = ?M:read(P1, Epoch1, LPN), + {ok, Bin2} = ?M:read(P1, Epoch2, LPN+1), + error_unwritten = ?M:read(P1, Epoch2, LPN+2), + badarg = ?M:read(P1, Epoch2, 1 bsl 2982), + + error_badepoch = ?M:seal(P1, Epoch1), + {ok, _} = ?M:seal(P1, Epoch2), + error_badepoch = ?M:seal(P1, Epoch2), + + error_badepoch = ?M:read(P1, Epoch1, LPN), + error_badepoch = ?M:read(P1, Epoch1, LPN+1), + {ok, Bin1} = ?M:read(P1, Epoch2, LPN), + {ok, Bin2} = ?M:read(P1, Epoch2, LPN+1), + + error_badepoch = ?M:trim(P1, Epoch1, LPN+1), + ok = ?M:trim(P1, Epoch2, LPN+1), + error_trimmed = ?M:trim(P1, Epoch2, LPN+1), + %% Current watermark processing is broken. But we'll test what's + %% there now. + ExpectedWaterFixMe = LPN+1, + ExpectedWaterFixMe = ?M:get__trim_watermark(P1), + + ok = ?M:fill(P1, Epoch2, LPN+3), + error_trimmed = ?M:read(P1, Epoch2, LPN+3), + error_trimmed = ?M:fill(P1, Epoch2, LPN+3), + error_trimmed = ?M:trim(P1, Epoch2, LPN+3), + + Epoch2 = ?M:get__min_epoch(P1), + ok = ?M:stop(P1), + ok + after + ok = corfurl_util:delete_dir(Dir) + end. + +seal_persistence_test() -> + Dir = "/tmp/flu." ++ os:getpid(), + {ok, P1} = ?M:start_link(Dir), + try + 0 = ?M:get__min_epoch(P1), + Epoch = 665, + {ok, LPN} = ?M:seal(P1, Epoch), + Epoch = ?M:get__min_epoch(P1), + ok = ?M:stop(P1), + + {ok, P2} = ?M:start_link(Dir), + Epoch = ?M:get__min_epoch(P2), + + ok = ?M:stop(P2), + ok + after + ok = corfurl_util:delete_dir(Dir) + end. + +-endif. % not PULSE +-endif. % TEST diff --git a/prototype/corfurl/test/corfurl_pulse.erl b/prototype/corfurl/test/corfurl_pulse.erl new file mode 100644 index 0000000..7e8ba55 --- /dev/null +++ b/prototype/corfurl/test/corfurl_pulse.erl @@ -0,0 +1,274 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(corfurl_pulse). + +-ifdef(TEST). +-ifdef(PULSE). + +-compile(export_all). + +-include_lib("eqc/include/eqc.hrl"). +-include_lib("eqc/include/eqc_statem.hrl"). + +-include("corfurl.hrl"). + +-include_lib("eunit/include/eunit.hrl"). + +-compile({parse_transform, pulse_instrument}). +%% -compile({pulse_replace_module, +%% [{application, pulse_application}]}). + +-compile({pulse_skip,[{prop_pulse_test_,0},{really_delete_bitcask,0},{copy_bitcask_app,0}]}). +-compile({pulse_no_side_effect,[{file,'_','_'}, {erlang, now, 0}]}). + +%% Used for output within EUnit... +-define(QC_FMT(Fmt, Args), + io:format(user, Fmt, Args)). + +%% And to force EUnit to output QuickCheck output... +-define(QC_OUT(P), + eqc:on_output(fun(Str, Args) -> ?QC_FMT(Str, Args) end, P)). + +-record(run, { + seq, % Sequencer + proj, % Projection + flus % List of FLUs + }). + +-record(state, { + is_setup = false :: boolean(), + num_chains = 0 :: integer(), + chain_len = 0 :: integer(), + page_size = 0 :: integer(), + run :: #run{} + }). + +initial_state() -> + #state{}. + +gen_page(PageSize) -> + binary(PageSize). + +command(#state{run=Run} = S) -> + ?LET({NumChains, ChainLen, PageSize}, + {parameter(num_chains), parameter(chain_len), parameter(page_size)}, + frequency( + [{10, {call, ?MODULE, setup, [NumChains, ChainLen, PageSize]}} + || not S#state.is_setup] ++ + [{10, {call, ?MODULE, append, [Run, gen_page(PageSize)]}} + || S#state.is_setup] ++ + [])). + +%% Precondition, checked before a command is added to the command sequence. +precondition(S, {call, _, setup, _}) -> + not S#state.is_setup; +precondition(S, {call, _, _, _}) -> + S#state.is_setup. + +%% Next state transformation, S is the current state and V is the result of the +%% command. +next_state(S, Res, {call, _, setup, [NumChains, ChainLen, PageSize]}) -> + S#state{is_setup=true, + num_chains=NumChains, + chain_len=ChainLen, + page_size=PageSize, + run=Res}; +next_state(S, _, {call, _, append, _}) -> + S. + +eqeq(X, X) -> true; +eqeq(X, Y) -> {X, '/=', Y}. + +postcondition(_S, {call, _, setup, _}, #run{} = _V) -> + true; +postcondition(_S, {call, _, append, _}, {ok, LPN}) when is_integer(LPN) -> + true; +postcondition(_S, {call, _, append, _}, V) -> + eqeq(V, todoTODO_fixit). + +run_commands_on_node(_LocalOrSlave, Cmds, Seed) -> + %% AfterTime = if LocalOrSlave == local -> 50000; + %% LocalOrSlave == slave -> 1000000 + %% end, + event_logger:start_link(), + pulse:start(), + error_logger:tty(false), + error_logger:add_report_handler(handle_errors), + event_logger:start_logging(), + X = + try + {H, S, Res, Trace} = pulse:run(fun() -> + %% application:start(my_test_app), + %% receive after AfterTime -> ok end, + {H, S, R} = run_parallel_commands(?MODULE, Cmds), + %% io:format(user, "Yooo: H = ~p\n", [H]), + %% io:format(user, "Yooo: S = ~p\n", [S]), + %% io:format(user, "Yooo: R = ~p\n", [R]), + %% receive after AfterTime -> ok end, + Trace = event_logger:get_events(), + %% receive after AfterTime -> ok end, + catch exit(pulse_application_controller, shutdown), + {H, S, R, Trace} + end, [{seed, Seed}, + {strategy, unfair}]), + Schedule = pulse:get_schedule(), + Errors = gen_event:call(error_logger, handle_errors, get_errors, 60*1000), + [clean_up_runtime(S) || S#state.run /= undefined], + {H, S, Res, Trace, Schedule, Errors} + catch + _:Err -> + {'EXIT', Err} + end, + X. + +prop_pulse() -> + prop_pulse(local). + +prop_pulse(LocalOrSlave) -> + ?FORALL({NumChains, ChainLen, PageSize}, + {choose(1, 3), choose(1, 3), choose(1, 16)}, + begin + P = ?FORALL({Cmds, Seed}, + {with_parameters([{num_chains, NumChains}, + {chain_len, ChainLen}, + {page_size, PageSize}], parallel_commands(?MODULE)), + pulse:seed()}, + begin + case run_commands_on_node(LocalOrSlave, Cmds, Seed) of + {'EXIT', Err} -> + equals({'EXIT', Err}, ok); + {_H, S, Res, Trace, Schedule, Errors} -> + CheckTrace = check_trace(Trace, Cmds, Seed), + ?WHENFAIL( + ?QC_FMT("\nState: ~p\n", [S]), + measure(schedule, length(Schedule), + conjunction( + [{simple_result, equals(Res, ok)}, + {errors, equals(Errors, [])}, + {events, CheckTrace} ]))) + end + end), + P + end). + +prop_pulse_test_() -> + Timeout = case os:getenv("PULSE_TIME") of + false -> 60; + Val -> list_to_integer(Val) + end, + ExtraTO = case os:getenv("PULSE_SHRINK_TIME") of + false -> 0; + Val2 -> list_to_integer(Val2) + end, + io:format(user, "prop_pulse_test time: ~p + ~p seconds\n", + [Timeout, ExtraTO]), + {timeout, (Timeout+ExtraTO) + 60, + fun() -> + ?assert(eqc:quickcheck(eqc:testing_time(Timeout,?QC_OUT(prop_pulse())))) + end}. + +check_trace(Trace, _Cmds, _Seed) -> + %% TODO: yeah + Results = [X || {_TS, {result, _Pid, X}} <- Trace], + lists:sort(Results) == lists:usort(Results). + +%% Presenting command data statistics in a nicer way +command_data({set, _, {call, _, Fun, _}}, {_S, _V}) -> + Fun. + +%% Convenience functions for running tests + +test() -> + test({20, sec}). + +test(N) when is_integer(N) -> + quickcheck(numtests(N, prop_pulse())); +test({Time, sec}) -> + quickcheck(eqc:testing_time(Time, prop_pulse())); +test({Time, min}) -> + test({Time * 60, sec}); +test({Time, h}) -> + test({Time * 60, min}). + +check() -> + check(current_counterexample()). + +verbose() -> + verbose(current_counterexample()). + +verbose(CE) -> + erlang:put(verbose, true), + Ok = check(CE), + erlang:put(verbose, false), + Ok. + +check(CE) -> + check(on_output(fun("OK" ++ _, []) -> ok; (Fmt, Args) -> io:format(Fmt, Args) end, + prop_pulse(true == erlang:get(verbose))), + CE). + +recheck() -> + recheck(prop_pulse()). + +zipwith(F, [X|Xs], [Y|Ys]) -> + [F(X, Y)|zipwith(F, Xs, Ys)]; +zipwith(_, _, _) -> []. + +clean_up_runtime(#state{run=R} = _S) -> + %% io:format(user, "clean_up_runtime: run = ~p\n", [R]), + catch corfurl_sequencer:stop(R#run.seq), + [catch corfurl_flu:stop(F) || F <- R#run.flus], + corfurl_test:setup_del_all(length(R#run.flus)). + +make_chains(ChainLen, FLUs) -> + make_chains(ChainLen, FLUs, [], []). + +make_chains(_ChainLen, [], SmallAcc, BigAcc) -> + lists:reverse([SmallAcc|BigAcc]); +make_chains(ChainLen, [H|T], SmallAcc, BigAcc) -> + if length(SmallAcc) == ChainLen -> + make_chains(ChainLen, T, [H], [SmallAcc|BigAcc]); + true -> + make_chains(ChainLen, T, [H|SmallAcc], BigAcc) + end. + +setup(NumChains, ChainLen, PageSize) -> + N = NumChains * ChainLen, + FLUs = corfurl_test:setup_basic_flus(N, PageSize, 50000), + {ok, Seq} = corfurl_sequencer:start_link(FLUs), + Chains = make_chains(ChainLen, FLUs), + %% io:format(user, "Cs = ~p\n", [Chains]), + Proj = corfurl:new_simple_projection(1, 1, 50000, Chains), + #run{seq=Seq, proj=Proj, flus=FLUs}. + +-define(LOG(Tag, MkCall), + event_logger:event({call, self(), Tag}), + __Result = MkCall, + event_logger:event({result, self(), __Result}), + __Result). + +append(#run{seq=Seq,proj=Proj}, Page) -> + ?LOG({append, Page}, + corfurl:append_page(Seq, Proj, Page)). + +-endif. % PULSE +-endif. % TEST + diff --git a/prototype/corfurl/test/corfurl_test.erl b/prototype/corfurl/test/corfurl_test.erl index 76af137..4490131 100644 --- a/prototype/corfurl/test/corfurl_test.erl +++ b/prototype/corfurl/test/corfurl_test.erl @@ -23,15 +23,33 @@ -include("corfurl.hrl"). -ifdef(TEST). + -include_lib("eunit/include/eunit.hrl"). -compile(export_all). --endif. -define(M, corfurl). %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% %%%% --ifdef(TEST). + +setup_flu_basedir() -> + "/tmp/" ++ atom_to_list(?MODULE) ++ ".". + +setup_flu_dir(N) -> + setup_flu_basedir() ++ integer_to_list(N). + +setup_del_all(NumFLUs) -> + [ok = corfurl_util:delete_dir(setup_flu_dir(N)) || + N <- lists:seq(1, NumFLUs)]. + +setup_basic_flus(NumFLUs, PageSize, NumPages) -> + setup_del_all(NumFLUs), + [begin + element(2, corfurl_flu:start_link(setup_flu_dir(X), + PageSize, NumPages * (PageSize * ?PAGE_OVERHEAD))) + end || X <- lists:seq(1, NumFLUs)]. + +-ifndef(PULSE). save_read_test() -> Dir = "/tmp/" ++ atom_to_list(?MODULE) ++".save-read", @@ -51,23 +69,6 @@ save_read_test() -> ok = corfurl_util:delete_dir(Dir) end. -setup_flu_basedir() -> - "/tmp/" ++ atom_to_list(?MODULE) ++ ".". - -setup_flu_dir(N) -> - setup_flu_basedir() ++ integer_to_list(N). - -setup_del_all(NumFLUs) -> - [ok = corfurl_util:delete_dir(setup_flu_dir(N)) || - N <- lists:seq(1, NumFLUs)]. - -setup_basic_flus(NumFLUs, PageSize, NumPages) -> - setup_del_all(NumFLUs), - [begin - element(2, corfurl_flu:start_link(setup_flu_dir(X), - PageSize, NumPages * (PageSize * ?PAGE_OVERHEAD))) - end || X <- lists:seq(1, NumFLUs)]. - smoke1_test() -> NumFLUs = 6, PageSize = 8, @@ -211,5 +212,5 @@ forfun(NumProcs) -> end. -endif. % TIMING_TEST - +-endif. % not PULSE -endif. % TEST diff --git a/prototype/corfurl/test/pulse_util/event_logger.erl b/prototype/corfurl/test/pulse_util/event_logger.erl new file mode 100644 index 0000000..54fa964 --- /dev/null +++ b/prototype/corfurl/test/pulse_util/event_logger.erl @@ -0,0 +1,131 @@ +%%% File : handle_errors.erl +%%% Author : Ulf Norell +%%% Description : +%%% Created : 26 Mar 2012 by Ulf Norell +-module(event_logger). + +-compile(export_all). + +-behaviour(gen_server). + +%% API +-export([start_link/0, event/1, get_events/0, start_logging/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { start_time, events = [] }). + +-record(event, { timestamp, data }). + + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start_link() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +start_logging() -> + gen_server:call(?MODULE, {start, timestamp()}). + +event(EventData) -> + gen_server:call(?MODULE, + #event{ timestamp = timestamp(), data = EventData }). + +async_event(EventData) -> + gen_server:cast(?MODULE, + #event{ timestamp = timestamp(), data = EventData }). + +get_events() -> + gen_server:call(?MODULE, get_events). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> +%% {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call(Event = #event{}, _From, State) -> + {reply, ok, add_event(Event, State)}; +handle_call({start, Now}, _From, S) -> + {reply, ok, S#state{ events = [], start_time = Now }}; +handle_call(get_events, _From, S) -> + {reply, lists:reverse([ {E#event.timestamp, E#event.data} || E <- S#state.events]), + S#state{ events = [] }}; +handle_call(Request, _From, State) -> + {reply, {error, {bad_call, Request}}, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast(Event = #event{}, State) -> + {noreply, add_event(Event, State)}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +add_event(#event{timestamp = Now, data = Data}, State) -> + Event = #event{ timestamp = Now - State#state.start_time, data = Data }, + State#state{ events = [Event|State#state.events] }. + +timestamp() -> + {A, B, C} = erlang:now(), + 1000000 * (1000000 * A + B) + C. + diff --git a/prototype/corfurl/test/pulse_util/handle_errors.erl b/prototype/corfurl/test/pulse_util/handle_errors.erl new file mode 100644 index 0000000..798f379 --- /dev/null +++ b/prototype/corfurl/test/pulse_util/handle_errors.erl @@ -0,0 +1,153 @@ +%%%------------------------------------------------------------------- +%%% @author Hans Svensson <> +%%% @copyright (C) 2012, Hans Svensson +%%% @doc +%%% +%%% @end +%%% Created : 19 Mar 2012 by Hans Svensson <> +%%%------------------------------------------------------------------- +-module(handle_errors). + +-behaviour(gen_event). + +%% API +-export([start_link/0, add_handler/0]). + +%% gen_event callbacks +-export([init/1, handle_event/2, handle_call/2, + handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { errors = [] }). + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Creates an event manager +%% +%% @spec start_link() -> {ok, Pid} | {error, Error} +%% @end +%%-------------------------------------------------------------------- +start_link() -> + gen_event:start_link({local, ?SERVER}). + +%%-------------------------------------------------------------------- +%% @doc +%% Adds an event handler +%% +%% @spec add_handler() -> ok | {'EXIT', Reason} | term() +%% @end +%%-------------------------------------------------------------------- +add_handler() -> + gen_event:add_handler(?SERVER, ?MODULE, []). + +%%%=================================================================== +%%% gen_event callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever a new event handler is added to an event manager, +%% this function is called to initialize the event handler. +%% +%% @spec init(Args) -> {ok, State} +%% @end +%%-------------------------------------------------------------------- +init([]) -> + {ok, #state{}}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives an event sent using +%% gen_event:notify/2 or gen_event:sync_notify/2, this function is +%% called for each installed event handler to handle the event. +%% +%% @spec handle_event(Event, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_event({error, _, {_, "Hintfile '~s' has bad CRC" ++ _, _}}, State) -> + {ok, State}; +handle_event({error, _, {_, "** Generic server" ++ _, _}}, State) -> + {ok, State}; +handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, not_ready]}}, State) -> + {ok, State}; +handle_event({error, _, {_, "Failed to merge ~p: ~p\n", [_, {merge_locked, _, _}]}}, State) -> + {ok, State}; +handle_event({error, _, {_, "Failed to read lock data from ~s: ~p\n", [_, {invalid_data, <<>>}]}}, State) -> + {ok, State}; +handle_event({error, _, Event}, State) -> + {ok, State#state{ errors = [Event|State#state.errors] }}; +handle_event(_Event, State) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event manager receives a request sent using +%% gen_event:call/3,4, this function is called for the specified +%% event handler to handle the request. +%% +%% @spec handle_call(Request, State) -> +%% {ok, Reply, State} | +%% {swap_handler, Reply, Args1, State1, Mod2, Args2} | +%% {remove_handler, Reply} +%% @end +%%-------------------------------------------------------------------- +handle_call(get_errors, S) -> + {ok, S#state.errors, S#state{ errors = [] }}; +handle_call(_Request, State) -> + Reply = ok, + {ok, Reply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called for each installed event handler when +%% an event manager receives any other message than an event or a +%% synchronous request (or a system message). +%% +%% @spec handle_info(Info, State) -> +%% {ok, State} | +%% {swap_handler, Args1, State1, Mod2, Args2} | +%% remove_handler +%% @end +%%-------------------------------------------------------------------- +handle_info(_Info, State) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Whenever an event handler is deleted from an event manager, this +%% function is called. It should be the opposite of Module:init/1 and +%% do any necessary cleaning up. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%===================================================================