WIP: chain mgmt prototype scaffolding 5: before refactor & continuing

This commit is contained in:
Scott Lystig Fritchie 2014-10-28 14:22:24 +09:00
parent b757878c81
commit e5b9230af0

View file

@ -30,7 +30,6 @@
%% API %% API
-export([start_link/6, stop/1, ping/1, -export([start_link/6, stop/1, ping/1,
calculate_projection_internal_old/1, calculate_projection_internal_old/1,
sync_projection_calculation/1,
cl_write_current_projection/1]). cl_write_current_projection/1]).
-export([trigger_projection_calculation/1]). -export([trigger_projection_calculation/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -39,7 +38,7 @@
-export([make_projection_summary/1]). -export([make_projection_summary/1]).
-ifdef(TEST). -ifdef(TEST).
-export([test_cl_read_public_proj/1]). -export([test_projection_calculation/1, test_cl_read_public_proj/1]).
-ifdef(EQC). -ifdef(EQC).
-include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc.hrl").
@ -68,9 +67,6 @@ ping(Pid) ->
calculate_projection_internal_old(Pid) -> calculate_projection_internal_old(Pid) ->
gen_server:call(Pid, {calculate_projection_internal_old}, infinity). gen_server:call(Pid, {calculate_projection_internal_old}, infinity).
sync_projection_calculation(Pid) ->
gen_server:call(Pid, {sync_projection_calculation}, infinity).
cl_write_current_projection(Pid) -> cl_write_current_projection(Pid) ->
gen_server:call(Pid, {cl_write_current_projection}, infinity). gen_server:call(Pid, {cl_write_current_projection}, infinity).
@ -78,6 +74,9 @@ trigger_projection_calculation(Pid) ->
gen_server:cast(Pid, {trigger_projection_calculation}). gen_server:cast(Pid, {trigger_projection_calculation}).
-ifdef(TEST). -ifdef(TEST).
test_projection_calculation(Pid) ->
gen_server:call(Pid, {test_projection_calculation}, infinity).
test_cl_read_public_proj(Pid) -> test_cl_read_public_proj(Pid) ->
gen_server:call(Pid, {test_cl_read_public_proj}, infinity). gen_server:call(Pid, {test_cl_read_public_proj}, infinity).
-endif. % TEST -endif. % TEST
@ -99,9 +98,6 @@ init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) ->
handle_call({calculate_projection_internal_old}, _From, S) -> handle_call({calculate_projection_internal_old}, _From, S) ->
{Reply, S2} = calc_projection(S, [{author_proc, call}]), {Reply, S2} = calc_projection(S, [{author_proc, call}]),
{reply, Reply, S2}; {reply, Reply, S2};
handle_call({sync_projection_calculation}, _From, S) ->
{P, S2} = calc_projection(S, [{author_proc, cast}]),
{reply, {ok, P}, S2};
handle_call({cl_write_current_projection}, _From, S) -> handle_call({cl_write_current_projection}, _From, S) ->
{Res, S2} = do_cl_write_current_proj(S), {Res, S2} = do_cl_write_current_proj(S),
{reply, Res, S2}; {reply, Res, S2};
@ -109,8 +105,11 @@ handle_call({ping}, _From, S) ->
{reply, pong, S}; {reply, pong, S};
handle_call({stop}, _From, S) -> handle_call({stop}, _From, S) ->
{stop, normal, ok, S}; {stop, normal, ok, S};
handle_call({test_projection_calculation}, _From, S) ->
{P, S2} = calc_projection(S, [{author_proc, cast}]),
{reply, {ok, P}, S2};
handle_call({test_cl_read_public_proj}, _From, S) -> handle_call({test_cl_read_public_proj}, _From, S) ->
{Res, _DiscardS2} = cl_read_public_proj(S), {Res, _DiscardS2} = cl_read_public_proj(S, true),
{reply, Res, S}; {reply, Res, S};
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, whaaaaaaaaaa, S}. {reply, whaaaaaaaaaa, S}.
@ -138,7 +137,7 @@ do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) ->
#projection{epoch_number=Epoch} = Proj, #projection{epoch_number=Epoch} = Proj,
case cl_write_public_proj(Epoch, Proj, S) of case cl_write_public_proj(Epoch, Proj, S) of
{ok, S2} -> {ok, S2} ->
case cl_read_public_proj(S2) of case cl_read_public_proj(S2, true) of
{ok, Proj2, S3} -> {ok, Proj2, S3} ->
%% ?D(Proj2), %% ?D(Proj2),
%% ?D(machi_flu0:get_epoch(MyFLU)), %% ?D(machi_flu0:get_epoch(MyFLU)),
@ -188,7 +187,7 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
{{mixed_bag, lists:zip(FLUs, Rs)}, S} {{mixed_bag, lists:zip(FLUs, Rs)}, S}
end. end.
cl_read_public_proj(#ch_mgr{proj=Proj}=S) -> cl_read_public_proj(#ch_mgr{proj=Proj}=S, ReadRepairP) ->
#projection{all_members=All_list} = Proj, #projection{all_members=All_list} = Proj,
{_UpNodes, Partitions, S2} = calc_up_nodes(S), {_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(X) -> DoIt = fun(X) ->
@ -203,9 +202,11 @@ cl_read_public_proj(#ch_mgr{proj=Proj}=S) ->
case lists:usort(Rs) of case lists:usort(Rs) of
[P] when is_record(P, projection) -> [P] when is_record(P, projection) ->
{ok, P, S2}; {ok, P, S2};
_ -> _ when ReadRepairP ->
FLUsRs = lists:zip(All_list, Rs), FLUsRs = lists:zip(All_list, Rs),
rank_and_repair_instructions(FLUsRs, Proj) rank_and_repair_instructions(FLUsRs, Proj);
_ when not ReadRepairP ->
{error_not_unanimous, undefined_todo, S2} % todo?
end. end.
make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) ->
@ -356,18 +357,40 @@ rank_and_repair_instructions([_|_] = FLUsRs, Proj) ->
MemberRank = orddict:from_list( MemberRank = orddict:from_list(
lists:zip(All_list, lists:seq(1, length(All_list)))), lists:zip(All_list, lists:seq(1, length(All_list)))),
N = length(All_list), N = length(All_list),
rank_and_repair(FLUsRs, Proj, -1, undefined, [], MemberRank, N). TopRank = lists:max([-1] ++
[rank_projection(ProjX, MemberRank, N) ||
{_FLU, ProjX} <- FLUsRs,
is_record(ProjX, projection)]),
rank_and_repair(FLUsRs, Proj, TopRank, undefined, [], MemberRank, N).
rank_and_repair([], _Proj, TopRank, WinningProj, RepairInstr, rank_and_repair([], _Proj, TopRank, WinningProj,
_MemberRank, _N) -> RepairInstr, _MemberRank, _N) ->
{TopRank, WinningProj, RepairInstr}; {TopRank, WinningProj, RepairInstr};
rank_and_repair([Proj|T], OldProj, TopRank, WinningProj, RepairInstr, rank_and_repair([{FLU, Proj}|T], OldProj, TopRank, WinningProj,
MemberRank, N) RepairInstr, MemberRank, N)
when is_record(projection, Proj) -> when is_record(Proj, projection) ->
AuthorRank = orddict:fetch(Proj#projection.author_server, MemberRank), case rank_projection(Proj, MemberRank, N) of
left off here.......... Rank when Rank == TopRank ->
rank_and_repair(T, OldProj, TopRank, Proj,
RepairInstr, MemberRank, N);
_Rank ->
rank_and_repair(T, OldProj, TopRank, WinningProj,
[FLU|RepairInstr], MemberRank, N)
end;
rank_and_repair([{FLU, _SomeError}|T], OldProj, TopRank, WinningProj,
RepairInstr, MemberRank, N) ->
?D({?LINE, _SomeError}),
rank_and_repair(T, OldProj, TopRank, WinningProj,
[FLU|RepairInstr], MemberRank, N).
rank_projection(#projection{author_server=Author,
upi=UPI_list,
repairing=Repairing_list}, MemberRank, N) ->
AuthorRank = orddict:fetch(Author, MemberRank),
AuthorRank +
(1*N + length(Repairing_list)) +
(2*N + length(UPI_list)).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -442,7 +465,7 @@ smoke1_test() ->
ok = cl_write_current_projection(M0), ok = cl_write_current_projection(M0),
_ = trigger_projection_calculation(M0), _ = trigger_projection_calculation(M0),
pong = ping(M0), pong = ping(M0),
_Pa = sync_projection_calculation(M0), _Pa = test_projection_calculation(M0),
ok ok
after after
ok = ?MGR:stop(M0), ok = ?MGR:stop(M0),
@ -458,7 +481,7 @@ nonunanimous_read_setup_test() ->
{ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am), {ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am),
try try
ok = cl_write_current_projection(M0), ok = cl_write_current_projection(M0),
{ok, P1} = sync_projection_calculation(M0), {ok, P1} = test_projection_calculation(M0),
P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}, P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]},
P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}, P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]},
P1Epoch = P1#projection.epoch_number, P1Epoch = P1#projection.epoch_number,