From e5b9230af0c77b8686b602816e2646c9fffc91e5 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 28 Oct 2014 14:22:24 +0900 Subject: [PATCH] WIP: chain mgmt prototype scaffolding 5: before refactor & continuing --- .../poc-machi/src/machi_chain_manager1.erl | 71 ++++++++++++------- 1 file changed, 47 insertions(+), 24 deletions(-) diff --git a/prototype/poc-machi/src/machi_chain_manager1.erl b/prototype/poc-machi/src/machi_chain_manager1.erl index 988d5fc..9ed5028 100644 --- a/prototype/poc-machi/src/machi_chain_manager1.erl +++ b/prototype/poc-machi/src/machi_chain_manager1.erl @@ -30,7 +30,6 @@ %% API -export([start_link/6, stop/1, ping/1, calculate_projection_internal_old/1, - sync_projection_calculation/1, cl_write_current_projection/1]). -export([trigger_projection_calculation/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -39,7 +38,7 @@ -export([make_projection_summary/1]). -ifdef(TEST). --export([test_cl_read_public_proj/1]). +-export([test_projection_calculation/1, test_cl_read_public_proj/1]). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -68,9 +67,6 @@ ping(Pid) -> calculate_projection_internal_old(Pid) -> gen_server:call(Pid, {calculate_projection_internal_old}, infinity). -sync_projection_calculation(Pid) -> - gen_server:call(Pid, {sync_projection_calculation}, infinity). - cl_write_current_projection(Pid) -> gen_server:call(Pid, {cl_write_current_projection}, infinity). @@ -78,6 +74,9 @@ trigger_projection_calculation(Pid) -> gen_server:cast(Pid, {trigger_projection_calculation}). -ifdef(TEST). +test_projection_calculation(Pid) -> + gen_server:call(Pid, {test_projection_calculation}, infinity). + test_cl_read_public_proj(Pid) -> gen_server:call(Pid, {test_cl_read_public_proj}, infinity). -endif. % TEST @@ -99,9 +98,6 @@ init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) -> handle_call({calculate_projection_internal_old}, _From, S) -> {Reply, S2} = calc_projection(S, [{author_proc, call}]), {reply, Reply, S2}; -handle_call({sync_projection_calculation}, _From, S) -> - {P, S2} = calc_projection(S, [{author_proc, cast}]), - {reply, {ok, P}, S2}; handle_call({cl_write_current_projection}, _From, S) -> {Res, S2} = do_cl_write_current_proj(S), {reply, Res, S2}; @@ -109,8 +105,11 @@ handle_call({ping}, _From, S) -> {reply, pong, S}; handle_call({stop}, _From, S) -> {stop, normal, ok, S}; +handle_call({test_projection_calculation}, _From, S) -> + {P, S2} = calc_projection(S, [{author_proc, cast}]), + {reply, {ok, P}, S2}; handle_call({test_cl_read_public_proj}, _From, S) -> - {Res, _DiscardS2} = cl_read_public_proj(S), + {Res, _DiscardS2} = cl_read_public_proj(S, true), {reply, Res, S}; handle_call(_Call, _From, S) -> {reply, whaaaaaaaaaa, S}. @@ -138,7 +137,7 @@ do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) -> #projection{epoch_number=Epoch} = Proj, case cl_write_public_proj(Epoch, Proj, S) of {ok, S2} -> - case cl_read_public_proj(S2) of + case cl_read_public_proj(S2, true) of {ok, Proj2, S3} -> %% ?D(Proj2), %% ?D(machi_flu0:get_epoch(MyFLU)), @@ -188,7 +187,7 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) -> {{mixed_bag, lists:zip(FLUs, Rs)}, S} end. -cl_read_public_proj(#ch_mgr{proj=Proj}=S) -> +cl_read_public_proj(#ch_mgr{proj=Proj}=S, ReadRepairP) -> #projection{all_members=All_list} = Proj, {_UpNodes, Partitions, S2} = calc_up_nodes(S), DoIt = fun(X) -> @@ -203,9 +202,11 @@ cl_read_public_proj(#ch_mgr{proj=Proj}=S) -> case lists:usort(Rs) of [P] when is_record(P, projection) -> {ok, P, S2}; - _ -> + _ when ReadRepairP -> FLUsRs = lists:zip(All_list, Rs), - rank_and_repair_instructions(FLUsRs, Proj) + rank_and_repair_instructions(FLUsRs, Proj); + _ when not ReadRepairP -> + {error_not_unanimous, undefined_todo, S2} % todo? end. make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> @@ -356,18 +357,40 @@ rank_and_repair_instructions([_|_] = FLUsRs, Proj) -> MemberRank = orddict:from_list( lists:zip(All_list, lists:seq(1, length(All_list)))), N = length(All_list), - rank_and_repair(FLUsRs, Proj, -1, undefined, [], MemberRank, N). + TopRank = lists:max([-1] ++ + [rank_projection(ProjX, MemberRank, N) || + {_FLU, ProjX} <- FLUsRs, + is_record(ProjX, projection)]), + rank_and_repair(FLUsRs, Proj, TopRank, undefined, [], MemberRank, N). -rank_and_repair([], _Proj, TopRank, WinningProj, RepairInstr, - _MemberRank, _N) -> +rank_and_repair([], _Proj, TopRank, WinningProj, + RepairInstr, _MemberRank, _N) -> {TopRank, WinningProj, RepairInstr}; -rank_and_repair([Proj|T], OldProj, TopRank, WinningProj, RepairInstr, - MemberRank, N) - when is_record(projection, Proj) -> - AuthorRank = orddict:fetch(Proj#projection.author_server, MemberRank), - left off here.......... - +rank_and_repair([{FLU, Proj}|T], OldProj, TopRank, WinningProj, + RepairInstr, MemberRank, N) + when is_record(Proj, projection) -> + case rank_projection(Proj, MemberRank, N) of + Rank when Rank == TopRank -> + rank_and_repair(T, OldProj, TopRank, Proj, + RepairInstr, MemberRank, N); + _Rank -> + rank_and_repair(T, OldProj, TopRank, WinningProj, + [FLU|RepairInstr], MemberRank, N) + end; +rank_and_repair([{FLU, _SomeError}|T], OldProj, TopRank, WinningProj, + RepairInstr, MemberRank, N) -> + ?D({?LINE, _SomeError}), + rank_and_repair(T, OldProj, TopRank, WinningProj, + [FLU|RepairInstr], MemberRank, N). + +rank_projection(#projection{author_server=Author, + upi=UPI_list, + repairing=Repairing_list}, MemberRank, N) -> + AuthorRank = orddict:fetch(Author, MemberRank), + AuthorRank + + (1*N + length(Repairing_list)) + + (2*N + length(UPI_list)). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -442,7 +465,7 @@ smoke1_test() -> ok = cl_write_current_projection(M0), _ = trigger_projection_calculation(M0), pong = ping(M0), - _Pa = sync_projection_calculation(M0), + _Pa = test_projection_calculation(M0), ok after ok = ?MGR:stop(M0), @@ -458,7 +481,7 @@ nonunanimous_read_setup_test() -> {ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am), try ok = cl_write_current_projection(M0), - {ok, P1} = sync_projection_calculation(M0), + {ok, P1} = test_projection_calculation(M0), P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}, P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}, P1Epoch = P1#projection.epoch_number,