From 9c045374973cd1f77e29170895bacd625d2a6bf4 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 27 Oct 2014 18:52:42 +0900 Subject: [PATCH] WIP: chain mgmt prototype scaffolding --- .../poc-machi/src/machi_chain_manager1.erl | 119 ++++++++++++++---- .../test/machi_chain_manager0_test.erl | 2 +- 2 files changed, 95 insertions(+), 26 deletions(-) diff --git a/prototype/poc-machi/src/machi_chain_manager1.erl b/prototype/poc-machi/src/machi_chain_manager1.erl index 5fb31ee..fa3ab34 100644 --- a/prototype/poc-machi/src/machi_chain_manager1.erl +++ b/prototype/poc-machi/src/machi_chain_manager1.erl @@ -27,8 +27,8 @@ -define(D(X), io:format(user, "~s ~p\n", [??X, X])). %% API --export([start_link/4, stop/1, - calculate_projection_internal_old/3, +-export([start_link/6, stop/1, + calculate_projection_internal_old/1, cl_write_current_projection/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -48,23 +48,29 @@ -compile(export_all). -endif. %TEST -start_link(MyName, All_list, Seed, MyFLUPid) -> - gen_server:start_link(?MODULE, {MyName, All_list, Seed, MyFLUPid}, []). +start_link(MyName, All_list, Seed, + OldThreshold, NoPartitionThreshold, + MyFLUPid) -> + gen_server:start_link(?MODULE, {MyName, All_list, Seed, + OldThreshold, NoPartitionThreshold, + MyFLUPid}, []). stop(Pid) -> gen_server:call(Pid, {stop}, infinity). -calculate_projection_internal_old(Pid, OldThreshold, NoPartitionThreshold) -> - gen_server:call(Pid, {calculate_projection_internal_old, OldThreshold, NoPartitionThreshold}, infinity). +calculate_projection_internal_old(Pid) -> + gen_server:call(Pid, {calculate_projection_internal_old}, infinity). cl_write_current_projection(Pid) -> gen_server:call(Pid, {cl_write_current_projection}, infinity). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -init({MyName, All_list, Seed, MyFLUPid}) -> +init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) -> RunEnv = [{seed, Seed}, - {network_partitions, []}], + {network_partitions, []}, + {old_threshold, OldThreshold}, + {no_partition_threshold, NoPartitionThreshold}], S = #ch_mgr{name=MyName, proj=make_initial_projection(MyName, All_list, All_list, [], []), @@ -72,7 +78,10 @@ init({MyName, All_list, Seed, MyFLUPid}) -> runenv=RunEnv}, {ok, S}. -handle_call({calculate_projection_internal_old, OldThreshold, NoPartitionThreshold}, _From, S) -> +handle_call({calculate_projection_internal_old}, _From, + #ch_mgr{runenv=RunEnv}=S) -> + OldThreshold = proplists:get_value(old_threshold, RunEnv), + NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv), {Reply, S2} = calc_projection(OldThreshold, NoPartitionThreshold, S), {reply, Reply, S2}; handle_call({cl_write_current_projection}, _From, S) -> @@ -113,14 +122,36 @@ do_cl_write_current_proj(#ch_mgr{proj=Proj} = S) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU, runenv=RunEnv1} = S) -> - #projection{epoch_number=Epoch} = Proj, - Epoch=Epoch,MyFLU=MyFLU,RunEnv1=RunEnv1, - {todo, S}. +cl_write_public_proj(Epoch, Proj, S) -> + cl_write_public_proj_local(Epoch, Proj, S). -cl_write_public_proj(_Epoch, _Proj, S) -> - %% todo - {ok, S}. +cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) -> + {_UpNodes, Partitions, S2} = calc_up_nodes(S), + Res0 = perhaps_call_t( + S, Partitions, MyFLU, + fun() -> machi_flu0:proj_write(MyFLU, Epoch, public, Proj) end), + case Res0 of + ok -> + %% todo + FLUs = Proj#projection.all_members -- [MyFLU], + cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S); + Else when Else == error_written; Else == timeout; Else == t_timeout -> + {Else, S2} + end. + +cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) -> + %% We're going to be very cavalier about this write because we'll rely + %% on the read side to do any read repair. + Rs = + [perhaps_call_t(S, Partitions, FLU, + fun() -> machi_flu0:proj_write(FLU, Epoch, public, Proj) end) || + FLU <- FLUs], + case lists:usort(Rs) of + [ok] -> + {ok, S}; + _ -> + {{mixed_bag, lists:zip(FLUs, Rs)}, S} + end. cl_read_public_proj(S) -> %% todo @@ -164,8 +195,8 @@ calc_projection(OldThreshold, NoPartitionThreshold, LastProj, } = LastProj, LastUp = lists:usort(OldUPI_list ++ OldRepairing_list), AllMembers = (S#ch_mgr.proj)#projection.all_members, - {Up, RunEnv2} = calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold, - AllMembers, RunEnv1), + {Up, _, RunEnv2} = calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold, + AllMembers, RunEnv1), NewUp = Up -- LastUp, Down = AllMembers -- Up, @@ -195,6 +226,15 @@ calc_projection(OldThreshold, NoPartitionThreshold, LastProj, [goo]), {P, S#ch_mgr{runenv=RunEnv3}}. +calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) -> + OldThreshold = proplists:get_value(old_threshold, RunEnv1), + NoPartitionThreshold = proplists:get_value(no_partition_threshold, RunEnv1), + AllMembers = Proj#projection.all_members, + {UpNodes, Partitions, RunEnv2} = + calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold, + AllMembers, RunEnv1), + {UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}. + calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold, AllMembers, RunEnv1) -> Seed1 = proplists:get_value(seed, RunEnv1), @@ -208,7 +248,7 @@ calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold, not lists:member({Node, MyName}, Partitions2)]), RunEnv2 = replace(RunEnv1, [{seed, Seed2}, {network_partitions, Partitions2}]), - {UpNodes, RunEnv2}. + {UpNodes, Partitions2, RunEnv2}. calc_network_partitions(Nodes, Seed1, OldPartition, @@ -267,29 +307,58 @@ make_all_pairs2([_]) -> make_all_pairs2([H1|T]) -> [[{H1, X}, {X, H1}] || X <- T] ++ make_all_pairs(T). +perhaps_call_t(S, Partitions, FLU, DoIt) -> + try + perhaps_call(S, Partitions, FLU, DoIt) + catch + exit:timeout -> + t_timeout + end. + +perhaps_call(#ch_mgr{name=MyName}, Partitions, FLU, DoIt) -> + case lists:keyfind(FLU, 1, Partitions) of + false -> + Res = DoIt(), + case lists:keyfind(MyName, 2, Partitions) of + false -> + Res; + _ -> + exit(timeout) + end; + _ -> + exit(timeout) + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + -ifdef(TEST). -define(MGR, machi_chain_manager1). smoke0_test() -> - {ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, a), + {ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, 50, 50, a), try [begin - Proj = ?MGR:calculate_projection_internal_old(M0, 50, 50), + Proj = ?MGR:calculate_projection_internal_old(M0), io:format(user, "~p\n", [?MGR:make_projection_summary(Proj)]) - end || _ <- lists:seq(1,5)], - ?D(cl_write_current_projection(M0)) + end || _ <- lists:seq(1,5)] after ok = ?MGR:stop(M0) end. smoke1_test() -> {ok, FLUa} = machi_flu0:start_link(a), - {ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, FLUa), + {ok, FLUb} = machi_flu0:start_link(b), + {ok, FLUc} = machi_flu0:start_link(c), + I_represent = I_am = a, + {ok, M0} = ?MGR:start_link(I_represent, [a,b,c], {1,2,3}, 50, 50, I_am), try ok = cl_write_current_projection(M0) after - ok = ?MGR:stop(M0) + ok = ?MGR:stop(M0), + ok = machi_flu0:stop(FLUa), + ok = machi_flu0:stop(FLUb), + ok = machi_flu0:stop(FLUc) end. -endif. diff --git a/prototype/poc-machi/test/machi_chain_manager0_test.erl b/prototype/poc-machi/test/machi_chain_manager0_test.erl index df92813..34f11ad 100644 --- a/prototype/poc-machi/test/machi_chain_manager0_test.erl +++ b/prototype/poc-machi/test/machi_chain_manager0_test.erl @@ -140,7 +140,7 @@ find_common_prefix(_, _) -> []. calc_projection_test_() -> - Runtime = 15, %% Runtime = 60*60, + Runtime = 2, %% Runtime = 60*60, {timeout, Runtime * 500, fun() -> Nodes = [a,b,c],