From 5e49bd6c29cc4c93521e71ccb2b9d3d1b785b3c0 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 27 Oct 2014 17:18:34 +0900 Subject: [PATCH] WIP: Machi chain management PULSE prototype work --- prototype/poc-machi/include/machi.hrl | 1 + .../poc-machi/src/machi_chain_manager1.erl | 259 ++++++++++++++++++ 2 files changed, 260 insertions(+) create mode 100644 prototype/poc-machi/src/machi_chain_manager1.erl diff --git a/prototype/poc-machi/include/machi.hrl b/prototype/poc-machi/include/machi.hrl index b110f39..1c76764 100644 --- a/prototype/poc-machi/include/machi.hrl +++ b/prototype/poc-machi/include/machi.hrl @@ -49,6 +49,7 @@ -record(ch_mgr, { name :: m_server(), proj :: #projection{}, + myflu :: pid() | atom(), %% runenv :: list() %proplist() }). diff --git a/prototype/poc-machi/src/machi_chain_manager1.erl b/prototype/poc-machi/src/machi_chain_manager1.erl new file mode 100644 index 0000000..2974bc5 --- /dev/null +++ b/prototype/poc-machi/src/machi_chain_manager1.erl @@ -0,0 +1,259 @@ +%% ------------------------------------------------------------------- +%% +%% Machi: a small village of replicated files +%% +%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- +-module(machi_chain_manager1). + +-behaviour(gen_server). + +-include("machi.hrl"). +-define(D(X), io:format(user, "~s ~p\n", [??X, X])). + +%% API +-export([start_link/4, stop/1, + calculate_projection_internal_old/3]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([calc_projection/3, make_projection_summary/1]). + +-ifdef(TEST). + +-ifdef(EQC). +-include_lib("eqc/include/eqc.hrl"). +-endif. +-ifdef(PULSE). +-compile({parse_transform, pulse_instrument}). +-endif. + +-include_lib("eunit/include/eunit.hrl"). +-compile(export_all). +-endif. %TEST + +start_link(MyName, All_list, Seed, MyFLUPid) -> + gen_server:start_link(?MODULE, {MyName, All_list, Seed, MyFLUPid}, []). + +calculate_projection_internal_old(Pid, OldThreshold, NoPartitionThreshold) -> + gen_server:call(Pid, {calculate_projection_internal_old, OldThreshold, NoPartitionThreshold}, infinity). + +stop(Pid) -> + gen_server:call(Pid, {stop}, infinity). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init({MyName, All_list, Seed, MyFLUPid}) -> + RunEnv = [{seed, Seed}, + {network_partitions, []}], + S = #ch_mgr{name=MyName, + proj=make_initial_projection(MyName, All_list, All_list, + [], []), + myflu=MyFLUPid, % pid or atom local name + runenv=RunEnv}, + {ok, S}. + +handle_call({calculate_projection_internal_old, OldThreshold, NoPartitionThreshold}, _From, S) -> + {Reply, S2} = calc_projection(OldThreshold, NoPartitionThreshold, S), + {reply, Reply, S2}; +handle_call({stop}, _From, S) -> + {stop, normal, ok, S}; +handle_call(_Call, _From, S) -> + {reply, whaaaaaaaaaa, S}. + +handle_cast(_Cast, S) -> + {noreply, S}. + +handle_info(_Msg, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> + make_projection(1, 0, <<>>, + MyName, All_list, [], UPI_list, Repairing_list, Ps). + +make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, + MyName, All_list, Down_list, UPI_list, Repairing_list, Ps) -> + P = #projection{epoch_number=EpochNum, + epoch_csum= <<>>, + prev_epoch_num=PrevEpochNum, + prev_epoch_csum=PrevEpochCSum, + creation_time=now(), + author_server=MyName, + all_members=All_list, + down=Down_list, + upi=UPI_list, + repairing=Repairing_list, + dbg=Ps}, + CSum = crypto:hash(sha, term_to_binary(P)), + P#projection{epoch_csum=CSum}. + +%% OldThreshold: Percent chance of using the old/previous network partition list +%% NoPartitionThreshold: If the network partition changes, what are the odds +%% that there are no partitions at all? + +calc_projection(OldThreshold, NoPartitionThreshold, + #ch_mgr{proj=LastProj} = S) -> + calc_projection(OldThreshold, NoPartitionThreshold, LastProj, S). + +calc_projection(OldThreshold, NoPartitionThreshold, LastProj, + #ch_mgr{name=MyName, runenv=RunEnv1} = S) -> + #projection{epoch_number=OldEpochNum, + epoch_csum=OldEpochCsum, + all_members=All_list, + upi=OldUPI_list, + repairing=OldRepairing_list + } = LastProj, + LastUp = lists:usort(OldUPI_list ++ OldRepairing_list), + AllMembers = (S#ch_mgr.proj)#projection.all_members, + {Up, RunEnv2} = calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold, + AllMembers, RunEnv1), + NewUp = Up -- LastUp, + Down = AllMembers -- Up, + + NewUPI_list = [X || X <- OldUPI_list, lists:member(X, Up)], + Repairing_list2 = [X || X <- OldRepairing_list, lists:member(X, Up)], + {NewUPI_list3, Repairing_list3, RunEnv3} = + case {NewUp, Repairing_list2} of + {[], []} -> + {NewUPI_list, [], RunEnv2}; + {[], [H|T]} -> + {Prob, RunEnvX} = roll_dice(100, RunEnv2), + if Prob =< 50 -> + {NewUPI_list ++ [H], T, RunEnvX}; + true -> + {NewUPI_list, OldRepairing_list, RunEnvX} + end; + {_, _} -> + {NewUPI_list, OldRepairing_list, RunEnv2} + end, + Repairing_list4 = case NewUp of + [] -> Repairing_list3; + NewUp -> Repairing_list3 ++ NewUp + end, + Repairing_list5 = Repairing_list4 -- Down, + P = make_projection(OldEpochNum + 1, OldEpochNum, OldEpochCsum, + MyName, All_list, Down, NewUPI_list3, Repairing_list5, + [goo]), + {P, S#ch_mgr{runenv=RunEnv3}}. + +calc_up_nodes(MyName, OldThreshold, NoPartitionThreshold, + AllMembers, RunEnv1) -> + Seed1 = proplists:get_value(seed, RunEnv1), + Partitions1 = proplists:get_value(network_partitions, RunEnv1), + {Seed2, Partitions2} = + calc_network_partitions(AllMembers, Seed1, Partitions1, + OldThreshold, NoPartitionThreshold), + UpNodes = lists:sort( + [Node || Node <- AllMembers, + not lists:member({MyName, Node}, Partitions2), + not lists:member({Node, MyName}, Partitions2)]), + RunEnv2 = replace(RunEnv1, + [{seed, Seed2}, {network_partitions, Partitions2}]), + {UpNodes, RunEnv2}. + + +calc_network_partitions(Nodes, Seed1, OldPartition, + OldThreshold, NoPartitionThreshold) -> + {Cutoff2, Seed2} = random:uniform_s(100, Seed1), + if Cutoff2 < OldThreshold -> + {Seed2, OldPartition}; + true -> + {Cutoff3, Seed3} = random:uniform_s(100, Seed1), + if Cutoff3 < NoPartitionThreshold -> + {Seed3, []}; + %% Cutoff3 rem 10 < 5 -> + %% %% case get(goofus) of undefined -> put(goofus, true), io:format(user, "~w", [Cutoff3 rem 10]); _ -> ok end, + %% OldSeed = case random:seed(Seed3) of undefined -> now(); + %% Else -> Else + %% end, + %% {Down, _} = lists:partition(fun(X) -> X /= 'a' andalso random:uniform() < 0.5 end, Nodes), + %% %% case get(goofus) of undefined -> put(goofus, true), io:format(user, "~w", [Down]); _ -> ok end, + %% Partitions = [{X, Y} || X <- Nodes, Y <- Down], + %% random:seed(OldSeed), + %% {Seed3, Partitions}; + true -> + make_network_partition_locations(Nodes, Seed3) + end + end. + +replace(PropList, Items) -> + lists:foldl(fun({Key, Val}, Ps) -> + lists:keyreplace(Key, 1, Ps, {Key,Val}) + end, PropList, Items). + +make_projection_summary(#projection{epoch_number=EpochNum, + all_members=_All_list, + down=Down_list, + upi=UPI_list, + repairing=Repairing_list}) -> + [{epoch,EpochNum}, + {upi,UPI_list},{repair,Repairing_list},{down,Down_list}]. + +roll_dice(N, RunEnv) -> + Seed1 = proplists:get_value(seed, RunEnv), + {Val, Seed2} = random:uniform_s(N, Seed1), + {Val, replace(RunEnv, [{seed, Seed2}])}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +make_network_partition_locations(Nodes, Seed1) -> + Pairs = make_all_pairs(Nodes), + Num = length(Pairs), + {Seed2, Weights} = lists:foldl( + fun(_, {Seeda, Acc}) -> + {Cutoff, Seedb} = random:uniform_s(100, Seeda), + {Seedb, [Cutoff|Acc]} + end, {Seed1, []}, lists:seq(1, Num)), + {Cutoff3, Seed3} = random:uniform_s(100, Seed2), + {Seed3, [X || {Weight, X} <- lists:zip(Weights, Pairs), + Weight < Cutoff3]}. + +make_all_pairs(L) -> + lists:flatten(make_all_pairs2(lists:usort(L))). + +make_all_pairs2([]) -> + []; +make_all_pairs2([_]) -> + []; +make_all_pairs2([H1|T]) -> + [[{H1, X}, {X, H1}] || X <- T] ++ make_all_pairs(T). + +-ifdef(TEST). + +-define(MGR, machi_chain_manager1). + +smoke0_test() -> + {ok, M0} = ?MGR:start_link(a, [a,b,c], {1,2,3}, a), + try + [begin + Proj = ?MGR:calculate_projection_internal_old(M0, 50, 50), + io:format(user, "~p\n", [?MGR:make_projection_summary(Proj)]) + end || _ <- lists:seq(1,5)] + after + ok = ?MGR:stop(M0) + end. + +-endif.