Merge pull request #46 from basho/ss/flu1-factorization1-ranch
FLU1 Factorization 1/N: Introduce ranch and factor out socket handling process
This commit is contained in:
commit
436c308db2
18 changed files with 943 additions and 669 deletions
|
@ -11,6 +11,7 @@
|
||||||
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}},
|
{lager, ".*", {git, "git://github.com/basho/lager.git", {tag, "2.2.0"}}},
|
||||||
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
|
{protobuffs, "0.8.*", {git, "git://github.com/basho/erlang_protobuffs.git", {tag, "0.8.1p4"}}},
|
||||||
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
|
{riak_dt, ".*", {git, "git://github.com/basho/riak_dt.git", {branch, "develop"}}},
|
||||||
|
{ranch, ".*", {git, "git://github.com/ninenines/ranch.git", {branch, "master"}}},
|
||||||
{node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
|
{node_package, ".*", {git, "git://github.com/basho/node_package.git", {branch, "develop"}}},
|
||||||
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}},
|
{eper, ".*", {git, "git://github.com/basho/eper.git", {tag, "0.92-basho1"}}},
|
||||||
{cluster_info, ".*", {git, "git://github.com/basho/cluster_info", {branch, "develop"}}}
|
{cluster_info, ".*", {git, "git://github.com/basho/cluster_info", {branch, "develop"}}}
|
||||||
|
|
|
@ -58,9 +58,9 @@
|
||||||
|
|
||||||
-export([start_link/1, stop/1,
|
-export([start_link/1, stop/1,
|
||||||
update_wedge_state/3, wedge_myself/2]).
|
update_wedge_state/3, wedge_myself/2]).
|
||||||
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
-export([make_projection_server_regname/1]).
|
||||||
%% TODO: remove or replace in OTP way after gen_*'ified
|
%% TODO: remove or replace in OTP way after gen_*'ified
|
||||||
-export([main2/4, run_append_server/2, run_listen_server/1,
|
-export([main2/4, run_append_server/2,
|
||||||
current_state/1, format_state/1]).
|
current_state/1, format_state/1]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
|
@ -68,14 +68,9 @@
|
||||||
proj_store :: pid(),
|
proj_store :: pid(),
|
||||||
witness = false :: boolean(),
|
witness = false :: boolean(),
|
||||||
append_pid :: pid(),
|
append_pid :: pid(),
|
||||||
tcp_port :: non_neg_integer(),
|
|
||||||
data_dir :: string(),
|
|
||||||
wedged = true :: boolean(),
|
wedged = true :: boolean(),
|
||||||
etstab :: ets:tid(),
|
etstab :: ets:tid(),
|
||||||
epoch_id :: 'undefined' | machi_dt:epoch_id(),
|
epoch_id :: 'undefined' | machi_dt:epoch_id(),
|
||||||
pb_mode = undefined :: 'undefined' | 'high' | 'low',
|
|
||||||
high_clnt :: 'undefined' | pid(),
|
|
||||||
trim_table :: ets:tid(),
|
|
||||||
props = [] :: list() % proplist
|
props = [] :: list() % proplist
|
||||||
}).
|
}).
|
||||||
|
|
||||||
|
@ -87,7 +82,12 @@ start_link([{FluName, TcpPort, DataDir}|Rest])
|
||||||
proc_lib:start_link(?MODULE, main2, [FluName, TcpPort, DataDir, Rest],
|
proc_lib:start_link(?MODULE, main2, [FluName, TcpPort, DataDir, Rest],
|
||||||
?INIT_TIMEOUT).
|
?INIT_TIMEOUT).
|
||||||
|
|
||||||
stop(Pid) ->
|
stop(RegName) when is_atom(RegName) ->
|
||||||
|
case whereis(RegName) of
|
||||||
|
undefined -> ok;
|
||||||
|
Pid -> stop(Pid)
|
||||||
|
end;
|
||||||
|
stop(Pid) when is_pid(Pid) ->
|
||||||
case erlang:is_process_alive(Pid) of
|
case erlang:is_process_alive(Pid) of
|
||||||
true ->
|
true ->
|
||||||
Pid ! killme,
|
Pid ! killme,
|
||||||
|
@ -152,8 +152,6 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
|
|
||||||
S0 = #state{flu_name=FluName,
|
S0 = #state{flu_name=FluName,
|
||||||
proj_store=ProjectionPid,
|
proj_store=ProjectionPid,
|
||||||
tcp_port=TcpPort,
|
|
||||||
data_dir=DataDir,
|
|
||||||
wedged=Wedged_p,
|
wedged=Wedged_p,
|
||||||
witness=Witness_p,
|
witness=Witness_p,
|
||||||
etstab=ets_table_name(FluName),
|
etstab=ets_table_name(FluName),
|
||||||
|
@ -167,7 +165,8 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
S1 = S0#state{append_pid=AppendPid},
|
S1 = S0#state{append_pid=AppendPid},
|
||||||
{ok, ListenPid} = start_listen_server(S1),
|
{ok, ListenerPid} = start_listen_server(TcpPort, DataDir, S1),
|
||||||
|
%% io:format(user, "Listener started: ~w~n", [{FluName, ListenerPid}]),
|
||||||
|
|
||||||
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
Config_e = machi_util:make_config_filename(DataDir, "unused"),
|
||||||
ok = filelib:ensure_dir(Config_e),
|
ok = filelib:ensure_dir(Config_e),
|
||||||
|
@ -179,36 +178,23 @@ main2(FluName, TcpPort, DataDir, Props) ->
|
||||||
put(flu_flu_name, FluName),
|
put(flu_flu_name, FluName),
|
||||||
put(flu_append_pid, S1#state.append_pid),
|
put(flu_append_pid, S1#state.append_pid),
|
||||||
put(flu_projection_pid, ProjectionPid),
|
put(flu_projection_pid, ProjectionPid),
|
||||||
put(flu_listen_pid, ListenPid),
|
put(flu_listen_pid, ListenerPid),
|
||||||
proc_lib:init_ack({ok, self()}),
|
proc_lib:init_ack({ok, self()}),
|
||||||
|
|
||||||
receive killme -> ok end,
|
receive killme -> ok end,
|
||||||
(catch exit(S1#state.append_pid, kill)),
|
(catch exit(S1#state.append_pid, kill)),
|
||||||
(catch exit(ProjectionPid, kill)),
|
(catch exit(ProjectionPid, kill)),
|
||||||
(catch exit(ListenPid, kill)),
|
(catch exit(ListenerPid, kill)),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
start_listen_server(S) ->
|
|
||||||
proc_lib:start_link(?MODULE, run_listen_server, [S], ?INIT_TIMEOUT).
|
|
||||||
|
|
||||||
start_append_server(S, AckPid) ->
|
start_append_server(S, AckPid) ->
|
||||||
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
proc_lib:start_link(?MODULE, run_append_server, [AckPid, S], ?INIT_TIMEOUT).
|
||||||
|
|
||||||
run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) ->
|
start_listen_server(TcpPort, DataDir,
|
||||||
register(make_listener_regname(FluName), self()),
|
#state{flu_name=FluName, witness=Witness, etstab=EtsTab,
|
||||||
SockOpts = ?PB_PACKET_OPTS ++
|
proj_store=ProjStore}=_S) ->
|
||||||
[{reuseaddr, true}, {mode, binary}, {active, false},
|
machi_listener_sup:start_listener(FluName, TcpPort, Witness, DataDir,
|
||||||
{backlog,8192}],
|
EtsTab, ProjStore).
|
||||||
case gen_tcp:listen(TcpPort, SockOpts) of
|
|
||||||
{ok, LSock} ->
|
|
||||||
proc_lib:init_ack({ok, self()}),
|
|
||||||
listen_server_loop(LSock, S);
|
|
||||||
Else ->
|
|
||||||
error_logger:warning_msg("~s:run_listen_server: "
|
|
||||||
"listen to TCP port ~w: ~w\n",
|
|
||||||
[?MODULE, TcpPort, Else]),
|
|
||||||
exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else})
|
|
||||||
end.
|
|
||||||
|
|
||||||
run_append_server(FluPid, #state{flu_name=Name,
|
run_append_server(FluPid, #state{flu_name=Name,
|
||||||
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
wedged=Wedged_p,epoch_id=EpochId}=S) ->
|
||||||
|
@ -220,18 +206,13 @@ run_append_server(FluPid, #state{flu_name=Name,
|
||||||
proc_lib:init_ack({ok, self()}),
|
proc_lib:init_ack({ok, self()}),
|
||||||
append_server_loop(FluPid, S#state{etstab=TID}).
|
append_server_loop(FluPid, S#state{etstab=TID}).
|
||||||
|
|
||||||
listen_server_loop(LSock, S) ->
|
|
||||||
{ok, Sock} = gen_tcp:accept(LSock),
|
|
||||||
spawn_link(fun() -> net_server_loop(Sock, S) end),
|
|
||||||
listen_server_loop(LSock, S).
|
|
||||||
|
|
||||||
append_server_loop(FluPid, #state{wedged=Wedged_p,
|
append_server_loop(FluPid, #state{wedged=Wedged_p,
|
||||||
witness=Witness_p,
|
witness=Witness_p,
|
||||||
epoch_id=OldEpochId, flu_name=FluName}=S) ->
|
epoch_id=OldEpochId, flu_name=FluName}=S) ->
|
||||||
receive
|
receive
|
||||||
{seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
|
{seq_append, From, _N, _L, _Prefix, _Chunk, _CSum, _Extra, _EpochID}
|
||||||
when Witness_p ->
|
when Witness_p ->
|
||||||
%% The FLU's net_server_loop() process ought to filter all
|
%% The FLU's machi_flu1_net_server process ought to filter all
|
||||||
%% witness states, but we'll keep this clause for extra
|
%% witness states, but we'll keep this clause for extra
|
||||||
%% paranoia.
|
%% paranoia.
|
||||||
From ! witness,
|
From ! witness,
|
||||||
|
@ -292,398 +273,6 @@ append_server_loop(FluPid, #state{wedged=Wedged_p,
|
||||||
append_server_loop(FluPid, S)
|
append_server_loop(FluPid, S)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
net_server_loop(Sock, S) ->
|
|
||||||
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
|
|
||||||
{ok, Bin} ->
|
|
||||||
{RespBin, S2} =
|
|
||||||
case machi_pb:decode_mpb_ll_request(Bin) of
|
|
||||||
LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
|
||||||
{R, NewS} = do_pb_ll_request(LL_req, S),
|
|
||||||
{maybe_encode_response(R), mode(low, NewS)};
|
|
||||||
_ ->
|
|
||||||
HL_req = machi_pb:decode_mpb_request(Bin),
|
|
||||||
1 = HL_req#mpb_request.do_not_alter,
|
|
||||||
{R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
|
||||||
{machi_pb:encode_mpb_response(R), mode(high, NewS)}
|
|
||||||
end,
|
|
||||||
if RespBin == async_no_response ->
|
|
||||||
net_server_loop(Sock, S2);
|
|
||||||
true ->
|
|
||||||
case gen_tcp:send(Sock, RespBin) of
|
|
||||||
ok ->
|
|
||||||
net_server_loop(Sock, S2);
|
|
||||||
{error, _} ->
|
|
||||||
(catch gen_tcp:close(Sock)),
|
|
||||||
exit(normal)
|
|
||||||
end
|
|
||||||
end;
|
|
||||||
{error, SockError} ->
|
|
||||||
Msg = io_lib:format("Socket error ~w", [SockError]),
|
|
||||||
R = #mpb_ll_response{req_id= <<>>,
|
|
||||||
generic=#mpb_errorresp{code=1, msg=Msg}},
|
|
||||||
_Resp = machi_pb:encode_mpb_ll_response(R),
|
|
||||||
%% TODO: Weird that sometimes neither catch nor try/catch
|
|
||||||
%% can prevent OTP's SASL from logging an error here.
|
|
||||||
%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
|
||||||
%% TODO: is this what causes the intermittent PULSE deadlock errors?
|
|
||||||
%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
|
||||||
(catch gen_tcp:close(Sock)),
|
|
||||||
exit(normal)
|
|
||||||
end.
|
|
||||||
|
|
||||||
maybe_encode_response(async_no_response=X) ->
|
|
||||||
X;
|
|
||||||
maybe_encode_response(R) ->
|
|
||||||
machi_pb:encode_mpb_ll_response(R).
|
|
||||||
|
|
||||||
mode(Mode, #state{pb_mode=undefined}=S) ->
|
|
||||||
S#state{pb_mode=Mode};
|
|
||||||
mode(_, S) ->
|
|
||||||
S.
|
|
||||||
|
|
||||||
make_high_clnt(#state{high_clnt=undefined}=S) ->
|
|
||||||
{ok, Proj} = machi_projection_store:read_latest_projection(
|
|
||||||
S#state.proj_store, private),
|
|
||||||
Ps = [P_srvr || {_, P_srvr} <- orddict:to_list(
|
|
||||||
Proj#projection_v1.members_dict)],
|
|
||||||
{ok, Clnt} = machi_cr_client:start_link(Ps),
|
|
||||||
S#state{high_clnt=Clnt};
|
|
||||||
make_high_clnt(S) ->
|
|
||||||
S.
|
|
||||||
|
|
||||||
do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
|
|
||||||
Result = {high_error, 41, "Low protocol request while in high mode"},
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
|
||||||
do_pb_ll_request(PB_request, S) ->
|
|
||||||
Req = machi_pb_translate:from_pb_request(PB_request),
|
|
||||||
{ReqID, Cmd, Result, S2} =
|
|
||||||
case Req of
|
|
||||||
{RqID, {LowCmd, _}=CMD}
|
|
||||||
when LowCmd == low_proj;
|
|
||||||
LowCmd == low_wedge_status; LowCmd == low_list_files ->
|
|
||||||
%% Skip wedge check for projection commands!
|
|
||||||
%% Skip wedge check for these unprivileged commands
|
|
||||||
{Rs, NewS} = do_pb_ll_request3(CMD, S),
|
|
||||||
{RqID, CMD, Rs, NewS};
|
|
||||||
{RqID, CMD} ->
|
|
||||||
EpochID = element(2, CMD), % by common convention
|
|
||||||
{Rs, NewS} = do_pb_ll_request2(EpochID, CMD, S),
|
|
||||||
{RqID, CMD, Rs, NewS}
|
|
||||||
end,
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
|
||||||
|
|
||||||
do_pb_ll_request2(EpochID, CMD, S) ->
|
|
||||||
{Wedged_p, CurrentEpochID} = ets:lookup_element(S#state.etstab, epoch, 2),
|
|
||||||
if Wedged_p == true ->
|
|
||||||
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
|
|
||||||
is_tuple(EpochID)
|
|
||||||
andalso
|
|
||||||
EpochID /= CurrentEpochID ->
|
|
||||||
{Epoch, _} = EpochID,
|
|
||||||
{CurrentEpoch, _} = CurrentEpochID,
|
|
||||||
if Epoch < CurrentEpoch ->
|
|
||||||
ok;
|
|
||||||
true ->
|
|
||||||
%% We're at same epoch # but different checksum, or
|
|
||||||
%% we're at a newer/bigger epoch #.
|
|
||||||
_ = wedge_myself(S#state.flu_name, CurrentEpochID),
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
|
|
||||||
true ->
|
|
||||||
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% Witness status does not matter below.
|
|
||||||
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
|
||||||
{Msg, S};
|
|
||||||
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
|
|
||||||
{-6, S};
|
|
||||||
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
|
|
||||||
{do_server_wedge_status(S), S};
|
|
||||||
do_pb_ll_request3({low_proj, PCMD}, S) ->
|
|
||||||
{do_server_proj_request(PCMD, S), S};
|
|
||||||
%% Witness status *matters* below
|
|
||||||
do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag,
|
|
||||||
CSum, ChunkExtra},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, CSum,
|
|
||||||
ChunkExtra, S), S};
|
|
||||||
do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
|
|
||||||
CSum},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
|
|
||||||
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts},
|
|
||||||
#state{witness=false} = S) ->
|
|
||||||
{do_server_read_chunk(File, Offset, Size, Opts, S), S};
|
|
||||||
do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S};
|
|
||||||
do_pb_ll_request3({low_checksum_list, _EpochID, File},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_checksum_listing(File, S), S};
|
|
||||||
do_pb_ll_request3({low_list_files, _EpochID},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_list_files(S), S};
|
|
||||||
do_pb_ll_request3({low_delete_migration, _EpochID, File},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_delete_migration(File, S),
|
|
||||||
#state{witness=false}=S};
|
|
||||||
do_pb_ll_request3({low_trunc_hack, _EpochID, File},
|
|
||||||
#state{witness=false}=S) ->
|
|
||||||
{do_server_trunc_hack(File, S), S};
|
|
||||||
do_pb_ll_request3(_, #state{witness=true}=S) ->
|
|
||||||
{{error, bad_arg}, S}. % TODO: new status code??
|
|
||||||
|
|
||||||
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
|
|
||||||
Result = {low_error, 41, "High protocol request while in low mode"},
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
|
||||||
do_pb_hl_request(PB_request, S) ->
|
|
||||||
{ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request),
|
|
||||||
{Result, S2} = do_pb_hl_request2(Cmd, S),
|
|
||||||
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
|
||||||
|
|
||||||
do_pb_hl_request2({high_echo, Msg}, S) ->
|
|
||||||
{Msg, S};
|
|
||||||
do_pb_hl_request2({high_auth, _User, _Pass}, S) ->
|
|
||||||
{-77, S};
|
|
||||||
do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, ChunkBin, TaggedCSum,
|
|
||||||
ChunkExtra}, #state{high_clnt=Clnt}=S) ->
|
|
||||||
Chunk = {TaggedCSum, ChunkBin},
|
|
||||||
Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk,
|
|
||||||
ChunkExtra),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
|
|
||||||
#state{high_clnt=Clnt}=S) ->
|
|
||||||
Chunk = {TaggedCSum, ChunkBin},
|
|
||||||
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
|
|
||||||
#state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
|
|
||||||
#state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:checksum_list(Clnt, File),
|
|
||||||
{Res, S};
|
|
||||||
do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) ->
|
|
||||||
Res = machi_cr_client:list_files(Clnt),
|
|
||||||
{Res, S}.
|
|
||||||
|
|
||||||
do_server_proj_request({get_latest_epochid, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:get_latest_epochid(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({read_latest_projection, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:read_latest_projection(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({read_projection, ProjType, Epoch},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:read(ProjStore, ProjType, Epoch);
|
|
||||||
do_server_proj_request({write_projection, ProjType, Proj},
|
|
||||||
#state{flu_name=FluName, proj_store=ProjStore}) ->
|
|
||||||
if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH ->
|
|
||||||
%% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]),
|
|
||||||
Chmgr = machi_flu_psup:make_fitness_regname(FluName),
|
|
||||||
[Map] = Proj#projection_v1.dbg,
|
|
||||||
catch machi_fitness:send_fitness_update_spam(
|
|
||||||
Chmgr, Proj#projection_v1.author_server, Map);
|
|
||||||
true ->
|
|
||||||
catch machi_projection_store:write(ProjStore, ProjType, Proj)
|
|
||||||
end;
|
|
||||||
do_server_proj_request({get_all_projections, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:get_all_projections(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({list_all_projections, ProjType},
|
|
||||||
#state{proj_store=ProjStore}) ->
|
|
||||||
machi_projection_store:list_all_projections(ProjStore, ProjType);
|
|
||||||
do_server_proj_request({kick_projection_reaction},
|
|
||||||
#state{flu_name=FluName}) ->
|
|
||||||
%% Tell my chain manager that it might want to react to
|
|
||||||
%% this new world.
|
|
||||||
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
|
||||||
spawn(fun() ->
|
|
||||||
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
|
||||||
end),
|
|
||||||
async_no_response.
|
|
||||||
|
|
||||||
do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, CSum,
|
|
||||||
ChunkExtra, S) ->
|
|
||||||
case sanitize_prefix(Prefix) of
|
|
||||||
ok ->
|
|
||||||
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, CSum,
|
|
||||||
ChunkExtra, S);
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, CSum_tag, Client_CSum,
|
|
||||||
ChunkExtra, #state{flu_name=FluName,
|
|
||||||
epoch_id=EpochID}=_S) ->
|
|
||||||
%% TODO: Do anything with PKey?
|
|
||||||
try
|
|
||||||
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
|
||||||
R = {seq_append, self(), CoC_Namespace, CoC_Locator,
|
|
||||||
Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID},
|
|
||||||
FluName ! R,
|
|
||||||
receive
|
|
||||||
{assignment, Offset, File} ->
|
|
||||||
Size = iolist_size(Chunk),
|
|
||||||
{ok, {Offset, Size, File}};
|
|
||||||
witness ->
|
|
||||||
{error, bad_arg};
|
|
||||||
wedged ->
|
|
||||||
{error, wedged}
|
|
||||||
after 10*1000 ->
|
|
||||||
{error, partition}
|
|
||||||
end
|
|
||||||
catch
|
|
||||||
throw:{bad_csum, _CS} ->
|
|
||||||
{error, bad_checksum};
|
|
||||||
error:badarg ->
|
|
||||||
error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]),
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}],
|
|
||||||
machi_file_proxy:write(Pid, Offset, Meta, Chunk);
|
|
||||||
{error, trimmed} = Error ->
|
|
||||||
Error
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
|
|
||||||
%% XXX FIXME
|
|
||||||
%% For now we are omiting the checksum data because it blows up
|
|
||||||
%% protobufs.
|
|
||||||
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
|
|
||||||
Other -> Other
|
|
||||||
end;
|
|
||||||
{error, trimmed} = Error ->
|
|
||||||
Error
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) ->
|
|
||||||
lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n",
|
|
||||||
[File, Offset, Size, TriggerGC]),
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
machi_file_proxy:trim(Pid, Offset, Size, TriggerGC);
|
|
||||||
{error, trimmed} = Trimmed ->
|
|
||||||
%% Should be returned back to (maybe) trigger repair
|
|
||||||
Trimmed
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
|
||||||
{ok, Pid} ->
|
|
||||||
{ok, List} = machi_file_proxy:checksum_list(Pid),
|
|
||||||
Bin = erlang:term_to_binary(List),
|
|
||||||
if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) ->
|
|
||||||
%% TODO: Fix this limitation by streaming the
|
|
||||||
%% binary in multiple smaller PB messages.
|
|
||||||
%% Also, don't read the file all at once. ^_^
|
|
||||||
error_logger:error_msg("~s:~w oversize ~s\n",
|
|
||||||
[?MODULE, ?LINE, DataDir]),
|
|
||||||
{error, bad_arg};
|
|
||||||
true ->
|
|
||||||
{ok, Bin}
|
|
||||||
end;
|
|
||||||
{error, trimmed} ->
|
|
||||||
{error, trimmed}
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_list_files(#state{data_dir=DataDir}=_S) ->
|
|
||||||
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
|
|
||||||
Files = filelib:wildcard("*", WildPath),
|
|
||||||
{ok, [begin
|
|
||||||
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
|
|
||||||
Size = FI#file_info.size,
|
|
||||||
{Size, File}
|
|
||||||
end || File <- Files]}.
|
|
||||||
|
|
||||||
do_server_wedge_status(S) ->
|
|
||||||
{Wedged_p, CurrentEpochID0} = ets:lookup_element(S#state.etstab, epoch, 2),
|
|
||||||
CurrentEpochID = if CurrentEpochID0 == undefined ->
|
|
||||||
?DUMMY_PV1_EPOCH;
|
|
||||||
true ->
|
|
||||||
CurrentEpochID0
|
|
||||||
end,
|
|
||||||
{Wedged_p, CurrentEpochID}.
|
|
||||||
|
|
||||||
do_server_delete_migration(File, #state{data_dir=DataDir}=_S) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
|
||||||
case file:delete(Path) of
|
|
||||||
ok ->
|
|
||||||
ok;
|
|
||||||
{error, enoent} ->
|
|
||||||
{error, no_such_file};
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
|
|
||||||
case sanitize_file_string(File) of
|
|
||||||
ok ->
|
|
||||||
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
|
||||||
case file:open(Path, [read, write, binary, raw]) of
|
|
||||||
{ok, FH} ->
|
|
||||||
try
|
|
||||||
{ok, ?MINIMUM_OFFSET} = file:position(FH,
|
|
||||||
?MINIMUM_OFFSET),
|
|
||||||
ok = file:truncate(FH),
|
|
||||||
ok
|
|
||||||
after
|
|
||||||
file:close(FH)
|
|
||||||
end;
|
|
||||||
{error, enoent} ->
|
|
||||||
{error, no_such_file};
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
{error, bad_arg}
|
|
||||||
end.
|
|
||||||
|
|
||||||
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
append_server_dispatch(From, CoC_Namespace, CoC_Locator,
|
||||||
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
|
Prefix, Chunk, CSum, Extra, FluName, EpochId) ->
|
||||||
Result = case handle_append(CoC_Namespace, CoC_Locator,
|
Result = case handle_append(CoC_Namespace, CoC_Locator,
|
||||||
|
@ -717,32 +306,6 @@ handle_append(CoC_Namespace, CoC_Locator,
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
sanitize_file_string(Str) ->
|
|
||||||
case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of
|
|
||||||
true -> ok;
|
|
||||||
false -> error
|
|
||||||
end.
|
|
||||||
|
|
||||||
has_no_prohibited_chars(Str) ->
|
|
||||||
case re:run(Str, "/") of
|
|
||||||
nomatch ->
|
|
||||||
true;
|
|
||||||
_ ->
|
|
||||||
true
|
|
||||||
end.
|
|
||||||
|
|
||||||
sanitize_prefix(Prefix) ->
|
|
||||||
%% We are using '^' as our component delimiter
|
|
||||||
case re:run(Prefix, "/|\\^") of
|
|
||||||
nomatch ->
|
|
||||||
ok;
|
|
||||||
_ ->
|
|
||||||
error
|
|
||||||
end.
|
|
||||||
|
|
||||||
make_listener_regname(BaseName) ->
|
|
||||||
list_to_atom(atom_to_list(BaseName) ++ "_listener").
|
|
||||||
|
|
||||||
%% This is the name of the projection store that is spawned by the
|
%% This is the name of the projection store that is spawned by the
|
||||||
%% *flu*, for use primarily in testing scenarios. In normal use, we
|
%% *flu*, for use primarily in testing scenarios. In normal use, we
|
||||||
%% ought to be using the OTP style of managing processes, via
|
%% ought to be using the OTP style of managing processes, via
|
||||||
|
@ -753,26 +316,6 @@ make_listener_regname(BaseName) ->
|
||||||
make_projection_server_regname(BaseName) ->
|
make_projection_server_regname(BaseName) ->
|
||||||
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
list_to_atom(atom_to_list(BaseName) ++ "_pstore").
|
||||||
|
|
||||||
check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
|
|
||||||
%% TODO: If the client was foolish enough to use
|
|
||||||
%% this type of non-checksum, then the client gets
|
|
||||||
%% what it deserves wrt data integrity, alas. In
|
|
||||||
%% the client-side Chain Replication method, each
|
|
||||||
%% server will calculated this independently, which
|
|
||||||
%% isn't exactly what ought to happen for best data
|
|
||||||
%% integrity checking. In server-side CR, the csum
|
|
||||||
%% should be calculated by the head and passed down
|
|
||||||
%% the chain together with the value.
|
|
||||||
CS = machi_util:checksum_chunk(Chunk),
|
|
||||||
machi_util:make_tagged_csum(server_sha, CS);
|
|
||||||
check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
|
|
||||||
CS = machi_util:checksum_chunk(Chunk),
|
|
||||||
if CS == Client_CSum ->
|
|
||||||
machi_util:make_tagged_csum(server_sha,
|
|
||||||
Client_CSum);
|
|
||||||
true ->
|
|
||||||
throw({bad_csum, CS})
|
|
||||||
end.
|
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
|
|
||||||
|
|
603
src/machi_flu1_net_server.erl
Normal file
603
src/machi_flu1_net_server.erl
Normal file
|
@ -0,0 +1,603 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc Ranch protocol callback module to handle PB protocol over
|
||||||
|
%% transport, including both high and low modes.
|
||||||
|
|
||||||
|
%% TODO
|
||||||
|
%% - Two modes, high and low should be separated at listener level?
|
||||||
|
|
||||||
|
-module(machi_flu1_net_server).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
-behaviour(ranch_protocol).
|
||||||
|
|
||||||
|
-export([start_link/4]).
|
||||||
|
-export([init/1]).
|
||||||
|
-export([handle_call/3, handle_cast/2, handle_info/2,
|
||||||
|
terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-include_lib("kernel/include/file.hrl").
|
||||||
|
|
||||||
|
-include("machi.hrl").
|
||||||
|
-include("machi_pb.hrl").
|
||||||
|
-include("machi_projection.hrl").
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-endif. % TEST
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
%% Ranch's transport management stuff
|
||||||
|
ref :: ranch:ref(),
|
||||||
|
socket :: socket(),
|
||||||
|
transport :: module(),
|
||||||
|
|
||||||
|
%% Machi FLU configurations, common for low and high
|
||||||
|
data_dir :: string(),
|
||||||
|
witness :: boolean(),
|
||||||
|
pb_mode :: undefined | high | low,
|
||||||
|
%% - Used in projection related requests in low mode
|
||||||
|
%% - Used in spawning CR client in high mode
|
||||||
|
proj_store :: pid(),
|
||||||
|
|
||||||
|
%% Low mode only items
|
||||||
|
%% Current best knowledge, used for wedge_self / bad_epoch check
|
||||||
|
epoch_id :: undefined | machi_dt:epoch_id(),
|
||||||
|
%% Used in dispatching append_chunk* reqs to the
|
||||||
|
%% append serializing process
|
||||||
|
flu_name :: pv1_server(),
|
||||||
|
%% Used in server_wedge_status to lookup the table
|
||||||
|
epoch_tab :: ets:tab(),
|
||||||
|
|
||||||
|
%% High mode only
|
||||||
|
high_clnt :: pid(),
|
||||||
|
|
||||||
|
%% anything you want
|
||||||
|
props = [] :: list() % proplist
|
||||||
|
}).
|
||||||
|
|
||||||
|
-type socket() :: any().
|
||||||
|
-type state() :: #state{}.
|
||||||
|
|
||||||
|
-spec start_link(ranch:ref(), socket(), module(), [term()]) -> {ok, pid()}.
|
||||||
|
start_link(Ref, Socket, Transport, [FluName, Witness, DataDir, EpochTab, ProjStore]) ->
|
||||||
|
proc_lib:start_link(?MODULE, init, [#state{ref=Ref,
|
||||||
|
socket=Socket,
|
||||||
|
transport=Transport,
|
||||||
|
flu_name=FluName,
|
||||||
|
witness=Witness,
|
||||||
|
data_dir=DataDir,
|
||||||
|
epoch_tab=EpochTab,
|
||||||
|
proj_store=ProjStore}]).
|
||||||
|
|
||||||
|
-spec init(state()) -> no_return().
|
||||||
|
init(#state{ref=Ref, socket=Socket, transport=Transport}=State) ->
|
||||||
|
ok = proc_lib:init_ack({ok, self()}),
|
||||||
|
ok = ranch:accept_ack(Ref),
|
||||||
|
{_Wedged_p, CurrentEpochID} = lookup_epoch(State),
|
||||||
|
ok = Transport:setopts(Socket, [{active, once}|?PB_PACKET_OPTS]),
|
||||||
|
gen_server:enter_loop(?MODULE, [], State#state{epoch_id=CurrentEpochID}).
|
||||||
|
|
||||||
|
handle_call(Request, _From, S) ->
|
||||||
|
lager:warning("~s:handle_call UNKNOWN message: ~w", [?MODULE, Request]),
|
||||||
|
Reply = {error, {unknown_message, Request}},
|
||||||
|
{reply, Reply, S}.
|
||||||
|
|
||||||
|
handle_cast(_Msg, S) ->
|
||||||
|
lager:warning("~s:handle_cast UNKNOWN message: ~w", [?MODULE, _Msg]),
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
%% TODO: Other transport support needed?? TLS/SSL, SCTP
|
||||||
|
handle_info({tcp, Socket, Data}=_Info, #state{socket=Socket}=S) ->
|
||||||
|
lager:debug("~s:handle_info: ~w", [?MODULE, _Info]),
|
||||||
|
transport_received(Socket, Data, S);
|
||||||
|
handle_info({tcp_closed, Socket}=_Info, #state{socket=Socket}=S) ->
|
||||||
|
lager:debug("~s:handle_info: ~w", [?MODULE, _Info]),
|
||||||
|
transport_closed(Socket, S);
|
||||||
|
handle_info({tcp_error, Socket, Reason}=_Info, #state{socket=Socket}=S) ->
|
||||||
|
lager:warning("~s:handle_info (socket=~w) tcp_error: ~w", [?MODULE, Socket, Reason]),
|
||||||
|
transport_error(Socket, Reason, S);
|
||||||
|
handle_info(_Info, S) ->
|
||||||
|
lager:warning("~s:handle_info UNKNOWN message: ~w", [?MODULE, _Info]),
|
||||||
|
{noreply, S}.
|
||||||
|
|
||||||
|
terminate(normal, #state{socket=undefined}=_S) ->
|
||||||
|
ok;
|
||||||
|
terminate(Reason, #state{socket=undefined}=_S) ->
|
||||||
|
lager:warning("~s:terminate (socket=undefined): ~w", [?MODULE, Reason]),
|
||||||
|
ok;
|
||||||
|
terminate(normal, #state{socket=Socket}=_S) ->
|
||||||
|
(catch gen_tcp:close(Socket)),
|
||||||
|
ok;
|
||||||
|
terminate(Reason, #state{socket=Socket}=_S) ->
|
||||||
|
lager:warning("~s:terminate (socket=Socket): ~w", [?MODULE, Reason]),
|
||||||
|
(catch gen_tcp:close(Socket)),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
code_change(_OldVsn, S, _Extra) ->
|
||||||
|
{ok, S}.
|
||||||
|
|
||||||
|
%% -- private
|
||||||
|
|
||||||
|
%%%% Common transport handling
|
||||||
|
|
||||||
|
-spec transport_received(socket(), machi_dt:chunk(), state()) ->
|
||||||
|
{noreply, state()}.
|
||||||
|
transport_received(Socket, <<"QUIT\n">>, #state{socket=Socket}=S) ->
|
||||||
|
{stop, normal, S};
|
||||||
|
transport_received(Socket, Bin, #state{transport=Transport}=S) ->
|
||||||
|
{RespBin, S2} =
|
||||||
|
case machi_pb:decode_mpb_ll_request(Bin) of
|
||||||
|
LL_req when LL_req#mpb_ll_request.do_not_alter == 2 ->
|
||||||
|
{R, NewS} = do_pb_ll_request(LL_req, S),
|
||||||
|
{maybe_encode_response(R), set_mode(low, NewS)};
|
||||||
|
_ ->
|
||||||
|
HL_req = machi_pb:decode_mpb_request(Bin),
|
||||||
|
1 = HL_req#mpb_request.do_not_alter,
|
||||||
|
{R, NewS} = do_pb_hl_request(HL_req, make_high_clnt(S)),
|
||||||
|
{machi_pb:encode_mpb_response(R), set_mode(high, NewS)}
|
||||||
|
end,
|
||||||
|
case RespBin of
|
||||||
|
async_no_response ->
|
||||||
|
Transport:setopts(Socket, [{active, once}]),
|
||||||
|
{noreply, S2};
|
||||||
|
_ ->
|
||||||
|
case Transport:send(Socket, RespBin) of
|
||||||
|
ok ->
|
||||||
|
Transport:setopts(Socket, [{active, once}]),
|
||||||
|
{noreply, S2};
|
||||||
|
{error, Reason} ->
|
||||||
|
transport_error(Socket, Reason, S2)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec transport_closed(socket(), state()) -> {stop, term(), state()}.
|
||||||
|
transport_closed(_Socket, S) ->
|
||||||
|
{stop, normal, S}.
|
||||||
|
|
||||||
|
-spec transport_error(socket(), term(), state()) -> no_return().
|
||||||
|
transport_error(Socket, Reason, #state{transport=Transport}=_S) ->
|
||||||
|
Msg = io_lib:format("Socket error ~w", [Reason]),
|
||||||
|
R = #mpb_ll_response{req_id= <<>>,
|
||||||
|
generic=#mpb_errorresp{code=1, msg=Msg}},
|
||||||
|
_Resp = machi_pb:encode_mpb_ll_response(R),
|
||||||
|
%% TODO for TODO comments: comments below with four %s are copy-n-paste'd,
|
||||||
|
%% then it should be considered they are still open and should be addressed.
|
||||||
|
%%%% TODO: Weird that sometimes neither catch nor try/catch
|
||||||
|
%%%% can prevent OTP's SASL from logging an error here.
|
||||||
|
%%%% Error in process <0.545.0> with exit value: {badarg,[{erlang,port_command,.......
|
||||||
|
%%%% TODO: is this what causes the intermittent PULSE deadlock errors?
|
||||||
|
%%%% _ = (catch gen_tcp:send(Sock, _Resp)), timer:sleep(1000),
|
||||||
|
(catch Transport:close(Socket)),
|
||||||
|
_ = lager:warning("Socket error (~w -> ~w): ~w",
|
||||||
|
[Transport:sockname(Socket), Transport:peername(Socket), Reason]),
|
||||||
|
%% TODO: better to exit with `Reason' without logging?
|
||||||
|
exit(normal).
|
||||||
|
|
||||||
|
maybe_encode_response(async_no_response=R) ->
|
||||||
|
R;
|
||||||
|
maybe_encode_response(R) ->
|
||||||
|
machi_pb:encode_mpb_ll_response(R).
|
||||||
|
|
||||||
|
set_mode(Mode, #state{pb_mode=undefined}=S) ->
|
||||||
|
S#state{pb_mode=Mode};
|
||||||
|
set_mode(_, S) ->
|
||||||
|
S.
|
||||||
|
|
||||||
|
%%%% Low PB mode %%%%
|
||||||
|
|
||||||
|
do_pb_ll_request(#mpb_ll_request{req_id=ReqID}, #state{pb_mode=high}=S) ->
|
||||||
|
Result = {high_error, 41, "Low protocol request while in high mode"},
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
||||||
|
do_pb_ll_request(PB_request, S) ->
|
||||||
|
Req = machi_pb_translate:from_pb_request(PB_request),
|
||||||
|
%% io:format(user, "[~w] do_pb_ll_request Req: ~w~n", [S#state.flu_name, Req]),
|
||||||
|
{ReqID, Cmd, Result, S2} =
|
||||||
|
case Req of
|
||||||
|
{RqID, {LowCmd, _}=Cmd0}
|
||||||
|
when LowCmd =:= low_proj;
|
||||||
|
LowCmd =:= low_wedge_status;
|
||||||
|
LowCmd =:= low_list_files ->
|
||||||
|
%% Skip wedge check for these unprivileged commands
|
||||||
|
{Rs, NewS} = do_pb_ll_request3(Cmd0, S),
|
||||||
|
{RqID, Cmd0, Rs, NewS};
|
||||||
|
{RqID, Cmd0} ->
|
||||||
|
EpochID = element(2, Cmd0), % by common convention
|
||||||
|
{Rs, NewS} = do_pb_ll_request2(EpochID, Cmd0, S),
|
||||||
|
{RqID, Cmd0, Rs, NewS}
|
||||||
|
end,
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
||||||
|
|
||||||
|
do_pb_ll_request2(EpochID, CMD, S) ->
|
||||||
|
{Wedged_p, CurrentEpochID} = lookup_epoch(S),
|
||||||
|
%% io:format(user, "{Wedged_p, CurrentEpochID}: ~w~n", [{Wedged_p, CurrentEpochID}]),
|
||||||
|
if Wedged_p == true ->
|
||||||
|
{{error, wedged}, S#state{epoch_id=CurrentEpochID}};
|
||||||
|
is_tuple(EpochID)
|
||||||
|
andalso
|
||||||
|
EpochID /= CurrentEpochID ->
|
||||||
|
{Epoch, _} = EpochID,
|
||||||
|
{CurrentEpoch, _} = CurrentEpochID,
|
||||||
|
if Epoch < CurrentEpoch ->
|
||||||
|
ok;
|
||||||
|
true ->
|
||||||
|
%% We're at same epoch # but different checksum, or
|
||||||
|
%% we're at a newer/bigger epoch #.
|
||||||
|
_ = machi_flu1:wedge_myself(S#state.flu_name, CurrentEpochID),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
{{error, bad_epoch}, S#state{epoch_id=CurrentEpochID}};
|
||||||
|
true ->
|
||||||
|
do_pb_ll_request3(CMD, S#state{epoch_id=CurrentEpochID})
|
||||||
|
end.
|
||||||
|
|
||||||
|
lookup_epoch(#state{epoch_tab=T}) ->
|
||||||
|
%% TODO: race in shutdown to access ets table after owner dies
|
||||||
|
ets:lookup_element(T, epoch, 2).
|
||||||
|
|
||||||
|
%% Witness status does not matter below.
|
||||||
|
do_pb_ll_request3({low_echo, _BogusEpochID, Msg}, S) ->
|
||||||
|
{Msg, S};
|
||||||
|
do_pb_ll_request3({low_auth, _BogusEpochID, _User, _Pass}, S) ->
|
||||||
|
{-6, S};
|
||||||
|
do_pb_ll_request3({low_wedge_status, _EpochID}, S) ->
|
||||||
|
{do_server_wedge_status(S), S};
|
||||||
|
do_pb_ll_request3({low_proj, PCMD}, S) ->
|
||||||
|
{do_server_proj_request(PCMD, S), S};
|
||||||
|
|
||||||
|
%% Witness status *matters* below
|
||||||
|
do_pb_ll_request3({low_append_chunk, _EpochID, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag,
|
||||||
|
CSum, ChunkExtra},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, CSum,
|
||||||
|
ChunkExtra, S), S};
|
||||||
|
do_pb_ll_request3({low_write_chunk, _EpochID, File, Offset, Chunk, CSum_tag,
|
||||||
|
CSum},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, S), S};
|
||||||
|
do_pb_ll_request3({low_read_chunk, _EpochID, File, Offset, Size, Opts},
|
||||||
|
#state{witness=false} = S) ->
|
||||||
|
{do_server_read_chunk(File, Offset, Size, Opts, S), S};
|
||||||
|
do_pb_ll_request3({low_trim_chunk, _EpochID, File, Offset, Size, TriggerGC},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_trim_chunk(File, Offset, Size, TriggerGC, S), S};
|
||||||
|
do_pb_ll_request3({low_checksum_list, _EpochID, File},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_checksum_listing(File, S), S};
|
||||||
|
do_pb_ll_request3({low_list_files, _EpochID},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_list_files(S), S};
|
||||||
|
do_pb_ll_request3({low_delete_migration, _EpochID, File},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_delete_migration(File, S),
|
||||||
|
#state{witness=false}=S};
|
||||||
|
do_pb_ll_request3({low_trunc_hack, _EpochID, File},
|
||||||
|
#state{witness=false}=S) ->
|
||||||
|
{do_server_trunc_hack(File, S), S};
|
||||||
|
|
||||||
|
do_pb_ll_request3(_, #state{witness=true}=S) ->
|
||||||
|
{{error, bad_arg}, S}. % TODO: new status code??
|
||||||
|
|
||||||
|
do_server_proj_request({get_latest_epochid, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:get_latest_epochid(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({read_latest_projection, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:read_latest_projection(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({read_projection, ProjType, Epoch},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:read(ProjStore, ProjType, Epoch);
|
||||||
|
do_server_proj_request({write_projection, ProjType, Proj},
|
||||||
|
#state{flu_name=FluName, proj_store=ProjStore}) ->
|
||||||
|
if Proj#projection_v1.epoch_number == ?SPAM_PROJ_EPOCH ->
|
||||||
|
%% io:format(user, "DBG ~s ~w ~P\n", [?MODULE, ?LINE, Proj, 5]),
|
||||||
|
Chmgr = machi_flu_psup:make_fitness_regname(FluName),
|
||||||
|
[Map] = Proj#projection_v1.dbg,
|
||||||
|
catch machi_fitness:send_fitness_update_spam(
|
||||||
|
Chmgr, Proj#projection_v1.author_server, Map);
|
||||||
|
true ->
|
||||||
|
catch machi_projection_store:write(ProjStore, ProjType, Proj)
|
||||||
|
end;
|
||||||
|
do_server_proj_request({get_all_projections, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:get_all_projections(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({list_all_projections, ProjType},
|
||||||
|
#state{proj_store=ProjStore}) ->
|
||||||
|
machi_projection_store:list_all_projections(ProjStore, ProjType);
|
||||||
|
do_server_proj_request({kick_projection_reaction},
|
||||||
|
#state{flu_name=FluName}) ->
|
||||||
|
%% Tell my chain manager that it might want to react to
|
||||||
|
%% this new world.
|
||||||
|
Chmgr = machi_chain_manager1:make_chmgr_regname(FluName),
|
||||||
|
spawn(fun() ->
|
||||||
|
catch machi_chain_manager1:trigger_react_to_env(Chmgr)
|
||||||
|
end),
|
||||||
|
async_no_response.
|
||||||
|
|
||||||
|
do_server_append_chunk(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, CSum,
|
||||||
|
ChunkExtra, S) ->
|
||||||
|
case sanitize_prefix(Prefix) of
|
||||||
|
ok ->
|
||||||
|
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, CSum,
|
||||||
|
ChunkExtra, S);
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_append_chunk2(CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, CSum_tag, Client_CSum,
|
||||||
|
ChunkExtra, #state{flu_name=FluName,
|
||||||
|
epoch_id=EpochID}=_S) ->
|
||||||
|
%% TODO: Do anything with PKey?
|
||||||
|
try
|
||||||
|
TaggedCSum = check_or_make_tagged_checksum(CSum_tag, Client_CSum,Chunk),
|
||||||
|
R = {seq_append, self(), CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk, TaggedCSum, ChunkExtra, EpochID},
|
||||||
|
FluName ! R,
|
||||||
|
receive
|
||||||
|
{assignment, Offset, File} ->
|
||||||
|
Size = iolist_size(Chunk),
|
||||||
|
{ok, {Offset, Size, File}};
|
||||||
|
witness ->
|
||||||
|
{error, bad_arg};
|
||||||
|
wedged ->
|
||||||
|
{error, wedged}
|
||||||
|
after 10*1000 ->
|
||||||
|
{error, partition}
|
||||||
|
end
|
||||||
|
catch
|
||||||
|
throw:{bad_csum, _CS} ->
|
||||||
|
{error, bad_checksum};
|
||||||
|
error:badarg ->
|
||||||
|
lager:error("badarg at ~w:do_server_append_chunk2:~w ~w",
|
||||||
|
[?MODULE, ?LINE, erlang:get_stacktrace()]),
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_write_chunk(File, Offset, Chunk, CSum_tag, CSum, #state{flu_name=FluName}) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
Meta = [{client_csum_tag, CSum_tag}, {client_csum, CSum}],
|
||||||
|
machi_file_proxy:write(Pid, Offset, Meta, Chunk);
|
||||||
|
{error, trimmed} = Error ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_read_chunk(File, Offset, Size, Opts, #state{flu_name=FluName})->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
case machi_file_proxy:read(Pid, Offset, Size, Opts) of
|
||||||
|
%% XXX FIXME
|
||||||
|
%% For now we are omiting the checksum data because it blows up
|
||||||
|
%% protobufs.
|
||||||
|
{ok, ChunksAndTrimmed} -> {ok, ChunksAndTrimmed};
|
||||||
|
Other -> Other
|
||||||
|
end;
|
||||||
|
{error, trimmed} = Error ->
|
||||||
|
Error
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_trim_chunk(File, Offset, Size, TriggerGC, #state{flu_name=FluName}) ->
|
||||||
|
lager:debug("Hi there! I'm trimming this: ~s, (~p, ~p), ~p~n",
|
||||||
|
[File, Offset, Size, TriggerGC]),
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
machi_file_proxy:trim(Pid, Offset, Size, TriggerGC);
|
||||||
|
{error, trimmed} = Trimmed ->
|
||||||
|
%% Should be returned back to (maybe) trigger repair
|
||||||
|
Trimmed
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_checksum_listing(File, #state{flu_name=FluName, data_dir=DataDir}=_S) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
case machi_flu_metadata_mgr:start_proxy_pid(FluName, {file, File}) of
|
||||||
|
{ok, Pid} ->
|
||||||
|
{ok, List} = machi_file_proxy:checksum_list(Pid),
|
||||||
|
Bin = erlang:term_to_binary(List),
|
||||||
|
if byte_size(Bin) > (?PB_MAX_MSG_SIZE - 1024) ->
|
||||||
|
%% TODO: Fix this limitation by streaming the
|
||||||
|
%% binary in multiple smaller PB messages.
|
||||||
|
%% Also, don't read the file all at once. ^_^
|
||||||
|
error_logger:error_msg("~s:~w oversize ~s\n",
|
||||||
|
[?MODULE, ?LINE, DataDir]),
|
||||||
|
{error, bad_arg};
|
||||||
|
true ->
|
||||||
|
{ok, Bin}
|
||||||
|
end;
|
||||||
|
{error, trimmed} ->
|
||||||
|
{error, trimmed}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_list_files(#state{data_dir=DataDir}=_S) ->
|
||||||
|
{_, WildPath} = machi_util:make_data_filename(DataDir, ""),
|
||||||
|
Files = filelib:wildcard("*", WildPath),
|
||||||
|
{ok, [begin
|
||||||
|
{ok, FI} = file:read_file_info(WildPath ++ "/" ++ File),
|
||||||
|
Size = FI#file_info.size,
|
||||||
|
{Size, File}
|
||||||
|
end || File <- Files]}.
|
||||||
|
|
||||||
|
do_server_wedge_status(S) ->
|
||||||
|
{Wedged_p, CurrentEpochID0} = lookup_epoch(S),
|
||||||
|
CurrentEpochID = if CurrentEpochID0 == undefined ->
|
||||||
|
?DUMMY_PV1_EPOCH;
|
||||||
|
true ->
|
||||||
|
CurrentEpochID0
|
||||||
|
end,
|
||||||
|
{Wedged_p, CurrentEpochID}.
|
||||||
|
|
||||||
|
do_server_delete_migration(File, #state{data_dir=DataDir}=_S) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
||||||
|
case file:delete(Path) of
|
||||||
|
ok ->
|
||||||
|
ok;
|
||||||
|
{error, enoent} ->
|
||||||
|
{error, no_such_file};
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_server_trunc_hack(File, #state{data_dir=DataDir}=_S) ->
|
||||||
|
case sanitize_file_string(File) of
|
||||||
|
ok ->
|
||||||
|
{_, Path} = machi_util:make_data_filename(DataDir, File),
|
||||||
|
case file:open(Path, [read, write, binary, raw]) of
|
||||||
|
{ok, FH} ->
|
||||||
|
try
|
||||||
|
{ok, ?MINIMUM_OFFSET} = file:position(FH,
|
||||||
|
?MINIMUM_OFFSET),
|
||||||
|
ok = file:truncate(FH),
|
||||||
|
ok
|
||||||
|
after
|
||||||
|
file:close(FH)
|
||||||
|
end;
|
||||||
|
{error, enoent} ->
|
||||||
|
{error, no_such_file};
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
{error, bad_arg}
|
||||||
|
end.
|
||||||
|
|
||||||
|
sanitize_file_string(Str) ->
|
||||||
|
case has_no_prohibited_chars(Str) andalso machi_util:is_valid_filename(Str) of
|
||||||
|
true -> ok;
|
||||||
|
false -> error
|
||||||
|
end.
|
||||||
|
|
||||||
|
has_no_prohibited_chars(Str) ->
|
||||||
|
case re:run(Str, "/") of
|
||||||
|
nomatch ->
|
||||||
|
true;
|
||||||
|
_ ->
|
||||||
|
true
|
||||||
|
end.
|
||||||
|
|
||||||
|
sanitize_prefix(Prefix) ->
|
||||||
|
%% We are using '^' as our component delimiter
|
||||||
|
case re:run(Prefix, "/|\\^") of
|
||||||
|
nomatch ->
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
error
|
||||||
|
end.
|
||||||
|
|
||||||
|
check_or_make_tagged_checksum(?CSUM_TAG_NONE, _Client_CSum, Chunk) ->
|
||||||
|
%% TODO: If the client was foolish enough to use
|
||||||
|
%% this type of non-checksum, then the client gets
|
||||||
|
%% what it deserves wrt data integrity, alas. In
|
||||||
|
%% the client-side Chain Replication method, each
|
||||||
|
%% server will calculated this independently, which
|
||||||
|
%% isn't exactly what ought to happen for best data
|
||||||
|
%% integrity checking. In server-side CR, the csum
|
||||||
|
%% should be calculated by the head and passed down
|
||||||
|
%% the chain together with the value.
|
||||||
|
CS = machi_util:checksum_chunk(Chunk),
|
||||||
|
machi_util:make_tagged_csum(server_sha, CS);
|
||||||
|
check_or_make_tagged_checksum(?CSUM_TAG_CLIENT_SHA, Client_CSum, Chunk) ->
|
||||||
|
CS = machi_util:checksum_chunk(Chunk),
|
||||||
|
if CS == Client_CSum ->
|
||||||
|
machi_util:make_tagged_csum(server_sha,
|
||||||
|
Client_CSum);
|
||||||
|
true ->
|
||||||
|
throw({bad_csum, CS})
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%%% High PB mode %%%%
|
||||||
|
|
||||||
|
do_pb_hl_request(#mpb_request{req_id=ReqID}, #state{pb_mode=low}=S) ->
|
||||||
|
Result = {low_error, 41, "High protocol request while in low mode"},
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, unused, Result), S};
|
||||||
|
do_pb_hl_request(PB_request, S) ->
|
||||||
|
{ReqID, Cmd} = machi_pb_translate:from_pb_request(PB_request),
|
||||||
|
{Result, S2} = do_pb_hl_request2(Cmd, S),
|
||||||
|
{machi_pb_translate:to_pb_response(ReqID, Cmd, Result), S2}.
|
||||||
|
|
||||||
|
do_pb_hl_request2({high_echo, Msg}, S) ->
|
||||||
|
{Msg, S};
|
||||||
|
do_pb_hl_request2({high_auth, _User, _Pass}, S) ->
|
||||||
|
{-77, S};
|
||||||
|
do_pb_hl_request2({high_append_chunk, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, ChunkBin, TaggedCSum,
|
||||||
|
ChunkExtra}, #state{high_clnt=Clnt}=S) ->
|
||||||
|
Chunk = {TaggedCSum, ChunkBin},
|
||||||
|
Res = machi_cr_client:append_chunk_extra(Clnt, CoC_Namespace, CoC_Locator,
|
||||||
|
Prefix, Chunk,
|
||||||
|
ChunkExtra),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_write_chunk, File, Offset, ChunkBin, TaggedCSum},
|
||||||
|
#state{high_clnt=Clnt}=S) ->
|
||||||
|
Chunk = {TaggedCSum, ChunkBin},
|
||||||
|
Res = machi_cr_client:write_chunk(Clnt, File, Offset, Chunk),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_read_chunk, File, Offset, Size, Opts},
|
||||||
|
#state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:read_chunk(Clnt, File, Offset, Size, Opts),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_trim_chunk, File, Offset, Size},
|
||||||
|
#state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:trim_chunk(Clnt, File, Offset, Size),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_checksum_list, File}, #state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:checksum_list(Clnt, File),
|
||||||
|
{Res, S};
|
||||||
|
do_pb_hl_request2({high_list_files}, #state{high_clnt=Clnt}=S) ->
|
||||||
|
Res = machi_cr_client:list_files(Clnt),
|
||||||
|
{Res, S}.
|
||||||
|
|
||||||
|
make_high_clnt(#state{high_clnt=undefined}=S) ->
|
||||||
|
{ok, Proj} = machi_projection_store:read_latest_projection(
|
||||||
|
S#state.proj_store, private),
|
||||||
|
Ps = [P_srvr || {_, P_srvr} <- orddict:to_list(
|
||||||
|
Proj#projection_v1.members_dict)],
|
||||||
|
{ok, Clnt} = machi_cr_client:start_link(Ps),
|
||||||
|
S#state{high_clnt=Clnt};
|
||||||
|
make_high_clnt(S) ->
|
||||||
|
S.
|
|
@ -143,16 +143,19 @@ init([FluName, TcpPort, DataDir, Props0]) ->
|
||||||
|
|
||||||
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
FProxySupSpec = machi_file_proxy_sup:child_spec(FluName),
|
||||||
|
|
||||||
|
ListenerSupSpec = {machi_listener_sup:make_listener_sup_name(FluName),
|
||||||
|
{machi_listener_sup, start_link, [FluName]},
|
||||||
|
permanent, ?SHUTDOWN, supervisor, []},
|
||||||
|
|
||||||
FluSpec = {FluName,
|
FluSpec = {FluName,
|
||||||
{machi_flu1, start_link,
|
{machi_flu1, start_link,
|
||||||
[ [{FluName, TcpPort, DataDir}|Props] ]},
|
[ [{FluName, TcpPort, DataDir}|Props] ]},
|
||||||
permanent, ?SHUTDOWN, worker, []},
|
permanent, ?SHUTDOWN, worker, []},
|
||||||
|
|
||||||
|
|
||||||
{ok, {SupFlags, [
|
{ok, {SupFlags, [
|
||||||
ProjSpec, FitnessSpec, MgrSpec,
|
ProjSpec, FitnessSpec, MgrSpec,
|
||||||
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
FProxySupSpec, FNameMgrSpec, MetaSupSpec,
|
||||||
FluSpec]}}.
|
ListenerSupSpec, FluSpec]}}.
|
||||||
|
|
||||||
make_flu_regname(FluName) when is_atom(FluName) ->
|
make_flu_regname(FluName) when is_atom(FluName) ->
|
||||||
FluName.
|
FluName.
|
||||||
|
|
89
src/machi_listener_sup.erl
Normal file
89
src/machi_listener_sup.erl
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% @doc A supervisor to hold ranch listener for sigle FLU.
|
||||||
|
%% It holds at most one child worker.
|
||||||
|
|
||||||
|
%% TODO: This supervisor is maybe useless. First introduced for
|
||||||
|
%% workaround to start listener dynamically in flu1 initialization
|
||||||
|
%% time. Because psup is being blocked in flu1 initialization time,
|
||||||
|
%% adding a child to psup leads to deadlock. If initialization can be
|
||||||
|
%% done only by static arguments, then this supervisor should be
|
||||||
|
%% removed and added as a direct child of `machi_flu_psup'.
|
||||||
|
|
||||||
|
-module(machi_listener_sup).
|
||||||
|
-behaviour(supervisor).
|
||||||
|
|
||||||
|
%% public API
|
||||||
|
-export([start_link/1,
|
||||||
|
start_listener/6,
|
||||||
|
stop_listener/1,
|
||||||
|
make_listener_sup_name/1,
|
||||||
|
make_listener_name/1]).
|
||||||
|
|
||||||
|
%% supervisor callback
|
||||||
|
-export([init/1]).
|
||||||
|
|
||||||
|
-include("machi_projection.hrl").
|
||||||
|
|
||||||
|
-define(BACKLOG, 8192).
|
||||||
|
|
||||||
|
-spec start_link(pv1_server()) -> {ok, pid()}.
|
||||||
|
start_link(FluName) ->
|
||||||
|
supervisor:start_link({local, make_listener_sup_name(FluName)}, ?MODULE, []).
|
||||||
|
|
||||||
|
-spec start_listener(pv1_server(), inet:port_number(), boolean(),
|
||||||
|
string(), ets:tab(), atom() | pid()) -> {ok, pid()}.
|
||||||
|
start_listener(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
||||||
|
supervisor:start_child(make_listener_sup_name(FluName),
|
||||||
|
child_spec(FluName, TcpPort, Witness, DataDir,
|
||||||
|
EpochTab, ProjStore)).
|
||||||
|
|
||||||
|
-spec stop_listener(pv1_server()) -> ok.
|
||||||
|
stop_listener(FluName) ->
|
||||||
|
SupName = make_listener_sup_name(FluName),
|
||||||
|
ListenerName = make_listener_name(FluName),
|
||||||
|
ok = supervisor:terminate_child(SupName, ListenerName),
|
||||||
|
ok = supervisor:delete_child(SupName, ListenerName).
|
||||||
|
|
||||||
|
-spec make_listener_name(pv1_server()) -> atom().
|
||||||
|
make_listener_sup_name(FluName) when is_atom(FluName) ->
|
||||||
|
list_to_atom(atom_to_list(FluName) ++ "_listener_sup").
|
||||||
|
|
||||||
|
-spec make_listener_sup_name(pv1_server()) -> atom().
|
||||||
|
make_listener_name(FluName) ->
|
||||||
|
list_to_atom(atom_to_list(FluName) ++ "_listener").
|
||||||
|
|
||||||
|
%% Supervisor callback
|
||||||
|
|
||||||
|
init([]) ->
|
||||||
|
SupFlags = {one_for_one, 1000, 10},
|
||||||
|
{ok, {SupFlags, []}}.
|
||||||
|
|
||||||
|
-spec child_spec(pv1_server(), inet:port_number(), boolean(),
|
||||||
|
string(), ets:tab(), atom() | pid()) -> supervisor:child_spec().
|
||||||
|
child_spec(FluName, TcpPort, Witness, DataDir, EpochTab, ProjStore) ->
|
||||||
|
ListenerName = make_listener_name(FluName),
|
||||||
|
NbAcceptors = 100,
|
||||||
|
TcpOpts = [{port, TcpPort}, {backlog, ?BACKLOG}],
|
||||||
|
NetServerOpts = [FluName, Witness, DataDir, EpochTab, ProjStore],
|
||||||
|
ranch:child_spec(ListenerName, NbAcceptors,
|
||||||
|
ranch_tcp, TcpOpts,
|
||||||
|
machi_flu1_net_server, NetServerOpts).
|
|
@ -47,8 +47,6 @@ start_link() ->
|
||||||
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% {_, Ps} = process_info(self(), links),
|
|
||||||
%% [unlink(P) || P <- Ps],
|
|
||||||
RestartStrategy = one_for_one,
|
RestartStrategy = one_for_one,
|
||||||
MaxRestarts = 1000,
|
MaxRestarts = 1000,
|
||||||
MaxSecondsBetweenRestarts = 3600,
|
MaxSecondsBetweenRestarts = 3600,
|
||||||
|
@ -59,12 +57,8 @@ init([]) ->
|
||||||
Shutdown = ?SHUTDOWN,
|
Shutdown = ?SHUTDOWN,
|
||||||
Type = supervisor,
|
Type = supervisor,
|
||||||
|
|
||||||
ServerSup =
|
FluSup = {machi_flu_sup, {machi_flu_sup, start_link, []},
|
||||||
{machi_flu_sup, {machi_flu_sup, start_link, []},
|
Restart, Shutdown, Type, []},
|
||||||
Restart, Shutdown, Type, []},
|
RanchSup = {ranch_sup, {ranch_sup, start_link, []},
|
||||||
|
Restart, Shutdown, supervisor, [ranch_sup]},
|
||||||
{ok, {SupFlags, [ServerSup]}}.
|
{ok, {SupFlags, [FluSup, RanchSup]}}.
|
||||||
|
|
||||||
%% AChild = {'AName', {'AModule', start_link, []},
|
|
||||||
%% Restart, Shutdown, Type, ['AModule']},
|
|
||||||
%% {ok, {SupFlags, [AChild]}}.
|
|
||||||
|
|
|
@ -44,43 +44,46 @@ verify_file_checksums_test2() ->
|
||||||
TcpPort = 32958,
|
TcpPort = 32958,
|
||||||
DataDir = "./data",
|
DataDir = "./data",
|
||||||
W_props = [{initial_wedged, false}],
|
W_props = [{initial_wedged, false}],
|
||||||
machi_flu1_test:start_flu_package(verify1_flu, TcpPort, DataDir,
|
|
||||||
W_props),
|
|
||||||
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
|
|
||||||
try
|
try
|
||||||
Prefix = <<"verify_prefix">>,
|
machi_test_util:start_flu_package(verify1_flu, TcpPort, DataDir,
|
||||||
NumChunks = 10,
|
W_props),
|
||||||
[{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,
|
Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}),
|
||||||
Prefix, <<X:(X*8)/big>>) ||
|
try
|
||||||
X <- lists:seq(1, NumChunks)],
|
Prefix = <<"verify_prefix">>,
|
||||||
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH),
|
NumChunks = 10,
|
||||||
?assertEqual({ok, []},
|
[{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH,
|
||||||
machi_admin_util:verify_file_checksums_remote(
|
Prefix, <<X:(X*8)/big>>) ||
|
||||||
Host, TcpPort, ?DUMMY_PV1_EPOCH, File)),
|
X <- lists:seq(1, NumChunks)],
|
||||||
|
{ok, [{_FileSize,File}]} = ?FLU_C:list_files(Sock1, ?DUMMY_PV1_EPOCH),
|
||||||
|
?assertEqual({ok, []},
|
||||||
|
machi_admin_util:verify_file_checksums_remote(
|
||||||
|
Host, TcpPort, ?DUMMY_PV1_EPOCH, File)),
|
||||||
|
|
||||||
%% Clobber the first 3 chunks, which are sizes 1/2/3.
|
%% Clobber the first 3 chunks, which are sizes 1/2/3.
|
||||||
{_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
|
{_, Path} = machi_util:make_data_filename(DataDir,binary_to_list(File)),
|
||||||
{ok, FH} = file:open(Path, [read,write]),
|
{ok, FH} = file:open(Path, [read,write]),
|
||||||
{ok, _} = file:position(FH, ?MINIMUM_OFFSET),
|
{ok, _} = file:position(FH, ?MINIMUM_OFFSET),
|
||||||
ok = file:write(FH, "y"),
|
ok = file:write(FH, "y"),
|
||||||
ok = file:write(FH, "yo"),
|
ok = file:write(FH, "yo"),
|
||||||
ok = file:write(FH, "yo!"),
|
ok = file:write(FH, "yo!"),
|
||||||
ok = file:close(FH),
|
ok = file:close(FH),
|
||||||
|
|
||||||
%% Check the local flavor of the API: should be 3 bad checksums
|
%% Check the local flavor of the API: should be 3 bad checksums
|
||||||
{ok, Res1} = machi_admin_util:verify_file_checksums_local(
|
{ok, Res1} = machi_admin_util:verify_file_checksums_local(
|
||||||
Host, TcpPort, ?DUMMY_PV1_EPOCH, Path),
|
Host, TcpPort, ?DUMMY_PV1_EPOCH, Path),
|
||||||
3 = length(Res1),
|
3 = length(Res1),
|
||||||
|
|
||||||
%% Check the remote flavor of the API: should be 3 bad checksums
|
%% Check the remote flavor of the API: should be 3 bad checksums
|
||||||
{ok, Res2} = machi_admin_util:verify_file_checksums_remote(
|
{ok, Res2} = machi_admin_util:verify_file_checksums_remote(
|
||||||
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
|
Host, TcpPort, ?DUMMY_PV1_EPOCH, File),
|
||||||
3 = length(Res2),
|
3 = length(Res2),
|
||||||
|
|
||||||
ok
|
ok
|
||||||
|
after
|
||||||
|
catch ?FLU_C:quit(Sock1)
|
||||||
|
end
|
||||||
after
|
after
|
||||||
catch ?FLU_C:quit(Sock1),
|
catch machi_test_util:stop_flu_package()
|
||||||
catch machi_flu1_test:stop_flu_package(verify1_flu)
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-endif. % !PULSE
|
-endif. % !PULSE
|
||||||
|
|
|
@ -342,7 +342,7 @@ setup_target(Num, Seed, Verbose) ->
|
||||||
setup_chain(Seed, AllListE, FLUNames, MgrNames, Dict) ->
|
setup_chain(Seed, AllListE, FLUNames, MgrNames, Dict) ->
|
||||||
ok = shutdown_hard(),
|
ok = shutdown_hard(),
|
||||||
[begin
|
[begin
|
||||||
machi_flu1_test:clean_up_data_dir(Dir),
|
machi_test_util:clean_up_dir(Dir),
|
||||||
filelib:ensure_dir(Dir ++ "/not-used")
|
filelib:ensure_dir(Dir ++ "/not-used")
|
||||||
end || {_P, Dir} <- AllListE],
|
end || {_P, Dir} <- AllListE],
|
||||||
[catch ets:delete(T) || T <- tabs()],
|
[catch ets:delete(T) || T <- tabs()],
|
||||||
|
|
|
@ -274,42 +274,31 @@ make_prop_ets() ->
|
||||||
-endif. % EQC
|
-endif. % EQC
|
||||||
|
|
||||||
smoke0_test() ->
|
smoke0_test() ->
|
||||||
{ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50),
|
|
||||||
Host = "localhost",
|
|
||||||
TcpPort = 6623,
|
TcpPort = 6623,
|
||||||
{ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]),
|
{[Pa], [M0], _Dirs} = machi_test_util:start_flu_packages(
|
||||||
Pa = #p_srvr{name=a, address=Host, port=TcpPort},
|
1, TcpPort, "./data.", []),
|
||||||
Members_Dict = machi_projection:make_members_dict([Pa]),
|
|
||||||
%% Egadz, more racing on startup, yay. TODO fix.
|
|
||||||
timer:sleep(1),
|
|
||||||
{ok, FLUaP} = ?FLU_PC:start_link(Pa),
|
{ok, FLUaP} = ?FLU_PC:start_link(Pa),
|
||||||
{ok, M0} = ?MGR:start_link(a, Members_Dict, [{active_mode, false}]),
|
|
||||||
try
|
try
|
||||||
pong = ?MGR:ping(M0)
|
pong = ?MGR:ping(M0)
|
||||||
after
|
after
|
||||||
ok = ?MGR:stop(M0),
|
|
||||||
ok = machi_flu1:stop(FLUa),
|
|
||||||
ok = ?FLU_PC:quit(FLUaP),
|
ok = ?FLU_PC:quit(FLUaP),
|
||||||
ok = machi_partition_simulator:stop()
|
machi_test_util:stop_flu_packages()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
smoke1_test_() ->
|
smoke1_test_() ->
|
||||||
{timeout, 1*60, fun() -> smoke1_test2() end}.
|
{timeout, 1*60, fun() -> smoke1_test2() end}.
|
||||||
|
|
||||||
smoke1_test2() ->
|
smoke1_test2() ->
|
||||||
machi_partition_simulator:start_link({1,2,3}, 100, 0),
|
|
||||||
TcpPort = 62777,
|
TcpPort = 62777,
|
||||||
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}, {c,TcpPort+2,"./data.c"}],
|
MgrOpts = [{active_mode,false}],
|
||||||
P_s = [#p_srvr{name=Name, address="localhost", port=Port} ||
|
|
||||||
{Name,Port,_Dir} <- FluInfo],
|
|
||||||
|
|
||||||
[machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo],
|
|
||||||
FLUs = [element(2, machi_flu1:start_link([{Name,Port,Dir}])) ||
|
|
||||||
{Name,Port,Dir} <- FluInfo],
|
|
||||||
MembersDict = machi_projection:make_members_dict(P_s),
|
|
||||||
{ok, M0} = ?MGR:start_link(a, MembersDict, [{active_mode,false}]),
|
|
||||||
try
|
try
|
||||||
{ok, P1} = ?MGR:test_calc_projection(M0, false),
|
{Ps, MgrNames, _Dirs} = machi_test_util:start_flu_packages(
|
||||||
|
3, TcpPort, "./data.", MgrOpts),
|
||||||
|
MembersDict = machi_projection:make_members_dict(Ps),
|
||||||
|
[machi_chain_manager1:set_chain_members(M, MembersDict) || M <- MgrNames],
|
||||||
|
Ma = hd(MgrNames),
|
||||||
|
|
||||||
|
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
|
||||||
% DERP! Check for race with manager's proxy vs. proj listener
|
% DERP! Check for race with manager's proxy vs. proj listener
|
||||||
ok = lists:foldl(
|
ok = lists:foldl(
|
||||||
fun(_, {_,{true,[{c,ok},{b,ok},{a,ok}]}}) ->
|
fun(_, {_,{true,[{c,ok},{b,ok},{a,ok}]}}) ->
|
||||||
|
@ -318,40 +307,32 @@ smoke1_test2() ->
|
||||||
ok; % Skip remaining!
|
ok; % Skip remaining!
|
||||||
(_, _Else) ->
|
(_, _Else) ->
|
||||||
timer:sleep(10),
|
timer:sleep(10),
|
||||||
?MGR:test_write_public_projection(M0, P1)
|
?MGR:test_write_public_projection(Ma, P1)
|
||||||
end, not_ok, lists:seq(1, 1000)),
|
end, not_ok, lists:seq(1, 1000)),
|
||||||
%% Writing the exact same projection multiple times returns ok:
|
%% Writing the exact same projection multiple times returns ok:
|
||||||
%% no change!
|
%% no change!
|
||||||
{_,{true,[{c,ok},{b,ok},{a,ok}]}} = ?MGR:test_write_public_projection(M0, P1),
|
{_,{true,[{c,ok},{b,ok},{a,ok}]}} = ?MGR:test_write_public_projection(Ma, P1),
|
||||||
{unanimous, P1, Extra1} = ?MGR:test_read_latest_public_projection(M0, false),
|
{unanimous, P1, Extra1} = ?MGR:test_read_latest_public_projection(Ma, false),
|
||||||
|
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
ok = ?MGR:stop(M0),
|
machi_test_util:stop_flu_packages()
|
||||||
[ok = machi_flu1:stop(X) || X <- FLUs],
|
|
||||||
ok = machi_partition_simulator:stop()
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
nonunanimous_setup_and_fix_test() ->
|
nonunanimous_setup_and_fix_test() ->
|
||||||
machi_partition_simulator:start_link({1,2,3}, 100, 0),
|
|
||||||
TcpPort = 62877,
|
TcpPort = 62877,
|
||||||
FluInfo = [{a,TcpPort+0,"./data.a"}, {b,TcpPort+1,"./data.b"}],
|
MgrOpts = [{active_mode,false}],
|
||||||
P_s = [#p_srvr{name=Name, address="localhost", port=Port} ||
|
{Ps, [Ma,Mb], _Dirs} = machi_test_util:start_flu_packages(
|
||||||
{Name,Port,_Dir} <- FluInfo],
|
2, TcpPort, "./data.", MgrOpts),
|
||||||
|
MembersDict = machi_projection:make_members_dict(Ps),
|
||||||
[machi_flu1_test:clean_up_data_dir(Dir) || {_,_,Dir} <- FluInfo],
|
[machi_chain_manager1:set_chain_members(M, MembersDict) || M <- [Ma, Mb]],
|
||||||
{ok, SupPid} = machi_flu_sup:start_link(),
|
|
||||||
Opts = [{active_mode, false}],
|
|
||||||
%% {ok, Mb} = ?MGR:start_link(b, MembersDict, [{active_mode, false}]++XX),
|
|
||||||
[{ok,_}=machi_flu_psup:start_flu_package(Name, Port, Dir, Opts) ||
|
|
||||||
{Name,Port,Dir} <- FluInfo],
|
|
||||||
[Proxy_a, Proxy_b] = Proxies =
|
[Proxy_a, Proxy_b] = Proxies =
|
||||||
[element(2,?FLU_PC:start_link(P)) || P <- P_s],
|
[element(2, ?FLU_PC:start_link(P)) || P <- Ps],
|
||||||
MembersDict = machi_projection:make_members_dict(P_s),
|
|
||||||
[Ma,Mb] = [a_chmgr, b_chmgr],
|
|
||||||
ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []),
|
|
||||||
ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []),
|
|
||||||
try
|
try
|
||||||
|
ok = machi_chain_manager1:set_chain_members(Ma, MembersDict, []),
|
||||||
|
ok = machi_chain_manager1:set_chain_members(Mb, MembersDict, []),
|
||||||
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
|
{ok, P1} = ?MGR:test_calc_projection(Ma, false),
|
||||||
|
|
||||||
P1a = machi_projection:update_checksum(
|
P1a = machi_projection:update_checksum(
|
||||||
|
@ -394,9 +375,8 @@ nonunanimous_setup_and_fix_test() ->
|
||||||
|
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
exit(SupPid, normal),
|
|
||||||
[ok = ?FLU_PC:quit(X) || X <- Proxies],
|
[ok = ?FLU_PC:quit(X) || X <- Proxies],
|
||||||
ok = machi_partition_simulator:stop()
|
machi_test_util:stop_flu_packages()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
unanimous_report_test() ->
|
unanimous_report_test() ->
|
||||||
|
|
|
@ -48,7 +48,7 @@ setup() ->
|
||||||
{c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}}
|
{c,#p_srvr{name=c, address="localhost", port=5557, props="./data.c"}}
|
||||||
],
|
],
|
||||||
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
|
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
|
||||||
{ok, SupPid} = machi_flu_sup:start_link(),
|
{ok, SupPid} = machi_sup:start_link(),
|
||||||
%% Only run a, don't run b & c so we have 100% failures talking to them
|
%% Only run a, don't run b & c so we have 100% failures talking to them
|
||||||
[begin
|
[begin
|
||||||
#p_srvr{name=Name, port=Port, props=Dir} = P,
|
#p_srvr{name=Name, port=Port, props=Dir} = P,
|
||||||
|
|
|
@ -96,7 +96,7 @@ run_ticks(MgrList) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
smoke_test2() ->
|
smoke_test2() ->
|
||||||
{ok, SupPid} = machi_flu_sup:start_link(),
|
{ok, SupPid} = machi_sup:start_link(),
|
||||||
error_logger:tty(false),
|
error_logger:tty(false),
|
||||||
try
|
try
|
||||||
Prefix = <<"pre">>,
|
Prefix = <<"pre">>,
|
||||||
|
@ -208,7 +208,7 @@ io:format(user, "\nFiles = ~p\n", [machi_cr_client:list_files(C1)]),
|
||||||
witness_smoke_test_() -> {timeout, 1*60, fun() -> witness_smoke_test2() end}.
|
witness_smoke_test_() -> {timeout, 1*60, fun() -> witness_smoke_test2() end}.
|
||||||
|
|
||||||
witness_smoke_test2() ->
|
witness_smoke_test2() ->
|
||||||
SupPid = case machi_flu_sup:start_link() of
|
SupPid = case machi_sup:start_link() of
|
||||||
{ok, P} -> P;
|
{ok, P} -> P;
|
||||||
{error, {already_started, P1}} -> P1;
|
{error, {already_started, P1}} -> P1;
|
||||||
Other -> error(Other)
|
Other -> error(Other)
|
||||||
|
|
|
@ -98,7 +98,8 @@ machi_file_proxy_test_() ->
|
||||||
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)),
|
?_assertMatch({ok, {_, []}}, machi_file_proxy:read(Pid, 1, 1024)),
|
||||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
|
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, 1024, ?HYOOGE)),
|
||||||
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
|
?_assertEqual({error, not_written}, machi_file_proxy:read(Pid, ?HYOOGE, 1)),
|
||||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE))),
|
{timeout, 10,
|
||||||
|
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, random_binary(0, ?HYOOGE)))},
|
||||||
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
?_assertMatch({ok, "test", _}, machi_file_proxy:append(Pid, random_binary(0, 1024))),
|
||||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
|
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1024, <<"fail">>)),
|
||||||
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
|
?_assertEqual({error, written}, machi_file_proxy:write(Pid, 1, <<"fail">>)),
|
||||||
|
|
|
@ -30,57 +30,16 @@
|
||||||
-define(FLU, machi_flu1).
|
-define(FLU, machi_flu1).
|
||||||
-define(FLU_C, machi_flu1_client).
|
-define(FLU_C, machi_flu1_client).
|
||||||
|
|
||||||
clean_up_data_dir(DataDir) ->
|
|
||||||
[begin
|
|
||||||
Fs = filelib:wildcard(DataDir ++ Glob),
|
|
||||||
[file:delete(F) || F <- Fs],
|
|
||||||
[file:del_dir(F) || F <- Fs]
|
|
||||||
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
|
|
||||||
_ = file:del_dir(DataDir),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
start_flu_package(RegName, TcpPort, DataDir) ->
|
|
||||||
start_flu_package(RegName, TcpPort, DataDir, []).
|
|
||||||
|
|
||||||
start_flu_package(RegName, TcpPort, DataDir, Props) ->
|
|
||||||
case proplists:get_value(save_data_dir, Props) of
|
|
||||||
true ->
|
|
||||||
ok;
|
|
||||||
_ ->
|
|
||||||
clean_up_data_dir(DataDir)
|
|
||||||
end,
|
|
||||||
|
|
||||||
maybe_start_sup(),
|
|
||||||
machi_flu_psup:start_flu_package(RegName, TcpPort, DataDir, Props).
|
|
||||||
|
|
||||||
stop_flu_package(FluName) ->
|
|
||||||
machi_flu_psup:stop_flu_package(FluName),
|
|
||||||
Pid = whereis(machi_sup),
|
|
||||||
exit(Pid, normal),
|
|
||||||
machi_util:wait_for_death(Pid, 100).
|
|
||||||
|
|
||||||
maybe_start_sup() ->
|
|
||||||
case whereis(machi_sup) of
|
|
||||||
undefined ->
|
|
||||||
machi_sup:start_link(),
|
|
||||||
%% evil but we have to let stuff start up
|
|
||||||
timer:sleep(10),
|
|
||||||
maybe_start_sup();
|
|
||||||
Pid -> Pid
|
|
||||||
end.
|
|
||||||
|
|
||||||
|
|
||||||
-ifndef(PULSE).
|
-ifndef(PULSE).
|
||||||
|
|
||||||
flu_smoke_test() ->
|
flu_smoke_test() ->
|
||||||
Host = "localhost",
|
Host = "localhost",
|
||||||
TcpPort = 32957,
|
TcpPort = 12957,
|
||||||
DataDir = "./data",
|
DataDir = "./data",
|
||||||
Prefix = <<"prefix!">>,
|
Prefix = <<"prefix!">>,
|
||||||
BadPrefix = BadFile = "no/good",
|
BadPrefix = BadFile = "no/good",
|
||||||
|
|
||||||
W_props = [{initial_wedged, false}],
|
W_props = [{initial_wedged, false}],
|
||||||
start_flu_package(smoke_flu, TcpPort, DataDir, W_props),
|
{_, _, _} = machi_test_util:start_flu_package(smoke_flu, TcpPort, DataDir, W_props),
|
||||||
try
|
try
|
||||||
Msg = "Hello, world!",
|
Msg = "Hello, world!",
|
||||||
Msg = ?FLU_C:echo(Host, TcpPort, Msg),
|
Msg = ?FLU_C:echo(Host, TcpPort, Msg),
|
||||||
|
@ -178,22 +137,21 @@ flu_smoke_test() ->
|
||||||
ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host,
|
ok = ?FLU_C:quit(?FLU_C:connect(#p_srvr{address=Host,
|
||||||
port=TcpPort}))
|
port=TcpPort}))
|
||||||
after
|
after
|
||||||
stop_flu_package(smoke_flu)
|
machi_test_util:stop_flu_package()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
flu_projection_smoke_test() ->
|
flu_projection_smoke_test() ->
|
||||||
Host = "localhost",
|
Host = "localhost",
|
||||||
TcpPort = 32959,
|
TcpPort = 12959,
|
||||||
DataDir = "./data.projst",
|
DataDir = "./data.projst",
|
||||||
|
{_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir),
|
||||||
start_flu_package(projection_test_flu, TcpPort, DataDir),
|
|
||||||
try
|
try
|
||||||
[ok = flu_projection_common(Host, TcpPort, T) ||
|
[ok = flu_projection_common(Host, TcpPort, T) ||
|
||||||
T <- [public, private] ]
|
T <- [public, private] ]
|
||||||
%% , {ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort),
|
%% , {ok, {false, EpochID1}} = ?FLU_C:wedge_status(Host, TcpPort),
|
||||||
%% io:format(user, "EpochID1 ~p\n", [EpochID1])
|
%% io:format(user, "EpochID1 ~p\n", [EpochID1])
|
||||||
after
|
after
|
||||||
stop_flu_package(projection_test_flu)
|
machi_test_util:stop_flu_package()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
flu_projection_common(Host, TcpPort, T) ->
|
flu_projection_common(Host, TcpPort, T) ->
|
||||||
|
@ -221,11 +179,10 @@ flu_projection_common(Host, TcpPort, T) ->
|
||||||
|
|
||||||
bad_checksum_test() ->
|
bad_checksum_test() ->
|
||||||
Host = "localhost",
|
Host = "localhost",
|
||||||
TcpPort = 32960,
|
TcpPort = 12960,
|
||||||
DataDir = "./data.bct",
|
DataDir = "./data.bct",
|
||||||
|
|
||||||
Opts = [{initial_wedged, false}],
|
Opts = [{initial_wedged, false}],
|
||||||
start_flu_package(projection_test_flu, TcpPort, DataDir, Opts),
|
{_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts),
|
||||||
try
|
try
|
||||||
Prefix = <<"some prefix">>,
|
Prefix = <<"some prefix">>,
|
||||||
Chunk1 = <<"yo yo yo">>,
|
Chunk1 = <<"yo yo yo">>,
|
||||||
|
@ -235,16 +192,15 @@ bad_checksum_test() ->
|
||||||
Prefix, Chunk1_badcs),
|
Prefix, Chunk1_badcs),
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
stop_flu_package(projection_test_flu)
|
machi_test_util:stop_flu_package()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
witness_test() ->
|
witness_test() ->
|
||||||
Host = "localhost",
|
Host = "localhost",
|
||||||
TcpPort = 32961,
|
TcpPort = 12961,
|
||||||
DataDir = "./data.witness",
|
DataDir = "./data.witness",
|
||||||
|
|
||||||
Opts = [{initial_wedged, false}, {witness_mode, true}],
|
Opts = [{initial_wedged, false}, {witness_mode, true}],
|
||||||
start_flu_package(projection_test_flu, TcpPort, DataDir, Opts),
|
{_,_,_} = machi_test_util:start_flu_package(projection_test_flu, TcpPort, DataDir, Opts),
|
||||||
try
|
try
|
||||||
Prefix = <<"some prefix">>,
|
Prefix = <<"some prefix">>,
|
||||||
Chunk1 = <<"yo yo yo">>,
|
Chunk1 = <<"yo yo yo">>,
|
||||||
|
@ -276,7 +232,7 @@ witness_test() ->
|
||||||
|
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
stop_flu_package(projection_test_flu)
|
machi_test_util:stop_flu_package()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% The purpose of timing_pb_encoding_test_ and timing_bif_encoding_test_ is
|
%% The purpose of timing_pb_encoding_test_ and timing_bif_encoding_test_ is
|
||||||
|
|
|
@ -43,7 +43,7 @@ smoke_test2() ->
|
||||||
{c,#p_srvr{name=c, address="localhost", port=5552, props="./data.c"}}
|
{c,#p_srvr{name=c, address="localhost", port=5552, props="./data.c"}}
|
||||||
],
|
],
|
||||||
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
|
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
|
||||||
{ok, SupPid} = machi_flu_sup:start_link(),
|
{ok, SupPid} = machi_sup:start_link(),
|
||||||
try
|
try
|
||||||
%% Only run a, don't run b & c so we have 100% failures talking to them
|
%% Only run a, don't run b & c so we have 100% failures talking to them
|
||||||
[begin
|
[begin
|
||||||
|
@ -74,7 +74,7 @@ partial_stop_restart2() ->
|
||||||
PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps],
|
PStores = [machi_flu_psup:make_proj_supname(P#p_srvr.name) || {_,P} <-Ps],
|
||||||
Dict = orddict:from_list(Ps),
|
Dict = orddict:from_list(Ps),
|
||||||
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
|
[os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps],
|
||||||
{ok, SupPid} = machi_flu_sup:start_link(),
|
{ok, SupPid} = machi_sup:start_link(),
|
||||||
DbgProps = [{initial_wedged, true}],
|
DbgProps = [{initial_wedged, true}],
|
||||||
Start = fun({_,P}) ->
|
Start = fun({_,P}) ->
|
||||||
#p_srvr{name=Name, port=Port, props=Dir} = P,
|
#p_srvr{name=Name, port=Port, props=Dir} = P,
|
||||||
|
|
|
@ -34,21 +34,15 @@ smoke_test_() ->
|
||||||
{timeout, 5*60, fun() -> smoke_test2() end}.
|
{timeout, 5*60, fun() -> smoke_test2() end}.
|
||||||
|
|
||||||
smoke_test2() ->
|
smoke_test2() ->
|
||||||
Port = 5720,
|
PortBase = 5720,
|
||||||
Ps = [#p_srvr{name=a, address="localhost", port=Port, props="./data.a"}
|
|
||||||
],
|
|
||||||
D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]),
|
|
||||||
ok = application:set_env(machi, max_file_size, 1024*1024),
|
ok = application:set_env(machi, max_file_size, 1024*1024),
|
||||||
|
|
||||||
[os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
|
|
||||||
{ok, SupPid} = machi_flu_sup:start_link(),
|
|
||||||
try
|
try
|
||||||
[begin
|
{Ps, MgrNames, Dirs} = machi_test_util:start_flu_packages(
|
||||||
#p_srvr{name=Name, port=Port, props=Dir} = P,
|
1, PortBase, "./data.", []),
|
||||||
{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, [])
|
D = orddict:from_list([{P#p_srvr.name, P} || P <- Ps]),
|
||||||
end || P <- Ps],
|
M0 = hd(MgrNames),
|
||||||
ok = machi_chain_manager1:set_chain_members(a_chmgr, D),
|
ok = machi_chain_manager1:set_chain_members(M0, D),
|
||||||
[machi_chain_manager1:trigger_react_to_env(a_chmgr) || _ <-lists:seq(1,5)],
|
[machi_chain_manager1:trigger_react_to_env(M0) || _ <-lists:seq(1,5)],
|
||||||
|
|
||||||
{ok, Clnt} = ?C:start_link(Ps),
|
{ok, Clnt} = ?C:start_link(Ps),
|
||||||
try
|
try
|
||||||
|
@ -94,7 +88,8 @@ smoke_test2() ->
|
||||||
|
|
||||||
File1Bin = binary_to_list(File1),
|
File1Bin = binary_to_list(File1),
|
||||||
[begin
|
[begin
|
||||||
#p_srvr{name=Name, port=Port, props=Dir} = P,
|
#p_srvr{name=Name, props=Props} = P,
|
||||||
|
Dir = proplists:get_value(data_dir, Props),
|
||||||
?assertEqual({ok, [File1Bin]},
|
?assertEqual({ok, [File1Bin]},
|
||||||
file:list_dir(filename:join([Dir, "data"]))),
|
file:list_dir(filename:join([Dir, "data"]))),
|
||||||
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
|
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
|
||||||
|
@ -122,7 +117,8 @@ smoke_test2() ->
|
||||||
%% Make sure everything was trimmed
|
%% Make sure everything was trimmed
|
||||||
File = binary_to_list(Filex),
|
File = binary_to_list(Filex),
|
||||||
[begin
|
[begin
|
||||||
#p_srvr{name=Name, port=_Port, props=Dir} = P,
|
#p_srvr{name=Name, props=Props} = P,
|
||||||
|
Dir = proplists:get_value(data_dir, Props),
|
||||||
?assertEqual({ok, []},
|
?assertEqual({ok, []},
|
||||||
file:list_dir(filename:join([Dir, "data"]))),
|
file:list_dir(filename:join([Dir, "data"]))),
|
||||||
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
|
FileListFileName = filename:join([Dir, "known_files_" ++ atom_to_list(Name)]),
|
||||||
|
@ -139,10 +135,7 @@ smoke_test2() ->
|
||||||
(catch ?C:quit(Clnt))
|
(catch ?C:quit(Clnt))
|
||||||
end
|
end
|
||||||
after
|
after
|
||||||
exit(SupPid, normal),
|
machi_test_util:stop_flu_packages()
|
||||||
[os:cmd("rm -rf " ++ P#p_srvr.props) || P <- Ps],
|
|
||||||
machi_util:wait_for_death(SupPid, 100),
|
|
||||||
ok
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-endif. % !PULSE
|
-endif. % !PULSE
|
||||||
|
|
|
@ -33,7 +33,7 @@ smoke_test() ->
|
||||||
Dir = "./data.a",
|
Dir = "./data.a",
|
||||||
Os = [{ignore_stability_time, true}, {active_mode, false}],
|
Os = [{ignore_stability_time, true}, {active_mode, false}],
|
||||||
os:cmd("rm -rf " ++ Dir),
|
os:cmd("rm -rf " ++ Dir),
|
||||||
machi_flu1_test:start_flu_package(a, PortBase, "./data.a", Os),
|
machi_test_util:start_flu_package(a, PortBase, "./data.a", Os),
|
||||||
|
|
||||||
try
|
try
|
||||||
P1 = machi_projection:new(1, a, [], [], [], [], []),
|
P1 = machi_projection:new(1, a, [], [], [], [], []),
|
||||||
|
@ -58,7 +58,7 @@ smoke_test() ->
|
||||||
|
|
||||||
ok
|
ok
|
||||||
after
|
after
|
||||||
machi_flu1_test:stop_flu_package(a)
|
machi_test_util:stop_flu_package()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-endif. % !PULSE
|
-endif. % !PULSE
|
||||||
|
|
|
@ -32,16 +32,14 @@
|
||||||
|
|
||||||
api_smoke_test() ->
|
api_smoke_test() ->
|
||||||
RegName = api_smoke_flu,
|
RegName = api_smoke_flu,
|
||||||
Host = "localhost",
|
|
||||||
TcpPort = 57124,
|
TcpPort = 57124,
|
||||||
DataDir = "./data.api_smoke_flu",
|
DataDir = "./data.api_smoke_flu",
|
||||||
W_props = [{active_mode, false},{initial_wedged, false}],
|
W_props = [{active_mode, false},{initial_wedged, false}],
|
||||||
Prefix = <<"prefix">>,
|
Prefix = <<"prefix">>,
|
||||||
|
|
||||||
machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props),
|
|
||||||
|
|
||||||
try
|
try
|
||||||
I = #p_srvr{name=RegName, address=Host, port=TcpPort},
|
{[I], _, _} = machi_test_util:start_flu_package(
|
||||||
|
RegName, TcpPort, DataDir, W_props),
|
||||||
{ok, Prox1} = ?MUT:start_link(I),
|
{ok, Prox1} = ?MUT:start_link(I),
|
||||||
try
|
try
|
||||||
FakeEpoch = ?DUMMY_PV1_EPOCH,
|
FakeEpoch = ?DUMMY_PV1_EPOCH,
|
||||||
|
@ -49,13 +47,13 @@ api_smoke_test() ->
|
||||||
FakeEpoch, Prefix, <<"data">>,
|
FakeEpoch, Prefix, <<"data">>,
|
||||||
infinity) || _ <- lists:seq(1,5)],
|
infinity) || _ <- lists:seq(1,5)],
|
||||||
%% Stop the FLU, what happens?
|
%% Stop the FLU, what happens?
|
||||||
machi_flu1_test:stop_flu_package(RegName),
|
machi_test_util:stop_flu_package(),
|
||||||
[{error,partition} = ?MUT:append_chunk(Prox1,
|
[{error,partition} = ?MUT:append_chunk(Prox1,
|
||||||
FakeEpoch, Prefix, <<"data-stopped1">>,
|
FakeEpoch, Prefix, <<"data-stopped1">>,
|
||||||
infinity) || _ <- lists:seq(1,3)],
|
infinity) || _ <- lists:seq(1,3)],
|
||||||
%% Start the FLU again, we should be able to do stuff immediately
|
%% Start the FLU again, we should be able to do stuff immediately
|
||||||
machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir,
|
machi_test_util:start_flu_package(RegName, TcpPort, DataDir,
|
||||||
[save_data_dir|W_props]),
|
[no_cleanup|W_props]),
|
||||||
MyChunk = <<"my chunk data">>,
|
MyChunk = <<"my chunk data">>,
|
||||||
{ok, {MyOff,MySize,MyFile}} =
|
{ok, {MyOff,MySize,MyFile}} =
|
||||||
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
|
?MUT:append_chunk(Prox1, FakeEpoch, Prefix, MyChunk,
|
||||||
|
@ -102,7 +100,7 @@ api_smoke_test() ->
|
||||||
_ = (catch ?MUT:quit(Prox1))
|
_ = (catch ?MUT:quit(Prox1))
|
||||||
end
|
end
|
||||||
after
|
after
|
||||||
(catch machi_flu1_test:stop_flu_package(RegName))
|
(catch machi_test_util:stop_flu_package())
|
||||||
end.
|
end.
|
||||||
|
|
||||||
flu_restart_test_() ->
|
flu_restart_test_() ->
|
||||||
|
@ -110,14 +108,13 @@ flu_restart_test_() ->
|
||||||
|
|
||||||
flu_restart_test2() ->
|
flu_restart_test2() ->
|
||||||
RegName = a,
|
RegName = a,
|
||||||
Host = "localhost",
|
|
||||||
TcpPort = 57125,
|
TcpPort = 57125,
|
||||||
DataDir = "./data.api_smoke_flu2",
|
DataDir = "./data.api_smoke_flu2",
|
||||||
W_props = [{initial_wedged, false}, {active_mode, false}],
|
W_props = [{initial_wedged, false}, {active_mode, false}],
|
||||||
machi_flu1_test:start_flu_package(RegName, TcpPort, DataDir, W_props),
|
|
||||||
|
|
||||||
try
|
try
|
||||||
I = #p_srvr{name=RegName, address=Host, port=TcpPort},
|
{[I], _, _} = machi_test_util:start_flu_package(
|
||||||
|
RegName, TcpPort, DataDir, W_props),
|
||||||
{ok, Prox1} = ?MUT:start_link(I),
|
{ok, Prox1} = ?MUT:start_link(I),
|
||||||
try
|
try
|
||||||
FakeEpoch = ?DUMMY_PV1_EPOCH,
|
FakeEpoch = ?DUMMY_PV1_EPOCH,
|
||||||
|
@ -137,7 +134,7 @@ flu_restart_test2() ->
|
||||||
{ok, EpochID} = ?MUT:get_epoch_id(Prox1),
|
{ok, EpochID} = ?MUT:get_epoch_id(Prox1),
|
||||||
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public),
|
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public),
|
||||||
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private),
|
{ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private),
|
||||||
ok = machi_flu1_test:stop_flu_package(RegName), timer:sleep(50),
|
ok = machi_test_util:stop_flu_package(), timer:sleep(50),
|
||||||
|
|
||||||
%% Now that the last proxy op was successful and only
|
%% Now that the last proxy op was successful and only
|
||||||
%% after did we stop the FLU, let's check that both the
|
%% after did we stop the FLU, let's check that both the
|
||||||
|
@ -151,7 +148,7 @@ flu_restart_test2() ->
|
||||||
|
|
||||||
ExpectedOps =
|
ExpectedOps =
|
||||||
[
|
[
|
||||||
fun(run) -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1),
|
fun(run) -> ?assertEqual({ok, EpochID}, ?MUT:get_epoch_id(Prox1)),
|
||||||
ok;
|
ok;
|
||||||
(line) -> io:format("line ~p, ", [?LINE]);
|
(line) -> io:format("line ~p, ", [?LINE]);
|
||||||
(stop) -> ?MUT:get_epoch_id(Prox1) end,
|
(stop) -> ?MUT:get_epoch_id(Prox1) end,
|
||||||
|
@ -293,13 +290,13 @@ flu_restart_test2() ->
|
||||||
],
|
],
|
||||||
|
|
||||||
[begin
|
[begin
|
||||||
machi_flu1_test:start_flu_package(
|
machi_test_util:start_flu_package(
|
||||||
RegName, TcpPort, DataDir,
|
RegName, TcpPort, DataDir,
|
||||||
[save_data_dir|W_props]),
|
[no_cleanup|W_props]),
|
||||||
_ = Fun(line),
|
_ = Fun(line),
|
||||||
ok = Fun(run),
|
ok = Fun(run),
|
||||||
ok = Fun(run),
|
ok = Fun(run),
|
||||||
ok = machi_flu1_test:stop_flu_package(RegName),
|
ok = machi_test_util:stop_flu_package(),
|
||||||
{error, partition} = Fun(stop),
|
{error, partition} = Fun(stop),
|
||||||
{error, partition} = Fun(stop),
|
{error, partition} = Fun(stop),
|
||||||
ok
|
ok
|
||||||
|
@ -309,8 +306,8 @@ flu_restart_test2() ->
|
||||||
_ = (catch ?MUT:quit(Prox1))
|
_ = (catch ?MUT:quit(Prox1))
|
||||||
end
|
end
|
||||||
after
|
after
|
||||||
(catch machi_flu1_test:stop_flu_package(RegName))
|
(catch machi_test_util:stop_flu_package())
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-endif. % !PULSE
|
-endif. % !PULSE
|
||||||
-endif. % TEST
|
-endif. % TEST
|
||||||
|
|
111
test/machi_test_util.erl
Normal file
111
test/machi_test_util.erl
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
%%
|
||||||
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% This file is provided to you under the Apache License,
|
||||||
|
%% Version 2.0 (the "License"); you may not use this file
|
||||||
|
%% except in compliance with the License. You may obtain
|
||||||
|
%% a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing,
|
||||||
|
%% software distributed under the License is distributed on an
|
||||||
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
%% KIND, either express or implied. See the License for the
|
||||||
|
%% specific language governing permissions and limitations
|
||||||
|
%% under the License.
|
||||||
|
%%
|
||||||
|
%% -------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(machi_test_util).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-ifndef(PULSE).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
-include("machi.hrl").
|
||||||
|
-include("machi_projection.hrl").
|
||||||
|
|
||||||
|
-define(FLU, machi_flu1).
|
||||||
|
-define(FLU_C, machi_flu1_client).
|
||||||
|
|
||||||
|
-spec start_flu_package(atom(), inet:port_number(), string()) ->
|
||||||
|
{Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}.
|
||||||
|
start_flu_package(FluName, TcpPort, DataDir) ->
|
||||||
|
start_flu_package(FluName, TcpPort, DataDir, []).
|
||||||
|
|
||||||
|
-spec start_flu_package(atom(), inet:port_number(), string(), list()) ->
|
||||||
|
{Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}.
|
||||||
|
start_flu_package(FluName, TcpPort, DataDir, Props) ->
|
||||||
|
MgrName = machi_flu_psup:make_mgr_supname(FluName),
|
||||||
|
FluInfo = [{#p_srvr{name=FluName, address="localhost", port=TcpPort,
|
||||||
|
props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]},
|
||||||
|
DataDir, MgrName}],
|
||||||
|
start_flu_packages(FluInfo).
|
||||||
|
|
||||||
|
-spec start_flu_packages(pos_integer(), inet:port_number(), string(), list()) ->
|
||||||
|
{Ps::[#p_srvr{}], MgrNames::[atom()], Dirs::[string()]}.
|
||||||
|
start_flu_packages(FluCount, BaseTcpPort, DirPrefix, Props) ->
|
||||||
|
FluInfo = flu_info(FluCount, BaseTcpPort, DirPrefix, Props),
|
||||||
|
start_flu_packages(FluInfo).
|
||||||
|
|
||||||
|
start_flu_packages(FluInfo) ->
|
||||||
|
_ = stop_machi_sup(),
|
||||||
|
clean_up(FluInfo),
|
||||||
|
{ok, _SupPid} = machi_sup:start_link(),
|
||||||
|
[{ok, _} = machi_flu_psup:start_flu_package(Name, Port, Dir, Props) ||
|
||||||
|
{#p_srvr{name=Name, port=Port, props=Props}, Dir, _} <- FluInfo],
|
||||||
|
{Ps, Dirs, MgrNames} = lists:unzip3(FluInfo),
|
||||||
|
{Ps, MgrNames, Dirs}.
|
||||||
|
|
||||||
|
stop_flu_package() ->
|
||||||
|
stop_flu_packages().
|
||||||
|
|
||||||
|
stop_flu_packages() ->
|
||||||
|
stop_machi_sup().
|
||||||
|
|
||||||
|
flu_info(FluCount, BaseTcpPort, DirPrefix, Props) ->
|
||||||
|
[begin
|
||||||
|
FLUNameStr = [$a + I - 1],
|
||||||
|
FLUName = list_to_atom(FLUNameStr),
|
||||||
|
MgrName = machi_flu_psup:make_mgr_supname(FLUName),
|
||||||
|
DataDir = DirPrefix ++ "/data.eqc." ++ FLUNameStr,
|
||||||
|
{#p_srvr{name=FLUName, address="localhost", port=BaseTcpPort + I,
|
||||||
|
props=[{chmgr, MgrName}, {data_dir, DataDir} | Props]},
|
||||||
|
DataDir, MgrName}
|
||||||
|
end || I <- lists:seq(1, FluCount)].
|
||||||
|
|
||||||
|
stop_machi_sup() ->
|
||||||
|
case whereis(machi_sup) of
|
||||||
|
undefined -> ok;
|
||||||
|
Pid ->
|
||||||
|
catch exit(whereis(machi_sup), normal),
|
||||||
|
machi_util:wait_for_death(Pid, 30)
|
||||||
|
end.
|
||||||
|
|
||||||
|
clean_up(FluInfo) ->
|
||||||
|
_ = [begin
|
||||||
|
case proplists:get_value(no_cleanup, Props) of
|
||||||
|
true -> ok;
|
||||||
|
_ ->
|
||||||
|
_ = machi_flu1:stop(FLUName),
|
||||||
|
clean_up_dir(Dir)
|
||||||
|
end
|
||||||
|
end || {#p_srvr{name=FLUName, props=Props}, Dir, _} <- FluInfo],
|
||||||
|
ok.
|
||||||
|
|
||||||
|
clean_up_dir(Dir) ->
|
||||||
|
[begin
|
||||||
|
Fs = filelib:wildcard(Dir ++ Glob),
|
||||||
|
[file:delete(F) || F <- Fs],
|
||||||
|
[file:del_dir(F) || F <- Fs]
|
||||||
|
end || Glob <- ["*/*/*/*", "*/*/*", "*/*", "*"] ],
|
||||||
|
_ = file:del_dir(Dir),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-endif. % !PULSE
|
||||||
|
-endif. % TEST
|
||||||
|
|
Loading…
Reference in a new issue