WIP: chain mgmt prototype scaffolding 7: inching better

This commit is contained in:
Scott Lystig Fritchie 2014-10-29 19:03:15 +09:00
parent 616a11e230
commit 932b6afb76
2 changed files with 73 additions and 49 deletions

View file

@ -51,7 +51,7 @@
init_finished :: boolean(), init_finished :: boolean(),
name :: m_server(), name :: m_server(),
proj :: #projection{}, proj :: #projection{},
proj_proposed :: #projection{}, proj_proposed :: 'none' | #projection{},
myflu :: pid() | atom(), myflu :: pid() | atom(),
%% %%
runenv :: list() %proplist() runenv :: list() %proplist()

View file

@ -29,9 +29,7 @@
%% API %% API
-export([start_link/6, stop/1, ping/1, -export([start_link/6, stop/1, ping/1,
calculate_projection_internal_old/1, calculate_projection_internal_old/1]).
test_write_proposed_projection/1]).
%% -export([]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
@ -40,7 +38,8 @@
-ifdef(TEST). -ifdef(TEST).
-export([test_calc_projection/2, -export([test_calc_projection/2,
test_calc_proposed_projection/1, test_calc_proposed_projection/1,
test_cl_read_public_proj/1]). test_write_proposed_projection/1,
test_read_public_projection/2]).
-ifdef(EQC). -ifdef(EQC).
-include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc.hrl").
@ -88,8 +87,8 @@ test_calc_projection(Pid, KeepRunenvP) ->
test_calc_proposed_projection(Pid) -> test_calc_proposed_projection(Pid) ->
gen_server:cast(Pid, {test_calc_proposed_projection}). gen_server:cast(Pid, {test_calc_proposed_projection}).
test_cl_read_public_proj(Pid) -> test_read_public_projection(Pid, ReadRepairP) ->
gen_server:call(Pid, {test_cl_read_public_proj}, infinity). gen_server:call(Pid, {test_read_public_projection, ReadRepairP}, infinity).
-endif. % TEST -endif. % TEST
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -117,8 +116,12 @@ handle_call({calculate_projection_internal_old}, _From, S) ->
{Reply, S2} = calc_projection(S, [{author_proc, call}]), {Reply, S2} = calc_projection(S, [{author_proc, call}]),
{reply, Reply, S2}; {reply, Reply, S2};
handle_call({test_write_proposed_projection}, _From, S) -> handle_call({test_write_proposed_projection}, _From, S) ->
{Res, S2} = do_cl_write_current_proj(S), if S#ch_mgr.proj_proposed == none ->
{reply, Res, S2}; {reply, none, S};
true ->
{Res, S2} = do_cl_write_proposed_proj(S),
{reply, Res, S2}
end;
handle_call({ping}, _From, S) -> handle_call({ping}, _From, S) ->
{reply, pong, S}; {reply, pong, S};
handle_call({stop}, _From, S) -> handle_call({stop}, _From, S) ->
@ -128,9 +131,9 @@ handle_call({test_calc_projection, KeepRunenvP}, _From, S) ->
{reply, {ok, P}, if KeepRunenvP -> S2; {reply, {ok, P}, if KeepRunenvP -> S2;
true -> S true -> S
end}; end};
handle_call({test_cl_read_public_proj}, _From, S) -> handle_call({test_read_public_projection, ReadRepairP}, _From, S) ->
{Res, _DiscardS2} = cl_read_public_proj(S, true), {Res, ExtraInfo, _DiscardS2} = do_cl_read_public_projection(ReadRepairP, S),
{reply, Res, S}; {reply, {Res, ExtraInfo}, S};
handle_call(_Call, _From, S) -> handle_call(_Call, _From, S) ->
{reply, whaaaaaaaaaa, S}. {reply, whaaaaaaaaaa, S}.
@ -180,25 +183,11 @@ finish_init(BestProj, #ch_mgr{init_finished=false, myflu=MyFLU} = S) ->
exit({yo_weird, Else}) exit({yo_weird, Else})
end. end.
do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) -> do_cl_write_proposed_proj(#ch_mgr{proj_proposed=Proj} = S) ->
#projection{epoch_number=Epoch} = Proj, #projection{epoch_number=Epoch} = Proj,
case cl_write_public_proj(Epoch, Proj, S) of case cl_write_public_proj(Epoch, Proj, S) of
{ok, S2} -> {ok, _S2}=Res ->
case cl_read_public_proj(S2, true) of Res;
{ok, Proj2, S3} ->
%% ?D(Proj2),
%% ?D(machi_flu0:get_epoch(MyFLU)),
Proj2b = update_projection_dbg2(
Proj2, [{hooray, {date(), time()}}]),
ok = machi_flu0:proj_write(MyFLU, Epoch, private, Proj2b),
MyP = make_projection_summary(element(2,
machi_flu0:proj_read_latest(MyFLU, private))),
?D(MyP),
?D(S3#ch_mgr.runenv),
{ok, S3#ch_mgr{proj=Proj2}};
{_Other3, _S3}=Else3 ->
Else3
end;
{_Other2, _S2}=Else2 -> {_Other2, _S2}=Else2 ->
Else2 Else2
end. end.
@ -206,6 +195,8 @@ do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
cl_write_public_proj(Epoch, Proj, S) -> cl_write_public_proj(Epoch, Proj, S) ->
%% Write to local public projection store first, and if it succeeds,
%% then write to all remote public projection stores.
cl_write_public_proj_local(Epoch, Proj, S). cl_write_public_proj_local(Epoch, Proj, S).
cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) -> cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) ->
@ -225,16 +216,45 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
%% We're going to be very cavalier about this write because we'll rely %% We're going to be very cavalier about this write because we'll rely
%% on the read side to do any read repair. %% on the read side to do any read repair.
DoIt = fun(X) -> machi_flu0:proj_write(X, Epoch, public, Proj) end, DoIt = fun(X) -> machi_flu0:proj_write(X, Epoch, public, Proj) end,
Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) || Rs = [{FLU, perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end)} ||
FLU <- FLUs], FLU <- FLUs],
case lists:usort(Rs) of {{remote_write_results, Rs}, S}.
[ok] ->
{ok, S}; do_cl_read_public_projection(ReadRepairP,
_ -> #ch_mgr{proj=Proj1, myflu=MyFLU} = S) ->
{{mixed_bag, lists:zip(FLUs, Rs)}, S} Epoch1 = Proj1#projection.epoch_number,
case cl_read_public_projection(S) of
{unanimous, Proj2, Extra, S3} when Proj2 == Proj1 ->
{Proj2, Extra, S3};
{unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, _S3}
when Epoch2 < Epoch1 orelse
(Epoch2 == Epoch1 andalso Proj2 /= Proj1) ->
exit({invariant_error, mine, Proj1, cl_unanimous, Proj2, extra, Extra});
{unanimous, #projection{epoch_number=Epoch2}=Proj2, Extra, S3}
when Epoch2 > Epoch1 ->
Proj2b = update_projection_dbg2(
Proj2, [{hooray, {date(), time()}}|Extra]),
ok = machi_flu0:proj_write(MyFLU, Epoch2, private, Proj2b),
{Proj2, Extra, S3#ch_mgr{proj=Proj2b}};
{needs_work, _FLUsRs, _Extra, _S3}=Else3 ->
if not ReadRepairP ->
Else3;
true ->
%% 1. Do the results contain a projection?
%% perhaps figure that in cl_read_public_projection()?
%% 2. Were there any error_unwritten?
%% 3. Repair the unwritten FLUs.
%% 4. Nothing to do with timeouts, right? They're
%% hopeless for the moment, need to wait.
%% For read-repair, just choose the best and then brute-
%% force write, don't care about write status, then
%% repeat do_cl_read_public_projection() ??
Else3
end
end. end.
cl_read_public_proj(#ch_mgr{proj=Proj}=S, ReadRepairP) -> cl_read_public_projection(#ch_mgr{proj=Proj}=S) ->
#projection{all_members=All_list} = Proj, #projection{all_members=All_list} = Proj,
{_UpNodes, Partitions, S2} = calc_up_nodes(S), {_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(X) -> DoIt = fun(X) ->
@ -245,15 +265,13 @@ cl_read_public_proj(#ch_mgr{proj=Proj}=S, ReadRepairP) ->
end, end,
Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) || Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) ||
FLU <- All_list], FLU <- All_list],
%% todo fix the disparity between the return tuples below
case lists:usort(Rs) of case lists:usort(Rs) of
[P] when is_record(P, projection) -> [P] when is_record(P, projection) ->
{ok, P, S2}; Extra = [{all_replied, length(Rs) == length(All_list)}],
_ when ReadRepairP -> {unanimous, P, [{unanimous,true}|Extra], S2};
_ ->
FLUsRs = lists:zip(All_list, Rs), FLUsRs = lists:zip(All_list, Rs),
rank_and_repair_instructions(FLUsRs, Proj); {needs_work, FLUsRs, [flarfus], S2} % todo?
_ when not ReadRepairP ->
{error_not_unanimous, undefined_todo, S2} % todo?
end. end.
make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) ->
@ -497,6 +515,8 @@ smoke0_test() ->
{ok, FLUa} = machi_flu0:start_link(a), {ok, FLUa} = machi_flu0:start_link(a),
{ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, 50, 50, a), {ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, 50, 50, a),
try try
pong = ping(M0),
[begin [begin
Proj = ?MGR:calculate_projection_internal_old(M0), Proj = ?MGR:calculate_projection_internal_old(M0),
io:format(user, "~w\n", [?MGR:make_projection_summary(Proj)]) io:format(user, "~w\n", [?MGR:make_projection_summary(Proj)])
@ -511,7 +531,8 @@ smoke1_test() ->
{ok, FLUb} = machi_flu0:start_link(b), {ok, FLUb} = machi_flu0:start_link(b),
{ok, FLUc} = machi_flu0:start_link(c), {ok, FLUc} = machi_flu0:start_link(c),
I_represent = I_am = a, I_represent = I_am = a,
{ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), %% {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am),
{ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 0, 100, I_am),
try try
{ok, P1} = test_calc_projection(M0, false), {ok, P1} = test_calc_projection(M0, false),
%% ?D(make_projection_summary(P1)), %% ?D(make_projection_summary(P1)),
@ -520,10 +541,11 @@ smoke1_test() ->
%% {ok, _Pb} = test_calc_projection(M0, true), %% {ok, _Pb} = test_calc_projection(M0, true),
%% %% ?D(make_projection_summary(_Pb)), %% %% ?D(make_projection_summary(_Pb)),
pong = ping(M0), _ = test_calc_proposed_projection(M0),
{remote_write_results, [{b,ok},{c,ok}]} =
%% _ = test_calc_proposed_projection(M0), test_write_proposed_projection(M0),
%% ok = test_write_proposed_projection(M0), XX = test_read_public_projection(M0, false),
?D(XX),
ok ok
after after
@ -546,8 +568,10 @@ smoke1_test() ->
%% P1Epoch = P1#projection.epoch_number, %% P1Epoch = P1#projection.epoch_number,
%% ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a), %% ok = machi_flu0:proj_write(FLUa, P1Epoch, public, P1a),
%% ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b), %% ok = machi_flu0:proj_write(FLUb, P1Epoch, public, P1b),
%% XX = test_cl_read_public_proj(M0), %% XX = test_read_public_projection(M0, false),
%% ?D(XX) %% ?D(XX),
%% XX2 = test_read_public_projection(M0, true),
%% ?D(XX2)
%% after %% after
%% ok = ?MGR:stop(M0), %% ok = ?MGR:stop(M0),
%% ok = machi_flu0:stop(FLUa), %% ok = machi_flu0:stop(FLUa),