512 lines
18 KiB
Erlang
512 lines
18 KiB
Erlang
%% -------------------------------------------------------------------
|
|
%%
|
|
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
|
|
%%
|
|
%% This file is provided to you under the Apache License,
|
|
%% Version 2.0 (the "License"); you may not use this file
|
|
%% except in compliance with the License. You may obtain
|
|
%% a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing,
|
|
%% software distributed under the License is distributed on an
|
|
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
%% KIND, either express or implied. See the License for the
|
|
%% specific language governing permissions and limitations
|
|
%% under the License.
|
|
%%
|
|
%% -------------------------------------------------------------------
|
|
|
|
%% @doc Erlang API for the Machi FLU TCP protocol version 1, with a
|
|
%% proxy-process style API for hiding messy details such as TCP
|
|
%% connection/disconnection with the remote Machi server.
|
|
%%
|
|
%% Machi is intentionally avoiding using distributed Erlang for
|
|
%% Machi's communication. This design decision makes Erlang-side code
|
|
%% more difficult & complex, but it's the price to pay for some
|
|
%% language independence. Later in Machi's life cycle, we may (?) need to
|
|
%% (re-)implement some components in a non-Erlang/BEAM-based language.
|
|
%%
|
|
%% This module implements a "man in the middle" proxy between the
|
|
%% Erlang client and Machi server (which is on the "far side" of a TCP
|
|
%% connection to somewhere). This proxy process will always execute
|
|
%% on the same Erlang node as the Erlang client that uses it. The
|
|
%% proxy is intended to be a stable, long-lived process that survives
|
|
%% TCP communication problems with the remote server.
|
|
%%
|
|
%% For a higher level interface, see {@link machi_cr_client}.
|
|
%% For a lower level interface, see {@link machi_flu1_client}.
|
|
|
|
-module(machi_proxy_flu1_client).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-include("machi.hrl").
|
|
-include("machi_projection.hrl").
|
|
|
|
-ifdef(TEST).
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-ifdef(PULSE).
|
|
-compile({parse_transform, pulse_instrument}).
|
|
-include_lib("pulse_otp/include/pulse_otp.hrl").
|
|
-endif.
|
|
-endif. % TEST.
|
|
|
|
-export([start_link/1]).
|
|
%% FLU1 API
|
|
-export([
|
|
%% File API
|
|
append_chunk/4, append_chunk/5,
|
|
append_chunk/6, append_chunk/7,
|
|
append_chunk_extra/5, append_chunk_extra/6,
|
|
append_chunk_extra/7, append_chunk_extra/8,
|
|
read_chunk/6, read_chunk/7,
|
|
checksum_list/3, checksum_list/4,
|
|
list_files/2, list_files/3,
|
|
wedge_status/1, wedge_status/2,
|
|
|
|
%% %% Projection API
|
|
get_epoch_id/1, get_epoch_id/2,
|
|
get_latest_epochid/2, get_latest_epochid/3,
|
|
read_latest_projection/2, read_latest_projection/3,
|
|
read_projection/3, read_projection/4,
|
|
write_projection/3, write_projection/4,
|
|
get_all_projections/2, get_all_projections/3,
|
|
list_all_projections/2, list_all_projections/3,
|
|
kick_projection_reaction/2, kick_projection_reaction/3,
|
|
|
|
%% Common API
|
|
quit/1,
|
|
|
|
%% Internal API
|
|
write_chunk/5, write_chunk/6,
|
|
trim_chunk/5, trim_chunk/6,
|
|
|
|
%% Helpers
|
|
stop_proxies/1, start_proxies/1
|
|
]).
|
|
|
|
%% gen_server callbacks
|
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
|
terminate/2, code_change/3]).
|
|
|
|
-record(state, {
|
|
i :: #p_srvr{},
|
|
sock :: 'undefined' | port()
|
|
}).
|
|
|
|
%% @doc Start and link a local proxy process for the remote Machi
%% server described by the `#p_srvr{}' record `I'.
%%
%% The proxy is an unregistered `gen_server'; the return value is
%% whatever `gen_server:start_link/3' returns.

