diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index cf282e2..bc3d1e6 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -66,6 +66,7 @@ -define(FLU_PC, machi_proxy_flu1_client). -define(TIMEOUT, 2*1000). +-define(MAX_RUNTIME, 5*1000). -record(state, { members_dict :: p_srvr_dict(), @@ -279,43 +280,134 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> - do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S). + do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S). -do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, - #state{epoch_id=EpochID, proj=P, - proxies_dict=PD}=S) -> - #projection_v1{upi=[HeadFLU|RestFLUs]} = P, - case ?FLU_PC:append_chunk_extra(orddict:fetch(HeadFLU, PD), +do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) -> + do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S); +do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> + io:format(user, "append sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1, + STime, S); + _ -> + do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, + STime, S) + end + end. + +do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + [HeadFLU|RestFLUs] = mutation_flus(P), + Proxy = orddict:fetch(HeadFLU, PD), + io:format(user, "append ~w,", [Proxy]), + case ?FLU_PC:append_chunk_extra(Proxy, EpochID, Prefix, Chunk, ChunkExtra, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, - HeadFLU, 1, S); - {error, Change} when Change == bad_epoch; Change == wedged -> - S2 = update_proj(S#state{proj=undefined, bad_proj=P}), - do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S2) - %% TODO return values here + %% io:format(user, "append ~w@~p,~w,", [HeadFLU, File, Offset]), + io:format(user, "append ~w,", [HeadFLU]), + do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + [HeadFLU], 0, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); + {error, written} -> + %% Implicit sequencing + this error = we don't know where this + %% written block is. But we lost a race. Repeat, with a new + %% sequencer assignment. + do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE, + Prefix,iolist_size(Chunk)}) end. -do_append_chunk_extra2([], File, Offset, Chunk, _OldHeadFLU, _OkCount, S) -> - {reply, {ok, {Offset, size(Chunk), File}}, S}; -do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount, - #state{epoch_id=EpochID, proj=P, - proxies_dict=PD}=S) -> - case ?FLU_PC:write_chunk(orddict:fetch(FLU, PD), - EpochID, File, Offset, Chunk, ?TIMEOUT) of - ok -> - do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, - OldHeadFLU, OkCount + 1, S); - {error, Change} when Change == bad_epoch; Change == wedged -> +do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth, STime, S) + when RestFLUs == [] orelse Depth == 0 -> + do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth + 1, STime, S); +do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth, STime, #state{proj=P}=S) -> + io:format(user, "append sleep2,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), - #projection_v1{upi=[NewHeadFLU|NewRestFLUs]} = S2#state.proj, - if OkCount == 1, NewHeadFLU == OldHeadFLU -> - do_append_chunk_extra2(NewRestFLUs, File, Offset, Chunk, - OldHeadFLU, OkCount, S2); - true -> - {error, partition} + case S2#state.proj of + undefined -> + {error, partition}; + P2 -> + RestFLUs2 = mutation_flus(P2), + case RestFLUs2 -- Ws of + RestFLUs -> + %% None of the writes that we have done so far + %% are to FLUs that are in the RestFLUs2 list. + %% We are pessimistic here and assume that + %% those FLUs are permanently dead. Start + %% over with a new sequencer assignment, at + %% the 2nd have of the impl (we have already + %% slept & refreshed the projection). + do_append_head2(Prefix, Chunk, ChunkExtra, Depth, + STime, S); + RestFLUs3 -> + do_append_midtail2(RestFLUs3, Prefix, File, Offset, + Chunk, ChunkExtra, + Ws, Depth + 1, STime, S) + end end + end. + +do_append_midtail2([], _Prefix, File, Offset, Chunk, + _ChunkExtra, _Ws, _Depth, _STime, S) -> + io:format(user, "ok!\n", []), + {reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; +do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, + ChunkExtra, Ws, Depth, STime, + #state{epoch_id=EpochID, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(FLU, PD), + io:format(user, "write ~w,", [Proxy]), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + %% io:format(user, "write ~w@~p~w,", [FLU, File, Offset]), + io:format(user, "write ~w,", [FLU]), + do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, + ChunkExtra, [FLU|Ws], Depth, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_append_midtail(FLUs, Prefix, File, Offset, Chunk, + ChunkExtra, Ws, Depth, STime, S); + {error, written} -> + exit({todo,read_repair,?MODULE,?LINE,File,Offset,iolist_size(Chunk)}) + %% read_repair(P#projection_v1.upi ++ P#projection_v1.repairing, + %% Chunk, Depth, STime, S) + %% Chunk1 = if is_binary(Chunk) -> Chunk; + %% is_list(Chunk) -> list_to_binary(Chunk) + %% end, + %% case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset, + %% size(Chunk1), ?TIMEOUT) of + %% {ok, Chunk2} when Chunk2 == Chunk1 -> + %% %% Someone has been read-repairing this chunk. + %% %% Keep going. + %% do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, + %% OldHeadFLU, OkCount + 1, S); + %% {error, not_written} -> + %% exit({todo_should_never_happen,?MODULE,?LINE, + %% File,Offset, size(Chunk1)}); + %% { -> %% TODO return values here end. @@ -328,7 +420,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Timeout = 2*1000, Parent = self(), Proxies = orddict:to_list(ProxiesDict), - Worker = spawn( + MiddleWorker = spawn( fun() -> PidsMons = [spawn_monitor(fun() -> @@ -339,7 +431,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Parent ! {res, self(), Rs}, exit(normal) end), - Rs = receive {res, Worker, Results} -> Results + Rs = receive {res, MiddleWorker, Results} -> Results after Timeout*2 -> [] end, %% TODO: There's a possible bug here when running multiple independent @@ -352,14 +444,14 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> P when P >= BadProj -> #projection_v1{epoch_number=Epoch, epoch_csum=CSum, members_dict=NewMembersDict} = P, - io:format(user, "~s: proj ~P\n", [?MODULE, P, 10]), + io:format(user, "~s: proj ~P\n", [?MODULE, P, 15]), EpochID = {Epoch, CSum}, ?FLU_PC:stop_proxies(ProxiesDict), NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), S#state{bad_proj=undefined, proj=P, epoch_id=EpochID, members_dict=NewMembersDict, proxies_dict=NewProxiesDict}; _ -> - timer:sleep(10 + (Count * 20)), + sleep_a_while(Count), update_proj2(Count + 1, S) end. @@ -382,5 +474,22 @@ choose_best_proj(Rs) -> BestEpoch end, WorstEpoch, Rs). +mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) -> + UPI ++ Repairing; +mutation_flus(#state{proj=P}) -> + mutation_flus(P). + +readonly_flus(#projection_v1{upi=UPI}) -> + UPI; +readonly_flus(#state{proj=P}) -> + readonly_flus(P). + +sleep_a_while(0) -> + ok; +sleep_a_while(1) -> + ok; +sleep_a_while(Depth) -> + timer:sleep(30 + trunc(math:pow(1.9, Depth))). + noop() -> ok.