Merge branch 'slf/otp-refactoring-step4'

This commit is contained in:
Scott Lystig Fritchie 2015-05-07 18:52:33 +09:00
commit 6143bb60e9
15 changed files with 575 additions and 141 deletions

View file

@ -25,6 +25,22 @@ func, and pattern match Erlang style in that func.
*** DONE Do it. *** DONE Do it.
** DONE Finish OTP'izing the Chain Manager with FLU & proj store processes
** DONE Eliminate the timeout exception for the client: just {error,timeout} ret
** DONE Move prototype/chain-manager code to "top" of source tree
*** DONE Preserve current test code (leave as-is? tiny changes?)
*** DONE Make chain manager code flexible enough to run "real world" or "sim"
** TODO Implement real data repair, orchestrated by the chain manager
** TODO Change all protocol ops to enforce the epoch ID
** TODO Add projection wedging logic to each FLU.
- Add no-wedging state to make testing easier?
** TODO Move the FLU server to gen_server behavior?
** TODO Add gproc and get rid of registered name rendezvous
*** TODO Fixes the atom table leak
*** TODO Fixes the problem of having active sequencer for the same prefix
on two FLUS in the same VM
** TODO Fix all known bugs with Chain Manager ** TODO Fix all known bugs with Chain Manager
*** DONE Fix known bugs *** DONE Fix known bugs
@ -32,15 +48,3 @@ func, and pattern match Erlang style in that func.
*** TODO Re-add verification step of stable epochs, including inner projections! *** TODO Re-add verification step of stable epochs, including inner projections!
*** TODO Attempt to remove cruft items in flapping_i? *** TODO Attempt to remove cruft items in flapping_i?
** TODO Finish OTP'izing the Chain Manager with FLU & proj store processes
** TODO Change all protocol ops to enforce the epoch ID
** TODO Add projection wedging logic to each FLU.
- Add no-wedging state to make testing easier?
** TODO Move prototype/chain-manager code to "top" of source tree
*** TODO Preserve current test code (leave as-is? tiny changes?)
*** TODO Make chain manager code flexible enough to run "real world" or "sim"
** TODO Replace registered name use from FLU write/append dispatcher
** TODO Move the FLU server to gen_server behavior?
** TODO Implement real data repair, orchestrated by the chain manager

View file

@ -30,6 +30,7 @@
proj :: projection(), proj :: projection(),
%% %%
timer :: 'undefined' | timer:tref(), timer :: 'undefined' | timer:tref(),
ignore_timer :: boolean(),
proj_history :: queue:queue(), proj_history :: queue:queue(),
flaps=0 :: integer(), flaps=0 :: integer(),
flap_start = ?NOT_FLAPPING flap_start = ?NOT_FLAPPING

View file

@ -7,7 +7,7 @@
{env, [ {env, [
{flu_list, {flu_list,
[ [
{flu_a, 32900, "./data.flu_a"} %%%%%% {flu_a, 32900, "./data.flu_a"}
]} ]}
]} ]}
]}. ]}.

View file

