WIP: chain mgmt prototype scaffolding 6: refactoring

This commit is contained in:
Scott Lystig Fritchie 2014-10-29 17:05:28 +09:00
parent e5b9230af0
commit 616a11e230
2 changed files with 117 additions and 56 deletions

View file

@ -48,8 +48,10 @@
}). }).
-record(ch_mgr, { -record(ch_mgr, {
init_finished :: boolean(),
name :: m_server(), name :: m_server(),
proj :: #projection{}, proj :: #projection{},
proj_proposed :: #projection{},
myflu :: pid() | atom(), myflu :: pid() | atom(),
%% %%
runenv :: list() %proplist() runenv :: list() %proplist()

View file

@ -30,15 +30,17 @@
%% API %% API
-export([start_link/6, stop/1, ping/1, -export([start_link/6, stop/1, ping/1,
calculate_projection_internal_old/1, calculate_projection_internal_old/1,
cl_write_current_projection/1]). test_write_proposed_projection/1]).
-export([trigger_projection_calculation/1]). %% -export([]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-export([make_projection_summary/1]). -export([make_projection_summary/1]).
-ifdef(TEST). -ifdef(TEST).
-export([test_projection_calculation/1, test_cl_read_public_proj/1]). -export([test_calc_projection/2,
test_calc_proposed_projection/1,
test_cl_read_public_proj/1]).
-ifdef(EQC). -ifdef(EQC).
-include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc.hrl").
@ -67,15 +69,24 @@ ping(Pid) ->
calculate_projection_internal_old(Pid) -> calculate_projection_internal_old(Pid) ->
gen_server:call(Pid, {calculate_projection_internal_old}, infinity). gen_server:call(Pid, {calculate_projection_internal_old}, infinity).
cl_write_current_projection(Pid) -> test_write_proposed_projection(Pid) ->
gen_server:call(Pid, {cl_write_current_projection}, infinity). gen_server:call(Pid, {test_write_proposed_projection}, infinity).
trigger_projection_calculation(Pid) ->
gen_server:cast(Pid, {trigger_projection_calculation}).
-ifdef(TEST). -ifdef(TEST).
test_projection_calculation(Pid) ->
gen_server:call(Pid, {test_projection_calculation}, infinity). %% Test/debugging code only.
%% Calculate a projection and return it to us.
%% If KeepRunenvP is true, the server will retain its change in its
%% runtime environment, e.g., changes in simulated network partitions.
%% The server's internal proposed projection is not altered.
test_calc_projection(Pid, KeepRunenvP) ->
gen_server:call(Pid, {test_calc_projection, KeepRunenvP}, infinity).
%% Async!
%% The server's internal proposed projection *is* altered.
test_calc_proposed_projection(Pid) ->
gen_server:cast(Pid, {test_calc_proposed_projection}).
test_cl_read_public_proj(Pid) -> test_cl_read_public_proj(Pid) ->
gen_server:call(Pid, {test_cl_read_public_proj}, infinity). gen_server:call(Pid, {test_cl_read_public_proj}, infinity).
@ -88,41 +99,56 @@ init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) ->
{network_partitions, []}, {network_partitions, []},
{old_threshold, OldThreshold}, {old_threshold, OldThreshold},
{no_partition_threshold, NoPartitionThreshold}], {no_partition_threshold, NoPartitionThreshold}],
S = #ch_mgr{name=MyName, BestProj = make_initial_projection(MyName, All_list, All_list,
proj=make_initial_projection(MyName, All_list, All_list, [{author_proc, init_best}], []),
[{author_proc, init}], []), NoneProj = make_initial_projection(MyName, All_list, [],
[{author_proc, init_none}], []),
S = #ch_mgr{init_finished=false,
name=MyName,
proj=NoneProj,
myflu=MyFLUPid, % pid or atom local name myflu=MyFLUPid, % pid or atom local name
runenv=RunEnv}, runenv=RunEnv},
self() ! {finish_init, BestProj},
{ok, S}. {ok, S}.
handle_call(_Call, _From, #ch_mgr{init_finished=false} = S) ->
{reply, not_initialized, S};
handle_call({calculate_projection_internal_old}, _From, S) -> handle_call({calculate_projection_internal_old}, _From, S) ->
{Reply, S2} = calc_projection(S, [{author_proc, call}]), {Reply, S2} = calc_projection(S, [{author_proc, call}]),
{reply, Reply, S2}; {reply, Reply, S2};
handle_call({cl_write_current_projection}, _From, S) -> handle_call({test_write_proposed_projection}, _From, S) ->
{Res, S2} = do_cl_write_current_proj(S), {Res, S2} = do_cl_write_current_proj(S),
{reply, Res, S2}; {reply, Res, S2};
handle_call({ping}, _From, S) -> handle_call({ping}, _From, S) ->
{reply, pong, S}; {reply, pong, S};
handle_call({stop}, _From, S) -> handle_call({stop}, _From, S) ->
{stop, normal, ok, S}; {stop, normal, ok, S};
handle_call({test_projection_calculation}, _From, S) -> handle_call({test_calc_projection, KeepRunenvP}, _From, S) ->
{P, S2} = calc_projection(S, [{author_proc, cast}]), {P, S2} = calc_projection(S, [{author_proc, call}]),
{reply, {ok, P}, S2}; {reply, {ok, P}, if KeepRunenvP -> S2;
true -> S
end};
handle_call({test_cl_read_public_proj}, _From, S) -> handle_call({test_cl_read_public_proj}, _From, S) ->
{Res, _DiscardS2} = cl_read_public_proj(S, true), {Res, _DiscardS2} = cl_read_public_proj(S, true),
{reply, Res, S}; {reply, Res, S};
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, whaaaaaaaaaa, S}. {reply, whaaaaaaaaaa, S}.
handle_cast({trigger_projection_calculation}, S) -> handle_cast(_Cast, #ch_mgr{init_finished=false} = S) ->
{P, S2} = calc_projection(S, [{author_proc, cast}]), {noreply, S};
?D({todododo,make_projection_summary(P)}), handle_cast({test_calc_proposed_projection}, S) ->
{noreply, S2}; {Proj, S2} = calc_projection(S, [{author_proc, cast}]),
?D({make_projection_summary(Proj)}),
{noreply, S2#ch_mgr{proj_proposed=Proj}};
handle_cast(_Cast, S) -> handle_cast(_Cast, S) ->
?D({cast_whaaaaaaaaaaa, _Cast}), ?D({cast_whaaaaaaaaaaa, _Cast}),
{noreply, S}. {noreply, S}.
handle_info(_Msg, S) -> handle_info({finish_init, BestProj}, S) ->
S2 = finish_init(BestProj, S),
{noreply, S2};
handle_info(Msg, S) ->
exit({bummer, Msg}),
{noreply, S}. {noreply, S}.
terminate(_Reason, _S) -> terminate(_Reason, _S) ->
@ -133,6 +159,27 @@ code_change(_OldVsn, S, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
finish_init(BestProj, #ch_mgr{init_finished=false, myflu=MyFLU} = S) ->
case machi_flu0:proj_read_latest(MyFLU, private) of
error_unwritten ->
Epoch = BestProj#projection.epoch_number,
case machi_flu0:proj_write(MyFLU, Epoch, private, BestProj) of
ok ->
S#ch_mgr{init_finished=true, proj=BestProj};
error_written ->
exit({yo_impossible, ?LINE});
Else ->
?D({retry,Else}),
timer:sleep(100),
finish_init(BestProj, S)
end;
{ok, Proj} ->
S#ch_mgr{init_finished=true, proj=Proj};
Else ->
?D({todo, fix_up_eventually, Else}),
exit({yo_weird, Else})
end.
do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) -> do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) ->
#projection{epoch_number=Epoch} = Proj, #projection{epoch_number=Epoch} = Proj,
case cl_write_public_proj(Epoch, Proj, S) of case cl_write_public_proj(Epoch, Proj, S) of
@ -210,7 +257,7 @@ cl_read_public_proj(#ch_mgr{proj=Proj}=S, ReadRepairP) ->
end. end.
make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) ->
make_projection(1, 0, <<>>, make_projection(0, -1, <<>>,
MyName, All_list, [], UPI_list, Repairing_list, Ps). MyName, All_list, [], UPI_list, Repairing_list, Ps).
make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, make_projection(EpochNum, PrevEpochNum, PrevEpochCSum,
@ -290,7 +337,7 @@ calc_projection(OldThreshold, NoPartitionThreshold, LastProj, Dbg,
Repairing_list5 = Repairing_list4 -- Down, Repairing_list5 = Repairing_list4 -- Down,
P = make_projection(OldEpochNum + 1, OldEpochNum, OldEpochCsum, P = make_projection(OldEpochNum + 1, OldEpochNum, OldEpochCsum,
MyName, All_list, Down, NewUPI_list3, Repairing_list5, MyName, All_list, Down, NewUPI_list3, Repairing_list5,
Dbg), Dbg ++ [{nodes_up, Up}]),
{P, S#ch_mgr{runenv=RunEnv3}}. {P, S#ch_mgr{runenv=RunEnv3}}.
calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) -> calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) ->
@ -421,14 +468,16 @@ perhaps_call_t(S, Partitions, FLU, DoIt) ->
perhaps_call(S, Partitions, FLU, DoIt) perhaps_call(S, Partitions, FLU, DoIt)
catch catch
exit:timeout -> exit:timeout ->
?D({perhaps_call, S#ch_mgr.myflu, FLU, Partitions}),
t_timeout t_timeout
end. end.
perhaps_call(#ch_mgr{name=MyName}, Partitions, FLU, DoIt) -> perhaps_call(#ch_mgr{name=MyName, myflu=MyFLU}, Partitions, FLU, DoIt) ->
case lists:keyfind(FLU, 1, Partitions) of RemoteFLU_p = FLU /= MyFLU,
case RemoteFLU_p andalso lists:keyfind(FLU, 1, Partitions) of
false -> false ->
Res = DoIt(), Res = DoIt(),
case lists:keyfind(MyName, 2, Partitions) of case RemoteFLU_p andalso lists:keyfind(MyName, 2, Partitions) of
false -> false ->
Res; Res;
_ -> _ ->
@ -445,14 +494,16 @@ perhaps_call(#ch_mgr{name=MyName}, Partitions, FLU, DoIt) ->
-define(MGR, machi_chain_manager1). -define(MGR, machi_chain_manager1).
smoke0_test() -> smoke0_test() ->
{ok, FLUa} = machi_flu0:start_link(a),
{ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, 50, 50, a), {ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, 50, 50, a),
try try
[begin [begin
Proj = ?MGR:calculate_projection_internal_old(M0), Proj = ?MGR:calculate_projection_internal_old(M0),
io:format(user, "~p\n", [?MGR:make_projection_summary(Proj)]) io:format(user, "~w\n", [?MGR:make_projection_summary(Proj)])
end || _ <- lists:seq(1,5)] end || _ <- lists:seq(1,5)]
after after
ok = ?MGR:stop(M0) ok = ?MGR:stop(M0),
ok = machi_flu0:stop(FLUa)
end. end.
smoke1_test() -> smoke1_test() ->
@ -462,10 +513,18 @@ smoke1_test() ->
I_represent = I_am = a, I_represent = I_am = a,
{ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am),
try try
ok = cl_write_current_projection(M0), {ok, P1} = test_calc_projection(M0, false),
_ = trigger_projection_calculation(M0), %% ?D(make_projection_summary(P1)),
%% {ok, _Pa} = test_calc_projection(M0, true),
%% %% ?D(make_projection_summary(_Pa)),
%% {ok, _Pb} = test_calc_projection(M0, true),
%% %% ?D(make_projection_summary(_Pb)),
pong = ping(M0), pong = ping(M0),
_Pa = test_projection_calculation(M0),
%% _ = test_calc_proposed_projection(M0),
%% ok = test_write_proposed_projection(M0),
ok ok
after after
ok = ?MGR:stop(M0), ok = ?MGR:stop(M0),
@ -474,25 +533,25 @@ smoke1_test() ->
ok = machi_flu0:stop(FLUc) ok = machi_flu0:stop(FLUc)
end. end.
nonunanimous_read_setup_test() -> %% nonunanimous_read_setup_test() ->
{ok, FLUa} = machi_flu0:start_link(a), %% {ok, FLUa} = machi_flu0:start_link(a),
{ok, FLUb} = machi_flu0:start_link(b), %% {ok, FLUb} = machi_flu0:start_link(b),
I_represent = I_am = a, %% I_represent = I_am = a,
{ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am), %% {ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am),
try %% try
ok = cl_write_current_projection(M0), %% ok = test_write_proposed_projection(M0),
{ok, P1} = test_projection_calculation(M0), %% {ok, P1} = test_calc_projection(M0),
P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}, %% P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]},
P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}, %% P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]},
P1Epoch = P1#projection.epoch_number, %% P1Epoch = P1#projection.epoch_number,
ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a), %% ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a),
ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b), %% ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b),
XX = test_cl_read_public_proj(M0), %% XX = test_cl_read_public_proj(M0),
?D(XX) %% ?D(XX)
after %% after
ok = ?MGR:stop(M0), %% ok = ?MGR:stop(M0),
ok = machi_flu0:stop(FLUa), %% ok = machi_flu0:stop(FLUa),
ok = machi_flu0:stop(FLUb) %% ok = machi_flu0:stop(FLUb)
end. %% end.
-endif. -endif.