start_link(#p_srvr{}=I) ->
    InitArgs = [I],
    gen_server:start_link(?MODULE, InitArgs, []).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix'.
%%
%% Same as `append_chunk/5' with a timeout of `infinity'.

append_chunk(PidSpec, EpochID, Prefix, Chunk) ->
    Timeout = infinity,
    append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix', waiting at most `Timeout' for the proxy's reply.
%%
%% Uses the default CoC namespace and locator and requests zero
%% extra bytes.

append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) ->
    NoExtra = 0,
    append_chunk_extra(PidSpec, EpochID,
                       ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR,
                       Prefix, Chunk, NoExtra, Timeout).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix', using an explicit CoC namespace and locator.
%%
%% Same as `append_chunk/7' with a timeout of `infinity'.

append_chunk(PidSpec, EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk) ->
    Timeout = infinity,
    append_chunk(PidSpec, EpochID, CoC_Namespace, CoC_Locator,
                 Prefix, Chunk, Timeout).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix', using an explicit CoC namespace and locator and waiting
%% at most `Timeout' for the proxy's reply.
%%
%% Requests zero extra bytes.

append_chunk(PidSpec, EpochID, CoC_Namespace, CoC_Locator, Prefix, Chunk, Timeout) ->
    NoExtra = 0,
    append_chunk_extra(PidSpec, EpochID,
                       CoC_Namespace, CoC_Locator,
                       Prefix, Chunk, NoExtra, Timeout).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix', passing `ChunkExtra' along with the request.
%%
%% `ChunkExtra' must be a non-negative integer.  Uses the default CoC
%% namespace and locator and a timeout of `infinity'.

append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra)
  when is_integer(ChunkExtra), ChunkExtra >= 0 ->
    Timeout = infinity,
    append_chunk_extra(PidSpec, EpochID,
                       ?DEFAULT_COC_NAMESPACE, ?DEFAULT_COC_LOCATOR,
                       Prefix, Chunk, ChunkExtra, Timeout).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix', passing `ChunkExtra' along with the request and waiting
%% at most `Timeout' for the proxy's reply.
%%
%% Uses the default CoC namespace and locator.

append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
    Namespace = ?DEFAULT_COC_NAMESPACE,
    Locator = ?DEFAULT_COC_LOCATOR,
    append_chunk_extra(PidSpec, EpochID, Namespace, Locator,
                       Prefix, Chunk, ChunkExtra, Timeout).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix', with an explicit CoC namespace, CoC locator and
%% `ChunkExtra'.
%%
%% Same as `append_chunk_extra/8' with a timeout of `infinity'.

append_chunk_extra(PidSpec, EpochID, CoC_Namespace, CoC_Locator,
                   Prefix, Chunk, ChunkExtra) ->
    Timeout = infinity,
    append_chunk_extra(PidSpec, EpochID, CoC_Namespace, CoC_Locator,
                       Prefix, Chunk, ChunkExtra, Timeout).
|
|
|
|
%% @doc Append `Chunk' (binary- or iolist-style) to a file with
%% `Prefix', with an explicit CoC namespace, CoC locator and
%% `ChunkExtra'.
%%
%% Sends an `append_chunk_extra' request to the proxy `PidSpec' and
%% waits at most `Timeout' for its reply.

append_chunk_extra(PidSpec, EpochID, CoC_Namespace, CoC_Locator,
                   Prefix, Chunk, ChunkExtra, Timeout) ->
    Req = {append_chunk_extra, EpochID,
           CoC_Namespace, CoC_Locator,
           Prefix, Chunk, ChunkExtra},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Read `Size' bytes from `File' starting at `Offset'.
%%
%% Same as `read_chunk/7' with a timeout of `infinity'.

read_chunk(PidSpec, EpochID, File, Offset, Size, Opts) ->
    Timeout = infinity,
    read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, Timeout).