@ -65,18 +65,18 @@
-define(REACT(T), put(react, [T|get(react)])). -define(REACT(T), put(react, [T|get(react)])).
%% API %% API
-export([start_link/2, start_link/3, stop/1, ping/1]). -export([start_link/2, start_link/3, stop/1, ping/1,
set_chain_members/2, set_active/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-export([projection_transitions_are_sane/2]). -export([make_chmgr_regname/1, projection_transitions_are_sane/2]).
-ifdef(TEST). -ifdef(TEST).
-export([test_calc_projection/2, -export([test_calc_projection/2,
test_write_public_projection/2, test_write_public_projection/2,
test_read_latest_public_projection/2, test_read_latest_public_projection/2,
test_set_active/2,
test_react_to_env/1, test_react_to_env/1,
get_all_hosed/1]). get_all_hosed/1]).
@ -95,7 +95,8 @@ start_link(MyName, MembersDict) ->
start_link(MyName, MembersDict, []). start_link(MyName, MembersDict, []).
start_link(MyName, MembersDict, MgrOpts) -> start_link(MyName, MembersDict, MgrOpts) ->
gen_server:start_link(?MODULE, {MyName, MembersDict, MgrOpts}, []). gen_server:start_link({local, make_chmgr_regname(MyName)}, ?MODULE,
{MyName, MembersDict, MgrOpts}, []).
stop(Pid) -> stop(Pid) ->
gen_server:call(Pid, {stop}, infinity). gen_server:call(Pid, {stop}, infinity).
@ -103,6 +104,20 @@ stop(Pid) ->
ping(Pid) -> ping(Pid) ->
gen_server:call(Pid, {ping}, infinity). gen_server:call(Pid, {ping}, infinity).
%% @doc Set chain members list.
%%
%% NOTE: This implementation is a bit brittle, in that an author with
%% higher rank may try to re-suggest the old membership list if it
%% races with an author of lower rank. For now, we suggest calling
%% set_chain_members() first on the author of highest rank and finish
%% with lowest rank, i.e. name z* first, name a* last.
set_chain_members(Pid, MembersDict) ->
gen_server:call(Pid, {set_chain_members, MembersDict}, infinity).
set_active(Pid, Boolean) when Boolean == true; Boolean == false ->
gen_server:call(Pid, {set_active, Boolean}, infinity).
-ifdef(TEST). -ifdef(TEST).
%% Test/debugging code only. %% Test/debugging code only.
@ -120,9 +135,6 @@ test_read_latest_public_projection(Pid, ReadRepairP) ->
gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP}, gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP},
infinity). infinity).
test_set_active(Pid, Boolean) when Boolean == true; Boolean == false ->
gen_server:call(Pid, {test_set_active, Boolean}, infinity).
test_react_to_env(Pid) -> test_react_to_env(Pid) ->
gen_server:call(Pid, {test_react_to_env}, infinity). gen_server:call(Pid, {test_react_to_env}, infinity).
@ -130,43 +142,100 @@ test_react_to_env(Pid) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
init({MyName, MembersDict, MgrOpts}) -> %% Bootstrapping is a hassle ... when when isn't it?
%%
%% If InitMembersDict == [], then we don't know anything about the chain
%% that we'll be participating in. We'll have to wait for directions from
%% our sysadmin later.
%%
%% If InitMembersDict /= [], then we do know what chain we're
%% participating in. It's probably test code, since that's about the
%% only time that we know so much at init() time.
%%
%% In either case, we'll try to create & store an epoch 0 projection
%% and store it to both projections stores. This is tricky if
%% InitMembersDict == [] because InitMembersDict usually contains the
%% #p_svrv records that we need to *write* to the projection store,
%% even our own private store! For test code, we get the store
%% manager's pid in MgrOpts and use direct gen_server calls to the
%% local projection store.
init({MyName, InitMembersDict, MgrOpts}) ->
init_remember_partition_hack(),
ZeroAll_list = [P#p_srvr.name || {_,P} <- orddict:to_list(InitMembersDict)],
ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict),
ok = store_zeroth_projection_maybe(ZeroProj, MgrOpts),
{MembersDict, Proj} =
get_my_private_proj_boot_info(MgrOpts, InitMembersDict, ZeroProj),
All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)], All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)],
Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end, Opt = fun(Key, Default) -> proplists:get_value(Key, MgrOpts, Default) end,
RunEnv = [{seed, Opt(seed, now())}, RunEnv = [{seed, Opt(seed, now())},
{use_partition_simulator, Opt(use_partition_simulator, true)},
{network_partitions, Opt(network_partitions, [])}, {network_partitions, Opt(network_partitions, [])},
{network_islands, Opt(network_islands, [])}, {network_islands, Opt(network_islands, [])},
{flapping_i, Opt(flapping, [])}, {flapping_i, Opt(flapping, [])},
{up_nodes, Opt(up_nodes, not_init_yet)}], {up_nodes, Opt(up_nodes, not_init_yet)}],
ActiveP = Opt(active_mode, true), ActiveP = Opt(active_mode, true),
NoneProj = make_none_projection(MyName, All_list, MembersDict),
Proxies = orddict:fold(
fun(K, P, Acc) ->
{ok, Pid} = ?FLU_PC:start_link(P),
[{K, Pid}|Acc]
end, [], MembersDict),
S = #ch_mgr{name=MyName, S = #ch_mgr{name=MyName,
proj=Proj,
%% TODO 2015-03-04: revisit, should this constant be bigger? %% TODO 2015-03-04: revisit, should this constant be bigger?
%% Yes, this should be bigger, but it's a hack. There is %% Yes, this should be bigger, but it's a hack. There is
%% no guarantee that all parties will advance to a minimum %% no guarantee that all parties will advance to a minimum
%% flap awareness in the amount of time that this mgr will. %% flap awareness in the amount of time that this mgr will.
flap_limit=length(All_list) + 50, flap_limit=length(All_list) + 50,
proj=NoneProj,
timer='undefined', timer='undefined',
proj_history=queue:new(), proj_history=queue:new(),
runenv=RunEnv, runenv=RunEnv,
opts=MgrOpts, opts=MgrOpts},
members_dict=MembersDict, {_, S2} = do_set_chain_members_dict(MembersDict, S),
proxies_dict=orddict:from_list(Proxies)}, S3 = if ActiveP == false ->
S2 = if ActiveP == false -> S2;
S;
ActiveP == true -> ActiveP == true ->
set_active_timer(S) set_active_timer(S2)
end, end,
{ok, S2}. {ok, S3}.
handle_call({ping}, _From, S) -> handle_call({ping}, _From, S) ->
{reply, pong, S}; {reply, pong, S};
handle_call({set_chain_members, MembersDict}, _From,
#ch_mgr{name=MyName,
proj=#projection_v1{all_members=OldAll_list,
epoch_number=OldEpoch,
upi=OldUPI}=OldProj}=S) ->
{Reply, S2} = do_set_chain_members_dict(MembersDict, S),
%% TODO: should there be any additional sanity checks? Right now,
%% if someone does something bad, then do_react_to_env() will
%% crash, which will crash us, and we'll restart in a sane & old
%% config.
All_list = [P#p_srvr.name || {_, P} <- orddict:to_list(MembersDict)],
MissingInNew = OldAll_list -- All_list,
NewUPI = OldUPI -- MissingInNew,
NewDown = All_list -- NewUPI,
NewEpoch = OldEpoch + 1111,
NewProj = machi_projection:update_checksum(
OldProj#projection_v1{author_server=MyName,
creation_time=now(),
epoch_number=NewEpoch,
all_members=All_list,
upi=NewUPI,
repairing=[],
down=NewDown,
members_dict=MembersDict}),
S3 = S2#ch_mgr{proj=NewProj},
{_QQ, S4} = do_react_to_env(S3),
{reply, Reply, S4};
handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) ->
case {Boolean, TRef} of
{true, undefined} ->
S2 = set_active_timer(S),
{reply, ok, S2};
{false, _} ->
(catch timer:cancel(TRef)),
{reply, ok, S#ch_mgr{timer=undefined}};
_ ->
{reply, error, S}
end;
handle_call({stop}, _From, S) -> handle_call({stop}, _From, S) ->
{stop, normal, ok, S}; {stop, normal, ok, S};
handle_call({test_calc_projection, KeepRunenvP}, _From, handle_call({test_calc_projection, KeepRunenvP}, _From,
@ -184,17 +253,6 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) ->
do_cl_read_latest_public_projection(ReadRepairP, S), do_cl_read_latest_public_projection(ReadRepairP, S),
Res = {Perhaps, Val, ExtraInfo}, Res = {Perhaps, Val, ExtraInfo},
{reply, Res, S2}; {reply, Res, S2};
handle_call({test_set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) ->
case {Boolean, TRef} of
{true, undefined} ->
S2 = set_active_timer(S),
{reply, ok, S2};
{false, _} ->
(catch timer:cancel(TRef)),
{reply, ok, S#ch_mgr{timer=undefined}};
_ ->
{reply, error, S}
end;
handle_call({test_react_to_env}, _From, S) -> handle_call({test_react_to_env}, _From, S) ->
{TODOtodo, S2} = do_react_to_env(S), {TODOtodo, S2} = do_react_to_env(S),
{reply, TODOtodo, S2}; {reply, TODOtodo, S2};
@ -205,8 +263,29 @@ handle_cast(_Cast, S) ->
?D({cast_whaaaaaaaaaaa, _Cast}), ?D({cast_whaaaaaaaaaaa, _Cast}),
{noreply, S}. {noreply, S}.
handle_info(tick_check_environment, #ch_mgr{ignore_timer=true}=S) ->
{noreply, S};
handle_info(tick_check_environment, S) ->
{{_Delta, Props, _Epoch}, S2} = do_react_to_env(S),
case proplists:get_value(throttle_seconds, Props) of
N when is_integer(N), N > 0 ->
%% We are flapping. Set ignore_timer=true and schedule a
%% reminder to stop ignoring. This slows down the rate of
%% flapping. If/when the yo:tell_author_yo() function in
%% state C200 is ever implemented, then it should be
%% implemented via the test_react_to_env style.
erlang:send_after(N*1000, self(), stop_ignoring_timer),
{noreply, S#ch_mgr{ignore_timer=true}};
_ ->
{noreply, S2}
end;
handle_info(stop_ignoring_timer, S) ->
{noreply, S#ch_mgr{ignore_timer=false}};
handle_info(Msg, S) -> handle_info(Msg, S) ->
exit({bummer, Msg}), case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]);
_ -> ok
end,
put(todo_bummer, true),
{noreply, S}. {noreply, S}.
terminate(_Reason, _S) -> terminate(_Reason, _S) ->
@ -222,10 +301,33 @@ make_none_projection(MyName, All_list, MembersDict) ->
UPI_list = [], UPI_list = [],
machi_projection:new(MyName, MembersDict, UPI_list, Down_list, [], []). machi_projection:new(MyName, MembersDict, UPI_list, Down_list, [], []).
get_my_private_proj_boot_info(MgrOpts, DefaultDict, DefaultProj) ->
case proplists:get_value(projection_store_registered_name, MgrOpts) of
undefined ->
{DefaultDict, DefaultProj};
Store ->
{ok, P} = machi_projection_store:read_latest_projection(Store,
private),
{P#projection_v1.members_dict, P}
end.
%% Write the epoch 0 projection store, to assist bootstrapping. If the
%% 0th epoch is already written, there's no problem.
store_zeroth_projection_maybe(ZeroProj, MgrOpts) ->
case proplists:get_value(projection_store_registered_name, MgrOpts) of
undefined ->
ok;
Store ->
_ = machi_projection_store:write(Store, public, ZeroProj),
_ = machi_projection_store:write(Store, private, ZeroProj),
ok
end.
set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) ->
FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)],
USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list),
{ok, TRef} = timer:send_interval(USec, yo_yo_yo_todo), {ok, TRef} = timer:send_interval(USec, tick_check_environment),
S#ch_mgr{timer=TRef}. S#ch_mgr{timer=TRef}.
do_cl_write_public_proj(Proj, S) -> do_cl_write_public_proj(Proj, S) ->
@ -260,7 +362,7 @@ cl_write_public_proj_local(Epoch, Proj, SkipLocalWriteErrorP,
Else when SkipLocalWriteErrorP -> Else when SkipLocalWriteErrorP ->
{XX, SS} = Continue(), {XX, SS} = Continue(),
{{local_write_result, Else, XX}, SS}; {{local_write_result, Else, XX}, SS};
Else when Else == error_written; Else == timeout; Else == t_timeout -> Else ->
{Else, S2} {Else, S2}
end. end.
@ -278,13 +380,14 @@ do_cl_read_latest_public_projection(ReadRepairP,
case cl_read_latest_projection(public, S) of case cl_read_latest_projection(public, S) of
{needs_repair, FLUsRs, Extra, S3} -> {needs_repair, FLUsRs, Extra, S3} ->
if not ReadRepairP -> if not ReadRepairP ->
{not_unanimous, todoxyz, [{results, FLUsRs}|Extra], S3}; {not_unanimous, todoxyz, [{unanimous_flus, []},
{results, FLUsRs}|Extra], S3};
true -> true ->
{_Status, S4} = do_read_repair(FLUsRs, Extra, S3), {_Status, S4} = do_read_repair(FLUsRs, Extra, S3),
do_cl_read_latest_public_projection(ReadRepairP, S4) do_cl_read_latest_public_projection(ReadRepairP, S4)
end; end;
{UnanimousTag, Proj2, Extra, S3}=_Else -> {_UnanimousTag, _Proj2, _Extra, _S3}=Else ->
{UnanimousTag, Proj2, Extra, S3} Else
end. end.
read_latest_projection_call_only(ProjectionType, AllHosed, read_latest_projection_call_only(ProjectionType, AllHosed,
@ -294,13 +397,16 @@ read_latest_projection_call_only(ProjectionType, AllHosed,
{_UpNodes, Partitions, S2} = calc_up_nodes(S), {_UpNodes, Partitions, S2} = calc_up_nodes(S),
DoIt = fun(Pid) -> DoIt = fun(Pid) ->
case ?FLU_PC:read_latest_projection(Pid, ProjectionType, ?TO) of case (?FLU_PC:read_latest_projection(Pid, ProjectionType, ?TO)) of
{ok, P} -> P; {ok, P} -> P;
Else -> Else Else -> Else
end end
end, end,
%% io:format(user, "All_queried_list ~p\n", [All_queried_list]),
Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) || Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
FLU <- All_queried_list], FLU <- All_queried_list],
%% Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) ||
%% FLU <- All_queried_list],
FLUsRs = lists:zip(All_queried_list, Rs), FLUsRs = lists:zip(All_queried_list, Rs),
{All_queried_list, FLUsRs, S2}. {All_queried_list, FLUsRs, S2}.
@ -315,7 +421,7 @@ cl_read_latest_projection(ProjectionType, AllHosed, S) ->
rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, S2). rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, S2).
rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, rank_and_sort_projections_with_extra(All_queried_list, FLUsRs,
#ch_mgr{proj=CurrentProj}=S) -> #ch_mgr{name=MyName,proj=CurrentProj}=S) ->
UnwrittenRs = [x || {_, error_unwritten} <- FLUsRs], UnwrittenRs = [x || {_, error_unwritten} <- FLUsRs],
Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)], Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)],
BadAnswerFLUs = [FLU || {FLU, Answer} <- FLUsRs, BadAnswerFLUs = [FLU || {FLU, Answer} <- FLUsRs,
@ -324,7 +430,17 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs,
if All_queried_list == [] if All_queried_list == []
orelse orelse
length(UnwrittenRs) == length(FLUsRs) -> length(UnwrittenRs) == length(FLUsRs) ->
{error_unwritten, FLUsRs, [todo_fix_caller_perhaps], S}; NoneProj = make_none_projection(MyName, [], orddict:new()),
Extra2 = [{all_members_replied, true},
{all_queried_list, All_queried_list},
{flus_rs, FLUsRs},
{unanimous_flus,[]},
{not_unanimous_flus, []},
{bad_answer_flus, BadAnswerFLUs},
{not_unanimous_answers, []},
{trans_all_hosed, []},
{trans_all_flap_counts, []}],
{not_unanimous, NoneProj, Extra2, S};
UnwrittenRs /= [] -> UnwrittenRs /= [] ->
{needs_repair, FLUsRs, [flarfus], S}; {needs_repair, FLUsRs, [flarfus], S};
true -> true ->
@ -489,9 +605,17 @@ calc_up_nodes(#ch_mgr{name=MyName, proj=Proj, runenv=RunEnv1}=S) ->
{UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}. {UpNodes, Partitions, S#ch_mgr{runenv=RunEnv2}}.
calc_up_nodes(MyName, AllMembers, RunEnv1) -> calc_up_nodes(MyName, AllMembers, RunEnv1) ->
case proplists:get_value(use_partition_simulator, RunEnv1) of
true ->
calc_up_nodes_sim(MyName, AllMembers, RunEnv1);
false ->
{AllMembers -- get(remember_partition_hack), [], RunEnv1}
end.
calc_up_nodes_sim(MyName, AllMembers, RunEnv1) ->
{Partitions2, Islands2} = machi_partition_simulator:get(AllMembers), {Partitions2, Islands2} = machi_partition_simulator:get(AllMembers),
catch ?REACT({partitions,Partitions2}), catch ?REACT({calc_up_nodes,?LINE,[{partitions,Partitions2},
catch ?REACT({islands,Islands2}), {islands,Islands2}]}),
UpNodes = lists:sort( UpNodes = lists:sort(
[Node || Node <- AllMembers, [Node || Node <- AllMembers,
not lists:member({MyName, Node}, Partitions2), not lists:member({MyName, Node}, Partitions2),
@ -503,7 +627,8 @@ calc_up_nodes(MyName, AllMembers, RunEnv1) ->
{UpNodes, Partitions2, RunEnv2}. {UpNodes, Partitions2, RunEnv2}.
replace(PropList, Items) -> replace(PropList, Items) ->
proplists:compact(Items ++ PropList). Tmp = Items ++ PropList,
[{K, proplists:get_value(K, Tmp)} || K <- proplists:get_keys(Tmp)].
rank_and_sort_projections([], CurrentProj) -> rank_and_sort_projections([], CurrentProj) ->
rank_projections([CurrentProj], CurrentProj); rank_projections([CurrentProj], CurrentProj);
@ -561,6 +686,22 @@ rank_projection(#projection_v1{author_server=Author,
( N * length(Repairing_list)) + ( N * length(Repairing_list)) +
(N*N * length(UPI_list)). (N*N * length(UPI_list)).
do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)->
catch orddict:fold(
fun(_K, Pid, _Acc) ->
_ = (catch ?FLU_PC:quit(Pid))
end, [], OldProxiesDict),
Proxies = orddict:fold(
fun(K, P, Acc) ->
{ok, Pid} = ?FLU_PC:start_link(P),
[{K, Pid}|Acc]
end, [], MembersDict),
{ok, S#ch_mgr{members_dict=MembersDict,
proxies_dict=orddict:from_list(Proxies)}}.
do_react_to_env(#ch_mgr{proj=#projection_v1{epoch_number=Epoch,
members_dict=[]}}=S) ->
{{empty_members_dict, [], Epoch}, S};
do_react_to_env(S) -> do_react_to_env(S) ->
put(react, []), put(react, []),
react_to_env_A10(S). react_to_env_A10(S).
@ -571,12 +712,15 @@ react_to_env_A10(S) ->
react_to_env_A20(Retries, S) -> react_to_env_A20(Retries, S) ->
?REACT(a20), ?REACT(a20),
init_remember_partition_hack(),
{UnanimousTag, P_latest, ReadExtra, S2} = {UnanimousTag, P_latest, ReadExtra, S2} =
do_cl_read_latest_public_projection(true, S), do_cl_read_latest_public_projection(true, S),
%% The UnanimousTag isn't quite sufficient for our needs. We need %% The UnanimousTag isn't quite sufficient for our needs. We need
%% to determine if *all* of the UPI+Repairing FLUs are members of %% to determine if *all* of the UPI+Repairing FLUs are members of
%% the unanimous server replies. %% the unanimous server replies. All Repairing FLUs should be up
%% now (because if they aren't then they cannot be repairing), so
%% all Repairing FLUs have no non-race excuse not to be in UnanimousFLUs.
UnanimousFLUs = lists:sort(proplists:get_value(unanimous_flus, ReadExtra)), UnanimousFLUs = lists:sort(proplists:get_value(unanimous_flus, ReadExtra)),
UPI_Repairing_FLUs = lists:sort(P_latest#projection_v1.upi ++ UPI_Repairing_FLUs = lists:sort(P_latest#projection_v1.upi ++
P_latest#projection_v1.repairing), P_latest#projection_v1.repairing),
@ -1086,7 +1230,14 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo), P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo),
MyNamePid = proxy_pid(MyName, S), MyNamePid = proxy_pid(MyName, S),
ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO), %% This is the local projection store. Use a larger timeout, so
%% that things locally are pretty horrible if we're killed by a
%% timeout exception.
%% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),
Goo = P_latest2#projection_v1.epoch_number,
%% io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]),
{ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo},
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
true -> true ->
{_,_,C} = os:timestamp(), {_,_,C} = os:timestamp(),
@ -1184,7 +1335,7 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
{_WhateverUnanimous, BestP, Props, _S} = {_WhateverUnanimous, BestP, Props, _S} =
cl_read_latest_projection(private, S), cl_read_latest_projection(private, S),
NotBestPs = proplists:get_value(not_unanimous_answers, Props), NotBestPs = proplists:get_value(not_unanimous_answers, Props, []),
DownUnion = lists:usort( DownUnion = lists:usort(
lists:flatten( lists:flatten(
[P#projection_v1.down || [P#projection_v1.down ||
@ -1259,9 +1410,9 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit,
%% %%
%% So b's working on epoch 451 at the same time that d's latest %% So b's working on epoch 451 at the same time that d's latest
%% public projection is only epoch 441. But there's enough %% public projection is only epoch 441. But there's enough
%% lag so that b can "see" that a's bad=[c] (due to t_timeout!) %% lag so that b can "see" that a's bad=[c] (due to
%% and c's bad=[a]. So voila, b magically knows about both %% {error,partition}!) and c's bad=[a]. So voila, b
%% problem FLUs. Weird/cool. %% magically knows about both problem FLUs. Weird/cool.
AllFlapCounts = TempAllFlapCounts, AllFlapCounts = TempAllFlapCounts,
AllHosed = lists:usort(DownUnion ++ HosedTransUnion ++ BadFLUs); AllHosed = lists:usort(DownUnion ++ HosedTransUnion ++ BadFLUs);
@ -1463,6 +1614,8 @@ projection_transition_is_sane(
if UPI_2_suffix == UPI_2_concat -> if UPI_2_suffix == UPI_2_concat ->
ok; ok;
true -> true ->
%% 'make dialyzer' will believe that this can never succeed.
%% 'make dialyzer-test' will not complain, however.
if RetrospectiveP -> if RetrospectiveP ->
%% We are in retrospective mode. But there are %% We are in retrospective mode. But there are
%% some transitions that are difficult to find %% some transitions that are difficult to find
@ -1558,19 +1711,13 @@ sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
USec. USec.
calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) ->
Front = lists:takewhile(fun(X) -> X /=FLU end, FLU_list), Front = lists:takewhile(fun(X) -> X /= FLU end, lists:sort(FLU_list)),
Index = length(Front) + 1, Index = length(Front),
NumNodes = length(FLU_list), NumNodes = length(FLU_list),
SleepIndex = NumNodes - Index, SleepChunk = if NumNodes == 0 -> 0;
SleepChunk = MaxSleep div NumNodes, true -> (MaxSleep - MinSleep) div NumNodes
MinSleep + (SleepChunk * SleepIndex). end,
MinSleep + (SleepChunk * Index).
my_find_minmost([]) ->
0;
my_find_minmost([{_,_}|_] = TransFlapCounts0) ->
lists:min([FlapCount || {_T, {_FlTime, FlapCount}} <- TransFlapCounts0]);
my_find_minmost(TransFlapCounts0) ->
lists:min(TransFlapCounts0).
get_raw_flapping_i(#projection_v1{dbg=Dbg}) -> get_raw_flapping_i(#projection_v1{dbg=Dbg}) ->
proplists:get_value(flapping_i, Dbg, []). proplists:get_value(flapping_i, Dbg, []).
@ -1581,14 +1728,6 @@ get_flap_count(P) ->
get_all_flap_counts(P) -> get_all_flap_counts(P) ->
proplists:get_value(all_flap_counts, get_raw_flapping_i(P), []). proplists:get_value(all_flap_counts, get_raw_flapping_i(P), []).
get_all_flap_counts_counts(P) ->
case get_all_flap_counts(P) of
[] ->
[];
[{_,{_,_}}|_] = Cs ->
[Count || {_FLU, {_Time, Count}} <- Cs]
end.
get_all_hosed(P) when is_record(P, projection_v1)-> get_all_hosed(P) when is_record(P, projection_v1)->
proplists:get_value(all_hosed, get_raw_flapping_i(P), []). proplists:get_value(all_hosed, get_raw_flapping_i(P), []).
@ -1648,6 +1787,11 @@ inner_projection_or_self(P) ->
P_inner P_inner
end. end.
make_chmgr_regname(A) when is_atom(A) ->
list_to_atom(atom_to_list(A) ++ "_chmgr");
make_chmgr_regname(B) when is_binary(B) ->
list_to_atom(binary_to_list(B) ++ "_chmgr").
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
perhaps_call_t(S, Partitions, FLU, DoIt) -> perhaps_call_t(S, Partitions, FLU, DoIt) ->
@ -1655,15 +1799,23 @@ perhaps_call_t(S, Partitions, FLU, DoIt) ->
perhaps_call(S, Partitions, FLU, DoIt) perhaps_call(S, Partitions, FLU, DoIt)
catch catch
exit:timeout -> exit:timeout ->
t_timeout {error, partition};
exit:{timeout,_} ->
{error, partition}
end. end.
perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) -> perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
ProxyPid = proxy_pid(FLU, S), ProxyPid = proxy_pid(FLU, S),
RemoteFLU_p = FLU /= MyName, RemoteFLU_p = FLU /= MyName,
erase(bad_sock),
case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of case RemoteFLU_p andalso lists:member({MyName, FLU}, Partitions) of
false -> false ->
Res = DoIt(ProxyPid), Res = DoIt(ProxyPid),
if Res == {error, partition} ->
remember_partition_hack(FLU);
true ->
ok
end,
case RemoteFLU_p andalso lists:member({FLU, MyName}, Partitions) of case RemoteFLU_p andalso lists:member({FLU, MyName}, Partitions) of
false -> false ->
Res; Res;
@ -1676,5 +1828,10 @@ perhaps_call(#ch_mgr{name=MyName}=S, Partitions, FLU, DoIt) ->
exit(timeout) exit(timeout)
end. end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init_remember_partition_hack() ->
put(remember_partition_hack, []).
remember_partition_hack(FLU) ->
put(remember_partition_hack, [FLU|get(remember_partition_hack)]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View file

@ -37,13 +37,27 @@
%% %%
%% The FLU is named after the CORFU server "FLU" or "FLash Unit" server. %% The FLU is named after the CORFU server "FLU" or "FLash Unit" server.
%% %%
%% TODO There is one major missing feature in this FLU implementation: %% TODO There is a major missing feature in this FLU implementation:
%% there is no "write-once" enforcement for any position in a Machi %% there is no "write-once" enforcement for any position in a Machi
%% file. At the moment, we rely on correct behavior of the client %% file. At the moment, we rely on correct behavior of the client
%% &amp; the sequencer to avoid overwriting data. In the Real World, %% &amp; the sequencer to avoid overwriting data. In the Real World,
%% however, all Machi file data is supposed to be exactly write-once %% however, all Machi file data is supposed to be exactly write-once
%% to avoid problems with bugs, wire protocol corruption, malicious %% to avoid problems with bugs, wire protocol corruption, malicious
%% clients, etc. %% clients, etc.
%%
%% TODO The per-file metadata tuple store is missing from this implementation.
%%
%% TODO Section 4.1 ("The FLU") of the Machi design doc suggests that
%% the FLU keep track of the epoch number of the last file write (and
%% perhaps last metadata write), as an optimization for inter-FLU data
%% replication/chain repair.
%%
%% TODO Section 4.2 ("The Sequencer") says that the sequencer must
%% change its file assignments to new & unique names whenever we move
%% to wedge state. This is not yet implemented. In the current
%% Erlang process scheme (which will probably be changing soon), a
%% simple implementation would stop all existing processes that are
%% running run_seq_append_server().
-module(machi_flu1). -module(machi_flu1).
@ -53,9 +67,10 @@
-include("machi_projection.hrl"). -include("machi_projection.hrl").
-export([start_link/1, stop/1]). -export([start_link/1, stop/1]).
-export([make_listener_regname/1, make_projection_server_regname/1]).
-record(state, { -record(state, {
reg_name :: atom(), flu_name :: atom(),
proj_store :: pid(), proj_store :: pid(),
append_pid :: pid(), append_pid :: pid(),
tcp_port :: non_neg_integer(), tcp_port :: non_neg_integer(),
@ -81,15 +96,22 @@ stop(Pid) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%
main2(RegName, TcpPort, DataDir, Rest) -> main2(FluName, TcpPort, DataDir, Rest) ->
S0 = #state{reg_name=RegName, S0 = #state{flu_name=FluName,
tcp_port=TcpPort, tcp_port=TcpPort,
data_dir=DataDir, data_dir=DataDir,
props=Rest}, props=Rest},
AppendPid = start_append_server(S0), AppendPid = start_append_server(S0),
ProjRegName = make_projection_server_regname(RegName), {_ProjRegName, ProjectionPid} =
{ok, ProjectionPid} = case proplists:get_value(projection_store_registered_name, Rest) of
machi_projection_store:start_link(ProjRegName, DataDir, AppendPid), undefined ->
RN = make_projection_server_regname(FluName),
{ok, PP} =
machi_projection_store:start_link(RN, DataDir, AppendPid),
{RN, PP};
RN ->
{RN, whereis(RN)}
end,
S1 = S0#state{append_pid=AppendPid, S1 = S0#state{append_pid=AppendPid,
proj_store=ProjectionPid}, proj_store=ProjectionPid},
S2 = case proplists:get_value(dbg, Rest) of S2 = case proplists:get_value(dbg, Rest) of
@ -109,7 +131,7 @@ main2(RegName, TcpPort, DataDir, Rest) ->
Projection_e = machi_util:make_projection_filename(DataDir, "unused"), Projection_e = machi_util:make_projection_filename(DataDir, "unused"),
ok = filelib:ensure_dir(Projection_e), ok = filelib:ensure_dir(Projection_e),
put(flu_reg_name, RegName), put(flu_flu_name, FluName),
put(flu_append_pid, AppendPid), put(flu_append_pid, AppendPid),
put(flu_projection_pid, ProjectionPid), put(flu_projection_pid, ProjectionPid),
put(flu_listen_pid, ListenPid), put(flu_listen_pid, ListenPid),
@ -120,44 +142,48 @@ main2(RegName, TcpPort, DataDir, Rest) ->
ok. ok.
start_listen_server(S) -> start_listen_server(S) ->
spawn_link(fun() -> run_listen_server(S) end). proc_lib:spawn_link(fun() -> run_listen_server(S) end).
start_append_server(S) -> start_append_server(S) ->
spawn_link(fun() -> run_append_server(S) end). FluPid = self(),
proc_lib:spawn_link(fun() -> run_append_server(FluPid, S) end).
%% start_projection_server(S) -> %% start_projection_server(S) ->
%% spawn_link(fun() -> run_projection_server(S) end). %% spawn_link(fun() -> run_projection_server(S) end).
run_listen_server(#state{tcp_port=TcpPort}=S) -> run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
register(make_listener_regname(FluName), self()),
SockOpts = [{reuseaddr, true}, SockOpts = [{reuseaddr, true},
{mode, binary}, {active, false}, {packet, line}], {mode, binary}, {active, false}, {packet, line}],
{ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts),
listen_server_loop(LSock, S). listen_server_loop(LSock, S).
run_append_server(#state{reg_name=Name}=S) -> run_append_server(FluPid, #state{flu_name=Name}=S) ->
register(Name, self()), register(Name, self()),
append_server_loop(S). append_server_loop(FluPid, S).
listen_server_loop(LSock, S) -> listen_server_loop(LSock, S) ->
{ok, Sock} = gen_tcp:accept(LSock), {ok, Sock} = gen_tcp:accept(LSock),
spawn_link(fun() -> net_server_loop(Sock, S) end), spawn_link(fun() -> net_server_loop(Sock, S) end),
listen_server_loop(LSock, S). listen_server_loop(LSock, S).
append_server_loop(#state{data_dir=DataDir}=S) -> append_server_loop(FluPid, #state{data_dir=DataDir}=S) ->
AppendServerPid = self(),
receive receive
{seq_append, From, Prefix, Chunk, CSum} -> {seq_append, From, Prefix, Chunk, CSum} ->
spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum,
DataDir) end), DataDir, AppendServerPid) end),
append_server_loop(S); %% DataDir, FluPid) end),
append_server_loop(FluPid, S);
{wedge_state_change, Boolean} -> {wedge_state_change, Boolean} ->
append_server_loop(S#state{wedge=Boolean}) append_server_loop(FluPid, S#state{wedge=Boolean})
end. end.
-define(EpochIDSpace, (4+20)). -define(EpochIDSpace, (4+20)).
net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) -> net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
ok = inet:setopts(Sock, [{packet, line}]), ok = inet:setopts(Sock, [{packet, line}]),
case gen_tcp:recv(Sock, 0, 60*1000) of case gen_tcp:recv(Sock, 0, 600*1000) of
{ok, Line} -> {ok, Line} ->
%% machi_util:verb("Got: ~p\n", [Line]), %% machi_util:verb("Got: ~p\n", [Line]),
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1, PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1,
@ -171,7 +197,7 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
_EpochIDRaw:(?EpochIDSpace)/binary, _EpochIDRaw:(?EpochIDSpace)/binary,
LenHex:8/binary, LenHex:8/binary,
Prefix:PrefixLenLF/binary, "\n">> -> Prefix:PrefixLenLF/binary, "\n">> ->
do_net_server_append(RegName, Sock, LenHex, Prefix); do_net_server_append(FluName, Sock, LenHex, Prefix);
<<"R ", <<"R ",
_EpochIDRaw:(?EpochIDSpace)/binary, _EpochIDRaw:(?EpochIDSpace)/binary,
OffsetHex:16/binary, LenHex:8/binary, OffsetHex:16/binary, LenHex:8/binary,
@ -219,16 +245,16 @@ net_server_loop(Sock, #state{reg_name=RegName, data_dir=DataDir}=S) ->
exit(normal) exit(normal)
end. end.
append_server_dispatch(From, Prefix, Chunk, CSum, DataDir) -> append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, LinkPid) ->
Pid = write_server_get_pid(Prefix, DataDir), Pid = write_server_get_pid(Prefix, DataDir, LinkPid),
Pid ! {seq_append, From, Prefix, Chunk, CSum}, Pid ! {seq_append, From, Prefix, Chunk, CSum},
exit(normal). exit(normal).
do_net_server_append(RegName, Sock, LenHex, Prefix) -> do_net_server_append(FluName, Sock, LenHex, Prefix) ->
%% TODO: robustify against other invalid path characters such as NUL %% TODO: robustify against other invalid path characters such as NUL
case sanitize_file_string(Prefix) of case sanitize_file_string(Prefix) of
ok -> ok ->
do_net_server_append2(RegName, Sock, LenHex, Prefix); do_net_server_append2(FluName, Sock, LenHex, Prefix);
_ -> _ ->
ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>) ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>)
end. end.
@ -241,13 +267,13 @@ sanitize_file_string(Str) ->
error error
end. end.
do_net_server_append2(RegName, Sock, LenHex, Prefix) -> do_net_server_append2(FluName, Sock, LenHex, Prefix) ->
<<Len:32/big>> = machi_util:hexstr_to_bin(LenHex), <<Len:32/big>> = machi_util:hexstr_to_bin(LenHex),
ok = inet:setopts(Sock, [{packet, raw}]), ok = inet:setopts(Sock, [{packet, raw}]),
{ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000), {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000),
CSum = machi_util:checksum_chunk(Chunk), CSum = machi_util:checksum_chunk(Chunk),
try try
RegName ! {seq_append, self(), Prefix, Chunk, CSum} FluName ! {seq_append, self(), Prefix, Chunk, CSum}
catch error:badarg -> catch error:badarg ->
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]) error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE])
end, end,
@ -489,22 +515,30 @@ do_net_server_truncate_hackityhack2(Sock, File, DataDir) ->
ok = gen_tcp:send(Sock, "ERROR\n") ok = gen_tcp:send(Sock, "ERROR\n")
end. end.
write_server_get_pid(Prefix, DataDir) -> write_server_get_pid(Prefix, DataDir, LinkPid) ->
case write_server_find_pid(Prefix) of case write_server_find_pid(Prefix) of
undefined -> undefined ->
start_seq_append_server(Prefix, DataDir), start_seq_append_server(Prefix, DataDir, LinkPid),
timer:sleep(1), timer:sleep(1),
write_server_get_pid(Prefix, DataDir); write_server_get_pid(Prefix, DataDir, LinkPid);
Pid -> Pid ->
Pid Pid
end. end.
write_server_find_pid(Prefix) -> write_server_find_pid(Prefix) ->
RegName = machi_util:make_regname(Prefix), FluName = machi_util:make_regname(Prefix),
whereis(RegName). whereis(FluName).
start_seq_append_server(Prefix, DataDir) -> start_seq_append_server(Prefix, DataDir, AppendServerPid) ->
spawn_link(fun() -> run_seq_append_server(Prefix, DataDir) end). proc_lib:spawn_link(fun() ->
%% The following is only necessary to
%% make nice process relationships in
%% 'appmon' and related tools.
put('$ancestors', [AppendServerPid]),
put('$initial_call', {x,y,3}),
link(AppendServerPid),
run_seq_append_server(Prefix, DataDir)
end).
run_seq_append_server(Prefix, DataDir) -> run_seq_append_server(Prefix, DataDir) ->
true = register(machi_util:make_regname(Prefix), self()), true = register(machi_util:make_regname(Prefix), self()),
@ -572,8 +606,8 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
after 30*1000 -> after 30*1000 ->
ok = file:close(FHd), ok = file:close(FHd),
ok = file:close(FHc), ok = file:close(FHc),
machi_util:info_msg("stop: ~p server at file ~w offset ~w\n", machi_util:info_msg("stop: ~p server ~p at file ~w offset ~w\n",
[Prefix, FileNum, Offset]), [Prefix, self(), FileNum, Offset]),
exit(normal) exit(normal)
end. end.
@ -619,5 +653,15 @@ handle_projection_command({list_all_projections, ProjType},
handle_projection_command(Else, _S) -> handle_projection_command(Else, _S) ->
{error, unknown_cmd, Else}. {error, unknown_cmd, Else}.
make_listener_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_listener").
%% This is the name of the projection store that is spawned by the
%% *flu*, for use primarily in testing scenarios. In normal use, we
%% ought to be using the OTP style of managing processes, via
%% supervisors, namely via machi_flu_psup.erl, which uses a
%% *different* naming convention for the projection store name that it
%% registers.
make_projection_server_regname(BaseName) -> make_projection_server_regname(BaseName) ->
list_to_atom(atom_to_list(BaseName) ++ "_projection"). list_to_atom(atom_to_list(BaseName) ++ "_pstore2").

88
src/machi_flu_psup.erl Normal file
View file

@ -0,0 +1,88 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Supervisor for Machi FLU servers and their related support
%% servers.
-module(machi_flu_psup).
-behaviour(supervisor).
%% External API
-export([make_package_spec/4, start_flu_package/4, stop_flu_package/1]).
%% Internal API
-export([start_link/4,
make_p_regname/1, make_mgr_supname/1, make_proj_supname/1]).
%% Supervisor callbacks
-export([init/1]).
make_package_spec(FluName, TcpPort, DataDir, Props) ->
{FluName, {machi_flu_psup, start_link,
[FluName, TcpPort, DataDir, Props]},
permanent, 5000, supervisor, []}.
start_flu_package(FluName, TcpPort, DataDir, Props) ->
Spec = make_package_spec(FluName, TcpPort, DataDir, Props),
{ok, _SupPid} = supervisor:start_child(machi_flu_sup, Spec).
stop_flu_package(FluName) ->
case supervisor:terminate_child(machi_flu_sup, FluName) of
ok ->
ok = supervisor:delete_child(machi_flu_sup, FluName);
Else ->
Else
end.
start_link(FluName, TcpPort, DataDir, Props) ->
supervisor:start_link({local, make_p_regname(FluName)}, ?MODULE,
[FluName, TcpPort, DataDir, Props]).
init([FluName, TcpPort, DataDir, Props0]) ->
RestartStrategy = one_for_all,
MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
ProjRegName = make_proj_supname(FluName),
Props = [{projection_store_registered_name, ProjRegName},
{use_partition_simulator,false}|Props0],
ProjSpec = {ProjRegName,
{machi_projection_store, start_link,
[ProjRegName, DataDir, zarfus_todo]},
permanent, 5000, worker, []},
MgrSpec = {make_mgr_supname(FluName),
{machi_chain_manager1, start_link,
[FluName, [], Props]},
permanent, 5000, worker, []},
FluSpec = {FluName,
{machi_flu1, start_link,
[ [{FluName, TcpPort, DataDir}|Props] ]},
permanent, 5000, worker, []},
{ok, {SupFlags, [ProjSpec, MgrSpec, FluSpec]}}.
make_p_regname(FluName) when is_atom(FluName) ->
list_to_atom("flusup_" ++ atom_to_list(FluName)).
make_mgr_supname(MgrName) when is_atom(MgrName) ->
machi_chain_manager1:make_chmgr_regname(MgrName).
make_proj_supname(ProjName) when is_atom(ProjName) ->
list_to_atom(atom_to_list(ProjName) ++ "_pstore").

View file

@ -40,15 +40,12 @@ init([]) ->
RestartStrategy = one_for_one, RestartStrategy = one_for_one,
MaxRestarts = 1000, MaxRestarts = 1000,
MaxSecondsBetweenRestarts = 3600, MaxSecondsBetweenRestarts = 3600,
SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts}, SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
Restart = permanent, Ps = application:get_env(machi, initial_flus, []),
Shutdown = 5000, FLU_specs = [machi_flu_psup:make_package_spec(FluName, TcpPort,
Type = worker, DataDir, Props) ||
{FluName, TcpPort, DataDir, Props} <- Ps],
{ok, {SupFlags, FLU_specs}}.
{ok, FluList} = application:get_env(machi, flu_list),
FluSpecs = [{FluName, {machi_flu, start_link, [FluArgs]},
Restart, Shutdown, Type, []} ||
{FluName, _Port, _Dir}=FluArgs <- FluList],
{ok, {SupFlags, FluSpecs}}.

