diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 1849ca7..985ebf1 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -37,14 +37,17 @@ func, and pattern match Erlang style in that func. - Add no-wedging state to make testing easier? -** TODO Adapt the projection-aware, CR-implementing client from demo-day +** DONE Adapt the projection-aware, CR-implementing client from demo-day +** DONE Add major comment sections to the CR-impl client +** TODO Simple basho_bench driver, put some unscientific chalk on the benchtop ** TODO Create parallel PULSE test for basic API plus chain manager repair +** TODO Add client-side vs. server-side checksum type, expand client API? ** TODO Add gproc and get rid of registered name rendezvous *** TODO Fixes the atom table leak *** TODO Fixes the problem of having active sequencer for the same prefix on two FLUS in the same VM -** TODO Fix all known bugs with Chain Manager (list below) +** TODO Fix all known bugs/cruft with Chain Manager (list below) *** DONE Fix known bugs *** DONE Clean up crufty TODO comments and other obvious cruft *** TODO Re-add verification step of stable epochs, including inner projections! diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl index e20908a..ba0b8fd 100644 --- a/include/machi_projection.hrl +++ b/include/machi_projection.hrl @@ -21,6 +21,7 @@ -ifndef(MACHI_PROJECTION_HRL). -define(MACHI_PROJECTION_HRL, true). +-type pv1_consistency_mode() :: 'ap_mode' | 'cp_mode'. -type pv1_csum() :: binary(). -type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}. -type pv1_epoch_n() :: non_neg_integer(). @@ -46,6 +47,7 @@ author_server :: pv1_server(), all_members :: [pv1_server()], creation_time :: pv1_timestamp(), + mode = ap_mode :: pv1_consistency_mode(), upi :: [pv1_server()], repairing :: [pv1_server()], down :: [pv1_server()], diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 5f73c6c..6e48978 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -466,11 +466,12 @@ cl_read_latest_projection(ProjectionType, AllHosed, S) -> {All_queried_list, FLUsRs, S2} = read_latest_projection_call_only(ProjectionType, AllHosed, S), - rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, S2). + rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, + ProjectionType, S2). -rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, +rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType, #ch_mgr{name=MyName,proj=CurrentProj}=S) -> - UnwrittenRs = [x || {_, error_unwritten} <- FLUsRs], + UnwrittenRs = [x || {_, {error, not_written}} <- FLUsRs], Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)], BadAnswerFLUs = [FLU || {FLU, Answer} <- FLUsRs, not is_record(Answer, projection_v1)], @@ -489,7 +490,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, {trans_all_hosed, []}, {trans_all_flap_counts, []}], {not_unanimous, NoneProj, Extra2, S}; - UnwrittenRs /= [] -> + ProjectionType == public, UnwrittenRs /= [] -> {needs_repair, FLUsRs, [flarfus], S}; true -> [{_Rank, BestProj}|_] = rank_and_sort_projections(Ps, CurrentProj), @@ -516,7 +517,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, end. 
do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) -> - Unwrittens = [x || {_FLU, error_unwritten} <- FLUsRs], + Unwrittens = [x || {_FLU, {error, not_written}} <- FLUsRs], Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)], if Unwrittens == [] orelse Ps == [] -> {nothing_to_do, S}; @@ -762,17 +763,10 @@ rank_projection(#projection_v1{author_server=Author, (N*N * length(UPI_list)). do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)-> - catch orddict:fold( - fun(_K, Pid, _Acc) -> - _ = (catch ?FLU_PC:quit(Pid)) - end, [], OldProxiesDict), - Proxies = orddict:fold( - fun(K, P, Acc) -> - {ok, Pid} = ?FLU_PC:start_link(P), - [{K, Pid}|Acc] - end, [], MembersDict), + _ = ?FLU_PC:stop_proxies(OldProxiesDict), + ProxiesDict = ?FLU_PC:start_proxies(MembersDict), {ok, S#ch_mgr{members_dict=MembersDict, - proxies_dict=orddict:from_list(Proxies)}}. + proxies_dict=ProxiesDict}}. do_react_to_env(#ch_mgr{name=MyName, proj=#projection_v1{epoch_number=Epoch, @@ -1936,9 +1930,12 @@ perhaps_start_repair( %% RepairOpts = [{repair_mode, check}, verbose], RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end, LastUPI = lists:last(UPI), + IgnoreStabilityTime_p = proplists:get_value(ignore_stability_time, + S#ch_mgr.opts, false), case timer:now_diff(os:timestamp(), Start) div 1000000 of - N when MyName == LastUPI, - N >= ?REPAIR_START_STABILITY_TIME -> + N when MyName == LastUPI andalso + (IgnoreStabilityTime_p orelse + N >= ?REPAIR_START_STABILITY_TIME) -> {WorkerPid, _Ref} = spawn_monitor(RepairFun), S#ch_mgr{repair_worker=WorkerPid, repair_start=os:timestamp(), diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 1ef772f..b890d80 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -107,8 +107,6 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> catch What:Why -> Stack = erlang:get_stacktrace(), - io:format(user, "What Why ~p ~p @\n\t~p\n", - [What, Why, Stack]), {error, {What, Why, Stack}} after [(catch machi_proxy_flu1_client:quit(Pid)) || diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl new file mode 100644 index 0000000..e6c145d --- /dev/null +++ b/src/machi_cr_client.erl @@ -0,0 +1,674 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Erlang API for the Machi client-implemented Chain Replication +%% (CORFU-style) protocol. +%% +%% The major operation processing is implemented in a state machine-like +%% manner. Before attempting an operation `X', there's an initial +%% operation `pre-X' that takes care of updating the epoch id, +%% restarting client protocol proxies, and if there's any server +%% instability (e.g. some server is wedged), then insert some sleep +%% time. 
When the chain appears to have stabilized, then we try the `X' +%% operation again. +%% +%% Function name for the `pre-X' stuff is usually `X()', and the +%% function name for the `X' stuff is usually `X2()'. (I.e., the `X' +%% stuff follows after `pre-X' and therefore has a `2' suffix on the +%% function name.) +%% +%% In the case of read repair, there are two stages: find the value to +%% perform the repair, then perform the repair writes. In the case of +%% the repair writes, the `pre-X' function is named `read_repair3()', +%% and the `X' function is named `read_repair4()'. +%% +%% TODO: It would be nifty to lift the very-nearly-but-not-quite-boilerplate +%% of the `pre-X' functions into a single common function ... but I'm not +%% sure yet on how to do it without making the code uglier. + +-module(machi_cr_client). + +-behaviour(gen_server). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. % TEST. + +-export([start_link/1]). +%% FLU1 API +-export([ + %% File API + append_chunk/3, append_chunk/4, + append_chunk_extra/4, append_chunk_extra/5, + read_chunk/4, read_chunk/5, + checksum_list/2, checksum_list/3, + list_files/1, list_files/2, + + %% Common API + quit/1 + ]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(FLU_PC, machi_proxy_flu1_client). +-define(TIMEOUT, 2*1000). +-define(MAX_RUNTIME, 8*1000). + +-record(state, { + members_dict :: p_srvr_dict(), + proxies_dict :: orddict:orddict(), + epoch_id, + proj, + bad_proj + }). + +%% @doc Start a local, long-lived process that will be our steady +%% & reliable communication proxy with the fickle & flaky +%% remote Machi server. + +start_link(P_srvr_list) -> + gen_server:start_link(?MODULE, [P_srvr_list], []). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk(PidSpec, Prefix, Chunk) -> + append_chunk(PidSpec, Prefix, Chunk, infinity). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk(PidSpec, Prefix, Chunk, Timeout) -> + append_chunk_extra(PidSpec, Prefix, Chunk, 0, Timeout). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra) + when is_integer(ChunkExtra), ChunkExtra >= 0 -> + append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, infinity). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) -> + gen_server:call(PidSpec, {req, {append_chunk_extra, Prefix, + Chunk, ChunkExtra}}, + Timeout). + +%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. + +read_chunk(PidSpec, File, Offset, Size) -> + read_chunk(PidSpec, File, Offset, Size, infinity). + +%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. + +read_chunk(PidSpec, File, Offset, Size, Timeout) -> + gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}}, + Timeout). + +%% @doc Fetch the list of chunk checksums for `File'. + +checksum_list(PidSpec, File) -> + checksum_list(PidSpec, File, infinity). + +%% @doc Fetch the list of chunk checksums for `File'. + +checksum_list(PidSpec, File, Timeout) -> + gen_server:call(PidSpec, {req, {checksum_list, File}}, + Timeout). + +%% @doc Fetch the list of all files on the remote FLU. 
+ +list_files(PidSpec) -> + list_files(PidSpec, infinity). + +%% @doc Fetch the list of all files on the remote FLU. + +list_files(PidSpec, Timeout) -> + gen_server:call(PidSpec, {req, {list_files}}, + Timeout). + +%% @doc Quit & close the connection to remote FLU and stop our +%% proxy process. + +quit(PidSpec) -> + gen_server:call(PidSpec, quit, infinity). + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init([P_srvr_list]) -> + MembersDict = orddict:from_list([{P#p_srvr.name, P} || P <- P_srvr_list]), + ProxiesDict = ?FLU_PC:start_proxies(MembersDict), + {ok, #state{members_dict=MembersDict, proxies_dict=ProxiesDict}}. + +handle_call({req, Req}, From, S) -> + handle_call2(Req, From, update_proj(S)); +handle_call(quit, _From, S) -> + {stop, normal, ok, S}; +handle_call(_Request, _From, S) -> + Reply = whaaaaaaaaaaaaaaaaaaaa, + {reply, Reply, S}. + +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), + {noreply, S}. + +terminate(_Reason, #state{proxies_dict=ProxiesDict}=_S) -> + _ = ?FLU_PC:stop_proxies(ProxiesDict), + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> + do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); +handle_call2({read_chunk, File, Offset, Size}, _From, S) -> + do_read_chunk(File, Offset, Size, 0, os:timestamp(), S); +handle_call2({checksum_list, File}, _From, S) -> + do_checksum_list(File, 0, os:timestamp(), S); +handle_call2({list_files}, _From, S) -> + do_list_files(0, os:timestamp(), S). + +do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) -> + do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S); +do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "head sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1, + STime, S2); + _ -> + do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, + STime, S2) + end + end. + +do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + [HeadFLU|RestFLUs] = mutation_flus(P), + Proxy = orddict:fetch(HeadFLU, PD), + case ?FLU_PC:append_chunk_extra(Proxy, + EpochID, Prefix, Chunk, ChunkExtra, + ?TIMEOUT) of + {ok, {Offset, _Size, File}=_X} -> + %% io:format(user, "append ~w,", [HeadFLU]), + do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + [HeadFLU], 0, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); + {error, written} -> + %% Implicit sequencing + this error = we don't know where this + %% written block is. But we lost a race. Repeat, with a new + %% sequencer assignment. + do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE, + Prefix,iolist_size(Chunk)}) + end. 
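
The module doc near the top of this file describes the `pre-X' / `X2' split in the abstract; do_append_head/6 and do_append_head2/6 above, and the read and repair paths below, all repeat the same shape. As a condensed, editorial sketch only (do_foo/do_foo2 are invented placeholder names; #state, #projection_v1, update_proj/1, sleep_a_while/1 and ?MAX_RUNTIME are the module's own), the pattern is:

do_foo(Arg, 0=Depth, STime, S) ->
    %% First attempt: assume the cached projection is usable.
    do_foo2(Arg, Depth + 1, STime, S);
do_foo(Arg, Depth, STime, #state{proj=P}=S) ->
    %% Retry path: back off, give up after ?MAX_RUNTIME, otherwise
    %% refresh the projection & proxies and try again.
    sleep_a_while(Depth),
    DiffMs = timer:now_diff(os:timestamp(), STime) div 1000,
    if DiffMs > ?MAX_RUNTIME ->
            {reply, {error, partition}, S};
       true ->
            S2 = update_proj(S#state{proj=undefined, bad_proj=P}),
            case S2#state.proj of
                P2 when P2 == undefined orelse P2#projection_v1.upi == [] ->
                    do_foo(Arg, Depth + 1, STime, S2);  % chain still unstable
                _ ->
                    do_foo2(Arg, Depth + 1, STime, S2)  % chain looks usable
            end
    end.
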
+ +do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth, STime, S) + when RestFLUs == [] orelse Depth == 0 -> + do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth + 1, STime, S); +do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "midtail sleep2,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + undefined -> + {reply, {error, partition}, S}; + P2 -> + RestFLUs2 = mutation_flus(P2), + case RestFLUs2 -- Ws of + RestFLUs2 -> + %% None of the writes that we have done so far + %% are to FLUs that are in the RestFLUs2 list. + %% We are pessimistic here and assume that + %% those FLUs are permanently dead. Start + %% over with a new sequencer assignment, at + %% the 2nd have of the impl (we have already + %% slept & refreshed the projection). + do_append_head2(Prefix, Chunk, ChunkExtra, Depth, + STime, S2); + RestFLUs3 -> + do_append_midtail2(RestFLUs3, Prefix, File, Offset, + Chunk, ChunkExtra, + Ws, Depth + 1, STime, S2) + end + end + end. + +do_append_midtail2([], _Prefix, File, Offset, Chunk, + _ChunkExtra, _Ws, _Depth, _STime, S) -> + %% io:format(user, "ok!\n", []), + {reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; +do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, + ChunkExtra, Ws, Depth, STime, + #state{epoch_id=EpochID, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(FLU, PD), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + %% io:format(user, "write ~w,", [FLU]), + do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, + ChunkExtra, [FLU|Ws], Depth, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_append_midtail(FLUs, Prefix, File, Offset, Chunk, + ChunkExtra, Ws, Depth, STime, S); + {error, written} -> + %% We know what the chunk ought to be, so jump to the + %% middle of read-repair. + Resume = {append, Offset, iolist_size(Chunk), File}, + read_repair3(FLUs, Resume, Chunk, [], File, Offset, + iolist_size(Chunk), Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) + end. + +do_read_chunk(File, Offset, Size, 0=Depth, STime, + #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty + do_read_chunk2(File, Offset, Size, Depth + 1, STime, S); +do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_read_chunk(File, Offset, Size, Depth + 1, STime, S2); + _ -> + do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2) + end + end. 
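
The `RestFLUs2 -- Ws' test in do_append_midtail/10 above is what decides between restarting the append and continuing down the chain. A toy illustration, not part of the patch (the atoms a/b/c stand in for FLU names):

append_restart_or_continue_demo() ->
    Ws = [a],                     % FLUs this append has already written to
    %% Case 1: the refreshed projection shares none of our writes, so the
    %% subtraction changes nothing and do_append_head2() starts over with
    %% a new sequencer assignment.
    RestA = [b, c],
    RestA = RestA -- Ws,
    %% Case 2: the FLU we already wrote to (a) is still in the chain, so
    %% the subtraction leaves only the unwritten suffix for
    %% do_append_midtail2() to continue with.
    RestB = [a, b, c],
    [b, c] = RestB -- Ws,
    ok.
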
+ +do_read_chunk2(File, Offset, Size, Depth, STime, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + UPI = readonly_flus(P), + Tail = lists:last(UPI), + ConsistencyMode = P#projection_v1.mode, + case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, + File, Offset, Size, ?TIMEOUT) of + {ok, Chunk} when byte_size(Chunk) == Size -> + {reply, {ok, Chunk}, S}; + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, + got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_read_chunk(File, Offset, Size, Depth, STime, S); + {error, not_written} -> + read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S); + %% {reply, {error, not_written}, S}; + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + +%% Read repair: depends on the consistency mode that we're in: +%% +%% CP mode: If the head is written, then use it to repair UPI++Repairing. +%% If head is not_written, then do nothing. +%% AP mode: If any FLU in UPI++Repairing is written, then use it to repair +%% UPI+repairing. +%% If all FLUs in UPI++Repairing are not_written, then do nothing. + +read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth, + STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty + read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1, + STime, S); +read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth, + STime, #state{proj=P}=S) -> + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + read_repair(ConsistencyMode, ReturnMode, File, Offset, + Size, Depth + 1, STime, S2); + _ -> + read_repair2(ConsistencyMode, ReturnMode, File, Offset, + Size, Depth + 1, STime, S2) + end + end. 
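
The same three-way error handling recurs in do_read_chunk2/6 above and in the read_repair* functions below: transient errors trigger a projection refresh plus retry, not_written triggers read repair, and written should be impossible on the read path. A compact, editorial summary only (the function name and result atoms here are invented):

classify_flu_read_result({ok, Chunk}, Size) when byte_size(Chunk) == Size ->
    done;
classify_flu_read_result({error, Retry}, _Size)
  when Retry == partition; Retry == bad_epoch; Retry == wedged ->
    retry_after_projection_refresh;
classify_flu_read_result({error, not_written}, _Size) ->
    start_read_repair;
classify_flu_read_result({error, written}, _Size) ->
    should_never_happen_for_reads.
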
+ +read_repair2(cp_mode=ConsistencyMode, + ReturnMode, File, Offset, Size, Depth, STime, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + Tail = lists:last(readonly_flus(P)), + case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, + File, Offset, Size, ?TIMEOUT) of + {ok, Chunk} when byte_size(Chunk) == Size -> + ToRepair = mutation_flus(P) -- [Tail], + read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset, + Size, Depth, STime, S); + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, + Size, got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair(ConsistencyMode, ReturnMode, File, Offset, + Size, Depth, STime, S); + {error, not_written} -> + {reply, {error, not_written}, S}; + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end; +read_repair2(ap_mode=ConsistencyMode, + ReturnMode, File, Offset, Size, Depth, STime, + #state{proj=P}=S) -> + Eligible = mutation_flus(P), + case try_to_find_chunk(Eligible, File, Offset, Size, S) of + {ok, Chunk, GotItFrom} when byte_size(Chunk) == Size -> + ToRepair = mutation_flus(P) -- [GotItFrom], + read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File, + Offset, Size, Depth, STime, S); + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, + Offset, Size, got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair(ConsistencyMode, ReturnMode, File, + Offset, Size, Depth, STime, S); + {error, not_written} -> + {reply, {error, not_written}, S}; + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + +read_repair3([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S) -> + read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); +read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, + Size, 0=Depth, STime, S) -> + read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth + 1, STime, S); +read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read_repair3 sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2); + P2 -> + ToRepair2 = mutation_flus(P2) -- Repaired, + read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2) + end + end. + +read_repair4([], ReturnMode, Chunk, _Repaired, File, Offset, + _IgnoreSize, _Depth, _STime, S) -> + %% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc? 
+ case ReturnMode of + read -> + {reply, {ok, Chunk}, S}; + {append, Offset, Size, File} -> + {reply, {ok, {Offset, Size, File}}, S} + end; +read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(First, PD), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + read_repair4(Rest, ReturnMode, Chunk, [First|Repaired], File, + Offset, Size, Depth, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth, STime, S); + {error, written} -> + %% TODO: To be very paranoid, read the chunk here to verify + %% that it is exactly our Chunk. + read_repair4(Rest, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + +do_checksum_list(File, 0=Depth, STime, S) -> + do_checksum_list2(File, Depth + 1, STime, S); +do_checksum_list(File, Depth, STime, #state{proj=P}=S) -> + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_checksum_list(File, Depth + 1, STime, S2); + _ -> + do_checksum_list2(File, Depth + 1, STime, S2) + end + end. + +do_checksum_list2(File, Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD), + case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of + {ok, _}=OK -> + {reply, OK, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_checksum_list(File, Depth, STime, S); + {error, _}=Error -> + {reply, Error, S} + end. + +do_list_files(0=Depth, STime, S) -> + do_list_files2(Depth + 1, STime, S); +do_list_files(Depth, STime, #state{proj=P}=S) -> + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_list_files(Depth + 1, STime, S2); + _ -> + do_list_files2(Depth + 1, STime, S2) + end + end. + +do_list_files2(Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD), + case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of + {ok, _}=OK -> + {reply, OK, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_list_files(Depth, STime, S); + {error, _}=Error -> + {reply, Error, S} + end. + +update_proj(#state{proj=undefined}=S) -> + update_proj2(1, S); +update_proj(S) -> + S. 
+ +update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> + Timeout = 2*1000, + WTimeout = 2*Timeout, + Proxies = orddict:to_list(ProxiesDict), + Work = fun({_K, Proxy}) -> + ?FLU_PC:read_latest_projection(Proxy, private, Timeout) + end, + Rs = run_middleworker_job(Work, Proxies, WTimeout), + %% TODO: There's a possible bug here when running multiple independent + %% Machi clusters/chains. If our chain used to be [a,b], but our + %% sysadmin has changed our cluster to be [a] and a completely seprate + %% cluster with [b], and if b is reusing the address & port number, + %% then it is possible that choose_best_projs() can incorrectly choose + %% b's projection. + case choose_best_proj(Rs) of + P when P >= BadProj -> + #projection_v1{epoch_number=Epoch, epoch_csum=CSum, + members_dict=NewMembersDict} = P, + EpochID = {Epoch, CSum}, + ?FLU_PC:stop_proxies(ProxiesDict), + NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), + S#state{bad_proj=undefined, proj=P, epoch_id=EpochID, + members_dict=NewMembersDict, proxies_dict=NewProxiesDict}; + _ -> + sleep_a_while(Count), + update_proj2(Count + 1, S) + end. + +run_middleworker_job(Fun, ArgList, WTimeout) -> + Parent = self(), + MiddleWorker = + spawn(fun() -> + PidsMons = + [spawn_monitor(fun() -> + Res = (catch Fun(Arg)), + exit(Res) + end) || Arg <- ArgList], + Rs = gather_worker_statuses(PidsMons, WTimeout), + Parent ! {res, self(), Rs}, + exit(normal) + end), + receive + {res, MiddleWorker, Results} -> + Results + after WTimeout+100 -> + [] + end. + +gather_worker_statuses([], _Timeout) -> + []; +gather_worker_statuses([{Pid,Ref}|Rest], Timeout) -> + receive + {'DOWN', R, process, P, Status} when R == Ref, P == Pid -> + [Status|gather_worker_statuses(Rest, Timeout)] + after Timeout -> + gather_worker_statuses(Rest, 0) + end. + +choose_best_proj(Rs) -> + WorstEpoch = #projection_v1{epoch_number=-1,epoch_csum= <<>>}, + lists:foldl(fun({ok, NewEpoch}, BestEpoch) + when NewEpoch > BestEpoch -> + NewEpoch; + (_, BestEpoch) -> + BestEpoch + end, WorstEpoch, Rs). + +try_to_find_chunk(Eligible, File, Offset, Size, + #state{epoch_id=EpochID, proxies_dict=PD}) -> + Timeout = 2*1000, + Work = fun(FLU) -> + Proxy = orddict:fetch(FLU, PD), + case ?FLU_PC:read_chunk(Proxy, EpochID, + File, Offset, Size) of + {ok, Chunk} when byte_size(Chunk) == Size -> + {FLU, {ok, Chunk}}; + Else -> + {FLU, Else} + end + end, + Rs = run_middleworker_job(Work, Eligible, Timeout), + case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of + [{FoundFLU, {ok, Chunk}}|_] -> + {ok, Chunk, FoundFLU}; + [] -> + RetryErrs = [partition, bad_epoch, wedged], + case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of + [SomeErr|_] -> + {error, SomeErr}; + [] -> + %% TODO does this really work 100% of the time? + {error, not_written} + end + end. + +mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) -> + UPI ++ Repairing; +mutation_flus(#state{proj=P}) -> + mutation_flus(P). + +readonly_flus(#projection_v1{upi=UPI}) -> + UPI; +readonly_flus(#state{proj=P}) -> + readonly_flus(P). + +sleep_a_while(0) -> + ok; +sleep_a_while(1) -> + ok; +sleep_a_while(Depth) -> + timer:sleep(30 + trunc(math:pow(1.9, Depth))). diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index a830c18..0106c37 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -66,6 +66,8 @@ -include("machi.hrl"). -include("machi_projection.hrl"). +-define(SERVER_CMD_READ_TIMEOUT, 600*1000). + -export([start_link/1, stop/1, update_wedge_state/3]). 
-export([make_listener_regname/1, make_projection_server_regname/1]). @@ -191,8 +193,15 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> register(make_listener_regname(FluName), self()), SockOpts = [{reuseaddr, true}, {mode, binary}, {active, false}, {packet, line}], - {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), - listen_server_loop(LSock, S). + case gen_tcp:listen(TcpPort, SockOpts) of + {ok, LSock} -> + listen_server_loop(LSock, S); + Else -> + error_logger:warning_msg("~s:run_listen_server: " + "listen to TCP port ~w: ~w\n", + [?MODULE, TcpPort, Else]), + exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) + end. run_append_server(FluPid, AckPid, #state{flu_name=Name, wedged=Wedged_p,epoch_id=EpochId}=S) -> @@ -244,7 +253,9 @@ decode_epoch_id(EpochIDHex) -> net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0, 600*1000) of + %% TODO: Add testing control knob to adjust this timeout and/or inject + %% timeout condition. + case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of {ok, Line} -> %% machi_util:verb("Got: ~p\n", [Line]), PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1, @@ -428,6 +439,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, <> = machi_util:hexstr_to_bin(LenHex), {_, Path} = machi_util:make_data_filename(DataDir, FileBin), OptsHasWrite = lists:member(write, FileOpts), + OptsHasRead = lists:member(read, FileOpts), case file:open(Path, FileOpts) of {ok, FH} -> try @@ -440,8 +452,9 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, EpochID, Wedged_p, CurrentEpochId); + {error, enoent} when OptsHasRead -> + ok = gen_tcp:send(Sock, <<"ERROR NO-SUCH-FILE\n">>); _Else -> - %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>) end. @@ -759,9 +772,9 @@ do_projection_command(Sock, LenHex, S) -> _ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>])) end. -handle_projection_command({get_latest_epoch, ProjType}, +handle_projection_command({get_latest_epochid, ProjType}, #state{proj_store=ProjStore}) -> - machi_projection_store:get_latest_epoch(ProjStore, ProjType); + machi_projection_store:get_latest_epochid(ProjStore, ProjType); handle_projection_command({read_latest_projection, ProjType}, #state{proj_store=ProjStore}) -> machi_projection_store:read_latest_projection(ProjStore, ProjType); diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 80e1b6a..14f18a0 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -25,6 +25,8 @@ -include("machi.hrl"). -include("machi_projection.hrl"). +-define(HARD_TIMEOUT, 2500). + -export([ %% File API append_chunk/4, append_chunk/5, @@ -35,7 +37,7 @@ wedge_status/1, wedge_status/2, %% Projection API - get_latest_epoch/2, get_latest_epoch/3, + get_latest_epochid/2, get_latest_epochid/3, read_latest_projection/2, read_latest_projection/3, read_projection/3, read_projection/4, write_projection/3, write_projection/4, @@ -138,7 +140,7 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) -spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) -> {ok, chunk_s()} | - {error, error_general() | 'no_such_file' | 'partial_read'} | + {error, error_general() | 'not_written' | 'partial_read'} | {error, term()}. 
read_chunk(Sock, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -149,7 +151,7 @@ read_chunk(Sock, EpochID, File, Offset, Size) -spec read_chunk(inet_host(), inet_port(), epoch_id(), file_name(), file_offset(), chunk_size()) -> {ok, chunk_s()} | - {error, error_general() | 'no_such_file' | 'partial_read'} | + {error, error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Host, TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -223,22 +225,21 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) -> %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(port_wrap(), projection_type()) -> +-spec get_latest_epochid(port_wrap(), projection_type()) -> {ok, epoch_id()} | {error, term()}. -get_latest_epoch(Sock, ProjType) +get_latest_epochid(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - get_latest_epoch2(Sock, ProjType). + get_latest_epochid2(Sock, ProjType). %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(inet_host(), inet_port(), - projection_type()) -> +-spec get_latest_epochid(inet_host(), inet_port(), projection_type()) -> {ok, epoch_id()} | {error, term()}. -get_latest_epoch(Host, TcpPort, ProjType) +get_latest_epochid(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try - get_latest_epoch2(Sock, ProjType) + get_latest_epochid2(Sock, ProjType) after disconnect(Sock) end. @@ -290,7 +291,7 @@ read_projection(Host, TcpPort, ProjType, Epoch) %% @doc Write a projection `Proj' of type `ProjType'. -spec write_projection(port_wrap(), projection_type(), projection()) -> - 'ok' | {error, written} | {error, term()}. + 'ok' | {error, 'written'} | {error, term()}. write_projection(Sock, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> @@ -300,7 +301,7 @@ write_projection(Sock, ProjType, Proj) -spec write_projection(inet_host(), inet_port(), projection_type(), projection()) -> - 'ok' | {error, written} | {error, term()}. + 'ok' | {error, 'written'} | {error, term()}. write_projection(Host, TcpPort, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> @@ -501,24 +502,25 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> SizeHex = machi_util:int_to_hexbin(Size, 32), CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10], ok = w_send(Sock, CmdLF), + ok = w_setopts(Sock, [{packet, raw}]), case w_recv(Sock, 3) of {ok, <<"OK\n">>} -> {ok, _Chunk}=Res = w_recv(Sock, Size), Res; {ok, Else} -> - {ok, OldOpts} = w_getopts(Sock, [packet]), ok = w_setopts(Sock, [{packet, line}]), {ok, Else2} = w_recv(Sock, 0), - ok = w_setopts(Sock, OldOpts), case Else of <<"ERA">> -> {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); <<"ERR">> -> case Else2 of - <<"OR BAD-IO\n">> -> - {error, no_such_file}; + <<"OR NO-SUCH-FILE\n">> -> + {error, not_written}; <<"OR NOT-ERASURE\n">> -> - {error, no_such_file}; + %% {error, no_such_file}; + %% Ignore the fact that the file doesn't exist. 
+ {error, not_written}; <<"OR BAD-ARG\n">> -> {error, bad_arg}; <<"OR PARTIAL-READ\n">> -> @@ -536,6 +538,9 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} @@ -561,7 +566,11 @@ list2(Sock, EpochID) -> catch throw:Error -> Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. @@ -593,7 +602,11 @@ wedge_status2(Sock) -> catch throw:Error -> Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. @@ -620,16 +633,15 @@ checksum_list2(Sock, EpochID, File) -> {ok, <<"ERROR WEDGED", _/binary>>} -> {error, wedged}; {ok, Else} -> - throw({server_protocol_error, Else}); - {error, closed} -> - throw({error, closed}); - Else -> - throw(Else) + throw({server_protocol_error, Else}) end catch throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch}} @@ -724,6 +736,9 @@ delete_migration2(Sock, EpochID, File) -> throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch}} @@ -754,13 +769,16 @@ trunc_hack2(Sock, EpochID, File) -> throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. -get_latest_epoch2(Sock, ProjType) -> - ProjCmd = {get_latest_epoch, ProjType}, +get_latest_epochid2(Sock, ProjType) -> + ProjCmd = {get_latest_epochid, ProjType}, do_projection_common(Sock, ProjCmd). read_latest_projection2(Sock, ProjType) -> @@ -804,14 +822,15 @@ do_projection_common(Sock, ProjCmd) -> binary_to_term(ResBin); Else -> {error, Else} - end; - {error, _} = Bad -> - throw(Bad) + end end catch throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} @@ -823,7 +842,7 @@ w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})-> try case proplists:get_value(session_proto, Props, tcp) of tcp -> - Sock = machi_util:connect(Host, Port), + Sock = machi_util:connect(Host, Port, ?HARD_TIMEOUT), {w,tcp,Sock}; %% sctp -> %% %% TODO: not implemented @@ -845,14 +864,11 @@ w_close({w,tcp,Sock}) -> ok. w_recv({w,tcp,Sock}, Amt) -> - gen_tcp:recv(Sock, Amt). + gen_tcp:recv(Sock, Amt, ?HARD_TIMEOUT). w_send({w,tcp,Sock}, IoData) -> gen_tcp:send(Sock, IoData). -w_getopts({w,tcp,Sock}, Opts) -> - inet:getopts(Sock, Opts). - w_setopts({w,tcp,Sock}, Opts) -> inet:setopts(Sock, Opts). 
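
A note on the error:{case_clause,_} clauses added throughout this module: with ?HARD_TIMEOUT now applied in w_recv/2 and w_connect/1, a silent or dead peer surfaces as {error, timeout} (or {error, closed}) instead of blocking forever, and the `case w_recv(...) of {ok, ...} ...' expressions have no clause for those tuples, so they raise a case_clause error. That appears to be what the new handlers convert into an {error, ...} return while flagging the socket as bad so the proxy layer can reconnect. A minimal sketch of that failure path (timeout_to_error_demo/1 and FakeRecv are invented):

timeout_to_error_demo(Sock) ->
    FakeRecv = fun() -> {error, timeout} end,   % stand-in for w_recv/2
    try
        case FakeRecv() of
            {ok, Bin} -> {ok, Bin}              % no clause for {error,_}
        end
    catch
        error:{case_clause,_}=Noo ->
            put(bad_sock, Sock),
            {error, {badmatch, Noo, erlang:get_stacktrace()}}
    end.
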
diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 9ddeb0d..8818588 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -43,7 +43,7 @@ %% API -export([ start_link/3, - get_latest_epoch/2, get_latest_epoch/3, + get_latest_epochid/2, get_latest_epochid/3, read_latest_projection/2, read_latest_projection/3, read/3, read/4, write/3, write/4, @@ -62,8 +62,8 @@ public_dir = "" :: string(), private_dir = "" :: string(), wedge_notify_pid :: pid() | atom(), - max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}, - max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()} + max_public_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}, + max_private_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()} }). %% @doc Start a new projection store server. @@ -80,15 +80,15 @@ start_link(RegName, DataDir, NotifyWedgeStateChanges) -> %% @doc Fetch the latest epoch number + checksum for type `ProjType'. -get_latest_epoch(PidSpec, ProjType) -> - get_latest_epoch(PidSpec, ProjType, infinity). +get_latest_epochid(PidSpec, ProjType) -> + get_latest_epochid(PidSpec, ProjType, infinity). %% @doc Fetch the latest epoch number + checksum for type `ProjType'. %% projection. -get_latest_epoch(PidSpec, ProjType, Timeout) +get_latest_epochid(PidSpec, ProjType, Timeout) when ProjType == 'public' orelse ProjType == 'private' -> - g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout). + g_call(PidSpec, {get_latest_epochid, ProjType}, Timeout). %% @doc Fetch the latest projection record for type `ProjType'. @@ -168,25 +168,27 @@ init([DataDir, NotifyWedgeStateChanges]) -> PrivateDir = machi_util:make_projection_filename(DataDir, "private"), ok = filelib:ensure_dir(PublicDir ++ "/ignored"), ok = filelib:ensure_dir(PrivateDir ++ "/ignored"), - MaxPublicEpoch = find_max_epoch(PublicDir), - MaxPrivateEpoch = find_max_epoch(PrivateDir), + MbEpoch = find_max_epochid(PublicDir), + %% MbEpoch = {Mb#projection_v1.epoch_number, Mb#projection_v1.epoch_csum}, + MvEpoch = find_max_epochid(PrivateDir), + %% MvEpoch = {Mv#projection_v1.epoch_number, Mv#projection_v1.epoch_csum}, {ok, #state{public_dir=PublicDir, private_dir=PrivateDir, wedge_notify_pid=NotifyWedgeStateChanges, - max_public_epoch=MaxPublicEpoch, - max_private_epoch=MaxPrivateEpoch}}. + max_public_epochid=MbEpoch, + max_private_epochid=MvEpoch}}. 
-handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) -> +handle_call({{get_latest_epochid, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - EpochId = if ProjType == public -> S#state.max_public_epoch; - ProjType == private -> S#state.max_private_epoch + EpochId = if ProjType == public -> S#state.max_public_epochid; + ProjType == private -> S#state.max_private_epochid end, {reply, {{ok, EpochId}, LC2}, S}; handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - {EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch; - ProjType == private -> S#state.max_private_epoch + {EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epochid; + ProjType == private -> S#state.max_private_epochid end, {Reply, NewS} = do_proj_read(ProjType, EpochNum, S), {reply, {Reply, LC2}, NewS}; @@ -268,7 +270,7 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> EffectiveEpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum}, %% NewS = if ProjType == public, - Epoch > element(1, S#state.max_public_epoch) -> + Epoch > element(1, S#state.max_public_epochid) -> if Epoch == EffectiveEpoch -> %% This is a regular projection, i.e., %% does not have an inner proj. @@ -281,13 +283,13 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> %% not bother wedging. ok end, - S#state{max_public_epoch=EpochId}; + S#state{max_public_epochid=EpochId}; ProjType == private, - Epoch > element(1, S#state.max_private_epoch) -> + Epoch > element(1, S#state.max_private_epochid) -> update_wedge_state( S#state.wedge_notify_pid, false, EffectiveEpochId), - S#state{max_private_epoch=EpochId}; + S#state{max_private_epochid=EpochId}; true -> S end, @@ -344,14 +346,14 @@ find_all(Dir) -> Fs = filelib:wildcard("*", Dir), lists:sort([name2epoch(F) || F <- Fs]). -find_max_epoch(Dir) -> +find_max_epochid(Dir) -> Fs = lists:sort(filelib:wildcard("*", Dir)), if Fs == [] -> ?NO_EPOCH; true -> EpochNum = name2epoch(lists:last(Fs)), {{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir), - {EpochNum, Proj} + {EpochNum, Proj#projection_v1.epoch_csum} end. %%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 74fb116..3d0100e 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -59,7 +59,7 @@ %% %% Projection API get_epoch_id/1, get_epoch_id/2, - get_latest_epoch/2, get_latest_epoch/3, + get_latest_epochid/2, get_latest_epochid/3, read_latest_projection/2, read_latest_projection/3, read_projection/3, read_projection/4, write_projection/3, write_projection/4, @@ -70,7 +70,10 @@ quit/1, %% Internal API - write_chunk/5, write_chunk/6 + write_chunk/5, write_chunk/6, + + %% Helpers + stop_proxies/1, start_proxies/1 ]). %% gen_server callbacks @@ -173,13 +176,13 @@ get_epoch_id(PidSpec, Timeout) -> %% @doc Get the latest epoch number + checksum from the FLU's projection store. -get_latest_epoch(PidSpec, ProjType) -> - get_latest_epoch(PidSpec, ProjType, infinity). +get_latest_epochid(PidSpec, ProjType) -> + get_latest_epochid(PidSpec, ProjType, infinity). %% @doc Get the latest epoch number + checksum from the FLU's projection store. -get_latest_epoch(PidSpec, ProjType, Timeout) -> - gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}}, +get_latest_epochid(PidSpec, ProjType, Timeout) -> + gen_server:call(PidSpec, {req, {get_latest_epochid, ProjType}}, Timeout). 
%% @doc Get the latest projection from the FLU's projection store for `ProjType' @@ -276,6 +279,7 @@ handle_cast(_Msg, S) -> {noreply, S}. handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), {noreply, S}. terminate(_Reason, _S) -> @@ -287,25 +291,38 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% do_req(Req, S) -> + do_req(Req, 1, S). + +do_req(Req, Depth, S) -> S2 = try_connect(S), Fun = make_req_fun(Req, S2), case connected_p(S2) of true -> case Fun() of + ok -> + {ok, S2}; T when element(1, T) == ok -> {T, S2}; - Else -> + %% {error, {badmatch, {badmatch, {error, Why}=TheErr}, _Stk}} + %% when Why == closed; Why == timeout -> + %% do_req_retry(Req, Depth, TheErr, S2); + TheErr -> case get(bad_sock) of Bad when Bad == S2#state.sock -> - {Else, disconnect(S2)}; + do_req_retry(Req, Depth, TheErr, S2); _ -> - {Else, S2} + {TheErr, S2} end end; false -> {{error, partition}, S2} end. +do_req_retry(_Req, 2, Err, S) -> + {Err, disconnect(S)}; +do_req_retry(Req, Depth, _Err, S) -> + do_req(Req, Depth + 1, try_connect(disconnect(S))). + make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:append_chunk(Sock, EpochID, Prefix, Chunk) end; @@ -339,9 +356,9 @@ make_req_fun({get_epoch_id}, Error end end; -make_req_fun({get_latest_epoch, ProjType}, +make_req_fun({get_latest_epochid, ProjType}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> - fun() -> Mod:get_latest_epoch(Sock, ProjType) end; + fun() -> Mod:get_latest_epochid(Sock, ProjType) end; make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:read_latest_projection(Sock, ProjType) end; @@ -377,3 +394,22 @@ disconnect(#state{sock=Sock, i=#p_srvr{proto_mod=Mod}=_I}=S) -> Mod:disconnect(Sock), S#state{sock=undefined}. + + +stop_proxies(undefined) -> + []; +stop_proxies(ProxiesDict) -> + orddict:fold( + fun(_K, Pid, _Acc) -> + _ = (catch machi_proxy_flu1_client:quit(Pid)) + end, [], ProxiesDict). + +start_proxies(MembersDict) -> + Proxies = orddict:fold( + fun(K, P, Acc) -> + {ok, Pid} = machi_proxy_flu1_client:start_link(P), + [{K, Pid}|Acc] + end, [], orddict:to_list(MembersDict)), + orddict:from_list(Proxies). + + diff --git a/src/machi_util.erl b/src/machi_util.erl index aedf4c6..6d1b2ca 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -274,4 +274,3 @@ escript_connect(Host, Port, Timeout) when is_integer(Port) -> {ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary}, {packet, raw}], Timeout), Sock. - diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index af14870..93e73eb 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -378,10 +378,10 @@ todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) -> end. private_projections_are_stable(Namez, PollFunc) -> - Private1 = [?FLU_PC:get_latest_epoch(FLU, private) || + Private1 = [?FLU_PC:get_latest_epochid(FLU, private) || {_Name, FLU} <- Namez], PollFunc(5, 1, 10), - Private2 = [?FLU_PC:get_latest_epoch(FLU, private) || + Private2 = [?FLU_PC:get_latest_epochid(FLU, private) || {_Name, FLU} <- Namez], true = (Private1 == Private2). 
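
For reference, a hypothetical usage of the start_proxies/1 / stop_proxies/1 helpers added to machi_proxy_flu1_client above (node names and port numbers are invented; assumes the #p_srvr{} record from machi.hrl is in scope):

proxies_demo() ->
    MembersDict = orddict:from_list(
                    [{a, #p_srvr{name=a, proto_mod=machi_flu1_client,
                                 address="localhost", port=4444, props=[]}},
                     {b, #p_srvr{name=b, proto_mod=machi_flu1_client,
                                 address="localhost", port=4445, props=[]}}]),
    ProxiesDict = machi_proxy_flu1_client:start_proxies(MembersDict),
    ProxyA = orddict:fetch(a, ProxiesDict),
    {ok, _EpochID} = machi_proxy_flu1_client:get_latest_epochid(ProxyA, public),
    _ = machi_proxy_flu1_client:stop_proxies(ProxiesDict),
    ok.

This is the same start/stop pairing that machi_chain_manager1:do_set_chain_members_dict/2 now relies on instead of folding over the orddicts by hand.
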
diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl new file mode 100644 index 0000000..3865b54 --- /dev/null +++ b/test/machi_cr_client_test.erl @@ -0,0 +1,136 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_cr_client_test). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + + +smoke_test() -> + os:cmd("rm -rf ./data.a ./data.b ./data.c"), + {ok, SupPid} = machi_flu_sup:start_link(), + error_logger:tty(false), + try + Prefix = <<"pre">>, + Chunk1 = <<"yochunk">>, + Host = "localhost", + PortBase = 4444, + Os = [{ignore_stability_time, true}, {active_mode, false}], + {ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", Os), + {ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", Os), + {ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", Os), + D = orddict:from_list( + [{a,{p_srvr,a,machi_flu1_client,"localhost",PortBase+0,[]}}, + {b,{p_srvr,b,machi_flu1_client,"localhost",PortBase+1,[]}}, + {c,{p_srvr,c,machi_flu1_client,"localhost",PortBase+2,[]}}]), + %% Force the chain to repair & fully assemble as quickly as possible. + %% 1. Use set_chain_members() on all 3 + %% 2. Force a to run repair in a tight loop + %% 3. Stop as soon as we see UPI=[a,b,c] and also author=c. + %% Otherwise, we can have a race with later, minor + %% projection changes which will change our understanding of + %% the epoch id. (C is the author with highest weight.) + %% 4. Wait until all others are using epoch id from #3. + %% + %% Damn, this is a pain to make 100% deterministic, bleh. + ok = machi_chain_manager1:set_chain_members(a_chmgr, D), + ok = machi_chain_manager1:set_chain_members(b_chmgr, D), + ok = machi_chain_manager1:set_chain_members(c_chmgr, D), + TickAll = fun() -> [begin + Pid ! tick_check_environment, + timer:sleep(50) + end || Pid <- [a_chmgr,b_chmgr,c_chmgr] ] + end, + _ = lists:foldl( + fun(_, [{c,[a,b,c]}]=Acc) -> Acc; + (_, _Acc) -> + TickAll(), % has some sleep time inside + Xs = [begin + {ok, Prj} = machi_projection_store:read_latest_projection(PStore, private), + {Prj#projection_v1.author_server, + Prj#projection_v1.upi} + end || PStore <- [a_pstore,b_pstore,c_pstore] ], + lists:usort(Xs) + end, undefined, lists:seq(1,10000)), + %% Everyone is settled on the same damn epoch id. + {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0, + private), + {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+1, + private), + {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+2, + private), + + %% Whew ... ok, now start some damn tests. 
+ {ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]), + machi_cr_client:append_chunk(C1, Prefix, Chunk1), + %% {machi_flu_psup:stop_flu_package(c), timer:sleep(50)}, + {ok, {Off1,Size1,File1}} = + machi_cr_client:append_chunk(C1, Prefix, Chunk1), + {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0, + private), + %% Verify that the client's CR wrote to all of them. + [{ok, Chunk1} = machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, File1, Off1, Size1) || + X <- [0,1,2] ], + + %% Test read repair: Manually write to head, then verify that + %% read-repair fixes all. + FooOff1 = Off1 + (1024*1024), + [{error, not_written} = machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, + File1, FooOff1, Size1) || X <- [0,1,2] ], + ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID, + File1, FooOff1, Chunk1), + {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1), + [{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, + File1, FooOff1, Size1)} || X <- [0,1,2] ], + + %% Test read repair: Manually write to middle, then same checking. + FooOff2 = Off1 + (2*1024*1024), + Chunk2 = <<"Middle repair chunk">>, + Size2 = size(Chunk2), + ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID, + File1, FooOff2, Chunk2), + {ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2), + [{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, + File1, FooOff2, Size2)} || X <- [0,1,2] ], + + %% Misc API smoke + %% Checksum lists are 3-tuples + {ok, [{_,_,_}|_]} = machi_cr_client:checksum_list(C1, File1), + {error, no_such_file} = machi_cr_client:checksum_list(C1, <<"!!!!">>), + %% Exactly one file right now + {ok, [_]} = machi_cr_client:list_files(C1), + + ok + after + error_logger:tty(true), + catch application:stop(machi), + exit(SupPid, normal) + end. + +-endif. % TEST. 
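
The settle-the-chain loop inside smoke_test/0 above is written inline with lists:foldl/3; the same idea as a small helper, shown here only as a hedged sketch (wait_for_chain/3 is an invented name; the projection-store registered names are supplied by the caller):

wait_for_chain(_PStores, _Want, 0) ->
    timeout;
wait_for_chain(PStores, Want, N) ->
    Xs = [begin
              {ok, Prj} = machi_projection_store:read_latest_projection(
                            PStore, private),
              {Prj#projection_v1.author_server, Prj#projection_v1.upi}
          end || PStore <- PStores],
    case lists:usort(Xs) of
        [Want] -> ok;                         % everyone agrees on author+UPI
        _      -> timer:sleep(50),
                  wait_for_chain(PStores, Want, N - 1)
    end.

Something like ok = wait_for_chain([a_pstore, b_pstore, c_pstore], {c, [a,b,c]}, 10000) would then stand in for the foldl in the test above.
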
diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 48e0831..143e79a 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -80,9 +80,9 @@ flu_smoke_test() -> BadPrefix, Chunk1), {ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH), Len1 = size(Chunk1), - {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, + {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, - File1, Off1*983, Len1), + File1, Off1*983829323, Len1), {error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1, Off1, Len1*984), @@ -114,9 +114,9 @@ flu_smoke_test() -> BadFile, Off2, Chunk2), {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2), - {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, - ?DUMMY_PV1_EPOCH, - "no!!", Off2, Len2), + {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort, + ?DUMMY_PV1_EPOCH, + "no!!", Off2, Len2), {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, BadFile, Off2, Len2), @@ -149,7 +149,7 @@ flu_projection_smoke_test() -> FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir), try [begin - {ok, {0,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), + {ok, {0,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T), {error, not_written} = ?FLU_C:read_latest_projection(Host, TcpPort, T), {ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T), @@ -160,7 +160,7 @@ flu_projection_smoke_test() -> ok = ?FLU_C:write_projection(Host, TcpPort, T, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1), {ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1), - {ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), + {ok, {1,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T), {ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T), {ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T), {ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T), diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 4b0c7c7..5b723fe 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -74,7 +74,7 @@ api_smoke_test() -> {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile), {ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch), {ok, {false, _}} = ?MUT:wedge_status(Prox1), - {ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public), + {ok, FakeEpoch} = ?MUT:get_latest_epochid(Prox1, public), {error, not_written} = ?MUT:read_latest_projection(Prox1, public), {error, not_written} = ?MUT:read_projection(Prox1, public, 44), P_a = #p_srvr{name=a, address="localhost", port=6622}, @@ -83,6 +83,7 @@ api_smoke_test() -> {ok, P1} = ?MUT:read_projection(Prox1, public, 1), {ok, [P1]} = ?MUT:get_all_projections(Prox1, public), {ok, [1]} = ?MUT:list_all_projections(Prox1, public), + ok after _ = (catch ?MUT:quit(Prox1)) @@ -91,5 +92,183 @@ api_smoke_test() -> (catch machi_flu1:stop(FLU1)), (catch machi_flu1:stop(get(flu_pid))) end. 
+ +flu_restart_test() -> + RegName = api_smoke_flu, + Host = "localhost", + TcpPort = 57125, + DataDir = "./data.api_smoke_flu2", + W_props = [{initial_wedged, false}], + erase(flu_pid), + put(flu_pid, []), + FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir, + W_props), + put(flu_pid, [FLU1|get(flu_pid)]), + + try + I = #p_srvr{name=RegName, address=Host, port=TcpPort}, + {ok, Prox1} = ?MUT:start_link(I), + try + FakeEpoch = ?DUMMY_PV1_EPOCH, + Data = <<"data!">>, + {ok, {Off1,Size1,File1}} = ?MUT:append_chunk(Prox1, + FakeEpoch, <<"prefix">>, Data, + infinity), + P_a = #p_srvr{name=a, address="localhost", port=6622}, + P1 = machi_projection:new(1, a, [P_a], [], [a], [], []), + EpochID = {P1#projection_v1.epoch_number, + P1#projection_v1.epoch_csum}, + ok = ?MUT:write_projection(Prox1, public, P1), + ok = ?MUT:write_projection(Prox1, private, P1), + {ok, EpochID} = ?MUT:get_epoch_id(Prox1), + {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public), + {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private), + ok = machi_flu1:stop(FLU1), timer:sleep(50), + + %% Now that the last proxy op was successful and only + %% after did we stop the FLU, let's check that both the + %% 1st & 2nd ops-via-proxy after FLU is restarted are + %% successful. And immediately after stopping the FLU, + %% both 1st & 2nd ops-via-proxy should always fail. + %% + %% Some of the expectations have unbound variables, which + %% makes the code a bit convoluted. (No LFE or + %% Elixir macros here, alas, they'd be useful.) + + ExpectedOps = + [ + fun(run) -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_epoch_id(Prox1) end, + fun(run) -> {ok, EpochID} = + ?MUT:get_latest_epochid(Prox1, public), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_latest_epochid(Prox1, public) + end, + fun(run) -> {ok, EpochID} = + ?MUT:get_latest_epochid(Prox1, private), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_latest_epochid(Prox1, private) + end, + fun(run) -> {ok, P1} = + ?MUT:read_projection(Prox1, public, 1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_projection(Prox1, public, 1) + end, + fun(run) -> {ok, P1} = + ?MUT:read_projection(Prox1, private, 1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_projection(Prox1, private, 1) + end, + fun(run) -> {error, not_written} = + ?MUT:read_projection(Prox1, private, 7), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_projection(Prox1, private, 7) + end, + fun(run) -> {error, written} = + ?MUT:write_projection(Prox1, public, P1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:write_projection(Prox1, public, P1) + end, + fun(run) -> {error, written} = + ?MUT:write_projection(Prox1, private, P1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:write_projection(Prox1, private, P1) + end, + fun(run) -> {ok, [_]} = + ?MUT:get_all_projections(Prox1, public), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_all_projections(Prox1, public) + end, + fun(run) -> {ok, [_]} = + ?MUT:get_all_projections(Prox1, private), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_all_projections(Prox1, private) + end, + fun(run) -> {ok, {_,_,_}} = + ?MUT:append_chunk(Prox1, FakeEpoch, + <<"prefix">>, Data, infinity), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:append_chunk(Prox1, FakeEpoch, + 
<<"prefix">>, Data, infinity) + end, + fun(run) -> {ok, {_,_,_}} = + ?MUT:append_chunk_extra(Prox1, FakeEpoch, + <<"prefix">>, Data, 42, infinity), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch, + <<"prefix">>, Data, 42, infinity) + end, + fun(run) -> {ok, Data} = + ?MUT:read_chunk(Prox1, FakeEpoch, + File1, Off1, Size1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_chunk(Prox1, FakeEpoch, + File1, Off1, Size1) + end, + fun(run) -> {ok, _} = + ?MUT:checksum_list(Prox1, FakeEpoch, File1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:checksum_list(Prox1, FakeEpoch, File1) + end, + fun(run) -> {ok, _} = + ?MUT:list_files(Prox1, FakeEpoch), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:list_files(Prox1, FakeEpoch) + end, + fun(run) -> {ok, _} = + ?MUT:wedge_status(Prox1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:wedge_status(Prox1) + end, + %% NOTE: When write-once enforcement is enabled, this test + %% will fail: change ok -> {error, written} + fun(run) -> %% {error, written} = + ok = + ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1, + Data, infinity), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1, + Data, infinity) + end + ], + + [begin + FLU2 = machi_flu1_test:setup_test_flu( + RegName, TcpPort, DataDir, + [save_data_dir|W_props]), + put(flu_pid, [FLU2|get(flu_pid)]), + _ = Fun(line), + ok = Fun(run), + ok = Fun(run), + ok = machi_flu1:stop(FLU2), + {error, partition} = Fun(stop), + {error, partition} = Fun(stop), + ok + end || Fun <- ExpectedOps ], + ok + after + _ = (catch ?MUT:quit(Prox1)) + end + after + [catch machi_flu1:stop(Pid) || Pid <- get(flu_pid)] + end. -endif. % TEST
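
The list comprehension at the end of flu_restart_test/0 drives every expectation fun through the same restart cycle; restated as a standalone helper purely for readability (check_expectation/5 is an invented name, everything else is taken from the test above):

check_expectation(Fun, RegName, TcpPort, DataDir, W_props) ->
    FLU = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir,
                                         [save_data_dir|W_props]),
    _ = Fun(line),                      % print the ?LINE marker for this op
    ok = Fun(run),                      % 1st op after restart must succeed
    ok = Fun(run),                      % ... and so must the 2nd
    ok = machi_flu1:stop(FLU),
    {error, partition} = Fun(stop),     % with the FLU down, both attempts
    {error, partition} = Fun(stop),     % must fail the same way
    ok.
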