API overhaul, add machi_proxy_flu1_client.erl, add chain manager (tests commented out)

This commit is contained in:
Scott Lystig Fritchie 2015-04-06 18:43:52 +09:00
parent 1d63b93fc0
commit 16e283fe5b
7 changed files with 472 additions and 87 deletions

View file

@ -580,12 +580,12 @@ handle_projection_command({read_projection, ProjType, Epoch},
handle_projection_command({write_projection, ProjType, Proj},
#state{proj_store=ProjStore}) ->
machi_projection_store:write(ProjStore, ProjType, Proj);
handle_projection_command({get_all, ProjType},
handle_projection_command({get_all_projections, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:get_all(ProjStore, ProjType);
handle_projection_command({list_all, ProjType},
machi_projection_store:get_all_projections(ProjStore, ProjType);
handle_projection_command({list_all_projections, ProjType},
#state{proj_store=ProjStore}) ->
machi_projection_store:list_all(ProjStore, ProjType);
machi_projection_store:list_all_projections(ProjStore, ProjType);
handle_projection_command(Else, _S) ->
{error, unknown_cmd, Else}.

View file

@ -35,8 +35,8 @@
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
get_all/2, get_all/3,
list_all/2, list_all/3,
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1
@ -54,7 +54,7 @@
-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}.
-type chunk_size() :: non_neg_integer().
-type epoch_csum() :: binary().
-type epoch_num() :: non_neg_integer().
-type epoch_num() :: -1 | non_neg_integer().
-type epoch_id() :: {epoch_num(), epoch_csum()}.
-type file_info() :: {file_size(), file_name_s()}.
-type file_name() :: binary() | list().
@ -243,44 +243,44 @@ write_projection(Host, TcpPort, ProjType, Proj)
%% @doc Get all projections from the FLU's projection store.
-spec get_all(port(), projection_type()) ->
-spec get_all_projections(port(), projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all(Sock, ProjType)
get_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
get_all2(Sock, ProjType).
get_all_projections2(Sock, ProjType).
%% @doc Get all projections from the FLU's projection store.
-spec get_all(inet_host(), inet_port(),
-spec get_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [projection()]} | {error, term()}.
get_all(Host, TcpPort, ProjType)
get_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
get_all2(Sock, ProjType)
get_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all(port(), projection_type()) ->
-spec list_all_projections(port(), projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all(Sock, ProjType)
list_all_projections(Sock, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
list_all2(Sock, ProjType).
list_all_projections2(Sock, ProjType).
%% @doc Get all epoch numbers from the FLU's projection store.
-spec list_all(inet_host(), inet_port(),
-spec list_all_projections(inet_host(), inet_port(),
projection_type()) ->
{ok, [non_neg_integer()]} | {error, term()}.
list_all(Host, TcpPort, ProjType)
list_all_projections(Host, TcpPort, ProjType)
when ProjType == 'public' orelse ProjType == 'private' ->
Sock = machi_util:connect(Host, TcpPort),
try
list_all2(Sock, ProjType)
list_all_projections2(Sock, ProjType)
after
catch gen_tcp:close(Sock)
end.
@ -365,6 +365,7 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%
append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
erase(bad_sock),
try
%% TODO: add client-side checksum to the server's protocol
%% _ = crypto:hash(md5, Chunk),
@ -391,47 +392,59 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
read_chunk2(Sock, EpochID, File0, Offset, Size) ->
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
File = machi_util:make_binary(File0),
PrefixHex = machi_util:int_to_hexbin(Offset, 64),
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10],
ok = gen_tcp:send(Sock, CmdLF),
case gen_tcp:recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = inet:getopts(Sock, [packet]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Else2} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
<<"ERR">> ->
case Else2 of
<<"OR BAD-IO\n">> ->
{error, no_such_file};
<<"OR NOT-ERASURE\n">> ->
{error, no_such_file};
<<"OR BAD-ARG\n">> ->
{error, bad_arg};
<<"OR PARTIAL-READ\n">> ->
{error, partial_read};
_ ->
{error, Else2}
end;
_ ->
{error, {whaaa, <<Else/binary, Else2/binary>>}}
end
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
File = machi_util:make_binary(File0),
PrefixHex = machi_util:int_to_hexbin(Offset, 64),
SizeHex = machi_util:int_to_hexbin(Size, 32),
CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10],
ok = gen_tcp:send(Sock, CmdLF),
case gen_tcp:recv(Sock, 3) of
{ok, <<"OK\n">>} ->
{ok, _Chunk}=Res = gen_tcp:recv(Sock, Size),
Res;
{ok, Else} ->
{ok, OldOpts} = inet:getopts(Sock, [packet]),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, Else2} = gen_tcp:recv(Sock, 0),
ok = inet:setopts(Sock, OldOpts),
case Else of
<<"ERA">> ->
{error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2);
<<"ERR">> ->
case Else2 of
<<"OR BAD-IO\n">> ->
{error, no_such_file};
<<"OR NOT-ERASURE\n">> ->
{error, no_such_file};
<<"OR BAD-ARG\n">> ->
{error, bad_arg};
<<"OR PARTIAL-READ\n">> ->
{error, partial_read};
_ ->
{error, Else2}
end;
_ ->
{error, {whaaa_todo, <<Else/binary, Else2/binary>>}}
end
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
list2(Sock, EpochID) ->
@ -462,6 +475,7 @@ list3(Else, _Sock) ->
throw({server_protocol_error, Else}).
checksum_list2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
@ -484,8 +498,10 @@ checksum_list2(Sock, EpochID, File) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -515,6 +531,7 @@ checksum_list_finish(Chunks) ->
Line /= <<>>].
write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
@ -542,12 +559,15 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.
delete_migration2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
@ -566,12 +586,15 @@ delete_migration2(Sock, EpochID, File) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
trunc_hack2(Sock, EpochID, File) ->
erase(bad_sock),
try
{EpochNum, EpochCSum} = EpochID,
EpochIDRaw = <<EpochNum:(4*8)/big, EpochCSum/binary>>,
@ -590,8 +613,10 @@ trunc_hack2(Sock, EpochID, File) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch}}
end.
@ -611,15 +636,16 @@ write_projection2(Sock, ProjType, Proj) ->
ProjCmd = {write_projection, ProjType, Proj},
do_projection_common(Sock, ProjCmd).
get_all2(Sock, ProjType) ->
ProjCmd = {get_all, ProjType},
get_all_projections2(Sock, ProjType) ->
ProjCmd = {get_all_projections, ProjType},
do_projection_common(Sock, ProjCmd).
list_all2(Sock, ProjType) ->
ProjCmd = {list_all, ProjType},
list_all_projections2(Sock, ProjType) ->
ProjCmd = {list_all_projections, ProjType},
do_projection_common(Sock, ProjCmd).
do_projection_common(Sock, ProjCmd) ->
erase(bad_sock),
try
ProjCmdBin = term_to_binary(ProjCmd),
Len = iolist_size(ProjCmdBin),
@ -641,7 +667,9 @@ do_projection_common(Sock, ProjCmd) ->
end
catch
throw:Error ->
put(bad_sock, Sock),
Error;
error:{badmatch,_}=BadMatch ->
put(bad_sock, Sock),
{error, {badmatch, BadMatch, erlang:get_stacktrace()}}
end.

View file

@ -29,21 +29,23 @@
read_latest_projection/2, read_latest_projection/3,
read/3, read/4,
write/3, write/4,
get_all/2, get_all/3,
list_all/2, list_all/3
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(NO_EPOCH, {-1,<<0:(20*8)/big>>}).
-record(state, {
public_dir = "" :: string(),
private_dir = "" :: string(),
wedged = true :: boolean(),
wedge_notify_pid :: pid() | atom(),
max_public_epoch = {-1,<<>>} :: -1 | non_neg_integer(),
max_private_epoch = {-1,<<>>} :: -1 | non_neg_integer()
max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()},
max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}
}).
start_link(RegName, DataDir, NotifyWedgeStateChanges) ->
@ -82,19 +84,19 @@ write(PidSpec, ProjType, Proj, Timeout)
Proj#projection_v1.epoch_number >= 0 ->
g_call(PidSpec, {write, ProjType, Proj}, Timeout).
get_all(PidSpec, ProjType) ->
get_all(PidSpec, ProjType, infinity).
get_all_projections(PidSpec, ProjType) ->
get_all_projections(PidSpec, ProjType, infinity).
get_all(PidSpec, ProjType, Timeout)
get_all_projections(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {get_all, ProjType}, Timeout).
g_call(PidSpec, {get_all_projections, ProjType}, Timeout).
list_all(PidSpec, ProjType) ->
list_all(PidSpec, ProjType, infinity).
list_all_projections(PidSpec, ProjType) ->
list_all_projections(PidSpec, ProjType, infinity).
list_all(PidSpec, ProjType, Timeout)
list_all_projections(PidSpec, ProjType, Timeout)
when ProjType == 'public' orelse ProjType == 'private' ->
g_call(PidSpec, {list_all, ProjType}, Timeout).
g_call(PidSpec, {list_all_projections, ProjType}, Timeout).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -143,7 +145,7 @@ handle_call({{write, ProjType, Proj}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
{Reply, NewS} = do_proj_write(ProjType, Proj, S),
{reply, {Reply, LC2}, NewS};
handle_call({{get_all, ProjType}, LC1}, _From, S) ->
handle_call({{get_all_projections, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Dir = pick_path(ProjType, S),
Epochs = find_all(Dir),
@ -152,7 +154,7 @@ handle_call({{get_all, ProjType}, LC1}, _From, S) ->
Proj
end || Epoch <- Epochs],
{reply, {{ok, All}, LC2}, S};
handle_call({{list_all, ProjType}, LC1}, _From, S) ->
handle_call({{list_all_projections, ProjType}, LC1}, _From, S) ->
LC2 = lclock_update(LC1),
Dir = pick_path(ProjType, S),
{reply, {{ok, find_all(Dir)}, LC2}, S};
@ -205,7 +207,7 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) ->
ok = file:write(FH, term_to_binary(Proj)),
ok = file:sync(FH),
ok = file:close(FH),
EpochT = {Epoch, Proj},
EpochT = {Epoch, Proj#projection_v1.epoch_csum},
NewS = if ProjType == public,
Epoch > element(1, S#state.max_public_epoch) ->
io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]),
@ -240,7 +242,7 @@ find_all(Dir) ->
find_max_epoch(Dir) ->
Fs = lists:sort(filelib:wildcard("*", Dir)),
if Fs == [] ->
{-1, <<>>};
?NO_EPOCH;
true ->
EpochNum = name2epoch(lists:last(Fs)),
{{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir),

View file

@ -0,0 +1,344 @@
%% -------------------------------------------------------------------
%%
%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(machi_proxy_flu1_client).
-behaviour(gen_server).
-include("machi.hrl").
-include("machi_projection.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif. % TEST.
-export([start_link/1]).
%% FLU1 API
-export([
%% File API
append_chunk/4, append_chunk/5,
read_chunk/5, read_chunk/6,
checksum_list/3, checksum_list/4,
list_files/2, list_files/3,
%% %% Projection API
get_latest_epoch/2, get_latest_epoch/3,
read_latest_projection/2, read_latest_projection/3,
read_projection/3, read_projection/4,
write_projection/3, write_projection/4,
get_all_projections/2, get_all_projections/3,
list_all_projections/2, list_all_projections/3,
%% Common API
quit/1
]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(FLU_C, machi_flu1_client).
-record(state, {
i :: #p_srvr{},
sock :: 'undefined' | port()
}).
start_link(#p_srvr{}=I) ->
gen_server:start_link(?MODULE, [I], []).
append_chunk(PidSpec, EpochID, Prefix, Chunk) ->
append_chunk(PidSpec, EpochID, Prefix, Chunk, infinity).
append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) ->
gen_server:call(PidSpec, {req, {append_chunk, EpochID, Prefix, Chunk}},
Timeout).
read_chunk(PidSpec, EpochID, File, Offset, Size) ->
read_chunk(PidSpec, EpochID, File, Offset, Size, infinity).
read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) ->
gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}},
Timeout).
checksum_list(PidSpec, EpochID, File) ->
checksum_list(PidSpec, EpochID, File, infinity).
checksum_list(PidSpec, EpochID, File, Timeout) ->
gen_server:call(PidSpec, {req, {checksum_list, EpochID, File}},
Timeout).
list_files(PidSpec, EpochID) ->
list_files(PidSpec, EpochID, infinity).
list_files(PidSpec, EpochID, Timeout) ->
gen_server:call(PidSpec, {req, {list_files, EpochID}},
Timeout).
get_latest_epoch(PidSpec, ProjType) ->
get_latest_epoch(PidSpec, ProjType, infinity).
get_latest_epoch(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}},
Timeout).
read_latest_projection(PidSpec, ProjType) ->
read_latest_projection(PidSpec, ProjType, infinity).
read_latest_projection(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {read_latest_projection, ProjType}},
Timeout).
read_projection(PidSpec, ProjType, Epoch) ->
read_projection(PidSpec, ProjType, Epoch, infinity).
read_projection(PidSpec, ProjType, Epoch, Timeout) ->
gen_server:call(PidSpec, {req, {read_projection, ProjType, Epoch}},
Timeout).
write_projection(PidSpec, ProjType, Proj) ->
write_projection(PidSpec, ProjType, Proj, infinity).
write_projection(PidSpec, ProjType, Proj, Timeout) ->
gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}},
Timeout).
get_all_projections(PidSpec, ProjType) ->
get_all_projections(PidSpec, ProjType, infinity).
get_all_projections(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {get_all_projections, ProjType}},
Timeout).
list_all_projections(PidSpec, ProjType) ->
list_all_projections(PidSpec, ProjType, infinity).
list_all_projections(PidSpec, ProjType, Timeout) ->
gen_server:call(PidSpec, {req, {list_all_projections, ProjType}},
Timeout).
quit(PidSpec) ->
gen_server:call(PidSpec, quit, infinity).
%%%%%%%%%%%%%%%%%%%%%%%%%%%
init([I]) ->
S0 = #state{i=I},
S1 = try_connect(S0),
{ok, S1}.
handle_call({req, Req}, _From, S) ->
{Reply, NewS} = do_req(Req, S),
{reply, Reply, NewS};
handle_call(quit, _From, S) ->
{stop, normal, ok, disconnect(S)};
handle_call(_Request, _From, S) ->
Reply = ok,
{reply, Reply, S}.
handle_cast(_Msg, S) ->
{noreply, S}.
handle_info(_Info, S) ->
{noreply, S}.
terminate(_Reason, _S) ->
ok.
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
do_req(Req, S) ->
S2 = try_connect(S),
Fun = make_req_fun(Req, S2),
case connected_p(S2) of
true ->
case Fun() of
T when element(1, T) == ok ->
{T, S2};
Else ->
case get(bad_sock) of
Bad when Bad == S2#state.sock ->
{Else, disconnect(S2)};
_ ->
{Else, S2}
end
end;
false ->
{{error, not_connected}, S2}
end.
make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) ->
fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end;
make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end;
make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) ->
fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end;
make_req_fun({list_files, EpochID}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_files(Sock, EpochID) end;
make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end;
make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_latest_projection(Sock, ProjType) end;
make_req_fun({read_projection, ProjType, Epoch}, #state{sock=Sock}) ->
fun() -> ?FLU_C:read_projection(Sock, ProjType, Epoch) end;
make_req_fun({write_projection, ProjType, Proj}, #state{sock=Sock}) ->
fun() -> ?FLU_C:write_projection(Sock, ProjType, Proj) end;
make_req_fun({get_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:get_all_projections(Sock, ProjType) end;
make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) ->
fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end.
connected_p(#state{sock=SockMaybe,
i=#p_srvr{proto=ipv4}=_I}=_S) ->
is_port(SockMaybe);
connected_p(#state{i=#p_srvr{proto=disterl,
name=_NodeName}=_I}=_S) ->
true.
%% case net_adm:ping(NodeName) of
%% ping ->
%% true;
%% _ ->
%% false
%% end.
try_connect(#state{sock=undefined,
i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) ->
try
Sock = machi_util:connect(Host, TcpPort),
S#state{sock=Sock}
catch
_:_ ->
S
end;
try_connect(S) ->
%% If we're connection-based, we're already connected.
%% If we're not connection-based, then there's nothing to do.
S.
disconnect(#state{sock=Sock,
i=#p_srvr{proto=ipv4}=_I}=S) ->
(catch gen_tcp:close(Sock)),
S#state{sock=undefined};
disconnect(S) ->
S.
%%%%%%%%%%%%%%%%%%%%%%%%%%%
-ifdef(TEST).
dummy_server(Parent, TcpPort) ->
spawn_link(fun() ->
{ok, LSock} = gen_tcp:listen(TcpPort,
[{reuseaddr,true},
{packet, line},
{mode, binary},
{active, false}]),
dummy_ack(Parent),
{ok, Sock} = gen_tcp:accept(LSock),
ok = inet:setopts(Sock, [{packet, line}]),
{ok, _Line} = gen_tcp:recv(Sock, 0),
ok = gen_tcp:send(Sock, "ERROR BADARG\n"),
(catch gen_tcp:close(Sock)),
unlink(Parent),
exit(normal)
end).
dummy_ack(Parent) ->
Parent ! go.
dummy_wait_for_ack() ->
receive go -> ok end.
smoke_test() ->
TcpPort = 57123,
Me = self(),
_ServerPid = dummy_server(Me, TcpPort),
dummy_wait_for_ack(),
I = #p_srvr{name=smoke, proto=ipv4, address="localhost", port=TcpPort},
S0 = #state{i=I},
false = connected_p(S0),
S1 = try_connect(S0),
true = connected_p(S1),
gen_tcp:send(S1#state.sock, "yo dawg\n"),
{ok, _Answer} = gen_tcp:recv(S1#state.sock, 0),
_S2 = disconnect(S1),
ok.
api_smoke_test() ->
RegName = api_smoke_flu,
Host = "localhost",
TcpPort = 57124,
DataDir = "./data.api_smoke_flu",
FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir),
erase(flu_pid),
try
I = #p_srvr{name=RegName, proto=ipv4, address=Host, port=TcpPort},
{ok, Prox1} = start_link(I),
try
FakeEpoch = {-1, <<0:(20*8)/big>>},
[{ok, {_,_,_}} = append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>,
infinity) || _ <- lists:seq(1,5)],
%% Stop the FLU, what happens?
machi_flu1:stop(FLU1),
{error,_} = append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>,
infinity),
{error,not_connected} = append_chunk(Prox1,
FakeEpoch, <<"prefix">>, <<"data">>,
infinity),
%% Start the FLU again, we should be able to do stuff immediately
FLU1b = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
[save_data_dir]),
put(flu_pid, FLU1b),
MyChunk = <<"my chunk data">>,
{ok, {MyOff,MySize,MyFile}} =
append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk,
infinity),
{ok, MyChunk} = read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize),
%% Alright, now for the rest of the API, whee
BadFile = <<"no-such-file">>,
{error, no_such_file} = checksum_list(Prox1, FakeEpoch, BadFile),
{ok, [_]} = list_files(Prox1, FakeEpoch),
{ok, FakeEpoch} = get_latest_epoch(Prox1, public),
{error, not_written} = read_latest_projection(Prox1, public),
{error, not_written} = read_projection(Prox1, public, 44),
P1 = machi_projection:new(1, a, [a], [], [a], [], []),
ok = write_projection(Prox1, public, P1),
{ok, P1} = read_projection(Prox1, public, 1),
{ok, [P1]} = get_all_projections(Prox1, public),
{ok, [1]} = list_all_projections(Prox1, public),
ok
after
_ = (catch quit(Prox1))
end
after
(catch machi_flu1:stop(FLU1)),
(catch machi_flu1:stop(get(flu_pid)))
end.
-endif. % TEST

