Fix process leak of repair eqc #44

Merged
shino merged 2 commits from ss/repair-eqc-pleak-fix into master 2015-11-17 10:16:24 +00:00
Showing only changes of commit ad419ada50 - Show all commits

View file

@ -47,6 +47,8 @@
%% - Operations other than append, write, trim %% - Operations other than append, write, trim
%% - Use checksum instead of binary to save memory %% - Use checksum instead of binary to save memory
%% - More variety for partitioning pattern: non-constant failure %% - More variety for partitioning pattern: non-constant failure
%% - Stop and restart
%% - Suspend and resume of some erlang processes
-module(machi_ap_repair_eqc). -module(machi_ap_repair_eqc).
@ -60,6 +62,16 @@
-include_lib("eqc/include/eqc_statem.hrl"). -include_lib("eqc/include/eqc_statem.hrl").
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
%% Handle for the system under test. Built by setup_target/3 and passed to
%% stabilize/2, confirm_result/1 and cleanup/1.
-record(target, {verbose=false, % Verbose output for debugging
flu_names, % List of FLU names
mgr_names}). % List of {FLUName, MgrName} pairs
%% Model state for the eqc_statem commands; built by initial_state/0 and
%% initial_state/2.
-record(state, {num, % Number of FLU servers
verbose=false, % Verbose output for debugging
flu_names, % List of FLU names
mgr_names, % List of {FLUName, MgrName} pairs
cr_count}). % Count of CR clients
%% ETS table names %% ETS table names
-define(WRITTEN_TAB, written). % Successfully written data -define(WRITTEN_TAB, written). % Successfully written data
-define(ACCPT_TAB, accpt). % Errors with no harm, e.g. timeout -define(ACCPT_TAB, accpt). % Errors with no harm, e.g. timeout
@ -91,25 +103,12 @@ prop_repair_par_test_() ->
eqc:quickcheck(eqc:testing_time( eqc:quickcheck(eqc:testing_time(
PropTO, ?QC_OUT(noshrink(prop_repair_par(Verbose))))))}]}. PropTO, ?QC_OUT(noshrink(prop_repair_par(Verbose))))))}]}.
%% SHELL HELPERS %% Model
%% Run the parallel repair property 100 times with verbose output.
test() -> test(100).
%% Run the parallel repair property N times with verbose output.
test(N) -> test(N, true).
%% Run prop_repair_par(Verbose) for N tests with shrinking disabled.
test(N, Verbose) -> quickcheck(numtests(N, noshrink(prop_repair_par(Verbose)))).
%% Re-check the verbose parallel property against current_counterexample().
check() -> check(prop_repair_par(true), current_counterexample()).
%% Model state for the eqc_statem commands. fc_list and cr_list are left
%% undefined by initial_state/3 and are filled in by setup_chain/3.
-record(state, {num, % Number of FLU servers
seed, % Seed for partition simulator
verbose=false, % Verbose output for debugging
flu_names, % List of FLU names
mgr_names, % List of chain manager names
fc_list, % List of FLU1 proxy clients
cr_count, % Count of CR clients
cr_list}). % List of CR clients
weight(_S, change_partition) -> 20; weight(_S, change_partition) -> 20;
weight(_S, _) -> 100. weight(_S, _) -> 100.
%% append %% Append
append_args(#state{cr_count=CRCount}=S) -> append_args(#state{cr_count=CRCount}=S) ->
[choose(1, CRCount), chunk(), S]. [choose(1, CRCount), chunk(), S].
@ -174,7 +173,7 @@ append(CRIndex, Bin, #state{verbose=V}=S) ->
{other_error, Other} {other_error, Other}
end. end.
%% change partition %% Change partition
change_partition_args(#state{flu_names=FLUNames}=S) -> change_partition_args(#state{flu_names=FLUNames}=S) ->
%% [partition(FLUNames), S]. %% [partition(FLUNames), S].
@ -197,11 +196,22 @@ change_partition(Partition,
num() -> num() ->
choose(2, 5). choose(2, 5).
%% return(3).
cr_count(Num) -> cr_count(Num) ->
Approx = 10, % ad-hoc Num * 3.
(Approx div Num) + 1.
%% Builds one {#p_srvr{}, DataDir} pair per FLU for a chain of Num servers.
%% FLUs are named a, b, c, ... and FLU number I is assigned port PortBase+I.
%% Returns a list like
%% `[{#p_srvr{name=a, port=7501, ..}, "./eqc/data.eqc.a"}, ...]'
%% (the actual port and directory bases come from get_port_dir_base/0).
all_list_extra(Num) ->
{PortBase, DirBase} = get_port_dir_base(),
[begin
FLUNameStr = [$a + I - 1], % "a" for I=1, "b" for I=2, ...
FLUName = list_to_atom(FLUNameStr),
MgrName = machi_flu_psup:make_mgr_supname(FLUName),
{#p_srvr{name=FLUName, address="localhost", port=PortBase+I,
props=[{chmgr, MgrName}]},
DirBase ++ "/data.eqc." ++ FLUNameStr}
end || I <- lists:seq(1, Num)].
%% Generator for possibly asymmetric partition information %% Generator for possibly asymmetric partition information
partition(FLUNames) -> partition(FLUNames) ->
@ -223,7 +233,7 @@ flu_pairs(FLUNames) ->
chunk() -> chunk() ->
non_empty(binary(10)). non_empty(binary(10)).
%% Property %% Properties
prop_repair(Verbose) -> prop_repair(Verbose) ->
error_logger:tty(false), error_logger:tty(false),
@ -232,17 +242,17 @@ prop_repair(Verbose) ->
Seed = {1445,935441,287549}, Seed = {1445,935441,287549},
?FORALL(Num, num(), ?FORALL(Num, num(),
?FORALL(Cmds, commands(?MODULE, initial_state(Num, Seed, Verbose)), ?FORALL(Cmds, commands(?MODULE, initial_state(Num, Verbose)),
begin begin
SetupState = setup_chain(Num, Seed, Verbose), Target = setup_target(Num, Seed, Verbose),
{H, S1, Res} = run_commands(?MODULE, Cmds), {H, S1, Res0} = run_commands(?MODULE, Cmds),
%% ?V("S1=~w~n", [S1]), %% ?V("S1=~w~n", [S1]),
?V("==== Start post operations, stabilize and confirm results~n", []), ?V("==== Start post operations, stabilize and confirm results~n", []),
{_Res2, S2} = stabilize(commands_len(Cmds), SetupState), _ = stabilize(commands_len(Cmds), Target),
{Dataloss, Critical} = confirm_result(S2), {Dataloss, Critical} = confirm_result(Target),
_ = cleanup(SetupState), _ = cleanup(Target),
pretty_commands( pretty_commands(
?MODULE, Cmds, {H, S1, Res}, ?MODULE, Cmds, {H, S1, Res0},
aggregate(with_title(cmds), command_names(Cmds), aggregate(with_title(cmds), command_names(Cmds),
collect(with_title(length5), (length(Cmds) div 5) * 5, collect(with_title(length5), (length(Cmds) div 5) * 5,
{Dataloss, Critical} =:= {0, 0}))) {Dataloss, Critical} =:= {0, 0})))
@ -258,35 +268,35 @@ prop_repair_par(Verbose) ->
?FORALL(Cmds, ?FORALL(Cmds,
%% Still trial and error: how to control command length and concurrency? %% Still trial and error: how to control command length and concurrency?
?SUCHTHAT(Cmds0, ?SIZED(Size, resize(Size, ?SUCHTHAT(Cmds0, ?SIZED(Size, resize(Size,
parallel_commands(?MODULE, initial_state(Num, Seed, Verbose)))), parallel_commands(?MODULE, initial_state(Num, Verbose)))),
commands_len(Cmds0) > 20 commands_len(Cmds0) > 20
andalso andalso
concurrency(Cmds0) > 2), concurrency(Cmds0) > 2),
begin begin
CmdsLen= commands_len(Cmds), CmdsLen= commands_len(Cmds),
SetupState = setup_chain(Num, Seed, Verbose), Target = setup_target(Num, Seed, Verbose),
{Seq, Par, Res} = run_parallel_commands(?MODULE, Cmds), {Seq, Par, Res0} = run_parallel_commands(?MODULE, Cmds),
%% ?V("Seq=~w~n", [Seq]), %% ?V("Seq=~w~n", [Seq]),
%% ?V("Par=~w~n", [Par]), %% ?V("Par=~w~n", [Par]),
?V("==== Start post operations, stabilize and confirm results~n", []), ?V("==== Start post operations, stabilize and confirm results~n", []),
{Dataloss, Critical} = {FinalRes, {Dataloss, Critical}} =
case Res of case Res0 of
ok -> ok ->
{_Res2, S2} = stabilize(CmdsLen, SetupState), Res1 = stabilize(CmdsLen, Target),
confirm_result(S2); {Res1, confirm_result(Target)};
_ -> _ ->
?V("Res=~w~n", [Res]), ?V("Res0=~w~n", [Res0]),
{undefined, undefined} {Res0, {undefined, undefined}}
end, end,
_ = cleanup(SetupState), _ = cleanup(Target),
%% Process is leaking? This log line can be removed after fix. %% Process is leaking? This log line can be removed after fix.
[?V("process_count=~w~n", [erlang:system_info(process_count)]) || Verbose], [?V("process_count=~w~n", [erlang:system_info(process_count)]) || Verbose],
pretty_commands( pretty_commands(
?MODULE, Cmds, {Seq, Par, Res}, ?MODULE, Cmds, {Seq, Par, Res0},
aggregate(with_title(cmds), command_names(Cmds), aggregate(with_title(cmds), command_names(Cmds),
collect(with_title(length5), (CmdsLen div 5) * 5, collect(with_title(length5), (CmdsLen div 5) * 5,
collect(with_title(conc), concurrency(Cmds), collect(with_title(conc), concurrency(Cmds),
{Dataloss, Critical} =:= {0, 0}))) {FinalRes, {Dataloss, Critical}} =:= {ok, {0, 0}})))
) )
end)). end)).
@ -296,24 +306,35 @@ prop_repair_par(Verbose) ->
%% > eqc_gen:sample(eqc_statem:commands(machi_ap_repair_eqc)). %% > eqc_gen:sample(eqc_statem:commands(machi_ap_repair_eqc)).
%% but not so helpful. %% but not so helpful.
initial_state() -> initial_state() ->
#state{cr_count=3, cr_list=[a,b,c]}. #state{cr_count=3}.
initial_state(Num, Seed, Verbose) -> initial_state(Num, Verbose) ->
AllListE = all_list_extra(Num), AllListE = all_list_extra(Num),
FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE], FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE],
MgrNames = [{Name, machi_flu_psup:make_mgr_supname(Name)} || Name <- FLUNames], MgrNames = [{Name, machi_flu_psup:make_mgr_supname(Name)} || Name <- FLUNames],
#state{num=Num, seed=Seed, verbose=Verbose, #state{num=Num, verbose=Verbose,
flu_names=FLUNames, mgr_names=MgrNames, flu_names=FLUNames, mgr_names=MgrNames,
cr_count=cr_count(Num), cr_count=cr_count(Num)}.
cr_list=undefined, fc_list=undefined}.
setup_chain(Num, Seed, Verbose) -> setup_target(Num, Seed, Verbose) ->
%% ?V("setup_chain(Num=~w, Seed=~w~nn", [Num, Seed]), %% ?V("setup_target(Num=~w, Seed=~w~nn", [Num, Seed]),
AllListE = all_list_extra(Num), AllListE = all_list_extra(Num),
FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE], FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE],
MgrNames = [{Name, machi_flu_psup:make_mgr_supname(Name)} || Name <- FLUNames], MgrNames = [{Name, machi_flu_psup:make_mgr_supname(Name)} || Name <- FLUNames],
Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- AllListE]), Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- AllListE]),
setup_chain(Seed, AllListE, FLUNames, MgrNames, Dict),
_ = setup_cpool(AllListE, FLUNames, Dict),
Target = #target{flu_names=FLUNames, mgr_names=MgrNames,
verbose=Verbose},
%% Don't wait for the chain to be complete. Even partially completed, the
%% chain should work fine. Right?
wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames,
20, Verbose),
Target.
setup_chain(Seed, AllListE, FLUNames, MgrNames, Dict) ->
ok = shutdown_hard(), ok = shutdown_hard(),
[begin [begin
machi_flu1_test:clean_up_data_dir(Dir), machi_flu1_test:clean_up_data_dir(Dir),
@ -341,23 +362,10 @@ setup_chain(Num, Seed, Verbose) ->
[{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, FLUOpts) || [{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, FLUOpts) ||
{#p_srvr{name=Name, port=Port}, Dir} <- AllListE], {#p_srvr{name=Name, port=Port}, Dir} <- AllListE],
[machi_chain_manager1:set_chain_members(MgrName, Dict) || {_, MgrName} <- MgrNames], [machi_chain_manager1:set_chain_members(MgrName, Dict) || {_, MgrName} <- MgrNames],
ok.
{CRList, FCList} = setup_pool(AllListE), setup_cpool(AllListE, FLUNames, Dict) ->
State = #state{num=Num, seed=Seed, verbose=Verbose,
flu_names=FLUNames, mgr_names=MgrNames,
cr_count=cr_count(Num),
cr_list=CRList, fc_list=FCList},
%% Don't wait for the chain to be complete. Even partially completed, the
%% chain should work fine. Right?
wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames,
20, Verbose),
State.
setup_pool(AllListE) ->
Num = length(AllListE), Num = length(AllListE),
FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE],
Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- AllListE]),
FCList = [begin FCList = [begin
{ok, PCPid} = machi_proxy_flu1_client:start_link(P), {ok, PCPid} = machi_proxy_flu1_client:start_link(P),
{Name, PCPid} {Name, PCPid}
@ -387,21 +395,21 @@ cr_list() ->
[{cr_list, CRList}] = ets:lookup(cpool, cr_list), [{cr_list, CRList}] = ets:lookup(cpool, cr_list),
CRList. CRList.
%% Post commands %% Post run_commands
stabilize(0, S) -> stabilize(0, _T) ->
{ok, S}; ok;
stabilize(_CmdsLen, #state{flu_names=FLUNames, mgr_names=MgrNames, stabilize(_CmdsLen, #target{flu_names=FLUNames, mgr_names=MgrNames,
verbose=Verbose}=S) -> verbose=Verbose}) ->
machi_partition_simulator:no_partitions(), machi_partition_simulator:no_partitions(),
wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames, wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames,
100, Verbose), 100, Verbose),
{ok, S}. ok.
chain_state_all_ok(FLUNames) -> chain_state_all_ok(FLUNames) ->
[{FLUName, {FLUNames, [], []}} || FLUName <- FLUNames]. [{FLUName, {FLUNames, [], []}} || FLUName <- FLUNames].
confirm_result(#state{}=_S) -> confirm_result(_T) ->
[{_, C} | _] = cr_list(), [{_, C} | _] = cr_list(),
[{written, _Written}, {accpt, Accpt}, [{written, _Written}, {accpt, Accpt},
{failed, Failed}, {critical, Critical}] = tab_counts(), {failed, Failed}, {critical, Critical}] = tab_counts(),
@ -455,12 +463,12 @@ assert_chunk(C, {Off, Len, FileName}=Key, Bin) ->
{error, Other} {error, Other}
end. end.
cleanup(#state{fc_list=FCList, cr_list=CRList}=_S) -> cleanup(_Target) ->
[begin unlink(FC), catch exit(FC, kill) end || {_, FC} <- FCList], [begin unlink(FC), catch exit(FC, kill) end || {_, FC} <- fc_list()],
[begin unlink(CR), catch exit(CR, kill) end || {_, CR} <- CRList], [begin unlink(CR), catch exit(CR, kill) end || {_, CR} <- cr_list()],
_ = shutdown_hard(). _ = shutdown_hard().
%% Internal utilities %% Internal misc utilities
eqc_verbose() -> eqc_verbose() ->
os:getenv("EQC_VERBOSE") =:= "true". os:getenv("EQC_VERBOSE") =:= "true".
@ -472,19 +480,6 @@ eqc_timeout(Default) ->
end, end,
{PropTimeout, PropTimeout * 300}. {PropTimeout, PropTimeout * 300}.
%% Builds one {#p_srvr{}, DataDir} pair per FLU for a chain of Num servers.
%% FLUs are named a, b, c, ... and FLU number I is assigned port PortBase+I.
%% Returns a list like
%% `[{#p_srvr{name=a, port=7501, ..}, "./eqc/data.eqc.a"}, ...]'
%% (the actual port and directory bases come from get_port_dir_base/0).
all_list_extra(Num) ->
{PortBase, DirBase} = get_port_dir_base(),
[begin
FLUNameStr = [$a + I - 1], % "a" for I=1, "b" for I=2, ...
FLUName = list_to_atom(FLUNameStr),
MgrName = machi_flu_psup:make_mgr_supname(FLUName),
{#p_srvr{name=FLUName, address="localhost", port=PortBase+I,
props=[{chmgr, MgrName}]},
DirBase ++ "/data.eqc." ++ FLUNameStr}
end || I <- lists:seq(1, Num)].
get_port_dir_base() -> get_port_dir_base() ->
I = case os:getenv("EQC_BASE_PORT") of I = case os:getenv("EQC_BASE_PORT") of
false -> 0; false -> 0;