Fix process leak of CR clients and FLU1 proxy clients
This commit is contained in:
parent
3a35fe38c8
commit
049311614f
1 changed files with 70 additions and 39 deletions
|
@ -103,6 +103,7 @@ check() -> check(prop_repair_par(true), current_counterexample()).
|
|||
flu_names, % List of FLU names
|
||||
mgr_names, % List of chain manager names
|
||||
fc_list, % List of FLU1 proxy clients
|
||||
cr_count, % Count of CR clients
|
||||
cr_list}). % List of CR clients
|
||||
|
||||
weight(_S, change_partition) -> 20;
|
||||
|
@ -110,10 +111,12 @@ weight(_S, _) -> 100.
|
|||
|
||||
%% append
|
||||
|
||||
append_args(#state{cr_list=CRList}=S) ->
|
||||
[elements(CRList), chunk(), S].
|
||||
append_args(#state{cr_count=CRCount}=S) ->
|
||||
[choose(1, CRCount), chunk(), S].
|
||||
|
||||
append({_SimSelfName, C}, Bin, #state{verbose=V}=S) ->
|
||||
append(CRIndex, Bin, #state{verbose=V}=S) ->
|
||||
CRList = cr_list(),
|
||||
{_SimSelfName, C} = lists:nth(CRIndex, CRList),
|
||||
Prefix = <<"pre">>,
|
||||
Len = byte_size(Bin),
|
||||
Res = (catch machi_cr_client:append_chunk(C, Prefix, Bin, {sec(1), sec(1)})),
|
||||
|
@ -196,6 +199,10 @@ num() ->
|
|||
choose(2, 5).
|
||||
%% return(3).
|
||||
|
||||
cr_count(Num) ->
|
||||
Approx = 10, % ad-hoc
|
||||
(Approx div Num) + 1.
|
||||
|
||||
%% Generator for possibly assymmetric partition information
|
||||
partition(FLUNames) ->
|
||||
frequency([{10, return([])},
|
||||
|
@ -273,7 +280,7 @@ prop_repair_par(Verbose) ->
|
|||
end,
|
||||
_ = cleanup(SetupState),
|
||||
%% Process is leaking? This log line can be removed after fix.
|
||||
?V("process_count=~w~n", [erlang:system_info(process_count)]),
|
||||
[?V("process_count=~w~n", [erlang:system_info(process_count)]) || Verbose],
|
||||
pretty_commands(
|
||||
?MODULE, Cmds, {Seq, Par, Res},
|
||||
aggregate(with_title(cmds), command_names(Cmds),
|
||||
|
@ -289,33 +296,16 @@ prop_repair_par(Verbose) ->
|
|||
%% > eqc_gen:sample(eqc_statem:commands(machi_ap_repair_eqc)).
|
||||
%% but not so helpful.
|
||||
initial_state() ->
|
||||
#state{cr_list=[a,b,c]}.
|
||||
#state{cr_count=3, cr_list=[a,b,c]}.
|
||||
|
||||
initial_state(Num, Seed, Verbose) ->
|
||||
AllListE = all_list_extra(Num),
|
||||
FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE],
|
||||
MgrNames = [{Name, machi_flu_psup:make_mgr_supname(Name)} || Name <- FLUNames],
|
||||
Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- AllListE]),
|
||||
|
||||
FCList = [begin
|
||||
{ok, PCPid} = machi_proxy_flu1_client:start_link(P),
|
||||
{Name, PCPid}
|
||||
end || {_, #p_srvr{name=Name}=P} <- Dict],
|
||||
%% CR clients are pooled, each has "name" which is interpreted "From"
|
||||
%% side of simulated partition.
|
||||
CRListCount = 10, % ad-hoc
|
||||
SimSelfNames = lists:append(lists:duplicate(CRListCount div Num +1, FLUNames)),
|
||||
CRList = [begin
|
||||
{ok, C} = machi_cr_client:start_link(
|
||||
[P || {_, P} <- Dict],
|
||||
[{use_partition_simulator, true},
|
||||
{simulator_self_name, SimSelfName},
|
||||
{simulator_members, FLUNames}]),
|
||||
{SimSelfName, C}
|
||||
end || SimSelfName <- SimSelfNames],
|
||||
#state{num=Num, seed=Seed, verbose=Verbose,
|
||||
flu_names=FLUNames, mgr_names=MgrNames,
|
||||
cr_list=CRList, fc_list=FCList}.
|
||||
cr_count=cr_count(Num),
|
||||
cr_list=undefined, fc_list=undefined}.
|
||||
|
||||
setup_chain(Num, Seed, Verbose) ->
|
||||
%% ?V("setup_chain(Num=~w, Seed=~w~nn", [Num, Seed]),
|
||||
|
@ -352,28 +342,67 @@ setup_chain(Num, Seed, Verbose) ->
|
|||
{#p_srvr{name=Name, port=Port}, Dir} <- AllListE],
|
||||
[machi_chain_manager1:set_chain_members(MgrName, Dict) || {_, MgrName} <- MgrNames],
|
||||
|
||||
State = initial_state(Num, Seed, Verbose),
|
||||
{CRList, FCList} = setup_pool(AllListE),
|
||||
State = #state{num=Num, seed=Seed, verbose=Verbose,
|
||||
flu_names=FLUNames, mgr_names=MgrNames,
|
||||
cr_count=cr_count(Num),
|
||||
cr_list=CRList, fc_list=FCList},
|
||||
%% Don't wait for complete chain. Even partialy completed, the chain
|
||||
%% should work fine. Right?
|
||||
wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames,
|
||||
State#state.fc_list, 20, Verbose),
|
||||
20, Verbose),
|
||||
State.
|
||||
|
||||
setup_pool(AllListE) ->
|
||||
Num = length(AllListE),
|
||||
FLUNames = [P#p_srvr.name || {P, _Dir} <- AllListE],
|
||||
Dict = orddict:from_list([{P#p_srvr.name, P} || {P, _Dir} <- AllListE]),
|
||||
|
||||
FCList = [begin
|
||||
{ok, PCPid} = machi_proxy_flu1_client:start_link(P),
|
||||
{Name, PCPid}
|
||||
end || {_, #p_srvr{name=Name}=P} <- Dict],
|
||||
%% CR clients are pooled, each has "name" which is interpreted "From"
|
||||
%% side of simulated partition.
|
||||
SimSelfNames = lists:append(lists:duplicate(cr_count(Num), FLUNames)),
|
||||
CRList = [begin
|
||||
{ok, C} = machi_cr_client:start_link(
|
||||
[P || {_, P} <- Dict],
|
||||
[{use_partition_simulator, true},
|
||||
{simulator_self_name, SimSelfName},
|
||||
{simulator_members, FLUNames}]),
|
||||
{SimSelfName, C}
|
||||
end || SimSelfName <- SimSelfNames],
|
||||
catch ets:delete(cpool),
|
||||
ets:new(cpool, [set, protected, named_table, {read_concurrency, true}]),
|
||||
ets:insert(cpool, {fc_list, FCList}),
|
||||
ets:insert(cpool, {cr_list, CRList}),
|
||||
{CRList, FCList}.
|
||||
|
||||
fc_list() ->
|
||||
[{fc_list, FCList}] = ets:lookup(cpool, fc_list),
|
||||
FCList.
|
||||
|
||||
cr_list() ->
|
||||
[{cr_list, CRList}] = ets:lookup(cpool, cr_list),
|
||||
CRList.
|
||||
|
||||
%% Post commands
|
||||
|
||||
stabilize(0, S) ->
|
||||
{ok, S};
|
||||
stabilize(_CmdsLen, #state{flu_names=FLUNames, mgr_names=MgrNames,
|
||||
fc_list=FCList, verbose=Verbose}=S) ->
|
||||
verbose=Verbose}=S) ->
|
||||
machi_partition_simulator:no_partitions(),
|
||||
wait_until_stable(chain_state_all_ok(FLUNames), FLUNames, MgrNames,
|
||||
FCList, 100, Verbose),
|
||||
100, Verbose),
|
||||
{ok, S}.
|
||||
|
||||
chain_state_all_ok(FLUNames) ->
|
||||
[{FLUName, {FLUNames, [], []}} || FLUName <- FLUNames].
|
||||
|
||||
confirm_result(#state{cr_list=[{_, C}|_]}=_S) ->
|
||||
confirm_result(#state{}=_S) ->
|
||||
[{_, C} | _] = cr_list(),
|
||||
[{written, _Written}, {accpt, Accpt},
|
||||
{failed, Failed}, {critical, Critical}] = tab_counts(),
|
||||
{OK, Dataloss} = confirm_written(C),
|
||||
|
@ -427,8 +456,8 @@ assert_chunk(C, {Off, Len, FileName}=Key, Bin) ->
|
|||
end.
|
||||
|
||||
cleanup(#state{fc_list=FCList, cr_list=CRList}=_S) ->
|
||||
[catch machi_proxy_flu1_client:quit(FC) || FC <- FCList],
|
||||
[catch machi_cr_client:quit(CR) || CR <- CRList],
|
||||
[begin unlink(FC), catch exit(FC, kill) end || {_, FC} <- FCList],
|
||||
[begin unlink(CR), catch exit(CR, kill) end || {_, CR} <- CRList],
|
||||
_ = shutdown_hard().
|
||||
|
||||
%% Internal utilities
|
||||
|
@ -475,15 +504,16 @@ shutdown_hard() ->
|
|||
timer:sleep(100).
|
||||
|
||||
tick(#state{flu_names=FLUNames, mgr_names=MgrNames,
|
||||
fc_list=FCList, verbose=Verbose}) ->
|
||||
tick(FLUNames, MgrNames, FCList, Verbose).
|
||||
verbose=Verbose}) ->
|
||||
tick(FLUNames, MgrNames, Verbose).
|
||||
|
||||
tick(FLUNames, MgrNames, FCList, Verbose) ->
|
||||
tick(FLUNames, MgrNames, 2, 100, FCList, Verbose).
|
||||
tick(FLUNames, MgrNames, Verbose) ->
|
||||
tick(FLUNames, MgrNames, 2, 100, Verbose).
|
||||
|
||||
tick(FLUNames, MgrNames, Iter, SleepMax, FCList, Verbose) ->
|
||||
tick(FLUNames, MgrNames, Iter, SleepMax, Verbose) ->
|
||||
TickFun = tick_fun(FLUNames, MgrNames, self()),
|
||||
TickFun(Iter, 0, SleepMax),
|
||||
FCList = fc_list(),
|
||||
[?V("## Chain state after tick()=~w~n", [chain_state(FCList)]) || Verbose].
|
||||
|
||||
tick_fun(FLUNames, MgrNames, Parent) ->
|
||||
|
@ -516,11 +546,12 @@ tick_fun(FLUNames, MgrNames, Parent) ->
|
|||
end || {ThePid, M_name} <- Pids]
|
||||
end.
|
||||
|
||||
wait_until_stable(ExpectedChainState, FLUNames, MgrNames, FCList, Verbose) ->
|
||||
wait_until_stable(ExpectedChainState, FLUNames, MgrNames, FCList, 20, Verbose).
|
||||
wait_until_stable(ExpectedChainState, FLUNames, MgrNames, Verbose) ->
|
||||
wait_until_stable(ExpectedChainState, FLUNames, MgrNames, 20, Verbose).
|
||||
|
||||
wait_until_stable(ExpectedChainState, FLUNames, MgrNames, FCList, Retries, Verbose) ->
|
||||
wait_until_stable(ExpectedChainState, FLUNames, MgrNames, Retries, Verbose) ->
|
||||
TickFun = tick_fun(FLUNames, MgrNames, self()),
|
||||
FCList = fc_list(),
|
||||
wait_until_stable1(ExpectedChainState, TickFun, FCList, Retries, Verbose).
|
||||
|
||||
wait_until_stable1(_ExpectedChainState, _TickFun, FCList, 0, _Verbose) ->
|
||||
|
|
Loading…
Reference in a new issue