diff --git a/TODO-shortterm.org b/TODO-shortterm.org index 8e01a13..1849ca7 100644 --- a/TODO-shortterm.org +++ b/TODO-shortterm.org @@ -31,20 +31,24 @@ func, and pattern match Erlang style in that func. *** DONE Preserve current test code (leave as-is? tiny changes?) *** DONE Make chain manager code flexible enough to run "real world" or "sim" ** DONE Add projection wedging logic to each FLU. -** Started.... Implement real data repair, orchestrated by the chain manager -** TODO Change all protocol ops to enforce the epoch ID +** DONE Implement real data repair, orchestrated by the chain manager +** DONE Change all protocol ops to enforce the epoch ID - Add no-wedging state to make testing easier? -** TODO Move the FLU server to gen_server behavior? + +** TODO Adapt the projection-aware, CR-implementing client from demo-day +** TODO Create parallel PULSE test for basic API plus chain manager repair ** TODO Add gproc and get rid of registered name rendezvous *** TODO Fixes the atom table leak *** TODO Fixes the problem of having active sequencer for the same prefix on two FLUS in the same VM -** TODO Fix all known bugs with Chain Manager +** TODO Fix all known bugs with Chain Manager (list below) *** DONE Fix known bugs *** DONE Clean up crufty TODO comments and other obvious cruft *** TODO Re-add verification step of stable epochs, including inner projections! *** TODO Attempt to remove cruft items in flapping_i? +** TODO Move the FLU server to gen_server behavior? + diff --git a/include/machi_chain_manager.hrl b/include/machi_chain_manager.hrl index 1fb4b5e..4f522f5 100644 --- a/include/machi_chain_manager.hrl +++ b/include/machi_chain_manager.hrl @@ -24,19 +24,3 @@ -type projection() :: #projection_v1{}. 
--record(ch_mgr, { - name :: pv1_server(), - flap_limit :: non_neg_integer(), - proj :: projection(), - %% - timer :: 'undefined' | timer:tref(), - ignore_timer :: boolean(), - proj_history :: queue:queue(), - flaps=0 :: integer(), - flap_start = ?NOT_FLAPPING - :: erlang:timestamp(), - runenv :: list(), %proplist() - opts :: list(), %proplist() - members_dict :: p_srvr_dict(), - proxies_dict :: orddict:orddict() - }). diff --git a/include/machi_projection.hrl b/include/machi_projection.hrl index 670116f..e20908a 100644 --- a/include/machi_projection.hrl +++ b/include/machi_projection.hrl @@ -29,7 +29,7 @@ -record(p_srvr, { name :: pv1_server(), - proto = 'ipv4' :: 'ipv4' | 'disterl', % disterl? Hrm. + proto_mod = 'machi_flu1_client' :: atom(), % Module name address :: term(), % Protocol-specific port :: term(), % Protocol-specific props = [] :: list() % proplist for other related info @@ -58,4 +58,8 @@ -define(SHA_MAX, (1 bsl (20*8))). +%% Set a limit to the maximum chain length, so that it's easier to +%% create a consistent projection ranking score. +-define(MAX_CHAIN_LENGTH, 64). + -endif. % !MACHI_PROJECTION_HRL diff --git a/src/machi_admin_util.erl b/src/machi_admin_util.erl index f0db9d0..25c96be 100644 --- a/src/machi_admin_util.erl +++ b/src/machi_admin_util.erl @@ -46,11 +46,11 @@ verify_file_checksums_local(Sock1, EpochID, Path) when is_port(Sock1) -> machi_flu1_client:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_local(Host, TcpPort, EpochID, Path) -> - Sock1 = machi_util:connect(Host, TcpPort), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try verify_file_checksums_local2(Sock1, EpochID, Path) after - catch gen_tcp:close(Sock1) + catch ?FLU_C:disconnect(Sock1) end. 
-spec verify_file_checksums_remote(port(), machi_flu1_client:epoch_id(), binary()|list()) -> @@ -62,11 +62,11 @@ verify_file_checksums_remote(Sock1, EpochID, File) when is_port(Sock1) -> machi_flu1_client:epoch_id(), binary()|list()) -> {ok, [tuple()]} | {error, term()}. verify_file_checksums_remote(Host, TcpPort, EpochID, File) -> - Sock1 = machi_util:connect(Host, TcpPort), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try verify_file_checksums_remote2(Sock1, EpochID, File) after - catch gen_tcp:close(Sock1) + catch ?FLU_C:disconnect(Sock1) end. %%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/src/machi_chain_manager1.erl b/src/machi_chain_manager1.erl index 12b885e..5f73c6c 100644 --- a/src/machi_chain_manager1.erl +++ b/src/machi_chain_manager1.erl @@ -54,16 +54,43 @@ -include("machi_projection.hrl"). -include("machi_chain_manager.hrl"). +-record(ch_mgr, { + name :: pv1_server(), + flap_limit :: non_neg_integer(), + proj :: projection(), + %% + timer :: 'undefined' | timer:tref(), + ignore_timer :: boolean(), + proj_history :: queue:queue(), + flaps=0 :: integer(), + flap_start = ?NOT_FLAPPING + :: erlang:timestamp(), + repair_worker :: 'undefined' | pid(), + repair_start :: 'undefined' | erlang:timestamp(), + repair_final_status :: 'undefined' | term(), + runenv :: list(), %proplist() + opts :: list(), %proplist() + members_dict :: p_srvr_dict(), + proxies_dict :: orddict:orddict() + }). + -define(D(X), io:format(user, "~s ~p\n", [??X, X])). -define(Dw(X), io:format(user, "~s ~w\n", [??X, X])). --define(FLU_C, machi_flu1_client). -define(FLU_PC, machi_proxy_flu1_client). -define(TO, (2*1000)). % default timeout %% Keep a history of our flowchart execution in the process dictionary. -define(REACT(T), put(react, [T|get(react)])). +%% Define the period of private projection stability before we'll +%% start repair. +-ifdef(TEST). +-define(REPAIR_START_STABILITY_TIME, 3). +-else. % TEST +-define(REPAIR_START_STABILITY_TIME, 10). +-endif. 
% TEST + %% API -export([start_link/2, start_link/3, stop/1, ping/1, set_chain_members/2, set_active/2]). @@ -162,6 +189,7 @@ test_react_to_env(Pid) -> %% local projection store. init({MyName, InitMembersDict, MgrOpts}) -> + random:seed(now()), init_remember_partition_hack(), ZeroAll_list = [P#p_srvr.name || {_,P} <- orddict:to_list(InitMembersDict)], ZeroProj = make_none_projection(MyName, ZeroAll_list, InitMembersDict), @@ -223,7 +251,11 @@ handle_call({set_chain_members, MembersDict}, _From, repairing=[], down=NewDown, members_dict=MembersDict}), - S3 = S2#ch_mgr{proj=NewProj}, + %% Reset all flapping state. + NewProj2 = NewProj#projection_v1{dbg=replace(NewProj#projection_v1.dbg, + [make_flapping_i()])}, + S3 = S2#ch_mgr{proj=NewProj2, + proj_history=queue:new()}, {_QQ, S4} = do_react_to_env(S3), {reply, Reply, S4}; handle_call({set_active, Boolean}, _From, #ch_mgr{timer=TRef}=S) -> @@ -255,7 +287,7 @@ handle_call({test_read_latest_public_projection, ReadRepairP}, _From, S) -> Res = {Perhaps, Val, ExtraInfo}, {reply, Res, S2}; handle_call({test_react_to_env}, _From, S) -> - {TODOtodo, S2} = do_react_to_env(S), + {TODOtodo, S2} = do_react_to_env(S), {reply, TODOtodo, S2}; handle_call(_Call, _From, S) -> {reply, whaaaaaaaaaa, S}. @@ -267,7 +299,9 @@ handle_cast(_Cast, S) -> handle_info(tick_check_environment, #ch_mgr{ignore_timer=true}=S) -> {noreply, S}; handle_info(tick_check_environment, S) -> - {{_Delta, Props, _Epoch}, S2} = do_react_to_env(S), + {{_Delta, Props, _Epoch}, S1} = do_react_to_env(S), + S2 = sanitize_repair_state(S1), + S3 = perhaps_start_repair(S2), case proplists:get_value(throttle_seconds, Props) of N when is_integer(N), N > 0 -> %% We are flapping. Set ignore_timer=true and schedule a @@ -276,12 +310,17 @@ handle_info(tick_check_environment, S) -> %% state C200 is ever implemented, then it should be %% implemented via the test_react_to_env style. 
erlang:send_after(N*1000, self(), stop_ignoring_timer), - {noreply, S#ch_mgr{ignore_timer=true}}; + {noreply, S3#ch_mgr{ignore_timer=true}}; _ -> - {noreply, S2} + {noreply, S3} end; handle_info(stop_ignoring_timer, S) -> {noreply, S#ch_mgr{ignore_timer=false}}; +handle_info({'DOWN',_Ref,process,Worker,Res}, + #ch_mgr{repair_worker=Worker}=S)-> + {noreply, S#ch_mgr{ignore_timer=false, + repair_worker=undefined, + repair_final_status=Res}}; handle_info(Msg, S) -> case get(todo_bummer) of undefined -> io:format("TODO: got ~p\n", [Msg]); _ -> ok @@ -303,12 +342,18 @@ make_none_projection(MyName, All_list, MembersDict) -> machi_projection:new(MyName, MembersDict, UPI_list, Down_list, [], []). get_my_private_proj_boot_info(MgrOpts, DefaultDict, DefaultProj) -> + get_my_proj_boot_info(MgrOpts, DefaultDict, DefaultProj, private). + +get_my_public_proj_boot_info(MgrOpts, DefaultDict, DefaultProj) -> + get_my_proj_boot_info(MgrOpts, DefaultDict, DefaultProj, public). + +get_my_proj_boot_info(MgrOpts, DefaultDict, DefaultProj, ProjType) -> case proplists:get_value(projection_store_registered_name, MgrOpts) of undefined -> {DefaultDict, DefaultProj}; Store -> {ok, P} = machi_projection_store:read_latest_projection(Store, - private), + ProjType), {P#projection_v1.members_dict, P} end. @@ -327,8 +372,11 @@ store_zeroth_projection_maybe(ZeroProj, MgrOpts) -> set_active_timer(#ch_mgr{name=MyName, members_dict=MembersDict}=S) -> FLU_list = [P#p_srvr.name || {_,P} <- orddict:to_list(MembersDict)], - USec = calc_sleep_ranked_order(1000, 2000, MyName, FLU_list), - {ok, TRef} = timer:send_interval(USec, tick_check_environment), + %% Perturb the order a little bit, to avoid near-lock-step + %% operations every few ticks. + MSec = calc_sleep_ranked_order(400, 1500, MyName, FLU_list) + + random:uniform(100), + {ok, TRef} = timer:send_interval(MSec, tick_check_environment), S#ch_mgr{timer=TRef}. 
do_cl_write_public_proj(Proj, S) -> @@ -392,7 +440,7 @@ do_cl_read_latest_public_projection(ReadRepairP, end. read_latest_projection_call_only(ProjectionType, AllHosed, - #ch_mgr{proj=CurrentProj}=S) -> + #ch_mgr{proj=CurrentProj}=S) -> #projection_v1{all_members=All_list} = CurrentProj, All_queried_list = All_list -- AllHosed, @@ -403,7 +451,6 @@ read_latest_projection_call_only(ProjectionType, AllHosed, Else -> Else end end, -%% io:format(user, "All_queried_list ~p\n", [All_queried_list]), Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) || FLU <- All_queried_list], %% Rs = [perhaps_call_t(S, Partitions, FLU, fun(Pid) -> DoIt(Pid) end) || @@ -508,7 +555,9 @@ calc_projection(#ch_mgr{proj=LastProj, runenv=RunEnv} = S, calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, RelativeToServer, AllHosed, Dbg, - #ch_mgr{name=MyName, runenv=RunEnv1}=S) -> + #ch_mgr{name=MyName, + runenv=RunEnv1, + repair_final_status=RepairFS}=S) -> #projection_v1{epoch_number=OldEpochNum, members_dict=MembersDict, upi=OldUPI_list, @@ -549,6 +598,11 @@ calc_projection(_OldThreshold, _NoPartitionThreshold, LastProj, if Simulator_p andalso SameEpoch_p -> D_foo=[{repair_airquote_done, {we_agree, (S#ch_mgr.proj)#projection_v1.epoch_number}}], {NewUPI_list ++ [H], T, RunEnv2}; + not Simulator_p + andalso + RepairFS == {repair_final_status, ok} -> + D_foo=[{repair_done, {repair_final_status, ok, (S#ch_mgr.proj)#projection_v1.epoch_number}}], + {NewUPI_list ++ Repairing_list2, [], RunEnv2}; true -> D_foo=[], {NewUPI_list, OldRepairing_list, RunEnv2} @@ -679,7 +733,7 @@ rank_projections(Projs, CurrentProj) -> #projection_v1{all_members=All_list} = CurrentProj, MemberRank = orddict:from_list( lists:zip(All_list, lists:seq(1, length(All_list)))), - N = length(All_list), + N = ?MAX_CHAIN_LENGTH + 1, [{rank_projection(Proj, MemberRank, N), Proj} || Proj <- Projs]. 
rank_projection(#projection_v1{upi=[]}, _MemberRank, _N) -> @@ -687,7 +741,22 @@ rank_projection(#projection_v1{upi=[]}, _MemberRank, _N) -> rank_projection(#projection_v1{author_server=Author, upi=UPI_list, repairing=Repairing_list}, MemberRank, N) -> - AuthorRank = orddict:fetch(Author, MemberRank), + %% It's possible that there's "cross-talk" across projection + %% stores. For example, we were a chain of [a,b], then the + %% administrator sets a's members_dict to include only a. + %% However, b is still running and has written a public projection + %% suggestion to a, and a has seen it. (Or perhaps b has old + %% chain information from one/many configurations ago, and its + %% projection store was not wiped clean, then b was restarted & + %% begins using its local outdated projection information.) + %% + %% Server b is no longer a member of a's MemberRank scheme, so we + %% need to compensate for this by giving b an extremely low author + %% ranking. + AuthorRank = case orddict:find(Author, MemberRank) of + {ok, Rank} -> Rank; + error -> -(N*N*N*N) + end, AuthorRank + ( N * length(Repairing_list)) + (N*N * length(UPI_list)). @@ -705,9 +774,24 @@ do_set_chain_members_dict(MembersDict, #ch_mgr{proxies_dict=OldProxiesDict}=S)-> {ok, S#ch_mgr{members_dict=MembersDict, proxies_dict=orddict:from_list(Proxies)}}. -do_react_to_env(#ch_mgr{proj=#projection_v1{epoch_number=Epoch, - members_dict=[]}}=S) -> - {{empty_members_dict, [], Epoch}, S}; +do_react_to_env(#ch_mgr{name=MyName, + proj=#projection_v1{epoch_number=Epoch, + members_dict=[]=OldDict}=OldProj, + opts=Opts}=S) -> + %% Read from our local *public* projection store. If some other + %% chain member has written something there, and if we are a + %% member of that chain, then we'll adopt that projection and then + %% start actively humming in that chain. 
+ {NewMembersDict, NewProj} = + get_my_public_proj_boot_info(Opts, OldDict, OldProj), + case orddict:is_key(MyName, NewMembersDict) of + false -> + {{empty_members_dict, [], Epoch}, S}; + true -> + {_, S2} = do_set_chain_members_dict(NewMembersDict, S), + {{empty_members_dict, [], Epoch}, + S2#ch_mgr{proj=NewProj, members_dict=NewMembersDict}} + end; do_react_to_env(S) -> put(react, []), react_to_env_A10(S). @@ -716,11 +800,44 @@ react_to_env_A10(S) -> ?REACT(a10), react_to_env_A20(0, S). -react_to_env_A20(Retries, S) -> +react_to_env_A20(Retries, #ch_mgr{name=MyName}=S) -> ?REACT(a20), init_remember_partition_hack(), {UnanimousTag, P_latest, ReadExtra, S2} = do_cl_read_latest_public_projection(true, S), + LastComplaint = get(rogue_server_epoch), + case orddict:is_key(P_latest#projection_v1.author_server, + S#ch_mgr.members_dict) of + false when P_latest#projection_v1.epoch_number /= LastComplaint -> + put(rogue_server_epoch, P_latest#projection_v1.epoch_number), + Rogue = P_latest#projection_v1.author_server, + error_logger:info_msg("Chain manager ~w found latest public " + "projection ~w has author ~w not a member " + "of our members list ~w. 
Please check " + "chain membership on this " + "rogue chain manager ~w.\n", + [S#ch_mgr.name, + P_latest#projection_v1.epoch_number, + Rogue, + [K || {K,_} <- orddict:to_list(S#ch_mgr.members_dict)], + Rogue]); + _ -> + ok + end, + case lists:member(MyName, P_latest#projection_v1.all_members) of + false when P_latest#projection_v1.epoch_number /= LastComplaint, + P_latest#projection_v1.all_members /= [] -> + put(rogue_server_epoch, P_latest#projection_v1.epoch_number), + error_logger:info_msg("Chain manager ~p found latest public " + "projection ~p has author ~p has a " + "members list ~p that does not include me.\n", + [S#ch_mgr.name, + P_latest#projection_v1.epoch_number, + P_latest#projection_v1.author_server, + P_latest#projection_v1.all_members]); + _ -> + ok + end, %% The UnanimousTag isn't quite sufficient for our needs. We need %% to determine if *all* of the UPI+Repairing FLUs are members of @@ -1140,10 +1257,10 @@ react_to_env_B10(Retries, P_newprop, P_latest, LatestUnanimousP, {X, S2} = gimme_random_uniform(100, S), if X < 80 -> ?REACT({b10, ?LINE, [flap_stop]}), - ThrottleTime = if FlapLimit < 500 -> 1; - FlapLimit < 1000 -> 5; - FlapLimit < 5000 -> 10; - true -> 30 + ThrottleTime = if P_newprop_flap_count < 500 -> 1; + P_newprop_flap_count < 1000 -> 5; + P_newprop_flap_count < 5000 -> 10; + true -> 30 end, FinalProps = [{my_flap_limit, FlapLimit}, {throttle_seconds, ThrottleTime}], @@ -1429,10 +1546,8 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, AllHosed = [] end, - FlappingI = {flapping_i, [{flap_count, {NewFlapStart, NewFlaps}}, - {all_hosed, AllHosed}, - {all_flap_counts, lists:sort(AllFlapCounts)}, - {bad,BadFLUs}]}, + FlappingI = make_flapping_i(NewFlapStart, NewFlaps, AllHosed, + AllFlapCounts, BadFLUs), Dbg2 = [FlappingI|P_newprop#projection_v1.dbg], %% TODO: 2015-03-04: I'm growing increasingly suspicious of %% the 'runenv' variable that's threaded through all this code. 
@@ -1452,6 +1567,15 @@ calculate_flaps(P_newprop, _P_current, _FlapLimit, {machi_projection:update_checksum(P_newprop#projection_v1{dbg=Dbg2}), S#ch_mgr{flaps=NewFlaps, flap_start=NewFlapStart, runenv=RunEnv2}}. +make_flapping_i() -> + make_flapping_i({{epk,-1},?NOT_FLAPPING}, 0, [], [], []). + +make_flapping_i(NewFlapStart, NewFlaps, AllHosed, AllFlapCounts, BadFLUs) -> + {flapping_i, [{flap_count, {NewFlapStart, NewFlaps}}, + {all_hosed, AllHosed}, + {all_flap_counts, lists:sort(AllFlapCounts)}, + {bad,BadFLUs}]}. + projection_transitions_are_sane(Ps, RelativeToServer) -> projection_transitions_are_sane(Ps, RelativeToServer, false). @@ -1717,7 +1841,8 @@ sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> USec. calc_sleep_ranked_order(MinSleep, MaxSleep, FLU, FLU_list) -> - Front = lists:takewhile(fun(X) -> X /= FLU end, lists:sort(FLU_list)), + Front = lists:takewhile(fun(X) -> X /= FLU end, + lists:reverse(lists:sort(FLU_list))), Index = length(Front), NumNodes = length(FLU_list), SleepChunk = if NumNodes == 0 -> 0; @@ -1800,6 +1925,73 @@ make_chmgr_regname(B) when is_binary(B) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +perhaps_start_repair( + #ch_mgr{name=MyName, + repair_worker=undefined, + proj=#projection_v1{creation_time=Start, + upi=[_|_]=UPI, + repairing=[_|_]}}=S) -> + RepairId = {MyName, os:timestamp()}, + RepairOpts = [{repair_mode, repair}, verbose, {repair_id, RepairId}], + %% RepairOpts = [{repair_mode, check}, verbose], + RepairFun = fun() -> do_repair(S, RepairOpts, ap_mode) end, + LastUPI = lists:last(UPI), + case timer:now_diff(os:timestamp(), Start) div 1000000 of + N when MyName == LastUPI, + N >= ?REPAIR_START_STABILITY_TIME -> + {WorkerPid, _Ref} = spawn_monitor(RepairFun), + S#ch_mgr{repair_worker=WorkerPid, + repair_start=os:timestamp(), + repair_final_status=undefined}; + _ -> + S + end; +perhaps_start_repair(S) -> + S. 
+ +do_repair( + #ch_mgr{name=MyName, + proj=#projection_v1{upi=UPI, + repairing=[_|_]=Repairing, + members_dict=MembersDict}}=_S_copy, + Opts, ap_mode=RepairMode) -> + T1 = os:timestamp(), + RepairId = proplists:get_value(repair_id, Opts, id1), + error_logger:info_msg("Repair start: tail ~p of ~p -> ~p, ~p ID ~w\n", + [MyName, UPI, Repairing, RepairMode, RepairId]), + + ETS = ets:new(repair_stats, [private, set]), + ETS_T_Keys = [t_in_files, t_in_chunks, t_in_bytes, + t_out_files, t_out_chunks, t_out_bytes, + t_bad_chunks, t_elapsed_seconds], + [ets:insert(ETS, {K, 0}) || K <- ETS_T_Keys], + + Res = machi_chain_repair:repair(ap_mode, MyName, Repairing, UPI, + MembersDict, ETS, Opts), + T2 = os:timestamp(), + Elapsed = (timer:now_diff(T2, T1) div 1000) / 1000, + ets:insert(ETS, {t_elapsed_seconds, Elapsed}), + Summary = case Res of ok -> "success"; + _ -> "FAILURE" + end, + Stats = [{K, ets:lookup_element(ETS, K, 2)} || K <- ETS_T_Keys], + error_logger:info_msg("Repair ~s: tail ~p of ~p finished ~p repair ID ~w: " + "~w\nStats ~w\n", + [Summary, MyName, UPI, RepairMode, RepairId, + Res, Stats]), + ets:delete(ETS), + exit({repair_final_status, Res}). + +sanitize_repair_state(#ch_mgr{repair_final_status=Res, + proj=#projection_v1{upi=[_|_]}}=S) + when Res /= undefined -> + S#ch_mgr{repair_worker=undefined, repair_start=undefined, + repair_final_status=undefined}; +sanitize_repair_state(S) -> + S. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + perhaps_call_t(S, Partitions, FLU, DoIt) -> try perhaps_call(S, Partitions, FLU, DoIt) diff --git a/src/machi_chain_repair.erl b/src/machi_chain_repair.erl new file mode 100644 index 0000000..1ef772f --- /dev/null +++ b/src/machi_chain_repair.erl @@ -0,0 +1,385 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. 
+%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc Erlang API for the Machi FLU TCP protocol version 1, with a +%% proxy-process style API for hiding messy details such as TCP +%% connection/disconnection with the remote Machi server. +%% +%% Machi is intentionally avoiding using distributed Erlang for +%% Machi's communication. This design decision makes Erlang-side code +%% more difficult & complex, but it's the price to pay for some +%% language independence. Later in Machi's life cycle, we need to +%% (re-)implement some components in a non-Erlang/BEAM-based language. +%% +%% This module implements a "man in the middle" proxy between the +%% Erlang client and Machi server (which is on the "far side" of a TCP +%% connection to somewhere). This proxy process will always execute +%% on the same Erlang node as the Erlang client that uses it. The +%% proxy is intended to be a stable, long-lived process that survives +%% TCP communication problems with the remote server. + +-module(machi_chain_repair). + +-include("machi_projection.hrl"). + +-define(SHORT_TIMEOUT, 5*1000). +-define(LONG_TIMEOUT, 60*1000). +-define(MAX_OFFSET, 999*1024*1024*1024*1024*1024*1024*1024). + +%% These macros assume there's a bound variable called Verb. +-define(VERB(Fmt), if Verb -> io:format(Fmt ); true -> ok end). 
+-define(VERB(Fmt, Args), if Verb -> io:format(Fmt, Args); true -> ok end). + +-ifdef(TEST). +-compile(export_all). +-endif. % TEST + +-export([repair/7]). + +repair_cp(_Src, _Dst, _MembersDict, _Opts) -> + %% TODO: add missing function: wipe away any trace of chunks + %% that are present on Dst but missing on Src. + exit(todo_cp_mode). + +repair(ap_mode=ConsistencyMode, Src, Repairing, UPI, MembersDict, ETS, Opts) -> + %% Use process dict so that 'after' clause can always quit all + %% proxy pids. + put(proxies_dict, orddict:new()), + Add = fun(Name, Pid) -> put(proxies_dict, orddict:store(Name, Pid, get(proxies_dict))) end, + OurFLUs = lists:usort([Src] ++ Repairing ++ UPI), % AP assumption! + RepairMode = proplists:get_value(repair_mode, Opts, repair), + Verb = proplists:get_value(verbose, Opts, true), + Res = try + [begin + {ok, Proxy} = machi_proxy_flu1_client:start_link(P), + Add(FLU, Proxy) + end || {FLU,P} <- MembersDict, lists:member(FLU, OurFLUs)], + ProxiesDict = get(proxies_dict), + + D = dict:new(), + D2 = lists:foldl(fun({FLU, Proxy}, Dict) -> + get_file_lists(Proxy, FLU, Dict) + end, D, ProxiesDict), + MissingFileSummary = make_missing_file_summary(D2, OurFLUs), + ?VERB("MissingFileSummary ~p\n", [MissingFileSummary]), + + [ets:insert(ETS, {{directive_bytes, FLU}, 0}) || FLU <- OurFLUs], + %% Repair files from perspective of Src, i.e. tail(UPI).
+ SrcProxy = orddict:fetch(Src, ProxiesDict), + {ok, EpochID} = machi_proxy_flu1_client:get_epoch_id( + SrcProxy, ?SHORT_TIMEOUT), + ?VERB("Make repair directives: "), + Ds = + [{File, make_repair_directives( + ConsistencyMode, RepairMode, File, Size, EpochID, + Verb, + Src, OurFLUs, ProxiesDict, ETS)} || + {File, {Size, _MissingList}} <- MissingFileSummary], + ?VERB(" done\n"), + [begin + [{_, Bytes}] = ets:lookup(ETS, {directive_bytes, FLU}), + ?VERB("Out-of-sync data for FLU ~p: ~s MBytes\n", + [FLU, mbytes(Bytes)]) + end || FLU <- OurFLUs], + + ?VERB("Execute repair directives: "), + ok = execute_repair_directives(ConsistencyMode, Ds, Src, EpochID, + Verb, OurFLUs, ProxiesDict, ETS), + ?VERB(" done\n"), + ok + catch + What:Why -> + Stack = erlang:get_stacktrace(), + io:format(user, "What Why ~p ~p @\n\t~p\n", + [What, Why, Stack]), + {error, {What, Why, Stack}} + after + [(catch machi_proxy_flu1_client:quit(Pid)) || + Pid <- orddict:to_list(get(proxies_dict))] + end, + Res. + +%% Create a list of servers where the file is completely missing. +%% In the "demo day" implementation and in an early integration WIP, +%% this was a useful thing. TODO: Can this be removed? + +make_missing_file_summary(Dict, AllFLUs) -> + %% FileFilterFun = fun(_) -> true end, + FoldRes = lists:sort(dict:to_list(Dict)), + %% NOTE: MissingFileSummary = [{File, {FileSize, ServersMissingFrom}}] + MissingFileSummary = + [begin + {GotIt, Sizes} = lists:unzip(GotSizes), + Size = lists:max(Sizes), + Missing = {File, {Size, AllFLUs -- GotIt}}, + Missing + end || {File, GotSizes} <- FoldRes %% , FileFilterFun(File) + ], + MissingFileSummary. + +get_file_lists(Proxy, FLU_name, D) -> + {ok, Res} = machi_proxy_flu1_client:list_files(Proxy, ?DUMMY_PV1_EPOCH, + ?SHORT_TIMEOUT), + lists:foldl(fun({Size, File}, Dict) -> + dict:append(File, {FLU_name, Size}, Dict) + end, D, Res). + +%% Wow, it's so easy to bikeshed this into a 1 year programming exercise. 
+%% +%% TODO: There are a lot of areas for exploiting parallelism here. +%% I've set the bikeshed aside for now, but "make repair faster" has a +%% lot of room for exploiting concurrency, overlapping reads & writes, +%% etc etc. There are also lots of different trade-offs to make with +%% regard to RAM use vs. disk use. +%% +%% TODO: There's no reason why repair can't be done 1). in parallel +%% across multiple repairees, and/or 2). with multiple byte ranges in +%% the same file, and/or 3). with bigger chunks. +%% +%% 1. Optimization +%% 2. Optimization +%% 3. Optimization, but it would be the easiest to implement, e.g. use +%% constant-sized 4MB chunks. Unfortunately, it would also destroy +%% the ability to verify here that the chunk checksums are correct +%% *and* also propagate the correct checksum metadata to the +%% destination FLU. +%% As an additional optimization, add a bit of #2 to start the next +%% read while the current write is still in progress. +%% +%% Most/all of this could be executed in parallel on each FLU relative to +%% its own files. Then, in another TODO option, perhaps build a Merkle tree +%% or other summary of the local files & send that data structure to the +%% repair coordinator. +%% +%% Also, as another TODO note, repair_both_present() in the +%% prototype/demo-day code uses an optimization of calculating the MD5 +%% checksum of the chunk checksum data as it arrives, and if the two MD5s +%% match, then we consider the two files in sync. If there isn't a match, +%% then we sort the lines and try another MD5, and if they match, then we're +%% in sync. In theory, that's lower overhead than the procedure used here.
+%% +%% NOTE that one reason I chose the "directives list" method is to have an +%% option, later, of choosing to repair a subset of repairee FLUs if there +%% is a big discrepancy between out of sync files: e.g., if FLU x has N +%% bytes out of sync but FLU y has 50N bytes out of sync, then it's likely +%% better to repair x only so that x can return to the UPI list quickly. +%% Also, in the event that all repairees are roughly comparably out of sync, +%% then the repair network traffic can be minimized by reading each chunk +%% only once. + +make_repair_compare_fun(SrcFLU) -> + fun({{Offset_X, _Sz_a, _Cs_a, FLU_a}, _N_a}, + {{Offset_X, _Sz_b, _CS_b, FLU_b}, _N_b}) -> + %% The repair source FLU always sorts less/earlier than anything else. + if FLU_a == SrcFLU -> + true; + FLU_b == SrcFLU -> + false; + true -> + %% Implicitly, smallest offset first. + %% Secondarily (and implicitly), sort smallest chunk size first + FLU_a < FLU_b + end; + (T_a, T_b) -> + %% See implicitly comments above + T_a =< T_b + end. + +make_repair_directives(ConsistencyMode, RepairMode, File, Size, EpochID, + Verb, Src, FLUs0, ProxiesDict, ETS) -> + true = (Size < ?MAX_OFFSET), + FLUs = lists:usort(FLUs0), + C0 = [begin + %% erlang:garbage_collect(), + Proxy = orddict:fetch(FLU, ProxiesDict), + OffSzCs = case machi_proxy_flu1_client:checksum_list( + Proxy, EpochID, File, ?LONG_TIMEOUT) of + {ok, X} -> X; + {error, no_such_file} -> [] + end, + [{?MAX_OFFSET, 0, <<>>, FLU}] % our end-of-file marker + ++ + [{Off, Sz, Cs, FLU} || {Off, Sz, Cs} <- OffSzCs] + end || FLU <- FLUs], + C1 = lists:append(C0), + %% erlang:garbage_collect(), + C2 = lists:sort(make_repair_compare_fun(Src), C1), + %% erlang:garbage_collect(), + Ds = make_repair_directives2(C2, ConsistencyMode, RepairMode, + File, Verb, Src, FLUs, ProxiesDict, ETS), + Ds.
+ +make_repair_directives2(C2, ConsistencyMode, RepairMode, + File, Verb, Src, FLUs, ProxiesDict, ETS) -> + ?VERB("."), + make_repair_directives3(C2, ConsistencyMode, RepairMode, + File, Verb, Src, FLUs, ProxiesDict, ETS, []). + +make_repair_directives3([{?MAX_OFFSET, 0, <<>>, _FLU}|_Rest], + _ConsistencyMode, _RepairMode, + _File, _Verb, _Src, _FLUs, _ProxiesDict, _ETS, Acc) -> + lists:reverse(Acc); +make_repair_directives3([{Offset, Size, CSum, _FLU}=A|Rest0], + ConsistencyMode, RepairMode, + File, Verb, Src, FLUs, ProxiesDict, ETS, Acc) -> + {As0, Rest1} = take_same_offset_size(Rest0, Offset, Size), + As = [A|As0], + %% Sanity checking time + case lists:all(fun({_, _, Cs, _}) when Cs == CSum -> true; + (_) -> false + end, As) of + true -> + ok; + false -> + %% TODO: Pathology: someone has the wrong checksum. + %% 1. Fetch Src's chunk. If checksum is valid, use this chunk + %% to repair any invalid value. + %% 2. If Src's chunk is invalid, then check for other copies + %% in the UPI. If there is a valid chunk there, use it to + %% repair any invalid value. + %% 3a. If there is no valid UPI chunk, then delete this + %% byte range from all FLUs + %% 3b. Log big warning about data loss. + %% 4. Log any other checksum discrepancies as they are found. + exit({todo_repair_sanity_check, ?LINE, File, Offset, As}) + end, + %% List construction guarantees us that at least one ?MAX_OFFSET + %% item remains. Sort order + our "taking" of all exact Offset+Size + %% tuples guarantees that if there's a disagreement about chunk size at + %% this offset, we can look ahead exactly one to see if there is sanity + %% or not.
+ [{Offset_next, _Size_next, _, _}=A_next|_] = Rest1, + if Offset + Size =< Offset_next -> + ok; + true -> + exit({todo_repair_sanity_check, ?LINE, File, Offset, Size, + next_is, A_next}) + end, + Do = if ConsistencyMode == ap_mode -> + Gots = [FLU || {_Off, _Sz, _Cs, FLU} <- As], + Missing = FLUs -- Gots, + _ThisSrc = case lists:member(Src, Gots) of + true -> Src; + false -> hd(Gots) + end, + [ets:update_counter(ETS, {directive_bytes, FLU_m}, Size) || + FLU_m <- Missing], + if Missing == [] -> + noop; + true -> + {copy, A, Missing} + end; + ConsistencyMode == cp_mode -> + exit({todo_cp_mode, ?MODULE, ?LINE}) + end, + Acc2 = if Do == noop -> Acc; + true -> [Do|Acc] + end, + make_repair_directives3(Rest1, + ConsistencyMode, RepairMode, + File, Verb, Src, FLUs, ProxiesDict, ETS, Acc2). + +take_same_offset_size(L, Offset, Size) -> + take_same_offset_size(L, Offset, Size, []). + +take_same_offset_size([{Offset, Size, _CSum, _FLU}=A|Rest], Offset, Size, Acc) -> + take_same_offset_size(Rest, Offset, Size, [A|Acc]); +take_same_offset_size(Rest, _Offset, _Size, Acc) -> + {Acc, Rest}. + +execute_repair_directives(ap_mode=_ConsistencyMode, Ds, _Src, EpochID, Verb, + _OurFLUs, ProxiesDict, ETS) -> + {_,_,_,_} = lists:foldl(fun execute_repair_directive/2, + {ProxiesDict, EpochID, Verb, ETS}, Ds), + ok. 
+ +execute_repair_directive({File, Cmds}, {ProxiesDict, EpochID, Verb, ETS}=Acc) -> + EtsKeys = [{in_files, t_in_files}, {in_chunks, t_in_chunks}, + {in_bytes, t_in_bytes}, {out_files, t_out_files}, + {out_chunks, t_out_chunks}, {out_bytes, t_out_bytes}], + [ets:insert(ETS, {L_K, 0}) || {L_K, _T_K} <- EtsKeys], + F = fun({copy, {Offset, Size, CSum, MySrc}, MyDsts}, Acc2) -> + SrcP = orddict:fetch(MySrc, ProxiesDict), + case ets:lookup_element(ETS, in_chunks, 2) rem 100 of + 0 -> ?VERB(".", []); + _ -> ok + end, + _T1 = os:timestamp(), + {ok, Chunk} = machi_proxy_flu1_client:read_chunk( + SrcP, EpochID, File, Offset, Size, + ?SHORT_TIMEOUT), + _T2 = os:timestamp(), + case machi_util:checksum_chunk(Chunk) of + CSum_now when CSum_now == CSum -> + [begin + DstP = orddict:fetch(DstFLU, ProxiesDict), + _T3 = os:timestamp(), + ok = machi_proxy_flu1_client:write_chunk( + DstP, EpochID, File, Offset, Chunk, + ?SHORT_TIMEOUT), + _T4 = os:timestamp() + end || DstFLU <- MyDsts], + ets:update_counter(ETS, in_chunks, 1), + ets:update_counter(ETS, in_bytes, Size), + N = length(MyDsts), + ets:update_counter(ETS, out_chunks, N), + ets:update_counter(ETS, out_bytes, N*Size), + Acc2; + CSum_now -> + error_logger:error_msg( + "TODO: Checksum failure: " + "file ~p offset ~p size ~p: " + "expected ~p got ~p\n", + [File, Offset, Size, CSum, CSum_now]), + case ets:update_counter(ETS, t_bad_chunks, 1) of + N when N > 100 -> + throw(todo_wow_so_many_errors_so_verbose); + _ -> + ok + end, + Acc2 + end + end, + ok = lists:foldl(F, ok, Cmds), + %% Copy this file's stats to the total counts. + [ets:update_counter(ETS, T_K, ets:lookup_element(ETS, L_K, 2)) || + {L_K, T_K} <- EtsKeys], + Acc. + +mbytes(0) -> + "0.0"; +mbytes(Size) -> + lists:flatten(io_lib:format("~.1.0f", [max(0.1, Size / (1024*1024))])). + +-ifdef(TEST). 
+ +repair_compare_fun_test() -> + F = make_repair_compare_fun(b), + List = [{{1,10,x,b},y},{{50,10,x,a},y},{{50,10,x,b},y},{{50,10,x,c},y},{{90,10,x,d},y}], + Input = lists:reverse(lists:sort(List)), + %% Although the merge func should never have two of the same FLU + %% represented, it doesn't matter for the purposes of this test. + %% 1. Smaller offset (element #1) wins, else... + %% 2. The FLU (element #2) that's the repair source always wins, else... + %% 3. The FLU with smallest name wins. + Expect = [{{1,10,x,b},y},{{50,10,x,b},y},{{50,10,x,a},y},{{50,10,x,c},y},{{90,10,x,d},y}], + Expect = lists:sort(F, Input). + +-endif. % TEST diff --git a/src/machi_flu1.erl b/src/machi_flu1.erl index 47bd52a..a830c18 100644 --- a/src/machi_flu1.erl +++ b/src/machi_flu1.erl @@ -114,30 +114,50 @@ main2(FluName, TcpPort, DataDir, Rest) -> DPs -> {lists:keydelete(dbg, 1, Rest), DPs} end, + {SendAppendPidToProj_p, ProjectionPid} = + case proplists:get_value(projection_store_registered_name, Rest) of + undefined -> + RN = make_projection_server_regname(FluName), + {ok, PP} = + machi_projection_store:start_link(RN, DataDir, undefined), + {true, PP}; + RN -> + {false, whereis(RN)} + end, + InitialWedged_p = proplists:get_value(initial_wedged, DbgProps), + ProjRes = machi_projection_store:read_latest_projection(ProjectionPid, + private), + {Wedged_p, EpochId} = + if InitialWedged_p == undefined, + is_tuple(ProjRes), element(1, ProjRes) == ok -> + {ok, Proj} = ProjRes, + {false, {Proj#projection_v1.epoch_number, + Proj#projection_v1.epoch_csum}}; + InitialWedged_p == false -> + {false, ?DUMMY_PV1_EPOCH}; + true -> + {true, undefined} + end, S0 = #state{flu_name=FluName, + proj_store=ProjectionPid, tcp_port=TcpPort, data_dir=DataDir, - wedged=proplists:get_value(initial_wedged, DbgProps, true), + wedged=Wedged_p, etstab=ets_table_name(FluName), - epoch_id=undefined, + epoch_id=EpochId, dbg_props=DbgProps, props=Props}, AppendPid = start_append_server(S0, self()), receive 
append_server_ack -> ok end, - {_ProjRegName, ProjectionPid} = - case proplists:get_value(projection_store_registered_name, Rest) of - undefined -> - RN = make_projection_server_regname(FluName), - {ok, PP} = - machi_projection_store:start_link(RN, DataDir, AppendPid), - {RN, PP}; - RN -> - {RN, whereis(RN)} - end, - S1 = S0#state{append_pid=AppendPid, - proj_store=ProjectionPid}, + if SendAppendPidToProj_p -> + machi_projection_store:set_wedge_notify_pid(ProjectionPid, + AppendPid); + true -> + ok + end, + S1 = S0#state{append_pid=AppendPid}, ListenPid = start_listen_server(S1), Config_e = machi_util:make_config_filename(DataDir, "unused"), @@ -174,13 +194,15 @@ run_listen_server(#state{flu_name=FluName, tcp_port=TcpPort}=S) -> {ok, LSock} = gen_tcp:listen(TcpPort, SockOpts), listen_server_loop(LSock, S). -run_append_server(FluPid, AckPid, #state{flu_name=Name,dbg_props=DbgProps}=S) -> +run_append_server(FluPid, AckPid, #state{flu_name=Name, + wedged=Wedged_p,epoch_id=EpochId}=S) -> %% Reminder: Name is the "main" name of the FLU, i.e., no suffix register(Name, self()), TID = ets:new(ets_table_name(Name), [set, protected, named_table, {read_concurrency, true}]), - InitialWedged = proplists:get_value(initial_wedged, DbgProps, true), - ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}), + %% InitialWedged = proplists:get_value(initial_wedged, DbgProps, true), + %% ets:insert(TID, {epoch, {InitialWedged, {-65, <<"bogus epoch, yo">>}}}), + ets:insert(TID, {epoch, {Wedged_p, EpochId}}), AckPid ! append_server_ack, append_server_loop(FluPid, S#state{etstab=TID}). @@ -192,13 +214,13 @@ listen_server_loop(LSock, S) -> append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> AppendServerPid = self(), receive - {seq_append, From, _Prefix, _Chunk, _CSum} when Wedged_p -> + {seq_append, From, _Prefix, _Chunk, _CSum, _Extra} when Wedged_p -> From ! 
wedged, append_server_loop(FluPid, S); - {seq_append, From, Prefix, Chunk, CSum} -> - spawn(fun() -> append_server_dispatch(From, Prefix, Chunk, CSum, - DataDir, AppendServerPid) end), - %% DataDir, FluPid) end), + {seq_append, From, Prefix, Chunk, CSum, Extra} -> + spawn(fun() -> append_server_dispatch(From, Prefix, + Chunk, CSum, Extra, + DataDir, AppendServerPid) end), append_server_loop(FluPid, S); {wedge_state_change, Boolean, EpochId} -> true = ets:insert(S#state.etstab, {epoch, {Boolean, EpochId}}), @@ -213,14 +235,19 @@ append_server_loop(FluPid, #state{data_dir=DataDir,wedged=Wedged_p}=S) -> append_server_loop(FluPid, S) end. --define(EpochIDSpace, (4+20)). +-define(EpochIDSpace, ((4*2)+(20*2))). % hexencodingwhee! + +decode_epoch_id(EpochIDHex) -> + <> = + machi_util:hexstr_to_bin(EpochIDHex), + {EpochNum, EpochCSum}. net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> ok = inet:setopts(Sock, [{packet, line}]), case gen_tcp:recv(Sock, 0, 600*1000) of {ok, Line} -> %% machi_util:verb("Got: ~p\n", [Line]), - PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 - 1, + PrefixLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 8 -8 - 1, FileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 16 - 8 - 1, CSumFileLenLF = byte_size(Line) - 2 - ?EpochIDSpace - 1, WriteFileLenLF = byte_size(Line) - 7 - ?EpochIDSpace - 16 - 8 - 1, @@ -228,21 +255,26 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> case Line of %% For normal use <<"A ", - _EpochIDRaw:(?EpochIDSpace)/binary, - LenHex:8/binary, + EpochIDHex:(?EpochIDSpace)/binary, + LenHex:8/binary, ExtraHex:8/binary, Prefix:PrefixLenLF/binary, "\n">> -> - do_net_server_append(FluName, Sock, LenHex, Prefix); + _EpochID = decode_epoch_id(EpochIDHex), + do_net_server_append(FluName, Sock, LenHex, ExtraHex, + Prefix); <<"R ", - EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:FileLenLF/binary, "\n">> -> + EpochID = 
decode_epoch_id(EpochIDHex), do_net_server_read(Sock, OffsetHex, LenHex, File, DataDir, - EpochIDRaw, S); - <<"L ", _EpochIDRaw:(?EpochIDSpace)/binary, "\n">> -> + EpochID, S); + <<"L ", EpochIDHex:(?EpochIDSpace)/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_listing(Sock, DataDir, S); <<"C ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, File:CSumFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_checksum_listing(Sock, File, DataDir, S); <<"QUIT\n">> -> catch gen_tcp:close(Sock), @@ -252,20 +284,23 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> exit(normal); %% For "internal" replication only. <<"W-repl ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, OffsetHex:16/binary, LenHex:8/binary, File:WriteFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_write(Sock, OffsetHex, LenHex, File, DataDir, <<"fixme1">>, false, <<"fixme2">>); %% For data migration only. 
<<"DEL-migration ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, File:DelFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_delete_migration_only(Sock, File, DataDir); %% For erasure coding hackityhack <<"TRUNC-hack--- ", - _EpochIDRaw:(?EpochIDSpace)/binary, + EpochIDHex:(?EpochIDSpace)/binary, File:DelFileLenLF/binary, "\n">> -> + _EpochID = decode_epoch_id(EpochIDHex), do_net_server_truncate_hackityhack(Sock, File, DataDir); <<"PROJ ", LenHex:8/binary, "\n">> -> do_projection_command(Sock, LenHex, S); @@ -273,6 +308,7 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> do_wedge_status(FluName, Sock); _ -> machi_util:verb("Else Got: ~p\n", [Line]), + io:format(user, "TODO: Else Got: ~p\n", [Line]), gen_tcp:send(Sock, "ERROR SYNTAX\n"), catch gen_tcp:close(Sock), exit(normal) @@ -283,16 +319,16 @@ net_server_loop(Sock, #state{flu_name=FluName, data_dir=DataDir}=S) -> exit(normal) end. -append_server_dispatch(From, Prefix, Chunk, CSum, DataDir, LinkPid) -> +append_server_dispatch(From, Prefix, Chunk, CSum, Extra, DataDir, LinkPid) -> Pid = write_server_get_pid(Prefix, DataDir, LinkPid), - Pid ! {seq_append, From, Prefix, Chunk, CSum}, + Pid ! {seq_append, From, Prefix, Chunk, CSum, Extra}, exit(normal). -do_net_server_append(FluName, Sock, LenHex, Prefix) -> +do_net_server_append(FluName, Sock, LenHex, ExtraHex, Prefix) -> %% TODO: robustify against other invalid path characters such as NUL case sanitize_file_string(Prefix) of ok -> - do_net_server_append2(FluName, Sock, LenHex, Prefix); + do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix); _ -> ok = gen_tcp:send(Sock, <<"ERROR BAD-ARG">>) end. @@ -305,13 +341,14 @@ sanitize_file_string(Str) -> error end. 
-do_net_server_append2(FluName, Sock, LenHex, Prefix) -> +do_net_server_append2(FluName, Sock, LenHex, ExtraHex, Prefix) -> <> = machi_util:hexstr_to_bin(LenHex), + <> = machi_util:hexstr_to_bin(ExtraHex), ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len, 60*1000), CSum = machi_util:checksum_chunk(Chunk), try - FluName ! {seq_append, self(), Prefix, Chunk, CSum} + FluName ! {seq_append, self(), Prefix, Chunk, CSum, Extra} catch error:badarg -> error_logger:error_msg("Message send to ~p gave badarg, make certain server is running with correct registered name\n", [?MODULE]) end, @@ -348,7 +385,7 @@ do_wedge_status(FluName, Sock) -> ok = gen_tcp:send(Sock, Reply). do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir, - EpochIDRaw, S) -> + EpochID, S) -> {Wedged_p, CurrentEpochId} = ets:lookup_element(S#state.etstab, epoch, 2), DoItFun = fun(FH, Offset, Len) -> case file:pread(FH, Offset, Len) of @@ -368,16 +405,16 @@ do_net_server_read(Sock, OffsetHex, LenHex, FileBin, DataDir, end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, [read, binary, raw], DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId). + EpochID, Wedged_p, CurrentEpochId). 
do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> case {Wedged_p, sanitize_file_string(FileBin)} of {false, ok} -> do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId); + EpochID, Wedged_p, CurrentEpochId); {true, _} -> ok = gen_tcp:send(Sock, <<"ERROR WEDGED\n">>); {_, __} -> @@ -386,7 +423,7 @@ do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> <> = machi_util:hexstr_to_bin(OffsetHex), <> = machi_util:hexstr_to_bin(LenHex), {_, Path} = machi_util:make_data_filename(DataDir, FileBin), @@ -402,7 +439,7 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, do_net_server_readwrite_common( Sock, OffsetHex, LenHex, FileBin, DataDir, FileOpts, DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId); + EpochID, Wedged_p, CurrentEpochId); _Else -> %%%%%% keep?? 
machi_util:verb("Else ~p ~p ~p ~p\n", [Offset, Len, Path, _Else]), ok = gen_tcp:send(Sock, <<"ERROR BAD-IO\n">>) @@ -410,20 +447,20 @@ do_net_server_readwrite_common2(Sock, OffsetHex, LenHex, FileBin, DataDir, do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> CSumPath = machi_util:make_checksum_filename(DataDir, FileBin), case file:open(CSumPath, [append, raw, binary, delayed_write]) of {ok, FHc} -> do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, - EpochIDRaw, Wedged_p, CurrentEpochId); + EpochID, Wedged_p, CurrentEpochId); {error, enoent} -> ok = filelib:ensure_dir(CSumPath), do_net_server_write(Sock, OffsetHex, LenHex, FileBin, DataDir, - EpochIDRaw, Wedged_p, CurrentEpochId) + EpochID, Wedged_p, CurrentEpochId) end. do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, - EpochIDRaw, Wedged_p, CurrentEpochId) -> + EpochID, Wedged_p, CurrentEpochId) -> DoItFun = fun(FHd, Offset, Len) -> ok = inet:setopts(Sock, [{packet, raw}]), {ok, Chunk} = gen_tcp:recv(Sock, Len), @@ -443,7 +480,7 @@ do_net_server_write2(Sock, OffsetHex, LenHex, FileBin, DataDir, FHc, end, do_net_server_readwrite_common(Sock, OffsetHex, LenHex, FileBin, DataDir, [write, read, binary, raw], DoItFun, - EpochIDRaw, Wedged_p, CurrentEpochId). + EpochID, Wedged_p, CurrentEpochId). perhaps_do_net_server_ec_read(Sock, FH) -> case file:pread(FH, 0, ?MINIMUM_OFFSET) of @@ -673,8 +710,12 @@ seq_append_server_loop(DataDir, Prefix, _File, {FHd,FHc}, FileNum, Offset) run_seq_append_server2(Prefix, DataDir); seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> receive - {seq_append, From, Prefix, Chunk, CSum} -> - ok = file:pwrite(FHd, Offset, Chunk), + {seq_append, From, Prefix, Chunk, CSum, Extra} -> + if Chunk /= <<>> -> + ok = file:pwrite(FHd, Offset, Chunk); + true -> + ok + end, From ! 
{assignment, Offset, File}, Len = byte_size(Chunk), OffsetHex = machi_util:bin_to_hexstr(<>), @@ -683,7 +724,7 @@ seq_append_server_loop(DataDir, Prefix, File, {FHd,FHc}=FH_, FileNum, Offset) -> CSum_info = [OffsetHex, 32, LenHex, 32, CSumHex, 10], ok = file:write(FHc, CSum_info), seq_append_server_loop(DataDir, Prefix, File, FH_, - FileNum, Offset + Len); + FileNum, Offset + Len + Extra); {sync_stuff, FromPid, Ref} -> file:sync(FHc), FromPid ! {sync_finished, Ref}, diff --git a/src/machi_flu1_client.erl b/src/machi_flu1_client.erl index 1311d74..80e1b6a 100644 --- a/src/machi_flu1_client.erl +++ b/src/machi_flu1_client.erl @@ -28,6 +28,7 @@ -export([ %% File API append_chunk/4, append_chunk/5, + append_chunk_extra/5, append_chunk_extra/6, read_chunk/5, read_chunk/6, checksum_list/3, checksum_list/4, list_files/2, list_files/3, @@ -42,7 +43,10 @@ list_all_projections/2, list_all_projections/3, %% Common API - quit/1 + quit/1, + + %% Connection management API + connected_p/1, connect/1, disconnect/1 ]). %% For "internal" replication only. -export([ @@ -68,6 +72,7 @@ -type file_prefix() :: binary() | list(). -type inet_host() :: inet:ip_address() | inet:hostname(). -type inet_port() :: inet:port_number(). +-type port_wrap() :: {w,atom(),term()}. -type projection() :: #projection_v1{}. -type projection_type() :: 'public' | 'private'. @@ -76,10 +81,10 @@ %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. --spec append_chunk(port(), epoch_id(), file_prefix(), chunk()) -> +-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) -> {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(Sock, EpochID, Prefix, Chunk) -> - append_chunk2(Sock, EpochID, Prefix, Chunk). + append_chunk2(Sock, EpochID, Prefix, Chunk, 0). %% @doc Append a chunk (binary- or iolist-style) of data to a file %% with `Prefix'. 
@@ -88,16 +93,50 @@ append_chunk(Sock, EpochID, Prefix, Chunk) -> epoch_id(), file_prefix(), chunk()) -> {ok, chunk_pos()} | {error, error_general()} | {error, term()}. append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try - append_chunk2(Sock, EpochID, Prefix, Chunk) + append_chunk2(Sock, EpochID, Prefix, Chunk, 0) after - catch gen_tcp:close(Sock) + disconnect(Sock) + end. + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix' and also request an additional `Extra' bytes. +%% +%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then +%% the file offsets that follow `Chunk''s position for the following 4K will +%% be reserved by the file sequencer for later write(s) by the +%% `write_chunk()' API. + +-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) -> + {ok, chunk_pos()} | {error, error_general()} | {error, term()}. +append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) + when is_integer(ChunkExtra), ChunkExtra >= 0 -> + append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix' and also request an additional `Extra' bytes. +%% +%% For example, if the `Chunk' size is 1 KByte and `Extra' is 4K Bytes, then +%% the file offsets that follow `Chunk''s position for the following 4K will +%% be reserved by the file sequencer for later write(s) by the +%% `write_chunk()' API. + +-spec append_chunk_extra(inet_host(), inet_port(), + epoch_id(), file_prefix(), chunk(), chunk_size()) -> + {ok, chunk_pos()} | {error, error_general()} | {error, term()}. 
+append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, ChunkExtra) + when is_integer(ChunkExtra), ChunkExtra >= 0 -> + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), + try + append_chunk2(Sock, EpochID, Prefix, Chunk, ChunkExtra) + after + disconnect(Sock) end. %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. --spec read_chunk(port(), epoch_id(), file_name(), file_offset(), chunk_size()) -> +-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) -> {ok, chunk_s()} | {error, error_general() | 'no_such_file' | 'partial_read'} | {error, term()}. @@ -114,20 +153,20 @@ read_chunk(Sock, EpochID, File, Offset, Size) {error, term()}. read_chunk(Host, TcpPort, EpochID, File, Offset, Size) when Offset >= ?MINIMUM_OFFSET, Size >= 0 -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try read_chunk2(Sock, EpochID, File, Offset, Size) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Fetch the list of chunk checksums for `File'. --spec checksum_list(port(), epoch_id(), file_name()) -> +-spec checksum_list(port_wrap(), epoch_id(), file_name()) -> {ok, [chunk_csum()]} | {error, error_general() | 'no_such_file' | 'partial_read'} | {error, term()}. -checksum_list(Sock, EpochID, File) when is_port(Sock) -> +checksum_list(Sock, EpochID, File) -> checksum_list2(Sock, EpochID, File). %% @doc Fetch the list of chunk checksums for `File'. @@ -136,18 +175,18 @@ checksum_list(Sock, EpochID, File) when is_port(Sock) -> {ok, [chunk_csum()]} | {error, error_general() | 'no_such_file'} | {error, term()}. checksum_list(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try checksum_list2(Sock, EpochID, File) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. 
%% @doc Fetch the list of all files on the remote FLU. --spec list_files(port(), epoch_id()) -> +-spec list_files(port_wrap(), epoch_id()) -> {ok, [file_info()]} | {error, term()}. -list_files(Sock, EpochID) when is_port(Sock) -> +list_files(Sock, EpochID) -> list2(Sock, EpochID). %% @doc Fetch the list of all files on the remote FLU. @@ -155,19 +194,19 @@ list_files(Sock, EpochID) when is_port(Sock) -> -spec list_files(inet_host(), inet_port(), epoch_id()) -> {ok, [file_info()]} | {error, term()}. list_files(Host, TcpPort, EpochID) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try list2(Sock, EpochID) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Fetch the wedge status from the remote FLU. --spec wedge_status(port()) -> +-spec wedge_status(port_wrap()) -> {ok, {boolean(), pv1_epoch()}} | {error, term()}. -wedge_status(Sock) when is_port(Sock) -> +wedge_status(Sock) -> wedge_status2(Sock). %% @doc Fetch the wedge status from the remote FLU. @@ -175,16 +214,16 @@ wedge_status(Sock) when is_port(Sock) -> -spec wedge_status(inet_host(), inet_port()) -> {ok, {boolean(), pv1_epoch()}} | {error, term()}. wedge_status(Host, TcpPort) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try wedge_status2(Sock) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get the latest epoch number + checksum from the FLU's projection store. --spec get_latest_epoch(port(), projection_type()) -> +-spec get_latest_epoch(port_wrap(), projection_type()) -> {ok, epoch_id()} | {error, term()}. get_latest_epoch(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -197,16 +236,16 @@ get_latest_epoch(Sock, ProjType) {ok, epoch_id()} | {error, term()}. 
get_latest_epoch(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try get_latest_epoch2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get the latest projection from the FLU's projection store for `ProjType' --spec read_latest_projection(port(), projection_type()) -> +-spec read_latest_projection(port_wrap(), projection_type()) -> {ok, projection()} | {error, not_written} | {error, term()}. read_latest_projection(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -219,17 +258,17 @@ read_latest_projection(Sock, ProjType) {ok, projection()} | {error, not_written} | {error, term()}. read_latest_projection(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try read_latest_projection2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Read a projection `Proj' of type `ProjType'. --spec read_projection(port(), projection_type(), epoch_num()) -> - {ok, projection()} | {error, written} | {error, term()}. +-spec read_projection(port_wrap(), projection_type(), epoch_num()) -> + {ok, projection()} | {error, not_written} | {error, term()}. read_projection(Sock, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> read_projection2(Sock, ProjType, Epoch). @@ -238,19 +277,19 @@ read_projection(Sock, ProjType, Epoch) -spec read_projection(inet_host(), inet_port(), projection_type(), epoch_num()) -> - {ok, projection()} | {error, written} | {error, term()}. + {ok, projection()} | {error, not_written} | {error, term()}. 
read_projection(Host, TcpPort, ProjType, Epoch) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try read_projection2(Sock, ProjType, Epoch) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Write a projection `Proj' of type `ProjType'. --spec write_projection(port(), projection_type(), projection()) -> +-spec write_projection(port_wrap(), projection_type(), projection()) -> 'ok' | {error, written} | {error, term()}. write_projection(Sock, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', @@ -265,16 +304,16 @@ write_projection(Sock, ProjType, Proj) write_projection(Host, TcpPort, ProjType, Proj) when ProjType == 'public' orelse ProjType == 'private', is_record(Proj, projection_v1) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try write_projection2(Sock, ProjType, Proj) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get all projections from the FLU's projection store. --spec get_all_projections(port(), projection_type()) -> +-spec get_all_projections(port_wrap(), projection_type()) -> {ok, [projection()]} | {error, term()}. get_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -287,16 +326,16 @@ get_all_projections(Sock, ProjType) {ok, [projection()]} | {error, term()}. get_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try get_all_projections2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Get all epoch numbers from the FLU's projection store. 
--spec list_all_projections(port(), projection_type()) -> +-spec list_all_projections(port_wrap(), projection_type()) -> {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Sock, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> @@ -309,20 +348,37 @@ list_all_projections(Sock, ProjType) {ok, [non_neg_integer()]} | {error, term()}. list_all_projections(Host, TcpPort, ProjType) when ProjType == 'public' orelse ProjType == 'private' -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try list_all_projections2(Sock, ProjType) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Quit & close the connection to remote FLU. --spec quit(port()) -> +-spec quit(port_wrap()) -> ok. -quit(Sock) when is_port(Sock) -> - catch (_ = gen_tcp:send(Sock, <<"QUIT\n">>)), - catch gen_tcp:close(Sock), +quit(Sock) -> + catch (_ = w_send(Sock, <<"QUIT\n">>)), + disconnect(Sock), + ok. + +connected_p({w,tcp,Sock}) -> + case (catch inet:peername(Sock)) of + {ok, _} -> true; + _ -> false + end; +connected_p(_) -> + false. + +connect(#p_srvr{}=P) -> + w_connect(P). + +disconnect({w,tcp,_Sock}=WS) -> + w_close(WS), + ok; +disconnect(_) -> ok. %%%%%%%%%%%%%%%%%%%%%%%%%%% @@ -330,7 +386,7 @@ quit(Sock) when is_port(Sock) -> %% @doc Restricted API: Write a chunk of already-sequenced data to %% `File' at `Offset'. --spec write_chunk(port(), epoch_id(), file_name(), file_offset(), chunk()) -> +-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) -> ok | {error, error_general()} | {error, term()}. write_chunk(Sock, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> @@ -344,19 +400,19 @@ write_chunk(Sock, EpochID, File, Offset, Chunk) ok | {error, error_general()} | {error, term()}. 
write_chunk(Host, TcpPort, EpochID, File, Offset, Chunk) when Offset >= ?MINIMUM_OFFSET -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try write_chunk2(Sock, EpochID, File, Offset, Chunk) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Restricted API: Delete a file after it has been successfully %% migrated. --spec delete_migration(port(), epoch_id(), file_name()) -> +-spec delete_migration(port_wrap(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. -delete_migration(Sock, EpochID, File) when is_port(Sock) -> +delete_migration(Sock, EpochID, File) -> delete_migration2(Sock, EpochID, File). %% @doc Restricted API: Delete a file after it has been successfully @@ -365,19 +421,19 @@ delete_migration(Sock, EpochID, File) when is_port(Sock) -> -spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. delete_migration(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try delete_migration2(Sock, EpochID, File) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %% @doc Restricted API: Truncate a file after it has been successfully %% erasure coded. --spec trunc_hack(port(), epoch_id(), file_name()) -> +-spec trunc_hack(port_wrap(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. -trunc_hack(Sock, EpochID, File) when is_port(Sock) -> +trunc_hack(Sock, EpochID, File) -> trunc_hack2(Sock, EpochID, File). 
%% @doc Restricted API: Truncate a file after it has been successfully @@ -386,16 +442,16 @@ trunc_hack(Sock, EpochID, File) when is_port(Sock) -> -spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) -> ok | {error, error_general() | 'no_such_file'} | {error, term()}. trunc_hack(Host, TcpPort, EpochID, File) when is_integer(TcpPort) -> - Sock = machi_util:connect(Host, TcpPort), + Sock = connect(#p_srvr{proto_mod=?MODULE, address=Host, port=TcpPort}), try trunc_hack2(Sock, EpochID, File) after - catch gen_tcp:close(Sock) + disconnect(Sock) end. %%%%%%%%%%%%%%%%%%%%%%%%%%% -append_chunk2(Sock, EpochID, Prefix0, Chunk0) -> +append_chunk2(Sock, EpochID, Prefix0, Chunk0, ChunkExtra) -> erase(bad_sock), try %% TODO: add client-side checksum to the server's protocol @@ -405,11 +461,13 @@ append_chunk2(Sock, EpochID, Prefix0, Chunk0) -> Len = iolist_size(Chunk0), true = (Len =< ?MAX_CHUNK_SIZE), {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, + EpochIDHex = machi_util:bin_to_hexstr( + <>), LenHex = machi_util:int_to_hexbin(Len, 32), - Cmd = [<<"A ">>, EpochIDRaw, LenHex, Prefix, 10], - ok = gen_tcp:send(Sock, [Cmd, Chunk]), - {ok, Line} = gen_tcp:recv(Sock, 0), + ExtraHex = machi_util:int_to_hexbin(ChunkExtra, 32), + Cmd = [<<"A ">>, EpochIDHex, LenHex, ExtraHex, Prefix, 10], + ok = w_send(Sock, [Cmd, Chunk]), + {ok, Line} = w_recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, case Line of <<"OK ", OffsetHex:16/binary, " ", @@ -436,21 +494,22 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, + EpochIDHex = machi_util:bin_to_hexstr( + <>), File = machi_util:make_binary(File0), PrefixHex = machi_util:int_to_hexbin(Offset, 64), SizeHex = machi_util:int_to_hexbin(Size, 32), - CmdLF = [$R, 32, EpochIDRaw, PrefixHex, SizeHex, File, 10], - ok = gen_tcp:send(Sock, CmdLF), - case gen_tcp:recv(Sock, 3) of + CmdLF = [$R, 32, EpochIDHex, PrefixHex, SizeHex, File, 10], + ok = 
w_send(Sock, CmdLF), + case w_recv(Sock, 3) of {ok, <<"OK\n">>} -> - {ok, _Chunk}=Res = gen_tcp:recv(Sock, Size), + {ok, _Chunk}=Res = w_recv(Sock, Size), Res; {ok, Else} -> - {ok, OldOpts} = inet:getopts(Sock, [packet]), - ok = inet:setopts(Sock, [{packet, line}]), - {ok, Else2} = gen_tcp:recv(Sock, 0), - ok = inet:setopts(Sock, OldOpts), + {ok, OldOpts} = w_getopts(Sock, [packet]), + ok = w_setopts(Sock, [{packet, line}]), + {ok, Else2} = w_recv(Sock, 0), + ok = w_setopts(Sock, OldOpts), case Else of <<"ERA">> -> {error, todo_erasure_coded}; %% escript_cc_parse_ec_info(Sock, Line, Else2); @@ -485,13 +544,14 @@ read_chunk2(Sock, EpochID, File0, Offset, Size) -> list2(Sock, EpochID) -> try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - ok = gen_tcp:send(Sock, [<<"L ">>, EpochIDRaw, <<"\n">>]), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + EpochIDHex = machi_util:bin_to_hexstr( + <>), + ok = w_send(Sock, [<<"L ">>, EpochIDHex, <<"\n">>]), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK\n">>} -> - Res = list3(gen_tcp:recv(Sock, 0), Sock), - ok = inet:setopts(Sock, [{packet, raw}]), + Res = list3(w_recv(Sock, 0), Sock), + ok = w_setopts(Sock, [{packet, raw}]), {ok, Res}; {ok, <<"ERROR WEDGED\n">>} -> {error, wedged}; @@ -511,19 +571,19 @@ list3({ok, Line}, Sock) -> FileLen = byte_size(Line) - 16 - 1 - 1, <> = Line, Size = machi_util:hexstr_to_int(SizeHex), - [{Size, File}|list3(gen_tcp:recv(Sock, 0), Sock)]; + [{Size, File}|list3(w_recv(Sock, 0), Sock)]; list3(Else, _Sock) -> throw({server_protocol_error, Else}). 
wedge_status2(Sock) -> try - ok = gen_tcp:send(Sock, [<<"WEDGE-STATUS\n">>]), - ok = inet:setopts(Sock, [{packet, line}]), + ok = w_send(Sock, [<<"WEDGE-STATUS\n">>]), + ok = w_setopts(Sock, [{packet, line}]), {ok, <<"OK ", BooleanHex:2/binary, " ", EpochHex:8/binary, " ", - CSumHex:40/binary, "\n">>} = gen_tcp:recv(Sock, 0), - ok = inet:setopts(Sock, [{packet, raw}]), + CSumHex:40/binary, "\n">>} = w_recv(Sock, 0), + ok = w_setopts(Sock, [{packet, raw}]), Boolean = if BooleanHex == <<"00">> -> false; BooleanHex == <<"01">> -> true end, @@ -541,16 +601,17 @@ checksum_list2(Sock, EpochID, File) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - ok = gen_tcp:send(Sock, [<<"C ">>, EpochIDRaw, File, <<"\n">>]), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + EpochIDHex = machi_util:bin_to_hexstr( + <>), + ok = w_send(Sock, [<<"C ">>, EpochIDHex, File, <<"\n">>]), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK ", Rest/binary>> = Line} -> put(status, ok), % may be unset later RestLen = byte_size(Rest) - 1, <> = Rest, <> = machi_util:hexstr_to_bin(LenHex), - ok = inet:setopts(Sock, [{packet, raw}]), + ok = w_setopts(Sock, [{packet, raw}]), {ok, checksum_list_finish(checksum_list_fast(Sock, Len))}; {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> {error, no_such_file}; @@ -559,7 +620,11 @@ checksum_list2(Sock, EpochID, File) -> {ok, <<"ERROR WEDGED", _/binary>>} -> {error, wedged}; {ok, Else} -> - throw({server_protocol_error, Else}) + throw({server_protocol_error, Else}); + {error, closed} -> + throw({error, closed}); + Else -> + throw(Else) end catch throw:Error -> @@ -571,11 +636,11 @@ checksum_list2(Sock, EpochID, File) -> end. 
checksum_list_fast(Sock, 0) -> - {ok, <<".\n">> = _Line} = gen_tcp:recv(Sock, 2), + {ok, <<".\n">> = _Line} = w_recv(Sock, 2), []; checksum_list_fast(Sock, Remaining) -> Num = erlang:min(Remaining, 1024*1024), - {ok, Bytes} = gen_tcp:recv(Sock, Num), + {ok, Bytes} = w_recv(Sock, Num), [Bytes|checksum_list_fast(Sock, Remaining - byte_size(Bytes))]. checksum_list_finish(Chunks) -> @@ -599,7 +664,8 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, + EpochIDHex = machi_util:bin_to_hexstr( + <>), %% TODO: add client-side checksum to the server's protocol %% _ = machi_util:checksum_chunk(Chunk), File = machi_util:make_binary(File0), @@ -609,10 +675,10 @@ write_chunk2(Sock, EpochID, File0, Offset, Chunk0) -> Len = iolist_size(Chunk0), true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), - Cmd = [<<"W-repl ">>, EpochIDRaw, OffsetHex, + Cmd = [<<"W-repl ">>, EpochIDHex, OffsetHex, LenHex, File, <<"\n">>], - ok = gen_tcp:send(Sock, [Cmd, Chunk]), - {ok, Line} = gen_tcp:recv(Sock, 0), + ok = w_send(Sock, [Cmd, Chunk]), + {ok, Line} = w_recv(Sock, 0), PathLen = byte_size(Line) - 3 - 16 - 1 - 1, case Line of <<"OK\n">> -> @@ -637,11 +703,12 @@ delete_migration2(Sock, EpochID, File) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - Cmd = [<<"DEL-migration ">>, EpochIDRaw, File, <<"\n">>], - ok = gen_tcp:send(Sock, Cmd), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + EpochIDHex = machi_util:bin_to_hexstr( + <>), + Cmd = [<<"DEL-migration ">>, EpochIDHex, File, <<"\n">>], + ok = w_send(Sock, Cmd), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK\n">>} -> ok; {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> @@ -666,11 +733,12 @@ trunc_hack2(Sock, EpochID, File) -> erase(bad_sock), try {EpochNum, EpochCSum} = EpochID, - EpochIDRaw = <>, - Cmd = [<<"TRUNC-hack--- ">>, EpochIDRaw, File, <<"\n">>], - 
ok = gen_tcp:send(Sock, Cmd), - ok = inet:setopts(Sock, [{packet, line}]), - case gen_tcp:recv(Sock, 0) of + EpochIDHex = machi_util:bin_to_hexstr( + <>), + Cmd = [<<"TRUNC-hack--- ">>, EpochIDHex, File, <<"\n">>], + ok = w_send(Sock, Cmd), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of {ok, <<"OK\n">>} -> ok; {ok, <<"ERROR NO-SUCH-FILE", _/binary>>} -> @@ -723,18 +791,22 @@ do_projection_common(Sock, ProjCmd) -> true = (Len =< ?MAX_CHUNK_SIZE), LenHex = machi_util:int_to_hexbin(Len, 32), Cmd = [<<"PROJ ">>, LenHex, <<"\n">>], - ok = gen_tcp:send(Sock, [Cmd, ProjCmdBin]), - ok = inet:setopts(Sock, [{packet, line}]), - {ok, Line} = gen_tcp:recv(Sock, 0), - case Line of - <<"OK ", ResLenHex:8/binary, "\n">> -> - ResLen = machi_util:hexstr_to_int(ResLenHex), - ok = inet:setopts(Sock, [{packet, raw}]), - {ok, ResBin} = gen_tcp:recv(Sock, ResLen), - ok = inet:setopts(Sock, [{packet, line}]), - binary_to_term(ResBin); - Else -> - {error, Else} + ok = w_send(Sock, [Cmd, ProjCmdBin]), + ok = w_setopts(Sock, [{packet, line}]), + case w_recv(Sock, 0) of + {ok, Line} -> + case Line of + <<"OK ", ResLenHex:8/binary, "\n">> -> + ResLen = machi_util:hexstr_to_int(ResLenHex), + ok = w_setopts(Sock, [{packet, raw}]), + {ok, ResBin} = w_recv(Sock, ResLen), + ok = w_setopts(Sock, [{packet, line}]), + binary_to_term(ResBin); + Else -> + {error, Else} + end; + {error, _} = Bad -> + throw(Bad) end catch throw:Error -> @@ -744,3 +816,43 @@ do_projection_common(Sock, ProjCmd) -> put(bad_sock, Sock), {error, {badmatch, BadMatch, erlang:get_stacktrace()}} end. 
+ +%%%%%%%%%%%%%%%%%%%%%%%%%%% + +w_connect(#p_srvr{proto_mod=?MODULE, address=Host, port=Port, props=Props})-> + try + case proplists:get_value(session_proto, Props, tcp) of + tcp -> + Sock = machi_util:connect(Host, Port), + {w,tcp,Sock}; + %% sctp -> + %% %% TODO: not implemented + %% {w,sctp,Sock} + ssl -> + %% TODO: veryveryuntested + SslOptions = proplists:get_value(ssl_options, Props), + Sock = machi_util:connect(Port, Port), + {ok, SslSock} = ssl:connect(Sock, SslOptions), + {w,ssl,SslSock} + end + catch + _:_ -> + undefined + end. + +w_close({w,tcp,Sock}) -> + catch gen_tcp:close(Sock), + ok. + +w_recv({w,tcp,Sock}, Amt) -> + gen_tcp:recv(Sock, Amt). + +w_send({w,tcp,Sock}, IoData) -> + gen_tcp:send(Sock, IoData). + +w_getopts({w,tcp,Sock}, Opts) -> + inet:getopts(Sock, Opts). + +w_setopts({w,tcp,Sock}, Opts) -> + inet:setopts(Sock, Opts). + diff --git a/src/machi_projection.erl b/src/machi_projection.erl index 8e48966..3d94cb1 100644 --- a/src/machi_projection.erl +++ b/src/machi_projection.erl @@ -59,6 +59,11 @@ new(EpochNum, MyName, [_|_] = MembersDict0, Down_list, UPI_list, Repairing_list, is_list(Repairing_list), is_list(Dbg), is_list(Dbg2) -> MembersDict = make_members_dict(MembersDict0), All_list = [Name || {Name, _P} <- MembersDict], + if length(All_list) =< ?MAX_CHAIN_LENGTH -> + ok; + true -> + exit(max_chain_length_error) + end, true = lists:all(fun(X) when is_atom(X) orelse is_binary(X) -> true; (_) -> false end, All_list), diff --git a/src/machi_projection_store.erl b/src/machi_projection_store.erl index cc74c7c..9ddeb0d 100644 --- a/src/machi_projection_store.erl +++ b/src/machi_projection_store.erl @@ -50,12 +50,13 @@ get_all_projections/2, get_all_projections/3, list_all_projections/2, list_all_projections/3 ]). +-export([set_wedge_notify_pid/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(NO_EPOCH, {-1,<<0:(20*8)/big>>}). +-define(NO_EPOCH, ?DUMMY_PV1_EPOCH). 
-record(state, { public_dir = "" :: string(), @@ -148,6 +149,9 @@ list_all_projections(PidSpec, ProjType, Timeout) when ProjType == 'public' orelse ProjType == 'private' -> g_call(PidSpec, {list_all_projections, ProjType}, Timeout). +set_wedge_notify_pid(PidSpec, NotifyWedgeStateChanges) -> + gen_server:call(PidSpec, {set_wedge_notify_pid, NotifyWedgeStateChanges}). + %%%%%%%%%%%%%%%%%%%%%%%%%%% g_call(PidSpec, Arg, Timeout) -> @@ -207,6 +211,8 @@ handle_call({{list_all_projections, ProjType}, LC1}, _From, S) -> LC2 = lclock_update(LC1), Dir = pick_path(ProjType, S), {reply, {{ok, find_all(Dir)}, LC2}, S}; +handle_call({set_wedge_notify_pid, NotifyWedgeStateChanges}, _From, S) -> + {reply, ok, S#state{wedge_notify_pid=NotifyWedgeStateChanges}}; handle_call(_Request, _From, S) -> Reply = whaaaaaaaaaaaaa, {reply, Reply, S}. @@ -258,7 +264,8 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> ok = file:close(FH), EffectiveProj = machi_chain_manager1:inner_projection_or_self(Proj), EffectiveEpoch = EffectiveProj#projection_v1.epoch_number, - EpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum}, + EpochId = {Epoch, Proj#projection_v1.epoch_csum}, + EffectiveEpochId = {EffectiveEpoch, EffectiveProj#projection_v1.epoch_csum}, %% NewS = if ProjType == public, Epoch > element(1, S#state.max_public_epoch) -> @@ -266,7 +273,8 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> %% This is a regular projection, i.e., %% does not have an inner proj. update_wedge_state( - S#state.wedge_notify_pid, true, EpochId); + S#state.wedge_notify_pid, true, + EffectiveEpochId); Epoch /= EffectiveEpoch -> %% This projection has an inner proj. 
%% The outer proj is flapping, so we do @@ -277,7 +285,8 @@ do_proj_write(ProjType, #projection_v1{epoch_number=Epoch}=Proj, S) -> ProjType == private, Epoch > element(1, S#state.max_private_epoch) -> update_wedge_state( - S#state.wedge_notify_pid, false, EpochId), + S#state.wedge_notify_pid, false, + EffectiveEpochId), S#state{max_private_epoch=EpochId}; true -> S diff --git a/src/machi_proxy_flu1_client.erl b/src/machi_proxy_flu1_client.erl index 3bbaa28..74fb116 100644 --- a/src/machi_proxy_flu1_client.erl +++ b/src/machi_proxy_flu1_client.erl @@ -51,12 +51,14 @@ -export([ %% File API append_chunk/4, append_chunk/5, + append_chunk_extra/5, append_chunk_extra/6, read_chunk/5, read_chunk/6, checksum_list/3, checksum_list/4, list_files/2, list_files/3, wedge_status/1, wedge_status/2, %% %% Projection API + get_epoch_id/1, get_epoch_id/2, get_latest_epoch/2, get_latest_epoch/3, read_latest_projection/2, read_latest_projection/3, read_projection/3, read_projection/4, @@ -65,15 +67,16 @@ list_all_projections/2, list_all_projections/3, %% Common API - quit/1 + quit/1, + + %% Internal API + write_chunk/5, write_chunk/6 ]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(FLU_C, machi_flu1_client). - -record(state, { i :: #p_srvr{}, sock :: 'undefined' | port() @@ -99,6 +102,21 @@ append_chunk(PidSpec, EpochID, Prefix, Chunk, Timeout) -> gen_server:call(PidSpec, {req, {append_chunk, EpochID, Prefix, Chunk}}, Timeout). +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. + +append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra) + when is_integer(ChunkExtra), ChunkExtra >= 0 -> + append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, infinity). + +%% @doc Append a chunk (binary- or iolist-style) of data to a file +%% with `Prefix'. 
+ +append_chunk_extra(PidSpec, EpochID, Prefix, Chunk, ChunkExtra, Timeout) -> + gen_server:call(PidSpec, {req, {append_chunk_extra, EpochID, Prefix, + Chunk, ChunkExtra}}, + Timeout). + %% @doc Read a chunk of data of size `Size' from `File' at `Offset'. read_chunk(PidSpec, EpochID, File, Offset, Size) -> @@ -143,6 +161,16 @@ wedge_status(PidSpec, Timeout) -> gen_server:call(PidSpec, {req, {wedge_status}}, Timeout). +%% @doc Get the `epoch_id()' of the FLU's current/latest projection. + +get_epoch_id(PidSpec) -> + get_epoch_id(PidSpec, infinity). + +%% @doc Get the `epoch_id()' of the FLU's current/latest projection. + +get_epoch_id(PidSpec, Timeout) -> + gen_server:call(PidSpec, {req, {get_epoch_id}}, Timeout). + %% @doc Get the latest epoch number + checksum from the FLU's projection store. get_latest_epoch(PidSpec, ProjType) -> @@ -215,6 +243,19 @@ list_all_projections(PidSpec, ProjType, Timeout) -> quit(PidSpec) -> gen_server:call(PidSpec, quit, infinity). +%% @doc Write a chunk (binary- or iolist-style) of data to a file +%% with `Prefix' at `Offset'. + +write_chunk(PidSpec, EpochID, File, Offset, Chunk) -> + write_chunk(PidSpec, EpochID, File, Offset, Chunk, infinity). + +%% @doc Write a chunk (binary- or iolist-style) of data to a file +%% with `Prefix' at `Offset'. + +write_chunk(PidSpec, EpochID, File, Offset, Chunk, Timeout) -> + gen_server:call(PidSpec, {req, {write_chunk, EpochID, File, Offset, Chunk}}, + Timeout). + %%%%%%%%%%%%%%%%%%%%%%%%%%% init([I]) -> @@ -265,59 +306,74 @@ do_req(Req, S) -> {{error, partition}, S2} end. 
-make_req_fun({append_chunk, EpochID, Prefix, Chunk}, #state{sock=Sock}) -> - fun() -> ?FLU_C:append_chunk(Sock, EpochID, Prefix, Chunk) end; -make_req_fun({read_chunk, EpochID, File, Offset, Size}, #state{sock=Sock}) -> - fun() -> ?FLU_C:read_chunk(Sock, EpochID, File, Offset, Size) end; -make_req_fun({checksum_list, EpochID, File}, #state{sock=Sock}) -> - fun() -> ?FLU_C:checksum_list(Sock, EpochID, File) end; -make_req_fun({list_files, EpochID}, #state{sock=Sock}) -> - fun() -> ?FLU_C:list_files(Sock, EpochID) end; -make_req_fun({wedge_status}, #state{sock=Sock}) -> - fun() -> ?FLU_C:wedge_status(Sock) end; -make_req_fun({get_latest_epoch, ProjType}, #state{sock=Sock}) -> - fun() -> ?FLU_C:get_latest_epoch(Sock, ProjType) end; -make_req_fun({read_latest_projection, ProjType}, #state{sock=Sock}) -> - fun() -> ?FLU_C:read_latest_projection(Sock, ProjType) end; -make_req_fun({read_projection, ProjType, Epoch}, #state{sock=Sock}) -> - fun() -> ?FLU_C:read_projection(Sock, ProjType, Epoch) end; -make_req_fun({write_projection, ProjType, Proj}, #state{sock=Sock}) -> - fun() -> ?FLU_C:write_projection(Sock, ProjType, Proj) end; -make_req_fun({get_all_projections, ProjType}, #state{sock=Sock}) -> - fun() -> ?FLU_C:get_all_projections(Sock, ProjType) end; -make_req_fun({list_all_projections, ProjType}, #state{sock=Sock}) -> - fun() -> ?FLU_C:list_all_projections(Sock, ProjType) end. 
+make_req_fun({append_chunk, EpochID, Prefix, Chunk}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:append_chunk(Sock, EpochID, Prefix, Chunk) end; +make_req_fun({append_chunk_extra, EpochID, Prefix, Chunk, ChunkExtra}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra) end; +make_req_fun({read_chunk, EpochID, File, Offset, Size}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:read_chunk(Sock, EpochID, File, Offset, Size) end; +make_req_fun({write_chunk, EpochID, File, Offset, Chunk}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:write_chunk(Sock, EpochID, File, Offset, Chunk) end; +make_req_fun({checksum_list, EpochID, File}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:checksum_list(Sock, EpochID, File) end; +make_req_fun({list_files, EpochID}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:list_files(Sock, EpochID) end; +make_req_fun({wedge_status}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:wedge_status(Sock) end; +make_req_fun({get_epoch_id}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> case Mod:read_latest_projection(Sock, private) of + {ok, P} -> + #projection_v1{epoch_number=Epoch, + epoch_csum=CSum} = + machi_chain_manager1:inner_projection_or_self(P), + {ok, {Epoch, CSum}}; + Error -> + Error + end + end; +make_req_fun({get_latest_epoch, ProjType}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:get_latest_epoch(Sock, ProjType) end; +make_req_fun({read_latest_projection, ProjType}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:read_latest_projection(Sock, ProjType) end; +make_req_fun({read_projection, ProjType, Epoch}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:read_projection(Sock, ProjType, Epoch) end; +make_req_fun({write_projection, ProjType, Proj}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + 
fun() -> Mod:write_projection(Sock, ProjType, Proj) end; +make_req_fun({get_all_projections, ProjType}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:get_all_projections(Sock, ProjType) end; +make_req_fun({list_all_projections, ProjType}, + #state{sock=Sock,i=#p_srvr{proto_mod=Mod}}) -> + fun() -> Mod:list_all_projections(Sock, ProjType) end. connected_p(#state{sock=SockMaybe, - i=#p_srvr{proto=ipv4}=_I}=_S) -> - is_port(SockMaybe); -connected_p(#state{i=#p_srvr{proto=disterl, - name=_NodeName}=_I}=_S) -> - true. - %% case net_adm:ping(NodeName) of - %% ping -> - %% true; - %% _ -> - %% false - %% end. + i=#p_srvr{proto_mod=Mod}=_I}=_S) -> + Mod:connected_p(SockMaybe). try_connect(#state{sock=undefined, - i=#p_srvr{proto=ipv4, address=Host, port=TcpPort}=_I}=S) -> - try - Sock = machi_util:connect(Host, TcpPort), - S#state{sock=Sock} - catch - _:_ -> - S - end; + i=#p_srvr{proto_mod=Mod}=P}=S) -> + Sock = Mod:connect(P), + S#state{sock=Sock}; try_connect(S) -> %% If we're connection-based, we're already connected. %% If we're not connection-based, then there's nothing to do. S. +disconnect(#state{sock=undefined}=S) -> + S; disconnect(#state{sock=Sock, - i=#p_srvr{proto=ipv4}=_I}=S) -> - (catch gen_tcp:close(Sock)), - S#state{sock=undefined}; -disconnect(S) -> - S. + i=#p_srvr{proto_mod=Mod}=_I}=S) -> + Mod:disconnect(Sock), + S#state{sock=undefined}. diff --git a/src/machi_yessir_client.erl b/src/machi_yessir_client.erl new file mode 100644 index 0000000..c3cedad --- /dev/null +++ b/src/machi_yessir_client.erl @@ -0,0 +1,617 @@ +%% ------------------------------------------------------------------- +%% +%% Copyright (c) 2007-2015 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. 
You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc "Yes, sir!" style dummy/mock client facade. + +-module(machi_yessir_client). + +-include("machi.hrl"). +-include("machi_projection.hrl"). + +-export([ + %% File API + append_chunk/4, append_chunk/5, + append_chunk_extra/5, append_chunk_extra/6, + read_chunk/5, read_chunk/6, + checksum_list/3, checksum_list/4, + list_files/2, list_files/3, + wedge_status/1, wedge_status/2, + + %% Projection API + get_latest_epoch/2, get_latest_epoch/3, + read_latest_projection/2, read_latest_projection/3, + read_projection/3, read_projection/4, + write_projection/3, write_projection/4, + get_all_projections/2, get_all_projections/3, + list_all_projections/2, list_all_projections/3, + + %% Common API + quit/1, + + %% Connection management API + connected_p/1, connect/1, disconnect/1 + ]). +%% For "internal" replication only. +-export([ + write_chunk/5, write_chunk/6, + delete_migration/3, delete_migration/4, + trunc_hack/3, trunc_hack/4 + ]). + +-record(yessir, { + name, + start, + start_bin, + num_files, + file_size, + chunk_size + }). + +-type chunk() :: binary() | iolist(). % client can use either +-type chunk_csum() :: {file_offset(), chunk_size(), binary()}. +-type chunk_s() :: binary(). % server always uses binary() +-type chunk_pos() :: {file_offset(), chunk_size(), file_name_s()}. +-type chunk_size() :: non_neg_integer(). +-type error_general() :: 'bad_arg' | 'wedged'. +-type epoch_csum() :: binary(). +-type epoch_num() :: -1 | non_neg_integer(). 
-type epoch_id()    :: {epoch_num(), epoch_csum()}.
-type file_info()   :: {file_size(), file_name_s()}.
-type file_name()   :: binary() | list().
-type file_name_s() :: binary().                % server reply
-type file_offset() :: non_neg_integer().
-type file_size()   :: non_neg_integer().
-type file_prefix() :: binary() | list().
-type inet_host()   :: inet:ip_address() | inet:hostname().
-type inet_port()   :: inet:port_number().
-type port_wrap()   :: {w,atom(),term()}.
-type projection()      :: #projection_v1{}.
-type projection_type() :: 'public' | 'private'.

-export_type([epoch_id/0]).

%% @doc Append `Chunk' (a binary or an iolist) to the file that belongs
%% to `Prefix', without reserving any extra space after it.

-spec append_chunk(port_wrap(), epoch_id(), file_prefix(), chunk()) ->
      {ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Sock, EpochID, Prefix, Chunk) ->
    %% A plain append is an "extra" append that reserves zero bytes.
    append_chunk_extra(Sock, EpochID, Prefix, Chunk, 0).

%% @doc Host/port flavour of `append_chunk/4'.  The host and port are
%% ignored: a throw-away mock connection is opened for the duration of
%% the call and closed again afterwards.

-spec append_chunk(inet_host(), inet_port(),
                   epoch_id(), file_prefix(), chunk()) ->
      {ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk(Host, TcpPort, EpochID, Prefix, Chunk) ->
    %% The 6-arity "extra" variant already does the connect / call /
    %% disconnect dance, so delegate with zero extra bytes.
    append_chunk_extra(Host, TcpPort, EpochID, Prefix, Chunk, 0).

%% @doc Append `Chunk' (a binary or an iolist) to the file that belongs
%% to `Prefix', and additionally reserve `ChunkExtra' bytes right
%% after it.
%%
%% Example: with a 1 KByte `Chunk' and a `ChunkExtra' of 4 KBytes, the
%% 4 KBytes of file offsets following the chunk are set aside so that
%% they can be filled in later through the `write_chunk()' API.

-spec append_chunk_extra(port_wrap(), epoch_id(), file_prefix(), chunk(), chunk_size()) ->
      {ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(#yessir{name=Name,start_bin=StartBin},
                   _EpochID, Prefix, Chunk, ChunkExtra)
  when is_integer(ChunkExtra), ChunkExtra >= 0 ->
    %% Each prefix maps onto exactly one fake file, "Prefix/StartBin".
    File = list_to_binary([Prefix, $/, StartBin]),
    Pos = case get({Name,offset,File}) of
              undefined -> ?MINIMUM_OFFSET;
              N         -> N
          end,
    %% iolist_size/1 rather than size/1: the chunk() type allows
    %% iolists, and size/1 raises badarg for anything that is not a
    %% binary or a tuple.
    put({Name,offset,File}, Pos + iolist_size(Chunk) + ChunkExtra),
    %% NOTE(review): the reply is {File, Pos}, which does not have the
    %% shape of chunk_pos() as declared in this module -- confirm what
    %% callers of the mock expect before relying on the -spec.
    {ok, {File, Pos}}.

%% @doc Host/port flavour of `append_chunk_extra/5'.
%%
%% The host and port are ignored.  A throw-away mock connection is
%% opened for the call and always closed again, even if the call fails.

-spec append_chunk_extra(inet_host(), inet_port(),
                         epoch_id(), file_prefix(), chunk(), chunk_size()) ->
      {ok, chunk_pos()} | {error, error_general()} | {error, term()}.
append_chunk_extra(_Host, _TcpPort, EpochID, Prefix, Chunk, ChunkExtra)
  when is_integer(ChunkExtra), ChunkExtra >= 0 ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        append_chunk_extra(Sock, EpochID, Prefix, Chunk, ChunkExtra)
    after
        disconnect(Sock)
    end.

%% @doc Read a chunk of data of size `Size' from `File' at `Offset'.
%%
%% No data is actually stored by this mock: a synthetic chunk of the
%% requested size is returned for any known file, as long as `Offset'
%% is not beyond the file's recorded offset.

-spec read_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk_size()) ->
      {ok, chunk_s()} |
      {error, error_general() | 'no_such_file' | 'partial_read'} |
      {error, term()}.
read_chunk(#yessir{name=Name}, _EpochID, File, Offset, Size)
  when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
    case get({Name,offset,File}) of
        undefined ->
            {error, no_such_file};
        MaxOffset when Offset > MaxOffset ->
            {error, not_written};
        _MaxOffset ->
            %% To be more accurate, a read with
            %% Offset + Size > MaxOffset ought to answer
            %% {error, partial_read}.  But checksum_list() is a bit
            %% dumb and can list a byte range that overlaps the end of
            %% the file, so such reads are allowed to succeed.
            {ok, make_chunk(Name, Size)}
    end.

%% Return the synthetic chunk of `Size' bytes for connection `Name'.
%% Every byte has the value `Size rem 253'.  The chunk is cached in the
%% process dictionary so that it is built only once per size.
make_chunk(Name, Size) ->
    case get({Name,chunk,Size}) of
        undefined ->
            Byte = Size rem 253,
            %% binary:copy/2 builds the same binary as
            %% list_to_binary(lists:duplicate(Size, Byte)) without the
            %% intermediate Size-element list.
            C = binary:copy(<<Byte>>, Size),
            put({Name,chunk,Size}, C),
            C;
        C ->
            C
    end.

%% Return the SHA-1 checksum of the synthetic chunk of `Size' bytes,
%% cached in the process dictionary per connection name and size.
make_csum(Name,Size) ->
    case get({Name,csum,Size}) of
        undefined ->
            C = crypto:hash(sha, make_chunk(Name, Size)),
            put({Name,csum,Size}, C),
            C;
        C ->
            C
    end.

%% @doc Host/port flavour of `read_chunk/5'.  The host and port are
%% ignored; a throw-away mock connection is used for the call.

-spec read_chunk(inet_host(), inet_port(), epoch_id(),
                 file_name(), file_offset(), chunk_size()) ->
      {ok, chunk_s()} |
      {error, error_general() | 'no_such_file' | 'partial_read'} |
      {error, term()}.
read_chunk(_Host, _TcpPort, EpochID, File, Offset, Size)
  when Offset >= ?MINIMUM_OFFSET, Size >= 0 ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        read_chunk(Sock, EpochID, File, Offset, Size)
    after
        disconnect(Sock)
    end.

%% @doc Fetch the list of chunk checksums for `File'.
%%
%% The list holds one `{Offset, ChunkSize, CSum}' entry for every
%% `ChunkSize' step from the minimum offset up to and including the
%% file's recorded offset; all entries share the same checksum.

-spec checksum_list(port_wrap(), epoch_id(), file_name()) ->
      {ok, [chunk_csum()]} |
      {error, error_general() | 'no_such_file' | 'partial_read'} |
      {error, term()}.
checksum_list(#yessir{name=Name,chunk_size=ChunkSize}, _EpochID, File) ->
    case get({Name,offset,File}) of
        undefined ->
            {error, no_such_file};
        MaxOffset ->
            CSum = make_csum(Name, ChunkSize),
            Cs = [{Offset, ChunkSize, CSum} ||
                     Offset <- lists:seq(?MINIMUM_OFFSET, MaxOffset, ChunkSize)],
            {ok, Cs}
    end.

%% @doc Host/port flavour of `checksum_list/3'.  The host and port are
%% ignored; a throw-away mock connection is used for the call.

-spec checksum_list(inet_host(), inet_port(), epoch_id(), file_name()) ->
      {ok, [chunk_csum()]} |
      {error, error_general() | 'no_such_file'} | {error, term()}.
checksum_list(_Host, _TcpPort, EpochID, File) ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        checksum_list(Sock, EpochID, File)
    after
        disconnect(Sock)
    end.

%% @doc List every file known to this mock connection, as
%% `{Offset, File}' pairs taken from the recorded per-file offsets.

-spec list_files(port_wrap(), epoch_id()) ->
      {ok, [file_info()]} | {error, term()}.
list_files(#yessir{name=Name}, _EpochID) ->
    %% Keep only the offset entries that belong to this connection.
    Pick = fun({{Owner,offset,File}, Offset}) when Owner == Name ->
                   {true, {Offset, File}};
              (_Other) ->
                   false
           end,
    {ok, lists:filtermap(Pick, get())}.

%% @doc Host/port flavour of `list_files/2'; host and port are ignored
%% and a short-lived mock connection is used instead.

-spec list_files(inet_host(), inet_port(), epoch_id()) ->
      {ok, [file_info()]} | {error, term()}.
list_files(_Host, _TcpPort, EpochID) ->
    TmpSock = connect(#p_srvr{proto_mod=?MODULE}),
    try list_files(TmpSock, EpochID)
    after
        disconnect(TmpSock)
    end.

%% @doc Report the wedge status.  The mock is never wedged and always
%% answers with the dummy epoch.

-spec wedge_status(port_wrap()) ->
      {ok, {boolean(), pv1_epoch()}} | {error, term()}.

wedge_status(_Sock) ->
    Wedged = false,
    {ok, {Wedged, ?DUMMY_PV1_EPOCH}}.

%% @doc Host/port flavour of `wedge_status/1'; host and port are
%% ignored and a short-lived mock connection is used instead.

-spec wedge_status(inet_host(), inet_port()) ->
      {ok, {boolean(), pv1_epoch()}} | {error, term()}.
wedge_status(_Host, _TcpPort) ->
    TmpSock = connect(#p_srvr{proto_mod=?MODULE}),
    try wedge_status(TmpSock)
    after
        disconnect(TmpSock)
    end.

%% @doc Return the epoch number and checksum of the latest projection
%% of type `ProjType'.

-spec get_latest_epoch(port_wrap(), projection_type()) ->
      {ok, epoch_id()} | {error, term()}.
get_latest_epoch(Sock, ProjType)
  when ProjType == 'public'; ProjType == 'private' ->
    case read_latest_projection(Sock, ProjType) of
        {ok, Latest} ->
            EpochNum = Latest#projection_v1.epoch_number,
            EpochCSum = Latest#projection_v1.epoch_csum,
            {ok, {EpochNum, EpochCSum}};
        _ ->
            {ok, {0, <<"no such checksum">>}}
    end.

%% @doc Host/port flavour of `get_latest_epoch/2'; host and port are
%% ignored and a short-lived mock connection is used instead.

-spec get_latest_epoch(inet_host(), inet_port(),
                       projection_type()) ->
      {ok, epoch_id()} | {error, term()}.
get_latest_epoch(_Host, _TcpPort, ProjType)
  when ProjType == 'public'; ProjType == 'private' ->
    TmpSock = connect(#p_srvr{proto_mod=?MODULE}),
    try get_latest_epoch(TmpSock, ProjType)
    after
        disconnect(TmpSock)
    end.

%% @doc Get the latest projection from the FLU's projection store for `ProjType'
%%
%% "Latest" is the last element of the sorted list of all projections
%% of that type stored for this connection.  If none is stored, or the
%% last element is not a `#projection_v1{}' record, a placeholder
%% projection with epoch number 0 is returned instead, so this mock
%% never answers `{error, not_written}'.

-spec read_latest_projection(port_wrap(), projection_type()) ->
      {ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(#yessir{name=Name}, ProjType)
  when ProjType == 'public' orelse ProjType == 'private' ->
    Ps = [P || {{N,proj,PT,_Epoch}, P} <- get(), N == Name, PT == ProjType],
    %% Handle the empty store explicitly instead of wrapping
    %% lists:last/1 in an old-style `catch', which would also swallow
    %% any unrelated failure.
    Latest = case lists:sort(Ps) of
                 []     -> undefined;
                 Sorted -> lists:last(Sorted)
             end,
    case Latest of
        Proj when is_record(Proj, projection_v1) ->
            {ok, Proj};
        _ ->
            %% os:timestamp/0 (as in connect/1) rather than the
            %% deprecated erlang:now/0.
            {ok, #projection_v1{epoch_number=0,epoch_csum= <<"yo">>,
                                author_server=zzya,
                                all_members=[],creation_time=os:timestamp(),upi=[],
                                repairing=[],down=[],dbg=[],dbg2=[],
                                members_dict=[]}}
    end.

%% @doc Host/port flavour of `read_latest_projection/2'.  The host and
%% port are ignored; a throw-away mock connection is used for the call.

-spec read_latest_projection(inet_host(), inet_port(),
                       projection_type()) ->
      {ok, projection()} | {error, not_written} | {error, term()}.
read_latest_projection(_Host, _TcpPort, ProjType)
  when ProjType == 'public' orelse ProjType == 'private' ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        read_latest_projection(Sock, ProjType)
    after
        disconnect(Sock)
    end.

%% @doc Read the projection of type `ProjType' that was stored for
%% epoch number `Epoch', or `{error, not_written}' if there is none.

-spec read_projection(port_wrap(), projection_type(), epoch_num()) ->
      {ok, projection()} | {error, not_written} | {error, term()}.
read_projection(#yessir{name=Name}, ProjType, Epoch)
  when ProjType == 'public' orelse ProjType == 'private' ->
    case get({Name,proj,ProjType,Epoch}) of
        undefined ->
            {error, not_written};
        P when is_record(P, projection_v1) ->
            {ok, P}
    end.

%% @doc Host/port flavour of `read_projection/3'.  The host and port
%% are ignored; a throw-away mock connection is used for the call.

-spec read_projection(inet_host(), inet_port(),
                       projection_type(), epoch_num()) ->
      {ok, projection()} | {error, not_written} | {error, term()}.
read_projection(_Host, _TcpPort, ProjType, Epoch)
  when ProjType == 'public'; ProjType == 'private' ->
    TmpSock = connect(#p_srvr{proto_mod=?MODULE}),
    try read_projection(TmpSock, ProjType, Epoch)
    after
        disconnect(TmpSock)
    end.

%% @doc Store projection `Proj' under its epoch number for `ProjType'.
%% A given epoch can be written only once: a second write for the same
%% epoch and type answers `{error, written}'.

-spec write_projection(port_wrap(), projection_type(), projection()) ->
      'ok' | {error, written} | {error, term()}.
write_projection(#yessir{name=Name}=Sock, ProjType,
                 #projection_v1{epoch_number=Epoch}=Proj)
  when ProjType == 'public'; ProjType == 'private' ->
    case read_projection(Sock, ProjType, Epoch) of
        {ok, _Existing} ->
            {error, written};
        {error, not_written} ->
            put({Name,proj,ProjType,Epoch}, Proj),
            ok
    end.

%% @doc Host/port flavour of `write_projection/3'; host and port are
%% ignored and a short-lived mock connection is used instead.

-spec write_projection(inet_host(), inet_port(),
                       projection_type(), projection()) ->
      'ok' | {error, written} | {error, term()}.
write_projection(_Host, _TcpPort, ProjType, #projection_v1{}=Proj)
  when ProjType == 'public'; ProjType == 'private' ->
    TmpSock = connect(#p_srvr{proto_mod=?MODULE}),
    try write_projection(TmpSock, ProjType, Proj)
    after
        disconnect(TmpSock)
    end.

%% @doc Return every stored projection of type `ProjType' for this
%% connection, in sorted order.

-spec get_all_projections(port_wrap(), projection_type()) ->
      {ok, [projection()]} | {error, term()}.
get_all_projections(#yessir{name=Name}, ProjType)
  when ProjType == 'public'; ProjType == 'private' ->
    Stored = [Proj || {{Owner,proj,Type,_}, Proj} <- get(),
                      Owner == Name, Type == ProjType],
    {ok, lists:sort(Stored)}.

%% @doc Host/port flavour of `get_all_projections/2'; host and port are
%% ignored and a short-lived mock connection is used instead.

-spec get_all_projections(inet_host(), inet_port(),
                          projection_type()) ->
      {ok, [projection()]} | {error, term()}.
get_all_projections(_Host, _TcpPort, ProjType)
  when ProjType == 'public' orelse ProjType == 'private' ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        get_all_projections(Sock, ProjType)
    after
        disconnect(Sock)
    end.

%% @doc Get all epoch numbers from the FLU's projection store, in the
%% order in which `get_all_projections/2' returns the projections.

-spec list_all_projections(port_wrap(), projection_type()) ->
      {ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(Sock, ProjType)
  when ProjType == 'public' orelse ProjType == 'private' ->
    case get_all_projections(Sock, ProjType) of
        {ok, Ps} ->
            {ok, [P#projection_v1.epoch_number || P <- Ps]};
        _ ->
            {error, not_written}
    end.

%% @doc Host/port flavour of `list_all_projections/2'.  The host and
%% port are ignored; a throw-away mock connection is used for the call.

-spec list_all_projections(inet_host(), inet_port(),
                           projection_type()) ->
      {ok, [non_neg_integer()]} | {error, term()}.
list_all_projections(_Host, _TcpPort, ProjType)
  when ProjType == 'public' orelse ProjType == 'private' ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        list_all_projections(Sock, ProjType)
    after
        disconnect(Sock)
    end.

%% @doc Quit & close the connection to remote FLU.  A no-op for the
%% mock: nothing is torn down here.

-spec quit(port_wrap()) ->
      ok.
quit(_) ->
    ok.

%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% @doc Restricted API: Write a chunk of already-sequenced data to
%% `File' at `Offset'.
%%
%% Only the file's recorded offset is updated; the chunk's bytes are
%% not stored anywhere.

-spec write_chunk(port_wrap(), epoch_id(), file_name(), file_offset(), chunk()) ->
      ok | {error, error_general()} | {error, term()}.
write_chunk(#yessir{name=Name}, _EpochID, File, Offset, Chunk)
  when Offset >= ?MINIMUM_OFFSET ->
    %% NOTE(review): for a known file the new offset is
    %% max(Old + Len, Offset), not max(Old, Offset + Len), and for an
    %% unknown file the chunk length is not added at all.  This looks
    %% unusual but is kept as-is -- confirm against the repair code
    %% that compares file sizes before changing it.
    Pos = case get({Name,offset,File}) of
              undefined ->
                  Offset;
              N ->
                  %% iolist_size/1 rather than size/1: chunk() allows
                  %% iolists, for which size/1 raises badarg.
                  erlang:max(N + iolist_size(Chunk), Offset)
          end,
    put({Name,offset,File}, Pos),
    ok.

%% @doc Host/port flavour of `write_chunk/5'.  The host and port are
%% ignored; a throw-away mock connection is used for the call.

-spec write_chunk(inet_host(), inet_port(),
                  epoch_id(), file_name(), file_offset(), chunk()) ->
      ok | {error, error_general()} | {error, term()}.
write_chunk(_Host, _TcpPort, EpochID, File, Offset, Chunk)
  when Offset >= ?MINIMUM_OFFSET ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        write_chunk(Sock, EpochID, File, Offset, Chunk)
    after
        disconnect(Sock)
    end.

%% @doc Restricted API: Delete a file after it has been successfully
%% migrated.
%%
%% Forgets the recorded offset of `File' for this connection, or
%% answers `{error, no_such_file}' if the file is not known.

-spec delete_migration(port_wrap(), epoch_id(), file_name()) ->
      ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(#yessir{name=Name}, _EpochID, File) ->
    case get({Name,offset,File}) of
        undefined ->
            {error, no_such_file};
        _N ->
            %% The key must use the connection's Name variable.  The
            %% literal atom `name' never matches a stored key, so the
            %% file would silently survive the delete.
            erase({Name,offset,File}),
            ok
    end.

%% @doc Host/port flavour of `delete_migration/3'.  The host and port
%% are ignored; a throw-away mock connection is used for the call.

-spec delete_migration(inet_host(), inet_port(), epoch_id(), file_name()) ->
      ok | {error, error_general() | 'no_such_file'} | {error, term()}.
delete_migration(_Host, _TcpPort, EpochID, File) ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        delete_migration(Sock, EpochID, File)
    after
        disconnect(Sock)
    end.

%% @doc Restricted API: Truncate a file after it has been successfully
%% erasure coded.
%%
%% Resets the recorded offset of `File' to the minimum offset.  The
%% entry is created if the file was not known before.

-spec trunc_hack(port_wrap(), epoch_id(), file_name()) ->
      ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(#yessir{name=Name}, _EpochID, File) ->
    %% put/2 returns the previous value (or `undefined'); discard it
    %% and answer `ok' as promised by the -spec.
    _ = put({Name,offset,File}, ?MINIMUM_OFFSET),
    ok.

%% @doc Host/port flavour of `trunc_hack/3'.  The host and port are
%% ignored; a throw-away mock connection is used for the call.

-spec trunc_hack(inet_host(), inet_port(), epoch_id(), file_name()) ->
      ok | {error, error_general() | 'no_such_file'} | {error, term()}.
trunc_hack(_Host, _TcpPort, EpochID, File) ->
    Sock = connect(#p_srvr{proto_mod=?MODULE}),
    try
        trunc_hack(Sock, EpochID, File)
    after
        disconnect(Sock)
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% The mock has no real connection, so it always reports "connected".
connected_p(_) ->
    true.
+
+%% Build a #yessir{} "socket" record from a #p_srvr{} description.
+%%
+%% Options read from the props list, with their defaults:
+%%   file_suffix - defaults to the "~w" rendering of os:timestamp()
+%%   num_files   - defaults to 12
+%%   file_size   - defaults to 50*1024*1024
+%%   chunk_size  - defaults to 64*1024
+%%
+%% For each I in 1..num_files, append_chunk_extra/5 is called with the
+%% prefix "fakeI", an empty chunk and file_size as the extra amount,
+%% which adds fake dict entries for these files.  The record is returned.
+connect(#p_srvr{name=Name, props=Props}) ->
+    Started = os:timestamp(),
+    DefaultSuffix = list_to_binary(io_lib:format("~w", [Started])),
+    Sock = #yessir{name=Name,
+                   start=Started,
+                   start_bin=proplists:get_value(file_suffix, Props, DefaultSuffix),
+                   num_files=proplists:get_value(num_files, Props, 12),
+                   file_size=proplists:get_value(file_size, Props, 50*1024*1024),
+                   chunk_size=proplists:get_value(chunk_size, Props, 64*1024)},
+    #yessir{num_files=NumFiles, file_size=FileSize} = Sock,
+    lists:foreach(
+      fun(I) ->
+              Prefix = list_to_binary(io_lib:format("fake~w", [I])),
+              {ok, _} = append_chunk_extra(Sock, unused, Prefix, <<>>, FileSize)
+      end, lists:seq(1, NumFiles)),
+    Sock.
+
+%% Forget everything recorded for this connection's name: every process
+%% dictionary entry keyed {Name,offset,_}, {Name,chunk,_}, {Name,csum,_}
+%% or {Name,proj,_,_} is erased.  Other entries are left alone.
+disconnect(#yessir{name=Name}) ->
+    lists:foreach(
+      fun({{N,Tag,_}=Key, _Val})
+            when N == Name,
+                 (Tag == offset orelse Tag == chunk orelse Tag == csum) ->
+              erase(Key);
+         ({{N,proj,_,_}=Key, _Val}) when N == Name ->
+              erase(Key);
+         (_Unrelated) ->
+              ok
+      end, get()),
+    ok.
+
+%% Example use:
+
+%% application:ensure_all_started(machi).
+%% machi_flu_psup:start_flu_package(a, 4444, "./data.a", []).
+%% D = [{a,{p_srvr,a,machi_yessir_client,x,x,
+%%          [{file_suffix,<<"yo">>},{num_files,7}]}},
+%%      {b,{p_srvr,b,machi_yessir_client,x,x,
+%%          [{file_suffix,<<"yo">>},{num_files,20}]}}].
+%% machi_chain_manager1:set_chain_members(a_chmgr, D).
+
+
+%% =INFO REPORT==== 17-May-2015::18:57:47 ===
+%% Chain manager a found latest public projection 0 has author zzya not a member of our members list [a,b]. Please check chain membership on this rogue chain manager zzya.
+%% ok +%% 5> +%% =INFO REPORT==== 17-May-2015::18:57:51 === +%% Repair start: tail a of [a] -> [b], ap_mode ID {a,{1431,856671,140404}} +%% MissingFileSummary [{<<"fake1/yo">>,{52429824,[]}}, +%% {<<"fake10/yo">>,{52429824,[a]}}, +%% {<<"fake11/yo">>,{52429824,[a]}}, +%% {<<"fake12/yo">>,{52429824,[a]}}, +%% {<<"fake13/yo">>,{52429824,[a]}}, +%% {<<"fake14/yo">>,{52429824,[a]}}, +%% {<<"fake15/yo">>,{52429824,[a]}}, +%% {<<"fake16/yo">>,{52429824,[a]}}, +%% {<<"fake17/yo">>,{52429824,[a]}}, +%% {<<"fake18/yo">>,{52429824,[a]}}, +%% {<<"fake19/yo">>,{52429824,[a]}}, +%% {<<"fake2/yo">>,{52429824,[]}}, +%% {<<"fake20/yo">>,{52429824,[a]}}, +%% {<<"fake3/yo">>,{52429824,[]}}, +%% {<<"fake4/yo">>,{52429824,[]}}, +%% {<<"fake5/yo">>,{52429824,[]}}, +%% {<<"fake6/yo">>,{52429824,[]}}, +%% {<<"fake7/yo">>,{52429824,[]}}, +%% {<<"fake8/yo">>,{52429824,[a]}}, +%% {<<"fake9/yo">>,{52429824,[a]}}] +%% Make repair directives: .................... done +%% Out-of-sync data for FLU a: 650.8 MBytes +%% Out-of-sync data for FLU b: 0.0 MBytes +%% Execute repair directives: ..................................................................................................................... 
done + +%% =INFO REPORT==== 17-May-2015::18:57:52 === +%% Repair success: tail a of [a] finished ap_mode repair ID {a,{1431,856671,140404}}: ok +%% Stats [{t_in_files,0},{t_in_chunks,10413},{t_in_bytes,682426368},{t_out_files,0},{t_out_chunks,10413},{t_out_bytes,682426368},{t_bad_chunks,0},{t_elapsed_seconds,1.591}] diff --git a/test/machi_admin_util_test.erl b/test/machi_admin_util_test.erl index 09d4717..9ce952f 100644 --- a/test/machi_admin_util_test.erl +++ b/test/machi_admin_util_test.erl @@ -38,7 +38,7 @@ verify_file_checksums_test() -> W_props = [{initial_wedged, false}], FLU1 = machi_flu1_test:setup_test_flu(verify1_flu, TcpPort, DataDir, W_props), - Sock1 = machi_util:connect(Host, TcpPort), + Sock1 = ?FLU_C:connect(#p_srvr{address=Host, port=TcpPort}), try Prefix = <<"verify_prefix">>, [{ok, _} = ?FLU_C:append_chunk(Sock1, ?DUMMY_PV1_EPOCH, diff --git a/test/machi_chain_manager1_test.erl b/test/machi_chain_manager1_test.erl index c51559d..ef9f92f 100644 --- a/test/machi_chain_manager1_test.erl +++ b/test/machi_chain_manager1_test.erl @@ -140,7 +140,7 @@ smoke0_test() -> Host = "localhost", TcpPort = 6623, {ok, FLUa} = machi_flu1:start_link([{a,TcpPort,"./data.a"}]), - Pa = #p_srvr{name=a, proto=ipv4, address=Host, port=TcpPort}, + Pa = #p_srvr{name=a, address=Host, port=TcpPort}, Members_Dict = machi_projection:make_members_dict([Pa]), %% Egadz, more racing on startup, yay. TODO fix. 
timer:sleep(1), diff --git a/test/machi_flu1_test.erl b/test/machi_flu1_test.erl index b66dcc3..48e0831 100644 --- a/test/machi_flu1_test.erl +++ b/test/machi_flu1_test.erl @@ -87,6 +87,23 @@ flu_smoke_test() -> ?DUMMY_PV1_EPOCH, File1, Off1, Len1*984), + {ok, {Off1b,Len1b,File1b}} = ?FLU_C:append_chunk(Host, TcpPort, + ?DUMMY_PV1_EPOCH, + Prefix, Chunk1), + Extra = 42, + {ok, {Off1c,Len1c,File1c}} = ?FLU_C:append_chunk_extra(Host, TcpPort, + ?DUMMY_PV1_EPOCH, + Prefix, Chunk1, Extra), + {ok, {Off1d,Len1d,File1d}} = ?FLU_C:append_chunk(Host, TcpPort, + ?DUMMY_PV1_EPOCH, + Prefix, Chunk1), + if File1b == File1c, File1c == File1d -> + true = (Off1c == Off1b + Len1b), + true = (Off1d == Off1c + Len1c + Extra); + true -> + exit(not_mandatory_but_test_expected_same_file_fixme) + end, + Chunk2 = <<"yo yo">>, Len2 = byte_size(Chunk2), Off2 = ?MINIMUM_OFFSET + 77, @@ -132,7 +149,7 @@ flu_projection_smoke_test() -> FLU1 = setup_test_flu(projection_test_flu, TcpPort, DataDir), try [begin - {ok, {-1,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), + {ok, {0,_}} = ?FLU_C:get_latest_epoch(Host, TcpPort, T), {error, not_written} = ?FLU_C:read_latest_projection(Host, TcpPort, T), {ok, []} = ?FLU_C:list_all_projections(Host, TcpPort, T), diff --git a/test/machi_flu_psup_test.erl b/test/machi_flu_psup_test.erl index 94ecf69..e47691a 100644 --- a/test/machi_flu_psup_test.erl +++ b/test/machi_flu_psup_test.erl @@ -74,10 +74,11 @@ partial_stop_restart2() -> Dict = orddict:from_list(Ps), [os:cmd("rm -rf " ++ P#p_srvr.props) || {_,P} <- Ps], {ok, SupPid} = machi_flu_sup:start_link(), + DbgProps = [{dbg, [{initial_wedged, true}]}], Start = fun({_,P}) -> #p_srvr{name=Name, port=Port, props=Dir} = P, {ok, _} = machi_flu_psup:start_flu_package( - Name, Port, Dir, [{active_mode,false}]) + Name, Port, Dir, [{active_mode,false}|DbgProps]) end, WedgeStatus = fun({_,#p_srvr{address=Addr, port=TcpPort}}) -> machi_flu1_client:wedge_status(Addr, TcpPort) diff --git 
a/test/machi_proxy_flu1_client_test.erl b/test/machi_proxy_flu1_client_test.erl index 624856e..4b0c7c7 100644 --- a/test/machi_proxy_flu1_client_test.erl +++ b/test/machi_proxy_flu1_client_test.erl @@ -39,10 +39,10 @@ api_smoke_test() -> erase(flu_pid), try - I = #p_srvr{name=RegName, proto=ipv4, address=Host, port=TcpPort}, + I = #p_srvr{name=RegName, address=Host, port=TcpPort}, {ok, Prox1} = ?MUT:start_link(I), try - FakeEpoch = {-1, <<0:(20*8)/big>>}, + FakeEpoch = ?DUMMY_PV1_EPOCH, [{ok, {_,_,_}} = ?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, <<"data">>, infinity) || _ <- lists:seq(1,5)], @@ -63,6 +63,11 @@ api_smoke_test() -> ?MUT:append_chunk(Prox1, FakeEpoch, <<"prefix">>, MyChunk, infinity), {ok, MyChunk} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile, MyOff, MySize), + MyChunk2 = <<"my chunk data, yeah, again">>, + {ok, {MyOff2,MySize2,MyFile2}} = + ?MUT:append_chunk_extra(Prox1, FakeEpoch, <<"prefix">>, + MyChunk2, 4242, infinity), + {ok, MyChunk2} = ?MUT:read_chunk(Prox1, FakeEpoch, MyFile2, MyOff2, MySize2), %% Alright, now for the rest of the API, whee BadFile = <<"no-such-file">>,