Merge branch 'slf/flu-cleanup1'
This commit is contained in:
commit
4a09bfa2d1
13 changed files with 224 additions and 102 deletions
|
@ -525,6 +525,11 @@ message Mpb_LL_ListAllProjectionsResp {
|
||||||
repeated uint32 epochs = 2;
|
repeated uint32 epochs = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Low level API: kick_projection_reaction request, NO RESPONSE!
|
||||||
|
|
||||||
|
message Mpb_LL_KickProjectionReactionReq {
|
||||||
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////
|
//////////////////////////////////////////
|
||||||
//
|
//
|
||||||
// Low API request & response wrapper
|
// Low API request & response wrapper
|
||||||
|
@ -552,6 +557,7 @@ message Mpb_LL_Request {
|
||||||
optional Mpb_LL_WriteProjectionReq proj_wp = 15;
|
optional Mpb_LL_WriteProjectionReq proj_wp = 15;
|
||||||
optional Mpb_LL_GetAllProjectionsReq proj_ga = 16;
|
optional Mpb_LL_GetAllProjectionsReq proj_ga = 16;
|
||||||
optional Mpb_LL_ListAllProjectionsReq proj_la = 17;
|
optional Mpb_LL_ListAllProjectionsReq proj_la = 17;
|
||||||
|
optional Mpb_LL_KickProjectionReactionReq proj_kp = 18;
|
||||||
|
|
||||||
optional Mpb_LL_AppendChunkReq append_chunk = 30;
|
optional Mpb_LL_AppendChunkReq append_chunk = 30;
|
||||||
optional Mpb_LL_WriteChunkReq write_chunk = 31;
|
optional Mpb_LL_WriteChunkReq write_chunk = 31;
|
||||||
|
@ -585,6 +591,7 @@ message Mpb_LL_Response {
|
||||||
optional Mpb_LL_WriteProjectionResp proj_wp = 15;
|
optional Mpb_LL_WriteProjectionResp proj_wp = 15;
|
||||||
optional Mpb_LL_GetAllProjectionsResp proj_ga = 16;
|
optional Mpb_LL_GetAllProjectionsResp proj_ga = 16;
|
||||||
optional Mpb_LL_ListAllProjectionsResp proj_la = 17;
|
optional Mpb_LL_ListAllProjectionsResp proj_la = 17;
|
||||||
|
// No reponse to Mpb_LL_KickProjectionReactionReq = 18;
|
||||||
|
|
||||||
optional Mpb_LL_AppendChunkResp append_chunk = 30;
|
optional Mpb_LL_AppendChunkResp append_chunk = 30;
|
||||||
optional Mpb_LL_WriteChunkResp write_chunk = 31;
|
optional Mpb_LL_WriteChunkResp write_chunk = 31;
|
||||||
|
|
|
@ -93,7 +93,8 @@
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, start_link/3, stop/1, ping/1,
|
-export([start_link/2, start_link/3, stop/1, ping/1,
|
||||||
set_chain_members/2, set_active/2]).
|
set_chain_members/2, set_active/2,
|
||||||
|
trigger_react_to_env/1]).
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
@ -105,7 +106,6 @@
|
||||||
-export([test_calc_projection/2,
|
-export([test_calc_projection/2,
|
||||||
test_write_public_projection/2,
|
test_write_public_projection/2,
|
||||||
test_read_latest_public_projection/2,
|
test_read_latest_public_projection/2,
|
||||||
test_react_to_env/1,
|
|
||||||
get_all_hosed/1]).
|
get_all_hosed/1]).
|
||||||
|
|
||||||
-ifdef(EQC).
|
-ifdef(EQC).
|
||||||
|
@ -146,6 +146,9 @@ set_chain_members(Pid, MembersDict) ->
|
||||||
set_active(Pid, Boolean) when Boolean == true; Boolean == false ->
|
set_active(Pid, Boolean) when Boolean == true; Boolean == false ->
|
||||||
gen_server:call(Pid, {set_active, Boolean}, infinity).
|
gen_server:call(Pid, {set_active, Boolean}, infinity).
|
||||||
|
|
||||||
|
trigger_react_to_env(Pid) ->
|
||||||
|
gen_server:call(Pid, {trigger_react_to_env}, infinity).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
%% Test/debugging code only.
|
%% Test/debugging code only.
|
||||||
|
@ -163,9 +166,6 @@ test_read_latest_public_projection(Pid, ReadRepairP) ->
|
||||||
gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP},
|
gen_server:call(Pid, {test_read_latest_public_projection, ReadRepairP},
|
||||||
infinity).
|
infinity).
|
||||||
|
|
||||||
test_react_to_env(Pid) ->
|
|
||||||
gen_server:call(Pid, {test_react_to_env}, infinity).
|
|
||||||
|
|
||||||
-endif. % TEST
|
-endif. % TEST
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
@ -284,7 +284,8 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) ->
|
||||||
do_cl_read_latest_public_projection(ReadRepairP, S),
|
do_cl_read_latest_public_projection(ReadRepairP, S),
|
||||||
Res = {Perhaps, Val, ExtraInfo},
|
Res = {Perhaps, Val, ExtraInfo},
|
||||||
{reply, Res, S2};
|
{reply, Res, S2};
|
||||||
handle_call({test_react_to_env}, _From, S) ->
|
handle_call({trigger_react_to_env}=Call, _From, S) ->
|
||||||
|
gobble_calls(Call),
|
||||||
{TODOtodo, S2} = do_react_to_env(S),
|
{TODOtodo, S2} = do_react_to_env(S),
|
||||||
{reply, TODOtodo, S2};
|
{reply, TODOtodo, S2};
|
||||||
handle_call(_Call, _From, S) ->
|
handle_call(_Call, _From, S) ->
|
||||||
|
@ -304,9 +305,7 @@ handle_info(tick_check_environment, S) ->
|
||||||
N when is_integer(N), N > 0 ->
|
N when is_integer(N), N > 0 ->
|
||||||
%% We are flapping. Set ignore_timer=true and schedule a
|
%% We are flapping. Set ignore_timer=true and schedule a
|
||||||
%% reminder to stop ignoring. This slows down the rate of
|
%% reminder to stop ignoring. This slows down the rate of
|
||||||
%% flapping. If/when the yo:tell_author_yo() function in
|
%% flapping.
|
||||||
%% state C200 is ever implemented, then it should be
|
|
||||||
%% implemented via the test_react_to_env style.
|
|
||||||
erlang:send_after(N*1000, self(), stop_ignoring_timer),
|
erlang:send_after(N*1000, self(), stop_ignoring_timer),
|
||||||
{noreply, S3#ch_mgr{ignore_timer=true}};
|
{noreply, S3#ch_mgr{ignore_timer=true}};
|
||||||
_ ->
|
_ ->
|
||||||
|
@ -1349,13 +1348,10 @@ react_to_env_C110(P_latest, #ch_mgr{name=MyName} = S) ->
|
||||||
P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo),
|
P_latest2 = machi_projection:update_dbg2(P_latest, Extra_todo),
|
||||||
|
|
||||||
MyNamePid = proxy_pid(MyName, S),
|
MyNamePid = proxy_pid(MyName, S),
|
||||||
|
Goo = P_latest2#projection_v1.epoch_number,
|
||||||
%% This is the local projection store. Use a larger timeout, so
|
%% This is the local projection store. Use a larger timeout, so
|
||||||
%% that things locally are pretty horrible if we're killed by a
|
%% that things locally are pretty horrible if we're killed by a
|
||||||
%% timeout exception.
|
%% timeout exception.
|
||||||
%% ok = ?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),
|
|
||||||
Goo = P_latest2#projection_v1.epoch_number,
|
|
||||||
%% io:format(user, "HEE110 ~w ~w ~w\n", [S#ch_mgr.name, self(), lists:reverse(get(react))]),
|
|
||||||
|
|
||||||
{ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo},
|
{ok,Goo} = {?FLU_PC:write_projection(MyNamePid, private, P_latest2, ?TO*30),Goo},
|
||||||
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
|
case proplists:get_value(private_write_verbose, S#ch_mgr.opts) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -1412,13 +1408,11 @@ react_to_env_C120(P_latest, FinalProps, #ch_mgr{proj_history=H} = S) ->
|
||||||
react_to_env_C200(Retries, P_latest, S) ->
|
react_to_env_C200(Retries, P_latest, S) ->
|
||||||
?REACT(c200),
|
?REACT(c200),
|
||||||
try
|
try
|
||||||
%% TODO: This code works "well enough" without actually
|
AuthorProxyPid = proxy_pid(P_latest#projection_v1.author_server, S),
|
||||||
%% telling anybody anything. Do we want to rip this out?
|
?FLU_PC:kick_projection_reaction(AuthorProxyPid, [])
|
||||||
%% Actually implement it? None of the above?
|
|
||||||
yo:tell_author_yo(P_latest#projection_v1.author_server)
|
|
||||||
catch _Type:_Err ->
|
catch _Type:_Err ->
|
||||||
%% io:format(user, "TODO: tell_author_yo is broken: ~p ~p\n",
|
io:format(user, "TODO: tell_author_yo error is probably ignorable: ~p ~p\n",
|
||||||
%% [_Type, _Err]),
|
[_Type, _Err]),
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
react_to_env_C210(Retries, S).
|
react_to_env_C210(Retries, S).
|
||||||
|
@ -2009,6 +2003,15 @@ make_chmgr_regname(A) when is_atom(A) ->
|
||||||
make_chmgr_regname(B) when is_binary(B) ->
|
make_chmgr_regname(B) when is_binary(B) ->
|
||||||
list_to_atom(binary_to_list(B) ++ "_chmgr").
|
list_to_atom(binary_to_list(B) ++ "_chmgr").
|
||||||
|
|
||||||
|
gobble_calls(StaticCall) ->
|
||||||
|
receive
|
||||||
|
{'$gen_call',From,{trigger_react_to_env}} ->
|
||||||
|
gen_server:reply(From, todo_overload),
|
||||||
|
gobble_calls(StaticCall)
|
||||||
|
after 1 -> % after 0 angers pulse.
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
perhaps_start_repair(
|
perhaps_start_repair(
|
||||||
|
|
|
@ -62,13 +62,6 @@
|
||||||
%% the FLU keep track of the epoch number of the last file write (and
|
%% the FLU keep track of the epoch number of the last file write (and
|
||||||
%% perhaps last metadata write), as an optimization for inter-FLU data
|
%% perhaps last metadata write), as an optimization for inter-FLU data
|
||||||
%% replication/chain repair.
|
%% replication/chain repair.
|
||||||
%%
|
|
||||||
%% TODO Section 4.2 ("The Sequencer") says that the sequencer must
|
|
||||||
%% change its file assignments to new & unique names whenever we move
|
|
||||||
%% to wedge state. This is not yet implemented. In the current
|
|
||||||
%% Erlang process scheme (which will probably be changing soon), a
|
|
||||||
%% simple implementation would stop all existing processes that are
|
|
||||||
%% running run_seq_append_server().
|
|
||||||
|
|
||||||
-module(machi_flu1).
|
-module(machi_flu1).
|
||||||
|
|
||||||
|
@ -216,9 +209,6 @@ start_append_server(S, AckPid) ->
|
||||||
FluPid = self(),
|
FluPid = self(),
|
||||||
proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end).
|
proc_lib:spawn_link(fun() -> run_append_server(FluPid, AckPid, S) end).
|
||||||
|
|
||||||
%% start_projection_server(S) ->
|
|
||||||
%% spawn_link(fun() -> run_projection_server(S) end).
|
|
||||||
|
|
||||||
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
||||||
register(make_listener_regname(FluName), self()),
|
register(make_listener_regname(FluName), self()),
|
||||||
SockOpts = ?PB_PACKET_OPTS ++
|
SockOpts = ?PB_PACKET_OPTS ++
|
||||||
|
@ -251,21 +241,28 @@ listen_server_loop(LSock, S) ->
|
||||||
listen_server_loop(LSock, S).
|
listen_server_loop(LSock, S).
|
||||||
|
|
||||||
append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p,
|
append_server_loop(FluPid, #state{data_dir=DataDir, wedged=Wedged_p,
|
||||||
epoch_id=OldEpochId}=S) ->
|
epoch_id=OldEpochId, flu_name=FluName}=S) ->
|
||||||
AppendServerPid = self(),
|
AppendServerPid = self(),
|
||||||
receive
|
receive
|
||||||
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p ->
|
{seq_append, From, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
|
||||||
|
when Wedged_p ->
|
||||||
From ! wedged,
|
From ! wedged,
|
||||||
append_server_loop(FluPid, S);
|
append_server_loop(FluPid, S);
|
||||||
{seq_append, From, Prefix, Chunk, CSum, Extra} ->
|
{seq_append, From, Prefix, Chunk, CSum, Extra, EpochID} ->
|
||||||
spawn(fun() -> append_server_dispatch(From, Prefix,
|
spawn(fun() -> append_server_dispatch(From, Prefix,
|
||||||
Chunk, CSum, Extra,
|
Chunk, CSum, Extra, EpochID,
|
||||||
DataDir, AppendServerPid) end),
|
DataDir, AppendServerPid) end),
|
||||||
append_server_loop(FluPid, S);
|
append_server_loop(FluPid, S);
|
||||||
{wedge_myself, WedgeEpochId} ->
|
{wedge_myself, WedgeEpochId} ->
|
||||||
if WedgeEpochId == OldEpochId ->
|
if not Wedged_p andalso WedgeEpochId == OldEpochId ->
|
||||||
true = ets:insert(S#state.etstab,
|
true = ets:insert(S#state.etstab,
|
||||||
{epoch, {true, OldEpochId}}),
|
{epoch, {true, OldEpochId}}),
|
||||||
|
%% Tell my chain manager that it might want to react to
|
||||||
|
%% this new world.
|
||||||
|
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
||||||
|
spawn(fun() ->
|
||||||
|
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
||||||
|
end),
|
||||||
append_server_loop(FluPid, S#state{wedged=true});
|
append_server_loop(FluPid, S#state{wedged=true});
|
||||||
true ->
|
true ->
|
||||||
append_server_loop(FluPid, S)
|
append_server_loop(FluPid, S)
|
||||||
|
@ -298,14 +295,18 @@ net_server_loop(Sock, S) ->
|
||||||
case machi_pb:decode_mpb_ll_request(Bin) of
|
case machi_pb:decode_mpb_ll_request(Bin) of
|
||||||
LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
||||||
{R, NewS} = do_pb_ll_request(LL_req, S),
|
{R, NewS} = do_pb_ll_request(LL_req, S),
|
||||||
{machi_pb:encode_mpb_ll_response(R), mode(low, NewS)};
|
{maybe_encode_response(R), mode(low, NewS)};
|
||||||
_ ->
|
_ ->
|
||||||
HL_req = machi_pb:decode_mpb_request(Bin),
|
HL_req = machi_pb:decode_mpb_request(Bin),
|
||||||
1 = HL_req#mpb_request.do_not_alter,
|
1 = HL_req#mpb_request.do_not_alter,
|
||||||
{R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
{R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
||||||
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
||||||
end,
|
end,
|
||||||
ok = gen_tcp:send(Sock, RespBin),
|
if RespBin == async_no_response ->
|
||||||
|
ok;
|
||||||
|
true ->
|
||||||
|
ok = gen_tcp:send(Sock, RespBin)
|
||||||
|
end,
|
||||||
net_server_loop(Sock, S2);
|
net_server_loop(Sock, S2);
|
||||||
{error, SockError} ->
|
{error, SockError} ->
|
||||||
Msg = io_lib:format("Socket error ~w", [SockError]),
|
Msg = io_lib:format("Socket error ~w", [SockError]),
|
||||||
|
@ -320,6 +321,11 @@ net_server_loop(Sock, S) ->
|
||||||
exit(normal)
|
exit(normal)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
maybe_encode_response(async_no_response=X) ->
|
||||||
|
X;
|
||||||
|
maybe_encode_response(R) ->
|
||||||
|
machi_pb:encode_mpb_ll_response(R).
|
||||||
|
|
||||||
mode(Mode, #state{pb_mode=undefined}=S) ->
|
mode(Mode, #state{pb_mode=undefined}=S) ->
|
||||||
S#state{pb_mode=Mode};
|
S#state{pb_mode=Mode};
|
||||||
mode(_, S) ->
|
mode(_, S) ->
|
||||||
|
@ -359,7 +365,7 @@ do_pb_ll_request(PB_request, S) ->
|
||||||
do_pb_ll_request2(EpochID, CMD, S) ->
|
do_pb_ll_request2(EpochID, CMD, S) ->
|
||||||
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
|
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
|
||||||
if Wedged_p == true ->
|
if Wedged_p == true ->
|
||||||
{{error, wedged}, S};
|
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
|
||||||
is_tuple(EpochID)
|
is_tuple(EpochID)
|
||||||
andalso
|
andalso
|
||||||
EpochID /= CurrentEpochID ->
|
EpochID /= CurrentEpochID ->
|
||||||
|
@ -370,13 +376,12 @@ do_pb_ll_request2(EpochID, CMD, S) ->
|
||||||
true ->
|
true ->
|
||||||
%% We're at same epoch # but different checksum, or
|
%% We're at same epoch # but different checksum, or
|
||||||
%% we're at a newer/bigger epoch #.
|
%% we're at a newer/bigger epoch #.
|
||||||
io:format(user, "\n\nTODO/monitor: wedging myself!\n\n",[]),
|
|
||||||
wedge_myself(S#state.flu_name, CurrentEpochID),
|
wedge_myself(S#state.flu_name, CurrentEpochID),
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
{{error, bad_epoch}, S};
|
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
|
||||||
true ->
|
true ->
|
||||||
do_pb_ll_request3(CMD, S)
|
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
||||||
|
@ -456,7 +461,16 @@ do_server_proj_request({get_all_projections, ProjType},
|
||||||
machi_projection_store:get_all_projections(ProjStore, ProjType);
|
machi_projection_store:get_all_projections(ProjStore, ProjType);
|
||||||
do_server_proj_request({list_all_projections, ProjType},
|
do_server_proj_request({list_all_projections, ProjType},
|
||||||
#state{proj_store=ProjStore}) ->
|
#state{proj_store=ProjStore}) ->
|
||||||
machi_projection_store:list_all_projections(ProjStore, ProjType).
|
machi_projection_store:list_all_projections(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({kick_projection_reaction},
|
||||||
|
#state{flu_name=FluName}) ->
|
||||||
|
%% Tell my chain manager that it might want to react to
|
||||||
|
%% this new world.
|
||||||
|
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
||||||
|
spawn(fun() ->
|
||||||
|
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
||||||
|
end),
|
||||||
|
async_no_response.
|
||||||
|
|
||||||
do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
|
do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
|
||||||
ChunkExtra, S) ->
|
ChunkExtra, S) ->
|
||||||
|
@ -469,11 +483,13 @@ do_server_append_chunk(PKey, Prefix, Chunk, CSum_tag, CSum,
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
|
do_server_append_chunk2(_PKey, Prefix, Chunk, CSum_tag, Client_CSum,
|
||||||
ChunkExtra, #state{flu_name=FluName}=_S) ->
|
ChunkExtra, #state{flu_name=FluName,
|
||||||
|
epoch_id=EpochID}=_S) ->
|
||||||
%% TODO: Do anything with PKey?
|
%% TODO: Do anything with PKey?
|
||||||
try
|
try
|
||||||
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
||||||
FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum, ChunkExtra},
|
FluName ! {seq_append, self(), Prefix, Chunk, TaggedCSum,
|
||||||
|
ChunkExtra, EpochID},
|
||||||
receive
|
receive
|
||||||
{assignment, Offset, File} ->
|
{assignment, Offset, File} ->
|
||||||
Size = iolist_size(Chunk),
|
Size = iolist_size(Chunk),
|
||||||
|
@ -664,9 +680,10 @@ do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
|
||||||
{error, bad_arg}
|
{error, bad_arg}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) ->
|
append_server_dispatch(From, Prefix, Chunk, CSum, Extra, EpochID,
|
||||||
Pid = write_server_get_pid(Prefix, DataDir, LinkPid),
|
DataDir, LinkPid) ->
|
||||||
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra},
|
Pid = write_server_get_pid(Prefix, EpochID, DataDir, LinkPid),
|
||||||
|
Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra, EpochID},
|
||||||
exit(normal).
|
exit(normal).
|
||||||
|
|
||||||
sanitize_file_string(Str) ->
|
sanitize_file_string(Str) ->
|
||||||
|
@ -700,12 +717,12 @@ sync_checksum_file(File) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
write_server_get_pid(Prefix, DataDir, LinkPid) ->
|
write_server_get_pid(Prefix, EpochID, DataDir, LinkPid) ->
|
||||||
case write_server_find_pid(Prefix) of
|
case write_server_find_pid(Prefix) of
|
||||||
undefined ->
|
undefined ->
|
||||||
start_seq_append_server(Prefix, DataDir, LinkPid),
|
start_seq_append_server(Prefix, EpochID, DataDir, LinkPid),
|
||||||
timer:sleep(1),
|
timer:sleep(1),
|
||||||
write_server_get_pid(Prefix, DataDir, LinkPid);
|
write_server_get_pid(Prefix, EpochID, DataDir, LinkPid);
|
||||||
Pid ->
|
Pid ->
|
||||||
Pid
|
Pid
|
||||||
end.
|
end.
|
||||||
|
@ -714,7 +731,7 @@ write_server_find_pid(Prefix) ->
|
||||||
FluName = machi_util:make_regname(Prefix),
|
FluName = machi_util:make_regname(Prefix),
|
||||||
whereis(FluName).
|
whereis(FluName).
|
||||||
|
|
||||||
start_seq_append_server(Prefix, DataDir, AppendServerPid) ->
|
start_seq_append_server(Prefix, EpochID, DataDir, AppendServerPid) ->
|
||||||
proc_lib:spawn_link(fun() ->
|
proc_lib:spawn_link(fun() ->
|
||||||
%% The following is only necessary to
|
%% The following is only necessary to
|
||||||
%% make nice process relationships in
|
%% make nice process relationships in
|
||||||
|
@ -722,21 +739,20 @@ start_seq_append_server(Prefix, DataDir, AppendServerPid) ->
|
||||||
put('$ancestors', [AppendServerPid]),
|
put('$ancestors', [AppendServerPid]),
|
||||||
put('$initial_call', {x,y,3}),
|
put('$initial_call', {x,y,3}),
|
||||||
link(AppendServerPid),
|
link(AppendServerPid),
|
||||||
run_seq_append_server(Prefix, DataDir)
|
run_seq_append_server(Prefix, EpochID, DataDir)
|
||||||
end).
|
end).
|
||||||
|
|
||||||
run_seq_append_server(Prefix, DataDir) ->
|
run_seq_append_server(Prefix, EpochID, DataDir) ->
|
||||||
true = register(machi_util:make_regname(Prefix), self()),
|
true = register(machi_util:make_regname(Prefix), self()),
|
||||||
run_seq_append_server2(Prefix, DataDir).
|
run_seq_append_server2(Prefix, EpochID, DataDir).
|
||||||
|
|
||||||
run_seq_append_server2(Prefix, DataDir) ->
|
run_seq_append_server2(Prefix, EpochID, DataDir) ->
|
||||||
FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
|
FileNum = machi_util:read_max_filenum(DataDir, Prefix) + 1,
|
||||||
case machi_util:increment_max_filenum(DataDir, Prefix) of
|
case machi_util:increment_max_filenum(DataDir, Prefix) of
|
||||||
ok ->
|
ok ->
|
||||||
machi_util:increment_max_filenum(DataDir, Prefix),
|
|
||||||
machi_util:info_msg("start: ~p server at file ~w\n",
|
machi_util:info_msg("start: ~p server at file ~w\n",
|
||||||
[Prefix, FileNum]),
|
[Prefix, FileNum]),
|
||||||
seq_append_server_loop(DataDir, Prefix, FileNum);
|
seq_append_server_loop(DataDir, Prefix, EpochID, FileNum);
|
||||||
Else ->
|
Else ->
|
||||||
error_logger:error_msg("start: ~p server at file ~w: ~p\n",
|
error_logger:error_msg("start: ~p server at file ~w: ~p\n",
|
||||||
[Prefix, FileNum, Else]),
|
[Prefix, FileNum, Else]),
|
||||||
|
@ -750,7 +766,7 @@ seq_name_hack() ->
|
||||||
[element(3,now()),
|
[element(3,now()),
|
||||||
list_to_integer(os:getpid())])).
|
list_to_integer(os:getpid())])).
|
||||||
|
|
||||||
seq_append_server_loop(DataDir, Prefix, FileNum) ->
|
seq_append_server_loop(DataDir, Prefix, EpochID, FileNum) ->
|
||||||
SequencerNameHack = seq_name_hack(),
|
SequencerNameHack = seq_name_hack(),
|
||||||
{File, FullPath} = machi_util:make_data_filename(
|
{File, FullPath} = machi_util:make_data_filename(
|
||||||
DataDir, Prefix, SequencerNameHack, FileNum),
|
DataDir, Prefix, SequencerNameHack, FileNum),
|
||||||
|
@ -759,19 +775,22 @@ seq_append_server_loop(DataDir, Prefix, FileNum) ->
|
||||||
CSumPath = machi_util:make_checksum_filename(
|
CSumPath = machi_util:make_checksum_filename(
|
||||||
DataDir, Prefix, SequencerNameHack, FileNum),
|
DataDir, Prefix, SequencerNameHack, FileNum),
|
||||||
{ok, FHc} = file:open(CSumPath, [append, raw, binary]),
|
{ok, FHc} = file:open(CSumPath, [append, raw, binary]),
|
||||||
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, FileNum,
|
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}, EpochID, FileNum,
|
||||||
?MINIMUM_OFFSET).
|
?MINIMUM_OFFSET).
|
||||||
|
|
||||||
seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset)
|
seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, EpochID,
|
||||||
|
FileNum, Offset)
|
||||||
when Offset > ?MAX_FILE_SIZE ->
|
when Offset > ?MAX_FILE_SIZE ->
|
||||||
ok = file:close(FHd),
|
ok = file:close(FHd),
|
||||||
ok = file:close(FHc),
|
ok = file:close(FHc),
|
||||||
machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
|
machi_util:info_msg("rollover: ~p server at file ~w offset ~w\n",
|
||||||
[Prefix, FileNum, Offset]),
|
[Prefix, FileNum, Offset]),
|
||||||
run_seq_append_server2(Prefix, DataDir);
|
run_seq_append_server2(Prefix, EpochID, DataDir);
|
||||||
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
|
seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, EpochID,
|
||||||
|
FileNum, Offset) ->
|
||||||
receive
|
receive
|
||||||
{seq_append, From, Prefix, Chunk, TaggedCSum, Extra} ->
|
{seq_append, From, Prefix, Chunk, TaggedCSum, Extra, R_EpochID}
|
||||||
|
when R_EpochID == EpochID ->
|
||||||
if Chunk /= <<>> ->
|
if Chunk /= <<>> ->
|
||||||
ok = file:pwrite(FHd, Offset, Chunk);
|
ok = file:pwrite(FHd, Offset, Chunk);
|
||||||
true ->
|
true ->
|
||||||
|
@ -781,12 +800,20 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) ->
|
||||||
Size = iolist_size(Chunk),
|
Size = iolist_size(Chunk),
|
||||||
CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
|
CSum_info = encode_csum_file_entry(Offset, Size, TaggedCSum),
|
||||||
ok = file:write(FHc, CSum_info),
|
ok = file:write(FHc, CSum_info),
|
||||||
seq_append_server_loop(DataDir, Prefix, File, FH_,
|
seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID,
|
||||||
FileNum, Offset + Size + Extra);
|
FileNum, Offset + Size + Extra);
|
||||||
|
{seq_append, _From, _Prefix, _Chunk, _TCSum, _Extra, R_EpochID}=MSG ->
|
||||||
|
%% Rare'ish event: send MSG to myself so it doesn't get lost
|
||||||
|
%% while we recurse around to pick up a new FileNum.
|
||||||
|
self() ! MSG,
|
||||||
|
machi_util:info_msg("rollover: ~p server at file ~w offset ~w "
|
||||||
|
"by new epoch_id ~W\n",
|
||||||
|
[Prefix, FileNum, Offset, R_EpochID, 8]),
|
||||||
|
run_seq_append_server2(Prefix, R_EpochID, DataDir);
|
||||||
{sync_stuff, FromPid, Ref} ->
|
{sync_stuff, FromPid, Ref} ->
|
||||||
file:sync(FHc),
|
file:sync(FHc),
|
||||||
FromPid ! {sync_finished, Ref},
|
FromPid ! {sync_finished, Ref},
|
||||||
seq_append_server_loop(DataDir, Prefix, File, FH_,
|
seq_append_server_loop(DataDir, Prefix, File, FH_, EpochID,
|
||||||
FileNum, Offset)
|
FileNum, Offset)
|
||||||
after 30*1000 ->
|
after 30*1000 ->
|
||||||
ok = file:close(FHd),
|
ok = file:close(FHd),
|
||||||
|
@ -843,7 +870,7 @@ http_hack_server_put(Sock, G, FluName, MyURI) ->
|
||||||
throw({bad_csum, XX})
|
throw({bad_csum, XX})
|
||||||
end
|
end
|
||||||
end,
|
end,
|
||||||
FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0}
|
FluName ! {seq_append, self(), MyURI, Chunk, CSum, 0, todo_epoch_id_bitrot}
|
||||||
catch
|
catch
|
||||||
throw:{bad_csum, _CS} ->
|
throw:{bad_csum, _CS} ->
|
||||||
Out = "HTTP/1.0 412 Precondition failed\r\n"
|
Out = "HTTP/1.0 412 Precondition failed\r\n"
|
||||||
|
|
|
@ -63,6 +63,7 @@
|
||||||
write_projection/3, write_projection/4,
|
write_projection/3, write_projection/4,
|
||||||
get_all_projections/2, get_all_projections/3,
|
get_all_projections/2, get_all_projections/3,
|
||||||
list_all_projections/2, list_all_projections/3,
|
list_all_projections/2, list_all_projections/3,
|
||||||
|
kick_projection_reaction/2, kick_projection_reaction/3,
|
||||||
|
|
||||||
%% Common API
|
%% Common API
|
||||||
echo/2, echo/3,
|
echo/2, echo/3,
|
||||||
|
@ -374,6 +375,27 @@ list_all_projections(Host, TcpPort, ProjType)
|
||||||
disconnect(Sock)
|
disconnect(Sock)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
%% @doc Kick (politely) the remote chain manager to react to a
|
||||||
|
%% projection change.
|
||||||
|
|
||||||
|
-spec kick_projection_reaction(port_wrap(), list()) ->
|
||||||
|
ok.
|
||||||
|
kick_projection_reaction(Sock, Options) ->
|
||||||
|
kick_projection_reaction2(Sock, Options).
|
||||||
|
|
||||||
|
%% @doc Kick (politely) the remote chain manager to react to a
|
||||||
|
%% projection change.
|
||||||
|
|
||||||
|
-spec kick_projection_reaction(machi_dt:inet_host(), machi_dt:inet_port(), list()) ->
|
||||||
|
ok.
|
||||||
|
kick_projection_reaction(Host, TcpPort, Options) ->
|
||||||
|
Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}),
|
||||||
|
try
|
||||||
|
kick_projection_reaction2(Sock, Options)
|
||||||
|
after
|
||||||
|
disconnect(Sock)
|
||||||
|
end.
|
||||||
|
|
||||||
%% @doc Echo -- test protocol round-trip.
|
%% @doc Echo -- test protocol round-trip.
|
||||||
|
|
||||||
-spec echo(port_wrap(), string()) ->
|
-spec echo(port_wrap(), string()) ->
|
||||||
|
@ -604,11 +626,21 @@ list_all_projections2(Sock, ProjType) ->
|
||||||
ReqID, {low_proj, {list_all_projections, ProjType}}),
|
ReqID, {low_proj, {list_all_projections, ProjType}}),
|
||||||
do_pb_request_common(Sock, ReqID, Req).
|
do_pb_request_common(Sock, ReqID, Req).
|
||||||
|
|
||||||
|
kick_projection_reaction2(Sock, _Options) ->
|
||||||
|
ReqID = <<42>>,
|
||||||
|
Req = machi_pb_translate:to_pb_request(
|
||||||
|
ReqID, {low_proj, {kick_projection_reaction}}),
|
||||||
|
do_pb_request_common(Sock, ReqID, Req, false).
|
||||||
|
|
||||||
do_pb_request_common(Sock, ReqID, Req) ->
|
do_pb_request_common(Sock, ReqID, Req) ->
|
||||||
|
do_pb_request_common(Sock, ReqID, Req, true).
|
||||||
|
|
||||||
|
do_pb_request_common(Sock, ReqID, Req, GetReply_p) ->
|
||||||
erase(bad_sock),
|
erase(bad_sock),
|
||||||
try
|
try
|
||||||
ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)),
|
ReqBin = list_to_binary(machi_pb:encode_mpb_ll_request(Req)),
|
||||||
ok = w_send(Sock, ReqBin),
|
ok = w_send(Sock, ReqBin),
|
||||||
|
if GetReply_p ->
|
||||||
case w_recv(Sock, 0) of
|
case w_recv(Sock, 0) of
|
||||||
{ok, RespBin} ->
|
{ok, RespBin} ->
|
||||||
Resp = machi_pb:decode_mpb_ll_response(RespBin),
|
Resp = machi_pb:decode_mpb_ll_response(RespBin),
|
||||||
|
@ -617,6 +649,9 @@ do_pb_request_common(Sock, ReqID, Req) ->
|
||||||
Reply;
|
Reply;
|
||||||
{error, _}=Err ->
|
{error, _}=Err ->
|
||||||
throw(Err)
|
throw(Err)
|
||||||
|
end;
|
||||||
|
not GetReply_p ->
|
||||||
|
ok
|
||||||
end
|
end
|
||||||
catch
|
catch
|
||||||
throw:Error ->
|
throw:Error ->
|
||||||
|
|
|
@ -135,6 +135,10 @@ from_pb_request(#mpb_ll_request{
|
||||||
req_id=ReqID,
|
req_id=ReqID,
|
||||||
proj_la=#mpb_ll_listallprojectionsreq{type=ProjType}}) ->
|
proj_la=#mpb_ll_listallprojectionsreq{type=ProjType}}) ->
|
||||||
{ReqID, {low_proj, {list_all_projections, conv_to_type(ProjType)}}};
|
{ReqID, {low_proj, {list_all_projections, conv_to_type(ProjType)}}};
|
||||||
|
from_pb_request(#mpb_ll_request{
|
||||||
|
req_id=ReqID,
|
||||||
|
proj_kp=#mpb_ll_kickprojectionreactionreq{}}) ->
|
||||||
|
{ReqID, {low_proj, {kick_projection_reaction}}};
|
||||||
%%qqq
|
%%qqq
|
||||||
from_pb_request(#mpb_request{req_id=ReqID,
|
from_pb_request(#mpb_request{req_id=ReqID,
|
||||||
echo=#mpb_echoreq{message=Msg}}) ->
|
echo=#mpb_echoreq{message=Msg}}) ->
|
||||||
|
@ -313,6 +317,7 @@ from_pb_response(#mpb_ll_response{
|
||||||
_ ->
|
_ ->
|
||||||
{ReqID< machi_pb_high_client:convert_general_status_code(Status)}
|
{ReqID< machi_pb_high_client:convert_general_status_code(Status)}
|
||||||
end.
|
end.
|
||||||
|
%% No response for proj_kp/kick_projection_reaction
|
||||||
|
|
||||||
%% TODO: move the #mbp_* record making code from
|
%% TODO: move the #mbp_* record making code from
|
||||||
%% machi_pb_high_client:do_send_sync() clauses into to_pb_request().
|
%% machi_pb_high_client:do_send_sync() clauses into to_pb_request().
|
||||||
|
@ -403,9 +408,14 @@ to_pb_request(ReqID, {low_proj, {get_all_projections, ProjType}}) ->
|
||||||
proj_ga=#mpb_ll_getallprojectionsreq{type=conv_from_type(ProjType)}};
|
proj_ga=#mpb_ll_getallprojectionsreq{type=conv_from_type(ProjType)}};
|
||||||
to_pb_request(ReqID, {low_proj, {list_all_projections, ProjType}}) ->
|
to_pb_request(ReqID, {low_proj, {list_all_projections, ProjType}}) ->
|
||||||
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
||||||
proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}}.
|
proj_la=#mpb_ll_listallprojectionsreq{type=conv_from_type(ProjType)}};
|
||||||
|
to_pb_request(ReqID, {low_proj, {kick_projection_reaction}}) ->
|
||||||
|
#mpb_ll_request{req_id=ReqID, do_not_alter=2,
|
||||||
|
proj_kp=#mpb_ll_kickprojectionreactionreq{}}.
|
||||||
%%qqq
|
%%qqq
|
||||||
|
|
||||||
|
to_pb_response(_ReqID, _, async_no_response=X) ->
|
||||||
|
X;
|
||||||
to_pb_response(ReqID, _, {low_error, ErrCode, ErrMsg}) ->
|
to_pb_response(ReqID, _, {low_error, ErrCode, ErrMsg}) ->
|
||||||
make_ll_error_resp(ReqID, ErrCode, ErrMsg);
|
make_ll_error_resp(ReqID, ErrCode, ErrMsg);
|
||||||
to_pb_response(ReqID, {low_echo, _BogusEpochID, _Msg}, Resp) ->
|
to_pb_response(ReqID, {low_echo, _BogusEpochID, _Msg}, Resp) ->
|
||||||
|
@ -567,6 +577,7 @@ to_pb_response(ReqID, {low_proj, {list_all_projections, _ProjType}}, Resp)->
|
||||||
proj_la=#mpb_ll_listallprojectionsresp{
|
proj_la=#mpb_ll_listallprojectionsresp{
|
||||||
status=Status}}
|
status=Status}}
|
||||||
end;
|
end;
|
||||||
|
%% No response for {kick_projection_reaction}!
|
||||||
%%qqq
|
%%qqq
|
||||||
to_pb_response(ReqID, _, {high_error, ErrCode, ErrMsg}) ->
|
to_pb_response(ReqID, _, {high_error, ErrCode, ErrMsg}) ->
|
||||||
make_error_resp(ReqID, ErrCode, ErrMsg);
|
make_error_resp(ReqID, ErrCode, ErrMsg);
|
||||||
|
|
|
@ -68,6 +68,7 @@
|
||||||
write_projection/3, write_projection/4,
|
write_projection/3, write_projection/4,
|
||||||
get_all_projections/2, get_all_projections/3,
|
get_all_projections/2, get_all_projections/3,
|
||||||
list_all_projections/2, list_all_projections/3,
|
list_all_projections/2, list_all_projections/3,
|
||||||
|
kick_projection_reaction/2, kick_projection_reaction/3,
|
||||||
|
|
||||||
%% Common API
|
%% Common API
|
||||||
quit/1,
|
quit/1,
|
||||||
|
@ -243,6 +244,19 @@ list_all_projections(PidSpec, ProjType, Timeout) ->
|
||||||
gen_server:call(PidSpec, {req, {list_all_projections, ProjType}},
|
gen_server:call(PidSpec, {req, {list_all_projections, ProjType}},
|
||||||
Timeout).
|
Timeout).
|
||||||
|
|
||||||
|
%% @doc Kick (politely) the remote chain manager to react to a
|
||||||
|
%% projection change.
|
||||||
|
|
||||||
|
kick_projection_reaction(PidSpec, Options) ->
|
||||||
|
kick_projection_reaction(PidSpec, Options, infinity).
|
||||||
|
|
||||||
|
%% @doc Kick (politely) the remote chain manager to react to a
|
||||||
|
%% projection change.
|
||||||
|
|
||||||
|
kick_projection_reaction(PidSpec, Options, Timeout) ->
|
||||||
|
gen_server:call(PidSpec, {req, {kick_projection_reaction, Options}},
|
||||||
|
Timeout).
|
||||||
|
|
||||||
%% @doc Quit & close the connection to remote FLU and stop our
|
%% @doc Quit & close the connection to remote FLU and stop our
|
||||||
%% proxy process.
|
%% proxy process.
|
||||||
|
|
||||||
|
@ -373,7 +387,10 @@ make_req_fun({get_all_projections, ProjType},
|
||||||
fun() -> Mod:get_all_projections(Sock, ProjType) end;
|
fun() -> Mod:get_all_projections(Sock, ProjType) end;
|
||||||
make_req_fun({list_all_projections, ProjType},
|
make_req_fun({list_all_projections, ProjType},
|
||||||
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
|
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
|
||||||
fun() -> Mod:list_all_projections(Sock, ProjType) end.
|
fun() -> Mod:list_all_projections(Sock, ProjType) end;
|
||||||
|
make_req_fun({kick_projection_reaction, Options},
|
||||||
|
#state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) ->
|
||||||
|
fun() -> Mod:kick_projection_reaction(Sock, Options) end.
|
||||||
|
|
||||||
connected_p(#state{sock=SockMaybe,
|
connected_p(#state{sock=SockMaybe,
|
||||||
i=#p_srvr{proto_mod=Mod}=_I}=_S) ->
|
i=#p_srvr{proto_mod=Mod}=_I}=_S) ->
|
||||||
|
|
|
@ -132,7 +132,7 @@ increment_max_filenum(DataDir, Prefix) ->
|
||||||
try
|
try
|
||||||
{ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]),
|
{ok, FH} = file:open(make_config_filename(DataDir, Prefix), [append]),
|
||||||
ok = file:write(FH, "x"),
|
ok = file:write(FH, "x"),
|
||||||
%% ok = file:sync(FH),
|
ok = file:sync(FH),
|
||||||
ok = file:close(FH)
|
ok = file:close(FH)
|
||||||
catch
|
catch
|
||||||
error:{badmatch,_}=Error ->
|
error:{badmatch,_}=Error ->
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
%%
|
%%
|
||||||
%% Machi: a small village of replicated files
|
%% Machi: a small village of replicated files
|
||||||
%%
|
%%
|
||||||
%% Copyright (c) 2014 Basho Technologies, Inc. All Rights Reserved.
|
%% Copyright (c) 2014-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
%%
|
%%
|
||||||
%% This file is provided to you under the Apache License,
|
%% This file is provided to you under the Apache License,
|
||||||
%% Version 2.0 (the "License"); you may not use this file
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
@ -207,7 +207,7 @@ convergence_demo_testfun(NumFLUs, MgrOpts0) ->
|
||||||
?MGR:sleep_ranked_order(
|
?MGR:sleep_ranked_order(
|
||||||
S_min, S_max_rand,
|
S_min, S_max_rand,
|
||||||
M_name, All_list),
|
M_name, All_list),
|
||||||
_ = ?MGR:test_react_to_env(MMM),
|
_ = ?MGR:trigger_react_to_env(MMM),
|
||||||
%% Be more unfair by not
|
%% Be more unfair by not
|
||||||
%% sleeping here.
|
%% sleeping here.
|
||||||
% timer:sleep(S_max - Elapsed),
|
% timer:sleep(S_max - Elapsed),
|
||||||
|
|
|
@ -82,7 +82,7 @@ command(S) ->
|
||||||
{ 1, {call, ?MODULE, change_partitions,
|
{ 1, {call, ?MODULE, change_partitions,
|
||||||
[gen_old_threshold(), gen_no_partition_threshold()]}},
|
[gen_old_threshold(), gen_no_partition_threshold()]}},
|
||||||
{50, {call, ?MODULE, do_ticks,
|
{50, {call, ?MODULE, do_ticks,
|
||||||
[choose(5, 200), S#state.pids,
|
[choose(5, 100), S#state.pids,
|
||||||
gen_old_threshold(), gen_no_partition_threshold()]}}
|
gen_old_threshold(), gen_no_partition_threshold()]}}
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
@ -149,11 +149,11 @@ setup(Num, Seed) ->
|
||||||
%% do all the same server first, then round-robin evenly across
|
%% do all the same server first, then round-robin evenly across
|
||||||
%% servers.
|
%% servers.
|
||||||
[begin
|
[begin
|
||||||
_QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P))
|
_QQa = machi_chain_manager1:trigger_react_to_env(get_chmgr(P))
|
||||||
end || {P, _Dir} <- All_listE, _I <- lists:seq(1,20), _Repeat <- [1,2]],
|
end || {P, _Dir} <- All_listE, _I <- lists:seq(1,20), _Repeat <- [1,2]],
|
||||||
?QC_FMT(",z~w", [?LINE]),
|
?QC_FMT(",z~w", [?LINE]),
|
||||||
[begin
|
[begin
|
||||||
_QQa = machi_chain_manager1:test_react_to_env(get_chmgr(P))
|
_QQa = machi_chain_manager1:trigger_react_to_env(get_chmgr(P))
|
||||||
end || _I <- lists:seq(1,20), {P, _Dir} <- All_listE, _Repeat <- [1,2]],
|
end || _I <- lists:seq(1,20), {P, _Dir} <- All_listE, _Repeat <- [1,2]],
|
||||||
?QC_FMT(",z~w", [?LINE]),
|
?QC_FMT(",z~w", [?LINE]),
|
||||||
|
|
||||||
|
@ -280,7 +280,7 @@ dump_state() ->
|
||||||
|
|
||||||
prop_pulse() ->
|
prop_pulse() ->
|
||||||
?FORALL({Cmds0, Seed}, {non_empty(commands(?MODULE)), pulse:seed()},
|
?FORALL({Cmds0, Seed}, {non_empty(commands(?MODULE)), pulse:seed()},
|
||||||
?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 5,
|
?IMPLIES(1 < length(Cmds0) andalso length(Cmds0) < 10,
|
||||||
begin
|
begin
|
||||||
ok = shutdown_hard(),
|
ok = shutdown_hard(),
|
||||||
%% PULSE can be really unfair, of course, including having exec_ticks
|
%% PULSE can be really unfair, of course, including having exec_ticks
|
||||||
|
@ -402,7 +402,7 @@ exec_ticks(Num, All_listE) ->
|
||||||
Max = 10,
|
Max = 10,
|
||||||
Elapsed =
|
Elapsed =
|
||||||
?MGR:sleep_ranked_order(1, Max, M_name, all_list()),
|
?MGR:sleep_ranked_order(1, Max, M_name, all_list()),
|
||||||
Res = ?MGR:test_react_to_env(get_chmgr(P)),
|
Res = ?MGR:trigger_react_to_env(get_chmgr(P)),
|
||||||
timer:sleep(erlang:max(0, Max - Elapsed)),
|
timer:sleep(erlang:max(0, Max - Elapsed)),
|
||||||
Res=Res %% ?D({self(), Res})
|
Res=Res %% ?D({self(), Res})
|
||||||
end || _ <- lists:seq(1,Num)],
|
end || _ <- lists:seq(1,Num)],
|
||||||
|
@ -421,10 +421,15 @@ private_projections_are_stable_check(ProxiesDict, All_listE) ->
|
||||||
%% also check for flapping, and if yes, to see if all_hosed are
|
%% also check for flapping, and if yes, to see if all_hosed are
|
||||||
%% all exactly equal.
|
%% all exactly equal.
|
||||||
|
|
||||||
_ = exec_ticks(40, All_listE),
|
%% gobble_calls() workaround: many small exec_ticks() calls +
|
||||||
|
%% sleep after each.
|
||||||
|
[begin
|
||||||
|
_ = exec_ticks(10, All_listE),
|
||||||
|
timer:sleep(10)
|
||||||
|
end|| _ <- lists:seq(1, 40)],
|
||||||
Private1 = [?FLU_PC:get_latest_epochid(Proxy, private) ||
|
Private1 = [?FLU_PC:get_latest_epochid(Proxy, private) ||
|
||||||
{_FLU, Proxy} <- orddict:to_list(ProxiesDict)],
|
{_FLU, Proxy} <- orddict:to_list(ProxiesDict)],
|
||||||
_ = exec_ticks(5, All_listE),
|
_ = exec_ticks(3*20, All_listE),
|
||||||
Private2 = [?FLU_PC:get_latest_epochid(Proxy, private) ||
|
Private2 = [?FLU_PC:get_latest_epochid(Proxy, private) ||
|
||||||
{_FLU, Proxy} <- orddict:to_list(ProxiesDict)],
|
{_FLU, Proxy} <- orddict:to_list(ProxiesDict)],
|
||||||
|
|
||||||
|
|
|
@ -232,22 +232,23 @@ nonunanimous_setup_and_fix_test() ->
|
||||||
%% we expect nothing to change when called again.
|
%% we expect nothing to change when called again.
|
||||||
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(Ma, true),
|
{not_unanimous,_,_}=_YY = ?MGR:test_read_latest_public_projection(Ma, true),
|
||||||
|
|
||||||
{now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Ma),
|
{now_using, _, EpochNum_a} = ?MGR:trigger_react_to_env(Ma),
|
||||||
{no_change, _, EpochNum_a} = ?MGR:test_react_to_env(Ma),
|
{no_change, _, EpochNum_a} = ?MGR:trigger_react_to_env(Ma),
|
||||||
{unanimous,P2,_E2} = ?MGR:test_read_latest_public_projection(Ma, false),
|
{unanimous,P2,_E2} = ?MGR:test_read_latest_public_projection(Ma, false),
|
||||||
{ok, P2pa} = ?FLU_PC:read_latest_projection(Proxy_a, private),
|
{ok, P2pa} = ?FLU_PC:read_latest_projection(Proxy_a, private),
|
||||||
P2 = P2pa#projection_v1{dbg2=[]},
|
P2 = P2pa#projection_v1{dbg2=[]},
|
||||||
|
|
||||||
%% FLUb should have nothing written to private because it hasn't
|
%% %% FLUb should have nothing written to private because it hasn't
|
||||||
%% reacted yet.
|
%% %% reacted yet.
|
||||||
{error, not_written} = ?FLU_PC:read_latest_projection(Proxy_b, private),
|
%% {error, not_written} = ?FLU_PC:read_latest_projection(Proxy_b, private),
|
||||||
|
|
||||||
%% Poke FLUb to react ... should be using the same private proj
|
%% %% Poke FLUb to react ... should be using the same private proj
|
||||||
%% as FLUa.
|
%% %% as FLUa.
|
||||||
{now_using, _, EpochNum_a} = ?MGR:test_react_to_env(Mb),
|
%% {now_using, _, EpochNum_a} = ?MGR:trigger_react_to_env(Mb),
|
||||||
{ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private),
|
{ok, P2pb} = ?FLU_PC:read_latest_projection(Proxy_b, private),
|
||||||
P2 = P2pb#projection_v1{dbg2=[]},
|
P2 = P2pb#projection_v1{dbg2=[]},
|
||||||
|
|
||||||
|
timer:sleep(3000),
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
ok = ?MGR:stop(Ma),
|
ok = ?MGR:stop(Ma),
|
||||||
|
|
|
@ -51,9 +51,9 @@ smoke_test2() ->
|
||||||
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
|
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
|
||||||
end || {_,P} <- [hd(Ps)]],
|
end || {_,P} <- [hd(Ps)]],
|
||||||
|
|
||||||
[machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
|
[machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
|
||||||
machi_chain_manager1:set_chain_members(a_chmgr, orddict:from_list(Ps)),
|
machi_chain_manager1:set_chain_members(a_chmgr, orddict:from_list(Ps)),
|
||||||
[machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
|
[machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
exit(SupPid, normal),
|
exit(SupPid, normal),
|
||||||
|
@ -99,17 +99,18 @@ partial_stop_restart2() ->
|
||||||
{ok, {false, EpochID1}} = WedgeStatus(hd(Ps)),
|
{ok, {false, EpochID1}} = WedgeStatus(hd(Ps)),
|
||||||
[{ok, {false, EpochID1}} = WedgeStatus(P) || P <- Ps], % *not* wedged
|
[{ok, {false, EpochID1}} = WedgeStatus(P) || P <- Ps], % *not* wedged
|
||||||
[{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged
|
[{ok,_} = Append(P, EpochID1) || P <- Ps], % *not* wedged
|
||||||
|
{ok, {_,_,File1}} = Append(hd(Ps), EpochID1),
|
||||||
|
|
||||||
{_,_,_} = machi_chain_manager1:test_react_to_env(hd(ChMgrs)),
|
{_,_,_} = machi_chain_manager1:trigger_react_to_env(hd(ChMgrs)),
|
||||||
[begin
|
[begin
|
||||||
_QQa = machi_chain_manager1:test_react_to_env(ChMgr)
|
_QQa = machi_chain_manager1:trigger_react_to_env(ChMgr)
|
||||||
end || _ <- lists:seq(1,125), ChMgr <- ChMgrs],
|
end || _ <- lists:seq(1,125), ChMgr <- ChMgrs],
|
||||||
|
|
||||||
%% All chain managers & projection stores should be using the
|
%% All chain managers & projection stores should be using the
|
||||||
%% same projection which is max projection in each store.
|
%% same projection which is max projection in each store.
|
||||||
{no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env(
|
{no_change,_,Epoch_m} = machi_chain_manager1:trigger_react_to_env(
|
||||||
hd(ChMgrs)),
|
hd(ChMgrs)),
|
||||||
[{no_change,_,Epoch_m} = machi_chain_manager1:test_react_to_env(
|
[{no_change,_,Epoch_m} = machi_chain_manager1:trigger_react_to_env(
|
||||||
ChMgr )|| ChMgr <- ChMgrs],
|
ChMgr )|| ChMgr <- ChMgrs],
|
||||||
{ok, Proj_m} = machi_projection_store:read_latest_projection(
|
{ok, Proj_m} = machi_projection_store:read_latest_projection(
|
||||||
hd(PStores), public),
|
hd(PStores), public),
|
||||||
|
@ -126,6 +127,13 @@ partial_stop_restart2() ->
|
||||||
P <- Ps],
|
P <- Ps],
|
||||||
{ok, {false, EpochID2}} = WedgeStatus(hd(Ps)),
|
{ok, {false, EpochID2}} = WedgeStatus(hd(Ps)),
|
||||||
[{ok,_} = Append(P, EpochID2) || P <- Ps], % *not* wedged
|
[{ok,_} = Append(P, EpochID2) || P <- Ps], % *not* wedged
|
||||||
|
%% The file we're assigned should be different with the epoch change.
|
||||||
|
{ok, {_,_,File2}} = Append(hd(Ps), EpochID2),
|
||||||
|
true = (File1 /= File2),
|
||||||
|
%% If we use the old epoch, then we're told that it's bad
|
||||||
|
{error, bad_epoch} = Append(hd(Ps), EpochID1),
|
||||||
|
%% If we use the current epoch again, then it's OK and given same File2
|
||||||
|
{ok, {_,_,File2}} = Append(hd(Ps), EpochID2),
|
||||||
|
|
||||||
%% Stop all but 'a'.
|
%% Stop all but 'a'.
|
||||||
[ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)],
|
[ok = machi_flu_psup:stop_flu_package(Name) || {Name,_} <- tl(Ps)],
|
||||||
|
@ -152,10 +160,13 @@ partial_stop_restart2() ->
|
||||||
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH),
|
Addr_a, TcpPort_a, ?DUMMY_PV1_EPOCH),
|
||||||
|
|
||||||
%% Iterate through humming consensus once
|
%% Iterate through humming consensus once
|
||||||
{now_using,_,Epoch_n} = machi_chain_manager1:test_react_to_env(
|
{now_using,_,Epoch_n} = machi_chain_manager1:trigger_react_to_env(
|
||||||
hd(ChMgrs)),
|
hd(ChMgrs)),
|
||||||
true = (Epoch_n > Epoch_m),
|
true = (Epoch_n > Epoch_m),
|
||||||
{ok, {false, EpochID3}} = WedgeStatus(hd(Ps)),
|
{ok, {false, EpochID3}} = WedgeStatus(hd(Ps)),
|
||||||
|
%% The file we're assigned should be different with the epoch change.
|
||||||
|
{ok, {_,_,File3}} = Append(hd(Ps), EpochID3),
|
||||||
|
true = (File2 /= File3),
|
||||||
|
|
||||||
%% Confirm that 'a' is *not* wedged
|
%% Confirm that 'a' is *not* wedged
|
||||||
{ok, _} = Append(hd(Ps), EpochID3),
|
{ok, _} = Append(hd(Ps), EpochID3),
|
||||||
|
|
|
@ -47,7 +47,7 @@ smoke_test2() ->
|
||||||
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
|
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
|
||||||
end || P <- Ps],
|
end || P <- Ps],
|
||||||
ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
|
ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
|
||||||
[machi_chain_manager1:test_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
|
[machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
|
||||||
|
|
||||||
{ok, Clnt} = ?C:start_link(Ps),
|
{ok, Clnt} = ?C:start_link(Ps),
|
||||||
try
|
try
|
||||||
|
|
|
@ -75,6 +75,11 @@ api_smoke_test() ->
|
||||||
<<"foo-file">>, 99832,
|
<<"foo-file">>, 99832,
|
||||||
MyChunk_badcs),
|
MyChunk_badcs),
|
||||||
|
|
||||||
|
%% Put kick_projection_reaction() in the middle of the test so
|
||||||
|
%% that any problems with its async nature will (hopefully)
|
||||||
|
%% cause problems later in the test.
|
||||||
|
ok = ?MUT:kick_projection_reaction(Prox1, []),
|
||||||
|
|
||||||
%% Alright, now for the rest of the API, whee
|
%% Alright, now for the rest of the API, whee
|
||||||
BadFile = <<"no-such-file">>,
|
BadFile = <<"no-such-file">>,
|
||||||
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
|
{error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile),
|
||||||
|
|
Loading…
Reference in a new issue