diff --git a/prototype/poc-machi/include/machi.hrl b/prototype/poc-machi/include/machi.hrl index 29542e0..2f52c3e 100644 --- a/prototype/poc-machi/include/machi.hrl +++ b/prototype/poc-machi/include/machi.hrl @@ -51,7 +51,7 @@ init_finished :: boolean(), name :: m_server(), proj :: #projection{}, - proj_proposed :: #projection{}, + proj_proposed :: 'none' | #projection{}, myflu :: pid() | atom(), %% runenv :: list() %proplist() diff --git a/prototype/poc-machi/src/machi_chain_manager1.erl b/prototype/poc-machi/src/machi_chain_manager1.erl index 6ef340b..dce336a 100644 --- a/prototype/poc-machi/src/machi_chain_manager1.erl +++ b/prototype/poc-machi/src/machi_chain_manager1.erl @@ -29,9 +29,7 @@ %% API -export([start_link/6, stop/1, ping/1, - calculate_projection_internal_old/1, - test_write_proposed_projection/1]). -%% -export([]). + calculate_projection_internal_old/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -40,7 +38,8 @@ -ifdef(TEST). -export([test_calc_projection/2, test_calc_proposed_projection/1, - test_cl_read_public_proj/1]). + test_write_proposed_projection/1, + test_read_public_projection/2]). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -88,8 +87,8 @@ test_calc_projection(Pid, KeepRunenvP) -> test_calc_proposed_projection(Pid) -> gen_server:cast(Pid, {test_calc_proposed_projection}). -test_cl_read_public_proj(Pid) -> - gen_server:call(Pid, {test_cl_read_public_proj}, infinity). +test_read_public_projection(Pid, ReadRepairP) -> + gen_server:call(Pid, {test_read_public_projection, ReadRepairP}, infinity). -endif. % TEST %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -117,8 +116,12 @@ handle_call({calculate_projection_internal_old}, _From, S) -> {Reply, S2} = calc_projection(S, [{author_proc, call}]), {reply, Reply, S2}; handle_call({test_write_proposed_projection}, _From, S) -> - {Res, S2} = do_cl_write_current_proj(S), - {reply, Res, S2}; + if S#ch_mgr.proj_proposed == none -> + {reply, none, S}; + true -> + {Res, S2} = do_cl_write_proposed_proj(S), + {reply, Res, S2} + end; handle_call({ping}, _From, S) -> {reply, pong, S}; handle_call({stop}, _From, S) -> @@ -128,9 +131,9 @@ handle_call({test_calc_projection, KeepRunenvP}, _From, S) -> {reply, {ok, P}, if KeepRunenvP -> S2; true -> S end}; -handle_call({test_cl_read_public_proj}, _From, S) -> - {Res, _DiscardS2} = cl_read_public_proj(S, true), - {reply, Res, S}; +handle_call({test_read_public_projection, ReadRepairP}, _From, S) -> + {Res, ExtraInfo, _DiscardS2} = do_cl_read_public_projection(ReadRepairP, S), + {reply, {Res, ExtraInfo}, S}; handle_call(_Call, _From, S) -> {reply, whaaaaaaaaaa, S}. @@ -180,25 +183,11 @@ finish_init(BestProj, #ch_mgr{init_finished=false, myflu=MyFLU} = S) -> exit({yo_weird, Else}) end. -do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) -> +do_cl_write_proposed_proj(#ch_mgr{proj_proposed=Proj} = S) -> #projection{epoch_number=Epoch} = Proj, case cl_write_public_proj(Epoch, Proj, S) of - {ok, S2} -> - case cl_read_public_proj(S2, true) of - {ok, Proj2, S3} -> - %% ?D(Proj2), - %% ?D(machi_flu0:get_epoch(MyFLU)), - Proj2b = update_projection_dbg2( - Proj2, [{hooray, {date(), time()}}]), - ok = machi_flu0:proj_write(MyFLU, Epoch, private, Proj2b), - MyP = make_projection_summary(element(2, - machi_flu0:proj_read_latest(MyFLU, private))), - ?D(MyP), - ?D(S3#ch_mgr.runenv), - {ok, S3#ch_mgr{proj=Proj2}}; - {_Other3, _S3}=Else3 -> - Else3 - end; + {ok, _S2}=Res -> + Res; {_Other2, _S2}=Else2 -> Else2 end. @@ -206,6 +195,8 @@ do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% cl_write_public_proj(Epoch, Proj, S) -> + %% Write to local public projection store first, and if it succeeds, + %% then write to all remote public projection stores. cl_write_public_proj_local(Epoch, Proj, S). cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) -> @@ -225,16 +216,45 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) -> %% We're going to be very cavalier about this write because we'll rely %% on the read side to do any read repair. DoIt = fun(X) -> machi_flu0:proj_write(X, Epoch, public, Proj) end, - Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) || + Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end)} || FLU <- FLUs], - case lists:usort(Rs) of - [ok] -> - {ok, S}; - _ -> - {{mixed_bag, lists:zip(FLUs, Rs)}, S} + {{remote_write_results, Rs}, S}. + +do_cl_read_public_projection(ReadRepairP, + #ch_mgr{proj=Proj1, myflu=MyFLU} = S) -> + Epoch1 = Proj1#projection.epoch_number, + case cl_read_public_projection(S) of + {unanimous, Proj2, Extra, S3} when Proj2 == Proj1 -> + {Proj2, Extra, S3}; + {unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, _S3} + when Epoch2 < Epoch1 orelse + (Epoch2 == Epoch1 andalso Proj2 /= Proj1) -> + exit({invariant_error, mine, Proj1, cl_unanimous, Proj2, extra, Extra}); + {unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, S3} + when Epoch2 > Epoch1 -> + Proj2b = update_projection_dbg2( + Proj2, [{hooray, {date(), time()}}|Extra]), + ok = machi_flu0:proj_write(MyFLU, Epoch2, private, Proj2b), + {Proj2, Extra, S3#ch_mgr{proj=Proj2b}}; + {needs_work, _FLUsRs, _Extra, _S3}=Else3 -> + if not ReadRepairP -> + Else3; + true -> + %% 1. Do the results contain a projection? + %% perhaps figure that in cl_read_public_projection()? + %% 2. Were there any error_unwritten? + %% 3. Repair the unwritten FLUs. + %% 4. Nothing to do with timeouts, right? They're + %% hopeless for the moment, need to wait. + + %% For read-repair, just choose the best and then brute- + %% force write, don't care about write status, then + %% repeat do_cl_read_public_projection() ?? + Else3 + end end. -cl_read_public_proj(#ch_mgr{proj=Proj}=S, ReadRepairP) -> +cl_read_public_projection(#ch_mgr{proj=Proj}=S) -> #projection{all_members=All_list} = Proj, {_UpNodes, Partitions, S2} = calc_up_nodes(S), DoIt = fun(X) -> @@ -245,15 +265,13 @@ cl_read_public_proj(#ch_mgr{proj=Proj}=S, ReadRepairP) -> end, Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) || FLU <- All_list], - %% todo fix the disparity between the return tuples below case lists:usort(Rs) of [P] when is_record(P, projection) -> - {ok, P, S2}; - _ when ReadRepairP -> + Extra = [{all_replied, length(Rs) == length(All_list)}], + {unanimous, P, [{unanimous,true}|Extra], S2}; + _ -> FLUsRs = lists:zip(All_list, Rs), - rank_and_repair_instructions(FLUsRs, Proj); - _ when not ReadRepairP -> - {error_not_unanimous, undefined_todo, S2} % todo? + {needs_work, FLUsRs, [flarfus], S2} % todo? end. make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> @@ -497,6 +515,8 @@ smoke0_test() -> {ok, FLUa} = machi_flu0:start_link(a), {ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, 50, 50, a), try + pong = ping(M0), + [begin Proj = ?MGR:calculate_projection_internal_old(M0), io:format(user, "~w\n", [?MGR:make_projection_summary(Proj)]) @@ -511,7 +531,8 @@ smoke1_test() -> {ok, FLUb} = machi_flu0:start_link(b), {ok, FLUc} = machi_flu0:start_link(c), I_represent = I_am = a, - {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), + %% {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), + {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 0, 100, I_am), try {ok, P1} = test_calc_projection(M0, false), %% ?D(make_projection_summary(P1)), @@ -520,10 +541,11 @@ smoke1_test() -> %% {ok, _Pb} = test_calc_projection(M0, true), %% %% ?D(make_projection_summary(_Pb)), - pong = ping(M0), - - %% _ = test_calc_proposed_projection(M0), - %% ok = test_write_proposed_projection(M0), + _ = test_calc_proposed_projection(M0), + {remote_write_results, [{b,ok},{c,ok}]} = + test_write_proposed_projection(M0), + XX = test_read_public_projection(M0, false), + ?D(XX), ok after @@ -546,8 +568,10 @@ smoke1_test() -> %% P1Epoch = P1#projection.epoch_number, %% ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a), %% ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b), -%% XX = test_cl_read_public_proj(M0), -%% ?D(XX) +%% XX = test_read_public_projection(M0, false), +%% ?D(XX), +%% XX2 = test_read_public_projection(M0, true), +%% ?D(XX2) %% after %% ok = ?MGR:stop(M0), %% ok = machi_flu0:stop(FLUa),