View file

@ -31,7 +31,7 @@
read_max_filenum/2, increment_max_filenum/2,
info_msg/2, verb/1, verb/2,
%% TCP protocol helpers
connect/2
connect/2, connect/3
]).
-compile(export_all).
@ -168,13 +168,19 @@ info_msg(Fmt, Args) ->
-spec connect(inet:ip_address() | inet:hostname(), inet:port_number()) ->
port().
connect(Host, Port) ->
escript_connect(Host, Port).
escript_connect(Host, Port, 4500).
escript_connect(Host, PortStr) when is_list(PortStr) ->
-spec connect(inet:ip_address() | inet:hostname(), inet:port_number(),
timeout()) ->
port().
connect(Host, Port, Timeout) ->
escript_connect(Host, Port, Timeout).
escript_connect(Host, PortStr, Timeout) when is_list(PortStr) ->
Port = list_to_integer(PortStr),
escript_connect(Host, Port);
escript_connect(Host, Port) when is_integer(Port) ->
escript_connect(Host, Port, Timeout);
escript_connect(Host, Port, Timeout) when is_integer(Port) ->
{ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary},
{packet, raw}]),
{packet, raw}], Timeout),
Sock.

View file

@ -133,13 +133,13 @@ chain_to_projection(MyName, Epoch, UPI_list, Repairing_list, All_list) ->
-ifndef(PULSE).
smoke0_test() ->
smoke0_testXXX() ->
{ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50),
Host = "localhost",
TcpPort = 6623,
{ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]),
{ok, M0} = ?MGR:start_link(a, [a,b,c], a),
SockA = machi_util:connect(Host, TcpPort),
_SockA = machi_util:connect(Host, TcpPort),
try
pong = ?MGR:ping(M0)
after

View file

@ -33,7 +33,12 @@ setup_test_flu(RegName, TcpPort, DataDir) ->
setup_test_flu(RegName, TcpPort, DataDir, []).
setup_test_flu(RegName, TcpPort, DataDir, DbgProps) ->
clean_up_data_dir(DataDir),
case proplists:get_value(save_data_dir, DbgProps) of
true ->
ok;
_ ->
clean_up_data_dir(DataDir)
end,
{ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir},
{dbg, DbgProps}]),
@ -128,8 +133,8 @@ flu_projection_smoke_test() ->
{ok, {-1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{error, not_written} =
?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all(Host, TcpPort, T),
{ok, []} = ?FLU_C:get_all(Host, TcpPort, T),
{ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, []} = ?FLU_C:get_all_projections(Host, TcpPort, T),
P1 = machi_projection:new(1, a, [a], [], [a], [], []),
ok = ?FLU_C:write_projection(Host, TcpPort, T, P1),
@ -137,8 +142,8 @@ flu_projection_smoke_test() ->
{ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1),
{ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T),
{ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T),
{ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T),
{ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T),
{error, not_written} = ?FLU_C:read_projection(Host, TcpPort, T, 2)
end || T <- [public, private] ]
after