WIP: chain mgmt prototype scaffolding 2

Scott Lystig Fritchie 2014-10-27 19:48:12 +09:00
parent 9c04537497
commit add6f421aa
3 changed files with 61 additions and 15 deletions

@@ -43,7 +43,8 @@
           down        :: [m_server()],
           upi         :: [m_server()],
           repairing   :: [m_server()],
-          dbg         :: list()   %proplist()
+          dbg         :: list(),  %proplist() checksummed
+          dbg2        :: list()   %proplist() not checksummed
          }).

 -record(ch_mgr, {

@@ -73,7 +73,7 @@ init({MyName, All_list, Seed, OldThreshold, NoPartitionThreshold, MyFLUPid}) ->
               {no_partition_threshold, NoPartitionThreshold}],
     S = #ch_mgr{name=MyName,
                 proj=make_initial_projection(MyName, All_list, All_list,
-                                             [], []),
+                                             [{author_proc, init}], []),
                 myflu=MyFLUPid, % pid or atom local name
                 runenv=RunEnv},
     {ok, S}.
@@ -106,12 +106,20 @@ code_change(_OldVsn, S, _Extra) ->
 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-do_cl_write_current_proj(#ch_mgr{proj=Proj} = S) ->
+do_cl_write_current_proj(#ch_mgr{proj=Proj, myflu=MyFLU} = S) ->
     #projection{epoch_number=Epoch} = Proj,
     case cl_write_public_proj(Epoch, Proj, S) of
         {ok, S2} ->
             case cl_read_public_proj(S2) of
                 {ok, Proj2, S3} ->
+                    ?D(Proj2),
+                    ?D(machi_flu0:get_epoch(MyFLU)),
+                    Proj2b = update_projection_dbg2(
+                               Proj2, [{hooray, {date(), time()}}]),
+                    ok = machi_flu0:proj_write(MyFLU, Epoch, private, Proj2b),
+                    MyP = make_projection_summary(element(2,
+                                machi_flu0:proj_read_latest(MyFLU, private))),
+                    ?D(MyP),
                     {ok, S3#ch_mgr{proj=Proj2}};
                 {_Other3, _S3}=Else3 ->
                     Else3
@@ -142,9 +150,8 @@ cl_write_public_proj_local(Epoch, Proj, #ch_mgr{myflu=MyFLU}=S) ->
 cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
     %% We're going to be very cavalier about this write because we'll rely
     %% on the read side to do any read repair.
-    Rs =
-        [perhaps_call_t(S, Partitions, FLU,
-                        fun() -> machi_flu0:proj_write(FLU, Epoch, public, Proj) end) ||
+    DoIt = fun(X) -> machi_flu0:proj_write(X, Epoch, public, Proj) end,
+    Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) ||
         FLU <- FLUs],
     case lists:usort(Rs) of
         [ok] ->
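
Aside, not part of the diff: the "cavalier" comment above leans on read-side repair that this WIP does not implement yet; cl_read_public_proj below only handles the case where every FLU returns the same projection. A rough sketch of the kind of repair that comment points at, reusing only calls already present in this module; read_repair_public/4 and its pick-the-highest-epoch policy are hypothetical:

%% Hypothetical sketch, not in the commit: given the per-FLU read results,
%% pick the projection with the highest epoch and rewrite it to every FLU
%% that returned something else.
read_repair_public(FLUs, Results, Partitions, S) ->
    Projs = [P || P <- Results, is_record(P, projection)],
    [Best|_] = lists:reverse(lists:keysort(#projection.epoch_number, Projs)),
    _ = [perhaps_call_t(S, Partitions, FLU,
                        fun() -> machi_flu0:proj_write(
                                   FLU, Best#projection.epoch_number,
                                   public, Best) end)
         || {FLU, R} <- lists:zip(FLUs, Results), R /= Best],
    Best.
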
@@ -153,18 +160,40 @@ cl_write_public_proj_remote(FLUs, Partitions, Epoch, Proj, S) ->
             {{mixed_bag, lists:zip(FLUs, Rs)}, S}
     end.

-cl_read_public_proj(S) ->
+cl_read_public_proj(#ch_mgr{proj=Proj0}=S) ->
+    #projection{all_members=All_list} = Proj0,
+    {_UpNodes, Partitions, S2} = calc_up_nodes(S),
     %% todo
-    {ok, S#ch_mgr.proj, S}.
+    ?D({todo, All_list, Partitions}),
+    DoIt = fun(X) ->
+                   case machi_flu0:proj_read_latest(X, public) of
+                       {ok, P} -> P;
+                       Else    -> Else
+                   end
+           end,
+    Rs = [perhaps_call_t(S, Partitions, FLU, fun() -> DoIt(FLU) end) ||
+             FLU <- All_list],
+    case lists:usort(Rs) of
+        [P] when is_record(P, projection) ->
+            {ok, S#ch_mgr.proj, S2}
+    end.

 make_initial_projection(MyName, All_list, UPI_list, Repairing_list, Ps) ->
     make_projection(1, 0, <<>>,
                     MyName, All_list, [], UPI_list, Repairing_list, Ps).

 make_projection(EpochNum, PrevEpochNum, PrevEpochCSum,
-                MyName, All_list, Down_list, UPI_list, Repairing_list, Ps) ->
+                MyName, All_list, Down_list, UPI_list, Repairing_list,
+                Dbg) ->
+    make_projection(EpochNum, PrevEpochNum, PrevEpochCSum,
+                    MyName, All_list, Down_list, UPI_list, Repairing_list,
+                    Dbg, []).
+
+make_projection(EpochNum, PrevEpochNum, PrevEpochCSum,
+                MyName, All_list, Down_list, UPI_list, Repairing_list,
+                Dbg, Dbg2) ->
     P = #projection{epoch_number=EpochNum,
-                    epoch_csum= <<>>,
+                    epoch_csum= <<>>,            % always checksums as <<>>
                     prev_epoch_num=PrevEpochNum,
                     prev_epoch_csum=PrevEpochCSum,
                     creation_time=now(),
@@ -173,9 +202,15 @@ make_projection(EpochNum, PrevEpochNum, PrevEpochCSum,
                     down=Down_list,
                     upi=UPI_list,
                     repairing=Repairing_list,
-                    dbg=Ps},
+                    dbg=Dbg,
+                    dbg2=[]                      % always checksums as []
+                   },
     CSum = crypto:hash(sha, term_to_binary(P)),
-    P#projection{epoch_csum=CSum}.
+    P#projection{epoch_csum=CSum,
+                 dbg2=Dbg2}.
+
+update_projection_dbg2(P, Dbg2) when is_list(Dbg2) ->
+    P#projection{dbg2=Dbg2}.

 %% OldThreshold: Percent chance of using the old/previous network partition list
 %% NoPartitionThreshold: If the network partition changes, what are the odds
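
Aside, not part of the diff: the two "always checksums as" comments encode the invariant that epoch_csum and dbg2 are zeroed before hashing, so annotations added later via update_projection_dbg2/2 never invalidate a projection's stored checksum. A minimal verifier built on that invariant (checksum_ok/1 is a hypothetical helper, not in the commit):

%% Hypothetical: re-derive the checksum exactly as make_projection/10 does,
%% i.e. over the record with epoch_csum reset to <<>> and dbg2 reset to [].
checksum_ok(#projection{epoch_csum=CSum} = P) ->
    P0 = P#projection{epoch_csum= <<>>, dbg2=[]},
    CSum =:= crypto:hash(sha, term_to_binary(P0)).
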
@@ -274,9 +309,11 @@ make_projection_summary(#projection{epoch_number=EpochNum,
                                     all_members=_All_list,
                                     down=Down_list,
                                     upi=UPI_list,
-                                    repairing=Repairing_list}) ->
+                                    repairing=Repairing_list,
+                                    dbg=Dbg, dbg2=Dbg2}) ->
     [{epoch,EpochNum},
-     {upi,UPI_list},{repair,Repairing_list},{down,Down_list}].
+     {upi,UPI_list},{repair,Repairing_list},{down,Down_list},
+     {d,Dbg}, {d2,Dbg2}].

 roll_dice(N, RunEnv) ->
     Seed1 = proplists:get_value(seed, RunEnv),

@@ -27,6 +27,7 @@
 -export([start_link/1, stop/1,
          write/3, read/2, trim/2,
+         get_epoch/1,
          proj_write/4, proj_read/3, proj_get_latest_num/2, proj_read_latest/2]).
 -export([set_fake_repairing_status/2, get_fake_repairing_status/1]).
 -export([make_proj/1, make_proj/2]).
@@ -78,6 +79,9 @@ write(Pid, Epoch, Bin) ->
 trim(Pid, Epoch) ->
     g_call(Pid, {reg_op, Epoch, trim}, ?LONG_TIME).

+get_epoch(Pid) ->
+    g_call(Pid, {get_epoch}, ?LONG_TIME).
+
 proj_write(Pid, Epoch, StoreType, Proj)
   when StoreType == public; StoreType == private ->
     g_call(Pid, {proj_write, Epoch, StoreType, Proj}, ?LONG_TIME).
@@ -169,6 +173,10 @@ handle_call({{reg_op, _Epoch, trim}, LC1}, _From, #state{register=trimmed} = S)
     LC2 = lclock_update(LC1),
     {reply, {error_trimmed, LC2}, S};
+handle_call({{get_epoch}, LC1}, _From, S) ->
+    LC2 = lclock_update(LC1),
+    {Reply, NewS} = {S#state.proj_epoch, S},
+    {reply, {Reply, LC2}, NewS};
 handle_call({{proj_write, Epoch, StoreType, Proj}, LC1}, _From, S) ->
     LC2 = lclock_update(LC1),
     {Reply, NewS} = do_proj_write(Epoch, StoreType, Proj, S),
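
Aside, not part of the diff: get_epoch/1 goes through the same g_call plumbing as the other machi_flu0 API calls, so the {Reply, LC2} tuple built above is unwrapped on the client side and the caller sees the bare proj_epoch value, which is how ?D(machi_flu0:get_epoch(MyFLU)) in the chain manager uses it. A hypothetical caller-side check:

%% Hypothetical helper, not in the commit: does the FLU's current projection
%% epoch match the epoch of the projection we believe is live?
epoch_matches(FLU, #projection{epoch_number=Epoch}) ->
    machi_flu0:get_epoch(FLU) =:= Epoch.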