|
|
|
|
%% @doc Read `Size' bytes from `File' starting at `Offset', waiting at
%% most `Timeout' for the proxy's reply.
%%
%% `Opts' is passed through unchanged to the protocol module.

read_chunk(PidSpec, EpochID, File, Offset, Size, Opts, Timeout) ->
    Req = {read_chunk, EpochID, File, Offset, Size, Opts},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Fetch the list of chunk checksums for `File'.
%%
%% Same as `checksum_list/4' with a timeout of `infinity'.

checksum_list(PidSpec, EpochID, File) ->
    Timeout = infinity,
    checksum_list(PidSpec, EpochID, File, Timeout).
|
|
|
|
%% @doc Fetch the list of chunk checksums for `File', waiting at most
%% `Timeout' for the proxy's reply.

checksum_list(PidSpec, EpochID, File, Timeout) ->
    Req = {checksum_list, EpochID, File},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Fetch the list of all files on the remote FLU.
%%
%% Same as `list_files/3' with a timeout of `infinity'.

list_files(PidSpec, EpochID) ->
    Timeout = infinity,
    list_files(PidSpec, EpochID, Timeout).
|
|
|
|
%% @doc Fetch the list of all files on the remote FLU, waiting at most
%% `Timeout' for the proxy's reply.

list_files(PidSpec, EpochID, Timeout) ->
    Req = {list_files, EpochID},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Fetch the wedge status from the remote FLU.
%%
%% Same as `wedge_status/2' with a timeout of `infinity'.

wedge_status(PidSpec) ->
    Timeout = infinity,
    wedge_status(PidSpec, Timeout).
|
|
|
|
%% @doc Fetch the wedge status from the remote FLU, waiting at most
%% `Timeout' for the proxy's reply.

wedge_status(PidSpec, Timeout) ->
    Req = {wedge_status},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Get the `epoch_id()' of the FLU's current/latest projection.
%%
%% Same as `get_epoch_id/2' with a timeout of `infinity'.

get_epoch_id(PidSpec) ->
    Timeout = infinity,
    get_epoch_id(PidSpec, Timeout).
|
|
|
|
%% @doc Get the `epoch_id()' of the FLU's current/latest projection,
%% waiting at most `Timeout' for the proxy's reply.
%%
%% The proxy answers this by reading the latest `private' projection
%% and returning `{ok, {EpochNumber, EpochCSum}}' on success.

get_epoch_id(PidSpec, Timeout) ->
    Req = {get_epoch_id},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Get the latest epoch number + checksum from the FLU's
%% projection store for `ProjType'.
%%
%% Same as `get_latest_epochid/3' with a timeout of `infinity'.

get_latest_epochid(PidSpec, ProjType) ->
    Timeout = infinity,
    get_latest_epochid(PidSpec, ProjType, Timeout).
|
|
|
|
%% @doc Get the latest epoch number + checksum from the FLU's
%% projection store for `ProjType', waiting at most `Timeout' for the
%% proxy's reply.

get_latest_epochid(PidSpec, ProjType, Timeout) ->
    Req = {get_latest_epochid, ProjType},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Get the latest projection of type `ProjType' from the FLU's
%% projection store.
%%
%% Same as `read_latest_projection/3' with a timeout of `infinity'.

read_latest_projection(PidSpec, ProjType) ->
    Timeout = infinity,
    read_latest_projection(PidSpec, ProjType, Timeout).
|
|
|
|
%% @doc Get the latest projection of type `ProjType' from the FLU's
%% projection store, waiting at most `Timeout' for the proxy's reply.

read_latest_projection(PidSpec, ProjType, Timeout) ->
    Req = {read_latest_projection, ProjType},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Read the projection of type `ProjType' identified by `Epoch'.
%%
%% Same as `read_projection/4' with a timeout of `infinity'.

read_projection(PidSpec, ProjType, Epoch) ->
    Timeout = infinity,
    read_projection(PidSpec, ProjType, Epoch, Timeout).