View file

@ -51,7 +51,7 @@ new(EpochNum, MyName, MemberDict, Down_list, UPI_list, Repairing_list, Dbg) ->
%% or it may be simply `list(p_srvr())', in which case we'll convert it %% or it may be simply `list(p_srvr())', in which case we'll convert it
%% to a `p_srvr_dict()'. %% to a `p_srvr_dict()'.
new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list, new(EpochNum, MyName, [_|_] = MembersDict0, Down_list, UPI_list, Repairing_list,
Dbg, Dbg2) Dbg, Dbg2)
when is_integer(EpochNum), EpochNum >= 0, when is_integer(EpochNum), EpochNum >= 0,
is_atom(MyName) orelse is_binary(MyName), is_atom(MyName) orelse is_binary(MyName),
@ -85,6 +85,21 @@ new(EpochNum, MyName, MembersDict0, Down_list, UPI_list, Repairing_list,
repairing=Repairing_list, repairing=Repairing_list,
dbg=Dbg dbg=Dbg
}, },
update_dbg2(update_checksum(P), Dbg2);
new(EpochNum, MyName, [] = _MembersDict0, _Down_list, _UPI_list,_Repairing_list,
Dbg, Dbg2)
when is_integer(EpochNum), EpochNum >= 0,
is_atom(MyName) orelse is_binary(MyName) ->
P = #projection_v1{epoch_number=EpochNum,
creation_time=now(),
author_server=MyName,
all_members=[],
members_dict=[],
down=[],
upi=[],
repairing=[],
dbg=Dbg
},
update_dbg2(update_checksum(P), Dbg2). update_dbg2(update_checksum(P), Dbg2).
%% @doc Update the checksum element of a projection record. %% @doc Update the checksum element of a projection record.

