diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 2de4117..65ecdfa 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -65,6 +65,7 @@ terminate/2, code_change/3]). -define(FLU_PC, machi_proxy_flu1_client). +-define(TIMEOUT, 2*1000). -record(state, { members_dict :: p_srvr_dict(), @@ -278,7 +279,46 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> - {reply, sorry_dude, S}. + do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S). + +do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, + #state{epoch_id=EpochID, proj=P, + proxies_dict=PD}=S) -> + #projection_v1{upi=[HeadFLU|RestFLUs]} = P, + case ?FLU_PC:append_chunk_extra(orddict:fetch(HeadFLU, PD), + EpochID, Prefix, Chunk, ChunkExtra, + ?TIMEOUT) of + {ok, {Offset, _Size, File}=_X} -> + io:format(user, "TODO: X ~p\n", [_X]), + do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, + HeadFLU, 1, S); + {error, Change} when Change == bad_epoch; Change == wedged -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S2) + %% TODO return values here + end. + +do_append_chunk_extra2([], File, Offset, _Chunk, _OldHeadFLU, _OkCount, S) -> + {reply, {ok, {File, Offset}}, S}; +do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount, + #state{epoch_id=EpochID, proj=P, + proxies_dict=PD}=S) -> + case ?FLU_PC:write_chunk(orddict:fetch(FLU, PD), + EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, + OldHeadFLU, OkCount + 1, S); + {error, Change} when Change == bad_epoch; Change == wedged -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + #projection_v1{upi=[NewHeadFLU|NewRestFLUs]} = S2#state.proj, + if OkCount == 1, NewHeadFLU == OldHeadFLU -> + do_append_chunk_extra2(NewRestFLUs, File, Offset, Chunk, + OldHeadFLU, OkCount, S2); + true -> + {error, partition} + end + %% TODO return values here + end. update_proj(#state{proj=undefined}=S) -> update_proj2(1, S); @@ -287,18 +327,38 @@ update_proj(S) -> update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Timeout = 2*1000, - PidsMons = - [spawn_monitor(fun() -> - exit(catch ?FLU_PC:read_latest_projection( - Proxy, private, Timeout)) - end) || {_K, Proxy} <- orddict:to_list(ProxiesDict)], - Rs = gather_worker_statuses(PidsMons, Timeout*2), + Parent = self(), + Proxies = orddict:to_list(ProxiesDict), + Worker = spawn( + fun() -> + PidsMons = + [spawn_monitor(fun() -> + exit(catch ?FLU_PC:read_latest_projection( + Proxy, private, Timeout)) + end) || {_K, Proxy} <- Proxies], + Rs = gather_worker_statuses(PidsMons, Timeout*2), + Parent ! {res, self(), Rs}, + exit(normal) + end), + Rs = receive {res, Worker, Results} -> Results + after Timeout*2 -> [] + end, + %% TODO: There's a possible bug here when running multiple independent + %% Machi clusters/chains. If our chain used to be [a,b], but our + %% sysadmin has changed our cluster to be [a] and a completely seprate + %% cluster with [b], and if b is reusing the address & port number, + %% then it is possible that choose_best_projs() can incorrectly choose + %% b's projection. case choose_best_proj(Rs) of P when P >= BadProj -> + #projection_v1{epoch_number=Epoch, epoch_csum=CSum, + members_dict=NewMembersDict} = P, io:format(user, "~s: proj ~P\n", [?MODULE, P, 10]), - EpochID = {P#projection_v1.epoch_number, - P#projection_v1.epoch_csum}, - S#state{bad_proj=undefined, proj=EpochID, epoch_id=EpochID}; + EpochID = {Epoch, CSum}, + ?FLU_PC:stop_proxies(ProxiesDict), + NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), + S#state{bad_proj=undefined, proj=P, epoch_id=EpochID, + members_dict=NewMembersDict, proxies_dict=NewProxiesDict}; _ -> timer:sleep(10 + (Count * 20)), update_proj2(Count + 1, S) @@ -315,7 +375,7 @@ gather_worker_statuses([{Pid,Ref}|Rest], Timeout) -> end. choose_best_proj(Rs) -> - WorstEpoch = #projection_v1{epoch_number=-1}, + WorstEpoch = #projection_v1{epoch_number=-1,epoch_csum= <<>>}, lists:foldl(fun({ok, NewEpoch}, BestEpoch) when NewEpoch > BestEpoch -> NewEpoch;