|
|
|
|
%% @doc Read the projection of type `ProjType' identified by `Epoch',
%% waiting at most `Timeout' for the proxy's reply.

read_projection(PidSpec, ProjType, Epoch, Timeout) ->
    Req = {read_projection, ProjType, Epoch},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Write a projection `Proj' of type `ProjType'.
%%
%% Same as `write_projection/4' with a timeout of `infinity'.

write_projection(PidSpec, ProjType, Proj) ->
    Timeout = infinity,
    write_projection(PidSpec, ProjType, Proj, Timeout).
|
|
|
|
%% @doc Write a projection `Proj' of type `ProjType', waiting at most
%% `Timeout' for each proxy call.
%%
%% If the proxy answers `{error, written}', the projection stored at
%% `Proj''s epoch number is read back.  When it equals `Proj' the
%% write is reported as `ok'; otherwise `{error, written}' is
%% returned.  Any other reply is returned unchanged.

write_projection(PidSpec, ProjType, Proj, Timeout) ->
    WriteReq = {req, {write_projection, ProjType, Proj}},
    case gen_server:call(PidSpec, WriteReq, Timeout) of
        {error, written}=WrittenErr ->
            EpochNum = Proj#projection_v1.epoch_number,
            case read_projection(PidSpec, ProjType, EpochNum, Timeout) of
                {ok, Stored} when Stored == Proj ->
                    %% The proxy retries a request that appeared to
                    %% fail.  If the first attempt did in fact reach
                    %% the server, the retry is answered with
                    %% {error,written} even though the stored value is
                    %% exactly ours.  Treat that as success.
                    ok;
                _ ->
                    WrittenErr
            end;
        OtherReply ->
            OtherReply
    end.
|
|
|
|
%% @doc Get all projections of type `ProjType' from the FLU's
%% projection store.
%%
%% Same as `get_all_projections/3' with a timeout of `infinity'.

get_all_projections(PidSpec, ProjType) ->
    Timeout = infinity,
    get_all_projections(PidSpec, ProjType, Timeout).
|
|
|
|
%% @doc Get all projections of type `ProjType' from the FLU's
%% projection store, waiting at most `Timeout' for the proxy's reply.

get_all_projections(PidSpec, ProjType, Timeout) ->
    Req = {get_all_projections, ProjType},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Get all epoch numbers of type `ProjType' from the FLU's
%% projection store.
%%
%% Same as `list_all_projections/3' with a timeout of `infinity'.

list_all_projections(PidSpec, ProjType) ->
    Timeout = infinity,
    list_all_projections(PidSpec, ProjType, Timeout).
|
|
|
|
%% @doc Get all epoch numbers of type `ProjType' from the FLU's
%% projection store, waiting at most `Timeout' for the proxy's reply.

list_all_projections(PidSpec, ProjType, Timeout) ->
    Req = {list_all_projections, ProjType},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Kick (politely) the remote chain manager to react to a
%% projection change.
%%
%% Same as `kick_projection_reaction/3' with a timeout of `infinity'.

kick_projection_reaction(PidSpec, Options) ->
    Timeout = infinity,
    kick_projection_reaction(PidSpec, Options, Timeout).
|
|
|
|
%% @doc Kick (politely) the remote chain manager to react to a
%% projection change, waiting at most `Timeout' for the proxy's reply.
%%
%% `Options' is passed through unchanged to the protocol module.

kick_projection_reaction(PidSpec, Options, Timeout) ->
    Req = {kick_projection_reaction, Options},
    gen_server:call(PidSpec, {req, Req}, Timeout).
|
|
|
|
%% @doc Close the connection to the remote FLU and stop the proxy
%% process.
%%
%% Waits indefinitely (`infinity') for the proxy to acknowledge.

quit(PidSpec) ->
    Timeout = infinity,
    gen_server:call(PidSpec, quit, Timeout).
|
|
|
|
%% @doc Write `Chunk' (binary- or iolist-style) to `File' at `Offset'.
%%
%% Same as `write_chunk/6' with a timeout of `infinity'.

write_chunk(PidSpec, EpochID, File, Offset, Chunk) ->
    Timeout = infinity,
    write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout).
