From add6f421aa03a6f397b90ceaed4b8438f625ec2f Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 27 Oct 2014 19:48:12 +0900 Subject: [PATCH] WIP: chain mgmt prototype scaffolding 2 --- prototype/poc-machi/include/machi.hrl | 3 +- .../poc-machi/src/machi_chain_manager1.erl | 65 +++++++++++++++---- prototype/poc-machi/src/machi_flu0.erl | 8 +++ 3 files changed, 61 insertions(+), 15 deletions(-) diff --git a/prototype/poc-machi/include/machi.hrl b/prototype/poc-machi/include/machi.hrl index 1c76764..91e7541 100644 --- a/prototype/poc-machi/include/machi.hrl +++ b/prototype/poc-machi/include/machi.hrl @@ -43,7 +43,8 @@ down :: [m_server()], upi :: [m_server()], repairing :: [m_server()], - dbg :: list() %proplist() + dbg :: list(), %proplist() checksummed + dbg2 :: list() %proplist() not checksummed }). -record(ch_mgr, { diff --git a/prototype/poc-machi/src/machi_chain_manager1.erl b/prototype/poc-machi/src/machi_chain_manager1.erl index fa3ab34..9e80b52 100644 --- a/prototype/poc-machi/src/machi_chain_manager1.erl +++ b/prototype/poc-machi/src/machi_chain_manager1.erl @@ -73,7 +73,7 @@ init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) -> {no_partition_threshold, NoPartitionThreshold}], S = #ch_mgr{name=MyName, proj=make_initial_projection(MyName, All_list, All_list, - [], []), + [{author_proc, init}], []), myflu=MyFLUPid, % pid or atom local name runenv=RunEnv}, {ok, S}. @@ -106,12 +106,20 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -do_cl_write_current_proj(#ch_mgr{proj=Proj} = S) -> +do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) -> #projection{epoch_number=Epoch} = Proj, case cl_write_public_proj(Epoch, Proj, S) of {ok, S2} -> case cl_read_public_proj(S2) of {ok, Proj2, S3} -> + ?D(Proj2), + ?D(machi_flu0:get_epoch(MyFLU)), + Proj2b = update_projection_dbg2( + Proj2, [{hooray, {date(), time()}}]), + ok = machi_flu0:proj_write(MyFLU, Epoch, private, Proj2b), + MyP = make_projection_summary(element(2, + machi_flu0:proj_read_latest(MyFLU, private))), + ?D(MyP), {ok, S3#ch_mgr{proj=Proj2}}; {_Other3, _S3}=Else3 -> Else3 @@ -142,10 +150,9 @@ cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) -> cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) -> %% We're going to be very cavalier about this write because we'll rely %% on the read side to do any read repair. - Rs = - [perhaps_call_t(S, Partitions, FLU, - fun() -> machi_flu0:proj_write(FLU, Epoch, public, Proj) end) || - FLU <- FLUs], + DoIt = fun(X) -> machi_flu0:proj_write(X, Epoch, public, Proj) end, + Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) || + FLU <- FLUs], case lists:usort(Rs) of [ok] -> {ok, S}; @@ -153,18 +160,40 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) -> {{mixed_bag, lists:zip(FLUs, Rs)}, S} end. -cl_read_public_proj(S) -> +cl_read_public_proj(#ch_mgr{proj=Proj0}=S) -> + #projection{all_members=All_list} = Proj0, + {_UpNodes, Partitions, S2} = calc_up_nodes(S), %% todo - {ok, S#ch_mgr.proj, S}. + ?D({todo, All_list, Partitions}), + DoIt = fun(X) -> + case machi_flu0:proj_read_latest(X, public) of + {ok, P} -> P; + Else -> Else + end + end, + Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) || + FLU <- All_list], + case lists:usort(Rs) of + [P] when is_record(P, projection) -> + {ok, S#ch_mgr.proj, S2} + end. make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) -> make_projection(1, 0, <<>>, MyName, All_list, [], UPI_list, Repairing_list, Ps). make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, - MyName, All_list, Down_list, UPI_list, Repairing_list, Ps) -> + MyName, All_list, Down_list, UPI_list, Repairing_list, + Dbg) -> + make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, + MyName, All_list, Down_list, UPI_list, Repairing_list, + Dbg, []). + +make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, + MyName, All_list, Down_list, UPI_list, Repairing_list, + Dbg, Dbg2) -> P = #projection{epoch_number=EpochNum, - epoch_csum= <<>>, + epoch_csum= <<>>, % always checksums as <<>> prev_epoch_num=PrevEpochNum, prev_epoch_csum=PrevEpochCSum, creation_time=now(), @@ -173,9 +202,15 @@ make_projection(EpochNum, PrevEpochNum, PrevEpochCSum, down=Down_list, upi=UPI_list, repairing=Repairing_list, - dbg=Ps}, + dbg=Dbg, + dbg2=[] % always checksums as [] + }, CSum = crypto:hash(sha, term_to_binary(P)), - P#projection{epoch_csum=CSum}. + P#projection{epoch_csum=CSum, + dbg2=Dbg2}. + +update_projection_dbg2(P, Dbg2) when is_list(Dbg2) -> + P#projection{dbg2=Dbg2}. %% OldThreshold: Percent chance of using the old/previous network partition list %% NoPartitionThreshold: If the network partition changes, what are the odds @@ -274,9 +309,11 @@ make_projection_summary(#projection{epoch_number=EpochNum, all_members=_All_list, down=Down_list, upi=UPI_list, - repairing=Repairing_list}) -> + repairing=Repairing_list, + dbg=Dbg, dbg2=Dbg2}) -> [{epoch,EpochNum}, - {upi,UPI_list},{repair,Repairing_list},{down,Down_list}]. + {upi,UPI_list},{repair,Repairing_list},{down,Down_list}, + {d,Dbg}, {d2,Dbg2}]. roll_dice(N, RunEnv) -> Seed1 = proplists:get_value(seed, RunEnv), diff --git a/prototype/poc-machi/src/machi_flu0.erl b/prototype/poc-machi/src/machi_flu0.erl index b38040b..e9a3e66 100644 --- a/prototype/poc-machi/src/machi_flu0.erl +++ b/prototype/poc-machi/src/machi_flu0.erl @@ -27,6 +27,7 @@ -export([start_link/1, stop/1, write/3, read/2, trim/2, + get_epoch/1, proj_write/4, proj_read/3, proj_get_latest_num/2, proj_read_latest/2]). -export([set_fake_repairing_status/2, get_fake_repairing_status/1]). -export([make_proj/1, make_proj/2]). @@ -78,6 +79,9 @@ write(Pid, Epoch, Bin) -> trim(Pid, Epoch) -> g_call(Pid, {reg_op, Epoch, trim}, ?LONG_TIME). +get_epoch(Pid) -> + g_call(Pid, {get_epoch}, ?LONG_TIME). + proj_write(Pid, Epoch, StoreType, Proj) when StoreType == public; StoreType == private -> g_call(Pid, {proj_write, Epoch, StoreType, Proj}, ?LONG_TIME). @@ -169,6 +173,10 @@ handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=trimmed} = S) LC2 = lclock_update(LC1), {reply, {error_trimmed, LC2}, S}; +handle_call({{get_epoch}, LC1}, _From, S) -> + LC2 = lclock_update(LC1), + {Reply, NewS} = {S#state.proj_epoch, S}, + {reply, {Reply, LC2}, NewS}; handle_call({{proj_write, Epoch, StoreType, Proj}, LC1}, _From, S) -> LC2 = lclock_update(LC1), {Reply, NewS} = do_proj_write(Epoch, StoreType, Proj, S),