machi/src/machi_proxy_flu1_client.erl

382 lines
13 KiB
Erlang
Raw Normal View History

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc Erlang API for the Machi FLU TCP protocol version 1, with a
%% proxy-process style API for hiding messy details such as TCP
%% connection/disconnection with the remote Machi server.
%%
%% Machi is intentionally avoiding using distributed Erlang for
%% Machi's communication. This design decision makes Erlang-side code
%% more difficult & complex, but it's the price to pay for some
%% language independence. Later in Machi's life cycle, we need to
%% (re-)implement some components in a non-Erlang/BEAM-based language.
%%
%% This module implements a "man in the middle" proxy between the
%% Erlang client and Machi server (which is on the "far side" of a TCP
%% connection to somewhere). This proxy process will always execute
%% on the same Erlang node as the Erlang client that uses it. The
%% proxy is intended to be a stable, long-lived process that survives
%% TCP communication problems with the remote server.
-module(machi_proxy_flu1_client).
-behaviour(gen_server).
-include("machi.hrl").
-include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST.
-export([start_link/1]).
%% FLU1 API
-export([
%% File API
append_chunk/4, append_chunk/5,
2015-05-17 05:10:42 +00:00
append_chunk_extra/5, append_chunk_extra/6,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
wedge_status/1, wedge_status/2,
%% %% Projection API
get_epoch_id/1, get_epoch_id/2,
get_latest_epoch/2, get_latest_epoch/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1,
%% Internal API
write_chunk/5, write_chunk/6
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(FLU_C, machi_flu1_client).
-record(state, {
i :: #p_srvr{},
sock :: 'undefined' | port()
}).
%% @doc Start a local, long-lived process that will be our steady
%% & reliable communication proxy with the fickle & flaky
%% remote Machi server.
start_link(#p_srvr{}=I) ->
gen_server:start_link(?MODULE, [I], []).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk(PidSpec, EpochID, Prefix, Chunk) ->
append_chunk(PidSpec, EpochID, Prefix, Chunk, infinity).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk, EpochID, Prefix, Chunk}},
Timeout).
2015-05-17 05:10:42 +00:00
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra)
when is_integer(ChunkExtra), ChunkExtra >= 0 ->
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, infinity).
%% @doc Append a chunk (binary- or iolist-style) of data to a file
%% with `Prefix'.
append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk_extra, EpochID, Prefix,
Chunk, ChunkExtra}},
Timeout).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size) ->
read_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}},
Timeout).
%% @doc Fetch the list of chunk checksums for `File'.
checksum_list(PidSpec, EpochID, File) ->
checksum_list(PidSpec, EpochID, File, infinity).
%% @doc Fetch the list of chunk checksums for `File'.
checksum_list(PidSpec, EpochID, File, Timeout) ->
gen_server:call(PidSpec, {req, {checksum_list, EpochID, File}},
Timeout).
%% @doc Fetch the list of all files on the remote FLU.
list_files(PidSpec, EpochID) ->
list_files(PidSpec, EpochID, infinity).
%% @doc Fetch the list of all files on the remote FLU.
list_files(PidSpec, EpochID, Timeout) ->
gen_server:call(PidSpec, {req, {list_files, EpochID}},
Timeout).
%% @doc Fetch the wedge status from the remote FLU.
wedge_status(PidSpec) ->
wedge_status(PidSpec, infinity).
%% @doc Fetch the wedge status from the remote FLU.
wedge_status(PidSpec, Timeout) ->
gen_server:call(PidSpec, {req, {wedge_status}},
Timeout).
%% @doc Get the `epoch_id()' of the FLU's current/latest projection.
get_epoch_id(PidSpec) ->
get_epoch_id(PidSpec, infinity).
%% @doc Get the `epoch_id()' of the FLU's current/latest projection.
get_epoch_id(PidSpec, Timeout) ->
gen_server:call(PidSpec, {req, {get_epoch_id}}, Timeout).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
%% @doc Get the latest epoch number + checksum from the FLU's projection store.
get_latest_epoch(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}},
Timeout).
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
read_latest_projection(PidSpec, ProjType) ->
read_latest_projection(PidSpec, ProjType, infinity).
%% @doc Get the latest projection from the FLU's projection store for `ProjType'
read_latest_projection(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {read_latest_projection, ProjType}},
Timeout).
%% @doc Read a projection `Proj' of type `ProjType'.
read_projection(PidSpec, ProjType, Epoch) ->
read_projection(PidSpec, ProjType, Epoch, infinity).
%% @doc Read a projection `Proj' of type `ProjType'.
read_projection(PidSpec, ProjType, Epoch, Timeout) ->
gen_server:call(PidSpec, {req, {read_projection, ProjType, Epoch}},
Timeout).
%% @doc Write a projection `Proj' of type `ProjType'.
write_projection(PidSpec, ProjType, Proj) ->
write_projection(PidSpec, ProjType, Proj, infinity).
%% @doc Write a projection `Proj' of type `ProjType'.
write_projection(PidSpec, ProjType, Proj, Timeout) ->
gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}},
Timeout).
%% @doc Get all projections from the FLU's projection store.
get_all_projections(PidSpec, ProjType) ->
get_all_projections(PidSpec, ProjType, infinity).
%% @doc Get all projections from the FLU's projection store.
get_all_projections(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_all_projections, ProjType}},
Timeout).
%% @doc Get all epoch numbers from the FLU's projection store.
list_all_projections(PidSpec, ProjType) ->
list_all_projections(PidSpec, ProjType, infinity).
%% @doc Get all epoch numbers from the FLU's projection store.
list_all_projections(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {list_all_projections, ProjType}},
Timeout).
%% @doc Quit & close the connection to remote FLU and stop our
%% proxy process.
quit(PidSpec) ->
gen_server:call(PidSpec, quit, infinity).
%% @doc Write a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' at `Offset'.
write_chunk(PidSpec, EpochID, File, Offset, Chunk) ->
write_chunk(PidSpec, EpochID, File, Offset, Chunk, infinity).
%% @doc Write a chunk (binary- or iolist-style) of data to a file
%% with `Prefix' at `Offset'.
write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}},
Timeout).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([I]) ->
S0 = #state{i=I},
S1 = try_connect(S0),
{ok, S1}.
handle_call({req, Req}, _From, S) ->
{Reply, NewS} = do_req(Req, S),
{reply, Reply, NewS};
handle_call(quit, _From, S) ->
{stop, normal, ok, disconnect(S)};
handle_call(_Request, _From, S) ->
Reply = ok,
{reply, Reply, S}.
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_req(Req, S) ->
S2 = try_connect(S),
Fun = make_req_fun(Req, S2),
case connected_p(S2) of
true ->
case Fun() of
T when element(1, T) == ok ->
{T, S2};
Else ->
case get(bad_sock) of
Bad when Bad == S2#state.sock ->
{Else, disconnect(S2)};
_ ->
{Else, S2}
end
end;
false ->
{{error, partition}, S2}
end.
make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->
fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end;
2015-05-17 05:10:42 +00:00
make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra}, #state{sock=Sock}) ->
fun() -> ?FLU_C:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({write_chunk, EpochID, File, Offset, Chunk}, #state{sock=Sock}) ->
fun() -> ?FLU_C:write_chunk(Sock, EpochID, File, Offset, Chunk) end;
make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) ->
fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end;
make_req_fun({list_files, EpochID}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_files(Sock, EpochID) end;
make_req_fun({wedge_status}, #state{sock=Sock}) ->
fun() -> ?FLU_C:wedge_status(Sock) end;
make_req_fun({get_epoch_id}, #state{sock=Sock}) ->
fun() -> case ?FLU_C:read_latest_projection(Sock, private) of
{ok, P} ->
#projection_v1{epoch_number=Epoch,
epoch_csum=CSum} =
machi_chain_manager1:inner_projection_or_self(P),
{ok, {Epoch, CSum}};
Error ->
Error
end
end;
make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_latest_projection(Sock, ProjType) end;
make_req_fun({read_projection, ProjType, Epoch}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_projection(Sock, ProjType, Epoch) end;
make_req_fun({write_projection, ProjType, Proj}, #state{sock=Sock}) ->
fun() -> ?FLU_C:write_projection(Sock, ProjType, Proj) end;
make_req_fun({get_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_all_projections(Sock, ProjType) end;
make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end.
connected_p(#state{sock=SockMaybe,
i=#p_srvr{proto=ipv4}=_I}=_S) ->
is_port(SockMaybe);
connected_p(#state{i=#p_srvr{proto=disterl,
name=_NodeName}=_I}=_S) ->
true.
%% case net_adm:ping(NodeName) of
%% ping ->
%% true;
%% _ ->
%% false
%% end.
try_connect(#state{sock=undefined,
i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) ->
try
Sock = machi_util:connect(Host, TcpPort),
S#state{sock=Sock}
catch
_:_ ->
S
end;
try_connect(S) ->
%% If we're connection-based, we're already connected.
%% If we're not connection-based, then there's nothing to do.
S.
disconnect(#state{sock=Sock,
i=#p_srvr{proto=ipv4}=_I}=S) ->
(catch gen_tcp:close(Sock)),
S#state{sock=undefined};
disconnect(S) ->
S.