Add better (?) timeout handling to machi_cr_client.erl gen_server calls
This commit is contained in:
parent
e3d9ba2b83
commit
0f18ab8d20
1 changed files with 99 additions and 78 deletions
|
@ -177,9 +177,10 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra)
|
||||||
%% @doc Append a chunk (binary- or iolist-style) of data to a file
|
%% @doc Append a chunk (binary- or iolist-style) of data to a file
|
||||||
%% with `Prefix'.
|
%% with `Prefix'.
|
||||||
|
|
||||||
append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) ->
|
append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout0) ->
|
||||||
|
{TO, Timeout} = timeout(Timeout0),
|
||||||
gen_server:call(PidSpec, {req, {append_chunk_extra, Prefix,
|
gen_server:call(PidSpec, {req, {append_chunk_extra, Prefix,
|
||||||
Chunk, ChunkExtra}},
|
Chunk, ChunkExtra, TO}},
|
||||||
Timeout).
|
Timeout).
|
||||||
|
|
||||||
%% @doc Write a chunk of data (that has already been
|
%% @doc Write a chunk of data (that has already been
|
||||||
|
@ -191,8 +192,9 @@ write_chunk(PidSpec, File, Offset, Chunk) ->
|
||||||
|
|
||||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||||
|
|
||||||
write_chunk(PidSpec, File, Offset, Chunk, Timeout) ->
|
write_chunk(PidSpec, File, Offset, Chunk, Timeout0) ->
|
||||||
gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk}},
|
{TO, Timeout} = timeout(Timeout0),
|
||||||
|
gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk, TO}},
|
||||||
Timeout).
|
Timeout).
|
||||||
|
|
||||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||||
|
@ -202,8 +204,9 @@ read_chunk(PidSpec, File, Offset, Size) ->
|
||||||
|
|
||||||
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
|
||||||
|
|
||||||
read_chunk(PidSpec, File, Offset, Size, Timeout) ->
|
read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
|
||||||
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}},
|
{TO, Timeout} = timeout(Timeout0),
|
||||||
|
gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
|
||||||
Timeout).
|
Timeout).
|
||||||
|
|
||||||
%% @doc Fetch the list of chunk checksums for `File'.
|
%% @doc Fetch the list of chunk checksums for `File'.
|
||||||
|
@ -213,8 +216,9 @@ checksum_list(PidSpec, File) ->
|
||||||
|
|
||||||
%% @doc Fetch the list of chunk checksums for `File'.
|
%% @doc Fetch the list of chunk checksums for `File'.
|
||||||
|
|
||||||
checksum_list(PidSpec, File, Timeout) ->
|
checksum_list(PidSpec, File, Timeout0) ->
|
||||||
gen_server:call(PidSpec, {req, {checksum_list, File}},
|
{TO, Timeout} = timeout(Timeout0),
|
||||||
|
gen_server:call(PidSpec, {req, {checksum_list, File, TO}},
|
||||||
Timeout).
|
Timeout).
|
||||||
|
|
||||||
%% @doc Fetch the list of all files on the remote FLU.
|
%% @doc Fetch the list of all files on the remote FLU.
|
||||||
|
@ -224,8 +228,9 @@ list_files(PidSpec) ->
|
||||||
|
|
||||||
%% @doc Fetch the list of all files on the remote FLU.
|
%% @doc Fetch the list of all files on the remote FLU.
|
||||||
|
|
||||||
list_files(PidSpec, Timeout) ->
|
list_files(PidSpec, Timeout0) ->
|
||||||
gen_server:call(PidSpec, {req, {list_files}},
|
{TO, Timeout} = timeout(Timeout0),
|
||||||
|
gen_server:call(PidSpec, {req, {list_files, TO}},
|
||||||
Timeout).
|
Timeout).
|
||||||
|
|
||||||
%% @doc Quit & close the connection to remote FLU and stop our
|
%% @doc Quit & close the connection to remote FLU and stop our
|
||||||
|
@ -265,24 +270,24 @@ code_change(_OldVsn, S, _Extra) ->
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
|
|
||||||
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
|
handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra, TO}, _From, S) ->
|
||||||
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S);
|
do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S);
|
||||||
handle_call2({write_chunk, File, Offset, Chunk}, _From, S) ->
|
handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
|
||||||
do_write_head(File, Offset, Chunk, 0, os:timestamp(), S);
|
do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
|
||||||
handle_call2({read_chunk, File, Offset, Size}, _From, S) ->
|
handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) ->
|
||||||
do_read_chunk(File, Offset, Size, 0, os:timestamp(), S);
|
do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
|
||||||
handle_call2({checksum_list, File}, _From, S) ->
|
handle_call2({checksum_list, File, TO}, _From, S) ->
|
||||||
do_checksum_list(File, 0, os:timestamp(), S);
|
do_checksum_list(File, 0, os:timestamp(), TO, S);
|
||||||
handle_call2({list_files}, _From, S) ->
|
handle_call2({list_files, TO}, _From, S) ->
|
||||||
do_list_files(0, os:timestamp(), S).
|
do_list_files(0, os:timestamp(), TO, S).
|
||||||
|
|
||||||
do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) ->
|
do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, TO, S) ->
|
||||||
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S);
|
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, TO, S);
|
||||||
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
%% io:format(user, "head sleep1,", []),
|
%% io:format(user, "head sleep1,", []),
|
||||||
sleep_a_while(Depth),
|
sleep_a_while(Depth),
|
||||||
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
if DiffMs > ?MAX_RUNTIME ->
|
if DiffMs > TO ->
|
||||||
{reply, {error, partition}, S};
|
{reply, {error, partition}, S};
|
||||||
true ->
|
true ->
|
||||||
%% This is suboptimal for performance: there are some paths
|
%% This is suboptimal for performance: there are some paths
|
||||||
|
@ -294,30 +299,32 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
|
||||||
P2 when P2 == undefined orelse
|
P2 when P2 == undefined orelse
|
||||||
P2#projection_v1.upi == [] ->
|
P2#projection_v1.upi == [] ->
|
||||||
do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1,
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1,
|
||||||
STime, S2);
|
STime, TO, S2);
|
||||||
_ ->
|
_ ->
|
||||||
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1,
|
do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1,
|
||||||
STime, S2)
|
STime, TO, S2)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime,
|
do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, TO,
|
||||||
#state{proj=P}=S) ->
|
#state{proj=P}=S) ->
|
||||||
[HeadFLU|_RestFLUs] = mutation_flus(P),
|
[HeadFLU|_RestFLUs] = mutation_flus(P),
|
||||||
case is_witness_flu(HeadFLU, P) of
|
case is_witness_flu(HeadFLU, P) of
|
||||||
true ->
|
true ->
|
||||||
case witnesses_use_our_epoch(S) of
|
case witnesses_use_our_epoch(S) of
|
||||||
true ->
|
true ->
|
||||||
do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, S);
|
do_append_head3(Prefix, Chunk, ChunkExtra, Depth,
|
||||||
|
STime, TO, S);
|
||||||
false ->
|
false ->
|
||||||
%% Bummer, go back to the beginning and retry.
|
%% Bummer, go back to the beginning and retry.
|
||||||
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S)
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth,
|
||||||
|
STime, TO, S)
|
||||||
end;
|
end;
|
||||||
false ->
|
false ->
|
||||||
do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, S)
|
do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime,
|
do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, TO,
|
||||||
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
||||||
[HeadFLU|RestFLUs] = non_witness_flus(mutation_flus(P), P),
|
[HeadFLU|RestFLUs] = non_witness_flus(mutation_flus(P), P),
|
||||||
Proxy = orddict:fetch(HeadFLU, PD),
|
Proxy = orddict:fetch(HeadFLU, PD),
|
||||||
|
@ -327,33 +334,33 @@ do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime,
|
||||||
{ok, {Offset, _Size, File}=_X} ->
|
{ok, {Offset, _Size, File}=_X} ->
|
||||||
%% io:format(user, "append ~w,", [HeadFLU]),
|
%% io:format(user, "append ~w,", [HeadFLU]),
|
||||||
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
[HeadFLU], 0, STime, S);
|
[HeadFLU], 0, STime, TO, S);
|
||||||
{error, bad_checksum}=BadCS ->
|
{error, bad_checksum}=BadCS ->
|
||||||
{reply, BadCS, S};
|
{reply, BadCS, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S);
|
||||||
{error, written} ->
|
{error, written} ->
|
||||||
%% Implicit sequencing + this error = we don't know where this
|
%% Implicit sequencing + this error = we don't know where this
|
||||||
%% written block is. But we lost a race. Repeat, with a new
|
%% written block is. But we lost a race. Repeat, with a new
|
||||||
%% sequencer assignment.
|
%% sequencer assignment.
|
||||||
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
|
do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S);
|
||||||
{error, not_written} ->
|
{error, not_written} ->
|
||||||
exit({todo_should_never_happen,?MODULE,?LINE,
|
exit({todo_should_never_happen,?MODULE,?LINE,
|
||||||
Prefix,iolist_size(Chunk)})
|
Prefix,iolist_size(Chunk)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
Ws, Depth, STime, S)
|
Ws, Depth, STime, TO, S)
|
||||||
when RestFLUs == [] orelse Depth == 0 ->
|
when RestFLUs == [] orelse Depth == 0 ->
|
||||||
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
Ws, Depth + 1, STime, S);
|
Ws, Depth + 1, STime, TO, S);
|
||||||
do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
Ws, Depth, STime, #state{proj=P}=S) ->
|
Ws, Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
%% io:format(user, "midtail sleep2,", []),
|
%% io:format(user, "midtail sleep2,", []),
|
||||||
sleep_a_while(Depth),
|
sleep_a_while(Depth),
|
||||||
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
if DiffMs > ?MAX_RUNTIME ->
|
if DiffMs > TO ->
|
||||||
{reply, {error, partition}, S};
|
{reply, {error, partition}, S};
|
||||||
true ->
|
true ->
|
||||||
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
||||||
|
@ -376,36 +383,36 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
|
||||||
{error, partition};
|
{error, partition};
|
||||||
true ->
|
true ->
|
||||||
do_append_head2(Prefix, Chunk, ChunkExtra,
|
do_append_head2(Prefix, Chunk, ChunkExtra,
|
||||||
Depth, STime, S2)
|
Depth, STime, TO, S2)
|
||||||
end;
|
end;
|
||||||
RestFLUs3 ->
|
RestFLUs3 ->
|
||||||
do_append_midtail2(RestFLUs3, Prefix, File, Offset,
|
do_append_midtail2(RestFLUs3, Prefix, File, Offset,
|
||||||
Chunk, ChunkExtra,
|
Chunk, ChunkExtra,
|
||||||
Ws, Depth + 1, STime, S2)
|
Ws, Depth + 1, STime, TO, S2)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_append_midtail2([], _Prefix, File, Offset, Chunk,
|
do_append_midtail2([], _Prefix, File, Offset, Chunk,
|
||||||
_ChunkExtra, _Ws, _Depth, _STime, S) ->
|
_ChunkExtra, _Ws, _Depth, _STime, _TO, S) ->
|
||||||
%% io:format(user, "ok!\n", []),
|
%% io:format(user, "ok!\n", []),
|
||||||
{reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S};
|
{reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S};
|
||||||
do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
|
do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
|
||||||
ChunkExtra, Ws, Depth, STime,
|
ChunkExtra, Ws, Depth, STime, TO,
|
||||||
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
|
#state{epoch_id=EpochID, proxies_dict=PD}=S) ->
|
||||||
Proxy = orddict:fetch(FLU, PD),
|
Proxy = orddict:fetch(FLU, PD),
|
||||||
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
|
case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
|
||||||
ok ->
|
ok ->
|
||||||
%% io:format(user, "write ~w,", [FLU]),
|
%% io:format(user, "write ~w,", [FLU]),
|
||||||
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk,
|
do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk,
|
||||||
ChunkExtra, [FLU|Ws], Depth, STime, S);
|
ChunkExtra, [FLU|Ws], Depth, STime, TO, S);
|
||||||
{error, bad_checksum}=BadCS ->
|
{error, bad_checksum}=BadCS ->
|
||||||
%% TODO: alternate strategy?
|
%% TODO: alternate strategy?
|
||||||
{reply, BadCS, S};
|
{reply, BadCS, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
do_append_midtail(FLUs, Prefix, File, Offset, Chunk,
|
do_append_midtail(FLUs, Prefix, File, Offset, Chunk,
|
||||||
ChunkExtra, Ws, Depth, STime, S);
|
ChunkExtra, Ws, Depth, STime, TO, S);
|
||||||
{error, written} ->
|
{error, written} ->
|
||||||
%% We know what the chunk ought to be, so jump to the
|
%% We know what the chunk ought to be, so jump to the
|
||||||
%% middle of read-repair.
|
%% middle of read-repair.
|
||||||
|
@ -416,7 +423,7 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
|
||||||
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
|
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
witnesses_use_our_epoch(#state{proj=P, epoch_id=EpochID}=S) ->
|
witnesses_use_our_epoch(#state{proj=P}=S) ->
|
||||||
Witnesses = witness_flus(P#projection_v1.upi, P),
|
Witnesses = witness_flus(P#projection_v1.upi, P),
|
||||||
witnesses_use_our_epoch(Witnesses, S).
|
witnesses_use_our_epoch(Witnesses, S).
|
||||||
|
|
||||||
|
@ -434,13 +441,13 @@ io:format(user, "Bummer, ~p uses ~P at ~p\n", [FLU, _Else, 7, now()]),
|
||||||
false
|
false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_write_head(File, Offset, Chunk, 0=Depth, STime, S) ->
|
do_write_head(File, Offset, Chunk, 0=Depth, STime, TO, S) ->
|
||||||
do_write_head2(File, Offset, Chunk, Depth + 1, STime, S);
|
do_write_head2(File, Offset, Chunk, Depth + 1, STime, TO, S);
|
||||||
do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) ->
|
do_write_head(File, Offset, Chunk, Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
%% io:format(user, "head sleep1,", []),
|
%% io:format(user, "head sleep1,", []),
|
||||||
sleep_a_while(Depth),
|
sleep_a_while(Depth),
|
||||||
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
if DiffMs > ?MAX_RUNTIME ->
|
if DiffMs > TO ->
|
||||||
{reply, {error, partition}, S};
|
{reply, {error, partition}, S};
|
||||||
true ->
|
true ->
|
||||||
%% This is suboptimal for performance: there are some paths
|
%% This is suboptimal for performance: there are some paths
|
||||||
|
@ -451,13 +458,15 @@ do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) ->
|
||||||
case S2#state.proj of
|
case S2#state.proj of
|
||||||
P2 when P2 == undefined orelse
|
P2 when P2 == undefined orelse
|
||||||
P2#projection_v1.upi == [] ->
|
P2#projection_v1.upi == [] ->
|
||||||
do_write_head(File, Offset, Chunk, Depth + 1, STime, S2);
|
do_write_head(File, Offset, Chunk, Depth + 1,
|
||||||
|
STime, TO, S2);
|
||||||
_ ->
|
_ ->
|
||||||
do_write_head2(File, Offset, Chunk, Depth + 1, STime, S2)
|
do_write_head2(File, Offset, Chunk, Depth + 1,
|
||||||
|
STime, TO, S2)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_write_head2(File, Offset, Chunk, Depth, STime,
|
do_write_head2(File, Offset, Chunk, Depth, STime, TO,
|
||||||
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
||||||
[HeadFLU|RestFLUs] = mutation_flus(P),
|
[HeadFLU|RestFLUs] = mutation_flus(P),
|
||||||
Proxy = orddict:fetch(HeadFLU, PD),
|
Proxy = orddict:fetch(HeadFLU, PD),
|
||||||
|
@ -466,12 +475,12 @@ do_write_head2(File, Offset, Chunk, Depth, STime,
|
||||||
%% From this point onward, we use the same code & logic path as
|
%% From this point onward, we use the same code & logic path as
|
||||||
%% append does.
|
%% append does.
|
||||||
do_append_midtail(RestFLUs, undefined, File, Offset, Chunk,
|
do_append_midtail(RestFLUs, undefined, File, Offset, Chunk,
|
||||||
undefined, [HeadFLU], 0, STime, S);
|
undefined, [HeadFLU], 0, STime, TO, S);
|
||||||
{error, bad_checksum}=BadCS ->
|
{error, bad_checksum}=BadCS ->
|
||||||
{reply, BadCS, S};
|
{reply, BadCS, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
do_write_head(File, Offset, Chunk, Depth, STime, S);
|
do_write_head(File, Offset, Chunk, Depth, STime, TO, S);
|
||||||
{error, written}=Err ->
|
{error, written}=Err ->
|
||||||
Err;
|
Err;
|
||||||
{error, not_written} ->
|
{error, not_written} ->
|
||||||
|
@ -479,27 +488,27 @@ do_write_head2(File, Offset, Chunk, Depth, STime,
|
||||||
iolist_size(Chunk)})
|
iolist_size(Chunk)})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_read_chunk(File, Offset, Size, 0=Depth, STime,
|
do_read_chunk(File, Offset, Size, 0=Depth, STime, TO,
|
||||||
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
|
#state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
|
||||||
do_read_chunk2(File, Offset, Size, Depth + 1, STime, S);
|
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S);
|
||||||
do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
|
do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
%% io:format(user, "read sleep1,", []),
|
%% io:format(user, "read sleep1,", []),
|
||||||
sleep_a_while(Depth),
|
sleep_a_while(Depth),
|
||||||
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
if DiffMs > ?MAX_RUNTIME ->
|
if DiffMs > TO ->
|
||||||
{reply, {error, partition}, S};
|
{reply, {error, partition}, S};
|
||||||
true ->
|
true ->
|
||||||
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
|
||||||
case S2#state.proj of
|
case S2#state.proj of
|
||||||
P2 when P2 == undefined orelse
|
P2 when P2 == undefined orelse
|
||||||
P2#projection_v1.upi == [] ->
|
P2#projection_v1.upi == [] ->
|
||||||
do_read_chunk(File, Offset, Size, Depth + 1, STime, S2);
|
do_read_chunk(File, Offset, Size, Depth + 1, STime, TO, S2);
|
||||||
_ ->
|
_ ->
|
||||||
do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2)
|
do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S2)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_read_chunk2(File, Offset, Size, Depth, STime,
|
do_read_chunk2(File, Offset, Size, Depth, STime, TO,
|
||||||
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
|
#state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
|
||||||
UPI = readonly_flus(P),
|
UPI = readonly_flus(P),
|
||||||
Tail = lists:last(UPI),
|
Tail = lists:last(UPI),
|
||||||
|
@ -518,7 +527,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime,
|
||||||
{reply, BadCS, S};
|
{reply, BadCS, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
do_read_chunk(File, Offset, Size, Depth, STime, S);
|
do_read_chunk(File, Offset, Size, Depth, STime, TO, S);
|
||||||
{error, not_written} ->
|
{error, not_written} ->
|
||||||
read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
|
read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
|
||||||
%% {reply, {error, not_written}, S};
|
%% {reply, {error, not_written}, S};
|
||||||
|
@ -674,12 +683,12 @@ read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
|
||||||
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
|
exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_checksum_list(File, 0=Depth, STime, S) ->
|
do_checksum_list(File, 0=Depth, STime, TO, S) ->
|
||||||
do_checksum_list2(File, Depth + 1, STime, S);
|
do_checksum_list2(File, Depth + 1, STime, TO, S);
|
||||||
do_checksum_list(File, Depth, STime, #state{proj=P}=S) ->
|
do_checksum_list(File, Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
sleep_a_while(Depth),
|
sleep_a_while(Depth),
|
||||||
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
if DiffMs > ?MAX_RUNTIME ->
|
if DiffMs > TO ->
|
||||||
{reply, {error, partition}, S};
|
{reply, {error, partition}, S};
|
||||||
true ->
|
true ->
|
||||||
%% This is suboptimal for performance: there are some paths
|
%% This is suboptimal for performance: there are some paths
|
||||||
|
@ -690,13 +699,13 @@ do_checksum_list(File, Depth, STime, #state{proj=P}=S) ->
|
||||||
case S2#state.proj of
|
case S2#state.proj of
|
||||||
P2 when P2 == undefined orelse
|
P2 when P2 == undefined orelse
|
||||||
P2#projection_v1.upi == [] ->
|
P2#projection_v1.upi == [] ->
|
||||||
do_checksum_list(File, Depth + 1, STime, S2);
|
do_checksum_list(File, Depth + 1, STime, TO, S2);
|
||||||
_ ->
|
_ ->
|
||||||
do_checksum_list2(File, Depth + 1, STime, S2)
|
do_checksum_list2(File, Depth + 1, STime, TO, S2)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_checksum_list2(File, Depth, STime,
|
do_checksum_list2(File, Depth, STime, TO,
|
||||||
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
||||||
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
||||||
case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of
|
case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of
|
||||||
|
@ -704,17 +713,17 @@ do_checksum_list2(File, Depth, STime,
|
||||||
{reply, OK, S};
|
{reply, OK, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
do_checksum_list(File, Depth, STime, S);
|
do_checksum_list(File, Depth, STime, TO, S);
|
||||||
{error, _}=Error ->
|
{error, _}=Error ->
|
||||||
{reply, Error, S}
|
{reply, Error, S}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_list_files(0=Depth, STime, S) ->
|
do_list_files(0=Depth, STime, TO, S) ->
|
||||||
do_list_files2(Depth + 1, STime, S);
|
do_list_files2(Depth + 1, STime, TO, S);
|
||||||
do_list_files(Depth, STime, #state{proj=P}=S) ->
|
do_list_files(Depth, STime, TO, #state{proj=P}=S) ->
|
||||||
sleep_a_while(Depth),
|
sleep_a_while(Depth),
|
||||||
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
|
||||||
if DiffMs > ?MAX_RUNTIME ->
|
if DiffMs > TO ->
|
||||||
{reply, {error, partition}, S};
|
{reply, {error, partition}, S};
|
||||||
true ->
|
true ->
|
||||||
%% This is suboptimal for performance: there are some paths
|
%% This is suboptimal for performance: there are some paths
|
||||||
|
@ -725,13 +734,13 @@ do_list_files(Depth, STime, #state{proj=P}=S) ->
|
||||||
case S2#state.proj of
|
case S2#state.proj of
|
||||||
P2 when P2 == undefined orelse
|
P2 when P2 == undefined orelse
|
||||||
P2#projection_v1.upi == [] ->
|
P2#projection_v1.upi == [] ->
|
||||||
do_list_files(Depth + 1, STime, S2);
|
do_list_files(Depth + 1, STime, TO, S2);
|
||||||
_ ->
|
_ ->
|
||||||
do_list_files2(Depth + 1, STime, S2)
|
do_list_files2(Depth + 1, STime, TO, S2)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_list_files2(Depth, STime,
|
do_list_files2(Depth, STime, TO,
|
||||||
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
#state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
|
||||||
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
|
||||||
case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of
|
case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of
|
||||||
|
@ -739,7 +748,7 @@ do_list_files2(Depth, STime,
|
||||||
{reply, OK, S};
|
{reply, OK, S};
|
||||||
{error, Retry}
|
{error, Retry}
|
||||||
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
when Retry == partition; Retry == bad_epoch; Retry == wedged ->
|
||||||
do_list_files(Depth, STime, S);
|
do_list_files(Depth, STime, TO, S);
|
||||||
{error, _}=Error ->
|
{error, _}=Error ->
|
||||||
{reply, Error, S}
|
{reply, Error, S}
|
||||||
end.
|
end.
|
||||||
|
@ -877,3 +886,15 @@ chunk_wrapper_size({_TaggedCSum, Chunk}) ->
|
||||||
iolist_size(Chunk);
|
iolist_size(Chunk);
|
||||||
chunk_wrapper_size(Chunk) ->
|
chunk_wrapper_size(Chunk) ->
|
||||||
iolist_size(Chunk).
|
iolist_size(Chunk).
|
||||||
|
|
||||||
|
%% The intra-gen_server-timeout is approximate. We'll add some 30
|
||||||
|
%% seconds of fudge to the user-specified time to accomodate the
|
||||||
|
%% imprecision. Using 'infinity' would definitely handle the case of
|
||||||
|
%% horrible time consumption by the gen_server proc, but I'm slightly
|
||||||
|
%% leery of really waiting forever: better to have an
|
||||||
|
%% {'EXIT',{timeout,_}} to raise awareness of a serious problem.
|
||||||
|
|
||||||
|
timeout(infinity) ->
|
||||||
|
timeout(15*60*1000); % close enough to infinity
|
||||||
|
timeout(Timeout0) ->
|
||||||
|
{Timeout0, Timeout0 + 30*1000}.
|
||||||
|
|
Loading…
Reference in a new issue