diff --git a/prototype/poc-machi/src/machi_chain_manager1.erl b/prototype/poc-machi/src/machi_chain_manager1.erl index 804ed46..988d5fc 100644 --- a/prototype/poc-machi/src/machi_chain_manager1.erl +++ b/prototype/poc-machi/src/machi_chain_manager1.erl @@ -25,18 +25,21 @@ -include("machi.hrl"). -define(D(X), io:format(user, "~s ~p\n", [??X, X])). +-define(Dw(X), io:format(user, "~s ~w\n", [??X, X])). %% API -export([start_link/6, stop/1, ping/1, calculate_projection_internal_old/1, + sync_projection_calculation/1, cl_write_current_projection/1]). -export([trigger_projection_calculation/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([calc_projection/3, make_projection_summary/1]). +-export([make_projection_summary/1]). -ifdef(TEST). +-export([test_cl_read_public_proj/1]). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -65,12 +68,20 @@ ping(Pid) -> calculate_projection_internal_old(Pid) -> gen_server:call(Pid, {calculate_projection_internal_old}, infinity). +sync_projection_calculation(Pid) -> + gen_server:call(Pid, {sync_projection_calculation}, infinity). + cl_write_current_projection(Pid) -> gen_server:call(Pid, {cl_write_current_projection}, infinity). trigger_projection_calculation(Pid) -> gen_server:cast(Pid, {trigger_projection_calculation}). +-ifdef(TEST). +test_cl_read_public_proj(Pid) -> + gen_server:call(Pid, {test_cl_read_public_proj}, infinity). +-endif. % TEST + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) -> @@ -85,12 +96,12 @@ init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) -> runenv=RunEnv}, {ok, S}. -handle_call({calculate_projection_internal_old}, _From, - #ch_mgr{runenv=RunEnv}=S) -> - OldThreshold = proplists:get_value(old_threshold, RunEnv), - NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv), - {Reply, S2} = calc_projection(OldThreshold, NoPartitionThreshold, S), +handle_call({calculate_projection_internal_old}, _From, S) -> + {Reply, S2} = calc_projection(S, [{author_proc, call}]), {reply, Reply, S2}; +handle_call({sync_projection_calculation}, _From, S) -> + {P, S2} = calc_projection(S, [{author_proc, cast}]), + {reply, {ok, P}, S2}; handle_call({cl_write_current_projection}, _From, S) -> {Res, S2} = do_cl_write_current_proj(S), {reply, Res, S2}; @@ -98,9 +109,16 @@ handle_call({ping}, _From, S) -> {reply, pong, S}; handle_call({stop}, _From, S) -> {stop, normal, ok, S}; +handle_call({test_cl_read_public_proj}, _From, S) -> + {Res, _DiscardS2} = cl_read_public_proj(S), + {reply, Res, S}; handle_call(_Call, _From, S) -> {reply, whaaaaaaaaaa, S}. +handle_cast({trigger_projection_calculation}, S) -> + {P, S2} = calc_projection(S, [{author_proc, cast}]), + ?D({todododo,make_projection_summary(P)}), + {noreply, S2}; handle_cast(_Cast, S) -> ?D({cast_whaaaaaaaaaaa, _Cast}), {noreply, S}. @@ -151,7 +169,6 @@ cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) -> fun() -> machi_flu0:proj_write(MyFLU, Epoch, public, Proj) end), case Res0 of ok -> - %% todo FLUs = Proj#projection.all_members -- [MyFLU], cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S); Else when Else == error_written; Else == timeout; Else == t_timeout -> @@ -171,8 +188,8 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) -> {{mixed_bag, lists:zip(FLUs, Rs)}, S} end. -cl_read_public_proj(#ch_mgr{proj=Proj0}=S) -> - #projection{all_members=All_list} = Proj0, +cl_read_public_proj(#ch_mgr{proj=Proj}=S) -> + #projection{all_members=All_list} = Proj, {_UpNodes, Partitions, S2} = calc_up_nodes(S), DoIt = fun(X) -> case machi_flu0:proj_read_latest(X, public) of @@ -182,10 +199,13 @@ cl_read_public_proj(#ch_mgr{proj=Proj0}=S) -> end, Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) || FLU <- All_list], + %% todo fix the disparity between the return tuples below case lists:usort(Rs) of [P] when is_record(P, projection) -> - {ok, S#ch_mgr.proj, S2} - %% todo finish other cases here. + {ok, P, S2}; + _ -> + FLUsRs = lists:zip(All_list, Rs), + rank_and_repair_instructions(FLUsRs, Proj) end. make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> @@ -222,15 +242,16 @@ make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, update_projection_dbg2(P, Dbg2) when is_list(Dbg2) -> P#projection{dbg2=Dbg2}. +calc_projection(#ch_mgr{proj=LastProj, runenv=RunEnv} = S, Dbg) -> + OldThreshold = proplists:get_value(old_threshold, RunEnv), + NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv), + calc_projection(OldThreshold, NoPartitionThreshold, LastProj, Dbg, S). + %% OldThreshold: Percent chance of using the old/previous network partition list %% NoPartitionThreshold: If the network partition changes, what are the odds %% that there are no partitions at all? -calc_projection(OldThreshold, NoPartitionThreshold, - #ch_mgr{proj=LastProj} = S) -> - calc_projection(OldThreshold, NoPartitionThreshold, LastProj, S). - -calc_projection(OldThreshold, NoPartitionThreshold, LastProj, +calc_projection(OldThreshold, NoPartitionThreshold, LastProj, Dbg, #ch_mgr{name=MyName, runenv=RunEnv1} = S) -> #projection{epoch_number=OldEpochNum, epoch_csum=OldEpochCsum, @@ -268,7 +289,7 @@ calc_projection(OldThreshold, NoPartitionThreshold, LastProj, Repairing_list5 = Repairing_list4 -- Down, P = make_projection(OldEpochNum + 1, OldEpochNum, OldEpochCsum, MyName, All_list, Down, NewUPI_list3, Repairing_list5, - [goo]), + Dbg), {P, S#ch_mgr{runenv=RunEnv3}}. calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) -> @@ -330,6 +351,24 @@ roll_dice(N, RunEnv) -> {Val, Seed2} = random:uniform_s(N, Seed1), {Val, replace(RunEnv, [{seed, Seed2}])}. +rank_and_repair_instructions([_|_] = FLUsRs, Proj) -> + #projection{all_members=All_list} = Proj, + MemberRank = orddict:from_list( + lists:zip(All_list, lists:seq(1, length(All_list)))), + N = length(All_list), + rank_and_repair(FLUsRs, Proj, -1, undefined, [], MemberRank, N). + +rank_and_repair([], _Proj, TopRank, WinningProj, RepairInstr, + _MemberRank, _N) -> + {TopRank, WinningProj, RepairInstr}; +rank_and_repair([Proj|T], OldProj, TopRank, WinningProj, RepairInstr, + MemberRank, N) + when is_record(projection, Proj) -> + AuthorRank = orddict:fetch(Proj#projection.author_server, MemberRank), + left off here.......... + + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% make_network_partition_locations(Nodes, Seed1) -> @@ -401,8 +440,10 @@ smoke1_test() -> {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), try ok = cl_write_current_projection(M0), - trigger_projection_calculation(M0), - pong = ping(M0) + _ = trigger_projection_calculation(M0), + pong = ping(M0), + _Pa = sync_projection_calculation(M0), + ok after ok = ?MGR:stop(M0), ok = machi_flu0:stop(FLUa), @@ -410,4 +451,25 @@ smoke1_test() -> ok = machi_flu0:stop(FLUc) end. +nonunanimous_read_setup_test() -> + {ok, FLUa} = machi_flu0:start_link(a), + {ok, FLUb} = machi_flu0:start_link(b), + I_represent = I_am = a, + {ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am), + try + ok = cl_write_current_projection(M0), + {ok, P1} = sync_projection_calculation(M0), + P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}, + P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}, + P1Epoch = P1#projection.epoch_number, + ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a), + ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b), + XX = test_cl_read_public_proj(M0), + ?D(XX) + after + ok = ?MGR:stop(M0), + ok = machi_flu0:stop(FLUa), + ok = machi_flu0:stop(FLUb) + end. + -endif.