From a7f53cf21a021e773ce0bb9b158ce3230eb14f6d Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 May 2015 00:59:24 +0900 Subject: [PATCH] WIP: starting machi_cr_client:append_chunk* --- src/machi_cr_client.erl | 5 ++--- src/machi_flu1.erl | 6 +++++- src/machi_proxy_flu1_client.erl | 8 ++++++++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 65ecdfa..cf282e2 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -289,7 +289,6 @@ do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, EpochID, Prefix, Chunk, ChunkExtra, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - io:format(user, "TODO: X ~p\n", [_X]), do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, HeadFLU, 1, S); {error, Change} when Change == bad_epoch; Change == wedged -> @@ -298,8 +297,8 @@ do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, %% TODO return values here end. -do_append_chunk_extra2([], File, Offset, _Chunk, _OldHeadFLU, _OkCount, S) -> - {reply, {ok, {File, Offset}}, S}; +do_append_chunk_extra2([], File, Offset, Chunk, _OldHeadFLU, _OkCount, S) -> + {reply, {ok, {Offset, size(Chunk), File}}, S}; do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index a830c18..481a19a 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -66,6 +66,8 @@ -include("machi.hrl"). -include("machi_projection.hrl"). +-define(SERVER_CMD_READ_TIMEOUT, 600*1000). + -export([start_link/1, stop/1, update_wedge_state/3]). -export([make_listener_regname/1, make_projection_server_regname/1]). @@ -244,7 +246,9 @@ decode_epoch_id(EpochIDHex) -> net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0, 600*1000) of + %% TODO: Add testing control knob to adjust this timeout and/or inject + %% timeout condition. + case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of {ok, Line} -> %% machi_util:verb("Got: ~p\n", [Line]), PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1, diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index ce55313..b49da56 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -295,8 +295,16 @@ do_req(Req, S) -> case connected_p(S2) of true -> case Fun() of + ok -> + {ok, S2}; T when element(1, T) == ok -> {T, S2}; + {error, {badmatch, {badmatch, {error, Why}}, _Stk}} + when Why == closed; Why == timeout -> + %% TODO: Infinite recursion isn't + %% good. Exponential backoff might be good. + timer:sleep(500), + do_req(Req, disconnect(S2)); Else -> case get(bad_sock) of Bad when Bad == S2#state.sock ->