%% ------------------------------------------------------------------- %% %% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. %% %% This file is provided to you under the Apache License, %% Version 2.0 (the "License"); you may not use this file %% except in compliance with the License. You may obtain %% a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, %% software distributed under the License is distributed on an %% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY %% KIND, either express or implied. See the License for the %% specific language governing permissions and limitations %% under the License. %% %% ------------------------------------------------------------------- %% @doc Erlang API for the Machi client-implemented Chain Replication %% (CORFU-style) protocol. %% %% See also the docs for {@link machi_flu1_client} for additional %% details on data types and operation descriptions. %% %% The API here is much simpler than the {@link machi_flu1_client} or %% {@link machi_proxy_flu1_client} APIs. This module's API is a %% proposed simple-but-complete form for clients who are not %% interested in being an active participant in a Machi cluster and to %% have the responsibility for Machi internals, i.e., client-side %% Chain Replication, client-side read repair, client-side tracking of %% internal Machi epoch & projection changes, etc. %% %% This client is implemented as a long-lived Erlang process using %% `gen_server'-style OTP code practice. A naive client can expect %% that this process will manage all transient TCP session %% disconnections and Machi chain reconfigurations. This client's %% efforts are best-effort and can require some time to retry %% operations in certain failure cases, i.e., up to several seconds %% during a Machi projection & epoch change when a new server is %% added to the chain. %% %% Doc TODO: Once this API stabilizes, add all relevant data type details %% to the EDoc here. %% %% %% === Missing API features === %% %% So far, there is one missing client API feature that ought to be %% added to Machi in the near future: more flexible checksum %% management. %% %% Add a `source' annotation to all checksums to indicate where the %% checksum was calculated. For example, %% %% %% %% Client-side checksums would be the "strongest" type of %% checksum, meaning that any data corruption (of the original %% data and/or of the checksum itself) can be detected after the %% client-side calculation. There are too many horror stories on %% The Net about IP PDUs that are corrupted but unnoticed due to %% weak TCP checksums, buggy hardware, buggy OS drivers, etc. %% Checksum versioning is also desirable if/when the current checksum %% implementation changes from SHA-1 to something else. %% %% %% === Implementation notes === %% %% The major operation processing is implemented in a state machine-like %% manner. Before attempting an operation `X', there's an initial %% operation `pre-X' that takes care of updating the epoch id, %% restarting client protocol proxies, and if there's any server %% instability (e.g. some server is wedged), then insert some sleep %% time. When the chain appears to have stabilized, then we try the `X' %% operation again. %% %% Function name for the `pre-X' stuff is usually `X()', and the %% function name for the `X' stuff is usually `X2()'. (I.e., the `X' %% stuff follows after `pre-X' and therefore has a `2' suffix on the %% function name.) %% %% In the case of read repair, there are two stages: find the value to %% perform the repair, then perform the repair writes. In the case of %% the repair writes, the `pre-X' function is named `read_repair3()', %% and the `X' function is named `read_repair4()'. %% %% TODO: It would be nifty to lift the very-nearly-but-not-quite-boilerplate %% of the `pre-X' functions into a single common function ... but I'm not %% sure yet on how to do it without making the code uglier. -module(machi_cr_client). -behaviour(gen_server). -include("machi.hrl"). -include("machi_projection.hrl"). -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -endif. % TEST. -export([start_link/1]). %% FLU1 API -export([ %% File API append_chunk/3, append_chunk/4, append_chunk_extra/4, append_chunk_extra/5, read_chunk/4, read_chunk/5, checksum_list/2, checksum_list/3, list_files/1, list_files/2, %% Common API quit/1 ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(FLU_PC, machi_proxy_flu1_client). -define(TIMEOUT, 2*1000). -define(DEFAULT_TIMEOUT, 10*1000). -define(MAX_RUNTIME, 8*1000). -record(state, { members_dict :: p_srvr_dict(), proxies_dict :: orddict:orddict(), epoch_id, proj, bad_proj }). %% @doc Start a local, long-lived process that will be our steady %% & reliable communication proxy with the fickle & flaky %% remote Machi server. start_link(P_srvr_list) -> gen_server:start_link(?MODULE, [P_srvr_list], []). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. append_chunk(PidSpec, Prefix, Chunk) -> append_chunk(PidSpec, Prefix, Chunk, ?DEFAULT_TIMEOUT). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. append_chunk(PidSpec, Prefix, Chunk, Timeout) -> append_chunk_extra(PidSpec, Prefix, Chunk, 0, Timeout). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra) when is_integer(ChunkExtra), ChunkExtra >= 0 -> append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, ?DEFAULT_TIMEOUT). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) -> gen_server:call(PidSpec, {req, {append_chunk_extra, Prefix, Chunk, ChunkExtra}}, Timeout). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. read_chunk(PidSpec, File, Offset, Size) -> read_chunk(PidSpec, File, Offset, Size, ?DEFAULT_TIMEOUT). %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. read_chunk(PidSpec, File, Offset, Size, Timeout) -> gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}}, Timeout). %% @doc Fetch the list of chunk checksums for `File'. checksum_list(PidSpec, File) -> checksum_list(PidSpec, File, ?DEFAULT_TIMEOUT). %% @doc Fetch the list of chunk checksums for `File'. checksum_list(PidSpec, File, Timeout) -> gen_server:call(PidSpec, {req, {checksum_list, File}}, Timeout). %% @doc Fetch the list of all files on the remote FLU. list_files(PidSpec) -> list_files(PidSpec, ?DEFAULT_TIMEOUT). %% @doc Fetch the list of all files on the remote FLU. list_files(PidSpec, Timeout) -> gen_server:call(PidSpec, {req, {list_files}}, Timeout). %% @doc Quit & close the connection to remote FLU and stop our %% proxy process. quit(PidSpec) -> gen_server:call(PidSpec, quit, ?DEFAULT_TIMEOUT). %%%%%%%%%%%%%%%%%%%%%%%%%%% init([P_srvr_list]) -> MembersDict = orddict:from_list([{P#p_srvr.name, P} || P <- P_srvr_list]), ProxiesDict = ?FLU_PC:start_proxies(MembersDict), {ok, #state{members_dict=MembersDict, proxies_dict=ProxiesDict}}. handle_call({req, Req}, From, S) -> handle_call2(Req, From, update_proj(S)); handle_call(quit, _From, S) -> {stop, normal, ok, S}; handle_call(_Request, _From, S) -> Reply = whaaaaaaaaaaaaaaaaaaaa, {reply, Reply, S}. handle_cast(_Msg, S) -> {noreply, S}. handle_info(_Info, S) -> io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), {noreply, S}. terminate(_Reason, #state{proxies_dict=ProxiesDict}=_S) -> _ = ?FLU_PC:stop_proxies(ProxiesDict), ok. code_change(_OldVsn, S, _Extra) -> {ok, S}. %%%%%%%%%%%%%%%%%%%%%%%%%%% handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); handle_call2({read_chunk, File, Offset, Size}, _From, S) -> do_read_chunk(File, Offset, Size, 0, os:timestamp(), S); handle_call2({checksum_list, File}, _From, S) -> do_checksum_list(File, 0, os:timestamp(), S); handle_call2({list_files}, _From, S) -> do_list_files(0, os:timestamp(), S). do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) -> do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S); do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "head sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> {reply, {error, partition}, S}; true -> %% This is suboptimal for performance: there are some paths %% through this point where our current projection is good %% enough. But we're going to try to keep the code as simple %% as we can for now. S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S2); _ -> do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S2) end end. do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> [HeadFLU|RestFLUs] = mutation_flus(P), Proxy = orddict:fetch(HeadFLU, PD), case ?FLU_PC:append_chunk_extra(Proxy, EpochID, Prefix, Chunk, ChunkExtra, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> %% io:format(user, "append ~w,", [HeadFLU]), do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, [HeadFLU], 0, STime, S); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); {error, written} -> %% Implicit sequencing + this error = we don't know where this %% written block is. But we lost a race. Repeat, with a new %% sequencer assignment. do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE, Prefix,iolist_size(Chunk)}) end. do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, S) when RestFLUs == [] orelse Depth == 0 -> do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth + 1, STime, S); do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "midtail sleep2,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of undefined -> {reply, {error, partition}, S}; P2 -> RestFLUs2 = mutation_flus(P2), case RestFLUs2 -- Ws of RestFLUs2 -> %% None of the writes that we have done so far %% are to FLUs that are in the RestFLUs2 list. %% We are pessimistic here and assume that %% those FLUs are permanently dead. Start %% over with a new sequencer assignment, at %% the 2nd have of the impl (we have already %% slept & refreshed the projection). do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, S2); RestFLUs3 -> do_append_midtail2(RestFLUs3, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth + 1, STime, S2) end end end. do_append_midtail2([], _Prefix, File, Offset, Chunk, _ChunkExtra, _Ws, _Depth, _STime, S) -> %% io:format(user, "ok!\n", []), {reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(FLU, PD), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of ok -> %% io:format(user, "write ~w,", [FLU]), do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, [FLU|Ws], Depth, STime, S); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> do_append_midtail(FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, S); {error, written} -> %% We know what the chunk ought to be, so jump to the %% middle of read-repair. Resume = {append, Offset, iolist_size(Chunk), File}, read_repair3(FLUs, Resume, Chunk, [], File, Offset, iolist_size(Chunk), Depth, STime, S); {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) end. do_read_chunk(File, Offset, Size, 0=Depth, STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty do_read_chunk2(File, Offset, Size, Depth + 1, STime, S); do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "read sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> do_read_chunk(File, Offset, Size, Depth + 1, STime, S2); _ -> do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2) end end. do_read_chunk2(File, Offset, Size, Depth, STime, #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> UPI = readonly_flus(P), Tail = lists:last(UPI), ConsistencyMode = P#projection_v1.mode, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of {ok, Chunk} when byte_size(Chunk) == Size -> {reply, {ok, Chunk}, S}; {ok, BadChunk} -> exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, got, byte_size(BadChunk)}); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> do_read_chunk(File, Offset, Size, Depth, STime, S); {error, not_written} -> read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S); %% {reply, {error, not_written}, S}; {error, written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. %% Read repair: depends on the consistency mode that we're in: %% %% CP mode: If the head is written, then use it to repair UPI++Repairing. %% If head is not_written, then do nothing. %% AP mode: If any FLU in UPI++Repairing is written, then use it to repair %% UPI+repairing. %% If all FLUs in UPI++Repairing are not_written, then do nothing. read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth, STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1, STime, S); read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1, STime, S2); _ -> read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1, STime, S2) end end. read_repair2(cp_mode=ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> Tail = lists:last(readonly_flus(P)), case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of {ok, Chunk} when byte_size(Chunk) == Size -> ToRepair = mutation_flus(P) -- [Tail], read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset, Size, Depth, STime, S); {ok, BadChunk} -> exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, got, byte_size(BadChunk)}); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, S); {error, not_written} -> {reply, {error, not_written}, S}; {error, written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end; read_repair2(ap_mode=ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> Eligible = mutation_flus(P), case try_to_find_chunk(Eligible, File, Offset, Size, S) of {ok, Chunk, GotItFrom} when byte_size(Chunk) == Size -> ToRepair = mutation_flus(P) -- [GotItFrom], read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File, Offset, Size, Depth, STime, S); {ok, BadChunk} -> exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, got, byte_size(BadChunk)}); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, S); {error, not_written} -> {reply, {error, not_written}, S}; {error, written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. read_repair3([], ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S) -> read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S); read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, 0=Depth, STime, S) -> read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth + 1, STime, S); read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "read_repair3 sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth + 1, STime, S2); P2 -> ToRepair2 = mutation_flus(P2) -- Repaired, read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth + 1, STime, S2) end end. read_repair4([], ReturnMode, Chunk, _Repaired, File, Offset, _IgnoreSize, _Depth, _STime, S) -> %% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc? case ReturnMode of read -> {reply, {ok, Chunk}, S}; {append, Offset, Size, File} -> {reply, {ok, {Offset, Size, File}}, S} end; read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(First, PD), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of ok -> read_repair4(Rest, ReturnMode, Chunk, [First|Repaired], File, Offset, Size, Depth, STime, S); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S); {error, written} -> %% TODO: To be very paranoid, read the chunk here to verify %% that it is exactly our Chunk. read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S); {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. do_checksum_list(File, 0=Depth, STime, S) -> do_checksum_list2(File, Depth + 1, STime, S); do_checksum_list(File, Depth, STime, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> {reply, {error, partition}, S}; true -> %% This is suboptimal for performance: there are some paths %% through this point where our current projection is good %% enough. But we're going to try to keep the code as simple %% as we can for now. S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> do_checksum_list(File, Depth + 1, STime, S2); _ -> do_checksum_list2(File, Depth + 1, STime, S2) end end. do_checksum_list2(File, Depth, STime, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD), case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of {ok, _}=OK -> {reply, OK, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> do_checksum_list(File, Depth, STime, S); {error, _}=Error -> {reply, Error, S} end. do_list_files(0=Depth, STime, S) -> do_list_files2(Depth + 1, STime, S); do_list_files(Depth, STime, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> {reply, {error, partition}, S}; true -> %% This is suboptimal for performance: there are some paths %% through this point where our current projection is good %% enough. But we're going to try to keep the code as simple %% as we can for now. S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> do_list_files(Depth + 1, STime, S2); _ -> do_list_files2(Depth + 1, STime, S2) end end. do_list_files2(Depth, STime, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD), case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of {ok, _}=OK -> {reply, OK, S}; {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> do_list_files(Depth, STime, S); {error, _}=Error -> {reply, Error, S} end. update_proj(#state{proj=undefined}=S) -> update_proj2(1, S); update_proj(S) -> S. update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Timeout = 2*1000, WTimeout = 2*Timeout, Proxies = orddict:to_list(ProxiesDict), Work = fun({_K, Proxy}) -> ?FLU_PC:read_latest_projection(Proxy, private, Timeout) end, Rs = run_middleworker_job(Work, Proxies, WTimeout), %% TODO: There's a possible bug here when running multiple independent %% Machi clusters/chains. If our chain used to be [a,b], but our %% sysadmin has changed our cluster to be [a] and a completely seprate %% cluster with [b], and if b is reusing the address & port number, %% then it is possible that choose_best_projs() can incorrectly choose %% b's projection. case choose_best_proj(Rs) of P when P >= BadProj -> #projection_v1{epoch_number=Epoch, epoch_csum=CSum, members_dict=NewMembersDict} = P, EpochID = {Epoch, CSum}, ?FLU_PC:stop_proxies(ProxiesDict), NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), S#state{bad_proj=undefined, proj=P, epoch_id=EpochID, members_dict=NewMembersDict, proxies_dict=NewProxiesDict}; _ -> sleep_a_while(Count), update_proj2(Count + 1, S) end. run_middleworker_job(Fun, ArgList, WTimeout) -> Parent = self(), MiddleWorker = spawn(fun() -> PidsMons = [spawn_monitor(fun() -> Res = (catch Fun(Arg)), exit(Res) end) || Arg <- ArgList], Rs = gather_worker_statuses(PidsMons, WTimeout), Parent ! {res, self(), Rs}, exit(normal) end), receive {res, MiddleWorker, Results} -> Results after WTimeout+100 -> [] end. gather_worker_statuses([], _Timeout) -> []; gather_worker_statuses([{Pid,Ref}|Rest], Timeout) -> receive {'DOWN', R, process, P, Status} when R == Ref, P == Pid -> [Status|gather_worker_statuses(Rest, Timeout)] after Timeout -> gather_worker_statuses(Rest, 0) end. choose_best_proj(Rs) -> WorstEpoch = #projection_v1{epoch_number=-1,epoch_csum= <<>>}, lists:foldl(fun({ok, NewEpoch}, BestEpoch) when NewEpoch > BestEpoch -> NewEpoch; (_, BestEpoch) -> BestEpoch end, WorstEpoch, Rs). try_to_find_chunk(Eligible, File, Offset, Size, #state{epoch_id=EpochID, proxies_dict=PD}) -> Timeout = 2*1000, Work = fun(FLU) -> Proxy = orddict:fetch(FLU, PD), case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset, Size) of {ok, Chunk} when byte_size(Chunk) == Size -> {FLU, {ok, Chunk}}; Else -> {FLU, Else} end end, Rs = run_middleworker_job(Work, Eligible, Timeout), case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of [{FoundFLU, {ok, Chunk}}|_] -> {ok, Chunk, FoundFLU}; [] -> RetryErrs = [partition, bad_epoch, wedged], case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of [SomeErr|_] -> {error, SomeErr}; [] -> %% TODO does this really work 100% of the time? {error, not_written} end end. mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) -> UPI ++ Repairing; mutation_flus(#state{proj=P}) -> mutation_flus(P). readonly_flus(#projection_v1{upi=UPI}) -> UPI; readonly_flus(#state{proj=P}) -> readonly_flus(P). sleep_a_while(0) -> ok; sleep_a_while(1) -> ok; sleep_a_while(Depth) -> timer:sleep(30 + trunc(math:pow(1.9, Depth))).