View file

@ -36,7 +36,6 @@
%% `private' type); the value is a projection data structure %% `private' type); the value is a projection data structure
%% (`projection_v1()' type). %% (`projection_v1()' type).
-module(machi_projection_store). -module(machi_projection_store).
-include("machi_projection.hrl"). -include("machi_projection.hrl").

View file

@ -250,7 +250,7 @@ do_req(Req, S) ->
end end
end; end;
false -> false ->
{{error, not_connected}, S2} {{error, partition}, S2}
end. end.
make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) -> make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->

View file

@ -239,6 +239,17 @@ info_msg(Fmt, Args) ->
_ -> error_logger:info_msg(Fmt, Args) _ -> error_logger:info_msg(Fmt, Args)
end. end.
wait_for_death(Pid, 0) ->
exit({not_dead_yet, Pid});
wait_for_death(Pid, Iters) when is_pid(Pid) ->
case erlang:is_process_alive(Pid) of
false ->
ok;
true ->
timer:sleep(1),
wait_for_death(Pid, Iters-1)
end.
%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%
%% @doc Create a TCP connection to a remote Machi server. %% @doc Create a TCP connection to a remote Machi server.

View file

@ -23,6 +23,8 @@
-ifdef(TEST). -ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-include("machi.hrl"). -include("machi.hrl").
-include("machi_projection.hrl"). -include("machi_projection.hrl").