|
|
|
|
%% @doc Write `Chunk' (binary- or iolist-style) to `File' at `Offset',
%% waiting at most `Timeout' for each proxy call.
%%
%% If the proxy answers `{error, written}', the same byte range is
%% read back.  When the stored bytes equal `Chunk' the write is
%% reported as `ok'; otherwise `{error, written}' is returned.  Any
%% other reply is returned unchanged.

write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
    case gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}},
                         Timeout) of
        {error, written}=Err ->
            %% Chunk may be an iolist: byte_size/1 would raise badarg
            %% on it, and a read-back binary can never compare equal
            %% to an iolist.  Normalise to a binary before measuring
            %% and comparing.  Done only on this path so that the
            %% common case does not pay for the conversion.
            ChunkBin = iolist_to_binary(Chunk),
            Size = byte_size(ChunkBin),
            case read_chunk(PidSpec, EpochID, File, Offset, Size, [], Timeout) of
                {ok, {[{File, Offset, ChunkBin, _}], []}} ->
                    %% The proxy retries a request that appeared to
                    %% fail.  If the first attempt did in fact reach
                    %% the server, the retry is answered with
                    %% {error,written} even though the stored bytes
                    %% are exactly ours.  Treat that as success.
                    ok;
                _ ->
                    Err
            end;
        Else ->
            Else
    end.
|
|
|
|
|
|
%% @doc Send a `trim_chunk' request for `Size' bytes of `File' at
%% `Offset'.
%%
%% Same as `trim_chunk/6' with a timeout of `infinity'.

trim_chunk(PidSpec, EpochID, File, Offset, Size) ->
    Timeout = infinity,
    trim_chunk(PidSpec, EpochID, File, Offset, Size, Timeout).
|
|
|
|
%% @doc Send a `trim_chunk' request for `Size' bytes of `File' at
%% `Offset', waiting at most `Timeout' for the proxy's reply.
%%
%% The fifth argument is a size, not chunk data: the proxy passes it
%% to the protocol module's `trim_chunk' as its size argument.

