diff --git a/prototype/poc-machi/include/machi.hrl b/prototype/poc-machi/include/machi.hrl index 2f52c3e..7a1d969 100644 --- a/prototype/poc-machi/include/machi.hrl +++ b/prototype/poc-machi/include/machi.hrl @@ -37,14 +37,14 @@ epoch_csum :: m_csum(), prev_epoch_num :: m_epoch_n(), prev_epoch_csum :: m_csum(), - creation_time :: timestamp(), - author_server :: m_server(), all_members :: [m_server()], down :: [m_server()], + creation_time :: timestamp(), + author_server :: m_server(), upi :: [m_server()], repairing :: [m_server()], - dbg :: list(), %proplist() checksummed - dbg2 :: list() %proplist() not checksummed + dbg :: list(), %proplist(), is checksummed + dbg2 :: list() %proplist(), is not checksummed }). -record(ch_mgr, { diff --git a/prototype/poc-machi/src/machi_chain_manager1.erl b/prototype/poc-machi/src/machi_chain_manager1.erl index dce336a..3d6fe50 100644 --- a/prototype/poc-machi/src/machi_chain_manager1.erl +++ b/prototype/poc-machi/src/machi_chain_manager1.erl @@ -39,7 +39,7 @@ -export([test_calc_projection/2, test_calc_proposed_projection/1, test_write_proposed_projection/1, - test_read_public_projection/2]). + test_read_latest_public_projection/2]). -ifdef(EQC). -include_lib("eqc/include/eqc.hrl"). @@ -87,8 +87,8 @@ test_calc_projection(Pid, KeepRunenvP) -> test_calc_proposed_projection(Pid) -> gen_server:cast(Pid, {test_calc_proposed_projection}). -test_read_public_projection(Pid, ReadRepairP) -> - gen_server:call(Pid, {test_read_public_projection, ReadRepairP}, infinity). +test_read_latest_public_projection(Pid, ReadRepairP) -> + gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP}, infinity). -endif. % TEST %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -131,9 +131,9 @@ handle_call({test_calc_projection, KeepRunenvP}, _From, S) -> {reply, {ok, P}, if KeepRunenvP -> S2; true -> S end}; -handle_call({test_read_public_projection, ReadRepairP}, _From, S) -> - {Res, ExtraInfo, _DiscardS2} = do_cl_read_public_projection(ReadRepairP, S), - {reply, {Res, ExtraInfo}, S}; +handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) -> + {Res, ExtraInfo, S2} = do_cl_read_latest_public_projection(ReadRepairP, S), + {reply, {Res, ExtraInfo}, S2}; handle_call(_Call, _From, S) -> {reply, whaaaaaaaaaa, S}. @@ -195,19 +195,31 @@ do_cl_write_proposed_proj(#ch_mgr{proj_proposed=Proj} = S) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% cl_write_public_proj(Epoch, Proj, S) -> + cl_write_public_proj(Epoch, Proj, false, S). + +cl_write_public_proj_skip_local_error(Epoch, Proj, S) -> + cl_write_public_proj(Epoch, Proj, true, S). + +cl_write_public_proj(Epoch, Proj, SkipLocalWriteErrorP, S) -> %% Write to local public projection store first, and if it succeeds, %% then write to all remote public projection stores. - cl_write_public_proj_local(Epoch, Proj, S). + cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, S). -cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) -> +cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP, + #ch_mgr{myflu=MyFLU}=S) -> {_UpNodes, Partitions, S2} = calc_up_nodes(S), Res0 = perhaps_call_t( S, Partitions, MyFLU, fun() -> machi_flu0:proj_write(MyFLU, Epoch, public, Proj) end), + Continue = fun() -> + FLUs = Proj#projection.all_members -- [MyFLU], + cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) + end, case Res0 of ok -> - FLUs = Proj#projection.all_members -- [MyFLU], - cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S); + Continue(); + _Else when SkipLocalWriteErrorP -> + Continue(); Else when Else == error_written; Else == timeout; Else == t_timeout -> {Else, S2} end. @@ -220,10 +232,10 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) -> FLU <- FLUs], {{remote_write_results, Rs}, S}. -do_cl_read_public_projection(ReadRepairP, - #ch_mgr{proj=Proj1, myflu=MyFLU} = S) -> +do_cl_read_latest_public_projection(ReadRepairP, + #ch_mgr{proj=Proj1, myflu=MyFLU} = S) -> Epoch1 = Proj1#projection.epoch_number, - case cl_read_public_projection(S) of + case cl_read_latest_public_projection(S) of {unanimous, Proj2, Extra, S3} when Proj2 == Proj1 -> {Proj2, Extra, S3}; {unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, _S3} @@ -236,25 +248,16 @@ do_cl_read_public_projection(ReadRepairP, Proj2, [{hooray, {date(), time()}}|Extra]), ok = machi_flu0:proj_write(MyFLU, Epoch2, private, Proj2b), {Proj2, Extra, S3#ch_mgr{proj=Proj2b}}; - {needs_work, _FLUsRs, _Extra, _S3}=Else3 -> + {needs_work, FLUsRs, Extra, S3} -> if not ReadRepairP -> - Else3; + {not_unanimous, [{results, FLUsRs}|Extra], S3}; true -> - %% 1. Do the results contain a projection? - %% perhaps figure that in cl_read_public_projection()? - %% 2. Were there any error_unwritten? - %% 3. Repair the unwritten FLUs. - %% 4. Nothing to do with timeouts, right? They're - %% hopeless for the moment, need to wait. - - %% For read-repair, just choose the best and then brute- - %% force write, don't care about write status, then - %% repeat do_cl_read_public_projection() ?? - Else3 + {_Status, S4} = do_read_repair(FLUsRs, Extra, S3), + {not_unanimous, Extra, S4} end end. -cl_read_public_projection(#ch_mgr{proj=Proj}=S) -> +cl_read_latest_public_projection(#ch_mgr{proj=Proj}=S) -> #projection{all_members=All_list} = Proj, {_UpNodes, Partitions, S2} = calc_up_nodes(S), DoIt = fun(X) -> @@ -274,6 +277,36 @@ cl_read_public_projection(#ch_mgr{proj=Proj}=S) -> {needs_work, FLUsRs, [flarfus], S2} % todo? end. +% {_Status, S4} = do_read_repair(FLUsRs, Extra, S3), + +%% 1. Do the results contain a projection? +%% perhaps figure that in cl_read_latest_public_projection()? +%% 2. Were there any error_unwritten? +%% 3. Repair the unwritten FLUs. +%% 4. Nothing to do with timeouts, right? They're +%% hopeless for the moment, need to wait. +%% +%% For read-repair, just choose the best and then brute- +%% force write, don't care about write status, then +%% repeat do_cl_read_latest_public_projection() ?? + +do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) -> + Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection)], + if Ps == [] -> + {nothing_to_do, S}; + true -> + [{_Rank, BestProj}|_] = rank_and_sort_projections(Ps, CurrentProj), + Epoch = BestProj#projection.epoch_number, + %% We're doing repair, so use the flavor that will skip an + %% error on the local FLU, which if it happens is almost + %% certainly error_written ... that or any other error on + %% the local projection store should not interfere with + %% attempting to write the other projection stores. + {_DontCare, _S2}=Res = cl_write_public_proj_skip_local_error( + Epoch, BestProj, S), + Res + end. + make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> make_projection(0, -1, <<>>, MyName, All_list, [], UPI_list, Repairing_list, Ps). @@ -301,9 +334,12 @@ make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, dbg=Dbg, dbg2=[] % always checksums as [] }, + P2 = update_projection_checksum(P), + P2#projection{dbg2=Dbg2}. + +update_projection_checksum(P) -> CSum = crypto:hash(sha, term_to_binary(P)), - P#projection{epoch_csum=CSum, - dbg2=Dbg2}. + P#projection{epoch_csum=CSum}. update_projection_dbg2(P, Dbg2) when is_list(Dbg2) -> P#projection{dbg2=Dbg2}. @@ -417,37 +453,22 @@ roll_dice(N, RunEnv) -> {Val, Seed2} = random:uniform_s(N, Seed1), {Val, replace(RunEnv, [{seed, Seed2}])}. -rank_and_repair_instructions([_|_] = FLUsRs, Proj) -> - #projection{all_members=All_list} = Proj, +rank_and_sort_projections(Ps, CurrentProj) -> + Epoch = lists:max([Proj#projection.epoch_number || Proj <- Ps]), + MaxPs = [Proj || Proj <- Ps, + Proj#projection.epoch_number == Epoch], + %% Sort with highest rank first (custom sort) + lists:sort(fun({RankA,_}, {RankB,_}) -> RankA > RankB end, + rank_projections(MaxPs, CurrentProj)). + +%% Caller must ensure all Projs are of the same epoch number. + +rank_projections(Projs, CurrentProj) -> + #projection{all_members=All_list} = CurrentProj, MemberRank = orddict:from_list( lists:zip(All_list, lists:seq(1, length(All_list)))), N = length(All_list), - TopRank = lists:max([-1] ++ - [rank_projection(ProjX, MemberRank, N) || - {_FLU, ProjX} <- FLUsRs, - is_record(ProjX, projection)]), - rank_and_repair(FLUsRs, Proj, TopRank, undefined, [], MemberRank, N). - -rank_and_repair([], _Proj, TopRank, WinningProj, - RepairInstr, _MemberRank, _N) -> - {TopRank, WinningProj, RepairInstr}; -rank_and_repair([{FLU, Proj}|T], OldProj, TopRank, WinningProj, - RepairInstr, MemberRank, N) - when is_record(Proj, projection) -> - case rank_projection(Proj, MemberRank, N) of - Rank when Rank == TopRank -> - rank_and_repair(T, OldProj, TopRank, Proj, - RepairInstr, MemberRank, N); - _Rank -> - rank_and_repair(T, OldProj, TopRank, WinningProj, - [FLU|RepairInstr], MemberRank, N) - end; -rank_and_repair([{FLU, _SomeError}|T], OldProj, TopRank, WinningProj, - RepairInstr, MemberRank, N) -> - ?D({?LINE, _SomeError}), - rank_and_repair(T, OldProj, TopRank, WinningProj, - [FLU|RepairInstr], MemberRank, N). - + [{rank_projection(Proj, MemberRank, N), Proj} || Proj <- Projs]. rank_projection(#projection{author_server=Author, upi=UPI_list, @@ -534,17 +555,12 @@ smoke1_test() -> %% {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 0, 100, I_am), try - {ok, P1} = test_calc_projection(M0, false), - %% ?D(make_projection_summary(P1)), - %% {ok, _Pa} = test_calc_projection(M0, true), - %% %% ?D(make_projection_summary(_Pa)), - %% {ok, _Pb} = test_calc_projection(M0, true), - %% %% ?D(make_projection_summary(_Pb)), + {ok, _P1} = test_calc_projection(M0, false), _ = test_calc_proposed_projection(M0), - {remote_write_results, [{b,ok},{c,ok}]} = - test_write_proposed_projection(M0), - XX = test_read_public_projection(M0, false), + {remote_write_results, + [{b,ok},{c,ok}]} = test_write_proposed_projection(M0), + XX = test_read_latest_public_projection(M0, false), ?D(XX), ok @@ -555,27 +571,40 @@ smoke1_test() -> ok = machi_flu0:stop(FLUc) end. -%% nonunanimous_read_setup_test() -> -%% {ok, FLUa} = machi_flu0:start_link(a), -%% {ok, FLUb} = machi_flu0:start_link(b), -%% I_represent = I_am = a, -%% {ok, M0} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 50, 50, I_am), -%% try -%% ok = test_write_proposed_projection(M0), -%% {ok, P1} = test_calc_projection(M0), -%% P1a = P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}, -%% P1b = P1#projection{down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}, -%% P1Epoch = P1#projection.epoch_number, -%% ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a), -%% ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b), -%% XX = test_read_public_projection(M0, false), -%% ?D(XX), -%% XX2 = test_read_public_projection(M0, true), -%% ?D(XX2) -%% after -%% ok = ?MGR:stop(M0), -%% ok = machi_flu0:stop(FLUa), -%% ok = machi_flu0:stop(FLUb) -%% end. +nonunanimous_read_setup_test() -> + {ok, FLUa} = machi_flu0:start_link(a), + {ok, FLUb} = machi_flu0:start_link(b), + I_represent = I_am = a, + {ok, Ma} = ?MGR:start_link(I_represent, [a,b], {1,2,3}, 0, 100, I_am), + try + {ok, P1} = test_calc_projection(Ma, false), + + P1a = update_projection_checksum( + P1#projection{down=[b], upi=[a], dbg=[{hackhack, ?LINE}]}), + P1b = update_projection_checksum( + P1#projection{author_server=b, creation_time=now(), + down=[a], upi=[b], dbg=[{hackhack, ?LINE}]}), + P1Epoch = P1#projection.epoch_number, + ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a), + ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b), + + ?D(x), + {not_unanimous, _}=_XX = test_read_latest_public_projection(Ma, false), + ?Dw(_XX), + {not_unanimous, _}=_YY = test_read_latest_public_projection(Ma, true), + %% The read repair here doesn't automatically trigger the creation of + %% a new projection (to try to create a unanimous projection). So + %% we expect nothing to change when called again. + {not_unanimous, _}=_YY = test_read_latest_public_projection(Ma, true), + + ok + + %% XX2 = test_read_public_projection(Ma, true), + %% ?D(XX2) + after + ok = ?MGR:stop(Ma), + ok = machi_flu0:stop(FLUa), + ok = machi_flu0:stop(FLUb) + end. -endif.