View file

@ -310,7 +310,7 @@ convergence_demo_testfun(NumFLUs) ->
io:format(user, "We should see convergence to 1 correct chain.\n", []), io:format(user, "We should see convergence to 1 correct chain.\n", []),
machi_partition_simulator:no_partitions(), machi_partition_simulator:no_partitions(),
[DoIt(50, 10, 100) || _ <- [1]], [DoIt(50, 10, 100) || _ <- [1]],
io:format(user, "Sweet, finishing early\n", []), exit(yoyoyo_testing_hack), io:format(user, "Sweet, finishing early\n", []), exit(yoyoyo_testing_hack_finishing_early),
%% WARNING: In asymmetric partitions, private_projections_are_stable() %% WARNING: In asymmetric partitions, private_projections_are_stable()
%% will never be true; code beyond this point on the -exp3 %% will never be true; code beyond this point on the -exp3
%% branch is bit-rotted, sorry! %% branch is bit-rotted, sorry!

View file

@ -0,0 +1,116 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_flu_psup_test).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-include("machi_projection.hrl").
%% smoke_test2() will try repeatedly to make a TCP connection to ports
%% on localhost that have no listener.
%% If you use 'sysctl -w net.inet.icmp.icmplim=3' before running this
%% test, you'll get to exercise some timeout handling in
%% machi_chain_manager1:perhaps_call_t().
%% The default for net.inet.icmp.icmplim is 50.
smoke_test_() ->
{timeout, 5*60, fun() -> smoke_test2() end}.
smoke_test2() ->
Ps = [{a,#p_srvr{name=a, address="localhost", port=5555, props="./data.a"}},
{b,#p_srvr{name=b, address="localhost", port=5556, props="./data.b"}},
{c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}}
],
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(),
try
%% Only run a, don't run b & c so we have 100% failures talking to them
[begin
#p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
end || {_,P} <- [hd(Ps)]],
[machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
machi_chain_manager1:set_chain_members(a_chmgr, orddict:from_list(Ps)),
[machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
ok
after
exit(SupPid, normal),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
machi_util:wait_for_death(SupPid, 100),
ok
end.
smoke2_test_() ->
{timeout, 5*60, fun() -> smoke2_test2() end}.
smoke2_test2() ->
Ps = [{a,#p_srvr{name=a, address="localhost", port=5555, props="./data.a"}},
{b,#p_srvr{name=b, address="localhost", port=5556, props="./data.b"}},
{c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}}
],
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
{ok, SupPid} = machi_flu_sup:start_link(),
try
[begin
#p_srvr{name=Name, port=Port, props=Dir} = P,
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir,
[{active_mode,false}])
end || {_,P} <- Ps],
ChMgrs = [machi_flu_psup:make_mgr_supname(P#p_srvr.name) || {_,P} <-Ps],
PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps],
Dict = orddict:from_list(Ps),
[machi_chain_manager1:set_chain_members(ChMgr, Dict) ||
ChMgr <- ChMgrs ],
{now_using,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
[begin
_QQa = machi_chain_manager1:test_react_to_env(ChMgr)
end || _ <- lists:seq(1,25), ChMgr <- ChMgrs],
%% All chain maanagers & projection stores should be using the
%% same projection which is max projection in each store.
{no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env(
hd(ChMgrs)),
[{no_change,_,Epoch_z} = machi_chain_manager1:test_react_to_env(
ChMgr )|| ChMgr <- ChMgrs],
{ok, Proj_z} = machi_projection_store:read_latest_projection(
hd(PStores), public),
[begin
{ok, Proj_z} = machi_projection_store:read_latest_projection(
PStore, ProjType)
end || ProjType <- [public, private], PStore <- PStores ],
Epoch_z = Proj_z#projection_v1.epoch_number,
ok
after
exit(SupPid, normal),
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
machi_util:wait_for_death(SupPid, 100),
ok
end.
-endif. % TEST

View file

@ -49,7 +49,7 @@ api_smoke_test() ->
{error,_} = ?MUT:append_chunk(Prox1, {error,_} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>, FakeEpoch, <<"prefix">>, <<"data">>,
infinity), infinity),
{error,not_connected} = ?MUT:append_chunk(Prox1, {error,partition} = ?MUT:append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>, FakeEpoch, <<"prefix">>, <<"data">>,
infinity), infinity),
%% Start the FLU again, we should be able to do stuff immediately %% Start the FLU again, we should be able to do stuff immediately
@ -65,7 +65,7 @@ api_smoke_test() ->
%% Alright, now for the rest of the API, whee %% Alright, now for the rest of the API, whee
BadFile = <<"no-such-file">>, BadFile = <<"no-such-file">>,
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile), {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
{ok, [_]} = ?MUT:list_files(Prox1, FakeEpoch), {ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch),
{ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public), {ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public),
{error, not_written} = ?MUT:read_latest_projection(Prox1, public), {error, not_written} = ?MUT:read_latest_projection(Prox1, public),
{error, not_written} = ?MUT:read_projection(Prox1, public, 44), {error, not_written} = ?MUT:read_projection(Prox1, public, 44),