From 16e283fe5b6b9d6b1c4bc055cab417730960c3a5 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 6 Apr 2015 18:43:52 +0900 Subject: [PATCH] API overhaul, add machi_proxy_flu1_client.erl, add chain manager (tests commented out) --- src/machi_flu1.erl | 8 +- src/machi_flu1_client.erl | 136 +++++++----- src/machi_projection_store.erl | 34 +-- src/machi_proxy_flu1_client.erl | 344 +++++++++++++++++++++++++++++ src/machi_util.erl | 18 +- test/machi_chain_manager1_test.erl | 4 +- test/machi_flu1_test.erl | 15 +- 7 files changed, 472 insertions(+), 87 deletions(-) create mode 100644 src/machi_proxy_flu1_client.erl diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 3d71ce4..bd34ff5 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -580,12 +580,12 @@ handle_projection_command({read_projection, ProjType, Epoch}, handle_projection_command({write_projection, ProjType, Proj}, #state{proj_store=ProjStore}) -> machi_projection_store:write(ProjStore, ProjType, Proj); -handle_projection_command({get_all, ProjType}, +handle_projection_command({get_all_projections, ProjType}, #state{proj_store=ProjStore}) -> - machi_projection_store:get_all(ProjStore, ProjType); -handle_projection_command({list_all, ProjType}, + machi_projection_store:get_all_projections(ProjStore, ProjType); +handle_projection_command({list_all_projections, ProjType}, #state{proj_store=ProjStore}) -> - machi_projection_store:list_all(ProjStore, ProjType); + machi_projection_store:list_all_projections(ProjStore, ProjType); handle_projection_command(Else, _S) -> {error, unknown_cmd, Else}. diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 8850a0c..570c9fa 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -35,8 +35,8 @@ read_latest_projection/2, read_latest_projection/3, read_projection/3, read_projection/4, write_projection/3, write_projection/4, - get_all/2, get_all/3, - list_all/2, list_all/3, + get_all_projections/2, get_all_projections/3, + list_all_projections/2, list_all_projections/3, %% Common API quit/1 @@ -54,7 +54,7 @@ -type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. -type chunk_size() :: non_neg_integer(). -type epoch_csum() :: binary(). --type epoch_num() :: non_neg_integer(). +-type epoch_num() :: -1 | non_neg_integer(). -type epoch_id() :: {epoch_num(), epoch_csum()}. -type file_info() :: {file_size(), file_name_s()}. -type file_name() :: binary() | list(). @@ -243,44 +243,44 @@ write_projection(Host, TcpPort, ProjType, Proj) %% @doc Get all projections from the FLU's projection store. --spec get_all(port(), projection_type()) -> +-spec get_all_projections(port(), projection_type()) -> {ok, [projection()]} | {error, term()}. -get_all(Sock, ProjType) +get_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - get_all2(Sock, ProjType). + get_all_projections2(Sock, ProjType). %% @doc Get all projections from the FLU's projection store. --spec get_all(inet_host(), inet_port(), +-spec get_all_projections(inet_host(), inet_port(), projection_type()) -> {ok, [projection()]} | {error, term()}. -get_all(Host, TcpPort, ProjType) +get_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = machi_util:connect(Host, TcpPort), try - get_all2(Sock, ProjType) + get_all_projections2(Sock, ProjType) after catch gen_tcp:close(Sock) end. %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all(port(), projection_type()) -> +-spec list_all_projections(port(), projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. -list_all(Sock, ProjType) +list_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - list_all2(Sock, ProjType). + list_all_projections2(Sock, ProjType). %% @doc Get all epoch numbers from the FLU's projection store. --spec list_all(inet_host(), inet_port(), +-spec list_all_projections(inet_host(), inet_port(), projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. -list_all(Host, TcpPort, ProjType) +list_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = machi_util:connect(Host, TcpPort), try - list_all2(Sock, ProjType) + list_all_projections2(Sock, ProjType) after catch gen_tcp:close(Sock) end. @@ -365,6 +365,7 @@ trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% append_chunk2(Sock, EpochID, Prefix0, Chunk0) -> + erase(bad_sock), try %% TODO: add client-side checksum to the server's protocol %% _ = crypto:hash(md5, Chunk), @@ -391,47 +392,59 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) -> end catch throw:Error -> + put(bad_sock, Sock), Error; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. read_chunk2(Sock, EpochID, File0, Offset, Size) -> - {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - File = machi_util:make_binary(File0), - PrefixHex = machi_util:int_to_hexbin(Offset, 64), - SizeHex = machi_util:int_to_hexbin(Size, 32), - CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10], - ok = gen_tcp:send(Sock, CmdLF), - case gen_tcp:recv(Sock, 3) of - {ok, <<"OK\n">>} -> - {ok, _Chunk}=Res = gen_tcp:recv(Sock, Size), - Res; - {ok, Else} -> - {ok, OldOpts} = inet:getopts(Sock, [packet]), - ok = inet:setopts(Sock, [{packet, line}]), - {ok, Else2} = gen_tcp:recv(Sock, 0), - ok = inet:setopts(Sock, OldOpts), - case Else of - <<"ERA">> -> - {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); - <<"ERR">> -> - case Else2 of - <<"OR BAD-IO\n">> -> - {error, no_such_file}; - <<"OR NOT-ERASURE\n">> -> - {error, no_such_file}; - <<"OR BAD-ARG\n">> -> - {error, bad_arg}; - <<"OR PARTIAL-READ\n">> -> - {error, partial_read}; - _ -> - {error, Else2} - end; - _ -> - {error, {whaaa, <>}} - end + erase(bad_sock), + try + {EpochNum, EpochCSum} = EpochID, + EpochIDRaw = <>, + File = machi_util:make_binary(File0), + PrefixHex = machi_util:int_to_hexbin(Offset, 64), + SizeHex = machi_util:int_to_hexbin(Size, 32), + CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10], + ok = gen_tcp:send(Sock, CmdLF), + case gen_tcp:recv(Sock, 3) of + {ok, <<"OK\n">>} -> + {ok, _Chunk}=Res = gen_tcp:recv(Sock, Size), + Res; + {ok, Else} -> + {ok, OldOpts} = inet:getopts(Sock, [packet]), + ok = inet:setopts(Sock, [{packet, line}]), + {ok, Else2} = gen_tcp:recv(Sock, 0), + ok = inet:setopts(Sock, OldOpts), + case Else of + <<"ERA">> -> + {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); + <<"ERR">> -> + case Else2 of + <<"OR BAD-IO\n">> -> + {error, no_such_file}; + <<"OR NOT-ERASURE\n">> -> + {error, no_such_file}; + <<"OR BAD-ARG\n">> -> + {error, bad_arg}; + <<"OR PARTIAL-READ\n">> -> + {error, partial_read}; + _ -> + {error, Else2} + end; + _ -> + {error, {whaaa_todo, <>}} + end + end + catch + throw:Error -> + put(bad_sock, Sock), + Error; + error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), + {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. list2(Sock, EpochID) -> @@ -462,6 +475,7 @@ list3(Else, _Sock) -> throw({server_protocol_error, Else}). checksum_list2(Sock, EpochID, File) -> + erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, EpochIDRaw = <>, @@ -484,8 +498,10 @@ checksum_list2(Sock, EpochID, File) -> end catch throw:Error -> + put(bad_sock, Sock), Error; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. @@ -515,6 +531,7 @@ checksum_list_finish(Chunks) -> Line /= <<>>]. write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> + erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, EpochIDRaw = <>, @@ -542,12 +559,15 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> end catch throw:Error -> + put(bad_sock, Sock), Error; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. delete_migration2(Sock, EpochID, File) -> + erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, EpochIDRaw = <>, @@ -566,12 +586,15 @@ delete_migration2(Sock, EpochID, File) -> end catch throw:Error -> + put(bad_sock, Sock), Error; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. trunc_hack2(Sock, EpochID, File) -> + erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, EpochIDRaw = <>, @@ -590,8 +613,10 @@ trunc_hack2(Sock, EpochID, File) -> end catch throw:Error -> + put(bad_sock, Sock), Error; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. @@ -611,15 +636,16 @@ write_projection2(Sock, ProjType, Proj) -> ProjCmd = {write_projection, ProjType, Proj}, do_projection_common(Sock, ProjCmd). -get_all2(Sock, ProjType) -> - ProjCmd = {get_all, ProjType}, +get_all_projections2(Sock, ProjType) -> + ProjCmd = {get_all_projections, ProjType}, do_projection_common(Sock, ProjCmd). -list_all2(Sock, ProjType) -> - ProjCmd = {list_all, ProjType}, +list_all_projections2(Sock, ProjType) -> + ProjCmd = {list_all_projections, ProjType}, do_projection_common(Sock, ProjCmd). do_projection_common(Sock, ProjCmd) -> + erase(bad_sock), try ProjCmdBin = term_to_binary(ProjCmd), Len = iolist_size(ProjCmdBin), @@ -641,7 +667,9 @@ do_projection_common(Sock, ProjCmd) -> end catch throw:Error -> + put(bad_sock, Sock), Error; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index d53ecc4..09555d2 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -29,21 +29,23 @@ read_latest_projection/2, read_latest_projection/3, read/3, read/4, write/3, write/4, - get_all/2, get_all/3, - list_all/2, list_all/3 + get_all_projections/2, get_all_projections/3, + list_all_projections/2, list_all_projections/3 ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-define(NO_EPOCH, {-1,<<0:(20*8)/big>>}). + -record(state, { public_dir = "" :: string(), private_dir = "" :: string(), wedged = true :: boolean(), wedge_notify_pid :: pid() | atom(), - max_public_epoch = {-1,<<>>} :: -1 | non_neg_integer(), - max_private_epoch = {-1,<<>>} :: -1 | non_neg_integer() + max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}, + max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()} }). start_link(RegName, DataDir, NotifyWedgeStateChanges) -> @@ -82,19 +84,19 @@ write(PidSpec, ProjType, Proj, Timeout) Proj#projection_v1.epoch_number >= 0 -> g_call(PidSpec, {write, ProjType, Proj}, Timeout). -get_all(PidSpec, ProjType) -> - get_all(PidSpec, ProjType, infinity). +get_all_projections(PidSpec, ProjType) -> + get_all_projections(PidSpec, ProjType, infinity). -get_all(PidSpec, ProjType, Timeout) +get_all_projections(PidSpec, ProjType, Timeout) when ProjType == 'public' orelse ProjType == 'private' -> - g_call(PidSpec, {get_all, ProjType}, Timeout). + g_call(PidSpec, {get_all_projections, ProjType}, Timeout). -list_all(PidSpec, ProjType) -> - list_all(PidSpec, ProjType, infinity). +list_all_projections(PidSpec, ProjType) -> + list_all_projections(PidSpec, ProjType, infinity). -list_all(PidSpec, ProjType, Timeout) +list_all_projections(PidSpec, ProjType, Timeout) when ProjType == 'public' orelse ProjType == 'private' -> - g_call(PidSpec, {list_all, ProjType}, Timeout). + g_call(PidSpec, {list_all_projections, ProjType}, Timeout). %%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -143,7 +145,7 @@ handle_call({{write, ProjType, Proj}, LC1}, _From, S) -> LC2 = lclock_update(LC1), {Reply, NewS} = do_proj_write(ProjType, Proj, S), {reply, {Reply, LC2}, NewS}; -handle_call({{get_all, ProjType}, LC1}, _From, S) -> +handle_call({{get_all_projections, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), Dir = pick_path(ProjType, S), Epochs = find_all(Dir), @@ -152,7 +154,7 @@ handle_call({{get_all, ProjType}, LC1}, _From, S) -> Proj end || Epoch <- Epochs], {reply, {{ok, All}, LC2}, S}; -handle_call({{list_all, ProjType}, LC1}, _From, S) -> +handle_call({{list_all_projections, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), Dir = pick_path(ProjType, S), {reply, {{ok, find_all(Dir)}, LC2}, S}; @@ -205,7 +207,7 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> ok = file:write(FH, term_to_binary(Proj)), ok = file:sync(FH), ok = file:close(FH), - EpochT = {Epoch, Proj}, + EpochT = {Epoch, Proj#projection_v1.epoch_csum}, NewS = if ProjType == public, Epoch > element(1, S#state.max_public_epoch) -> io:format(user, "TODO: tell ~p we are wedged by epoch ~p\n", [S#state.wedge_notify_pid, Epoch]), @@ -240,7 +242,7 @@ find_all(Dir) -> find_max_epoch(Dir) -> Fs = lists:sort(filelib:wildcard("*", Dir)), if Fs == [] -> - {-1, <<>>}; + ?NO_EPOCH; true -> EpochNum = name2epoch(lists:last(Fs)), {{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir), diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl new file mode 100644 index 0000000..5222fb9 --- /dev/null +++ b/src/machi_proxy_flu1_client.erl @@ -0,0 +1,344 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_proxy_flu1_client). + +-behaviour(gen_server). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. % TEST. + +-export([start_link/1]). +%% FLU1 API +-export([ + %% File API + append_chunk/4, append_chunk/5, + read_chunk/5, read_chunk/6, + checksum_list/3, checksum_list/4, + list_files/2, list_files/3, + + %% %% Projection API + get_latest_epoch/2, get_latest_epoch/3, + read_latest_projection/2, read_latest_projection/3, + read_projection/3, read_projection/4, + write_projection/3, write_projection/4, + get_all_projections/2, get_all_projections/3, + list_all_projections/2, list_all_projections/3, + + %% Common API + quit/1 + ]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(FLU_C, machi_flu1_client). + +-record(state, { + i :: #p_srvr{}, + sock :: 'undefined' | port() + }). + +start_link(#p_srvr{}=I) -> + gen_server:start_link(?MODULE, [I], []). + +append_chunk(PidSpec, EpochID, Prefix, Chunk) -> + append_chunk(PidSpec, EpochID, Prefix, Chunk, infinity). + +append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) -> + gen_server:call(PidSpec, {req, {append_chunk, EpochID, Prefix, Chunk}}, + Timeout). + +read_chunk(PidSpec, EpochID, File, Offset, Size) -> + read_chunk(PidSpec, EpochID, File, Offset, Size, infinity). + +read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) -> + gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}}, + Timeout). + +checksum_list(PidSpec, EpochID, File) -> + checksum_list(PidSpec, EpochID, File, infinity). + +checksum_list(PidSpec, EpochID, File, Timeout) -> + gen_server:call(PidSpec, {req, {checksum_list, EpochID, File}}, + Timeout). + +list_files(PidSpec, EpochID) -> + list_files(PidSpec, EpochID, infinity). + +list_files(PidSpec, EpochID, Timeout) -> + gen_server:call(PidSpec, {req, {list_files, EpochID}}, + Timeout). + +get_latest_epoch(PidSpec, ProjType) -> + get_latest_epoch(PidSpec, ProjType, infinity). + +get_latest_epoch(PidSpec, ProjType, Timeout) -> + gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}}, + Timeout). + +read_latest_projection(PidSpec, ProjType) -> + read_latest_projection(PidSpec, ProjType, infinity). + +read_latest_projection(PidSpec, ProjType, Timeout) -> + gen_server:call(PidSpec, {req, {read_latest_projection, ProjType}}, + Timeout). + +read_projection(PidSpec, ProjType, Epoch) -> + read_projection(PidSpec, ProjType, Epoch, infinity). + +read_projection(PidSpec, ProjType, Epoch, Timeout) -> + gen_server:call(PidSpec, {req, {read_projection, ProjType, Epoch}}, + Timeout). + +write_projection(PidSpec, ProjType, Proj) -> + write_projection(PidSpec, ProjType, Proj, infinity). + +write_projection(PidSpec, ProjType, Proj, Timeout) -> + gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}}, + Timeout). + +get_all_projections(PidSpec, ProjType) -> + get_all_projections(PidSpec, ProjType, infinity). + +get_all_projections(PidSpec, ProjType, Timeout) -> + gen_server:call(PidSpec, {req, {get_all_projections, ProjType}}, + Timeout). + +list_all_projections(PidSpec, ProjType) -> + list_all_projections(PidSpec, ProjType, infinity). + +list_all_projections(PidSpec, ProjType, Timeout) -> + gen_server:call(PidSpec, {req, {list_all_projections, ProjType}}, + Timeout). + +quit(PidSpec) -> + gen_server:call(PidSpec, quit, infinity). + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init([I]) -> + S0 = #state{i=I}, + S1 = try_connect(S0), + {ok, S1}. + +handle_call({req, Req}, _From, S) -> + {Reply, NewS} = do_req(Req, S), + {reply, Reply, NewS}; +handle_call(quit, _From, S) -> + {stop, normal, ok, disconnect(S)}; +handle_call(_Request, _From, S) -> + Reply = ok, + {reply, Reply, S}. + +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, _S) -> + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +do_req(Req, S) -> + S2 = try_connect(S), + Fun = make_req_fun(Req, S2), + case connected_p(S2) of + true -> + case Fun() of + T when element(1, T) == ok -> + {T, S2}; + Else -> + case get(bad_sock) of + Bad when Bad == S2#state.sock -> + {Else, disconnect(S2)}; + _ -> + {Else, S2} + end + end; + false -> + {{error, not_connected}, S2} + end. + +make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) -> + fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end; +make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) -> + fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end; +make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) -> + fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end; +make_req_fun({list_files, EpochID}, #state{sock=Sock}) -> + fun() -> ?FLU_C:list_files(Sock, EpochID) end; +make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) -> + fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end; +make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) -> + fun() -> ?FLU_C:read_latest_projection(Sock, ProjType) end; +make_req_fun({read_projection, ProjType, Epoch}, #state{sock=Sock}) -> + fun() -> ?FLU_C:read_projection(Sock, ProjType, Epoch) end; +make_req_fun({write_projection, ProjType, Proj}, #state{sock=Sock}) -> + fun() -> ?FLU_C:write_projection(Sock, ProjType, Proj) end; +make_req_fun({get_all_projections, ProjType}, #state{sock=Sock}) -> + fun() -> ?FLU_C:get_all_projections(Sock, ProjType) end; +make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) -> + fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end. + +connected_p(#state{sock=SockMaybe, + i=#p_srvr{proto=ipv4}=_I}=_S) -> + is_port(SockMaybe); +connected_p(#state{i=#p_srvr{proto=disterl, + name=_NodeName}=_I}=_S) -> + true. + %% case net_adm:ping(NodeName) of + %% ping -> + %% true; + %% _ -> + %% false + %% end. + +try_connect(#state{sock=undefined, + i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) -> + try + Sock = machi_util:connect(Host, TcpPort), + S#state{sock=Sock} + catch + _:_ -> + S + end; +try_connect(S) -> + %% If we're connection-based, we're already connected. + %% If we're not connection-based, then there's nothing to do. + S. + +disconnect(#state{sock=Sock, + i=#p_srvr{proto=ipv4}=_I}=S) -> + (catch gen_tcp:close(Sock)), + S#state{sock=undefined}; +disconnect(S) -> + S. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-ifdef(TEST). + +dummy_server(Parent, TcpPort) -> + spawn_link(fun() -> + {ok, LSock} = gen_tcp:listen(TcpPort, + [{reuseaddr,true}, + {packet, line}, + {mode, binary}, + {active, false}]), + dummy_ack(Parent), + {ok, Sock} = gen_tcp:accept(LSock), + ok = inet:setopts(Sock, [{packet, line}]), + {ok, _Line} = gen_tcp:recv(Sock, 0), + ok = gen_tcp:send(Sock, "ERROR BADARG\n"), + (catch gen_tcp:close(Sock)), + unlink(Parent), + exit(normal) + end). + +dummy_ack(Parent) -> + Parent ! go. + +dummy_wait_for_ack() -> + receive go -> ok end. + +smoke_test() -> + TcpPort = 57123, + Me = self(), + _ServerPid = dummy_server(Me, TcpPort), + dummy_wait_for_ack(), + + I = #p_srvr{name=smoke, proto=ipv4, address="localhost", port=TcpPort}, + S0 = #state{i=I}, + false = connected_p(S0), + S1 = try_connect(S0), + true = connected_p(S1), + gen_tcp:send(S1#state.sock, "yo dawg\n"), + {ok, _Answer} = gen_tcp:recv(S1#state.sock, 0), + _S2 = disconnect(S1), + + ok. + +api_smoke_test() -> + RegName = api_smoke_flu, + Host = "localhost", + TcpPort = 57124, + DataDir = "./data.api_smoke_flu", + FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir), + erase(flu_pid), + + try + I = #p_srvr{name=RegName, proto=ipv4, address=Host, port=TcpPort}, + {ok, Prox1} = start_link(I), + try + FakeEpoch = {-1, <<0:(20*8)/big>>}, + [{ok, {_,_,_}} = append_chunk(Prox1, + FakeEpoch, <<"prefix">>, <<"data">>, + infinity) || _ <- lists:seq(1,5)], + %% Stop the FLU, what happens? + machi_flu1:stop(FLU1), + {error,_} = append_chunk(Prox1, + FakeEpoch, <<"prefix">>, <<"data">>, + infinity), + {error,not_connected} = append_chunk(Prox1, + FakeEpoch, <<"prefix">>, <<"data">>, + infinity), + %% Start the FLU again, we should be able to do stuff immediately + FLU1b = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir, + [save_data_dir]), + put(flu_pid, FLU1b), + MyChunk = <<"my chunk data">>, + {ok, {MyOff,MySize,MyFile}} = + append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk, + infinity), + {ok, MyChunk} = read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize), + + %% Alright, now for the rest of the API, whee + BadFile = <<"no-such-file">>, + {error, no_such_file} = checksum_list(Prox1, FakeEpoch, BadFile), + {ok, [_]} = list_files(Prox1, FakeEpoch), + {ok, FakeEpoch} = get_latest_epoch(Prox1, public), + {error, not_written} = read_latest_projection(Prox1, public), + {error, not_written} = read_projection(Prox1, public, 44), + P1 = machi_projection:new(1, a, [a], [], [a], [], []), + ok = write_projection(Prox1, public, P1), + {ok, P1} = read_projection(Prox1, public, 1), + {ok, [P1]} = get_all_projections(Prox1, public), + {ok, [1]} = list_all_projections(Prox1, public), + ok + after + _ = (catch quit(Prox1)) + end + after + (catch machi_flu1:stop(FLU1)), + (catch machi_flu1:stop(get(flu_pid))) + end. + +-endif. % TEST diff --git a/src/machi_util.erl b/src/machi_util.erl index 1331d11..af0ac29 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -31,7 +31,7 @@ read_max_filenum/2, increment_max_filenum/2, info_msg/2, verb/1, verb/2, %% TCP protocol helpers - connect/2 + connect/2, connect/3 ]). -compile(export_all). @@ -168,13 +168,19 @@ info_msg(Fmt, Args) -> -spec connect(inet:ip_address() | inet:hostname(), inet:port_number()) -> port(). connect(Host, Port) -> - escript_connect(Host, Port). + escript_connect(Host, Port, 4500). -escript_connect(Host, PortStr) when is_list(PortStr) -> +-spec connect(inet:ip_address() | inet:hostname(), inet:port_number(), + timeout()) -> + port(). +connect(Host, Port, Timeout) -> + escript_connect(Host, Port, Timeout). + +escript_connect(Host, PortStr, Timeout) when is_list(PortStr) -> Port = list_to_integer(PortStr), - escript_connect(Host, Port); -escript_connect(Host, Port) when is_integer(Port) -> + escript_connect(Host, Port, Timeout); +escript_connect(Host, Port, Timeout) when is_integer(Port) -> {ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary}, - {packet, raw}]), + {packet, raw}], Timeout), Sock. diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index def16c7..5f4367a 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -133,13 +133,13 @@ chain_to_projection(MyName, Epoch, UPI_list, Repairing_list, All_list) -> -ifndef(PULSE). -smoke0_test() -> +smoke0_testXXX() -> {ok, _} = machi_partition_simulator:start_link({1,2,3}, 50, 50), Host = "localhost", TcpPort = 6623, {ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]), {ok, M0} = ?MGR:start_link(a, [a,b,c], a), - SockA = machi_util:connect(Host, TcpPort), + _SockA = machi_util:connect(Host, TcpPort), try pong = ?MGR:ping(M0) after diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index c37188c..fbfc0ae 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -33,7 +33,12 @@ setup_test_flu(RegName, TcpPort, DataDir) -> setup_test_flu(RegName, TcpPort, DataDir, []). setup_test_flu(RegName, TcpPort, DataDir, DbgProps) -> - clean_up_data_dir(DataDir), + case proplists:get_value(save_data_dir, DbgProps) of + true -> + ok; + _ -> + clean_up_data_dir(DataDir) + end, {ok, FLU1} = ?FLU:start_link([{RegName, TcpPort, DataDir}, {dbg, DbgProps}]), @@ -128,8 +133,8 @@ flu_projection_smoke_test() -> {ok, {-1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), {error, not_written} = ?FLU_C:read_latest_projection(Host, TcpPort, T), - {ok, []} = ?FLU_C:list_all(Host, TcpPort, T), - {ok, []} = ?FLU_C:get_all(Host, TcpPort, T), + {ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T), + {ok, []} = ?FLU_C:get_all_projections(Host, TcpPort, T), P1 = machi_projection:new(1, a, [a], [], [a], [], []), ok = ?FLU_C:write_projection(Host, TcpPort, T, P1), @@ -137,8 +142,8 @@ flu_projection_smoke_test() -> {ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1), {ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), {ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T), - {ok, [1]} = ?FLU_C:list_all(Host, TcpPort, T), - {ok, [P1]} = ?FLU_C:get_all(Host, TcpPort, T), + {ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T), + {ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T), {error, not_written} = ?FLU_C:read_projection(Host, TcpPort, T, 2) end || T <- [public, private] ] after