trim_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
    gen_server:call(PidSpec,
                    {req, {trim_chunk, EpochID, File, Offset, Size}},
                    Timeout).
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
|
%% gen_server callback: build the initial state from the `#p_srvr{}'
%% record `I' and make a first connection attempt right away.
init([I]) ->
    {ok, try_connect(#state{i=I})}.
|
|
|
|
%% gen_server callback.
%%
%% `{req, Req}' -- run the request against the remote server via
%%                 do_req/2 and reply with its result.
%% `quit'       -- disconnect, reply `ok', and stop with reason
%%                 `normal'.
%% anything else is answered with `ok' and otherwise ignored.
handle_call({req, Req}, _From, S0) ->
    {Reply, S1} = do_req(Req, S0),
    {reply, Reply, S1};
handle_call(quit, _From, S0) ->
    S1 = disconnect(S0),
    {stop, normal, ok, S1};
handle_call(_Request, _From, S) ->
    {reply, ok, S}.
|
|
|
|
%% gen_server callback: this server defines no casts, so every cast is
%% dropped and the state is kept as-is.
handle_cast(_Msg, S) -> {noreply, S}.
|
|
|
|
%% gen_server callback: no out-of-band messages are expected, so print
%% whatever arrives to the `user' device and carry on unchanged.
handle_info(Info, S) ->
    io:format(user, "~s:handle_info: ~p\n", [?MODULE, Info]),
    {noreply, S}.
|
|
|
|
%% gen_server callback: nothing is cleaned up here.  The `quit' call
%% disconnects before it stops the server.
terminate(_Reason, _S) -> ok.
|
|
|
|
%% gen_server callback: the state needs no conversion on code upgrade.
code_change(_OldVsn, S, _Extra) -> {ok, S}.
|
|
|
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
|
|
|
%% Run `Req' against the remote server, starting at attempt number 1.
%% Returns `{Reply, NewState}'.
do_req(Req, S) ->
    FirstAttempt = 1,
    do_req(Req, FirstAttempt, S).
|
|
|
|
%% Run `Req' against the remote server; `Depth' is the attempt number.
%% Returns `{Reply, NewState}'.
%%
%% A connection attempt is made first.  If we are still not connected
%% the reply is `{error, partition}'.  Otherwise the request is
%% executed and its result classified by do_req_result/4.
do_req(Req, Depth, S0) ->
    S = try_connect(S0),
    %% Built before the connectivity check on purpose: an unknown
    %% request must fail in make_req_fun/2 whether or not we are
    %% connected.
    ReqFun = make_req_fun(Req, S),
    case connected_p(S) of
        false ->
            {{error, partition}, S};
        true ->
            do_req_result(ReqFun(), Req, Depth, S)
    end.

%% Classify the result of one request attempt.
%%
%% `ok' and any tuple whose first element is `ok' are successes and
%% are returned as-is.  Anything else is an error; it is retried via
%% do_req_retry/4 only when the process-dictionary key `bad_sock'
%% holds our current socket, otherwise the error is returned to the
%% caller with the connection left alone.
do_req_result(ok, _Req, _Depth, S) ->
    {ok, S};
do_req_result(Result, _Req, _Depth, S) when element(1, Result) == ok ->
    {Result, S};
do_req_result(TheErr, Req, Depth, S) ->
    case get(bad_sock) of
        Bad when Bad == S#state.sock ->
            do_req_retry(Req, Depth, TheErr, S);
        _ ->
            {TheErr, S}
    end.
|
|
|
|
%% Decide what to do after a failed attempt on a bad socket.
%%
%% On attempt 2 we give up: disconnect and return the error.  On any
%% other attempt we disconnect, reconnect, and run the request again
%% with the attempt number incremented.
do_req_retry(_Req, 2, Err, S) ->
    Closed = disconnect(S),
    {Err, Closed};
do_req_retry(Req, Depth, _Err, S) ->
    Closed = disconnect(S),
    Reconnected = try_connect(Closed),
    do_req(Req, Depth + 1, Reconnected).
|
|
|
|
%% Translate a request tuple into a zero-arity fun that performs the
%% request on the state's socket `Sk' using the state's protocol
%% module `PM'.  One clause per request type; an unknown request
%% fails with `function_clause'.
%%
%% Every fun is a direct call to the same-named function in the
%% protocol module, except `{get_epoch_id}', which reads the latest
%% `private' projection and reduces it to `{ok, {Epoch, CSum}}'.
make_req_fun({append_chunk_extra, EpochID, CoC_Namespace, CoC_Locator,
              Prefix, Chunk, ChunkExtra},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:append_chunk_extra(Sk, EpochID, CoC_Namespace, CoC_Locator,
                                  Prefix, Chunk, ChunkExtra)
    end;
make_req_fun({read_chunk, EpochID, File, Offset, Size, Opts},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:read_chunk(Sk, EpochID, File, Offset, Size, Opts)
    end;
make_req_fun({write_chunk, EpochID, File, Offset, Chunk},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:write_chunk(Sk, EpochID, File, Offset, Chunk)
    end;
make_req_fun({trim_chunk, EpochID, File, Offset, Size},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:trim_chunk(Sk, EpochID, File, Offset, Size)
    end;
make_req_fun({checksum_list, EpochID, File},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:checksum_list(Sk, EpochID, File)
    end;
make_req_fun({list_files, EpochID},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:list_files(Sk, EpochID)
    end;
make_req_fun({wedge_status},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:wedge_status(Sk)
    end;
make_req_fun({get_epoch_id},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            case PM:read_latest_projection(Sk, private) of
                {ok, LatestProj} ->
                    %% Matched in a separate step on purpose: an
                    %% `{ok, _}' reply that is not a #projection_v1{}
                    %% must crash with badmatch rather than be
                    %% returned to the caller.
                    #projection_v1{epoch_number=EpochNum,
                                   epoch_csum=EpochCSum} = LatestProj,
                    {ok, {EpochNum, EpochCSum}};
                NotOk ->
                    NotOk
            end
    end;
make_req_fun({get_latest_epochid, ProjType},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:get_latest_epochid(Sk, ProjType)
    end;
make_req_fun({read_latest_projection, ProjType},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:read_latest_projection(Sk, ProjType)
    end;
make_req_fun({read_projection, ProjType, Epoch},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:read_projection(Sk, ProjType, Epoch)
    end;
make_req_fun({write_projection, ProjType, Proj},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:write_projection(Sk, ProjType, Proj)
    end;
make_req_fun({get_all_projections, ProjType},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:get_all_projections(Sk, ProjType)
    end;
make_req_fun({list_all_projections, ProjType},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:list_all_projections(Sk, ProjType)
    end;
make_req_fun({kick_projection_reaction, Options},
             #state{sock=Sk, i=#p_srvr{proto_mod=PM}}) ->
    fun() ->
            PM:kick_projection_reaction(Sk, Options)
    end.
|
|
|
|
%% Ask the protocol module whether the state's socket (which may be
%% `undefined') counts as connected.
connected_p(#state{sock=MaybeSock, i=#p_srvr{proto_mod=ProtoMod}}) ->
    ProtoMod:connected_p(MaybeSock).
|
|
|
|
%% Make sure a connection attempt has been made.
%%
%% With no socket in the state, ask the protocol module to connect to
%% the server and store whatever it returns as the new socket.  With a
%% socket already present the state is returned untouched: either we
%% are connection-based and already connected, or we are not
%% connection-based and there is nothing to do.
try_connect(#state{sock=undefined,
                   i=#p_srvr{proto_mod=ProtoMod}=Srvr}=S0) ->
    NewSock = ProtoMod:connect(Srvr),
    S0#state{sock=NewSock};
try_connect(AlreadySet) ->
    AlreadySet.
|
|
|
|
%% Drop the current connection, if any.
%%
%% With no socket this is a no-op.  Otherwise the protocol module is
%% asked to disconnect the socket and the state's socket is reset to
%% `undefined' so that the next try_connect/1 reconnects.
disconnect(#state{sock=undefined}=NoSock) ->
    NoSock;
disconnect(#state{sock=OldSock,
                  i=#p_srvr{proto_mod=ProtoMod}}=S0) ->
    ProtoMod:disconnect(OldSock),
    S0#state{sock=undefined}.
|
|
|
|
|
|
%% @doc Ask every proxy in the orddict `ProxiesDict' (key -> pid) to
%% quit, ignoring failures of individual `quit/1' calls.
%%
%% Returns `[]' for `undefined' or an empty dict.  For a non-empty
%% dict the return value is the result of the last quit attempt, so
%% callers should not rely on it.

stop_proxies(undefined) ->
    [];
stop_proxies(ProxiesDict) ->
    QuitOne = fun(_Key, ProxyPid, _Prev) ->
                      %% Best effort: a proxy that is already dead
                      %% must not stop us from quitting the rest.
                      _ = (catch machi_proxy_flu1_client:quit(ProxyPid))
              end,
    orddict:fold(QuitOne, [], ProxiesDict).
|
|
|
|
%% @doc Start one linked proxy per entry of the orddict `MembersDict'
%% (key -> `#p_srvr{}') and return an orddict mapping the same keys to
%% the new proxy pids.
%%
%% Crashes with `badmatch' if any proxy fails to start.

start_proxies(MembersDict) ->
    StartOne = fun(Name, Srvr, Started) ->
                       {ok, ProxyPid} = machi_proxy_flu1_client:start_link(Srvr),
                       [{Name, ProxyPid}|Started]
               end,
    Pairs = orddict:fold(StartOne, [], orddict:to_list(MembersDict)),
    orddict:from_list(Pairs).
|
|
|
|
|