WIP: chain mgmt prototype scaffolding

This commit is contained in:
Scott Lystig Fritchie 2014-10-27 18:52:42 +09:00
parent 3e499e241a
commit 9c04537497
2 changed files with 95 additions and 26 deletions

View file

@ -27,8 +27,8 @@
-define(D(X), io:format(user, "~s ~p\n", [??X, X])).
%% API
-export([start_link/4, stop/1,
calculate_projection_internal_old/3,
-export([start_link/6, stop/1,
calculate_projection_internal_old/1,
cl_write_current_projection/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@ -48,23 +48,29 @@
-compile(export_all).
-endif. %TEST
start_link(MyName, All_list, Seed, MyFLUPid) ->
gen_server:start_link(?MODULE, {MyName, All_list, Seed, MyFLUPid}, []).
start_link(MyName, All_list, Seed,
OldThreshold, NoPartitionThreshold,
MyFLUPid) ->
gen_server:start_link(?MODULE, {MyName, All_list, Seed,
OldThreshold, NoPartitionThreshold,
MyFLUPid}, []).
stop(Pid) ->
gen_server:call(Pid, {stop}, infinity).
calculate_projection_internal_old(Pid, OldThreshold, NoPartitionThreshold) ->
gen_server:call(Pid, {calculate_projection_internal_old, OldThreshold, NoPartitionThreshold}, infinity).
calculate_projection_internal_old(Pid) ->
gen_server:call(Pid, {calculate_projection_internal_old}, infinity).
cl_write_current_projection(Pid) ->
gen_server:call(Pid, {cl_write_current_projection}, infinity).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init({MyName, All_list, Seed, MyFLUPid}) ->
init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) ->
RunEnv = [{seed, Seed},
{network_partitions, []}],
{network_partitions, []},
{old_threshold, OldThreshold},
{no_partition_threshold, NoPartitionThreshold}],
S = #ch_mgr{name=MyName,
proj=make_initial_projection(MyName, All_list, All_list,
[], []),
@ -72,7 +78,10 @@ init({MyName, All_list, Seed, MyFLUPid}) ->
runenv=RunEnv},
{ok, S}.
handle_call({calculate_projection_internal_old, OldThreshold, NoPartitionThreshold}, _From, S) ->
handle_call({calculate_projection_internal_old}, _From,
#ch_mgr{runenv=RunEnv}=S) ->
OldThreshold = proplists:get_value(old_threshold, RunEnv),
NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv),
{Reply, S2} = calc_projection(OldThreshold, NoPartitionThreshold, S),
{reply, Reply, S2};
handle_call({cl_write_current_projection}, _From, S) ->
@ -113,14 +122,36 @@ do_cl_write_current_proj(#ch_mgr{proj=Proj} = S) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU, runenv=RunEnv1} = S) ->
#projection{epoch_number=Epoch} = Proj,
Epoch=Epoch,MyFLU=MyFLU,RunEnv1=RunEnv1,
{todo, S}.
cl_write_public_proj(Epoch, Proj, S) ->
cl_write_public_proj_local(Epoch, Proj, S).
cl_write_public_proj(_Epoch, _Proj, S) ->
%% todo
{ok, S}.
cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) ->
{_UpNodes, Partitions, S2} = calc_up_nodes(S),
Res0 = perhaps_call_t(
S, Partitions, MyFLU,
fun() -> machi_flu0:proj_write(MyFLU, Epoch, public, Proj) end),
case Res0 of
ok ->
%% todo
FLUs = Proj#projection.all_members -- [MyFLU],
cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S);
Else when Else == error_written; Else == timeout; Else == t_timeout ->
{Else, S2}
end.
cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
%% We're going to be very cavalier about this write because we'll rely
%% on the read side to do any read repair.
Rs =
[perhaps_call_t(S, Partitions, FLU,
fun() -> machi_flu0:proj_write(FLU, Epoch, public, Proj) end) ||
FLU <- FLUs],
case lists:usort(Rs) of
[ok] ->
{ok, S};
_ ->
{{mixed_bag, lists:zip(FLUs, Rs)}, S}
end.
cl_read_public_proj(S) ->
%% todo
@ -164,8 +195,8 @@ calc_projection(OldThreshold, NoPartitionThreshold, LastProj,
} = LastProj,
LastUp = lists:usort(OldUPI_list ++ OldRepairing_list),
AllMembers = (S#ch_mgr.proj)#projection.all_members,
{Up, RunEnv2} = calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold,
AllMembers, RunEnv1),
{Up, _, RunEnv2} = calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold,
AllMembers, RunEnv1),
NewUp = Up -- LastUp,
Down = AllMembers -- Up,
@ -195,6 +226,15 @@ calc_projection(OldThreshold, NoPartitionThreshold, LastProj,
[goo]),
{P, S#ch_mgr{runenv=RunEnv3}}.
calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) ->
OldThreshold = proplists:get_value(old_threshold, RunEnv1),
NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv1),
AllMembers = Proj#projection.all_members,
{UpNodes, Partitions, RunEnv2} =
calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold,
AllMembers, RunEnv1),
{UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}.
calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold,
AllMembers, RunEnv1) ->
Seed1 = proplists:get_value(seed, RunEnv1),
@ -208,7 +248,7 @@ calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold,
not lists:member({Node, MyName}, Partitions2)]),
RunEnv2 = replace(RunEnv1,
[{seed, Seed2}, {network_partitions, Partitions2}]),
{UpNodes, RunEnv2}.
{UpNodes, Partitions2, RunEnv2}.
calc_network_partitions(Nodes, Seed1, OldPartition,
@ -267,29 +307,58 @@ make_all_pairs2([_]) ->
make_all_pairs2([H1|T]) ->
[[{H1, X}, {X, H1}] || X <- T] ++ make_all_pairs(T).
perhaps_call_t(S, Partitions, FLU, DoIt) ->
try
perhaps_call(S, Partitions, FLU, DoIt)
catch
exit:timeout ->
t_timeout
end.
perhaps_call(#ch_mgr{name=MyName}, Partitions, FLU, DoIt) ->
case lists:keyfind(FLU, 1, Partitions) of
false ->
Res = DoIt(),
case lists:keyfind(MyName, 2, Partitions) of
false ->
Res;
_ ->
exit(timeout)
end;
_ ->
exit(timeout)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-ifdef(TEST).
-define(MGR, machi_chain_manager1).
smoke0_test() ->
{ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, a),
{ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, 50, 50, a),
try
[begin
Proj = ?MGR:calculate_projection_internal_old(M0, 50, 50),
Proj = ?MGR:calculate_projection_internal_old(M0),
io:format(user, "~p\n", [?MGR:make_projection_summary(Proj)])
end || _ <- lists:seq(1,5)],
?D(cl_write_current_projection(M0))
end || _ <- lists:seq(1,5)]
after
ok = ?MGR:stop(M0)
end.
smoke1_test() ->
{ok, FLUa} = machi_flu0:start_link(a),
{ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, FLUa),
{ok, FLUb} = machi_flu0:start_link(b),
{ok, FLUc} = machi_flu0:start_link(c),
I_represent = I_am = a,
{ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am),
try
ok = cl_write_current_projection(M0)
after
ok = ?MGR:stop(M0)
ok = ?MGR:stop(M0),
ok = machi_flu0:stop(FLUa),
ok = machi_flu0:stop(FLUb),
ok = machi_flu0:stop(FLUc)
end.
-endif.

View file

@ -140,7 +140,7 @@ find_common_prefix(_, _) ->
[].
calc_projection_test_() ->
Runtime = 15, %% Runtime = 60*60,
Runtime = 2, %% Runtime = 60*60,
{timeout, Runtime * 500,
fun() ->
Nodes = [a,b,c],