WIP: starting machi_cr_client:append_chunk*
This commit is contained in:
parent
b0607ae815
commit
a7f53cf21a
3 changed files with 15 additions and 4 deletions
|
@ -289,7 +289,6 @@ do_append_chunk_extra1(Prefix, Chunk, ChunkExtra,
|
||||||
EpochID, Prefix, Chunk, ChunkExtra,
|
EpochID, Prefix, Chunk, ChunkExtra,
|
||||||
?TIMEOUT) of
|
?TIMEOUT) of
|
||||||
{ok, {Offset, _Size, File}=_X} ->
|
{ok, {Offset, _Size, File}=_X} ->
|
||||||
io:format(user, "TODO: X ~p\n", [_X]),
|
|
||||||
do_append_chunk_extra2(RestFLUs, File, Offset, Chunk,
|
do_append_chunk_extra2(RestFLUs, File, Offset, Chunk,
|
||||||
HeadFLU, 1, S);
|
HeadFLU, 1, S);
|
||||||
{error, Change} when Change == bad_epoch; Change == wedged ->
|
{error, Change} when Change == bad_epoch; Change == wedged ->
|
||||||
|
@ -298,8 +297,8 @@ do_append_chunk_extra1(Prefix, Chunk, ChunkExtra,
|
||||||
%% TODO return values here
|
%% TODO return values here
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_append_chunk_extra2([], File, Offset, _Chunk, _OldHeadFLU, _OkCount, S) ->
|
do_append_chunk_extra2([], File, Offset, Chunk, _OldHeadFLU, _OkCount, S) ->
|
||||||
{reply, {ok, {File, Offset}}, S};
|
{reply, {ok, {Offset, size(Chunk), File}}, S};
|
||||||
do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount,
|
do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount,
|
||||||
#state{epoch_id=EpochID, proj=P,
|
#state{epoch_id=EpochID, proj=P,
|
||||||
proxies_dict=PD}=S) ->
|
proxies_dict=PD}=S) ->
|
||||||
|
|
|
@ -66,6 +66,8 @@
|
||||||
-include("machi.hrl").
|
-include("machi.hrl").
|
||||||
-include("machi_projection.hrl").
|
-include("machi_projection.hrl").
|
||||||
|
|
||||||
|
-define(SERVER_CMD_READ_TIMEOUT, 600*1000).
|
||||||
|
|
||||||
-export([start_link/1, stop/1,
|
-export([start_link/1, stop/1,
|
||||||
update_wedge_state/3]).
|
update_wedge_state/3]).
|
||||||
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
-export([make_listener_regname/1, make_projection_server_regname/1]).
|
||||||
|
@ -244,7 +246,9 @@ decode_epoch_id(EpochIDHex) ->
|
||||||
|
|
||||||
net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
|
net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) ->
|
||||||
ok = inet:setopts(Sock, [{packet, line}]),
|
ok = inet:setopts(Sock, [{packet, line}]),
|
||||||
case gen_tcp:recv(Sock, 0, 600*1000) of
|
%% TODO: Add testing control knob to adjust this timeout and/or inject
|
||||||
|
%% timeout condition.
|
||||||
|
case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of
|
||||||
{ok, Line} ->
|
{ok, Line} ->
|
||||||
%% machi_util:verb("Got: ~p\n", [Line]),
|
%% machi_util:verb("Got: ~p\n", [Line]),
|
||||||
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1,
|
PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1,
|
||||||
|
|
|
@ -295,8 +295,16 @@ do_req(Req, S) ->
|
||||||
case connected_p(S2) of
|
case connected_p(S2) of
|
||||||
true ->
|
true ->
|
||||||
case Fun() of
|
case Fun() of
|
||||||
|
ok ->
|
||||||
|
{ok, S2};
|
||||||
T when element(1, T) == ok ->
|
T when element(1, T) == ok ->
|
||||||
{T, S2};
|
{T, S2};
|
||||||
|
{error, {badmatch, {badmatch, {error, Why}}, _Stk}}
|
||||||
|
when Why == closed; Why == timeout ->
|
||||||
|
%% TODO: Infinite recursion isn't
|
||||||
|
%% good. Exponential backoff might be good.
|
||||||
|
timer:sleep(500),
|
||||||
|
do_req(Req, disconnect(S2));
|
||||||
Else ->
|
Else ->
|
||||||
case get(bad_sock) of
|
case get(bad_sock) of
|
||||||
Bad when Bad == S2#state.sock ->
|
Bad when Bad == S2#state.sock ->
|
||||||
|
|
Loading…
Reference in a new issue