diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl
index b60fdee..5b19be5 100644
--- a/src/machi_cr_client.erl
+++ b/src/machi_cr_client.erl
@@ -177,9 +177,10 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra)
 %% @doc Append a chunk (binary- or iolist-style) of data to a file
 %% with `Prefix'.
 
-append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) ->
+append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout0) ->
+    {TO, Timeout} = timeout(Timeout0),
     gen_server:call(PidSpec, {req, {append_chunk_extra, Prefix,
-                                    Chunk, ChunkExtra}},
+                                    Chunk, ChunkExtra, TO}},
                     Timeout).
 
 %% @doc Write a chunk of data (that has already been
@@ -191,8 +192,9 @@ write_chunk(PidSpec, File, Offset, Chunk) ->
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
-write_chunk(PidSpec, File, Offset, Chunk, Timeout) ->
-    gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk}},
+write_chunk(PidSpec, File, Offset, Chunk, Timeout0) ->
+    {TO, Timeout} = timeout(Timeout0),
+    gen_server:call(PidSpec, {req, {write_chunk, File, Offset, Chunk, TO}},
                     Timeout).
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
@@ -202,8 +204,9 @@ read_chunk(PidSpec, File, Offset, Size) ->
 
 %% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
 
-read_chunk(PidSpec, File, Offset, Size, Timeout) ->
-    gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}},
+read_chunk(PidSpec, File, Offset, Size, Timeout0) ->
+    {TO, Timeout} = timeout(Timeout0),
+    gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size, TO}},
                     Timeout).
 
 %% @doc Fetch the list of chunk checksums for `File'.
@@ -213,8 +216,9 @@ checksum_list(PidSpec, File) ->
 
 %% @doc Fetch the list of chunk checksums for `File'.
 
-checksum_list(PidSpec, File, Timeout) ->
-    gen_server:call(PidSpec, {req, {checksum_list, File}},
+checksum_list(PidSpec, File, Timeout0) ->
+    {TO, Timeout} = timeout(Timeout0),
+    gen_server:call(PidSpec, {req, {checksum_list, File, TO}},
                     Timeout).
 
 %% @doc Fetch the list of all files on the remote FLU.
@@ -224,8 +228,9 @@ list_files(PidSpec) ->
 
 %% @doc Fetch the list of all files on the remote FLU.
 
-list_files(PidSpec, Timeout) ->
-    gen_server:call(PidSpec, {req, {list_files}},
+list_files(PidSpec, Timeout0) ->
+    {TO, Timeout} = timeout(Timeout0),
+    gen_server:call(PidSpec, {req, {list_files, TO}},
                     Timeout).
 
 %% @doc Quit & close the connection to remote FLU and stop our
@@ -265,24 +270,24 @@ code_change(_OldVsn, S, _Extra) ->
 
 %%%%%%%%%%%%%%%%%%%%%%%%%%%
 
-handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) ->
-    do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S);
-handle_call2({write_chunk, File, Offset, Chunk}, _From, S) ->
-    do_write_head(File, Offset, Chunk, 0, os:timestamp(), S);
-handle_call2({read_chunk, File, Offset, Size}, _From, S) ->
-    do_read_chunk(File, Offset, Size, 0, os:timestamp(), S);
-handle_call2({checksum_list, File}, _From, S) ->
-    do_checksum_list(File, 0, os:timestamp(), S);
-handle_call2({list_files}, _From, S) ->
-    do_list_files(0, os:timestamp(), S).
+handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra, TO}, _From, S) ->
+    do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), TO, S);
+handle_call2({write_chunk, File, Offset, Chunk, TO}, _From, S) ->
+    do_write_head(File, Offset, Chunk, 0, os:timestamp(), TO, S);
+handle_call2({read_chunk, File, Offset, Size, TO}, _From, S) ->
+    do_read_chunk(File, Offset, Size, 0, os:timestamp(), TO, S);
+handle_call2({checksum_list, File, TO}, _From, S) ->
+    do_checksum_list(File, 0, os:timestamp(), TO, S);
+handle_call2({list_files, TO}, _From, S) ->
+    do_list_files(0, os:timestamp(), TO, S).
 
-do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) ->
-    do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S);
-do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
+do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, TO, S) ->
+    do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, TO, S);
+do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, #state{proj=P}=S) ->
     %% io:format(user, "head sleep1,", []),
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
-    if DiffMs > ?MAX_RUNTIME ->
+    if DiffMs > TO ->
            {reply, {error, partition}, S};
        true ->
            %% This is suboptimal for performance: there are some paths
@@ -294,30 +299,32 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) ->
                 P2 when P2 == undefined orelse
                         P2#projection_v1.upi == [] ->
                     do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1,
-                                   STime, S2);
+                                   STime, TO, S2);
                 _ ->
                     do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1,
-                                    STime, S2)
+                                    STime, TO, S2)
             end
     end.
 
-do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime,
+do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, TO,
                 #state{proj=P}=S) ->
     [HeadFLU|_RestFLUs] = mutation_flus(P),
     case is_witness_flu(HeadFLU, P) of
         true ->
             case witnesses_use_our_epoch(S) of
                 true ->
-                    do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, S);
+                    do_append_head3(Prefix, Chunk, ChunkExtra, Depth,
+                                    STime, TO, S);
                 false ->
                     %% Bummer, go back to the beginning and retry.
-                    do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S)
+                    do_append_head(Prefix, Chunk, ChunkExtra, Depth,
+                                   STime, TO, S)
             end;
         false ->
-            do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, S)
+            do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S)
     end.
 
-do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime,
+do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime, TO,
                 #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
     [HeadFLU|RestFLUs] = non_witness_flus(mutation_flus(P), P),
     Proxy = orddict:fetch(HeadFLU, PD),
@@ -327,33 +334,33 @@ do_append_head3(Prefix, Chunk, ChunkExtra, Depth, STime,
         {ok, {Offset, _Size, File}=_X} ->
             %% io:format(user, "append ~w,", [HeadFLU]),
             do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
-                              [HeadFLU], 0, STime, S);
+                              [HeadFLU], 0, STime, TO, S);
         {error, bad_checksum}=BadCS ->
             {reply, BadCS, S};
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
+            do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S);
         {error, written} ->
             %% Implicit sequencing + this error = we don't know where this
             %% written block is. But we lost a race. Repeat, with a new
             %% sequencer assignment.
-            do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S);
+            do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, TO, S);
         {error, not_written} ->
             exit({todo_should_never_happen,?MODULE,?LINE,
                   Prefix,iolist_size(Chunk)})
     end.
 
 do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
-                  Ws, Depth, STime, S)
+                  Ws, Depth, STime, TO, S)
   when RestFLUs == [] orelse Depth == 0 ->
     do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
-                       Ws, Depth + 1, STime, S);
+                       Ws, Depth + 1, STime, TO, S);
 do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
-                  Ws, Depth, STime, #state{proj=P}=S) ->
+                  Ws, Depth, STime, TO, #state{proj=P}=S) ->
     %% io:format(user, "midtail sleep2,", []),
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
-    if DiffMs > ?MAX_RUNTIME ->
+    if DiffMs > TO ->
            {reply, {error, partition}, S};
        true ->
            S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
@@ -376,36 +383,36 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra,
                                     {error, partition};
                                true ->
                                     do_append_head2(Prefix, Chunk, ChunkExtra,
-                                                    Depth, STime, S2)
+                                                    Depth, STime, TO, S2)
                             end;
                         RestFLUs3 ->
                             do_append_midtail2(RestFLUs3, Prefix, File, Offset,
                                                Chunk, ChunkExtra,
-                                               Ws, Depth + 1, STime, S2)
+                                               Ws, Depth + 1, STime, TO, S2)
                     end
             end
     end.
 
 do_append_midtail2([], _Prefix, File, Offset, Chunk,
-                   _ChunkExtra, _Ws, _Depth, _STime, S) ->
+                   _ChunkExtra, _Ws, _Depth, _STime, _TO, S) ->
     %% io:format(user, "ok!\n", []),
     {reply, {ok, {Offset, chunk_wrapper_size(Chunk), File}}, S};
 do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
-                   ChunkExtra, Ws, Depth, STime,
+                   ChunkExtra, Ws, Depth, STime, TO,
                    #state{epoch_id=EpochID, proxies_dict=PD}=S) ->
     Proxy = orddict:fetch(FLU, PD),
     case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of
         ok ->
             %% io:format(user, "write ~w,", [FLU]),
             do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk,
-                               ChunkExtra, [FLU|Ws], Depth, STime, S);
+                               ChunkExtra, [FLU|Ws], Depth, STime, TO, S);
         {error, bad_checksum}=BadCS ->
             %% TODO: alternate strategy?
             {reply, BadCS, S};
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
             do_append_midtail(FLUs, Prefix, File, Offset, Chunk,
-                              ChunkExtra, Ws, Depth, STime, S);
+                              ChunkExtra, Ws, Depth, STime, TO, S);
         {error, written} ->
             %% We know what the chunk ought to be, so jump to the
             %% middle of read-repair.
@@ -416,7 +423,7 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk,
             exit({todo_should_never_happen,?MODULE,?LINE,File,Offset})
     end.
 
-witnesses_use_our_epoch(#state{proj=P, epoch_id=EpochID}=S) ->
+witnesses_use_our_epoch(#state{proj=P}=S) ->
     Witnesses = witness_flus(P#projection_v1.upi, P),
     witnesses_use_our_epoch(Witnesses, S).
 
@@ -434,13 +441,13 @@ io:format(user, "Bummer, ~p uses ~P at ~p\n", [FLU, _Else, 7, now()]),
             false
     end.
 
-do_write_head(File, Offset, Chunk, 0=Depth, STime, S) ->
-    do_write_head2(File, Offset, Chunk, Depth + 1, STime, S);
-do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) ->
+do_write_head(File, Offset, Chunk, 0=Depth, STime, TO, S) ->
+    do_write_head2(File, Offset, Chunk, Depth + 1, STime, TO, S);
+do_write_head(File, Offset, Chunk, Depth, STime, TO, #state{proj=P}=S) ->
     %% io:format(user, "head sleep1,", []),
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
-    if DiffMs > ?MAX_RUNTIME ->
+    if DiffMs > TO ->
            {reply, {error, partition}, S};
        true ->
            %% This is suboptimal for performance: there are some paths
@@ -451,13 +458,15 @@ do_write_head(File, Offset, Chunk, Depth, STime, #state{proj=P}=S) ->
             case S2#state.proj of
                 P2 when P2 == undefined orelse
                         P2#projection_v1.upi == [] ->
-                    do_write_head(File, Offset, Chunk, Depth + 1, STime, S2);
+                    do_write_head(File, Offset, Chunk, Depth + 1,
+                                  STime, TO, S2);
                 _ ->
-                    do_write_head2(File, Offset, Chunk, Depth + 1, STime, S2)
+                    do_write_head2(File, Offset, Chunk, Depth + 1,
+                                   STime, TO, S2)
             end
     end.
 
-do_write_head2(File, Offset, Chunk, Depth, STime,
+do_write_head2(File, Offset, Chunk, Depth, STime, TO,
                #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
     [HeadFLU|RestFLUs] = mutation_flus(P),
     Proxy = orddict:fetch(HeadFLU, PD),
@@ -466,12 +475,12 @@ do_write_head2(File, Offset, Chunk, Depth, STime,
             %% From this point onward, we use the same code & logic path as
             %% append does.
             do_append_midtail(RestFLUs, undefined, File, Offset, Chunk,
-                              undefined, [HeadFLU], 0, STime, S);
+                              undefined, [HeadFLU], 0, STime, TO, S);
         {error, bad_checksum}=BadCS ->
             {reply, BadCS, S};
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            do_write_head(File, Offset, Chunk, Depth, STime, S);
+            do_write_head(File, Offset, Chunk, Depth, STime, TO, S);
         {error, written}=Err ->
             Err;
         {error, not_written} ->
@@ -479,27 +488,27 @@ do_write_head2(File, Offset, Chunk, Depth, STime,
                   iolist_size(Chunk)})
     end.
 
-do_read_chunk(File, Offset, Size, 0=Depth, STime,
+do_read_chunk(File, Offset, Size, 0=Depth, STime, TO,
               #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty
-    do_read_chunk2(File, Offset, Size, Depth + 1, STime, S);
-do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) ->
+    do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S);
+do_read_chunk(File, Offset, Size, Depth, STime, TO, #state{proj=P}=S) ->
     %% io:format(user, "read sleep1,", []),
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
-    if DiffMs > ?MAX_RUNTIME ->
+    if DiffMs > TO ->
            {reply, {error, partition}, S};
        true ->
            S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
            case S2#state.proj of
                P2 when P2 == undefined orelse
                        P2#projection_v1.upi == [] ->
-                    do_read_chunk(File, Offset, Size, Depth + 1, STime, S2);
+                    do_read_chunk(File, Offset, Size, Depth + 1, STime, TO, S2);
                _ ->
-                    do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2)
+                    do_read_chunk2(File, Offset, Size, Depth + 1, STime, TO, S2)
            end
    end.
 
-do_read_chunk2(File, Offset, Size, Depth, STime,
+do_read_chunk2(File, Offset, Size, Depth, STime, TO,
                #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) ->
     UPI = readonly_flus(P),
     Tail = lists:last(UPI),
@@ -518,7 +527,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime,
             {reply, BadCS, S};
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            do_read_chunk(File, Offset, Size, Depth, STime, S);
+            do_read_chunk(File, Offset, Size, Depth, STime, TO, S);
         {error, not_written} ->
             read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S);
             %% {reply, {error, not_written}, S};
@@ -674,12 +683,12 @@ read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset,
             exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size})
     end.
 
-do_checksum_list(File, 0=Depth, STime, S) ->
-    do_checksum_list2(File, Depth + 1, STime, S);
-do_checksum_list(File, Depth, STime, #state{proj=P}=S) ->
+do_checksum_list(File, 0=Depth, STime, TO, S) ->
+    do_checksum_list2(File, Depth + 1, STime, TO, S);
+do_checksum_list(File, Depth, STime, TO, #state{proj=P}=S) ->
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
-    if DiffMs > ?MAX_RUNTIME ->
+    if DiffMs > TO ->
            {reply, {error, partition}, S};
        true ->
            %% This is suboptimal for performance: there are some paths
@@ -690,13 +699,13 @@ do_checksum_list(File, Depth, STime, #state{proj=P}=S) ->
             case S2#state.proj of
                 P2 when P2 == undefined orelse
                         P2#projection_v1.upi == [] ->
-                    do_checksum_list(File, Depth + 1, STime, S2);
+                    do_checksum_list(File, Depth + 1, STime, TO, S2);
                 _ ->
-                    do_checksum_list2(File, Depth + 1, STime, S2)
+                    do_checksum_list2(File, Depth + 1, STime, TO, S2)
             end
     end.
 
-do_checksum_list2(File, Depth, STime,
+do_checksum_list2(File, Depth, STime, TO,
                   #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
     Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
     case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of
@@ -704,17 +713,17 @@ do_checksum_list2(File, Depth, STime,
             {reply, OK, S};
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            do_checksum_list(File, Depth, STime, S);
+            do_checksum_list(File, Depth, STime, TO, S);
         {error, _}=Error ->
             {reply, Error, S}
     end.
 
-do_list_files(0=Depth, STime, S) ->
-    do_list_files2(Depth + 1, STime, S);
-do_list_files(Depth, STime, #state{proj=P}=S) ->
+do_list_files(0=Depth, STime, TO, S) ->
+    do_list_files2(Depth + 1, STime, TO, S);
+do_list_files(Depth, STime, TO, #state{proj=P}=S) ->
     sleep_a_while(Depth),
     DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
-    if DiffMs > ?MAX_RUNTIME ->
+    if DiffMs > TO ->
            {reply, {error, partition}, S};
        true ->
            %% This is suboptimal for performance: there are some paths
@@ -725,13 +734,13 @@ do_list_files(Depth, STime, #state{proj=P}=S) ->
             case S2#state.proj of
                 P2 when P2 == undefined orelse
                         P2#projection_v1.upi == [] ->
-                    do_list_files(Depth + 1, STime, S2);
+                    do_list_files(Depth + 1, STime, TO, S2);
                 _ ->
-                    do_list_files2(Depth + 1, STime, S2)
+                    do_list_files2(Depth + 1, STime, TO, S2)
             end
     end.
 
-do_list_files2(Depth, STime,
+do_list_files2(Depth, STime, TO,
                #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) ->
     Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD),
     case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of
@@ -739,7 +748,7 @@ do_list_files2(Depth, STime,
             {reply, OK, S};
         {error, Retry}
           when Retry == partition; Retry == bad_epoch; Retry == wedged ->
-            do_list_files(Depth, STime, S);
+            do_list_files(Depth, STime, TO, S);
         {error, _}=Error ->
             {reply, Error, S}
     end.
@@ -877,3 +886,15 @@ chunk_wrapper_size({_TaggedCSum, Chunk}) ->
     iolist_size(Chunk);
 chunk_wrapper_size(Chunk) ->
     iolist_size(Chunk).
+
+%% The intra-gen_server-timeout is approximate. We'll add some 30
+%% seconds of fudge to the user-specified time to accommodate the
+%% imprecision. Using 'infinity' would definitely handle the case of
+%% horrible time consumption by the gen_server proc, but I'm slightly
+%% leery of really waiting forever: better to have an
+%% {'EXIT',{timeout,_}} to raise awareness of a serious problem.
+
+timeout(infinity) ->
+    timeout(15*60*1000); % close enough to infinity
+timeout(Timeout0) ->
+    {Timeout0, Timeout0 + 30*1000}.
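
For reference, a minimal usage sketch (not part of the patch) of how the new Timeout0 argument flows through timeout/1 and the retry loops above. The demo module name, prefix, chunk, and timeout values are hypothetical; only timeout/1, append_chunk_extra/5, and read_chunk/5 come from the diff itself.

    %% Hypothetical example module -- illustrates the {TO, Timeout} split.
    -module(machi_cr_client_timeout_demo).
    -export([demo/1]).

    %% Per timeout/1 in the patch:
    %%   timeout(5000)     -> {5000, 35000}     5s internal deadline, 35s gen_server:call
    %%   timeout(infinity) -> {900000, 930000}  'infinity' is capped at 15 minutes + fudge
    demo(Pid) ->
        UserTimeout = 10*1000,                  % 10s budget for the do_* retry loops
        case machi_cr_client:append_chunk_extra(Pid, <<"pre">>, <<"data">>,
                                                0, UserTimeout) of
            {ok, {Offset, Size, File}} ->
                %% If a partition lasts longer than UserTimeout, the retry loops
                %% reply {error, partition} instead of letting gen_server:call/3
                %% exit with a timeout.
                machi_cr_client:read_chunk(Pid, File, Offset, Size, UserTimeout);
            {error, _}=Err ->
                Err
        end.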