WIP: chain mgmt prototype scaffolding 4: uncompileable at the moment

This commit is contained in:
Scott Lystig Fritchie 2014-10-27 21:30:10 +09:00
parent dfbbaf6bfe
commit b757878c81

View file

@ -25,18 +25,21 @@
-include("machi.hrl").
-define(D(X), io:format(user, "~s ~p\n", [??X, X])).
-define(Dw(X), io:format(user, "~s ~w\n", [??X, X])).
%% API
-export([start_link/6, stop/1, ping/1,
calculate_projection_internal_old/1,
sync_projection_calculation/1,
cl_write_current_projection/1]).
-export([trigger_projection_calculation/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export([calc_projection/3, make_projection_summary/1]).
-export([make_projection_summary/1]).
-ifdef(TEST).
-export([test_cl_read_public_proj/1]).
-ifdef(EQC).
-include_lib("eqc/include/eqc.hrl").
@ -65,12 +68,20 @@ ping(Pid) ->
calculate_projection_internal_old(Pid) ->
gen_server:call(Pid, {calculate_projection_internal_old}, infinity).
sync_projection_calculation(Pid) ->
gen_server:call(Pid, {sync_projection_calculation}, infinity).
cl_write_current_projection(Pid) ->
gen_server:call(Pid, {cl_write_current_projection}, infinity).
trigger_projection_calculation(Pid) ->
gen_server:cast(Pid, {trigger_projection_calculation}).
-ifdef(TEST).
test_cl_read_public_proj(Pid) ->
gen_server:call(Pid, {test_cl_read_public_proj}, infinity).
-endif. % TEST
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) ->
@ -85,12 +96,12 @@ init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) ->
runenv=RunEnv},
{ok, S}.
handle_call({calculate_projection_internal_old}, _From,
#ch_mgr{runenv=RunEnv}=S) ->
OldThreshold = proplists:get_value(old_threshold, RunEnv),
NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv),
{Reply, S2} = calc_projection(OldThreshold, NoPartitionThreshold, S),
handle_call({calculate_projection_internal_old}, _From, S) ->
{Reply, S2} = calc_projection(S, [{author_proc, call}]),
{reply, Reply, S2};
handle_call({sync_projection_calculation}, _From, S) ->
{P, S2} = calc_projection(S, [{author_proc, cast}]),
{reply, {ok, P}, S2};
handle_call({cl_write_current_projection}, _From, S) ->
{Res, S2} = do_cl_write_current_proj(S),
{reply, Res, S2};
@ -98,9 +109,16 @@ handle_call({ping}, _From, S) ->
{reply, pong, S};
handle_call({stop}, _From, S) ->
{stop, normal, ok, S};
handle_call({test_cl_read_public_proj}, _From, S) ->
{Res, _DiscardS2} = cl_read_public_proj(S),
{reply, Res, S};
handle_call(_Call, _From, S) ->
{reply, whaaaaaaaaaa, S}.
handle_cast({trigger_projection_calculation}, S) ->
{P, S2} = calc_projection(S, [{author_proc, cast}]),
?D({todododo,make_projection_summary(P)}),
{noreply, S2};
handle_cast(_Cast, S) ->
?D({cast_whaaaaaaaaaaa, _Cast}),
{noreply, S}.
@ -151,7 +169,6 @@ cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) ->
fun() -> machi_flu0:proj_write(MyFLU, Epoch, public, Proj) end),
case Res0 of
ok ->
%% todo
FLUs = Proj#projection.all_members -- [MyFLU],
cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S);
Else when Else == error_written; Else == timeout; Else == t_timeout ->
@ -171,8 +188,8 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
{{mixed_bag, lists:zip(FLUs, Rs)}, S}
end.
cl_read_public_proj(#ch_mgr{proj=Proj0}=S) ->
#projection{all_members=All_list} = Proj0,
cl_read_public_proj(#ch_mgr{proj=Proj}=S) ->
#projection{all_members=All_list} = Proj,
{_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(X) ->
case machi_flu0:proj_read_latest(X, public) of
@ -182,10 +199,13 @@ cl_read_public_proj(#ch_mgr{proj=Proj0}=S) ->
end,
Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) ||
FLU <- All_list],
%% todo fix the disparity between the return tuples below
case lists:usort(Rs) of
[P] when is_record(P, projection) ->
{ok, S#ch_mgr.proj, S2}
%% todo finish other cases here.
{ok, P, S2};
_ ->
FLUsRs = lists:zip(All_list, Rs),
rank_and_repair_instructions(FLUsRs, Proj)
end.
make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) ->
@ -222,15 +242,16 @@ make_projection(EpochNum, PrevEpochNum, PrevEpochCSum,
update_projection_dbg2(P, Dbg2) when is_list(Dbg2) ->
P#projection{dbg2=Dbg2}.
calc_projection(#ch_mgr{proj=LastProj, runenv=RunEnv} = S, Dbg) ->
OldThreshold = proplists:get_value(old_threshold, RunEnv),
NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv),
calc_projection(OldThreshold, NoPartitionThreshold, LastProj, Dbg, S).
%% OldThreshold: Percent chance of using the old/previous network partition list
%% NoPartitionThreshold: If the network partition changes, what are the odds
%% that there are no partitions at all?
calc_projection(OldThreshold, NoPartitionThreshold,
#ch_mgr{proj=LastProj} = S) ->
calc_projection(OldThreshold, NoPartitionThreshold, LastProj, S).
calc_projection(OldThreshold, NoPartitionThreshold, LastProj,
calc_projection(OldThreshold, NoPartitionThreshold, LastProj, Dbg,
#ch_mgr{name=MyName, runenv=RunEnv1} = S) ->
#projection{epoch_number=OldEpochNum,
epoch_csum=OldEpochCsum,
@ -268,7 +289,7 @@ calc_projection(OldThreshold, NoPartitionThreshold, LastProj,
Repairing_list5 = Repairing_list4 -- Down,
P = make_projection(OldEpochNum + 1, OldEpochNum, OldEpochCsum,
MyName, All_list, Down, NewUPI_list3, Repairing_list5,
[goo]),
Dbg),
{P, S#ch_mgr{runenv=RunEnv3}}.
calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) ->
@ -330,6 +351,24 @@ roll_dice(N, RunEnv) ->
{Val, Seed2} = random:uniform_s(N, Seed1),
{Val, replace(RunEnv, [{seed, Seed2}])}.
rank_and_repair_instructions([_|_] = FLUsRs, Proj) ->
#projection{all_members=All_list} = Proj,
MemberRank = orddict:from_list(
lists:zip(All_list, lists:seq(1, length(All_list)))),
N = length(All_list),
rank_and_repair(FLUsRs, Proj, -1, undefined, [], MemberRank, N).
rank_and_repair([], _Proj, TopRank, WinningProj, RepairInstr,
_MemberRank, _N) ->
{TopRank, WinningProj, RepairInstr};
rank_and_repair([Proj|T], OldProj, TopRank, WinningProj, RepairInstr,
MemberRank, N)
when is_record(projection, Proj) ->
AuthorRank = orddict:fetch(Proj#projection.author_server, MemberRank),
left off here..........
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
make_network_partition_locations(Nodes, Seed1) ->
@ -401,8 +440,10 @@ smoke1_test() ->
{ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am),
try
ok = cl_write_current_projection(M0),
trigger_projection_calculation(M0),
pong = ping(M0)
_ = trigger_projection_calculation(M0),
pong = ping(M0),
_Pa = sync_projection_calculation(M0),
ok
after
ok = ?MGR:stop(M0),
ok = machi_flu0:stop(FLUa),
@ -410,4 +451,25 @@ smoke1_test() ->
ok = machi_flu0:stop(FLUc)
end.
nonunanimous_read_setup_test() ->
{ok, FLUa} = machi_flu0:start_link(a),
{ok, FLUb} = machi_flu0:start_link(b),
I_represent = I_am = a,
{ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am),
try
ok = cl_write_current_projection(M0),
{ok, P1} = sync_projection_calculation(M0),
P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]},
P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]},
P1Epoch = P1#projection.epoch_number,
ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a),
ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b),
XX = test_cl_read_public_proj(M0),
?D(XX)
after
ok = ?MGR:stop(M0),
ok = machi_flu0:stop(FLUa),
ok = machi_flu0:stop(FLUb)
end.
-endif.