From d293170e929a0b2811c60ba67cceb8d118c67a62 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Sun, 17 May 2015 23:48:05 +0900 Subject: [PATCH 01/13] WIP: starting machi_cr_client.erl --- include/machi_projection.hrl | 2 + src/machi_chain_manager1.erl | 13 +- src/machi_cr_client.erl | 327 ++++++++++++++++++++++++++++++++ src/machi_proxy_flu1_client.erl | 24 ++- src/machi_util.erl | 1 - 5 files changed, 355 insertions(+), 12 deletions(-) create mode 100644 src/machi_cr_client.erl diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl index e20908a..ba0b8fd 100644 --- a/include/machi_projection.hrl +++ b/include/machi_projection.hrl @@ -21,6 +21,7 @@ -ifndef(MACHI_PROJECTION_HRL). -define(MACHI_PROJECTION_HRL, true). +-type pv1_consistency_mode() :: 'ap_mode' | 'cp_mode'. -type pv1_csum() :: binary(). -type pv1_epoch() :: {pv1_epoch_n(), pv1_csum()}. -type pv1_epoch_n() :: non_neg_integer(). @@ -46,6 +47,7 @@ author_server :: pv1_server(), all_members :: [pv1_server()], creation_time :: pv1_timestamp(), + mode = ap_mode :: pv1_consistency_mode(), upi :: [pv1_server()], repairing :: [pv1_server()], down :: [pv1_server()], diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 5f73c6c..6aa1c90 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -762,17 +762,10 @@ rank_projection(#projection_v1{author_server=Author, (N*N * length(UPI_list)). do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)-> - catch orddict:fold( - fun(_K, Pid, _Acc) -> - _ = (catch ?FLU_PC:quit(Pid)) - end, [], OldProxiesDict), - Proxies = orddict:fold( - fun(K, P, Acc) -> - {ok, Pid} = ?FLU_PC:start_link(P), - [{K, Pid}|Acc] - end, [], MembersDict), + _ = ?FLU_PC:stop_proxies(OldProxiesDict), + ProxiesDict = ?FLU_PC:start_proxies(MembersDict), {ok, S#ch_mgr{members_dict=MembersDict, - proxies_dict=orddict:from_list(Proxies)}}. + proxies_dict=ProxiesDict}}. do_react_to_env(#ch_mgr{name=MyName, proj=#projection_v1{epoch_number=Epoch, diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl new file mode 100644 index 0000000..2de4117 --- /dev/null +++ b/src/machi_cr_client.erl @@ -0,0 +1,327 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Erlang API for the Machi client-implemented Chain Replication +%% (CORFU-style) protocol. + +-module(machi_cr_client). + +-behaviour(gen_server). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. % TEST. + +-export([start_link/1]). +%% FLU1 API +-export([ + %% File API + append_chunk/3, append_chunk/4, + append_chunk_extra/4, append_chunk_extra/5, +%% read_chunk/5, read_chunk/6, +%% checksum_list/3, checksum_list/4, +%% list_files/2, list_files/3, +%% wedge_status/1, wedge_status/2, + +%% %% %% Projection API +%% get_epoch_id/1, get_epoch_id/2, +%% get_latest_epoch/2, get_latest_epoch/3, +%% read_latest_projection/2, read_latest_projection/3, +%% read_projection/3, read_projection/4, +%% write_projection/3, write_projection/4, +%% get_all_projections/2, get_all_projections/3, +%% list_all_projections/2, list_all_projections/3, + +%% %% Common API +%% quit/1, + +%% %% Internal API +%% write_chunk/5, write_chunk/6 + noop/0 + ]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-define(FLU_PC, machi_proxy_flu1_client). + +-record(state, { + members_dict :: p_srvr_dict(), + proxies_dict :: orddict:orddict(), + epoch_id, + proj, + bad_proj + }). + +%% @doc Start a local, long-lived process that will be our steady +%% & reliable communication proxy with the fickle & flaky +%% remote Machi server. + +start_link(P_srvr_list) -> + gen_server:start_link(?MODULE, [P_srvr_list], []). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk(PidSpec, Prefix, Chunk) -> + append_chunk(PidSpec, Prefix, Chunk, infinity). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk(PidSpec, Prefix, Chunk, Timeout) -> + append_chunk_extra(PidSpec, Prefix, Chunk, 0, Timeout). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra) + when is_integer(ChunkExtra), ChunkExtra >= 0 -> + append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, infinity). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) -> + gen_server:call(PidSpec, {req, {append_chunk_extra, Prefix, + Chunk, ChunkExtra}}, + Timeout). + +%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. + +%% read_chunk(PidSpec, EpochID, File, Offset, Size) -> +%% read_chunk(PidSpec, EpochID, File, Offset, Size, infinity). + +%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. + +%% read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) -> +%% gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}}, +%% Timeout). + +%% %% @doc Fetch the list of chunk checksums for `File'. + +%% checksum_list(PidSpec, EpochID, File) -> +%% checksum_list(PidSpec, EpochID, File, infinity). + +%% %% @doc Fetch the list of chunk checksums for `File'. + +%% checksum_list(PidSpec, EpochID, File, Timeout) -> +%% gen_server:call(PidSpec, {req, {checksum_list, EpochID, File}}, +%% Timeout). + +%% %% @doc Fetch the list of all files on the remote FLU. + +%% list_files(PidSpec, EpochID) -> +%% list_files(PidSpec, EpochID, infinity). + +%% %% @doc Fetch the list of all files on the remote FLU. + +%% list_files(PidSpec, EpochID, Timeout) -> +%% gen_server:call(PidSpec, {req, {list_files, EpochID}}, +%% Timeout). + +%% %% @doc Fetch the wedge status from the remote FLU. + +%% wedge_status(PidSpec) -> +%% wedge_status(PidSpec, infinity). + +%% %% @doc Fetch the wedge status from the remote FLU. + +%% wedge_status(PidSpec, Timeout) -> +%% gen_server:call(PidSpec, {req, {wedge_status}}, +%% Timeout). + +%% %% @doc Get the `epoch_id()' of the FLU's current/latest projection. + +%% get_epoch_id(PidSpec) -> +%% get_epoch_id(PidSpec, infinity). + +%% %% @doc Get the `epoch_id()' of the FLU's current/latest projection. + +%% get_epoch_id(PidSpec, Timeout) -> +%% gen_server:call(PidSpec, {req, {get_epoch_id}}, Timeout). + +%% %% @doc Get the latest epoch number + checksum from the FLU's projection store. + +%% get_latest_epoch(PidSpec, ProjType) -> +%% get_latest_epoch(PidSpec, ProjType, infinity). + +%% %% @doc Get the latest epoch number + checksum from the FLU's projection store. + +%% get_latest_epoch(PidSpec, ProjType, Timeout) -> +%% gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}}, +%% Timeout). + +%% %% @doc Get the latest projection from the FLU's projection store for `ProjType' + +%% read_latest_projection(PidSpec, ProjType) -> +%% read_latest_projection(PidSpec, ProjType, infinity). + +%% %% @doc Get the latest projection from the FLU's projection store for `ProjType' + +%% read_latest_projection(PidSpec, ProjType, Timeout) -> +%% gen_server:call(PidSpec, {req, {read_latest_projection, ProjType}}, +%% Timeout). + +%% %% @doc Read a projection `Proj' of type `ProjType'. + +%% read_projection(PidSpec, ProjType, Epoch) -> +%% read_projection(PidSpec, ProjType, Epoch, infinity). + +%% %% @doc Read a projection `Proj' of type `ProjType'. + +%% read_projection(PidSpec, ProjType, Epoch, Timeout) -> +%% gen_server:call(PidSpec, {req, {read_projection, ProjType, Epoch}}, +%% Timeout). + +%% %% @doc Write a projection `Proj' of type `ProjType'. + +%% write_projection(PidSpec, ProjType, Proj) -> +%% write_projection(PidSpec, ProjType, Proj, infinity). + +%% %% @doc Write a projection `Proj' of type `ProjType'. + +%% write_projection(PidSpec, ProjType, Proj, Timeout) -> +%% gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}}, +%% Timeout). + +%% %% @doc Get all projections from the FLU's projection store. + +%% get_all_projections(PidSpec, ProjType) -> +%% get_all_projections(PidSpec, ProjType, infinity). + +%% %% @doc Get all projections from the FLU's projection store. + +%% get_all_projections(PidSpec, ProjType, Timeout) -> +%% gen_server:call(PidSpec, {req, {get_all_projections, ProjType}}, +%% Timeout). + +%% %% @doc Get all epoch numbers from the FLU's projection store. + +%% list_all_projections(PidSpec, ProjType) -> +%% list_all_projections(PidSpec, ProjType, infinity). + +%% %% @doc Get all epoch numbers from the FLU's projection store. + +%% list_all_projections(PidSpec, ProjType, Timeout) -> +%% gen_server:call(PidSpec, {req, {list_all_projections, ProjType}}, +%% Timeout). + +%% %% @doc Quit & close the connection to remote FLU and stop our +%% %% proxy process. + +%% quit(PidSpec) -> +%% gen_server:call(PidSpec, quit, infinity). + +%% %% @doc Write a chunk (binary- or iolist-style) of data to a file +%% %% with `Prefix' at `Offset'. + +%% write_chunk(PidSpec, EpochID, File, Offset, Chunk) -> +%% write_chunk(PidSpec, EpochID, File, Offset, Chunk, infinity). + +%% %% @doc Write a chunk (binary- or iolist-style) of data to a file +%% %% with `Prefix' at `Offset'. + +%% write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) -> +%% gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}}, +%% Timeout). + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init([P_srvr_list]) -> + MembersDict = orddict:from_list([{P#p_srvr.name, P} || P <- P_srvr_list]), + ProxiesDict = ?FLU_PC:start_proxies(MembersDict), + {ok, #state{members_dict=MembersDict, proxies_dict=ProxiesDict}}. + +handle_call({req, Req}, From, S) -> + handle_call2(Req, From, update_proj(S)); +handle_call(quit, _From, S) -> + {stop, normal, ok, S}; +handle_call(_Request, _From, S) -> + Reply = ok, + {reply, Reply, S}. + +handle_cast(_Msg, S) -> + {noreply, S}. + +handle_info(_Info, S) -> + {noreply, S}. + +terminate(_Reason, #state{proxies_dict=ProxiesDict}=_S) -> + _ = ?FLU_PC:stop_proxies(ProxiesDict), + ok. + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> + {reply, sorry_dude, S}. + +update_proj(#state{proj=undefined}=S) -> + update_proj2(1, S); +update_proj(S) -> + S. + +update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> + Timeout = 2*1000, + PidsMons = + [spawn_monitor(fun() -> + exit(catch ?FLU_PC:read_latest_projection( + Proxy, private, Timeout)) + end) || {_K, Proxy} <- orddict:to_list(ProxiesDict)], + Rs = gather_worker_statuses(PidsMons, Timeout*2), + case choose_best_proj(Rs) of + P when P >= BadProj -> + io:format(user, "~s: proj ~P\n", [?MODULE, P, 10]), + EpochID = {P#projection_v1.epoch_number, + P#projection_v1.epoch_csum}, + S#state{bad_proj=undefined, proj=EpochID, epoch_id=EpochID}; + _ -> + timer:sleep(10 + (Count * 20)), + update_proj2(Count + 1, S) + end. + +gather_worker_statuses([], _Timeout) -> + []; +gather_worker_statuses([{Pid,Ref}|Rest], Timeout) -> + receive + {'DOWN', R, process, P, Status} when R == Ref, P == Pid -> + [Status|gather_worker_statuses(Rest, Timeout)] + after Timeout -> + gather_worker_statuses(Rest, 0) + end. + +choose_best_proj(Rs) -> + WorstEpoch = #projection_v1{epoch_number=-1}, + lists:foldl(fun({ok, NewEpoch}, BestEpoch) + when NewEpoch > BestEpoch -> + NewEpoch; + (_, BestEpoch) -> + BestEpoch + end, WorstEpoch, Rs). + +noop() -> + ok. diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 74fb116..ce55313 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -70,7 +70,10 @@ quit/1, %% Internal API - write_chunk/5, write_chunk/6 + write_chunk/5, write_chunk/6, + + %% Helpers + stop_proxies/1, start_proxies/1 ]). %% gen_server callbacks @@ -377,3 +380,22 @@ disconnect(#state{sock=Sock, i=#p_srvr{proto_mod=Mod}=_I}=S) -> Mod:disconnect(Sock), S#state{sock=undefined}. + + +stop_proxies(undefined) -> + []; +stop_proxies(ProxiesDict) -> + orddict:fold( + fun(_K, Pid, _Acc) -> + _ = (catch machi_proxy_flu1_client:quit(Pid)) + end, [], ProxiesDict). + +start_proxies(MembersDict) -> + Proxies = orddict:fold( + fun(K, P, Acc) -> + {ok, Pid} = machi_proxy_flu1_client:start_link(P), + [{K, Pid}|Acc] + end, [], orddict:to_list(MembersDict)), + orddict:from_list(Proxies). + + diff --git a/src/machi_util.erl b/src/machi_util.erl index aedf4c6..6d1b2ca 100644 --- a/src/machi_util.erl +++ b/src/machi_util.erl @@ -274,4 +274,3 @@ escript_connect(Host, Port, Timeout) when is_integer(Port) -> {ok, Sock} = gen_tcp:connect(Host, Port, [{active,false}, {mode,binary}, {packet, raw}], Timeout), Sock. - From b0607ae81574132b35dc6f733df07dbe7a0d6b78 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 May 2015 00:33:15 +0900 Subject: [PATCH 02/13] WIP: starting machi_cr_client:append_chunk* --- src/machi_cr_client.erl | 82 +++++++++++++++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 11 deletions(-) diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 2de4117..65ecdfa 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -65,6 +65,7 @@ terminate/2, code_change/3]). -define(FLU_PC, machi_proxy_flu1_client). +-define(TIMEOUT, 2*1000). -record(state, { members_dict :: p_srvr_dict(), @@ -278,7 +279,46 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> - {reply, sorry_dude, S}. + do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S). + +do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, + #state{epoch_id=EpochID, proj=P, + proxies_dict=PD}=S) -> + #projection_v1{upi=[HeadFLU|RestFLUs]} = P, + case ?FLU_PC:append_chunk_extra(orddict:fetch(HeadFLU, PD), + EpochID, Prefix, Chunk, ChunkExtra, + ?TIMEOUT) of + {ok, {Offset, _Size, File}=_X} -> + io:format(user, "TODO: X ~p\n", [_X]), + do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, + HeadFLU, 1, S); + {error, Change} when Change == bad_epoch; Change == wedged -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S2) + %% TODO return values here + end. + +do_append_chunk_extra2([], File, Offset, _Chunk, _OldHeadFLU, _OkCount, S) -> + {reply, {ok, {File, Offset}}, S}; +do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount, + #state{epoch_id=EpochID, proj=P, + proxies_dict=PD}=S) -> + case ?FLU_PC:write_chunk(orddict:fetch(FLU, PD), + EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, + OldHeadFLU, OkCount + 1, S); + {error, Change} when Change == bad_epoch; Change == wedged -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + #projection_v1{upi=[NewHeadFLU|NewRestFLUs]} = S2#state.proj, + if OkCount == 1, NewHeadFLU == OldHeadFLU -> + do_append_chunk_extra2(NewRestFLUs, File, Offset, Chunk, + OldHeadFLU, OkCount, S2); + true -> + {error, partition} + end + %% TODO return values here + end. update_proj(#state{proj=undefined}=S) -> update_proj2(1, S); @@ -287,18 +327,38 @@ update_proj(S) -> update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Timeout = 2*1000, - PidsMons = - [spawn_monitor(fun() -> - exit(catch ?FLU_PC:read_latest_projection( - Proxy, private, Timeout)) - end) || {_K, Proxy} <- orddict:to_list(ProxiesDict)], - Rs = gather_worker_statuses(PidsMons, Timeout*2), + Parent = self(), + Proxies = orddict:to_list(ProxiesDict), + Worker = spawn( + fun() -> + PidsMons = + [spawn_monitor(fun() -> + exit(catch ?FLU_PC:read_latest_projection( + Proxy, private, Timeout)) + end) || {_K, Proxy} <- Proxies], + Rs = gather_worker_statuses(PidsMons, Timeout*2), + Parent ! {res, self(), Rs}, + exit(normal) + end), + Rs = receive {res, Worker, Results} -> Results + after Timeout*2 -> [] + end, + %% TODO: There's a possible bug here when running multiple independent + %% Machi clusters/chains. If our chain used to be [a,b], but our + %% sysadmin has changed our cluster to be [a] and a completely seprate + %% cluster with [b], and if b is reusing the address & port number, + %% then it is possible that choose_best_projs() can incorrectly choose + %% b's projection. case choose_best_proj(Rs) of P when P >= BadProj -> + #projection_v1{epoch_number=Epoch, epoch_csum=CSum, + members_dict=NewMembersDict} = P, io:format(user, "~s: proj ~P\n", [?MODULE, P, 10]), - EpochID = {P#projection_v1.epoch_number, - P#projection_v1.epoch_csum}, - S#state{bad_proj=undefined, proj=EpochID, epoch_id=EpochID}; + EpochID = {Epoch, CSum}, + ?FLU_PC:stop_proxies(ProxiesDict), + NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), + S#state{bad_proj=undefined, proj=P, epoch_id=EpochID, + members_dict=NewMembersDict, proxies_dict=NewProxiesDict}; _ -> timer:sleep(10 + (Count * 20)), update_proj2(Count + 1, S) @@ -315,7 +375,7 @@ gather_worker_statuses([{Pid,Ref}|Rest], Timeout) -> end. choose_best_proj(Rs) -> - WorstEpoch = #projection_v1{epoch_number=-1}, + WorstEpoch = #projection_v1{epoch_number=-1,epoch_csum= <<>>}, lists:foldl(fun({ok, NewEpoch}, BestEpoch) when NewEpoch > BestEpoch -> NewEpoch; From a7f53cf21a021e773ce0bb9b158ce3230eb14f6d Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 May 2015 00:59:24 +0900 Subject: [PATCH 03/13] WIP: starting machi_cr_client:append_chunk* --- src/machi_cr_client.erl | 5 ++--- src/machi_flu1.erl | 6 +++++- src/machi_proxy_flu1_client.erl | 8 ++++++++ 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 65ecdfa..cf282e2 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -289,7 +289,6 @@ do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, EpochID, Prefix, Chunk, ChunkExtra, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - io:format(user, "TODO: X ~p\n", [_X]), do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, HeadFLU, 1, S); {error, Change} when Change == bad_epoch; Change == wedged -> @@ -298,8 +297,8 @@ do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, %% TODO return values here end. -do_append_chunk_extra2([], File, Offset, _Chunk, _OldHeadFLU, _OkCount, S) -> - {reply, {ok, {File, Offset}}, S}; +do_append_chunk_extra2([], File, Offset, Chunk, _OldHeadFLU, _OkCount, S) -> + {reply, {ok, {Offset, size(Chunk), File}}, S}; do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index a830c18..481a19a 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -66,6 +66,8 @@ -include("machi.hrl"). -include("machi_projection.hrl"). +-define(SERVER_CMD_READ_TIMEOUT, 600*1000). + -export([start_link/1, stop/1, update_wedge_state/3]). -export([make_listener_regname/1, make_projection_server_regname/1]). @@ -244,7 +246,9 @@ decode_epoch_id(EpochIDHex) -> net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0, 600*1000) of + %% TODO: Add testing control knob to adjust this timeout and/or inject + %% timeout condition. + case gen_tcp:recv(Sock, 0, ?SERVER_CMD_READ_TIMEOUT) of {ok, Line} -> %% machi_util:verb("Got: ~p\n", [Line]), PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1, diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index ce55313..b49da56 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -295,8 +295,16 @@ do_req(Req, S) -> case connected_p(S2) of true -> case Fun() of + ok -> + {ok, S2}; T when element(1, T) == ok -> {T, S2}; + {error, {badmatch, {badmatch, {error, Why}}, _Stk}} + when Why == closed; Why == timeout -> + %% TODO: Infinite recursion isn't + %% good. Exponential backoff might be good. + timer:sleep(500), + do_req(Req, disconnect(S2)); Else -> case get(bad_sock) of Bad when Bad == S2#state.sock -> From 966d2edae88d192c6f703233b38e5b30f75bb7a4 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 May 2015 15:49:05 +0900 Subject: [PATCH 04/13] WIP: refactoring machi_cr_client:append_chunk* --- src/machi_cr_client.erl | 175 ++++++++++++++++++++++++++++++++-------- 1 file changed, 142 insertions(+), 33 deletions(-) diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index cf282e2..bc3d1e6 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -66,6 +66,7 @@ -define(FLU_PC, machi_proxy_flu1_client). -define(TIMEOUT, 2*1000). +-define(MAX_RUNTIME, 5*1000). -record(state, { members_dict :: p_srvr_dict(), @@ -279,43 +280,134 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> - do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S). + do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S). -do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, - #state{epoch_id=EpochID, proj=P, - proxies_dict=PD}=S) -> - #projection_v1{upi=[HeadFLU|RestFLUs]} = P, - case ?FLU_PC:append_chunk_extra(orddict:fetch(HeadFLU, PD), +do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) -> + do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S); +do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> + io:format(user, "append sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1, + STime, S); + _ -> + do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, + STime, S) + end + end. + +do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + [HeadFLU|RestFLUs] = mutation_flus(P), + Proxy = orddict:fetch(HeadFLU, PD), + io:format(user, "append ~w,", [Proxy]), + case ?FLU_PC:append_chunk_extra(Proxy, EpochID, Prefix, Chunk, ChunkExtra, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, - HeadFLU, 1, S); - {error, Change} when Change == bad_epoch; Change == wedged -> - S2 = update_proj(S#state{proj=undefined, bad_proj=P}), - do_append_chunk_extra1(Prefix, Chunk, ChunkExtra, S2) - %% TODO return values here + %% io:format(user, "append ~w@~p,~w,", [HeadFLU, File, Offset]), + io:format(user, "append ~w,", [HeadFLU]), + do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + [HeadFLU], 0, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); + {error, written} -> + %% Implicit sequencing + this error = we don't know where this + %% written block is. But we lost a race. Repeat, with a new + %% sequencer assignment. + do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE, + Prefix,iolist_size(Chunk)}) end. -do_append_chunk_extra2([], File, Offset, Chunk, _OldHeadFLU, _OkCount, S) -> - {reply, {ok, {Offset, size(Chunk), File}}, S}; -do_append_chunk_extra2([FLU|RestFLUs], File, Offset, Chunk, OldHeadFLU, OkCount, - #state{epoch_id=EpochID, proj=P, - proxies_dict=PD}=S) -> - case ?FLU_PC:write_chunk(orddict:fetch(FLU, PD), - EpochID, File, Offset, Chunk, ?TIMEOUT) of - ok -> - do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, - OldHeadFLU, OkCount + 1, S); - {error, Change} when Change == bad_epoch; Change == wedged -> +do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth, STime, S) + when RestFLUs == [] orelse Depth == 0 -> + do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth + 1, STime, S); +do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, + Ws, Depth, STime, #state{proj=P}=S) -> + io:format(user, "append sleep2,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), - #projection_v1{upi=[NewHeadFLU|NewRestFLUs]} = S2#state.proj, - if OkCount == 1, NewHeadFLU == OldHeadFLU -> - do_append_chunk_extra2(NewRestFLUs, File, Offset, Chunk, - OldHeadFLU, OkCount, S2); - true -> - {error, partition} + case S2#state.proj of + undefined -> + {error, partition}; + P2 -> + RestFLUs2 = mutation_flus(P2), + case RestFLUs2 -- Ws of + RestFLUs -> + %% None of the writes that we have done so far + %% are to FLUs that are in the RestFLUs2 list. + %% We are pessimistic here and assume that + %% those FLUs are permanently dead. Start + %% over with a new sequencer assignment, at + %% the 2nd have of the impl (we have already + %% slept & refreshed the projection). + do_append_head2(Prefix, Chunk, ChunkExtra, Depth, + STime, S); + RestFLUs3 -> + do_append_midtail2(RestFLUs3, Prefix, File, Offset, + Chunk, ChunkExtra, + Ws, Depth + 1, STime, S) + end end + end. + +do_append_midtail2([], _Prefix, File, Offset, Chunk, + _ChunkExtra, _Ws, _Depth, _STime, S) -> + io:format(user, "ok!\n", []), + {reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; +do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, + ChunkExtra, Ws, Depth, STime, + #state{epoch_id=EpochID, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(FLU, PD), + io:format(user, "write ~w,", [Proxy]), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + %% io:format(user, "write ~w@~p~w,", [FLU, File, Offset]), + io:format(user, "write ~w,", [FLU]), + do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, + ChunkExtra, [FLU|Ws], Depth, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_append_midtail(FLUs, Prefix, File, Offset, Chunk, + ChunkExtra, Ws, Depth, STime, S); + {error, written} -> + exit({todo,read_repair,?MODULE,?LINE,File,Offset,iolist_size(Chunk)}) + %% read_repair(P#projection_v1.upi ++ P#projection_v1.repairing, + %% Chunk, Depth, STime, S) + %% Chunk1 = if is_binary(Chunk) -> Chunk; + %% is_list(Chunk) -> list_to_binary(Chunk) + %% end, + %% case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset, + %% size(Chunk1), ?TIMEOUT) of + %% {ok, Chunk2} when Chunk2 == Chunk1 -> + %% %% Someone has been read-repairing this chunk. + %% %% Keep going. + %% do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, + %% OldHeadFLU, OkCount + 1, S); + %% {error, not_written} -> + %% exit({todo_should_never_happen,?MODULE,?LINE, + %% File,Offset, size(Chunk1)}); + %% { -> %% TODO return values here end. @@ -328,7 +420,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Timeout = 2*1000, Parent = self(), Proxies = orddict:to_list(ProxiesDict), - Worker = spawn( + MiddleWorker = spawn( fun() -> PidsMons = [spawn_monitor(fun() -> @@ -339,7 +431,7 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Parent ! {res, self(), Rs}, exit(normal) end), - Rs = receive {res, Worker, Results} -> Results + Rs = receive {res, MiddleWorker, Results} -> Results after Timeout*2 -> [] end, %% TODO: There's a possible bug here when running multiple independent @@ -352,14 +444,14 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> P when P >= BadProj -> #projection_v1{epoch_number=Epoch, epoch_csum=CSum, members_dict=NewMembersDict} = P, - io:format(user, "~s: proj ~P\n", [?MODULE, P, 10]), + io:format(user, "~s: proj ~P\n", [?MODULE, P, 15]), EpochID = {Epoch, CSum}, ?FLU_PC:stop_proxies(ProxiesDict), NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), S#state{bad_proj=undefined, proj=P, epoch_id=EpochID, members_dict=NewMembersDict, proxies_dict=NewProxiesDict}; _ -> - timer:sleep(10 + (Count * 20)), + sleep_a_while(Count), update_proj2(Count + 1, S) end. @@ -382,5 +474,22 @@ choose_best_proj(Rs) -> BestEpoch end, WorstEpoch, Rs). +mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) -> + UPI ++ Repairing; +mutation_flus(#state{proj=P}) -> + mutation_flus(P). + +readonly_flus(#projection_v1{upi=UPI}) -> + UPI; +readonly_flus(#state{proj=P}) -> + readonly_flus(P). + +sleep_a_while(0) -> + ok; +sleep_a_while(1) -> + ok; +sleep_a_while(Depth) -> + timer:sleep(30 + trunc(math:pow(1.9, Depth))). + noop() -> ok. From a347722a15fae2922fbfe37d4b0cd706a03c54d7 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 May 2015 17:32:22 +0900 Subject: [PATCH 05/13] Fix {error,not_written} type bugs in chmgr --- src/machi_chain_manager1.erl | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 6aa1c90..594812d 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -466,11 +466,12 @@ cl_read_latest_projection(ProjectionType, AllHosed, S) -> {All_queried_list, FLUsRs, S2} = read_latest_projection_call_only(ProjectionType, AllHosed, S), - rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, S2). + rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, + ProjectionType, S2). -rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, +rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, ProjectionType, #ch_mgr{name=MyName,proj=CurrentProj}=S) -> - UnwrittenRs = [x || {_, error_unwritten} <- FLUsRs], + UnwrittenRs = [x || {_, {error, not_written}} <- FLUsRs], Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)], BadAnswerFLUs = [FLU || {FLU, Answer} <- FLUsRs, not is_record(Answer, projection_v1)], @@ -489,7 +490,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, {trans_all_hosed, []}, {trans_all_flap_counts, []}], {not_unanimous, NoneProj, Extra2, S}; - UnwrittenRs /= [] -> + ProjectionType == public, UnwrittenRs /= [] -> {needs_repair, FLUsRs, [flarfus], S}; true -> [{_Rank, BestProj}|_] = rank_and_sort_projections(Ps, CurrentProj), @@ -516,7 +517,7 @@ rank_and_sort_projections_with_extra(All_queried_list, FLUsRs, end. do_read_repair(FLUsRs, _Extra, #ch_mgr{proj=CurrentProj} = S) -> - Unwrittens = [x || {_FLU, error_unwritten} <- FLUsRs], + Unwrittens = [x || {_FLU, {error, not_written}} <- FLUsRs], Ps = [Proj || {_FLU, Proj} <- FLUsRs, is_record(Proj, projection_v1)], if Unwrittens == [] orelse Ps == [] -> {nothing_to_do, S}; From 185c670b2ff6bf4d78c4941ee6bc7cb0e214d80d Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 May 2015 19:06:06 +0900 Subject: [PATCH 06/13] WIP: refactoring machi_cr_client:append_chunk* --- src/machi_flu1.erl | 4 +- src/machi_flu1_client.erl | 64 ++++--- src/machi_projection_store.erl | 46 ++--- src/machi_proxy_flu1_client.erl | 37 ++-- test/machi_chain_manager1_converge_demo.erl | 4 +- test/machi_flu1_test.erl | 4 +- test/machi_proxy_flu1_client_test.erl | 181 +++++++++++++++++++- 7 files changed, 270 insertions(+), 70 deletions(-) diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 481a19a..c7bdc8d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -763,9 +763,9 @@ do_projection_command(Sock, LenHex, S) -> _ = (catch gen_tcp:send(Sock, [<<"ERROR ">>, WHA, <<"\n">>])) end. -handle_projection_command({get_latest_epoch, ProjType}, +handle_projection_command({get_latest_epochid, ProjType}, #state{proj_store=ProjStore}) -> - machi_projection_store:get_latest_epoch(ProjStore, ProjType); + machi_projection_store:get_latest_epochid(ProjStore, ProjType); handle_projection_command({read_latest_projection, ProjType}, #state{proj_store=ProjStore}) -> machi_projection_store:read_latest_projection(ProjStore, ProjType); diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 80e1b6a..611e3d3 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -25,6 +25,8 @@ -include("machi.hrl"). -include("machi_projection.hrl"). +-define(HARD_TIMEOUT, 2500). + -export([ %% File API append_chunk/4, append_chunk/5, @@ -35,7 +37,7 @@ wedge_status/1, wedge_status/2, %% Projection API - get_latest_epoch/2, get_latest_epoch/3, + get_latest_epochid/2, get_latest_epochid/3, read_latest_projection/2, read_latest_projection/3, read_projection/3, read_projection/4, write_projection/3, write_projection/4, @@ -223,22 +225,21 @@ wedge_status(Host, TcpPort) when is_integer(TcpPort) -> %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(port_wrap(), projection_type()) -> +-spec get_latest_epochid(port_wrap(), projection_type()) -> {ok, epoch_id()} | {error, term()}. -get_latest_epoch(Sock, ProjType) +get_latest_epochid(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - get_latest_epoch2(Sock, ProjType). + get_latest_epochid2(Sock, ProjType). %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(inet_host(), inet_port(), - projection_type()) -> +-spec get_latest_epochid(inet_host(), inet_port(), projection_type()) -> {ok, epoch_id()} | {error, term()}. -get_latest_epoch(Host, TcpPort, ProjType) +get_latest_epochid(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try - get_latest_epoch2(Sock, ProjType) + get_latest_epochid2(Sock, ProjType) after disconnect(Sock) end. @@ -501,15 +502,14 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> SizeHex = machi_util:int_to_hexbin(Size, 32), CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10], ok = w_send(Sock, CmdLF), + ok = w_setopts(Sock, [{packet, raw}]), case w_recv(Sock, 3) of {ok, <<"OK\n">>} -> {ok, _Chunk}=Res = w_recv(Sock, Size), Res; {ok, Else} -> - {ok, OldOpts} = w_getopts(Sock, [packet]), ok = w_setopts(Sock, [{packet, line}]), {ok, Else2} = w_recv(Sock, 0), - ok = w_setopts(Sock, OldOpts), case Else of <<"ERA">> -> {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); @@ -536,6 +536,9 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} @@ -561,7 +564,11 @@ list2(Sock, EpochID) -> catch throw:Error -> Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. @@ -593,7 +600,11 @@ wedge_status2(Sock) -> catch throw:Error -> Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> + put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. @@ -620,16 +631,15 @@ checksum_list2(Sock, EpochID, File) -> {ok, <<"ERROR WEDGED", _/binary>>} -> {error, wedged}; {ok, Else} -> - throw({server_protocol_error, Else}); - {error, closed} -> - throw({error, closed}); - Else -> - throw(Else) + throw({server_protocol_error, Else}) end catch throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch}} @@ -724,6 +734,9 @@ delete_migration2(Sock, EpochID, File) -> throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch}} @@ -754,13 +767,16 @@ trunc_hack2(Sock, EpochID, File) -> throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch}} end. -get_latest_epoch2(Sock, ProjType) -> - ProjCmd = {get_latest_epoch, ProjType}, +get_latest_epochid2(Sock, ProjType) -> + ProjCmd = {get_latest_epochid, ProjType}, do_projection_common(Sock, ProjCmd). read_latest_projection2(Sock, ProjType) -> @@ -804,14 +820,15 @@ do_projection_common(Sock, ProjCmd) -> binary_to_term(ResBin); Else -> {error, Else} - end; - {error, _} = Bad -> - throw(Bad) + end end catch throw:Error -> put(bad_sock, Sock), Error; + error:{case_clause,_}=Noo -> + put(bad_sock, Sock), + {error, {badmatch, Noo, erlang:get_stacktrace()}}; error:{badmatch,_}=BadMatch -> put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} @@ -823,7 +840,7 @@ w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})-> try case proplists:get_value(session_proto, Props, tcp) of tcp -> - Sock = machi_util:connect(Host, Port), + Sock = machi_util:connect(Host, Port, ?HARD_TIMEOUT), {w,tcp,Sock}; %% sctp -> %% %% TODO: not implemented @@ -845,14 +862,11 @@ w_close({w,tcp,Sock}) -> ok. w_recv({w,tcp,Sock}, Amt) -> - gen_tcp:recv(Sock, Amt). + gen_tcp:recv(Sock, Amt, ?HARD_TIMEOUT). w_send({w,tcp,Sock}, IoData) -> gen_tcp:send(Sock, IoData). -w_getopts({w,tcp,Sock}, Opts) -> - inet:getopts(Sock, Opts). - w_setopts({w,tcp,Sock}, Opts) -> inet:setopts(Sock, Opts). diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index 9ddeb0d..8818588 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -43,7 +43,7 @@ %% API -export([ start_link/3, - get_latest_epoch/2, get_latest_epoch/3, + get_latest_epochid/2, get_latest_epochid/3, read_latest_projection/2, read_latest_projection/3, read/3, read/4, write/3, write/4, @@ -62,8 +62,8 @@ public_dir = "" :: string(), private_dir = "" :: string(), wedge_notify_pid :: pid() | atom(), - max_public_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}, - max_private_epoch = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()} + max_public_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()}, + max_private_epochid = ?NO_EPOCH :: {-1 | non_neg_integer(), binary()} }). %% @doc Start a new projection store server. @@ -80,15 +80,15 @@ start_link(RegName, DataDir, NotifyWedgeStateChanges) -> %% @doc Fetch the latest epoch number + checksum for type `ProjType'. -get_latest_epoch(PidSpec, ProjType) -> - get_latest_epoch(PidSpec, ProjType, infinity). +get_latest_epochid(PidSpec, ProjType) -> + get_latest_epochid(PidSpec, ProjType, infinity). %% @doc Fetch the latest epoch number + checksum for type `ProjType'. %% projection. -get_latest_epoch(PidSpec, ProjType, Timeout) +get_latest_epochid(PidSpec, ProjType, Timeout) when ProjType == 'public' orelse ProjType == 'private' -> - g_call(PidSpec, {get_latest_epoch, ProjType}, Timeout). + g_call(PidSpec, {get_latest_epochid, ProjType}, Timeout). %% @doc Fetch the latest projection record for type `ProjType'. @@ -168,25 +168,27 @@ init([DataDir, NotifyWedgeStateChanges]) -> PrivateDir = machi_util:make_projection_filename(DataDir, "private"), ok = filelib:ensure_dir(PublicDir ++ "/ignored"), ok = filelib:ensure_dir(PrivateDir ++ "/ignored"), - MaxPublicEpoch = find_max_epoch(PublicDir), - MaxPrivateEpoch = find_max_epoch(PrivateDir), + MbEpoch = find_max_epochid(PublicDir), + %% MbEpoch = {Mb#projection_v1.epoch_number, Mb#projection_v1.epoch_csum}, + MvEpoch = find_max_epochid(PrivateDir), + %% MvEpoch = {Mv#projection_v1.epoch_number, Mv#projection_v1.epoch_csum}, {ok, #state{public_dir=PublicDir, private_dir=PrivateDir, wedge_notify_pid=NotifyWedgeStateChanges, - max_public_epoch=MaxPublicEpoch, - max_private_epoch=MaxPrivateEpoch}}. + max_public_epochid=MbEpoch, + max_private_epochid=MvEpoch}}. -handle_call({{get_latest_epoch, ProjType}, LC1}, _From, S) -> +handle_call({{get_latest_epochid, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - EpochId = if ProjType == public -> S#state.max_public_epoch; - ProjType == private -> S#state.max_private_epoch + EpochId = if ProjType == public -> S#state.max_public_epochid; + ProjType == private -> S#state.max_private_epochid end, {reply, {{ok, EpochId}, LC2}, S}; handle_call({{read_latest_projection, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), - {EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epoch; - ProjType == private -> S#state.max_private_epoch + {EpochNum, _CSum} = if ProjType == public -> S#state.max_public_epochid; + ProjType == private -> S#state.max_private_epochid end, {Reply, NewS} = do_proj_read(ProjType, EpochNum, S), {reply, {Reply, LC2}, NewS}; @@ -268,7 +270,7 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> EffectiveEpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum}, %% NewS = if ProjType == public, - Epoch > element(1, S#state.max_public_epoch) -> + Epoch > element(1, S#state.max_public_epochid) -> if Epoch == EffectiveEpoch -> %% This is a regular projection, i.e., %% does not have an inner proj. @@ -281,13 +283,13 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> %% not bother wedging. ok end, - S#state{max_public_epoch=EpochId}; + S#state{max_public_epochid=EpochId}; ProjType == private, - Epoch > element(1, S#state.max_private_epoch) -> + Epoch > element(1, S#state.max_private_epochid) -> update_wedge_state( S#state.wedge_notify_pid, false, EffectiveEpochId), - S#state{max_private_epoch=EpochId}; + S#state{max_private_epochid=EpochId}; true -> S end, @@ -344,14 +346,14 @@ find_all(Dir) -> Fs = filelib:wildcard("*", Dir), lists:sort([name2epoch(F) || F <- Fs]). -find_max_epoch(Dir) -> +find_max_epochid(Dir) -> Fs = lists:sort(filelib:wildcard("*", Dir)), if Fs == [] -> ?NO_EPOCH; true -> EpochNum = name2epoch(lists:last(Fs)), {{ok, Proj}, _} = do_proj_read(proj_type_ignored, EpochNum, Dir), - {EpochNum, Proj} + {EpochNum, Proj#projection_v1.epoch_csum} end. %%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index b49da56..79cff36 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -59,7 +59,7 @@ %% %% Projection API get_epoch_id/1, get_epoch_id/2, - get_latest_epoch/2, get_latest_epoch/3, + get_latest_epochid/2, get_latest_epochid/3, read_latest_projection/2, read_latest_projection/3, read_projection/3, read_projection/4, write_projection/3, write_projection/4, @@ -176,13 +176,13 @@ get_epoch_id(PidSpec, Timeout) -> %% @doc Get the latest epoch number + checksum from the FLU's projection store. -get_latest_epoch(PidSpec, ProjType) -> - get_latest_epoch(PidSpec, ProjType, infinity). +get_latest_epochid(PidSpec, ProjType) -> + get_latest_epochid(PidSpec, ProjType, infinity). %% @doc Get the latest epoch number + checksum from the FLU's projection store. -get_latest_epoch(PidSpec, ProjType, Timeout) -> - gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}}, +get_latest_epochid(PidSpec, ProjType, Timeout) -> + gen_server:call(PidSpec, {req, {get_latest_epochid, ProjType}}, Timeout). %% @doc Get the latest projection from the FLU's projection store for `ProjType' @@ -290,6 +290,9 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% do_req(Req, S) -> + do_req(Req, 1, S). + +do_req(Req, Depth, S) -> S2 = try_connect(S), Fun = make_req_fun(Req, S2), case connected_p(S2) of @@ -299,24 +302,26 @@ do_req(Req, S) -> {ok, S2}; T when element(1, T) == ok -> {T, S2}; - {error, {badmatch, {badmatch, {error, Why}}, _Stk}} - when Why == closed; Why == timeout -> - %% TODO: Infinite recursion isn't - %% good. Exponential backoff might be good. - timer:sleep(500), - do_req(Req, disconnect(S2)); - Else -> + %% {error, {badmatch, {badmatch, {error, Why}=TheErr}, _Stk}} + %% when Why == closed; Why == timeout -> + %% do_req_retry(Req, Depth, TheErr, S2); + TheErr -> case get(bad_sock) of Bad when Bad == S2#state.sock -> - {Else, disconnect(S2)}; + do_req_retry(Req, Depth, TheErr, S2); _ -> - {Else, S2} + {TheErr, S2} end end; false -> {{error, partition}, S2} end. +do_req_retry(_Req, 2, Err, S) -> + {Err, disconnect(S)}; +do_req_retry(Req, Depth, _Err, S) -> + do_req(Req, Depth + 1, try_connect(disconnect(S))). + make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:append_chunk(Sock, EpochID, Prefix, Chunk) end; @@ -350,9 +355,9 @@ make_req_fun({get_epoch_id}, Error end end; -make_req_fun({get_latest_epoch, ProjType}, +make_req_fun({get_latest_epochid, ProjType}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> - fun() -> Mod:get_latest_epoch(Sock, ProjType) end; + fun() -> Mod:get_latest_epochid(Sock, ProjType) end; make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> fun() -> Mod:read_latest_projection(Sock, ProjType) end; diff --git a/test/machi_chain_manager1_converge_demo.erl b/test/machi_chain_manager1_converge_demo.erl index af14870..93e73eb 100644 --- a/test/machi_chain_manager1_converge_demo.erl +++ b/test/machi_chain_manager1_converge_demo.erl @@ -378,10 +378,10 @@ todo_why_does_this_crash_sometimes(FLUName, FLU, PPPepoch) -> end. private_projections_are_stable(Namez, PollFunc) -> - Private1 = [?FLU_PC:get_latest_epoch(FLU, private) || + Private1 = [?FLU_PC:get_latest_epochid(FLU, private) || {_Name, FLU} <- Namez], PollFunc(5, 1, 10), - Private2 = [?FLU_PC:get_latest_epoch(FLU, private) || + Private2 = [?FLU_PC:get_latest_epochid(FLU, private) || {_Name, FLU} <- Namez], true = (Private1 == Private2). diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index 48e0831..a3be0b1 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -149,7 +149,7 @@ flu_projection_smoke_test() -> FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir), try [begin - {ok, {0,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), + {ok, {0,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T), {error, not_written} = ?FLU_C:read_latest_projection(Host, TcpPort, T), {ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T), @@ -160,7 +160,7 @@ flu_projection_smoke_test() -> ok = ?FLU_C:write_projection(Host, TcpPort, T, P1), {error, written} = ?FLU_C:write_projection(Host, TcpPort, T, P1), {ok, P1} = ?FLU_C:read_projection(Host, TcpPort, T, 1), - {ok, {1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), + {ok, {1,_}} = ?FLU_C:get_latest_epochid(Host, TcpPort, T), {ok, P1} = ?FLU_C:read_latest_projection(Host, TcpPort, T), {ok, [1]} = ?FLU_C:list_all_projections(Host, TcpPort, T), {ok, [P1]} = ?FLU_C:get_all_projections(Host, TcpPort, T), diff --git a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 4b0c7c7..5b723fe 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -74,7 +74,7 @@ api_smoke_test() -> {error, no_such_file} = ?MUT:checksum_list(Prox1, FakeEpoch, BadFile), {ok, [_|_]} = ?MUT:list_files(Prox1, FakeEpoch), {ok, {false, _}} = ?MUT:wedge_status(Prox1), - {ok, FakeEpoch} = ?MUT:get_latest_epoch(Prox1, public), + {ok, FakeEpoch} = ?MUT:get_latest_epochid(Prox1, public), {error, not_written} = ?MUT:read_latest_projection(Prox1, public), {error, not_written} = ?MUT:read_projection(Prox1, public, 44), P_a = #p_srvr{name=a, address="localhost", port=6622}, @@ -83,6 +83,7 @@ api_smoke_test() -> {ok, P1} = ?MUT:read_projection(Prox1, public, 1), {ok, [P1]} = ?MUT:get_all_projections(Prox1, public), {ok, [1]} = ?MUT:list_all_projections(Prox1, public), + ok after _ = (catch ?MUT:quit(Prox1)) @@ -91,5 +92,183 @@ api_smoke_test() -> (catch machi_flu1:stop(FLU1)), (catch machi_flu1:stop(get(flu_pid))) end. + +flu_restart_test() -> + RegName = api_smoke_flu, + Host = "localhost", + TcpPort = 57125, + DataDir = "./data.api_smoke_flu2", + W_props = [{initial_wedged, false}], + erase(flu_pid), + put(flu_pid, []), + FLU1 = machi_flu1_test:setup_test_flu(RegName, TcpPort, DataDir, + W_props), + put(flu_pid, [FLU1|get(flu_pid)]), + + try + I = #p_srvr{name=RegName, address=Host, port=TcpPort}, + {ok, Prox1} = ?MUT:start_link(I), + try + FakeEpoch = ?DUMMY_PV1_EPOCH, + Data = <<"data!">>, + {ok, {Off1,Size1,File1}} = ?MUT:append_chunk(Prox1, + FakeEpoch, <<"prefix">>, Data, + infinity), + P_a = #p_srvr{name=a, address="localhost", port=6622}, + P1 = machi_projection:new(1, a, [P_a], [], [a], [], []), + EpochID = {P1#projection_v1.epoch_number, + P1#projection_v1.epoch_csum}, + ok = ?MUT:write_projection(Prox1, public, P1), + ok = ?MUT:write_projection(Prox1, private, P1), + {ok, EpochID} = ?MUT:get_epoch_id(Prox1), + {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, public), + {ok, EpochID} = ?MUT:get_latest_epochid(Prox1, private), + ok = machi_flu1:stop(FLU1), timer:sleep(50), + + %% Now that the last proxy op was successful and only + %% after did we stop the FLU, let's check that both the + %% 1st & 2nd ops-via-proxy after FLU is restarted are + %% successful. And immediately after stopping the FLU, + %% both 1st & 2nd ops-via-proxy should always fail. + %% + %% Some of the expectations have unbound variables, which + %% makes the code a bit convoluted. (No LFE or + %% Elixir macros here, alas, they'd be useful.) + + ExpectedOps = + [ + fun(run) -> {ok, EpochID} = ?MUT:get_epoch_id(Prox1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_epoch_id(Prox1) end, + fun(run) -> {ok, EpochID} = + ?MUT:get_latest_epochid(Prox1, public), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_latest_epochid(Prox1, public) + end, + fun(run) -> {ok, EpochID} = + ?MUT:get_latest_epochid(Prox1, private), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_latest_epochid(Prox1, private) + end, + fun(run) -> {ok, P1} = + ?MUT:read_projection(Prox1, public, 1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_projection(Prox1, public, 1) + end, + fun(run) -> {ok, P1} = + ?MUT:read_projection(Prox1, private, 1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_projection(Prox1, private, 1) + end, + fun(run) -> {error, not_written} = + ?MUT:read_projection(Prox1, private, 7), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_projection(Prox1, private, 7) + end, + fun(run) -> {error, written} = + ?MUT:write_projection(Prox1, public, P1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:write_projection(Prox1, public, P1) + end, + fun(run) -> {error, written} = + ?MUT:write_projection(Prox1, private, P1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:write_projection(Prox1, private, P1) + end, + fun(run) -> {ok, [_]} = + ?MUT:get_all_projections(Prox1, public), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_all_projections(Prox1, public) + end, + fun(run) -> {ok, [_]} = + ?MUT:get_all_projections(Prox1, private), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:get_all_projections(Prox1, private) + end, + fun(run) -> {ok, {_,_,_}} = + ?MUT:append_chunk(Prox1, FakeEpoch, + <<"prefix">>, Data, infinity), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:append_chunk(Prox1, FakeEpoch, + <<"prefix">>, Data, infinity) + end, + fun(run) -> {ok, {_,_,_}} = + ?MUT:append_chunk_extra(Prox1, FakeEpoch, + <<"prefix">>, Data, 42, infinity), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:append_chunk_extra(Prox1, FakeEpoch, + <<"prefix">>, Data, 42, infinity) + end, + fun(run) -> {ok, Data} = + ?MUT:read_chunk(Prox1, FakeEpoch, + File1, Off1, Size1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:read_chunk(Prox1, FakeEpoch, + File1, Off1, Size1) + end, + fun(run) -> {ok, _} = + ?MUT:checksum_list(Prox1, FakeEpoch, File1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:checksum_list(Prox1, FakeEpoch, File1) + end, + fun(run) -> {ok, _} = + ?MUT:list_files(Prox1, FakeEpoch), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:list_files(Prox1, FakeEpoch) + end, + fun(run) -> {ok, _} = + ?MUT:wedge_status(Prox1), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:wedge_status(Prox1) + end, + %% NOTE: When write-once enforcement is enabled, this test + %% will fail: change ok -> {error, written} + fun(run) -> %% {error, written} = + ok = + ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1, + Data, infinity), + ok; + (line) -> io:format("line ~p, ", [?LINE]); + (stop) -> ?MUT:write_chunk(Prox1, FakeEpoch, File1, Off1, + Data, infinity) + end + ], + + [begin + FLU2 = machi_flu1_test:setup_test_flu( + RegName, TcpPort, DataDir, + [save_data_dir|W_props]), + put(flu_pid, [FLU2|get(flu_pid)]), + _ = Fun(line), + ok = Fun(run), + ok = Fun(run), + ok = machi_flu1:stop(FLU2), + {error, partition} = Fun(stop), + {error, partition} = Fun(stop), + ok + end || Fun <- ExpectedOps ], + ok + after + _ = (catch ?MUT:quit(Prox1)) + end + after + [catch machi_flu1:stop(Pid) || Pid <- get(flu_pid)] + end. -endif. % TEST From f7274e7106f9ea1688891bfe5ddd553b69fb9869 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Mon, 18 May 2015 23:26:21 +0900 Subject: [PATCH 07/13] WIP: brute-force read-repair --- src/machi_cr_client.erl | 204 +++++++++++++++++++++++++------- src/machi_flu1.erl | 13 +- src/machi_flu1_client.erl | 4 +- src/machi_proxy_flu1_client.erl | 1 + 4 files changed, 178 insertions(+), 44 deletions(-) diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index bc3d1e6..be4aacb 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -38,7 +38,7 @@ %% File API append_chunk/3, append_chunk/4, append_chunk_extra/4, append_chunk_extra/5, -%% read_chunk/5, read_chunk/6, + read_chunk/4, read_chunk/5, %% checksum_list/3, checksum_list/4, %% list_files/2, list_files/3, %% wedge_status/1, wedge_status/2, @@ -66,7 +66,7 @@ -define(FLU_PC, machi_proxy_flu1_client). -define(TIMEOUT, 2*1000). --define(MAX_RUNTIME, 5*1000). +-define(MAX_RUNTIME, 8*1000). -record(state, { members_dict :: p_srvr_dict(), @@ -112,14 +112,14 @@ append_chunk_extra(PidSpec, Prefix, Chunk, ChunkExtra, Timeout) -> %% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -%% read_chunk(PidSpec, EpochID, File, Offset, Size) -> -%% read_chunk(PidSpec, EpochID, File, Offset, Size, infinity). +read_chunk(PidSpec, File, Offset, Size) -> + read_chunk(PidSpec, File, Offset, Size, infinity). -%% %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. +%% @doc Read a chunk of data of size `Size' from `File' at `Offset'. -%% read_chunk(PidSpec, EpochID, File, Offset, Size, Timeout) -> -%% gen_server:call(PidSpec, {req, {read_chunk, EpochID, File, Offset, Size}}, -%% Timeout). +read_chunk(PidSpec, File, Offset, Size, Timeout) -> + gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}}, + Timeout). %% %% @doc Fetch the list of chunk checksums for `File'. @@ -268,6 +268,7 @@ handle_cast(_Msg, S) -> {noreply, S}. handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), {noreply, S}. terminate(_Reason, #state{proxies_dict=ProxiesDict}=_S) -> @@ -280,12 +281,14 @@ code_change(_OldVsn, S, _Extra) -> %%%%%%%%%%%%%%%%%%%%%%%%%%% handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> - do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S). + do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); +handle_call2({read_chunk, File, Offset, Size}, _From, S) -> + do_read_chunk(File, Offset, Size, 0, os:timestamp(), S). do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) -> do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S); do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> - io:format(user, "append sleep1,", []), + %% io:format(user, "head sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> @@ -300,10 +303,10 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> do_append_head(Prefix, Chunk, ChunkExtra, Depth + 1, - STime, S); + STime, S2); _ -> do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, - STime, S) + STime, S2) end end. @@ -311,13 +314,11 @@ do_append_head2(Prefix, Chunk, ChunkExtra, Depth, STime, #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> [HeadFLU|RestFLUs] = mutation_flus(P), Proxy = orddict:fetch(HeadFLU, PD), - io:format(user, "append ~w,", [Proxy]), case ?FLU_PC:append_chunk_extra(Proxy, EpochID, Prefix, Chunk, ChunkExtra, ?TIMEOUT) of {ok, {Offset, _Size, File}=_X} -> - %% io:format(user, "append ~w@~p,~w,", [HeadFLU, File, Offset]), - io:format(user, "append ~w,", [HeadFLU]), + %% io:format(user, "append ~w,", [HeadFLU]), do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, [HeadFLU], 0, STime, S); {error, Retry} @@ -338,9 +339,9 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, when RestFLUs == [] orelse Depth == 0 -> do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth + 1, STime, S); -do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, +do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, #state{proj=P}=S) -> - io:format(user, "append sleep2,", []), + %% io:format(user, "midtail sleep2,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> @@ -353,7 +354,7 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, P2 -> RestFLUs2 = mutation_flus(P2), case RestFLUs2 -- Ws of - RestFLUs -> + RestFLUs2 -> %% None of the writes that we have done so far %% are to FLUs that are in the RestFLUs2 list. %% We are pessimistic here and assume that @@ -362,28 +363,26 @@ do_append_midtail(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, %% the 2nd have of the impl (we have already %% slept & refreshed the projection). do_append_head2(Prefix, Chunk, ChunkExtra, Depth, - STime, S); + STime, S2); RestFLUs3 -> do_append_midtail2(RestFLUs3, Prefix, File, Offset, Chunk, ChunkExtra, - Ws, Depth + 1, STime, S) + Ws, Depth + 1, STime, S2) end end end. do_append_midtail2([], _Prefix, File, Offset, Chunk, _ChunkExtra, _Ws, _Depth, _STime, S) -> - io:format(user, "ok!\n", []), + %% io:format(user, "ok!\n", []), {reply, {ok, {Offset, iolist_size(Chunk), File}}, S}; do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(FLU, PD), - io:format(user, "write ~w,", [Proxy]), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of ok -> - %% io:format(user, "write ~w@~p~w,", [FLU, File, Offset]), - io:format(user, "write ~w,", [FLU]), + %% io:format(user, "write ~w,", [FLU]), do_append_midtail2(RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, [FLU|Ws], Depth, STime, S); {error, Retry} @@ -392,25 +391,149 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, S); {error, written} -> exit({todo,read_repair,?MODULE,?LINE,File,Offset,iolist_size(Chunk)}) - %% read_repair(P#projection_v1.upi ++ P#projection_v1.repairing, - %% Chunk, Depth, STime, S) - %% Chunk1 = if is_binary(Chunk) -> Chunk; - %% is_list(Chunk) -> list_to_binary(Chunk) - %% end, - %% case ?FLU_PC:read_chunk(Proxy, EpochID, File, Offset, - %% size(Chunk1), ?TIMEOUT) of - %% {ok, Chunk2} when Chunk2 == Chunk1 -> - %% %% Someone has been read-repairing this chunk. - %% %% Keep going. - %% do_append_chunk_extra2(RestFLUs, File, Offset, Chunk, - %% OldHeadFLU, OkCount + 1, S); - %% {error, not_written} -> - %% exit({todo_should_never_happen,?MODULE,?LINE, - %% File,Offset, size(Chunk1)}); - %% { -> %% TODO return values here end. +do_read_chunk(File, Offset, Size, 0=Depth, STime, + #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty + do_read_chunk2(File, Offset, Size, Depth, STime, S); +do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_read_chunk(File, Offset, Size, Depth + 1, STime, S2); + _ -> + do_read_chunk2(File, Offset, Size, Depth + 1, STime, S2) + end + end. + +do_read_chunk2(File, Offset, Size, Depth, STime, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + UPI = readonly_flus(P), + Head = hd(UPI), + Tail = lists:last(UPI), + case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, + File, Offset, Size, ?TIMEOUT) of + {ok, Chunk} when byte_size(Chunk) == Size -> + {{ok, Chunk}, S}; + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, + got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_read_chunk(File, Offset, Size, Depth, STime, S); + {error, not_written} when Tail == Head -> + {{error, not_written}, S}; + {error, not_written} when Tail /= Head -> + read_repair(read, File, Offset, Size, Depth, STime, S); + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + +read_repair(ReturnMode, File, Offset, Size, 0=Depth, STime, + #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty + read_repair2(ReturnMode, File, Offset, Size, Depth, STime, S); +read_repair(ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read_repair sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + read_repair(ReturnMode, File, Offset, Size, + Depth + 1, STime, S2); + _ -> + read_repair2(ReturnMode, File, Offset, Size, + Depth + 1, STime, S2) + end + end. + +read_repair2(ReturnMode, File, Offset, Size, Depth, STime, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + [Head|MidsTails] = readonly_flus(P), + case ?FLU_PC:read_chunk(orddict:fetch(Head, PD), EpochID, + File, Offset, Size, ?TIMEOUT) of + {ok, Chunk} when byte_size(Chunk) == Size -> + read_repair3(MidsTails, ReturnMode, Chunk, [Head], File, Offset, + Size, Depth, STime, S); + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, + got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair(ReturnMode, File, Offset, Size, Depth, STime, S); + {error, not_written} -> + {{error, not_written}, S}; + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + +read_repair3([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S) -> + read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); +read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, 0=Depth, STime, S) -> + read_repair4(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); +read_repair3(MidsTails, ReturnMode, Chunk, File, Repaired, Offset, + Size, Depth, STime, #state{proj=P}=S) -> + %% io:format(user, "read_repair3 sleep1,", []), + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {error, partition}; + true -> + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2); + P2 -> + MidsTails2 = P2#projection_v1.upi -- Repaired, + read_repair4(MidsTails2, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth + 1, STime, S2) + end + end. + +read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S) -> + case ReturnMode of + read -> + {reply, {ok, Chunk}, S} + end; +read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(First, PD), + case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of + ok -> + read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); + {error, written} -> + %% TODO: To be very paranoid, read the chunk here to verify + %% that it is exactly our Chunk. + read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end. + update_proj(#state{proj=undefined}=S) -> update_proj2(1, S); update_proj(S) -> @@ -444,7 +567,6 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> P when P >= BadProj -> #projection_v1{epoch_number=Epoch, epoch_csum=CSum, members_dict=NewMembersDict} = P, - io:format(user, "~s: proj ~P\n", [?MODULE, P, 15]), EpochID = {Epoch, CSum}, ?FLU_PC:stop_proxies(ProxiesDict), NewProxiesDict = ?FLU_PC:start_proxies(NewMembersDict), diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index c7bdc8d..31fe56d 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -292,8 +292,17 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> _EpochID = decode_epoch_id(EpochIDHex), + %% do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, + %% <<"fixme1">>, false, <<"fixme2">>); + if FluName == a -> do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, <<"fixme1">>, false, <<"fixme2">>); + true -> + ok = inet:setopts(Sock, [{packet, raw}]), + Len = machi_util:hexstr_to_int(LenHex), + {ok, Chunk} = gen_tcp:recv(Sock, Len), + ok = gen_tcp:send(Sock, <<"OK\n">>) + end; %% For data migration only. <<"DEL-migration ", EpochIDHex:(?EpochIDSpace)/binary, @@ -432,6 +441,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, <> = machi_util:hexstr_to_bin(LenHex), {_, Path} = machi_util:make_data_filename(DataDir, FileBin), OptsHasWrite = lists:member(write, FileOpts), + OptsHasRead = lists:member(read, FileOpts), case file:open(Path, FileOpts) of {ok, FH} -> try @@ -444,8 +454,9 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, EpochID, Wedged_p, CurrentEpochId); + {error, enoent} when OptsHasRead -> + ok = gen_tcp:send(Sock, <<"ERROR NO-SUCH-FILE\n">>); _Else -> - %%%%%% keep?? machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>) end. diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 611e3d3..002b5f3 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -515,8 +515,8 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); <<"ERR">> -> case Else2 of - <<"OR BAD-IO\n">> -> - {error, no_such_file}; + <<"OR NO-SUCH-FILE\n">> -> + {error, not_written}; <<"OR NOT-ERASURE\n">> -> {error, no_such_file}; <<"OR BAD-ARG\n">> -> diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 79cff36..3d0100e 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -279,6 +279,7 @@ handle_cast(_Msg, S) -> {noreply, S}. handle_info(_Info, S) -> + io:format(user, "~s:handle_info: ~p\n", [?MODULE, _Info]), {noreply, S}. terminate(_Reason, _S) -> From eaf007ec08094ea658caf9b8c69d884984c3115c Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 19 May 2015 13:56:12 +0900 Subject: [PATCH 08/13] Fix read repair FLU tracking --- src/machi_cr_client.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index be4aacb..a62e410 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -519,17 +519,17 @@ read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset, Proxy = orddict:fetch(First, PD), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of ok -> - read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, S); + read_repair4(Rest, ReturnMode, Chunk, [First|Repaired], File, + Offset, Size, Depth, STime, S); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, S); + read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth, STime, S); {error, written} -> %% TODO: To be very paranoid, read the chunk here to verify %% that it is exactly our Chunk. - read_repair4(Rest, ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, S); + read_repair4(Rest, ReturnMode, Chunk, Repaired, File, + Offset, Size, Depth, STime, S); {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. From 079d15dd5c8955cb5f50ff563a995f886bd8de7c Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 19 May 2015 14:05:18 +0900 Subject: [PATCH 09/13] Derp, remove debugging goop + fix eunit @ write_chunk() response change --- TODO-shortterm.org | 1 + src/machi_flu1.erl | 9 --------- test/machi_flu1_test.erl | 6 +++--- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 1849ca7..e1ec295 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -38,6 +38,7 @@ func, and pattern match Erlang style in that func. ** TODO Adapt the projection-aware, CR-implementing client from demo-day +** TODO Add major comment sections to the CR-impl client ** TODO Create parallel PULSE test for basic API plus chain manager repair ** TODO Add gproc and get rid of registered name rendezvous *** TODO Fixes the atom table leak diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 31fe56d..24deb43 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -292,17 +292,8 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> _EpochID = decode_epoch_id(EpochIDHex), - %% do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, - %% <<"fixme1">>, false, <<"fixme2">>); - if FluName == a -> do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, <<"fixme1">>, false, <<"fixme2">>); - true -> - ok = inet:setopts(Sock, [{packet, raw}]), - Len = machi_util:hexstr_to_int(LenHex), - {ok, Chunk} = gen_tcp:recv(Sock, Len), - ok = gen_tcp:send(Sock, <<"OK\n">>) - end; %% For data migration only. <<"DEL-migration ", EpochIDHex:(?EpochIDSpace)/binary, diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index a3be0b1..a34d977 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -114,9 +114,9 @@ flu_smoke_test() -> BadFile, Off2, Chunk2), {ok, Chunk2} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File2, Off2, Len2), - {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, - ?DUMMY_PV1_EPOCH, - "no!!", Off2, Len2), + {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort, + ?DUMMY_PV1_EPOCH, + "no!!", Off2, Len2), {error, bad_arg} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, BadFile, Off2, Len2), From 152e487060f1bb346c0e8db00691651e11438fee Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 19 May 2015 15:15:05 +0900 Subject: [PATCH 10/13] WIP: read-repair, new test is failing, yay --- TODO-shortterm.org | 1 + src/machi_cr_client.erl | 10 +++-- src/machi_flu1.erl | 11 ++++- src/machi_flu1_client.erl | 12 ++--- test/machi_cr_client_test.erl | 83 +++++++++++++++++++++++++++++++++++ test/machi_flu1_test.erl | 4 +- 6 files changed, 109 insertions(+), 12 deletions(-) create mode 100644 test/machi_cr_client_test.erl diff --git a/TODO-shortterm.org b/TODO-shortterm.org index e1ec295..b03096d 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -39,6 +39,7 @@ func, and pattern match Erlang style in that func. ** TODO Adapt the projection-aware, CR-implementing client from demo-day ** TODO Add major comment sections to the CR-impl client +** TODO Add client-side vs. server-side checksum type, expand client API? ** TODO Create parallel PULSE test for basic API plus chain manager repair ** TODO Add gproc and get rid of registered name rendezvous *** TODO Fixes the atom table leak diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index a62e410..7903816 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -390,8 +390,12 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, do_append_midtail(FLUs, Prefix, File, Offset, Chunk, ChunkExtra, Ws, Depth, STime, S); {error, written} -> - exit({todo,read_repair,?MODULE,?LINE,File,Offset,iolist_size(Chunk)}) - %% TODO return values here + %% We know what the chunk ought to be, so jump to the + %% middle of read-repair. + read_repair3(FLUs, {append}, Chunk, [], File, Offset, + iolist_size(Chunk), Depth, STime, S); + {error, not_written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) end. do_read_chunk(File, Offset, Size, 0=Depth, STime, @@ -422,7 +426,7 @@ do_read_chunk2(File, Offset, Size, Depth, STime, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of {ok, Chunk} when byte_size(Chunk) == Size -> - {{ok, Chunk}, S}; + {reply, {ok, Chunk}, S}; {ok, BadChunk} -> exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, got, byte_size(BadChunk)}); diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 24deb43..0106c37 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -193,8 +193,15 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> register(make_listener_regname(FluName), self()), SockOpts = [{reuseaddr, true}, {mode, binary}, {active, false}, {packet, line}], - {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), - listen_server_loop(LSock, S). + case gen_tcp:listen(TcpPort, SockOpts) of + {ok, LSock} -> + listen_server_loop(LSock, S); + Else -> + error_logger:warning_msg("~s:run_listen_server: " + "listen to TCP port ~w: ~w\n", + [?MODULE, TcpPort, Else]), + exit({?MODULE, run_listen_server, tcp_port, TcpPort, Else}) + end. run_append_server(FluPid, AckPid, #state{flu_name=Name, wedged=Wedged_p,epoch_id=EpochId}=S) -> diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 002b5f3..14f18a0 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -140,7 +140,7 @@ append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) -spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) -> {ok, chunk_s()} | - {error, error_general() | 'no_such_file' | 'partial_read'} | + {error, error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Sock, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -151,7 +151,7 @@ read_chunk(Sock, EpochID, File, Offset, Size) -spec read_chunk(inet_host(), inet_port(), epoch_id(), file_name(), file_offset(), chunk_size()) -> {ok, chunk_s()} | - {error, error_general() | 'no_such_file' | 'partial_read'} | + {error, error_general() | 'not_written' | 'partial_read'} | {error, term()}. read_chunk(Host, TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> @@ -291,7 +291,7 @@ read_projection(Host, TcpPort, ProjType, Epoch) %% @doc Write a projection `Proj' of type `ProjType'. -spec write_projection(port_wrap(), projection_type(), projection()) -> - 'ok' | {error, written} | {error, term()}. + 'ok' | {error, 'written'} | {error, term()}. write_projection(Sock, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> @@ -301,7 +301,7 @@ write_projection(Sock, ProjType, Proj) -spec write_projection(inet_host(), inet_port(), projection_type(), projection()) -> - 'ok' | {error, written} | {error, term()}. + 'ok' | {error, 'written'} | {error, term()}. write_projection(Host, TcpPort, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> @@ -518,7 +518,9 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> <<"OR NO-SUCH-FILE\n">> -> {error, not_written}; <<"OR NOT-ERASURE\n">> -> - {error, no_such_file}; + %% {error, no_such_file}; + %% Ignore the fact that the file doesn't exist. + {error, not_written}; <<"OR BAD-ARG\n">> -> {error, bad_arg}; <<"OR PARTIAL-READ\n">> -> diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl new file mode 100644 index 0000000..ad3a2e1 --- /dev/null +++ b/test/machi_cr_client_test.erl @@ -0,0 +1,83 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +-module(machi_cr_client_test). + +-behaviour(gen_server). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + + +smoke_test() -> + os:cmd("rm -rf ./data.a ./data.b ./data.c"), + {ok, SupPid} = machi_flu_sup:start_link(), + try + Prefix = <<"pre">>, + Chunk1 = <<"yochunk">>, + Host = "localhost", + PortBase = 4444, + {ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", []), + {ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", []), + {ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", []), + D = orddict:from_list( + [{a,{p_srvr,a,machi_flu1_client,"localhost",PortBase+0,[]}}, + {b,{p_srvr,b,machi_flu1_client,"localhost",PortBase+1,[]}}, + {c,{p_srvr,c,machi_flu1_client,"localhost",PortBase+2,[]}}]), + ok = machi_chain_manager1:set_chain_members(a_chmgr, D), + ok = machi_chain_manager1:set_chain_members(b_chmgr, D), + ok = machi_chain_manager1:set_chain_members(c_chmgr, D), + machi_projection_store:read_latest_projection(a_pstore, private), + + {ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]), + machi_cr_client:append_chunk(C1, Prefix, Chunk1), + %% {machi_flu_psup:stop_flu_package(c), timer:sleep(50)}, + {ok, {Off1,Size1,File1}} = + machi_cr_client:append_chunk(C1, Prefix, Chunk1), + {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), + {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0, + private), + %% Verify that the client's CR wrote to all of them. + [{ok, Chunk1} = machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, File1, Off1, Size1) || + X <- [0,1,2] ], + + %% Manually write to head, then verify that read-repair fixes all. + FooOff1 = Off1 + (1024*1024), + [{error, not_written} = machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, + File1, FooOff1, Size1) || X <- [0,1,2] ], + ok = machi_flu1_client:write_chunk(Host, PortBase+0, EpochID, + File1, FooOff1, Chunk1), + {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, FooOff1, Size1), + [{X,{ok, Chunk1}} = {X,machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, + File1, FooOff1, Size1)} || X <- [0,1,2] ], + + ok + after + catch application:stop(machi), + exit(SupPid, normal) + end. + +-endif. % TEST. diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index a34d977..143e79a 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -80,9 +80,9 @@ flu_smoke_test() -> BadPrefix, Chunk1), {ok, [{_,File1}]} = ?FLU_C:list_files(Host, TcpPort, ?DUMMY_PV1_EPOCH), Len1 = size(Chunk1), - {error, no_such_file} = ?FLU_C:read_chunk(Host, TcpPort, + {error, not_written} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, - File1, Off1*983, Len1), + File1, Off1*983829323, Len1), {error, partial_read} = ?FLU_C:read_chunk(Host, TcpPort, ?DUMMY_PV1_EPOCH, File1, Off1, Len1*984), From a4266e8aa44e41018c72581a19bc1c8c7d528749 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 19 May 2015 19:32:48 +0900 Subject: [PATCH 11/13] Fix known chain repair bugs, add basic smoke test --- src/machi_chain_manager1.erl | 7 +- src/machi_chain_repair.erl | 2 - src/machi_cr_client.erl | 178 +++++++++++++++++++++++----------- test/machi_cr_client_test.erl | 60 ++++++++++-- 4 files changed, 183 insertions(+), 64 deletions(-) diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 594812d..6e48978 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -1930,9 +1930,12 @@ perhaps_start_repair( %% RepairOpts = [{repair_mode, check}, verbose], RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end, LastUPI = lists:last(UPI), + IgnoreStabilityTime_p = proplists:get_value(ignore_stability_time, + S#ch_mgr.opts, false), case timer:now_diff(os:timestamp(), Start) div 1000000 of - N when MyName == LastUPI, - N >= ?REPAIR_START_STABILITY_TIME -> + N when MyName == LastUPI andalso + (IgnoreStabilityTime_p orelse + N >= ?REPAIR_START_STABILITY_TIME) -> {WorkerPid, _Ref} = spawn_monitor(RepairFun), S#ch_mgr{repair_worker=WorkerPid, repair_start=os:timestamp(), diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl index 1ef772f..b890d80 100644 --- a/src/machi_chain_repair.erl +++ b/src/machi_chain_repair.erl @@ -107,8 +107,6 @@ repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> catch What:Why -> Stack = erlang:get_stacktrace(), - io:format(user, "What Why ~p ~p @\n\t~p\n", - [What, Why, Stack]), {error, {What, Why, Stack}} after [(catch machi_proxy_flu1_client:quit(Pid)) || diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 7903816..1299c28 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -261,7 +261,7 @@ handle_call({req, Req}, From, S) -> handle_call(quit, _From, S) -> {stop, normal, ok, S}; handle_call(_Request, _From, S) -> - Reply = ok, + Reply = whaaaaaaaaaaaaaaaaaaaa, {reply, Reply, S}. handle_cast(_Msg, S) -> @@ -292,7 +292,7 @@ do_append_head(Prefix, Chunk, ChunkExtra, Depth, STime, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> - {error, partition}; + {reply, {error, partition}, S}; true -> %% This is suboptimal for performance: there are some paths %% through this point where our current projection is good @@ -345,12 +345,12 @@ do_append_midtail(_RestFLUs, Prefix, File, Offset, Chunk, ChunkExtra, sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> - {error, partition}; + {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of undefined -> - {error, partition}; + {reply, {error, partition}, S}; P2 -> RestFLUs2 = mutation_flus(P2), case RestFLUs2 -- Ws of @@ -400,13 +400,13 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, do_read_chunk(File, Offset, Size, 0=Depth, STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty - do_read_chunk2(File, Offset, Size, Depth, STime, S); + do_read_chunk2(File, Offset, Size, Depth + 1, STime, S); do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "read sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> - {error, partition}; + {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of @@ -421,8 +421,8 @@ do_read_chunk(File, Offset, Size, Depth, STime, #state{proj=P}=S) -> do_read_chunk2(File, Offset, Size, Depth, STime, #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> UPI = readonly_flus(P), - Head = hd(UPI), Tail = lists:last(UPI), + ConsistencyMode = P#projection_v1.mode, case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of {ok, Chunk} when byte_size(Chunk) == Size -> @@ -433,52 +433,84 @@ do_read_chunk2(File, Offset, Size, Depth, STime, {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> do_read_chunk(File, Offset, Size, Depth, STime, S); - {error, not_written} when Tail == Head -> - {{error, not_written}, S}; - {error, not_written} when Tail /= Head -> - read_repair(read, File, Offset, Size, Depth, STime, S); + {error, not_written} -> + read_repair(ConsistencyMode, read, File, Offset, Size, Depth, STime, S); + %% {reply, {error, not_written}, S}; {error, written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. -read_repair(ReturnMode, File, Offset, Size, 0=Depth, STime, - #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty - read_repair2(ReturnMode, File, Offset, Size, Depth, STime, S); -read_repair(ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> - %% io:format(user, "read_repair sleep1,", []), +%% Read repair: depends on the consistency mode that we're in: +%% +%% CP mode: If the head is written, then use it to repair UPI++Repairing. +%% If head is not_written, then do nothing. +%% AP mode: If any FLU in UPI++Repairing is written, then use it to repair +%% UPI+repairing. +%% If all FLUs in UPI++Repairing are not_written, then do nothing. + +read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, 0=Depth, + STime, #state{proj=#projection_v1{upi=[_|_]}}=S) -> % UPI is non-empty + read_repair2(ConsistencyMode, ReturnMode, File, Offset, Size, Depth + 1, + STime, S); +read_repair(ConsistencyMode, ReturnMode, File, Offset, Size, Depth, + STime, #state{proj=P}=S) -> sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> - {error, partition}; + {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> - read_repair(ReturnMode, File, Offset, Size, - Depth + 1, STime, S2); + read_repair(ConsistencyMode, ReturnMode, File, Offset, + Size, Depth + 1, STime, S2); _ -> - read_repair2(ReturnMode, File, Offset, Size, - Depth + 1, STime, S2) + read_repair2(ConsistencyMode, ReturnMode, File, Offset, + Size, Depth + 1, STime, S2) end end. -read_repair2(ReturnMode, File, Offset, Size, Depth, STime, +read_repair2(cp_mode=ConsistencyMode, + ReturnMode, File, Offset, Size, Depth, STime, #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> - [Head|MidsTails] = readonly_flus(P), - case ?FLU_PC:read_chunk(orddict:fetch(Head, PD), EpochID, + Tail = lists:last(readonly_flus(P)), + case ?FLU_PC:read_chunk(orddict:fetch(Tail, PD), EpochID, File, Offset, Size, ?TIMEOUT) of {ok, Chunk} when byte_size(Chunk) == Size -> - read_repair3(MidsTails, ReturnMode, Chunk, [Head], File, Offset, + ToRepair = mutation_flus(P) -- [Tail], + read_repair3(ToRepair, ReturnMode, Chunk, [Tail], File, Offset, Size, Depth, STime, S); {ok, BadChunk} -> - exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, Size, - got, byte_size(BadChunk)}); + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, Offset, + Size, got, byte_size(BadChunk)}); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - read_repair(ReturnMode, File, Offset, Size, Depth, STime, S); + read_repair(ConsistencyMode, ReturnMode, File, Offset, + Size, Depth, STime, S); {error, not_written} -> - {{error, not_written}, S}; + {reply, {error, not_written}, S}; + {error, written} -> + exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) + end; +read_repair2(ap_mode=ConsistencyMode, + ReturnMode, File, Offset, Size, Depth, STime, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + Eligible = mutation_flus(P), + case try_to_find_chunk(Eligible, File, Offset, Size, S) of + {ok, Chunk, GotItFrom} when byte_size(Chunk) == Size -> + ToRepair = mutation_flus(P) -- [GotItFrom], + read_repair3(ToRepair, ReturnMode, Chunk, [GotItFrom], File, + Offset, Size, Depth, STime, S); + {ok, BadChunk} -> + exit({todo, bad_chunk_size, ?MODULE, ?LINE, File, + Offset, Size, got, byte_size(BadChunk)}); + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + read_repair(ConsistencyMode, ReturnMode, File, + Offset, Size, Depth, STime, S); + {error, not_written} -> + {reply, {error, not_written}, S}; {error, written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. @@ -487,27 +519,27 @@ read_repair3([], ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S) -> read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S); -read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, +read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, 0=Depth, STime, S) -> - read_repair4(MidsTails, ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, S); -read_repair3(MidsTails, ReturnMode, Chunk, File, Repaired, Offset, + read_repair4(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, + Size, Depth + 1, STime, S); +read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, #state{proj=P}=S) -> %% io:format(user, "read_repair3 sleep1,", []), sleep_a_while(Depth), DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, if DiffMs > ?MAX_RUNTIME -> - {error, partition}; + {reply, {error, partition}, S}; true -> S2 = update_proj(S#state{proj=undefined, bad_proj=P}), case S2#state.proj of P2 when P2 == undefined orelse P2#projection_v1.upi == [] -> - read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, + read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth + 1, STime, S2); P2 -> - MidsTails2 = P2#projection_v1.upi -- Repaired, - read_repair4(MidsTails2, ReturnMode, Chunk, Repaired, File, + ToRepair2 = mutation_flus(P2) -- Repaired, + read_repair4(ToRepair2, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth + 1, STime, S2) end end. @@ -518,7 +550,7 @@ read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, read -> {reply, {ok, Chunk}, S} end; -read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset, +read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> Proxy = orddict:fetch(First, PD), case ?FLU_PC:write_chunk(Proxy, EpochID, File, Offset, Chunk, ?TIMEOUT) of @@ -527,7 +559,7 @@ read_repair4([First|Rest]=MidsTails, ReturnMode, Chunk, Repaired, File, Offset, Offset, Size, Depth, STime, S); {error, Retry} when Retry == partition; Retry == bad_epoch; Retry == wedged -> - read_repair3(MidsTails, ReturnMode, Chunk, Repaired, File, + read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, S); {error, written} -> %% TODO: To be very paranoid, read the chunk here to verify @@ -545,22 +577,12 @@ update_proj(S) -> update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> Timeout = 2*1000, - Parent = self(), + WTimeout = 2*Timeout, Proxies = orddict:to_list(ProxiesDict), - MiddleWorker = spawn( - fun() -> - PidsMons = - [spawn_monitor(fun() -> - exit(catch ?FLU_PC:read_latest_projection( - Proxy, private, Timeout)) - end) || {_K, Proxy} <- Proxies], - Rs = gather_worker_statuses(PidsMons, Timeout*2), - Parent ! {res, self(), Rs}, - exit(normal) - end), - Rs = receive {res, MiddleWorker, Results} -> Results - after Timeout*2 -> [] - end, + Work = fun({_K, Proxy}) -> + ?FLU_PC:read_latest_projection(Proxy, private, Timeout) + end, + Rs = run_middleworker_job(Work, Proxies, WTimeout), %% TODO: There's a possible bug here when running multiple independent %% Machi clusters/chains. If our chain used to be [a,b], but our %% sysadmin has changed our cluster to be [a] and a completely seprate @@ -581,6 +603,26 @@ update_proj2(Count, #state{bad_proj=BadProj, proxies_dict=ProxiesDict}=S) -> update_proj2(Count + 1, S) end. +run_middleworker_job(Fun, ArgList, WTimeout) -> + Parent = self(), + MiddleWorker = + spawn(fun() -> + PidsMons = + [spawn_monitor(fun() -> + Res = (catch Fun(Arg)), + exit(Res) + end) || Arg <- ArgList], + Rs = gather_worker_statuses(PidsMons, WTimeout), + Parent ! {res, self(), Rs}, + exit(normal) + end), + receive + {res, MiddleWorker, Results} -> + Results + after WTimeout+100 -> + [] + end. + gather_worker_statuses([], _Timeout) -> []; gather_worker_statuses([{Pid,Ref}|Rest], Timeout) -> @@ -600,6 +642,34 @@ choose_best_proj(Rs) -> BestEpoch end, WorstEpoch, Rs). +try_to_find_chunk(Eligible, File, Offset, Size, + #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + Timeout = 2*1000, + Work = fun(FLU) -> + Proxy = orddict:fetch(FLU, PD), + case ?FLU_PC:read_chunk(Proxy, EpochID, + File, Offset, Size) of + {ok, Chunk} when byte_size(Chunk) == Size -> + {FLU, {ok, Chunk}}; + Else -> + {FLU, Else} + end + end, + Rs = run_middleworker_job(Work, Eligible, Timeout), + case [X || {_, {ok, B}}=X <- Rs, is_binary(B)] of + [{FoundFLU, {ok, Chunk}}|_] -> + {ok, Chunk, FoundFLU}; + [] -> + RetryErrs = [partition, bad_epoch, wedged], + case [Err || {error, Err} <- Rs, lists:member(Err, RetryErrs)] of + [SomeErr|_] -> + {error, SomeErr}; + [] -> + %% TODO does this really work 100% of the time? + {error, not_written} + end + end. + mutation_flus(#projection_v1{upi=UPI, repairing=Repairing}) -> UPI ++ Repairing; mutation_flus(#state{proj=P}) -> diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index ad3a2e1..b2d39a6 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -32,37 +32,73 @@ smoke_test() -> os:cmd("rm -rf ./data.a ./data.b ./data.c"), {ok, SupPid} = machi_flu_sup:start_link(), + error_logger:tty(false), try Prefix = <<"pre">>, Chunk1 = <<"yochunk">>, Host = "localhost", PortBase = 4444, - {ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", []), - {ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", []), - {ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", []), + Os = [{ignore_stability_time, true}, {active_mode, false}], + {ok,_}=machi_flu_psup:start_flu_package(a, PortBase+0, "./data.a", Os), + {ok,_}=machi_flu_psup:start_flu_package(b, PortBase+1, "./data.b", Os), + {ok,_}=machi_flu_psup:start_flu_package(c, PortBase+2, "./data.c", Os), D = orddict:from_list( [{a,{p_srvr,a,machi_flu1_client,"localhost",PortBase+0,[]}}, {b,{p_srvr,b,machi_flu1_client,"localhost",PortBase+1,[]}}, {c,{p_srvr,c,machi_flu1_client,"localhost",PortBase+2,[]}}]), + %% Force the chain to repair & fully assemble as quickly as possible. + %% 1. Use set_chain_members() on all 3 + %% 2. Force a to run repair in a tight loop + %% 3. Stop as soon as we see UPI=[a,b,c] and also author=c. + %% Otherwise, we can have a race with later, minor + %% projection changes which will change our understanding of + %% the epoch id. (C is the author with highest weight.) + %% 4. Wait until all others are using epoch id from #3. + %% + %% Damn, this is a pain to make 100% deterministic, bleh. ok = machi_chain_manager1:set_chain_members(a_chmgr, D), ok = machi_chain_manager1:set_chain_members(b_chmgr, D), ok = machi_chain_manager1:set_chain_members(c_chmgr, D), - machi_projection_store:read_latest_projection(a_pstore, private), + TickAll = fun() -> [begin + Pid ! tick_check_environment, + timer:sleep(50) + end || Pid <- [a_chmgr,b_chmgr,c_chmgr] ] + end, + _ = lists:foldl( + fun(_, [{c,[a,b,c]}]=Acc) -> Acc; + (_, Acc) -> + TickAll(), % has some sleep time inside + Xs = [begin + {ok, Prj} = machi_projection_store:read_latest_projection(PStore, private), + {Prj#projection_v1.author_server, + Prj#projection_v1.upi} + end || PStore <- [a_pstore,b_pstore,c_pstore] ], + lists:usort(Xs) + end, undefined, lists:seq(1,10000)), + %% Everyone is settled on the same damn epoch id. + {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0, + private), + {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+1, + private), + {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+2, + private), + %% Whew ... ok, now start some damn tests. {ok, C1} = machi_cr_client:start_link([P || {_,P}<-orddict:to_list(D)]), machi_cr_client:append_chunk(C1, Prefix, Chunk1), %% {machi_flu_psup:stop_flu_package(c), timer:sleep(50)}, {ok, {Off1,Size1,File1}} = machi_cr_client:append_chunk(C1, Prefix, Chunk1), {ok, Chunk1} = machi_cr_client:read_chunk(C1, File1, Off1, Size1), - {ok, EpochID} = machi_flu1_client:get_latest_epochid(Host, PortBase+0, + {ok, PPP} = machi_flu1_client:read_latest_projection(Host, PortBase+0, private), %% Verify that the client's CR wrote to all of them. [{ok, Chunk1} = machi_flu1_client:read_chunk( Host, PortBase+X, EpochID, File1, Off1, Size1) || X <- [0,1,2] ], - %% Manually write to head, then verify that read-repair fixes all. + %% Test read repair: Manually write to head, then verify that + %% read-repair fixes all. FooOff1 = Off1 + (1024*1024), [{error, not_written} = machi_flu1_client:read_chunk( Host, PortBase+X, EpochID, @@ -74,8 +110,20 @@ smoke_test() -> Host, PortBase+X, EpochID, File1, FooOff1, Size1)} || X <- [0,1,2] ], + %% Test read repair: Manually write to middle, then same checking. + FooOff2 = Off1 + (2*1024*1024), + Chunk2 = <<"Middle repair chunk">>, + Size2 = size(Chunk2), + ok = machi_flu1_client:write_chunk(Host, PortBase+1, EpochID, + File1, FooOff2, Chunk2), + {ok, Chunk2} = machi_cr_client:read_chunk(C1, File1, FooOff2, Size2), + [{X,{ok, Chunk2}} = {X,machi_flu1_client:read_chunk( + Host, PortBase+X, EpochID, + File1, FooOff2, Size2)} || X <- [0,1,2] ], + ok after + error_logger:tty(true), catch application:stop(machi), exit(SupPid, normal) end. From b5ddfaf0198ea4cf3d0aed4c9a52ea0a853f84cc Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Tue, 19 May 2015 20:04:36 +0900 Subject: [PATCH 12/13] Finish basic API for machi_cr_client.erl --- TODO-shortterm.org | 2 +- src/machi_cr_client.erl | 275 ++++++++++++++++------------------ test/machi_cr_client_test.erl | 11 +- 3 files changed, 138 insertions(+), 150 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index b03096d..749b0e0 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -38,7 +38,7 @@ func, and pattern match Erlang style in that func. ** TODO Adapt the projection-aware, CR-implementing client from demo-day -** TODO Add major comment sections to the CR-impl client +** DONE Add major comment sections to the CR-impl client ** TODO Add client-side vs. server-side checksum type, expand client API? ** TODO Create parallel PULSE test for basic API plus chain manager repair ** TODO Add gproc and get rid of registered name rendezvous diff --git a/src/machi_cr_client.erl b/src/machi_cr_client.erl index 1299c28..e6c145d 100644 --- a/src/machi_cr_client.erl +++ b/src/machi_cr_client.erl @@ -20,6 +20,28 @@ %% @doc Erlang API for the Machi client-implemented Chain Replication %% (CORFU-style) protocol. +%% +%% The major operation processing is implemented in a state machine-like +%% manner. Before attempting an operation `X', there's an initial +%% operation `pre-X' that takes care of updating the epoch id, +%% restarting client protocol proxies, and if there's any server +%% instability (e.g. some server is wedged), then insert some sleep +%% time. When the chain appears to have stabilized, then we try the `X' +%% operation again. +%% +%% Function name for the `pre-X' stuff is usually `X()', and the +%% function name for the `X' stuff is usually `X2()'. (I.e., the `X' +%% stuff follows after `pre-X' and therefore has a `2' suffix on the +%% function name.) +%% +%% In the case of read repair, there are two stages: find the value to +%% perform the repair, then perform the repair writes. In the case of +%% the repair writes, the `pre-X' function is named `read_repair3()', +%% and the `X' function is named `read_repair4()'. +%% +%% TODO: It would be nifty to lift the very-nearly-but-not-quite-boilerplate +%% of the `pre-X' functions into a single common function ... but I'm not +%% sure yet on how to do it without making the code uglier. -module(machi_cr_client). @@ -39,25 +61,11 @@ append_chunk/3, append_chunk/4, append_chunk_extra/4, append_chunk_extra/5, read_chunk/4, read_chunk/5, -%% checksum_list/3, checksum_list/4, -%% list_files/2, list_files/3, -%% wedge_status/1, wedge_status/2, + checksum_list/2, checksum_list/3, + list_files/1, list_files/2, -%% %% %% Projection API -%% get_epoch_id/1, get_epoch_id/2, -%% get_latest_epoch/2, get_latest_epoch/3, -%% read_latest_projection/2, read_latest_projection/3, -%% read_projection/3, read_projection/4, -%% write_projection/3, write_projection/4, -%% get_all_projections/2, get_all_projections/3, -%% list_all_projections/2, list_all_projections/3, - -%% %% Common API -%% quit/1, - -%% %% Internal API -%% write_chunk/5, write_chunk/6 - noop/0 + %% Common API + quit/1 ]). %% gen_server callbacks @@ -121,133 +129,33 @@ read_chunk(PidSpec, File, Offset, Size, Timeout) -> gen_server:call(PidSpec, {req, {read_chunk, File, Offset, Size}}, Timeout). -%% %% @doc Fetch the list of chunk checksums for `File'. +%% @doc Fetch the list of chunk checksums for `File'. -%% checksum_list(PidSpec, EpochID, File) -> -%% checksum_list(PidSpec, EpochID, File, infinity). +checksum_list(PidSpec, File) -> + checksum_list(PidSpec, File, infinity). -%% %% @doc Fetch the list of chunk checksums for `File'. +%% @doc Fetch the list of chunk checksums for `File'. -%% checksum_list(PidSpec, EpochID, File, Timeout) -> -%% gen_server:call(PidSpec, {req, {checksum_list, EpochID, File}}, -%% Timeout). +checksum_list(PidSpec, File, Timeout) -> + gen_server:call(PidSpec, {req, {checksum_list, File}}, + Timeout). -%% %% @doc Fetch the list of all files on the remote FLU. +%% @doc Fetch the list of all files on the remote FLU. -%% list_files(PidSpec, EpochID) -> -%% list_files(PidSpec, EpochID, infinity). +list_files(PidSpec) -> + list_files(PidSpec, infinity). -%% %% @doc Fetch the list of all files on the remote FLU. +%% @doc Fetch the list of all files on the remote FLU. -%% list_files(PidSpec, EpochID, Timeout) -> -%% gen_server:call(PidSpec, {req, {list_files, EpochID}}, -%% Timeout). +list_files(PidSpec, Timeout) -> + gen_server:call(PidSpec, {req, {list_files}}, + Timeout). -%% %% @doc Fetch the wedge status from the remote FLU. +%% @doc Quit & close the connection to remote FLU and stop our +%% proxy process. -%% wedge_status(PidSpec) -> -%% wedge_status(PidSpec, infinity). - -%% %% @doc Fetch the wedge status from the remote FLU. - -%% wedge_status(PidSpec, Timeout) -> -%% gen_server:call(PidSpec, {req, {wedge_status}}, -%% Timeout). - -%% %% @doc Get the `epoch_id()' of the FLU's current/latest projection. - -%% get_epoch_id(PidSpec) -> -%% get_epoch_id(PidSpec, infinity). - -%% %% @doc Get the `epoch_id()' of the FLU's current/latest projection. - -%% get_epoch_id(PidSpec, Timeout) -> -%% gen_server:call(PidSpec, {req, {get_epoch_id}}, Timeout). - -%% %% @doc Get the latest epoch number + checksum from the FLU's projection store. - -%% get_latest_epoch(PidSpec, ProjType) -> -%% get_latest_epoch(PidSpec, ProjType, infinity). - -%% %% @doc Get the latest epoch number + checksum from the FLU's projection store. - -%% get_latest_epoch(PidSpec, ProjType, Timeout) -> -%% gen_server:call(PidSpec, {req, {get_latest_epoch, ProjType}}, -%% Timeout). - -%% %% @doc Get the latest projection from the FLU's projection store for `ProjType' - -%% read_latest_projection(PidSpec, ProjType) -> -%% read_latest_projection(PidSpec, ProjType, infinity). - -%% %% @doc Get the latest projection from the FLU's projection store for `ProjType' - -%% read_latest_projection(PidSpec, ProjType, Timeout) -> -%% gen_server:call(PidSpec, {req, {read_latest_projection, ProjType}}, -%% Timeout). - -%% %% @doc Read a projection `Proj' of type `ProjType'. - -%% read_projection(PidSpec, ProjType, Epoch) -> -%% read_projection(PidSpec, ProjType, Epoch, infinity). - -%% %% @doc Read a projection `Proj' of type `ProjType'. - -%% read_projection(PidSpec, ProjType, Epoch, Timeout) -> -%% gen_server:call(PidSpec, {req, {read_projection, ProjType, Epoch}}, -%% Timeout). - -%% %% @doc Write a projection `Proj' of type `ProjType'. - -%% write_projection(PidSpec, ProjType, Proj) -> -%% write_projection(PidSpec, ProjType, Proj, infinity). - -%% %% @doc Write a projection `Proj' of type `ProjType'. - -%% write_projection(PidSpec, ProjType, Proj, Timeout) -> -%% gen_server:call(PidSpec, {req, {write_projection, ProjType, Proj}}, -%% Timeout). - -%% %% @doc Get all projections from the FLU's projection store. - -%% get_all_projections(PidSpec, ProjType) -> -%% get_all_projections(PidSpec, ProjType, infinity). - -%% %% @doc Get all projections from the FLU's projection store. - -%% get_all_projections(PidSpec, ProjType, Timeout) -> -%% gen_server:call(PidSpec, {req, {get_all_projections, ProjType}}, -%% Timeout). - -%% %% @doc Get all epoch numbers from the FLU's projection store. - -%% list_all_projections(PidSpec, ProjType) -> -%% list_all_projections(PidSpec, ProjType, infinity). - -%% %% @doc Get all epoch numbers from the FLU's projection store. - -%% list_all_projections(PidSpec, ProjType, Timeout) -> -%% gen_server:call(PidSpec, {req, {list_all_projections, ProjType}}, -%% Timeout). - -%% %% @doc Quit & close the connection to remote FLU and stop our -%% %% proxy process. - -%% quit(PidSpec) -> -%% gen_server:call(PidSpec, quit, infinity). - -%% %% @doc Write a chunk (binary- or iolist-style) of data to a file -%% %% with `Prefix' at `Offset'. - -%% write_chunk(PidSpec, EpochID, File, Offset, Chunk) -> -%% write_chunk(PidSpec, EpochID, File, Offset, Chunk, infinity). - -%% %% @doc Write a chunk (binary- or iolist-style) of data to a file -%% %% with `Prefix' at `Offset'. - -%% write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) -> -%% gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}}, -%% Timeout). +quit(PidSpec) -> + gen_server:call(PidSpec, quit, infinity). %%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -283,7 +191,11 @@ code_change(_OldVsn, S, _Extra) -> handle_call2({append_chunk_extra, Prefix, Chunk, ChunkExtra}, _From, S) -> do_append_head(Prefix, Chunk, ChunkExtra, 0, os:timestamp(), S); handle_call2({read_chunk, File, Offset, Size}, _From, S) -> - do_read_chunk(File, Offset, Size, 0, os:timestamp(), S). + do_read_chunk(File, Offset, Size, 0, os:timestamp(), S); +handle_call2({checksum_list, File}, _From, S) -> + do_checksum_list(File, 0, os:timestamp(), S); +handle_call2({list_files}, _From, S) -> + do_list_files(0, os:timestamp(), S). do_append_head(Prefix, Chunk, ChunkExtra, 0=Depth, STime, S) -> do_append_head2(Prefix, Chunk, ChunkExtra, Depth + 1, STime, S); @@ -392,7 +304,8 @@ do_append_midtail2([FLU|RestFLUs]=FLUs, Prefix, File, Offset, Chunk, {error, written} -> %% We know what the chunk ought to be, so jump to the %% middle of read-repair. - read_repair3(FLUs, {append}, Chunk, [], File, Offset, + Resume = {append, Offset, iolist_size(Chunk), File}, + read_repair3(FLUs, Resume, Chunk, [], File, Offset, iolist_size(Chunk), Depth, STime, S); {error, not_written} -> exit({todo_should_never_happen,?MODULE,?LINE,File,Offset}) @@ -495,7 +408,7 @@ read_repair2(cp_mode=ConsistencyMode, end; read_repair2(ap_mode=ConsistencyMode, ReturnMode, File, Offset, Size, Depth, STime, - #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + #state{proj=P}=S) -> Eligible = mutation_flus(P), case try_to_find_chunk(Eligible, File, Offset, Size, S) of {ok, Chunk, GotItFrom} when byte_size(Chunk) == Size -> @@ -544,11 +457,14 @@ read_repair3(ToRepair, ReturnMode, Chunk, Repaired, File, Offset, end end. -read_repair4([], ReturnMode, Chunk, Repaired, File, Offset, - Size, Depth, STime, S) -> +read_repair4([], ReturnMode, Chunk, _Repaired, File, Offset, + _IgnoreSize, _Depth, _STime, S) -> + %% TODO: add stats for # of repairs, length(_Repaired)-1, etc etc? case ReturnMode of read -> - {reply, {ok, Chunk}, S} + {reply, {ok, Chunk}, S}; + {append, Offset, Size, File} -> + {reply, {ok, {Offset, Size, File}}, S} end; read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset, Size, Depth, STime, #state{epoch_id=EpochID, proxies_dict=PD}=S) -> @@ -570,6 +486,76 @@ read_repair4([First|Rest]=ToRepair, ReturnMode, Chunk, Repaired, File, Offset, exit({todo_should_never_happen,?MODULE,?LINE,File,Offset,Size}) end. +do_checksum_list(File, 0=Depth, STime, S) -> + do_checksum_list2(File, Depth + 1, STime, S); +do_checksum_list(File, Depth, STime, #state{proj=P}=S) -> + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_checksum_list(File, Depth + 1, STime, S2); + _ -> + do_checksum_list2(File, Depth + 1, STime, S2) + end + end. + +do_checksum_list2(File, Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD), + case ?FLU_PC:checksum_list(Proxy, EpochID, File, ?TIMEOUT) of + {ok, _}=OK -> + {reply, OK, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_checksum_list(File, Depth, STime, S); + {error, _}=Error -> + {reply, Error, S} + end. + +do_list_files(0=Depth, STime, S) -> + do_list_files2(Depth + 1, STime, S); +do_list_files(Depth, STime, #state{proj=P}=S) -> + sleep_a_while(Depth), + DiffMs = timer:now_diff(os:timestamp(), STime) div 1000, + if DiffMs > ?MAX_RUNTIME -> + {reply, {error, partition}, S}; + true -> + %% This is suboptimal for performance: there are some paths + %% through this point where our current projection is good + %% enough. But we're going to try to keep the code as simple + %% as we can for now. + S2 = update_proj(S#state{proj=undefined, bad_proj=P}), + case S2#state.proj of + P2 when P2 == undefined orelse + P2#projection_v1.upi == [] -> + do_list_files(Depth + 1, STime, S2); + _ -> + do_list_files2(Depth + 1, STime, S2) + end + end. + +do_list_files2(Depth, STime, + #state{epoch_id=EpochID, proj=P, proxies_dict=PD}=S) -> + Proxy = orddict:fetch(lists:last(readonly_flus(P)), PD), + case ?FLU_PC:list_files(Proxy, EpochID, ?TIMEOUT) of + {ok, _}=OK -> + {reply, OK, S}; + {error, Retry} + when Retry == partition; Retry == bad_epoch; Retry == wedged -> + do_list_files(Depth, STime, S); + {error, _}=Error -> + {reply, Error, S} + end. + update_proj(#state{proj=undefined}=S) -> update_proj2(1, S); update_proj(S) -> @@ -643,7 +629,7 @@ choose_best_proj(Rs) -> end, WorstEpoch, Rs). try_to_find_chunk(Eligible, File, Offset, Size, - #state{proj=P, epoch_id=EpochID, proxies_dict=PD}=S) -> + #state{epoch_id=EpochID, proxies_dict=PD}) -> Timeout = 2*1000, Work = fun(FLU) -> Proxy = orddict:fetch(FLU, PD), @@ -686,6 +672,3 @@ sleep_a_while(1) -> ok; sleep_a_while(Depth) -> timer:sleep(30 + trunc(math:pow(1.9, Depth))). - -noop() -> - ok. diff --git a/test/machi_cr_client_test.erl b/test/machi_cr_client_test.erl index b2d39a6..3865b54 100644 --- a/test/machi_cr_client_test.erl +++ b/test/machi_cr_client_test.erl @@ -20,8 +20,6 @@ -module(machi_cr_client_test). --behaviour(gen_server). - -include("machi.hrl"). -include("machi_projection.hrl"). @@ -66,7 +64,7 @@ smoke_test() -> end, _ = lists:foldl( fun(_, [{c,[a,b,c]}]=Acc) -> Acc; - (_, Acc) -> + (_, _Acc) -> TickAll(), % has some sleep time inside Xs = [begin {ok, Prj} = machi_projection_store:read_latest_projection(PStore, private), @@ -121,6 +119,13 @@ smoke_test() -> Host, PortBase+X, EpochID, File1, FooOff2, Size2)} || X <- [0,1,2] ], + %% Misc API smoke + %% Checksum lists are 3-tuples + {ok, [{_,_,_}|_]} = machi_cr_client:checksum_list(C1, File1), + {error, no_such_file} = machi_cr_client:checksum_list(C1, <<"!!!!">>), + %% Exactly one file right now + {ok, [_]} = machi_cr_client:list_files(C1), + ok after error_logger:tty(true), From f78039261c80a5237add0d925c99f2dd22a98996 Mon Sep 17 00:00:00 2001 From: Scott Lystig Fritchie Date: Wed, 20 May 2015 11:05:53 +0900 Subject: [PATCH 13/13] TODO-shortterm.org updates --- TODO-shortterm.org | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 749b0e0..985ebf1 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -37,16 +37,17 @@ func, and pattern match Erlang style in that func. - Add no-wedging state to make testing easier? -** TODO Adapt the projection-aware, CR-implementing client from demo-day +** DONE Adapt the projection-aware, CR-implementing client from demo-day ** DONE Add major comment sections to the CR-impl client -** TODO Add client-side vs. server-side checksum type, expand client API? +** TODO Simple basho_bench driver, put some unscientific chalk on the benchtop ** TODO Create parallel PULSE test for basic API plus chain manager repair +** TODO Add client-side vs. server-side checksum type, expand client API? ** TODO Add gproc and get rid of registered name rendezvous *** TODO Fixes the atom table leak *** TODO Fixes the problem of having active sequencer for the same prefix on two FLUS in the same VM -** TODO Fix all known bugs with Chain Manager (list below) +** TODO Fix all known bugs/cruft with Chain Manager (list below) *** DONE Fix known bugs *** DONE Clean up crufty TODO comments and other obvious cruft *** TODO Re-add verification step of stable epochs, including inner projections!